1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 #include "nxt_unit_websocket.h" 15 16 #include "nxt_websocket.h" 17 18 #if (NXT_HAVE_MEMFD_CREATE) 19 #include <linux/memfd.h> 20 #endif 21 22 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 23 #define NXT_UNIT_LOCAL_BUF_SIZE \ 24 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 25 26 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 27 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 28 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 29 typedef struct nxt_unit_process_s nxt_unit_process_t; 30 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 31 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 32 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 33 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 34 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 35 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 36 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 37 38 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 39 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 40 nxt_unit_ctx_impl_t *ctx_impl, void *data); 41 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); 42 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); 43 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 44 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 45 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 46 nxt_unit_mmap_buf_t *mmap_buf); 47 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 50 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 51 nxt_unit_port_t *router_port, 
nxt_unit_port_t *read_port, 52 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 53 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); 54 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 55 nxt_unit_recv_msg_t *recv_msg); 56 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 57 nxt_unit_recv_msg_t *recv_msg); 58 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 59 nxt_unit_recv_msg_t *recv_msg); 60 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 61 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 62 nxt_unit_ctx_t *ctx); 63 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 64 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 65 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 66 nxt_unit_ctx_t *ctx); 67 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 68 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); 69 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 70 nxt_unit_recv_msg_t *recv_msg); 71 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 72 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 73 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 74 nxt_unit_mmap_buf_t *mmap_buf, int last); 75 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 76 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 77 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 78 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 79 nxt_unit_ctx_impl_t *ctx_impl); 80 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 81 nxt_unit_read_buf_t *rbuf); 82 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 83 nxt_unit_request_info_t *req, size_t size); 84 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 85 size_t 
size); 86 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 87 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 88 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 89 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 90 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 91 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 92 nxt_unit_port_t *port, int n); 93 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 94 int fd); 95 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 96 nxt_unit_port_t *port, uint32_t size, 97 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 98 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 99 100 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 101 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 102 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 103 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 104 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 105 nxt_unit_process_t *process, uint32_t id); 106 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 107 nxt_unit_recv_msg_t *recv_msg); 108 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 109 nxt_unit_recv_msg_t *recv_msg); 110 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 111 nxt_unit_process_t *process, 112 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 113 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 114 115 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, 116 pid_t pid); 117 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 118 pid_t pid, int remove); 119 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 120 static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, 121 nxt_unit_read_buf_t *rbuf); 122 
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 123 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 124 125 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 126 nxt_unit_port_t *port); 127 128 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 129 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 130 nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port); 131 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 132 nxt_unit_port_t *port); 133 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 134 nxt_unit_port_id_t *port_id); 135 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 136 nxt_unit_port_id_t *port_id); 137 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 138 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 139 nxt_unit_process_t *process); 140 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 141 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 142 nxt_unit_port_t *port, const void *buf, size_t buf_size, 143 const void *oob, size_t oob_size); 144 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 145 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 146 static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, 147 nxt_unit_port_t *port, void *buf, size_t buf_size, 148 void *oob, size_t oob_size); 149 150 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 151 nxt_unit_port_t *port); 152 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 153 nxt_unit_port_id_t *port_id, int remove); 154 155 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 156 nxt_unit_request_info_impl_t *req_impl); 157 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 158 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 159 160 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 161 162 163 struct 
nxt_unit_mmap_buf_s {
    /* Public buffer view exposed to the application; must stay first so
       nxt_container_of() can map nxt_unit_buf_t back to this struct. */
    nxt_unit_buf_t           buf;

    /* Intrusive doubly-linked list: prev points at the previous node's
       next pointer (or at the list head pointer). */
    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;       /* shared-memory segment header, if mmap-backed */
    nxt_unit_request_info_t  *req;       /* owning request, once assigned */
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_process_t       *process;
    char                     *free_ptr;  /* presumably heap allocation to free — TODO confirm */
    char                     *plain_ptr; /* presumably non-shared ("plain") data pointer — TODO confirm */
};


/* Decoded fields of one received port message plus the resources
   (fd, process ref, incoming buffers) that travel with it. */
struct nxt_unit_recv_msg_s {
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;   /* 1 bit */
    uint8_t                  mmap;   /* 1 bit */

    void                     *start;  /* payload after nxt_port_msg_t header */
    uint32_t                 size;    /* payload size in bytes */

    int                      fd;      /* fd received via SCM_RIGHTS, or -1 */
    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *incoming_buf;
};


/* Request/response lifecycle states. */
typedef enum {
    NXT_UNIT_RS_START            = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Library-private request state; the public nxt_unit_request_info_t
   must stay first for nxt_container_of(). */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;  /* set while registered in ctx requests hash */

    nxt_queue_link_t         link;       /* in ctx free_req or active_req queue */

    /* Flexible array member: per-request application data of
       lib->request_data_size bytes, allocated right behind the struct. */
    char                     extra_data[];
};


/* Library-private websocket frame state; public part must stay first. */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/* Fixed-size receive buffer for a port read, including ancillary data. */
struct nxt_unit_read_buf_s {
    nxt_unit_read_buf_t  *next;
    ssize_t              size;        /* bytes actually read */
    char                 buf[16384];
    char                 oob[256];    /* control-message (SCM_RIGHTS) area */
};


/* Per-context state; the public nxt_unit_ctx_t must stay first. */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    nxt_atomic_t                  use_count;

    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    nxt_queue_link_t              link;      /* in lib->contexts */

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    nxt_unit_read_buf_t           *pending_read_head;
    nxt_unit_read_buf_t           **pending_read_tail;
    nxt_unit_read_buf_t           *free_read_buf;

    /* Pre-allocated objects so a context works without malloc. */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    nxt_unit_request_info_impl_t  req;
};


/* Whole-library state; the public nxt_unit_t must stay first. */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    nxt_unit_callbacks_t     callbacks;

    nxt_atomic_t             use_count;

    uint32_t                 request_data_size;
    uint32_t                 shm_mmap_limit;   /* max outgoing mmaps, in segments */

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_t          *router_port;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    pid_t                    pid;
    int                      log_fd;
    int                      online;

    /* Embedded main context (extra request data follows the struct). */
    nxt_unit_ctx_impl_t      main_ctx;
};


/* Library-private port state; public part must stay first. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t     port;

    nxt_atomic_t        use_count;

    nxt_queue_link_t    link;      /* in owning process->ports */
    nxt_unit_process_t  *process;
};


struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t  *hdr;
};


/* Growable array of shared-memory segments with its own lock. */
struct nxt_unit_mmaps_s {
    pthread_mutex_t  mutex;
    uint32_t         size;
    uint32_t         cap;
    nxt_atomic_t     allocated_chunks;
    nxt_unit_mmap_t  *elts;
};


/* Peer process: its ports and the shared memory exchanged with it. */
struct nxt_unit_process_s {
    pid_t              pid;

    nxt_queue_t        ports;

    nxt_unit_mmaps_t   incoming;
    nxt_unit_mmaps_t   outgoing;

    nxt_unit_impl_t    *lib;

    nxt_atomic_t       use_count;

    uint32_t           next_port_id;
};


/* Explicitly using 32 bit types to avoid possible alignment.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Public library entry point: create the library singleton, resolve the
 * ready/router/read ports (either from the caller-supplied init struct or
 * from the NXT_UNIT_INIT_ENV environment variable), register the router
 * and read ports, and send the PROCESS_READY message to the master.
 * Returns the main context, or NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, router_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        /* Caller supplied the ports explicitly. */
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        router_port = init->router_port;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        /* Re-init ids to (re)compute derived fields such as the hash. */
        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
                              router_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        /* Fall back to the environment variable set by the master process. */
        rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
                               &lib->log_fd, &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Convert the byte limit into a segment count, rounding up. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                              / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;

    ctx = &lib->main_ctx.ctx;

    lib->router_port = nxt_unit_add_port(ctx, &router_port);
    if (nxt_slow_path(lib->router_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add router_port");

        goto fail;
    }

    lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
    if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    /* The ready fd is only needed for the READY message. */
    close(ready_port.out_fd);

    return ctx;

fail:

    nxt_unit_ctx_release(&lib->main_ctx);

    return NULL;
}


/*
 * Allocate and initialize the library singleton together with the
 * per-request data area of the embedded main context.
 * Returns NULL on allocation/mutex failure or missing request_handler.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    /* Extra request_data_size bytes back the main context's extra_data. */
    lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;
    /* Byte limit -> segment count, rounding up. */
    lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                          / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;
    lib->online = 1;

    nxt_queue_init(&lib->contexts);

    lib->use_count = 0;
    lib->router_port = NULL;

    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    cb = &lib->callbacks;

    if (cb->request_handler == NULL) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        /*
         * NOTE(review): at this point ctx_init has already succeeded and
         * taken a lib reference; freeing lib directly (without destroying
         * the mutexes) looks inconsistent with the refcounted teardown
         * path — verify against upstream.
         */
        goto fail;
    }

    return lib;

fail:

    free(lib);

    return NULL;
}


/*
 * Initialize a context: mutex, free-lists of pre-allocated buffers and
 * the embedded request info, pending-read queue, and link it into
 * lib->contexts (taking a library reference).
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_lib_use(lib);

    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    ctx_impl->use_count = 1;

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);

    /* Seed the buffer free-list with the two embedded buffers. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    /* The embedded request info starts out on the free list. */
    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);

    ctx_impl->pending_read_head = NULL;
    ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
    ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
    ctx_impl->ctx_read_buf.next = NULL;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port = NULL;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}


/* Drop a context reference; frees the context when the last one goes
   (fetch_add returns the previous value, so 1 means "was last"). */
nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
{
    long  c;

    c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_ctx_free(ctx_impl);
    }
}


nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t *lib)
{
    nxt_atomic_fetch_add(&lib->use_count, 1);
}


/*
 * Drop a library reference.  On the last release, tear down all tracked
 * processes (popping them one at a time under the lock), destroy the
 * mutex, release the router port and free the singleton.
 */
nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t *lib)
{
    long                c;
    nxt_unit_process_t  *process;

    c = nxt_atomic_fetch_add(&lib->use_count, -1);

    if (c == 1) {
        for ( ;; ) {
            pthread_mutex_lock(&lib->mutex);

            process = nxt_unit_process_pop_first(lib);
            if (process == NULL) {
                pthread_mutex_unlock(&lib->mutex);

                break;
            }

            /* NOTE(review): presumably releases lib->mutex internally —
               confirm, since the loop re-locks on each iteration. */
            nxt_unit_remove_process(lib, process);
        }

        pthread_mutex_destroy(&lib->mutex);

        if (nxt_fast_path(lib->router_port != NULL)) {
            nxt_unit_port_release(lib->router_port);
        }

        free(lib);
    }
}


/* Push mmap_buf onto the intrusive list at *head. */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/* Append mmap_buf at the end of the list: walk the next pointers to the
   terminating NULL slot, then insert there. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/* Detach mmap_buf from whatever list it is on (no-op links untouched). */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse NXT_UNIT_INIT_ENV:
 *   "<version>;<stream>;<pid>,<id>,<fd>;<pid>,<id>,<fd>;<pid>,<id>,<fd>;
 *    <log_fd>,<shm_limit>"
 * describing the ready, router and read ports.  The leading version
 * string must match NXT_VERSION exactly.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, router_fd, read_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, router_pid, read_pid;
    uint32_t  ready_stream, router_id, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    /* pids are scanned as int64 to be portable across pid_t widths. */
    rc = sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%d,%"PRIu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &router_pid, &router_id, &router_fd,
                &read_pid, &read_id, &read_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 12)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    /* ready port: write-only. */
    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    /* router port: write-only. */
    nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);

    router_port->in_fd = -1;
    router_port->out_fd = router_fd;
    router_port->data = NULL;

    /* read port: read-only. */
    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_fd;
    read_port->out_fd = -1;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/* Send the PROCESS_READY message for the given stream over ready_fd. */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Decode and dispatch one port message received by the application.
 * Extracts an optional SCM_RIGHTS fd from the ancillary data, validates
 * the header, reads shared-memory payload if the message is mmap-based,
 * and routes by message type.  The fail label doubles as the common
 * cleanup path: it closes an unconsumed fd, frees unconsumed incoming
 * buffers and drops the process reference.
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                  rc;
    pid_t                pid;
    struct cmsghdr       *cm;
    nxt_port_msg_t       *port_msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_recv_msg_t  recv_msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = NXT_UNIT_ERROR;
    recv_msg.fd = -1;
    recv_msg.process = NULL;
    port_msg = buf;
    cm = oob;

    /* Pull a passed file descriptor out of the control message, if any. */
    if (oob_size >= CMSG_SPACE(sizeof(int))
        && cm->cmsg_len == CMSG_LEN(sizeof(int))
        && cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
        goto fail;
    }

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    recv_msg.start = port_msg + 1;
    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /* A zero tracking_read result means the message was cancelled. */
    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
        rc = NXT_UNIT_OK;

        goto fail;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    if (port_msg->mmap) {
        /* Payload lives in shared memory; resolve it into incoming_buf. */
        if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
            goto fail;
        }
    }

    switch (port_msg->type) {

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        nxt_unit_quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd);

        /* Redirect the log destination onto the newly received fd. */
        if (dup2(recv_msg.fd, lib->log_fd) == -1) {
            nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
                           port_msg->stream, recv_msg.fd, lib->log_fd,
                           strerror(errno), errno);

            goto fail;
        }

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd);

            goto fail;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                          (int) sizeof(pid));

            goto fail;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case
_NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        goto fail;
    }

fail:

    /* Common cleanup: also reached on success (rc == NXT_UNIT_OK). */
    if (recv_msg.fd != -1) {
        close(recv_msg.fd);
    }

    /* NOTE(review): presumably mmap_buf_free() unlinks the buffer from
       recv_msg.incoming_buf, otherwise this loop would not terminate —
       confirm against the nxt_unit_mmap_buf_free() definition. */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (recv_msg.process != NULL) {
        nxt_unit_process_release(recv_msg.process);
    }

    return rc;
}


/*
 * Handle a NEW_PORT message: validate the payload and the passed fd,
 * switch the fd to blocking mode, and register the new outgoing port.
 * On success the fd ownership moves into the port (recv_msg->fd = -1
 * prevents the caller's cleanup from closing it).
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                      nb;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd);

    /* Clear non-blocking mode (FIONBIO with 0). */
    nb = 0;

    if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
        nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
                       "failed: %s (%d)",
                       recv_msg->stream, recv_msg->fd, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                          new_port_msg->id);

    new_port.in_fd = -1;
    new_port.out_fd = recv_msg->fd;
    new_port.data = NULL;

    /* fd ownership transferred to new_port. */
    recv_msg->fd = -1;

    port = nxt_unit_add_port(ctx, &new_port);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Handle REQ_HEADERS: the request structure must arrive in shared
 * memory.  Builds a request info object, resolves the reply port, wires
 * up the request/content buffers and finally invokes the application's
 * request_handler callback.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    /* The reply port is identified by sender pid + reply_port id. */
    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(port == NULL)) {
        nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
                       recv_msg->stream,
                       (int) recv_msg->pid, (int) recv_msg->reply_port);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->response_port = port;

    /* The request structure lives directly in the shared-memory payload. */
    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Content starts at the preread position inside the request buffer. */
    req->content_buf = req->request_buf;
    req->content_buf->free =
nxt_unit_sptr_get(&r->preread_content);

    /* "Move" process reference to req_impl. */
    req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(req_impl->process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    recv_msg->process = NULL;

    req_impl->stream = recv_msg->stream;

    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* Take over the spooled-body fd (if any) from the message. */
    req->content_fd = recv_msg->fd;
    recv_msg->fd = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    /* Hand the request over to the application. */
    lib->callbacks.request_handler(req);

    return NXT_UNIT_OK;
}


/*
 * Handle a WEBSOCKET message for an existing request stream: build a
 * websocket frame object around the payload (shared-memory or inline),
 * parse the frame header/mask and call the websocket_handler callback.
 * A message flagged "last" additionally triggers close_handler (or a
 * forced error completion when no close_handler is registered).
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* On the last frame the stream is also removed from the hash. */
    req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
                                          recv_msg->last);
    if (req_impl == NULL) {
        /* Unknown stream: silently ignored. */
        return NXT_UNIT_OK;
    }

    req = &req_impl->req;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* A frame header is at least 2 bytes. */
    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /* Inline payload: wrap it in a freshly acquired buffer. */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* The 4-byte masking key is the last part of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header so the buffer exposes only the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                           ws_impl->ws.header->opcode,
                           ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");
1186 cb->close_handler(req); 1187 1188 } else { 1189 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1190 } 1191 } 1192 1193 return NXT_UNIT_OK; 1194 } 1195 1196 1197 static int 1198 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) 1199 { 1200 nxt_unit_impl_t *lib; 1201 nxt_unit_callbacks_t *cb; 1202 1203 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1204 cb = &lib->callbacks; 1205 1206 if (cb->shm_ack_handler != NULL) { 1207 cb->shm_ack_handler(ctx); 1208 } 1209 1210 return NXT_UNIT_OK; 1211 } 1212 1213 1214 static nxt_unit_request_info_impl_t * 1215 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1216 { 1217 nxt_unit_impl_t *lib; 1218 nxt_queue_link_t *lnk; 1219 nxt_unit_ctx_impl_t *ctx_impl; 1220 nxt_unit_request_info_impl_t *req_impl; 1221 1222 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1223 1224 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1225 1226 pthread_mutex_lock(&ctx_impl->mutex); 1227 1228 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1229 pthread_mutex_unlock(&ctx_impl->mutex); 1230 1231 req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) 1232 + lib->request_data_size); 1233 if (nxt_slow_path(req_impl == NULL)) { 1234 return NULL; 1235 } 1236 1237 req_impl->req.unit = ctx->unit; 1238 req_impl->req.ctx = ctx; 1239 1240 pthread_mutex_lock(&ctx_impl->mutex); 1241 1242 } else { 1243 lnk = nxt_queue_first(&ctx_impl->free_req); 1244 nxt_queue_remove(lnk); 1245 1246 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1247 } 1248 1249 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1250 1251 pthread_mutex_unlock(&ctx_impl->mutex); 1252 1253 req_impl->req.data = lib->request_data_size ? 
req_impl->extra_data : NULL; 1254 1255 return req_impl; 1256 } 1257 1258 1259 static void 1260 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1261 { 1262 nxt_unit_ctx_impl_t *ctx_impl; 1263 nxt_unit_request_info_impl_t *req_impl; 1264 1265 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1266 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1267 1268 req->response = NULL; 1269 req->response_buf = NULL; 1270 1271 if (req_impl->websocket) { 1272 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); 1273 1274 req_impl->websocket = 0; 1275 } 1276 1277 while (req_impl->outgoing_buf != NULL) { 1278 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1279 } 1280 1281 while (req_impl->incoming_buf != NULL) { 1282 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1283 } 1284 1285 if (req->content_fd != -1) { 1286 close(req->content_fd); 1287 1288 req->content_fd = -1; 1289 } 1290 1291 /* 1292 * Process release should go after buffers release to guarantee mmap 1293 * existence. 
1294 */ 1295 if (req_impl->process != NULL) { 1296 nxt_unit_process_release(req_impl->process); 1297 1298 req_impl->process = NULL; 1299 } 1300 1301 if (req->response_port != NULL) { 1302 nxt_unit_port_release(req->response_port); 1303 1304 req->response_port = NULL; 1305 } 1306 1307 pthread_mutex_lock(&ctx_impl->mutex); 1308 1309 nxt_queue_remove(&req_impl->link); 1310 1311 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 1312 1313 pthread_mutex_unlock(&ctx_impl->mutex); 1314 1315 req_impl->state = NXT_UNIT_RS_RELEASED; 1316 } 1317 1318 1319 static void 1320 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1321 { 1322 nxt_unit_ctx_impl_t *ctx_impl; 1323 1324 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1325 1326 nxt_queue_remove(&req_impl->link); 1327 1328 if (req_impl != &ctx_impl->req) { 1329 free(req_impl); 1330 } 1331 } 1332 1333 1334 static nxt_unit_websocket_frame_impl_t * 1335 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1336 { 1337 nxt_queue_link_t *lnk; 1338 nxt_unit_ctx_impl_t *ctx_impl; 1339 nxt_unit_websocket_frame_impl_t *ws_impl; 1340 1341 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1342 1343 pthread_mutex_lock(&ctx_impl->mutex); 1344 1345 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1346 pthread_mutex_unlock(&ctx_impl->mutex); 1347 1348 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); 1349 if (nxt_slow_path(ws_impl == NULL)) { 1350 return NULL; 1351 } 1352 1353 } else { 1354 lnk = nxt_queue_first(&ctx_impl->free_ws); 1355 nxt_queue_remove(lnk); 1356 1357 pthread_mutex_unlock(&ctx_impl->mutex); 1358 1359 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1360 } 1361 1362 ws_impl->ctx_impl = ctx_impl; 1363 1364 return ws_impl; 1365 } 1366 1367 1368 static void 1369 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1370 { 1371 nxt_unit_websocket_frame_impl_t *ws_impl; 1372 1373 ws_impl = nxt_container_of(ws, 
nxt_unit_websocket_frame_impl_t, ws); 1374 1375 while (ws_impl->buf != NULL) { 1376 nxt_unit_mmap_buf_free(ws_impl->buf); 1377 } 1378 1379 ws->req = NULL; 1380 1381 pthread_mutex_lock(&ws_impl->ctx_impl->mutex); 1382 1383 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); 1384 1385 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); 1386 } 1387 1388 1389 static void 1390 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) 1391 { 1392 nxt_queue_remove(&ws_impl->link); 1393 1394 free(ws_impl); 1395 } 1396 1397 1398 uint16_t 1399 nxt_unit_field_hash(const char *name, size_t name_length) 1400 { 1401 u_char ch; 1402 uint32_t hash; 1403 const char *p, *end; 1404 1405 hash = 159406; /* Magic value copied from nxt_http_parse.c */ 1406 end = name + name_length; 1407 1408 for (p = name; p < end; p++) { 1409 ch = *p; 1410 hash = (hash << 4) + hash + nxt_lowcase(ch); 1411 } 1412 1413 hash = (hash >> 16) ^ hash; 1414 1415 return hash; 1416 } 1417 1418 1419 void 1420 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) 1421 { 1422 uint32_t i, j; 1423 nxt_unit_field_t *fields, f; 1424 nxt_unit_request_t *r; 1425 1426 nxt_unit_req_debug(req, "group_dup_fields"); 1427 1428 r = req->request; 1429 fields = r->fields; 1430 1431 for (i = 0; i < r->fields_count; i++) { 1432 1433 switch (fields[i].hash) { 1434 case NXT_UNIT_HASH_CONTENT_LENGTH: 1435 r->content_length_field = i; 1436 break; 1437 1438 case NXT_UNIT_HASH_CONTENT_TYPE: 1439 r->content_type_field = i; 1440 break; 1441 1442 case NXT_UNIT_HASH_COOKIE: 1443 r->cookie_field = i; 1444 break; 1445 }; 1446 1447 for (j = i + 1; j < r->fields_count; j++) { 1448 if (fields[i].hash != fields[j].hash) { 1449 continue; 1450 } 1451 1452 if (j == i + 1) { 1453 continue; 1454 } 1455 1456 f = fields[j]; 1457 f.name.offset += (j - (i + 1)) * sizeof(f); 1458 f.value.offset += (j - (i + 1)) * sizeof(f); 1459 1460 while (j > i + 1) { 1461 fields[j] = fields[j - 1]; 1462 fields[j].name.offset -= 
sizeof(f); 1463 fields[j].value.offset -= sizeof(f); 1464 j--; 1465 } 1466 1467 fields[j] = f; 1468 1469 i++; 1470 } 1471 } 1472 } 1473 1474 1475 int 1476 nxt_unit_response_init(nxt_unit_request_info_t *req, 1477 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 1478 { 1479 uint32_t buf_size; 1480 nxt_unit_buf_t *buf; 1481 nxt_unit_request_info_impl_t *req_impl; 1482 1483 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1484 1485 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1486 nxt_unit_req_warn(req, "init: response already sent"); 1487 1488 return NXT_UNIT_ERROR; 1489 } 1490 1491 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1492 (int) max_fields_count, (int) max_fields_size); 1493 1494 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1495 nxt_unit_req_debug(req, "duplicate response init"); 1496 } 1497 1498 /* 1499 * Each field name and value 0-terminated by libunit, 1500 * this is the reason of '+ 2' below. 
1501 */ 1502 buf_size = sizeof(nxt_unit_response_t) 1503 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1504 + max_fields_size; 1505 1506 if (nxt_slow_path(req->response_buf != NULL)) { 1507 buf = req->response_buf; 1508 1509 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1510 goto init_response; 1511 } 1512 1513 nxt_unit_buf_free(buf); 1514 1515 req->response_buf = NULL; 1516 req->response = NULL; 1517 req->response_max_fields = 0; 1518 1519 req_impl->state = NXT_UNIT_RS_START; 1520 } 1521 1522 buf = nxt_unit_response_buf_alloc(req, buf_size); 1523 if (nxt_slow_path(buf == NULL)) { 1524 return NXT_UNIT_ERROR; 1525 } 1526 1527 init_response: 1528 1529 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 1530 1531 req->response_buf = buf; 1532 1533 req->response = (nxt_unit_response_t *) buf->start; 1534 req->response->status = status; 1535 1536 buf->free = buf->start + sizeof(nxt_unit_response_t) 1537 + max_fields_count * sizeof(nxt_unit_field_t); 1538 1539 req->response_max_fields = max_fields_count; 1540 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 1541 1542 return NXT_UNIT_OK; 1543 } 1544 1545 1546 int 1547 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1548 uint32_t max_fields_count, uint32_t max_fields_size) 1549 { 1550 char *p; 1551 uint32_t i, buf_size; 1552 nxt_unit_buf_t *buf; 1553 nxt_unit_field_t *f, *src; 1554 nxt_unit_response_t *resp; 1555 nxt_unit_request_info_impl_t *req_impl; 1556 1557 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1558 1559 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1560 nxt_unit_req_warn(req, "realloc: response not init"); 1561 1562 return NXT_UNIT_ERROR; 1563 } 1564 1565 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1566 nxt_unit_req_warn(req, "realloc: response already sent"); 1567 1568 return NXT_UNIT_ERROR; 1569 } 1570 1571 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1572 nxt_unit_req_warn(req, 
"realloc: new max_fields_count is too small"); 1573 1574 return NXT_UNIT_ERROR; 1575 } 1576 1577 /* 1578 * Each field name and value 0-terminated by libunit, 1579 * this is the reason of '+ 2' below. 1580 */ 1581 buf_size = sizeof(nxt_unit_response_t) 1582 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1583 + max_fields_size; 1584 1585 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 1586 1587 buf = nxt_unit_response_buf_alloc(req, buf_size); 1588 if (nxt_slow_path(buf == NULL)) { 1589 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 1590 return NXT_UNIT_ERROR; 1591 } 1592 1593 resp = (nxt_unit_response_t *) buf->start; 1594 1595 memset(resp, 0, sizeof(nxt_unit_response_t)); 1596 1597 resp->status = req->response->status; 1598 resp->content_length = req->response->content_length; 1599 1600 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1601 f = resp->fields; 1602 1603 for (i = 0; i < req->response->fields_count; i++) { 1604 src = req->response->fields + i; 1605 1606 if (nxt_slow_path(src->skip != 0)) { 1607 continue; 1608 } 1609 1610 if (nxt_slow_path(src->name_length + src->value_length + 2 1611 > (uint32_t) (buf->end - p))) 1612 { 1613 nxt_unit_req_warn(req, "realloc: not enough space for field" 1614 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 1615 i, src, src->name_length, src->value_length); 1616 1617 goto fail; 1618 } 1619 1620 nxt_unit_sptr_set(&f->name, p); 1621 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 1622 *p++ = '\0'; 1623 1624 nxt_unit_sptr_set(&f->value, p); 1625 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 1626 *p++ = '\0'; 1627 1628 f->hash = src->hash; 1629 f->skip = 0; 1630 f->name_length = src->name_length; 1631 f->value_length = src->value_length; 1632 1633 resp->fields_count++; 1634 f++; 1635 } 1636 1637 if (req->response->piggyback_content_length > 0) { 1638 if (nxt_slow_path(req->response->piggyback_content_length 1639 > (uint32_t) (buf->end - 
p))) 1640 { 1641 nxt_unit_req_warn(req, "realloc: not enought space for content" 1642 " #%"PRIu32", %"PRIu32" required", 1643 i, req->response->piggyback_content_length); 1644 1645 goto fail; 1646 } 1647 1648 resp->piggyback_content_length = 1649 req->response->piggyback_content_length; 1650 1651 nxt_unit_sptr_set(&resp->piggyback_content, p); 1652 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 1653 req->response->piggyback_content_length); 1654 } 1655 1656 buf->free = p; 1657 1658 nxt_unit_buf_free(req->response_buf); 1659 1660 req->response = resp; 1661 req->response_buf = buf; 1662 req->response_max_fields = max_fields_count; 1663 1664 return NXT_UNIT_OK; 1665 1666 fail: 1667 1668 nxt_unit_buf_free(buf); 1669 1670 return NXT_UNIT_ERROR; 1671 } 1672 1673 1674 int 1675 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 1676 { 1677 nxt_unit_request_info_impl_t *req_impl; 1678 1679 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1680 1681 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 1682 } 1683 1684 1685 int 1686 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 1687 const char *name, uint8_t name_length, 1688 const char *value, uint32_t value_length) 1689 { 1690 nxt_unit_buf_t *buf; 1691 nxt_unit_field_t *f; 1692 nxt_unit_response_t *resp; 1693 nxt_unit_request_info_impl_t *req_impl; 1694 1695 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1696 1697 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 1698 nxt_unit_req_warn(req, "add_field: response not initialized or " 1699 "already sent"); 1700 1701 return NXT_UNIT_ERROR; 1702 } 1703 1704 resp = req->response; 1705 1706 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 1707 nxt_unit_req_warn(req, "add_field: too many response fields"); 1708 1709 return NXT_UNIT_ERROR; 1710 } 1711 1712 buf = req->response_buf; 1713 1714 if (nxt_slow_path(name_length + value_length + 2 1715 > (uint32_t) 
(buf->end - buf->free))) 1716 { 1717 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 1718 1719 return NXT_UNIT_ERROR; 1720 } 1721 1722 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 1723 resp->fields_count, 1724 (int) name_length, name, 1725 (int) value_length, value); 1726 1727 f = resp->fields + resp->fields_count; 1728 1729 nxt_unit_sptr_set(&f->name, buf->free); 1730 buf->free = nxt_cpymem(buf->free, name, name_length); 1731 *buf->free++ = '\0'; 1732 1733 nxt_unit_sptr_set(&f->value, buf->free); 1734 buf->free = nxt_cpymem(buf->free, value, value_length); 1735 *buf->free++ = '\0'; 1736 1737 f->hash = nxt_unit_field_hash(name, name_length); 1738 f->skip = 0; 1739 f->name_length = name_length; 1740 f->value_length = value_length; 1741 1742 resp->fields_count++; 1743 1744 return NXT_UNIT_OK; 1745 } 1746 1747 1748 int 1749 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 1750 const void* src, uint32_t size) 1751 { 1752 nxt_unit_buf_t *buf; 1753 nxt_unit_response_t *resp; 1754 nxt_unit_request_info_impl_t *req_impl; 1755 1756 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1757 1758 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1759 nxt_unit_req_warn(req, "add_content: response not initialized yet"); 1760 1761 return NXT_UNIT_ERROR; 1762 } 1763 1764 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1765 nxt_unit_req_warn(req, "add_content: response already sent"); 1766 1767 return NXT_UNIT_ERROR; 1768 } 1769 1770 buf = req->response_buf; 1771 1772 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 1773 nxt_unit_req_warn(req, "add_content: buffer overflow"); 1774 1775 return NXT_UNIT_ERROR; 1776 } 1777 1778 resp = req->response; 1779 1780 if (resp->piggyback_content_length == 0) { 1781 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 1782 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 1783 } 1784 1785 resp->piggyback_content_length += size; 1786 
1787 buf->free = nxt_cpymem(buf->free, src, size); 1788 1789 return NXT_UNIT_OK; 1790 } 1791 1792 1793 int 1794 nxt_unit_response_send(nxt_unit_request_info_t *req) 1795 { 1796 int rc; 1797 nxt_unit_mmap_buf_t *mmap_buf; 1798 nxt_unit_request_info_impl_t *req_impl; 1799 1800 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1801 1802 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1803 nxt_unit_req_warn(req, "send: response is not initialized yet"); 1804 1805 return NXT_UNIT_ERROR; 1806 } 1807 1808 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1809 nxt_unit_req_warn(req, "send: response already sent"); 1810 1811 return NXT_UNIT_ERROR; 1812 } 1813 1814 if (req->request->websocket_handshake && req->response->status == 101) { 1815 nxt_unit_response_upgrade(req); 1816 } 1817 1818 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 1819 req->response->fields_count, 1820 (int) (req->response_buf->free 1821 - req->response_buf->start)); 1822 1823 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 1824 1825 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 1826 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 1827 req->response = NULL; 1828 req->response_buf = NULL; 1829 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 1830 1831 nxt_unit_mmap_buf_free(mmap_buf); 1832 } 1833 1834 return rc; 1835 } 1836 1837 1838 int 1839 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 1840 { 1841 nxt_unit_request_info_impl_t *req_impl; 1842 1843 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1844 1845 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 1846 } 1847 1848 1849 nxt_unit_buf_t * 1850 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 1851 { 1852 int rc; 1853 nxt_unit_mmap_buf_t *mmap_buf; 1854 nxt_unit_request_info_impl_t *req_impl; 1855 1856 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1857 nxt_unit_req_warn(req, "response_buf_alloc: " 1858 
"requested buffer (%"PRIu32") too big", size); 1859 1860 return NULL; 1861 } 1862 1863 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1864 1865 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1866 1867 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1868 if (nxt_slow_path(mmap_buf == NULL)) { 1869 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 1870 1871 return NULL; 1872 } 1873 1874 mmap_buf->req = req; 1875 1876 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 1877 1878 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 1879 size, size, mmap_buf, 1880 NULL); 1881 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1882 nxt_unit_mmap_buf_release(mmap_buf); 1883 1884 return NULL; 1885 } 1886 1887 return &mmap_buf->buf; 1888 } 1889 1890 1891 static nxt_unit_process_t * 1892 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1893 { 1894 nxt_unit_impl_t *lib; 1895 1896 if (recv_msg->process != NULL) { 1897 return recv_msg->process; 1898 } 1899 1900 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1901 1902 pthread_mutex_lock(&lib->mutex); 1903 1904 recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); 1905 1906 pthread_mutex_unlock(&lib->mutex); 1907 1908 if (recv_msg->process == NULL) { 1909 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", 1910 recv_msg->stream, (int) recv_msg->pid); 1911 } 1912 1913 return recv_msg->process; 1914 } 1915 1916 1917 static nxt_unit_mmap_buf_t * 1918 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1919 { 1920 nxt_unit_mmap_buf_t *mmap_buf; 1921 nxt_unit_ctx_impl_t *ctx_impl; 1922 1923 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1924 1925 pthread_mutex_lock(&ctx_impl->mutex); 1926 1927 if (ctx_impl->free_buf == NULL) { 1928 pthread_mutex_unlock(&ctx_impl->mutex); 1929 1930 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1931 if (nxt_slow_path(mmap_buf == NULL)) { 1932 return NULL; 1933 } 1934 1935 } else { 
1936 mmap_buf = ctx_impl->free_buf; 1937 1938 nxt_unit_mmap_buf_unlink(mmap_buf); 1939 1940 pthread_mutex_unlock(&ctx_impl->mutex); 1941 } 1942 1943 mmap_buf->ctx_impl = ctx_impl; 1944 1945 mmap_buf->hdr = NULL; 1946 mmap_buf->free_ptr = NULL; 1947 1948 return mmap_buf; 1949 } 1950 1951 1952 static void 1953 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1954 { 1955 nxt_unit_mmap_buf_unlink(mmap_buf); 1956 1957 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 1958 1959 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 1960 1961 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 1962 } 1963 1964 1965 int 1966 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 1967 { 1968 return req->request->websocket_handshake; 1969 } 1970 1971 1972 int 1973 nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 1974 { 1975 int rc; 1976 nxt_unit_ctx_impl_t *ctx_impl; 1977 nxt_unit_request_info_impl_t *req_impl; 1978 1979 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1980 1981 if (nxt_slow_path(req_impl->websocket != 0)) { 1982 nxt_unit_req_debug(req, "upgrade: already upgraded"); 1983 1984 return NXT_UNIT_OK; 1985 } 1986 1987 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1988 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 1989 1990 return NXT_UNIT_ERROR; 1991 } 1992 1993 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1994 nxt_unit_req_warn(req, "upgrade: response already sent"); 1995 1996 return NXT_UNIT_ERROR; 1997 } 1998 1999 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 2000 2001 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); 2002 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2003 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 2004 2005 return NXT_UNIT_ERROR; 2006 } 2007 2008 req_impl->websocket = 1; 2009 2010 req->response->status = 101; 2011 2012 return NXT_UNIT_OK; 2013 } 2014 2015 2016 int 2017 
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 2018 { 2019 nxt_unit_request_info_impl_t *req_impl; 2020 2021 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2022 2023 return req_impl->websocket; 2024 } 2025 2026 2027 nxt_unit_request_info_t * 2028 nxt_unit_get_request_info_from_data(void *data) 2029 { 2030 nxt_unit_request_info_impl_t *req_impl; 2031 2032 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 2033 2034 return &req_impl->req; 2035 } 2036 2037 2038 int 2039 nxt_unit_buf_send(nxt_unit_buf_t *buf) 2040 { 2041 int rc; 2042 nxt_unit_mmap_buf_t *mmap_buf; 2043 nxt_unit_request_info_t *req; 2044 nxt_unit_request_info_impl_t *req_impl; 2045 2046 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2047 2048 req = mmap_buf->req; 2049 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2050 2051 nxt_unit_req_debug(req, "buf_send: %d bytes", 2052 (int) (buf->free - buf->start)); 2053 2054 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2055 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 2056 2057 return NXT_UNIT_ERROR; 2058 } 2059 2060 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2061 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 2062 2063 return NXT_UNIT_ERROR; 2064 } 2065 2066 if (nxt_fast_path(buf->free > buf->start)) { 2067 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2068 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2069 return rc; 2070 } 2071 } 2072 2073 nxt_unit_mmap_buf_free(mmap_buf); 2074 2075 return NXT_UNIT_OK; 2076 } 2077 2078 2079 static void 2080 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 2081 { 2082 int rc; 2083 nxt_unit_mmap_buf_t *mmap_buf; 2084 nxt_unit_request_info_t *req; 2085 2086 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2087 2088 req = mmap_buf->req; 2089 2090 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); 2091 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 2092 
nxt_unit_mmap_buf_free(mmap_buf); 2093 2094 nxt_unit_request_info_release(req); 2095 2096 } else { 2097 nxt_unit_request_done(req, rc); 2098 } 2099 } 2100 2101 2102 static int 2103 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 2104 nxt_unit_mmap_buf_t *mmap_buf, int last) 2105 { 2106 struct { 2107 nxt_port_msg_t msg; 2108 nxt_port_mmap_msg_t mmap_msg; 2109 } m; 2110 2111 int rc; 2112 u_char *last_used, *first_free; 2113 ssize_t res; 2114 nxt_chunk_id_t first_free_chunk; 2115 nxt_unit_buf_t *buf; 2116 nxt_unit_impl_t *lib; 2117 nxt_port_mmap_header_t *hdr; 2118 nxt_unit_request_info_impl_t *req_impl; 2119 2120 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 2121 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2122 2123 buf = &mmap_buf->buf; 2124 hdr = mmap_buf->hdr; 2125 2126 m.mmap_msg.size = buf->free - buf->start; 2127 2128 m.msg.stream = req_impl->stream; 2129 m.msg.pid = lib->pid; 2130 m.msg.reply_port = 0; 2131 m.msg.type = _NXT_PORT_MSG_DATA; 2132 m.msg.last = last != 0; 2133 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2134 m.msg.nf = 0; 2135 m.msg.mf = 0; 2136 m.msg.tracking = 0; 2137 2138 rc = NXT_UNIT_ERROR; 2139 2140 if (m.msg.mmap) { 2141 m.mmap_msg.mmap_id = hdr->id; 2142 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2143 (u_char *) buf->start); 2144 2145 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2146 req_impl->stream, 2147 (int) m.mmap_msg.mmap_id, 2148 (int) m.mmap_msg.chunk_id, 2149 (int) m.mmap_msg.size); 2150 2151 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), 2152 NULL, 0); 2153 if (nxt_slow_path(res != sizeof(m))) { 2154 goto free_buf; 2155 } 2156 2157 last_used = (u_char *) buf->free - 1; 2158 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2159 2160 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2161 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2162 2163 buf->start = (char *) first_free; 2164 buf->free = 
buf->start; 2165 2166 if (buf->end < buf->start) { 2167 buf->end = buf->start; 2168 } 2169 2170 } else { 2171 buf->start = NULL; 2172 buf->free = NULL; 2173 buf->end = NULL; 2174 2175 mmap_buf->hdr = NULL; 2176 } 2177 2178 nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, 2179 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2180 2181 nxt_unit_debug(req->ctx, "process %d allocated_chunks %d", 2182 mmap_buf->process->pid, 2183 (int) mmap_buf->process->outgoing.allocated_chunks); 2184 2185 } else { 2186 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2187 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2188 { 2189 nxt_unit_alert(req->ctx, 2190 "#%"PRIu32": failed to send plain memory buffer" 2191 ": no space reserved for message header", 2192 req_impl->stream); 2193 2194 goto free_buf; 2195 } 2196 2197 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); 2198 2199 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", 2200 req_impl->stream, 2201 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2202 2203 res = nxt_unit_port_send(req->ctx, req->response_port, 2204 buf->start - sizeof(m.msg), 2205 m.mmap_msg.size + sizeof(m.msg), 2206 NULL, 0); 2207 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2208 goto free_buf; 2209 } 2210 } 2211 2212 rc = NXT_UNIT_OK; 2213 2214 free_buf: 2215 2216 nxt_unit_free_outgoing_buf(mmap_buf); 2217 2218 return rc; 2219 } 2220 2221 2222 void 2223 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2224 { 2225 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2226 } 2227 2228 2229 static void 2230 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2231 { 2232 nxt_unit_free_outgoing_buf(mmap_buf); 2233 2234 nxt_unit_mmap_buf_release(mmap_buf); 2235 } 2236 2237 2238 static void 2239 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2240 { 2241 if (mmap_buf->hdr != NULL) { 2242 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2243 mmap_buf->process, 2244 mmap_buf->hdr, 
mmap_buf->buf.start, 2245 mmap_buf->buf.end - mmap_buf->buf.start); 2246 2247 mmap_buf->hdr = NULL; 2248 2249 return; 2250 } 2251 2252 if (mmap_buf->free_ptr != NULL) { 2253 free(mmap_buf->free_ptr); 2254 2255 mmap_buf->free_ptr = NULL; 2256 } 2257 } 2258 2259 2260 static nxt_unit_read_buf_t * 2261 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2262 { 2263 nxt_unit_ctx_impl_t *ctx_impl; 2264 2265 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2266 2267 pthread_mutex_lock(&ctx_impl->mutex); 2268 2269 return nxt_unit_read_buf_get_impl(ctx_impl); 2270 } 2271 2272 2273 static nxt_unit_read_buf_t * 2274 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2275 { 2276 nxt_unit_read_buf_t *rbuf; 2277 2278 if (ctx_impl->free_read_buf != NULL) { 2279 rbuf = ctx_impl->free_read_buf; 2280 ctx_impl->free_read_buf = rbuf->next; 2281 2282 pthread_mutex_unlock(&ctx_impl->mutex); 2283 2284 return rbuf; 2285 } 2286 2287 pthread_mutex_unlock(&ctx_impl->mutex); 2288 2289 rbuf = malloc(sizeof(nxt_unit_read_buf_t)); 2290 2291 return rbuf; 2292 } 2293 2294 2295 static void 2296 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2297 nxt_unit_read_buf_t *rbuf) 2298 { 2299 nxt_unit_ctx_impl_t *ctx_impl; 2300 2301 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2302 2303 pthread_mutex_lock(&ctx_impl->mutex); 2304 2305 rbuf->next = ctx_impl->free_read_buf; 2306 ctx_impl->free_read_buf = rbuf; 2307 2308 pthread_mutex_unlock(&ctx_impl->mutex); 2309 } 2310 2311 2312 nxt_unit_buf_t * 2313 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2314 { 2315 nxt_unit_mmap_buf_t *mmap_buf; 2316 2317 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2318 2319 if (mmap_buf->next == NULL) { 2320 return NULL; 2321 } 2322 2323 return &mmap_buf->next->buf; 2324 } 2325 2326 2327 uint32_t 2328 nxt_unit_buf_max(void) 2329 { 2330 return PORT_MMAP_DATA_SIZE; 2331 } 2332 2333 2334 uint32_t 2335 nxt_unit_buf_min(void) 2336 { 2337 return PORT_MMAP_CHUNK_SIZE; 2338 } 2339 2340 2341 int 2342 
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2343 size_t size) 2344 { 2345 ssize_t res; 2346 2347 res = nxt_unit_response_write_nb(req, start, size, size); 2348 2349 return res < 0 ? -res : NXT_UNIT_OK; 2350 } 2351 2352 2353 ssize_t 2354 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, 2355 size_t size, size_t min_size) 2356 { 2357 int rc; 2358 ssize_t sent; 2359 uint32_t part_size, min_part_size, buf_size; 2360 const char *part_start; 2361 nxt_unit_mmap_buf_t mmap_buf; 2362 nxt_unit_request_info_impl_t *req_impl; 2363 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2364 2365 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2366 2367 part_start = start; 2368 sent = 0; 2369 2370 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2371 nxt_unit_req_alert(req, "write: response not initialized yet"); 2372 2373 return -NXT_UNIT_ERROR; 2374 } 2375 2376 /* Check if response is not send yet. */ 2377 if (nxt_slow_path(req->response_buf != NULL)) { 2378 part_size = req->response_buf->end - req->response_buf->free; 2379 part_size = nxt_min(size, part_size); 2380 2381 rc = nxt_unit_response_add_content(req, part_start, part_size); 2382 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2383 return -rc; 2384 } 2385 2386 rc = nxt_unit_response_send(req); 2387 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2388 return -rc; 2389 } 2390 2391 size -= part_size; 2392 part_start += part_size; 2393 sent += part_size; 2394 2395 min_size -= nxt_min(min_size, part_size); 2396 } 2397 2398 while (size > 0) { 2399 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2400 min_part_size = nxt_min(min_size, part_size); 2401 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); 2402 2403 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, 2404 min_part_size, &mmap_buf, local_buf); 2405 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2406 return -rc; 2407 } 2408 2409 buf_size = mmap_buf.buf.end - mmap_buf.buf.free; 
2410 if (nxt_slow_path(buf_size == 0)) { 2411 return sent; 2412 } 2413 part_size = nxt_min(buf_size, part_size); 2414 2415 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, 2416 part_start, part_size); 2417 2418 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2419 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2420 return -rc; 2421 } 2422 2423 size -= part_size; 2424 part_start += part_size; 2425 sent += part_size; 2426 2427 min_size -= nxt_min(min_size, part_size); 2428 } 2429 2430 return sent; 2431 } 2432 2433 2434 int 2435 nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 2436 nxt_unit_read_info_t *read_info) 2437 { 2438 int rc; 2439 ssize_t n; 2440 uint32_t buf_size; 2441 nxt_unit_buf_t *buf; 2442 nxt_unit_mmap_buf_t mmap_buf; 2443 nxt_unit_request_info_impl_t *req_impl; 2444 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2445 2446 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2447 2448 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2449 nxt_unit_req_alert(req, "write: response not initialized yet"); 2450 2451 return NXT_UNIT_ERROR; 2452 } 2453 2454 /* Check if response is not send yet. */ 2455 if (nxt_slow_path(req->response_buf != NULL)) { 2456 2457 /* Enable content in headers buf. */ 2458 rc = nxt_unit_response_add_content(req, "", 0); 2459 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2460 nxt_unit_req_error(req, "Failed to add piggyback content"); 2461 2462 return rc; 2463 } 2464 2465 buf = req->response_buf; 2466 2467 while (buf->end - buf->free > 0) { 2468 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2469 if (nxt_slow_path(n < 0)) { 2470 nxt_unit_req_error(req, "Read error"); 2471 2472 return NXT_UNIT_ERROR; 2473 } 2474 2475 /* Manually increase sizes. 
*/ 2476 buf->free += n; 2477 req->response->piggyback_content_length += n; 2478 2479 if (read_info->eof) { 2480 break; 2481 } 2482 } 2483 2484 rc = nxt_unit_response_send(req); 2485 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2486 nxt_unit_req_error(req, "Failed to send headers with content"); 2487 2488 return rc; 2489 } 2490 2491 if (read_info->eof) { 2492 return NXT_UNIT_OK; 2493 } 2494 } 2495 2496 while (!read_info->eof) { 2497 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 2498 read_info->buf_size); 2499 2500 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 2501 2502 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2503 buf_size, buf_size, 2504 &mmap_buf, local_buf); 2505 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2506 return rc; 2507 } 2508 2509 buf = &mmap_buf.buf; 2510 2511 while (!read_info->eof && buf->end > buf->free) { 2512 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2513 if (nxt_slow_path(n < 0)) { 2514 nxt_unit_req_error(req, "Read error"); 2515 2516 nxt_unit_free_outgoing_buf(&mmap_buf); 2517 2518 return NXT_UNIT_ERROR; 2519 } 2520 2521 buf->free += n; 2522 } 2523 2524 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2525 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2526 nxt_unit_req_error(req, "Failed to send content"); 2527 2528 return rc; 2529 } 2530 } 2531 2532 return NXT_UNIT_OK; 2533 } 2534 2535 2536 ssize_t 2537 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 2538 { 2539 ssize_t buf_res, res; 2540 2541 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 2542 dst, size); 2543 2544 if (buf_res < (ssize_t) size && req->content_fd != -1) { 2545 res = read(req->content_fd, dst, size); 2546 if (res < 0) { 2547 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 2548 strerror(errno), errno); 2549 2550 return res; 2551 } 2552 2553 if (res < (ssize_t) size) { 2554 close(req->content_fd); 2555 2556 req->content_fd = -1; 2557 } 2558 2559 req->content_length -= res; 
2560 size -= res; 2561 2562 dst = nxt_pointer_to(dst, res); 2563 2564 } else { 2565 res = 0; 2566 } 2567 2568 return buf_res + res; 2569 } 2570 2571 2572 ssize_t 2573 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 2574 { 2575 char *p; 2576 size_t l_size, b_size; 2577 nxt_unit_buf_t *b; 2578 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 2579 2580 if (req->content_length == 0) { 2581 return 0; 2582 } 2583 2584 l_size = 0; 2585 2586 b = req->content_buf; 2587 2588 while (b != NULL) { 2589 b_size = b->end - b->free; 2590 p = memchr(b->free, '\n', b_size); 2591 2592 if (p != NULL) { 2593 p++; 2594 l_size += p - b->free; 2595 break; 2596 } 2597 2598 l_size += b_size; 2599 2600 if (max_size <= l_size) { 2601 break; 2602 } 2603 2604 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 2605 if (mmap_buf->next == NULL 2606 && req->content_fd != -1 2607 && l_size < req->content_length) 2608 { 2609 preread_buf = nxt_unit_request_preread(req, 16384); 2610 if (nxt_slow_path(preread_buf == NULL)) { 2611 return -1; 2612 } 2613 2614 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 2615 } 2616 2617 b = nxt_unit_buf_next(b); 2618 } 2619 2620 return nxt_min(max_size, l_size); 2621 } 2622 2623 2624 static nxt_unit_mmap_buf_t * 2625 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) 2626 { 2627 ssize_t res; 2628 nxt_unit_mmap_buf_t *mmap_buf; 2629 2630 if (req->content_fd == -1) { 2631 nxt_unit_req_alert(req, "preread: content_fd == -1"); 2632 return NULL; 2633 } 2634 2635 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2636 if (nxt_slow_path(mmap_buf == NULL)) { 2637 nxt_unit_req_alert(req, "preread: failed to allocate buf"); 2638 return NULL; 2639 } 2640 2641 mmap_buf->free_ptr = malloc(size); 2642 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 2643 nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); 2644 nxt_unit_mmap_buf_release(mmap_buf); 2645 return NULL; 2646 } 2647 2648 mmap_buf->plain_ptr = 
mmap_buf->free_ptr; 2649 2650 mmap_buf->hdr = NULL; 2651 mmap_buf->buf.start = mmap_buf->free_ptr; 2652 mmap_buf->buf.free = mmap_buf->buf.start; 2653 mmap_buf->buf.end = mmap_buf->buf.start + size; 2654 mmap_buf->process = NULL; 2655 2656 res = read(req->content_fd, mmap_buf->free_ptr, size); 2657 if (res < 0) { 2658 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 2659 strerror(errno), errno); 2660 2661 nxt_unit_mmap_buf_free(mmap_buf); 2662 2663 return NULL; 2664 } 2665 2666 if (res < (ssize_t) size) { 2667 close(req->content_fd); 2668 2669 req->content_fd = -1; 2670 } 2671 2672 nxt_unit_req_debug(req, "preread: read %d", (int) res); 2673 2674 mmap_buf->buf.end = mmap_buf->buf.free + res; 2675 2676 return mmap_buf; 2677 } 2678 2679 2680 static ssize_t 2681 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) 2682 { 2683 u_char *p; 2684 size_t rest, copy, read; 2685 nxt_unit_buf_t *buf, *last_buf; 2686 2687 p = dst; 2688 rest = size; 2689 2690 buf = *b; 2691 last_buf = buf; 2692 2693 while (buf != NULL) { 2694 last_buf = buf; 2695 2696 copy = buf->end - buf->free; 2697 copy = nxt_min(rest, copy); 2698 2699 p = nxt_cpymem(p, buf->free, copy); 2700 2701 buf->free += copy; 2702 rest -= copy; 2703 2704 if (rest == 0) { 2705 if (buf->end == buf->free) { 2706 buf = nxt_unit_buf_next(buf); 2707 } 2708 2709 break; 2710 } 2711 2712 buf = nxt_unit_buf_next(buf); 2713 } 2714 2715 *b = last_buf; 2716 2717 read = size - rest; 2718 2719 *len -= read; 2720 2721 return read; 2722 } 2723 2724 2725 void 2726 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2727 { 2728 uint32_t size; 2729 nxt_port_msg_t msg; 2730 nxt_unit_impl_t *lib; 2731 nxt_unit_request_info_impl_t *req_impl; 2732 2733 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2734 2735 nxt_unit_req_debug(req, "done: %d", rc); 2736 2737 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2738 goto skip_response_send; 2739 } 2740 2741 if (nxt_slow_path(req_impl->state < 
NXT_UNIT_RS_RESPONSE_INIT)) { 2742 2743 size = nxt_length("Content-Type") + nxt_length("text/plain"); 2744 2745 rc = nxt_unit_response_init(req, 200, 1, size); 2746 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2747 goto skip_response_send; 2748 } 2749 2750 rc = nxt_unit_response_add_field(req, "Content-Type", 2751 nxt_length("Content-Type"), 2752 "text/plain", nxt_length("text/plain")); 2753 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2754 goto skip_response_send; 2755 } 2756 } 2757 2758 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2759 2760 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2761 2762 nxt_unit_buf_send_done(req->response_buf); 2763 2764 return; 2765 } 2766 2767 skip_response_send: 2768 2769 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); 2770 2771 msg.stream = req_impl->stream; 2772 msg.pid = lib->pid; 2773 msg.reply_port = 0; 2774 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2775 : _NXT_PORT_MSG_RPC_ERROR; 2776 msg.last = 1; 2777 msg.mmap = 0; 2778 msg.nf = 0; 2779 msg.mf = 0; 2780 msg.tracking = 0; 2781 2782 (void) nxt_unit_port_send(req->ctx, req->response_port, 2783 &msg, sizeof(msg), NULL, 0); 2784 2785 nxt_unit_request_info_release(req); 2786 } 2787 2788 2789 int 2790 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2791 uint8_t last, const void *start, size_t size) 2792 { 2793 const struct iovec iov = { (void *) start, size }; 2794 2795 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); 2796 } 2797 2798 2799 int 2800 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 2801 uint8_t last, const struct iovec *iov, int iovcnt) 2802 { 2803 int i, rc; 2804 size_t l, copy; 2805 uint32_t payload_len, buf_size, alloc_size; 2806 const uint8_t *b; 2807 nxt_unit_buf_t *buf; 2808 nxt_unit_mmap_buf_t mmap_buf; 2809 nxt_websocket_header_t *wh; 2810 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2811 2812 payload_len = 0; 2813 2814 for (i = 0; i < iovcnt; i++) { 2815 payload_len += 
iov[i].iov_len; 2816 } 2817 2818 buf_size = 10 + payload_len; 2819 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); 2820 2821 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2822 alloc_size, alloc_size, 2823 &mmap_buf, local_buf); 2824 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2825 return rc; 2826 } 2827 2828 buf = &mmap_buf.buf; 2829 2830 buf->start[0] = 0; 2831 buf->start[1] = 0; 2832 2833 buf_size -= buf->end - buf->start; 2834 2835 wh = (void *) buf->free; 2836 2837 buf->free = nxt_websocket_frame_init(wh, payload_len); 2838 wh->fin = last; 2839 wh->opcode = opcode; 2840 2841 for (i = 0; i < iovcnt; i++) { 2842 b = iov[i].iov_base; 2843 l = iov[i].iov_len; 2844 2845 while (l > 0) { 2846 copy = buf->end - buf->free; 2847 copy = nxt_min(l, copy); 2848 2849 buf->free = nxt_cpymem(buf->free, b, copy); 2850 b += copy; 2851 l -= copy; 2852 2853 if (l > 0) { 2854 if (nxt_fast_path(buf->free > buf->start)) { 2855 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2856 2857 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2858 return rc; 2859 } 2860 } 2861 2862 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); 2863 2864 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2865 alloc_size, alloc_size, 2866 &mmap_buf, local_buf); 2867 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2868 return rc; 2869 } 2870 2871 buf_size -= buf->end - buf->start; 2872 } 2873 } 2874 } 2875 2876 if (buf->free > buf->start) { 2877 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2878 } 2879 2880 return rc; 2881 } 2882 2883 2884 ssize_t 2885 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 2886 size_t size) 2887 { 2888 ssize_t res; 2889 uint8_t *b; 2890 uint64_t i, d; 2891 2892 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, 2893 dst, size); 2894 2895 if (ws->mask == NULL) { 2896 return res; 2897 } 2898 2899 b = dst; 2900 d = (ws->payload_len - ws->content_length - res) % 4; 2901 2902 for (i = 0; i < (uint64_t) res; i++) { 2903 b[i] ^= ws->mask[ (i + d) 
% 4 ]; 2904 } 2905 2906 return res; 2907 } 2908 2909 2910 int 2911 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) 2912 { 2913 char *b; 2914 size_t size; 2915 nxt_unit_websocket_frame_impl_t *ws_impl; 2916 2917 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 2918 2919 if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) { 2920 return NXT_UNIT_OK; 2921 } 2922 2923 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; 2924 2925 b = malloc(size); 2926 if (nxt_slow_path(b == NULL)) { 2927 return NXT_UNIT_ERROR; 2928 } 2929 2930 memcpy(b, ws_impl->buf->buf.start, size); 2931 2932 ws_impl->buf->buf.start = b; 2933 ws_impl->buf->buf.free = b; 2934 ws_impl->buf->buf.end = b + size; 2935 2936 ws_impl->buf->free_ptr = b; 2937 2938 return NXT_UNIT_OK; 2939 } 2940 2941 2942 void 2943 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) 2944 { 2945 nxt_unit_websocket_frame_release(ws); 2946 } 2947 2948 2949 static nxt_port_mmap_header_t * 2950 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 2951 nxt_chunk_id_t *c, int *n, int min_n) 2952 { 2953 int res, nchunks, i; 2954 uint32_t outgoing_size; 2955 nxt_unit_mmap_t *mm, *mm_end; 2956 nxt_unit_impl_t *lib; 2957 nxt_unit_process_t *process; 2958 nxt_port_mmap_header_t *hdr; 2959 2960 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2961 2962 process = nxt_unit_port_process(port); 2963 if (nxt_slow_path(process == NULL)) { 2964 nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed", 2965 (int) port->id.pid, (int) port->id.id); 2966 2967 return NULL; 2968 } 2969 2970 pthread_mutex_lock(&process->outgoing.mutex); 2971 2972 retry: 2973 2974 outgoing_size = process->outgoing.size; 2975 2976 mm_end = process->outgoing.elts + outgoing_size; 2977 2978 for (mm = process->outgoing.elts; mm < mm_end; mm++) { 2979 hdr = mm->hdr; 2980 2981 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { 2982 continue; 2983 } 2984 2985 *c = 0; 2986 2987 while 
(nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { 2988 nchunks = 1; 2989 2990 while (nchunks < *n) { 2991 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, 2992 *c + nchunks); 2993 2994 if (res == 0) { 2995 if (nchunks >= min_n) { 2996 *n = nchunks; 2997 2998 goto unlock; 2999 } 3000 3001 for (i = 0; i < nchunks; i++) { 3002 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); 3003 } 3004 3005 *c += nchunks + 1; 3006 nchunks = 0; 3007 break; 3008 } 3009 3010 nchunks++; 3011 } 3012 3013 if (nchunks >= min_n) { 3014 *n = nchunks; 3015 3016 goto unlock; 3017 } 3018 } 3019 3020 hdr->oosm = 1; 3021 } 3022 3023 if (outgoing_size >= lib->shm_mmap_limit) { 3024 /* Cannot allocate more shared memory. */ 3025 pthread_mutex_unlock(&process->outgoing.mutex); 3026 3027 if (min_n == 0) { 3028 *n = 0; 3029 } 3030 3031 if (nxt_slow_path(process->outgoing.allocated_chunks + min_n 3032 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) 3033 { 3034 /* Memory allocated by application, but not send to router. */ 3035 return NULL; 3036 } 3037 3038 /* Notify router about OOSM condition. */ 3039 3040 res = nxt_unit_send_oosm(ctx, port); 3041 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3042 return NULL; 3043 } 3044 3045 /* Return if caller can handle OOSM condition. Non-blocking mode. 
*/ 3046 3047 if (min_n == 0) { 3048 return NULL; 3049 } 3050 3051 nxt_unit_debug(ctx, "oosm: waiting for ACK"); 3052 3053 res = nxt_unit_wait_shm_ack(ctx); 3054 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3055 return NULL; 3056 } 3057 3058 nxt_unit_debug(ctx, "oosm: retry"); 3059 3060 pthread_mutex_lock(&process->outgoing.mutex); 3061 3062 goto retry; 3063 } 3064 3065 *c = 0; 3066 hdr = nxt_unit_new_mmap(ctx, port, *n); 3067 3068 unlock: 3069 3070 nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); 3071 3072 nxt_unit_debug(ctx, "process %d allocated_chunks %d", 3073 process->pid, 3074 (int) process->outgoing.allocated_chunks); 3075 3076 pthread_mutex_unlock(&process->outgoing.mutex); 3077 3078 return hdr; 3079 } 3080 3081 3082 static int 3083 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 3084 { 3085 ssize_t res; 3086 nxt_port_msg_t msg; 3087 nxt_unit_impl_t *lib; 3088 3089 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3090 3091 msg.stream = 0; 3092 msg.pid = lib->pid; 3093 msg.reply_port = 0; 3094 msg.type = _NXT_PORT_MSG_OOSM; 3095 msg.last = 0; 3096 msg.mmap = 0; 3097 msg.nf = 0; 3098 msg.mf = 0; 3099 msg.tracking = 0; 3100 3101 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); 3102 if (nxt_slow_path(res != sizeof(msg))) { 3103 return NXT_UNIT_ERROR; 3104 } 3105 3106 return NXT_UNIT_OK; 3107 } 3108 3109 3110 static int 3111 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) 3112 { 3113 nxt_port_msg_t *port_msg; 3114 nxt_unit_ctx_impl_t *ctx_impl; 3115 nxt_unit_read_buf_t *rbuf; 3116 3117 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3118 3119 while (1) { 3120 rbuf = nxt_unit_read_buf_get(ctx); 3121 if (nxt_slow_path(rbuf == NULL)) { 3122 return NXT_UNIT_ERROR; 3123 } 3124 3125 nxt_unit_read_buf(ctx, rbuf); 3126 3127 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 3128 nxt_unit_read_buf_release(ctx, rbuf); 3129 3130 return NXT_UNIT_ERROR; 3131 } 3132 3133 port_msg = 
(nxt_port_msg_t *) rbuf->buf; 3134 3135 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { 3136 nxt_unit_read_buf_release(ctx, rbuf); 3137 3138 break; 3139 } 3140 3141 pthread_mutex_lock(&ctx_impl->mutex); 3142 3143 *ctx_impl->pending_read_tail = rbuf; 3144 ctx_impl->pending_read_tail = &rbuf->next; 3145 rbuf->next = NULL; 3146 3147 pthread_mutex_unlock(&ctx_impl->mutex); 3148 3149 if (port_msg->type == _NXT_PORT_MSG_QUIT) { 3150 nxt_unit_debug(ctx, "oosm: quit received"); 3151 3152 return NXT_UNIT_ERROR; 3153 } 3154 } 3155 3156 return NXT_UNIT_OK; 3157 } 3158 3159 3160 static nxt_unit_mmap_t * 3161 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3162 { 3163 uint32_t cap; 3164 3165 cap = mmaps->cap; 3166 3167 if (cap == 0) { 3168 cap = i + 1; 3169 } 3170 3171 while (i + 1 > cap) { 3172 3173 if (cap < 16) { 3174 cap = cap * 2; 3175 3176 } else { 3177 cap = cap + cap / 2; 3178 } 3179 } 3180 3181 if (cap != mmaps->cap) { 3182 3183 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 3184 if (nxt_slow_path(mmaps->elts == NULL)) { 3185 return NULL; 3186 } 3187 3188 memset(mmaps->elts + mmaps->cap, 0, 3189 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 3190 3191 mmaps->cap = cap; 3192 } 3193 3194 if (i + 1 > mmaps->size) { 3195 mmaps->size = i + 1; 3196 } 3197 3198 return mmaps->elts + i; 3199 } 3200 3201 3202 static nxt_port_mmap_header_t * 3203 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) 3204 { 3205 int i, fd, rc; 3206 void *mem; 3207 char name[64]; 3208 nxt_unit_mmap_t *mm; 3209 nxt_unit_impl_t *lib; 3210 nxt_unit_process_t *process; 3211 nxt_port_mmap_header_t *hdr; 3212 3213 process = nxt_unit_port_process(port); 3214 if (nxt_slow_path(process == NULL)) { 3215 nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed", 3216 (int) port->id.pid, (int) port->id.id); 3217 3218 return NULL; 3219 } 3220 3221 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3222 3223 mm = nxt_unit_mmap_at(&process->outgoing, 
process->outgoing.size); 3224 if (nxt_slow_path(mm == NULL)) { 3225 nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); 3226 3227 return NULL; 3228 } 3229 3230 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 3231 lib->pid, (void *) pthread_self()); 3232 3233 #if (NXT_HAVE_MEMFD_CREATE) 3234 3235 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 3236 if (nxt_slow_path(fd == -1)) { 3237 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3238 strerror(errno), errno); 3239 3240 goto remove_fail; 3241 } 3242 3243 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3244 3245 #elif (NXT_HAVE_SHM_OPEN_ANON) 3246 3247 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3248 if (nxt_slow_path(fd == -1)) { 3249 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3250 strerror(errno), errno); 3251 3252 goto remove_fail; 3253 } 3254 3255 #elif (NXT_HAVE_SHM_OPEN) 3256 3257 /* Just in case. */ 3258 shm_unlink(name); 3259 3260 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3261 if (nxt_slow_path(fd == -1)) { 3262 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3263 strerror(errno), errno); 3264 3265 goto remove_fail; 3266 } 3267 3268 if (nxt_slow_path(shm_unlink(name) == -1)) { 3269 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3270 strerror(errno), errno); 3271 } 3272 3273 #else 3274 3275 #error No working shared memory implementation. 
3276 3277 #endif 3278 3279 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 3280 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3281 strerror(errno), errno); 3282 3283 goto remove_fail; 3284 } 3285 3286 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 3287 if (nxt_slow_path(mem == MAP_FAILED)) { 3288 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 3289 strerror(errno), errno); 3290 3291 goto remove_fail; 3292 } 3293 3294 mm->hdr = mem; 3295 hdr = mem; 3296 3297 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 3298 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 3299 3300 hdr->id = process->outgoing.size - 1; 3301 hdr->src_pid = lib->pid; 3302 hdr->dst_pid = process->pid; 3303 hdr->sent_over = port->id.id; 3304 3305 /* Mark first n chunk(s) as busy */ 3306 for (i = 0; i < n; i++) { 3307 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 3308 } 3309 3310 /* Mark as busy chunk followed the last available chunk. */ 3311 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 3312 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 3313 3314 pthread_mutex_unlock(&process->outgoing.mutex); 3315 3316 rc = nxt_unit_send_mmap(ctx, port, fd); 3317 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3318 munmap(mem, PORT_MMAP_SIZE); 3319 hdr = NULL; 3320 3321 } else { 3322 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 3323 hdr->id, (int) lib->pid, (int) process->pid); 3324 } 3325 3326 close(fd); 3327 3328 pthread_mutex_lock(&process->outgoing.mutex); 3329 3330 if (nxt_fast_path(hdr != NULL)) { 3331 return hdr; 3332 } 3333 3334 remove_fail: 3335 3336 process->outgoing.size--; 3337 3338 return NULL; 3339 } 3340 3341 3342 static int 3343 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) 3344 { 3345 ssize_t res; 3346 nxt_port_msg_t msg; 3347 nxt_unit_impl_t *lib; 3348 union { 3349 struct cmsghdr cm; 3350 char space[CMSG_SPACE(sizeof(int))]; 
3351 } cmsg; 3352 3353 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3354 3355 msg.stream = 0; 3356 msg.pid = lib->pid; 3357 msg.reply_port = 0; 3358 msg.type = _NXT_PORT_MSG_MMAP; 3359 msg.last = 0; 3360 msg.mmap = 0; 3361 msg.nf = 0; 3362 msg.mf = 0; 3363 msg.tracking = 0; 3364 3365 /* 3366 * Fill all padding fields with 0. 3367 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 3368 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 3369 */ 3370 memset(&cmsg, 0, sizeof(cmsg)); 3371 3372 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3373 cmsg.cm.cmsg_level = SOL_SOCKET; 3374 cmsg.cm.cmsg_type = SCM_RIGHTS; 3375 3376 /* 3377 * memcpy() is used instead of simple 3378 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3379 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3380 * dereferencing type-punned pointer will break strict-aliasing rules 3381 * 3382 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3383 * in the same simple assignment as in the code above. 
3384 */ 3385 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3386 3387 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), 3388 &cmsg, sizeof(cmsg)); 3389 if (nxt_slow_path(res != sizeof(msg))) { 3390 return NXT_UNIT_ERROR; 3391 } 3392 3393 return NXT_UNIT_OK; 3394 } 3395 3396 3397 static int 3398 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 3399 uint32_t size, uint32_t min_size, 3400 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) 3401 { 3402 int nchunks, min_nchunks; 3403 nxt_chunk_id_t c; 3404 nxt_port_mmap_header_t *hdr; 3405 3406 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { 3407 if (local_buf != NULL) { 3408 mmap_buf->free_ptr = NULL; 3409 mmap_buf->plain_ptr = local_buf; 3410 3411 } else { 3412 mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); 3413 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3414 return NXT_UNIT_ERROR; 3415 } 3416 3417 mmap_buf->plain_ptr = mmap_buf->free_ptr; 3418 } 3419 3420 mmap_buf->hdr = NULL; 3421 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); 3422 mmap_buf->buf.free = mmap_buf->buf.start; 3423 mmap_buf->buf.end = mmap_buf->buf.start + size; 3424 mmap_buf->process = nxt_unit_port_process(port); 3425 3426 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", 3427 mmap_buf->buf.start, (int) size); 3428 3429 return NXT_UNIT_OK; 3430 } 3431 3432 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3433 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3434 3435 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3436 if (nxt_slow_path(hdr == NULL)) { 3437 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3438 mmap_buf->hdr = NULL; 3439 mmap_buf->buf.start = NULL; 3440 mmap_buf->buf.free = NULL; 3441 mmap_buf->buf.end = NULL; 3442 mmap_buf->free_ptr = NULL; 3443 3444 return NXT_UNIT_OK; 3445 } 3446 3447 return NXT_UNIT_ERROR; 3448 } 3449 3450 mmap_buf->hdr = hdr; 3451 mmap_buf->buf.start = (char *) 
nxt_port_mmap_chunk_start(hdr, c); 3452 mmap_buf->buf.free = mmap_buf->buf.start; 3453 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3454 mmap_buf->process = nxt_unit_port_process(port); 3455 mmap_buf->free_ptr = NULL; 3456 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3457 3458 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3459 (int) hdr->id, (int) c, 3460 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3461 3462 return NXT_UNIT_OK; 3463 } 3464 3465 3466 static int 3467 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3468 { 3469 int rc; 3470 void *mem; 3471 struct stat mmap_stat; 3472 nxt_unit_mmap_t *mm; 3473 nxt_unit_impl_t *lib; 3474 nxt_unit_process_t *process; 3475 nxt_port_mmap_header_t *hdr; 3476 3477 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3478 3479 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3480 3481 pthread_mutex_lock(&lib->mutex); 3482 3483 process = nxt_unit_process_find(lib, pid, 0); 3484 3485 pthread_mutex_unlock(&lib->mutex); 3486 3487 if (nxt_slow_path(process == NULL)) { 3488 nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", 3489 (int) pid, fd); 3490 3491 return NXT_UNIT_ERROR; 3492 } 3493 3494 rc = NXT_UNIT_ERROR; 3495 3496 if (fstat(fd, &mmap_stat) == -1) { 3497 nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3498 strerror(errno), errno); 3499 3500 goto fail; 3501 } 3502 3503 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3504 MAP_SHARED, fd, 0); 3505 if (nxt_slow_path(mem == MAP_FAILED)) { 3506 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3507 strerror(errno), errno); 3508 3509 goto fail; 3510 } 3511 3512 hdr = mem; 3513 3514 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { 3515 3516 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 3517 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3518 (int) pid, (int) 
hdr->dst_pid, (int) lib->pid); 3519 3520 munmap(mem, PORT_MMAP_SIZE); 3521 3522 goto fail; 3523 } 3524 3525 pthread_mutex_lock(&process->incoming.mutex); 3526 3527 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 3528 if (nxt_slow_path(mm == NULL)) { 3529 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 3530 3531 munmap(mem, PORT_MMAP_SIZE); 3532 3533 } else { 3534 mm->hdr = hdr; 3535 3536 hdr->sent_over = 0xFFFFu; 3537 3538 rc = NXT_UNIT_OK; 3539 } 3540 3541 pthread_mutex_unlock(&process->incoming.mutex); 3542 3543 fail: 3544 3545 nxt_unit_process_release(process); 3546 3547 return rc; 3548 } 3549 3550 3551 static void 3552 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 3553 { 3554 pthread_mutex_init(&mmaps->mutex, NULL); 3555 3556 mmaps->size = 0; 3557 mmaps->cap = 0; 3558 mmaps->elts = NULL; 3559 mmaps->allocated_chunks = 0; 3560 } 3561 3562 3563 nxt_inline void 3564 nxt_unit_process_use(nxt_unit_process_t *process) 3565 { 3566 nxt_atomic_fetch_add(&process->use_count, 1); 3567 } 3568 3569 3570 nxt_inline void 3571 nxt_unit_process_release(nxt_unit_process_t *process) 3572 { 3573 long c; 3574 3575 c = nxt_atomic_fetch_add(&process->use_count, -1); 3576 3577 if (c == 1) { 3578 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); 3579 3580 nxt_unit_mmaps_destroy(&process->incoming); 3581 nxt_unit_mmaps_destroy(&process->outgoing); 3582 3583 free(process); 3584 } 3585 } 3586 3587 3588 static void 3589 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) 3590 { 3591 nxt_unit_mmap_t *mm, *end; 3592 3593 if (mmaps->elts != NULL) { 3594 end = mmaps->elts + mmaps->size; 3595 3596 for (mm = mmaps->elts; mm < end; mm++) { 3597 munmap(mm->hdr, PORT_MMAP_SIZE); 3598 } 3599 3600 free(mmaps->elts); 3601 } 3602 3603 pthread_mutex_destroy(&mmaps->mutex); 3604 } 3605 3606 3607 static nxt_port_mmap_header_t * 3608 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3609 uint32_t id) 3610 { 3611 nxt_port_mmap_header_t *hdr; 
3612 3613 if (nxt_fast_path(process->incoming.size > id)) { 3614 hdr = process->incoming.elts[id].hdr; 3615 3616 } else { 3617 hdr = NULL; 3618 } 3619 3620 return hdr; 3621 } 3622 3623 3624 static int 3625 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 3626 { 3627 int rc; 3628 nxt_chunk_id_t c; 3629 nxt_unit_process_t *process; 3630 nxt_port_mmap_header_t *hdr; 3631 nxt_port_mmap_tracking_msg_t *tracking_msg; 3632 3633 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 3634 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", 3635 recv_msg->stream, (int) recv_msg->size); 3636 3637 return 0; 3638 } 3639 3640 tracking_msg = recv_msg->start; 3641 3642 recv_msg->start = tracking_msg + 1; 3643 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); 3644 3645 process = nxt_unit_msg_get_process(ctx, recv_msg); 3646 if (nxt_slow_path(process == NULL)) { 3647 return 0; 3648 } 3649 3650 pthread_mutex_lock(&process->incoming.mutex); 3651 3652 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 3653 if (nxt_slow_path(hdr == NULL)) { 3654 pthread_mutex_unlock(&process->incoming.mutex); 3655 3656 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 3657 "invalid mmap id %d,%"PRIu32, 3658 recv_msg->stream, (int) process->pid, 3659 tracking_msg->mmap_id); 3660 3661 return 0; 3662 } 3663 3664 c = tracking_msg->tracking_id; 3665 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); 3666 3667 if (rc == 0) { 3668 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 3669 recv_msg->stream); 3670 3671 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 3672 } 3673 3674 pthread_mutex_unlock(&process->incoming.mutex); 3675 3676 return rc; 3677 } 3678 3679 3680 static int 3681 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 3682 { 3683 void *start; 3684 uint32_t size; 3685 nxt_unit_process_t *process; 3686 nxt_unit_mmap_buf_t *b, **incoming_tail; 3687 nxt_port_mmap_msg_t 
*mmap_msg, *end; 3688 nxt_port_mmap_header_t *hdr; 3689 3690 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 3691 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 3692 recv_msg->stream, (int) recv_msg->size); 3693 3694 return NXT_UNIT_ERROR; 3695 } 3696 3697 process = nxt_unit_msg_get_process(ctx, recv_msg); 3698 if (nxt_slow_path(process == NULL)) { 3699 return NXT_UNIT_ERROR; 3700 } 3701 3702 mmap_msg = recv_msg->start; 3703 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3704 3705 incoming_tail = &recv_msg->incoming_buf; 3706 3707 for (; mmap_msg < end; mmap_msg++) { 3708 b = nxt_unit_mmap_buf_get(ctx); 3709 if (nxt_slow_path(b == NULL)) { 3710 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3711 recv_msg->stream); 3712 3713 return NXT_UNIT_ERROR; 3714 } 3715 3716 nxt_unit_mmap_buf_insert(incoming_tail, b); 3717 incoming_tail = &b->next; 3718 } 3719 3720 b = recv_msg->incoming_buf; 3721 mmap_msg = recv_msg->start; 3722 3723 pthread_mutex_lock(&process->incoming.mutex); 3724 3725 for (; mmap_msg < end; mmap_msg++) { 3726 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 3727 if (nxt_slow_path(hdr == NULL)) { 3728 pthread_mutex_unlock(&process->incoming.mutex); 3729 3730 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 3731 "invalid mmap id %d,%"PRIu32, 3732 recv_msg->stream, (int) process->pid, 3733 mmap_msg->mmap_id); 3734 3735 return NXT_UNIT_ERROR; 3736 } 3737 3738 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3739 size = mmap_msg->size; 3740 3741 if (recv_msg->start == mmap_msg) { 3742 recv_msg->start = start; 3743 recv_msg->size = size; 3744 } 3745 3746 b->buf.start = start; 3747 b->buf.free = start; 3748 b->buf.end = b->buf.start + size; 3749 b->hdr = hdr; 3750 b->process = process; 3751 3752 b = b->next; 3753 3754 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3755 recv_msg->stream, 3756 start, (int) size, 3757 (int) hdr->src_pid, (int) 
hdr->dst_pid, 3758 (int) hdr->id, (int) mmap_msg->chunk_id, 3759 (int) mmap_msg->size); 3760 } 3761 3762 pthread_mutex_unlock(&process->incoming.mutex); 3763 3764 return NXT_UNIT_OK; 3765 } 3766 3767 3768 static void 3769 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 3770 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, 3771 void *start, uint32_t size) 3772 { 3773 int freed_chunks; 3774 u_char *p, *end; 3775 nxt_chunk_id_t c; 3776 nxt_unit_impl_t *lib; 3777 3778 memset(start, 0xA5, size); 3779 3780 p = start; 3781 end = p + size; 3782 c = nxt_port_mmap_chunk_id(hdr, p); 3783 freed_chunks = 0; 3784 3785 while (p < end) { 3786 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 3787 3788 p += PORT_MMAP_CHUNK_SIZE; 3789 c++; 3790 freed_chunks++; 3791 } 3792 3793 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3794 3795 if (hdr->src_pid == lib->pid && freed_chunks != 0) { 3796 nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, 3797 -freed_chunks); 3798 3799 nxt_unit_debug(ctx, "process %d allocated_chunks %d", 3800 process->pid, 3801 (int) process->outgoing.allocated_chunks); 3802 } 3803 3804 if (hdr->dst_pid == lib->pid 3805 && freed_chunks != 0 3806 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 3807 { 3808 nxt_unit_send_shm_ack(ctx, hdr->src_pid); 3809 } 3810 } 3811 3812 3813 static int 3814 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) 3815 { 3816 ssize_t res; 3817 nxt_port_msg_t msg; 3818 nxt_unit_impl_t *lib; 3819 3820 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3821 3822 msg.stream = 0; 3823 msg.pid = lib->pid; 3824 msg.reply_port = 0; 3825 msg.type = _NXT_PORT_MSG_SHM_ACK; 3826 msg.last = 0; 3827 msg.mmap = 0; 3828 msg.nf = 0; 3829 msg.mf = 0; 3830 msg.tracking = 0; 3831 3832 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); 3833 if (nxt_slow_path(res != sizeof(msg))) { 3834 return NXT_UNIT_ERROR; 3835 } 3836 3837 return NXT_UNIT_OK; 3838 } 3839 3840 3841 static nxt_int_t 3842 
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 3843 { 3844 nxt_process_t *process; 3845 3846 process = data; 3847 3848 if (lhq->key.length == sizeof(pid_t) 3849 && *(pid_t *) lhq->key.start == process->pid) 3850 { 3851 return NXT_OK; 3852 } 3853 3854 return NXT_DECLINED; 3855 } 3856 3857 3858 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 3859 NXT_LVLHSH_DEFAULT, 3860 nxt_unit_lvlhsh_pid_test, 3861 nxt_lvlhsh_alloc, 3862 nxt_lvlhsh_free, 3863 }; 3864 3865 3866 static inline void 3867 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 3868 { 3869 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 3870 lhq->key.length = sizeof(*pid); 3871 lhq->key.start = (u_char *) pid; 3872 lhq->proto = &lvlhsh_processes_proto; 3873 } 3874 3875 3876 static nxt_unit_process_t * 3877 nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) 3878 { 3879 nxt_unit_process_t *process; 3880 nxt_lvlhsh_query_t lhq; 3881 3882 nxt_unit_process_lhq_pid(&lhq, &pid); 3883 3884 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 3885 process = lhq.value; 3886 nxt_unit_process_use(process); 3887 3888 return process; 3889 } 3890 3891 process = malloc(sizeof(nxt_unit_process_t)); 3892 if (nxt_slow_path(process == NULL)) { 3893 nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); 3894 3895 return NULL; 3896 } 3897 3898 process->pid = pid; 3899 process->use_count = 1; 3900 process->next_port_id = 0; 3901 process->lib = lib; 3902 3903 nxt_queue_init(&process->ports); 3904 3905 nxt_unit_mmaps_init(&process->incoming); 3906 nxt_unit_mmaps_init(&process->outgoing); 3907 3908 lhq.replace = 0; 3909 lhq.value = process; 3910 3911 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 3912 3913 case NXT_OK: 3914 break; 3915 3916 default: 3917 nxt_unit_alert(NULL, "process %d insert failed", (int) pid); 3918 3919 pthread_mutex_destroy(&process->outgoing.mutex); 3920 pthread_mutex_destroy(&process->incoming.mutex); 3921 free(process); 3922 
process = NULL; 3923 break; 3924 } 3925 3926 nxt_unit_process_use(process); 3927 3928 return process; 3929 } 3930 3931 3932 static nxt_unit_process_t * 3933 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) 3934 { 3935 int rc; 3936 nxt_lvlhsh_query_t lhq; 3937 3938 nxt_unit_process_lhq_pid(&lhq, &pid); 3939 3940 if (remove) { 3941 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 3942 3943 } else { 3944 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 3945 } 3946 3947 if (rc == NXT_OK) { 3948 if (!remove) { 3949 nxt_unit_process_use(lhq.value); 3950 } 3951 3952 return lhq.value; 3953 } 3954 3955 return NULL; 3956 } 3957 3958 3959 static nxt_unit_process_t * 3960 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 3961 { 3962 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 3963 } 3964 3965 3966 int 3967 nxt_unit_run(nxt_unit_ctx_t *ctx) 3968 { 3969 int rc; 3970 nxt_unit_impl_t *lib; 3971 nxt_unit_ctx_impl_t *ctx_impl; 3972 3973 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3974 3975 nxt_unit_ctx_use(ctx_impl); 3976 3977 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3978 rc = NXT_UNIT_OK; 3979 3980 while (nxt_fast_path(lib->online)) { 3981 rc = nxt_unit_run_once(ctx); 3982 3983 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3984 break; 3985 } 3986 } 3987 3988 nxt_unit_ctx_release(ctx_impl); 3989 3990 return rc; 3991 } 3992 3993 3994 int 3995 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 3996 { 3997 int rc; 3998 nxt_unit_ctx_impl_t *ctx_impl; 3999 nxt_unit_read_buf_t *rbuf; 4000 4001 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4002 4003 nxt_unit_ctx_use(ctx_impl); 4004 4005 pthread_mutex_lock(&ctx_impl->mutex); 4006 4007 if (ctx_impl->pending_read_head != NULL) { 4008 rbuf = ctx_impl->pending_read_head; 4009 ctx_impl->pending_read_head = rbuf->next; 4010 4011 if (ctx_impl->pending_read_tail == &rbuf->next) { 4012 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 4013 } 4014 4015 
pthread_mutex_unlock(&ctx_impl->mutex);

    } else {
        rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
        if (nxt_slow_path(rbuf == NULL)) {

            nxt_unit_ctx_release(ctx_impl);

            return NXT_UNIT_ERROR;
        }

        /* No pending buffer: read a fresh message from the read port. */
        nxt_unit_read_buf(ctx, rbuf);
    }

    if (nxt_fast_path(rbuf->size > 0)) {
        rc = nxt_unit_process_msg(ctx,
                                  rbuf->buf, rbuf->size,
                                  rbuf->oob, sizeof(rbuf->oob));

#if (NXT_DEBUG)
        /* Poison consumed data to expose stale reads in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif

    } else {
        rc = NXT_UNIT_ERROR;
    }

    nxt_unit_read_buf_release(ctx, rbuf);

    nxt_unit_ctx_release(ctx_impl);

    return rc;
}


/* Blocking read of one message from the context's read port into "rbuf". */
static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Clear the first cmsg header so stale control data is not misread. */
    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
                                    rbuf->buf, sizeof(rbuf->buf),
                                    rbuf->oob, sizeof(rbuf->oob));
}


/* Drop the caller's reference to the context; freed on last release. */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_unit_ctx_release(ctx_impl);
}


/*
 * Allocate a secondary context with its own read port, announcing the
 * port to the router.  "data" becomes the context's user data.
 * Returns NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_port_t      *port;
    nxt_unit_ctx_impl_t  *new_ctx;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Trailing space after the struct holds per-request app data. */
    new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate context");

        return NULL;
    }

    port = nxt_unit_create_port(ctx);
    if (nxt_slow_path(port == NULL)) {
        free(new_ctx);

        return NULL;
    }

    rc = nxt_unit_send_port(ctx, lib->router_port, port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    new_ctx->read_port = port;

    return &new_ctx->ctx;

fail:

    /* Undo port registration and drop both references taken above. */
    nxt_unit_remove_port(lib, &port->id);
    nxt_unit_port_release(port);

    free(new_ctx);

    return NULL;
}


/*
 * Destroy a context: abort requests still active, free cached buffers,
 * request and websocket frame structures, release the read port and drop
 * the library reference.  The main context is embedded in the library
 * object and therefore not freed here.
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        free(mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    nxt_queue_remove(&ctx_impl->link);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_port_release(ctx_impl->read_port);
    }

    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/* SOCK_SEQPACKET is disabled to test
SOCK_DGRAM on all platforms. */ 4184 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET) 4185 #define NXT_UNIX_SOCKET SOCK_SEQPACKET 4186 #else 4187 #define NXT_UNIX_SOCKET SOCK_DGRAM 4188 #endif 4189 4190 4191 void 4192 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) 4193 { 4194 nxt_unit_port_hash_id_t port_hash_id; 4195 4196 port_hash_id.pid = pid; 4197 port_hash_id.id = id; 4198 4199 port_id->pid = pid; 4200 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id)); 4201 port_id->id = id; 4202 } 4203 4204 4205 static nxt_unit_port_t * 4206 nxt_unit_create_port(nxt_unit_ctx_t *ctx) 4207 { 4208 int rc, port_sockets[2]; 4209 nxt_unit_impl_t *lib; 4210 nxt_unit_port_t new_port, *port; 4211 nxt_unit_process_t *process; 4212 4213 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4214 4215 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 4216 if (nxt_slow_path(rc != 0)) { 4217 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 4218 strerror(errno), errno); 4219 4220 return NULL; 4221 } 4222 4223 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 4224 port_sockets[0], port_sockets[1]); 4225 4226 pthread_mutex_lock(&lib->mutex); 4227 4228 process = nxt_unit_process_get(lib, lib->pid); 4229 if (nxt_slow_path(process == NULL)) { 4230 pthread_mutex_unlock(&lib->mutex); 4231 4232 close(port_sockets[0]); 4233 close(port_sockets[1]); 4234 4235 return NULL; 4236 } 4237 4238 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 4239 4240 new_port.in_fd = port_sockets[0]; 4241 new_port.out_fd = port_sockets[1]; 4242 new_port.data = NULL; 4243 4244 pthread_mutex_unlock(&lib->mutex); 4245 4246 nxt_unit_process_release(process); 4247 4248 port = nxt_unit_add_port(ctx, &new_port); 4249 if (nxt_slow_path(port == NULL)) { 4250 nxt_unit_alert(ctx, "create_port: add_port() failed"); 4251 4252 close(port_sockets[0]); 4253 close(port_sockets[1]); 4254 } 4255 4256 return port; 4257 } 4258 4259 4260 
static int 4261 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 4262 nxt_unit_port_t *port) 4263 { 4264 ssize_t res; 4265 nxt_unit_impl_t *lib; 4266 4267 struct { 4268 nxt_port_msg_t msg; 4269 nxt_port_msg_new_port_t new_port; 4270 } m; 4271 4272 union { 4273 struct cmsghdr cm; 4274 char space[CMSG_SPACE(sizeof(int))]; 4275 } cmsg; 4276 4277 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4278 4279 m.msg.stream = 0; 4280 m.msg.pid = lib->pid; 4281 m.msg.reply_port = 0; 4282 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 4283 m.msg.last = 0; 4284 m.msg.mmap = 0; 4285 m.msg.nf = 0; 4286 m.msg.mf = 0; 4287 m.msg.tracking = 0; 4288 4289 m.new_port.id = port->id.id; 4290 m.new_port.pid = port->id.pid; 4291 m.new_port.type = NXT_PROCESS_APP; 4292 m.new_port.max_size = 16 * 1024; 4293 m.new_port.max_share = 64 * 1024; 4294 4295 memset(&cmsg, 0, sizeof(cmsg)); 4296 4297 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 4298 cmsg.cm.cmsg_level = SOL_SOCKET; 4299 cmsg.cm.cmsg_type = SCM_RIGHTS; 4300 4301 /* 4302 * memcpy() is used instead of simple 4303 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 4304 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 4305 * dereferencing type-punned pointer will break strict-aliasing rules 4306 * 4307 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 4308 * in the same simple assignment as in the code above. 4309 */ 4310 memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); 4311 4312 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); 4313 4314 return (res == sizeof(m)) ? 
NXT_UNIT_OK : NXT_UNIT_ERROR; 4315 } 4316 4317 4318 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) 4319 { 4320 nxt_unit_port_impl_t *port_impl; 4321 4322 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 4323 4324 nxt_atomic_fetch_add(&port_impl->use_count, 1); 4325 } 4326 4327 4328 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) 4329 { 4330 long c; 4331 nxt_unit_port_impl_t *port_impl; 4332 4333 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 4334 4335 c = nxt_atomic_fetch_add(&port_impl->use_count, -1); 4336 4337 if (c == 1) { 4338 nxt_unit_debug(NULL, "destroy port %d,%d", 4339 (int) port->id.pid, (int) port->id.id); 4340 4341 nxt_unit_process_release(port_impl->process); 4342 4343 if (port->in_fd != -1) { 4344 close(port->in_fd); 4345 4346 port->in_fd = -1; 4347 } 4348 4349 if (port->out_fd != -1) { 4350 close(port->out_fd); 4351 4352 port->out_fd = -1; 4353 } 4354 4355 free(port_impl); 4356 } 4357 } 4358 4359 4360 nxt_inline nxt_unit_process_t * 4361 nxt_unit_port_process(nxt_unit_port_t *port) 4362 { 4363 nxt_unit_port_impl_t *port_impl; 4364 4365 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 4366 4367 return port_impl->process; 4368 } 4369 4370 4371 static nxt_unit_port_t * 4372 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4373 { 4374 int rc; 4375 nxt_unit_impl_t *lib; 4376 nxt_unit_port_t *old_port; 4377 nxt_unit_process_t *process; 4378 nxt_unit_port_impl_t *new_port; 4379 4380 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4381 4382 pthread_mutex_lock(&lib->mutex); 4383 4384 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); 4385 4386 if (nxt_slow_path(old_port != NULL)) { 4387 nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", 4388 port->id.pid, port->id.id, 4389 port->in_fd, port->out_fd); 4390 4391 if (old_port->data == NULL) { 4392 old_port->data = port->data; 4393 port->data = NULL; 4394 } 4395 4396 if (old_port->in_fd == 
-1) { 4397 old_port->in_fd = port->in_fd; 4398 port->in_fd = -1; 4399 } 4400 4401 if (port->in_fd != -1) { 4402 close(port->in_fd); 4403 port->in_fd = -1; 4404 } 4405 4406 if (old_port->out_fd == -1) { 4407 old_port->out_fd = port->out_fd; 4408 port->out_fd = -1; 4409 } 4410 4411 if (port->out_fd != -1) { 4412 close(port->out_fd); 4413 port->out_fd = -1; 4414 } 4415 4416 *port = *old_port; 4417 4418 pthread_mutex_unlock(&lib->mutex); 4419 4420 if (lib->callbacks.add_port != NULL 4421 && (port->in_fd != -1 || port->out_fd != -1)) 4422 { 4423 lib->callbacks.add_port(ctx, old_port); 4424 } 4425 4426 return old_port; 4427 } 4428 4429 new_port = NULL; 4430 4431 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 4432 port->id.pid, port->id.id, 4433 port->in_fd, port->out_fd); 4434 4435 process = nxt_unit_process_get(lib, port->id.pid); 4436 if (nxt_slow_path(process == NULL)) { 4437 goto unlock; 4438 } 4439 4440 if (port->id.id >= process->next_port_id) { 4441 process->next_port_id = port->id.id + 1; 4442 } 4443 4444 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 4445 if (nxt_slow_path(new_port == NULL)) { 4446 goto unlock; 4447 } 4448 4449 new_port->port = *port; 4450 4451 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 4452 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4453 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", 4454 port->id.pid, port->id.id); 4455 4456 free(new_port); 4457 4458 new_port = NULL; 4459 4460 goto unlock; 4461 } 4462 4463 nxt_queue_insert_tail(&process->ports, &new_port->link); 4464 4465 new_port->use_count = 2; 4466 new_port->process = process; 4467 4468 process = NULL; 4469 4470 unlock: 4471 4472 pthread_mutex_unlock(&lib->mutex); 4473 4474 if (nxt_slow_path(process != NULL)) { 4475 nxt_unit_process_release(process); 4476 } 4477 4478 if (lib->callbacks.add_port != NULL 4479 && new_port != NULL 4480 && (port->in_fd != -1 || port->out_fd != -1)) 4481 { 4482 lib->callbacks.add_port(ctx, &new_port->port); 4483 } 4484 4485 
return &new_port->port; 4486 } 4487 4488 4489 static void 4490 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 4491 { 4492 nxt_unit_port_t *port; 4493 nxt_unit_port_impl_t *port_impl; 4494 4495 pthread_mutex_lock(&lib->mutex); 4496 4497 port = nxt_unit_remove_port_unsafe(lib, port_id); 4498 4499 if (nxt_fast_path(port != NULL)) { 4500 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 4501 4502 nxt_queue_remove(&port_impl->link); 4503 } 4504 4505 pthread_mutex_unlock(&lib->mutex); 4506 4507 if (lib->callbacks.remove_port != NULL && port != NULL) { 4508 lib->callbacks.remove_port(&lib->unit, port); 4509 } 4510 4511 if (nxt_fast_path(port != NULL)) { 4512 nxt_unit_port_release(port); 4513 } 4514 } 4515 4516 4517 static nxt_unit_port_t * 4518 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 4519 { 4520 nxt_unit_port_t *port; 4521 4522 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 4523 if (nxt_slow_path(port == NULL)) { 4524 nxt_unit_debug(NULL, "remove_port: port %d,%d not found", 4525 (int) port_id->pid, (int) port_id->id); 4526 4527 return NULL; 4528 } 4529 4530 nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", 4531 (int) port_id->pid, (int) port_id->id, 4532 port->in_fd, port->out_fd, port->data); 4533 4534 return port; 4535 } 4536 4537 4538 static void 4539 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) 4540 { 4541 nxt_unit_process_t *process; 4542 4543 pthread_mutex_lock(&lib->mutex); 4544 4545 process = nxt_unit_process_find(lib, pid, 1); 4546 if (nxt_slow_path(process == NULL)) { 4547 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); 4548 4549 pthread_mutex_unlock(&lib->mutex); 4550 4551 return; 4552 } 4553 4554 nxt_unit_remove_process(lib, process); 4555 4556 if (lib->callbacks.remove_pid != NULL) { 4557 lib->callbacks.remove_pid(&lib->unit, pid); 4558 } 4559 } 4560 4561 4562 static void 4563 nxt_unit_remove_process(nxt_unit_impl_t *lib, 
nxt_unit_process_t *process) 4564 { 4565 nxt_queue_t ports; 4566 nxt_unit_port_impl_t *port; 4567 4568 nxt_queue_init(&ports); 4569 4570 nxt_queue_add(&ports, &process->ports); 4571 4572 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4573 4574 nxt_unit_remove_port_unsafe(lib, &port->port.id); 4575 4576 } nxt_queue_loop; 4577 4578 pthread_mutex_unlock(&lib->mutex); 4579 4580 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4581 4582 nxt_queue_remove(&port->link); 4583 4584 if (lib->callbacks.remove_port != NULL) { 4585 lib->callbacks.remove_port(&lib->unit, &port->port); 4586 } 4587 4588 nxt_unit_port_release(&port->port); 4589 4590 } nxt_queue_loop; 4591 4592 nxt_unit_process_release(process); 4593 } 4594 4595 4596 static void 4597 nxt_unit_quit(nxt_unit_ctx_t *ctx) 4598 { 4599 nxt_unit_impl_t *lib; 4600 4601 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4602 4603 lib->online = 0; 4604 4605 if (lib->callbacks.quit != NULL) { 4606 lib->callbacks.quit(ctx); 4607 } 4608 } 4609 4610 4611 static ssize_t 4612 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 4613 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 4614 { 4615 nxt_unit_impl_t *lib; 4616 4617 nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", 4618 (int) port->id.pid, (int) port->id.id, port->out_fd); 4619 4620 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4621 4622 if (lib->callbacks.port_send != NULL) { 4623 return lib->callbacks.port_send(ctx, port, buf, buf_size, 4624 oob, oob_size); 4625 } 4626 4627 return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, 4628 oob, oob_size); 4629 } 4630 4631 4632 static ssize_t 4633 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 4634 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 4635 { 4636 ssize_t res; 4637 struct iovec iov[1]; 4638 struct msghdr msg; 4639 4640 iov[0].iov_base = (void *) buf; 4641 iov[0].iov_len = buf_size; 4642 4643 msg.msg_name = NULL; 4644 
msg.msg_namelen = 0; 4645 msg.msg_iov = iov; 4646 msg.msg_iovlen = 1; 4647 msg.msg_flags = 0; 4648 msg.msg_control = (void *) oob; 4649 msg.msg_controllen = oob_size; 4650 4651 retry: 4652 4653 res = sendmsg(fd, &msg, 0); 4654 4655 if (nxt_slow_path(res == -1)) { 4656 if (errno == EINTR) { 4657 goto retry; 4658 } 4659 4660 /* 4661 * FIXME: This should be "alert" after router graceful shutdown 4662 * implementation. 4663 */ 4664 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", 4665 fd, (int) buf_size, strerror(errno), errno); 4666 4667 } else { 4668 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, 4669 (int) res); 4670 } 4671 4672 return res; 4673 } 4674 4675 4676 static ssize_t 4677 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 4678 void *buf, size_t buf_size, void *oob, size_t oob_size) 4679 { 4680 int fd; 4681 ssize_t res; 4682 struct iovec iov[1]; 4683 struct msghdr msg; 4684 nxt_unit_impl_t *lib; 4685 4686 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4687 4688 if (lib->callbacks.port_recv != NULL) { 4689 return lib->callbacks.port_recv(ctx, port, 4690 buf, buf_size, oob, oob_size); 4691 } 4692 4693 iov[0].iov_base = buf; 4694 iov[0].iov_len = buf_size; 4695 4696 msg.msg_name = NULL; 4697 msg.msg_namelen = 0; 4698 msg.msg_iov = iov; 4699 msg.msg_iovlen = 1; 4700 msg.msg_flags = 0; 4701 msg.msg_control = oob; 4702 msg.msg_controllen = oob_size; 4703 4704 fd = port->in_fd; 4705 4706 retry: 4707 4708 res = recvmsg(fd, &msg, 0); 4709 4710 if (nxt_slow_path(res == -1)) { 4711 if (errno == EINTR) { 4712 goto retry; 4713 } 4714 4715 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", 4716 fd, strerror(errno), errno); 4717 4718 } else { 4719 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); 4720 } 4721 4722 return res; 4723 } 4724 4725 4726 static nxt_int_t 4727 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 4728 { 4729 nxt_unit_port_t *port; 4730 nxt_unit_port_hash_id_t *port_id; 4731 4732 port = data; 
port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    /* Match on both pid and port id. */
    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Prepare a level hash query for a port id.  "port_hash_id" provides
 * storage for the key and must outlive the query.  The (pid, id) hash is
 * computed lazily and cached in port_id->hash.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/* Insert "port" into the port hash; fails on a duplicate (pid, id). */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Look up a port by id; with remove != 0 the entry is also deleted.
 * A plain find takes a port reference for the caller, a removing find
 * transfers the hash table's reference.  Returns NULL when not found.
 */
static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        if (!remove) {
            nxt_unit_port_use(lhq.value);
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Requests are keyed by the 32-bit stream id alone, so the key
 * comparison is intentionally a no-op: entries with colliding hashes
 * are treated as equal.  NOTE(review): this assumes live stream ids do
 * not collide in a bucket -- confirm against the stream id allocator.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/* Register a request under its stream id for later lookup. */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    uint32_t            *stream;
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    res = nxt_lvlhsh_insert(request_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Find the request registered for "stream"; with remove != 0 the entry
 * is also deleted from the hash.  Returns NULL when not found.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    if (remove) {
        res = nxt_lvlhsh_delete(request_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(request_hash, &lhq);
    }
switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Write a formatted message at "level" to the library log descriptor;
 * with a NULL context, messages go to stderr tagged with the current
 * pid.  Overlong messages are truncated and marked with "[...]".
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;  /* Reserve one byte for the trailing '\n'. */

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* vsnprintf() returns the untruncated length, so p may pass end. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Like nxt_unit_log(), but prefixes the message with the request's
 * stream id when "req" is not NULL.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;  /* Reserve one byte for the trailing '\n'. */

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    /* Tag the line with the request's stream id. */
    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* vsnprintf() returns the untruncated length, so p may pass end. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/* Level names indexed by the numeric log level used above. */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Write the "date time [level] pid#tid [unit] " log prefix into
 * [p, end) and return the new write position.  Debug builds include
 * milliseconds in the timestamp.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    struct tm        tm;
    struct timespec  ts;

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    /* Non-reentrant fallback; assumed safe in single-threaded startup. */
    tm = *localtime(&ts.tv_sec);
#endif

#if (NXT_DEBUG)
    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);
#else
    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec);
#endif

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */

/* Aligned allocation wrapper; returns NULL on failure (errno not set). */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void       *p;
    nxt_err_t  err;

    err = posix_memalign(&p, alignment, size);

    if (nxt_fast_path(err == 0)) {
        return p;
    }

    return NULL;
}

#if (NXT_DEBUG)

/* Debug-build free wrapper paired with nxt_memalign(). */
void
nxt_free(void *p)
{
    free(p);
}

#endif