1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 #include "nxt_unit_websocket.h" 15 16 #include "nxt_websocket.h" 17 18 #if (NXT_HAVE_MEMFD_CREATE) 19 #include <linux/memfd.h> 20 #endif 21 22 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 23 #define NXT_UNIT_LOCAL_BUF_SIZE \ 24 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 25 26 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 27 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 28 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 29 typedef struct nxt_unit_process_s nxt_unit_process_t; 30 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 31 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 32 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 33 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 34 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 35 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 36 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 37 38 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 39 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 40 nxt_unit_ctx_impl_t *ctx_impl, void *data); 41 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); 42 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); 43 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 44 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 45 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 46 nxt_unit_mmap_buf_t *mmap_buf); 47 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 50 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 52 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 53 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); 54 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 55 nxt_unit_recv_msg_t *recv_msg); 56 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 57 nxt_unit_recv_msg_t *recv_msg); 58 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 59 nxt_unit_recv_msg_t *recv_msg); 60 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 61 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 62 nxt_unit_ctx_t *ctx); 63 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 64 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 65 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 66 nxt_unit_ctx_t *ctx); 67 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 68 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); 69 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 70 nxt_unit_recv_msg_t *recv_msg); 71 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 72 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 73 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 74 nxt_unit_mmap_buf_t *mmap_buf, int last); 75 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 76 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 77 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 78 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 79 nxt_unit_ctx_impl_t *ctx_impl); 80 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 81 nxt_unit_read_buf_t *rbuf); 82 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 83 nxt_unit_request_info_t *req, size_t size); 84 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 85 size_t size); 86 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 87 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 88 nxt_chunk_id_t *c, int *n, int min_n); 89 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 90 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 91 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 92 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 93 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 94 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 95 int fd); 96 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 97 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, 98 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 99 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 100 101 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 102 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 103 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 104 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 105 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 106 nxt_unit_process_t *process, uint32_t id); 107 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 108 nxt_unit_recv_msg_t *recv_msg); 109 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 110 nxt_unit_recv_msg_t *recv_msg); 111 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 112 nxt_unit_process_t *process, 113 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 114 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 115 116 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, 117 pid_t pid); 118 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 119 pid_t pid, int remove); 120 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 121 static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, 122 nxt_unit_read_buf_t *rbuf); 123 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 124 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, 125 nxt_unit_port_id_t *port_id, int *fd); 126 127 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 128 nxt_unit_port_id_t *new_port, int fd); 129 130 static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 131 static int nxt_unit_remove_port(nxt_unit_impl_t *lib, 132 nxt_unit_port_id_t *port_id); 133 static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 134 nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, 135 nxt_unit_process_t **process); 136 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 137 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 138 nxt_unit_process_t *process); 139 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 140 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 141 nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, 142 const void *oob, size_t oob_size); 143 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 144 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 145 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, 146 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 147 void *oob, size_t oob_size); 148 149 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 150 nxt_unit_port_t *port); 151 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 152 nxt_unit_port_id_t *port_id, int remove); 153 154 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 155 nxt_unit_request_info_impl_t *req_impl); 156 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 157 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 158 159 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 160 161 162 struct nxt_unit_mmap_buf_s { 163 nxt_unit_buf_t buf; 164 165 nxt_unit_mmap_buf_t *next; 166 nxt_unit_mmap_buf_t **prev; 167 168 nxt_port_mmap_header_t *hdr; 169 nxt_unit_port_id_t port_id; 170 nxt_unit_request_info_t *req; 171 nxt_unit_ctx_impl_t *ctx_impl; 172 nxt_unit_process_t *process; 173 char *free_ptr; 174 char *plain_ptr; 175 }; 176 177 178 struct nxt_unit_recv_msg_s { 179 uint32_t stream; 180 nxt_pid_t pid; 181 nxt_port_id_t reply_port; 182 183 uint8_t last; /* 1 bit */ 184 uint8_t mmap; /* 1 bit */ 185 186 void *start; 187 uint32_t size; 188 189 int fd; 190 nxt_unit_process_t *process; 191 192 nxt_unit_mmap_buf_t *incoming_buf; 193 }; 194 195 196 typedef enum { 197 NXT_UNIT_RS_START = 0, 198 NXT_UNIT_RS_RESPONSE_INIT, 199 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 200 NXT_UNIT_RS_RESPONSE_SENT, 201 NXT_UNIT_RS_RELEASED, 202 } nxt_unit_req_state_t; 203 204 205 struct nxt_unit_request_info_impl_s { 206 nxt_unit_request_info_t req; 207 208 uint32_t stream; 209 210 nxt_unit_process_t *process; 211 212 nxt_unit_mmap_buf_t *outgoing_buf; 213 nxt_unit_mmap_buf_t *incoming_buf; 214 215 nxt_unit_req_state_t state; 216 uint8_t websocket; 217 218 nxt_queue_link_t link; 219 220 char extra_data[]; 221 }; 222 223 224 struct nxt_unit_websocket_frame_impl_s { 225 nxt_unit_websocket_frame_t ws; 226 227 nxt_unit_mmap_buf_t *buf; 228 229 nxt_queue_link_t link; 230 231 nxt_unit_ctx_impl_t *ctx_impl; 232 }; 233 234 235 struct nxt_unit_read_buf_s { 236 nxt_unit_read_buf_t *next; 237 ssize_t size; 238 char buf[16384]; 239 char oob[256]; 240 }; 241 242 243 struct nxt_unit_ctx_impl_s { 244 nxt_unit_ctx_t ctx; 245 246 nxt_atomic_t use_count; 247 248 pthread_mutex_t mutex; 249 250 nxt_unit_port_id_t read_port_id; 251 int read_port_fd; 252 253 nxt_queue_link_t link; 254 255 nxt_unit_mmap_buf_t *free_buf; 256 257 /* of nxt_unit_request_info_impl_t */ 258 nxt_queue_t free_req; 259 260 /* of nxt_unit_websocket_frame_impl_t */ 261 nxt_queue_t free_ws; 262 263 /* of nxt_unit_request_info_impl_t */ 264 nxt_queue_t active_req; 265 266 /* of nxt_unit_request_info_impl_t */ 267 nxt_lvlhsh_t requests; 268 269 nxt_unit_read_buf_t *pending_read_head; 270 nxt_unit_read_buf_t **pending_read_tail; 271 nxt_unit_read_buf_t *free_read_buf; 272 273 nxt_unit_mmap_buf_t ctx_buf[2]; 274 nxt_unit_read_buf_t ctx_read_buf; 275 276 nxt_unit_request_info_impl_t req; 277 }; 278 279 280 struct nxt_unit_impl_s { 281 nxt_unit_t unit; 282 nxt_unit_callbacks_t callbacks; 283 284 nxt_atomic_t use_count; 285 286 uint32_t request_data_size; 287 uint32_t shm_mmap_limit; 288 289 pthread_mutex_t mutex; 290 291 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 292 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 293 294 nxt_unit_port_id_t router_port_id; 295 296 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 297 298 pid_t pid; 299 int log_fd; 300 int online; 301 302 nxt_unit_ctx_impl_t main_ctx; 303 }; 304 305 306 struct nxt_unit_port_impl_s { 307 nxt_unit_port_t port; 308 309 nxt_queue_link_t link; 310 nxt_unit_process_t *process; 311 }; 312 313 314 struct nxt_unit_mmap_s { 315 nxt_port_mmap_header_t *hdr; 316 }; 317 318 319 struct nxt_unit_mmaps_s { 320 pthread_mutex_t mutex; 321 uint32_t size; 322 uint32_t cap; 323 nxt_atomic_t allocated_chunks; 324 nxt_unit_mmap_t *elts; 325 }; 326 327 328 struct nxt_unit_process_s { 329 pid_t pid; 330 331 nxt_queue_t ports; 332 333 nxt_unit_mmaps_t incoming; 334 nxt_unit_mmaps_t outgoing; 335 336 nxt_unit_impl_t *lib; 337 338 nxt_atomic_t use_count; 339 340 uint32_t next_port_id; 341 }; 342 343 344 /* Explicitly using 32 bit types to avoid possible alignment. */ 345 typedef struct { 346 int32_t pid; 347 uint32_t id; 348 } nxt_unit_port_hash_id_t; 349 350 351 nxt_unit_ctx_t * 352 nxt_unit_init(nxt_unit_init_t *init) 353 { 354 int rc; 355 uint32_t ready_stream, shm_limit; 356 nxt_unit_ctx_t *ctx; 357 nxt_unit_impl_t *lib; 358 nxt_unit_port_t ready_port, router_port, read_port; 359 360 lib = nxt_unit_create(init); 361 if (nxt_slow_path(lib == NULL)) { 362 return NULL; 363 } 364 365 if (init->ready_port.id.pid != 0 366 && init->ready_stream != 0 367 && init->read_port.id.pid != 0) 368 { 369 ready_port = init->ready_port; 370 ready_stream = init->ready_stream; 371 router_port = init->router_port; 372 read_port = init->read_port; 373 lib->log_fd = init->log_fd; 374 375 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 376 ready_port.id.id); 377 nxt_unit_port_id_init(&router_port.id, router_port.id.pid, 378 router_port.id.id); 379 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 380 read_port.id.id); 381 382 } else { 383 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, 384 &lib->log_fd, &ready_stream, &shm_limit); 385 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 386 goto fail; 387 } 388 389 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) 390 / PORT_MMAP_DATA_SIZE; 391 } 392 393 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { 394 lib->shm_mmap_limit = 1; 395 } 396 397 lib->pid = read_port.id.pid; 398 ctx = &lib->main_ctx.ctx; 399 400 rc = nxt_unit_add_port(ctx, &router_port); 401 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 402 nxt_unit_alert(NULL, "failed to add router_port"); 403 404 goto fail; 405 } 406 407 lib->router_port_id = router_port.id; 408 409 rc = nxt_unit_add_port(ctx, &read_port); 410 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 411 nxt_unit_alert(NULL, "failed to add read_port"); 412 413 goto fail; 414 } 415 416 lib->main_ctx.read_port_id = read_port.id; 417 418 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); 419 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 420 nxt_unit_alert(NULL, "failed to send READY message"); 421 422 goto fail; 423 } 424 425 close(ready_port.out_fd); 426 427 return ctx; 428 429 fail: 430 431 free(lib); 432 433 return NULL; 434 } 435 436 437 static nxt_unit_impl_t * 438 nxt_unit_create(nxt_unit_init_t *init) 439 { 440 int rc; 441 nxt_unit_impl_t *lib; 442 nxt_unit_callbacks_t *cb; 443 444 lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size); 445 if (nxt_slow_path(lib == NULL)) { 446 nxt_unit_alert(NULL, "failed to allocate unit struct"); 447 448 return NULL; 449 } 450 451 rc = pthread_mutex_init(&lib->mutex, NULL); 452 if (nxt_slow_path(rc != 0)) { 453 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 454 455 goto fail; 456 } 457 458 lib->unit.data = init->data; 459 lib->callbacks = init->callbacks; 460 461 lib->request_data_size = init->request_data_size; 462 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) 463 / PORT_MMAP_DATA_SIZE; 464 465 lib->processes.slot = NULL; 466 lib->ports.slot = NULL; 467 468 lib->log_fd = STDERR_FILENO; 469 lib->online = 1; 470 471 nxt_queue_init(&lib->contexts); 472 473 lib->use_count = 0; 474 475 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 476 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 477 goto fail; 478 } 479 480 cb = &lib->callbacks; 481 482 if (cb->request_handler == NULL) { 483 nxt_unit_alert(NULL, "request_handler is NULL"); 484 485 goto fail; 486 } 487 488 if (cb->port_recv == NULL) { 489 cb->port_recv = nxt_unit_port_recv_default; 490 } 491 492 return lib; 493 494 fail: 495 496 free(lib); 497 498 return NULL; 499 } 500 501 502 static int 503 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 504 void *data) 505 { 506 int rc; 507 508 ctx_impl->ctx.data = data; 509 ctx_impl->ctx.unit = &lib->unit; 510 511 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 512 if (nxt_slow_path(rc != 0)) { 513 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 514 515 return NXT_UNIT_ERROR; 516 } 517 518 nxt_unit_lib_use(lib); 519 520 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 521 522 ctx_impl->use_count = 1; 523 524 nxt_queue_init(&ctx_impl->free_req); 525 nxt_queue_init(&ctx_impl->free_ws); 526 nxt_queue_init(&ctx_impl->active_req); 527 528 ctx_impl->free_buf = NULL; 529 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 530 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 531 532 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 533 534 ctx_impl->pending_read_head = NULL; 535 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 536 ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; 537 ctx_impl->ctx_read_buf.next = NULL; 538 539 ctx_impl->req.req.ctx = &ctx_impl->ctx; 540 ctx_impl->req.req.unit = &lib->unit; 541 542 ctx_impl->read_port_fd = -1; 543 ctx_impl->requests.slot = 0; 544 545 return NXT_UNIT_OK; 546 } 547 548 549 nxt_inline void 550 nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) 551 { 552 nxt_atomic_fetch_add(&ctx_impl->use_count, 1); 553 } 554 555 556 nxt_inline void 557 nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) 558 { 559 long c; 560 561 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); 562 563 if (c == 1) { 564 nxt_unit_ctx_free(ctx_impl); 565 } 566 } 567 568 569 nxt_inline void 570 nxt_unit_lib_use(nxt_unit_impl_t *lib) 571 { 572 nxt_atomic_fetch_add(&lib->use_count, 1); 573 } 574 575 576 nxt_inline void 577 nxt_unit_lib_release(nxt_unit_impl_t *lib) 578 { 579 long c; 580 nxt_unit_process_t *process; 581 582 c = nxt_atomic_fetch_add(&lib->use_count, -1); 583 584 if (c == 1) { 585 for ( ;; ) { 586 pthread_mutex_lock(&lib->mutex); 587 588 process = nxt_unit_process_pop_first(lib); 589 if (process == NULL) { 590 pthread_mutex_unlock(&lib->mutex); 591 592 break; 593 } 594 595 nxt_unit_remove_process(lib, process); 596 } 597 598 pthread_mutex_destroy(&lib->mutex); 599 600 free(lib); 601 } 602 } 603 604 605 nxt_inline void 606 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 607 nxt_unit_mmap_buf_t *mmap_buf) 608 { 609 mmap_buf->next = *head; 610 611 if (mmap_buf->next != NULL) { 612 mmap_buf->next->prev = &mmap_buf->next; 613 } 614 615 *head = mmap_buf; 616 mmap_buf->prev = head; 617 } 618 619 620 nxt_inline void 621 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 622 nxt_unit_mmap_buf_t *mmap_buf) 623 { 624 while (*prev != NULL) { 625 prev = &(*prev)->next; 626 } 627 628 nxt_unit_mmap_buf_insert(prev, mmap_buf); 629 } 630 631 632 nxt_inline void 633 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) 634 { 635 nxt_unit_mmap_buf_t **prev; 636 637 prev = mmap_buf->prev; 638 639 if (mmap_buf->next != NULL) { 640 mmap_buf->next->prev = prev; 641 } 642 643 if (prev != NULL) { 644 *prev = mmap_buf->next; 645 } 646 } 647 648 649 static int 650 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, 651 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, 652 uint32_t *shm_limit) 653 { 654 int rc; 655 int ready_fd, router_fd, read_fd; 656 char *unit_init, *version_end; 657 long version_length; 658 int64_t ready_pid, router_pid, read_pid; 659 uint32_t ready_stream, router_id, ready_id, read_id; 660 661 unit_init = getenv(NXT_UNIT_INIT_ENV); 662 if (nxt_slow_path(unit_init == NULL)) { 663 nxt_unit_alert(NULL, "%s is not in the current environment", 664 NXT_UNIT_INIT_ENV); 665 666 return NXT_UNIT_ERROR; 667 } 668 669 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 670 671 version_length = nxt_length(NXT_VERSION); 672 673 version_end = strchr(unit_init, ';'); 674 if (version_end == NULL 675 || version_end - unit_init != version_length 676 || memcmp(unit_init, NXT_VERSION, version_length) != 0) 677 { 678 nxt_unit_alert(NULL, "version check error"); 679 680 return NXT_UNIT_ERROR; 681 } 682 683 rc = sscanf(version_end + 1, 684 "%"PRIu32";" 685 "%"PRId64",%"PRIu32",%d;" 686 "%"PRId64",%"PRIu32",%d;" 687 "%"PRId64",%"PRIu32",%d;" 688 "%d,%"PRIu32, 689 &ready_stream, 690 &ready_pid, &ready_id, &ready_fd, 691 &router_pid, &router_id, &router_fd, 692 &read_pid, &read_id, &read_fd, 693 log_fd, shm_limit); 694 695 if (nxt_slow_path(rc != 12)) { 696 nxt_unit_alert(NULL, "failed to scan variables: %d", rc); 697 698 return NXT_UNIT_ERROR; 699 } 700 701 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 702 703 ready_port->in_fd = -1; 704 ready_port->out_fd = ready_fd; 705 ready_port->data = NULL; 706 707 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); 708 709 router_port->in_fd = -1; 710 router_port->out_fd = router_fd; 711 router_port->data = NULL; 712 713 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 714 715 read_port->in_fd = read_fd; 716 read_port->out_fd = -1; 717 read_port->data = NULL; 718 719 *stream = ready_stream; 720 721 return NXT_UNIT_OK; 722 } 723 724 725 static int 726 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) 727 { 728 ssize_t res; 729 nxt_port_msg_t msg; 730 nxt_unit_impl_t *lib; 731 732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 733 734 msg.stream = stream; 735 msg.pid = lib->pid; 736 msg.reply_port = 0; 737 msg.type = _NXT_PORT_MSG_PROCESS_READY; 738 msg.last = 1; 739 msg.mmap = 0; 740 msg.nf = 0; 741 msg.mf = 0; 742 msg.tracking = 0; 743 744 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); 745 if (res != sizeof(msg)) { 746 return NXT_UNIT_ERROR; 747 } 748 749 return NXT_UNIT_OK; 750 } 751 752 753 int 754 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 755 void *buf, size_t buf_size, void *oob, size_t oob_size) 756 { 757 int rc; 758 pid_t pid; 759 struct cmsghdr *cm; 760 nxt_port_msg_t *port_msg; 761 nxt_unit_impl_t *lib; 762 nxt_unit_recv_msg_t recv_msg; 763 764 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 765 766 rc = NXT_UNIT_ERROR; 767 recv_msg.fd = -1; 768 recv_msg.process = NULL; 769 port_msg = buf; 770 cm = oob; 771 772 if (oob_size >= CMSG_SPACE(sizeof(int)) 773 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 774 && cm->cmsg_level == SOL_SOCKET 775 && cm->cmsg_type == SCM_RIGHTS) 776 { 777 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 778 } 779 780 recv_msg.incoming_buf = NULL; 781 782 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 783 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 784 goto fail; 785 } 786 787 recv_msg.stream = port_msg->stream; 788 recv_msg.pid = port_msg->pid; 789 recv_msg.reply_port = port_msg->reply_port; 790 recv_msg.last = port_msg->last; 791 recv_msg.mmap = port_msg->mmap; 792 793 recv_msg.start = port_msg + 1; 794 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 795 796 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 797 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 798 port_msg->stream, (int) port_msg->type); 799 goto fail; 800 } 801 802 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 803 rc = NXT_UNIT_OK; 804 805 goto fail; 806 } 807 808 /* Fragmentation is unsupported. */ 809 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 810 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 811 port_msg->stream, (int) port_msg->type); 812 goto fail; 813 } 814 815 if (port_msg->mmap) { 816 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { 817 goto fail; 818 } 819 } 820 821 switch (port_msg->type) { 822 823 case _NXT_PORT_MSG_QUIT: 824 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 825 826 nxt_unit_quit(ctx); 827 rc = NXT_UNIT_OK; 828 break; 829 830 case _NXT_PORT_MSG_NEW_PORT: 831 rc = nxt_unit_process_new_port(ctx, &recv_msg); 832 break; 833 834 case _NXT_PORT_MSG_CHANGE_FILE: 835 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 836 port_msg->stream, recv_msg.fd); 837 838 if (dup2(recv_msg.fd, lib->log_fd) == -1) { 839 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", 840 port_msg->stream, recv_msg.fd, lib->log_fd, 841 strerror(errno), errno); 842 843 goto fail; 844 } 845 846 rc = NXT_UNIT_OK; 847 break; 848 849 case _NXT_PORT_MSG_MMAP: 850 if (nxt_slow_path(recv_msg.fd < 0)) { 851 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 852 port_msg->stream, recv_msg.fd); 853 854 goto fail; 855 } 856 857 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); 858 break; 859 860 case _NXT_PORT_MSG_REQ_HEADERS: 861 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 862 break; 863 864 case _NXT_PORT_MSG_WEBSOCKET: 865 rc = nxt_unit_process_websocket(ctx, &recv_msg); 866 break; 867 868 case _NXT_PORT_MSG_REMOVE_PID: 869 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 870 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 871 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 872 (int) sizeof(pid)); 873 874 goto fail; 875 } 876 877 memcpy(&pid, recv_msg.start, sizeof(pid)); 878 879 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 880 port_msg->stream, (int) pid); 881 882 nxt_unit_remove_pid(lib, pid); 883 884 rc = NXT_UNIT_OK; 885 break; 886 887 case _NXT_PORT_MSG_SHM_ACK: 888 rc = nxt_unit_process_shm_ack(ctx); 889 break; 890 891 default: 892 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 893 port_msg->stream, (int) port_msg->type); 894 895 goto fail; 896 } 897 898 fail: 899 900 if (recv_msg.fd != -1) { 901 close(recv_msg.fd); 902 } 903 904 while (recv_msg.incoming_buf != NULL) { 905 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 906 } 907 908 if (recv_msg.process != NULL) { 909 nxt_unit_process_release(recv_msg.process); 910 } 911 912 return rc; 913 } 914 915 916 static int 917 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 918 { 919 int nb; 920 nxt_unit_port_t new_port; 921 nxt_port_msg_new_port_t *new_port_msg; 922 923 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 924 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 925 "invalid message size (%d)", 926 recv_msg->stream, (int) recv_msg->size); 927 928 return NXT_UNIT_ERROR; 929 } 930 931 if (nxt_slow_path(recv_msg->fd < 0)) { 932 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 933 recv_msg->stream, recv_msg->fd); 934 935 return NXT_UNIT_ERROR; 936 } 937 938 new_port_msg = recv_msg->start; 939 940 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 941 recv_msg->stream, (int) new_port_msg->pid, 942 (int) new_port_msg->id, recv_msg->fd); 943 944 nb = 0; 945 946 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { 947 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 948 "failed: %s (%d)", 949 recv_msg->stream, recv_msg->fd, strerror(errno), errno); 950 951 return NXT_UNIT_ERROR; 952 } 953 954 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 955 new_port_msg->id); 956 957 new_port.in_fd = -1; 958 new_port.out_fd = recv_msg->fd; 959 new_port.data = NULL; 960 961 recv_msg->fd = -1; 962 963 return nxt_unit_add_port(ctx, &new_port); 964 } 965 966 967 static int 968 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 969 { 970 nxt_unit_impl_t *lib; 971 nxt_unit_request_t *r; 972 nxt_unit_mmap_buf_t *b; 973 nxt_unit_request_info_t *req; 974 nxt_unit_request_info_impl_t *req_impl; 975 976 if (nxt_slow_path(recv_msg->mmap == 0)) { 977 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 978 recv_msg->stream); 979 980 return NXT_UNIT_ERROR; 981 } 982 983 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 984 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 985 "%d expected", recv_msg->stream, (int) recv_msg->size, 986 (int) sizeof(nxt_unit_request_t)); 987 988 return NXT_UNIT_ERROR; 989 } 990 991 req_impl = nxt_unit_request_info_get(ctx); 992 if (nxt_slow_path(req_impl == NULL)) { 993 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 994 recv_msg->stream); 995 996 return NXT_UNIT_ERROR; 997 } 998 999 req = &req_impl->req; 1000 1001 nxt_unit_port_id_init(&req->response_port, recv_msg->pid, 1002 recv_msg->reply_port); 1003 1004 req->request = recv_msg->start; 1005 1006 b = recv_msg->incoming_buf; 1007 1008 req->request_buf = &b->buf; 1009 req->response = NULL; 1010 req->response_buf = NULL; 1011 1012 r = req->request; 1013 1014 req->content_length = r->content_length; 1015 1016 req->content_buf = req->request_buf; 1017 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 1018 1019 /* "Move" process reference to req_impl. */ 1020 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); 1021 if (nxt_slow_path(req_impl->process == NULL)) { 1022 return NXT_UNIT_ERROR; 1023 } 1024 1025 recv_msg->process = NULL; 1026 1027 req_impl->stream = recv_msg->stream; 1028 1029 req_impl->outgoing_buf = NULL; 1030 1031 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1032 b->req = req; 1033 } 1034 1035 /* "Move" incoming buffer list to req_impl. */ 1036 req_impl->incoming_buf = recv_msg->incoming_buf; 1037 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 1038 recv_msg->incoming_buf = NULL; 1039 1040 req->content_fd = recv_msg->fd; 1041 recv_msg->fd = -1; 1042 1043 req->response_max_fields = 0; 1044 req_impl->state = NXT_UNIT_RS_START; 1045 req_impl->websocket = 0; 1046 1047 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 1048 (int) r->method_length, 1049 (char *) nxt_unit_sptr_get(&r->method), 1050 (int) r->target_length, 1051 (char *) nxt_unit_sptr_get(&r->target), 1052 (int) r->content_length); 1053 1054 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1055 1056 lib->callbacks.request_handler(req); 1057 1058 return NXT_UNIT_OK; 1059 } 1060 1061 1062 static int 1063 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1064 { 1065 size_t hsize; 1066 nxt_unit_impl_t *lib; 1067 nxt_unit_mmap_buf_t *b; 1068 nxt_unit_ctx_impl_t *ctx_impl; 1069 nxt_unit_callbacks_t *cb; 1070 nxt_unit_request_info_t *req; 1071 nxt_unit_request_info_impl_t *req_impl; 1072 nxt_unit_websocket_frame_impl_t *ws_impl; 1073 1074 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1075 1076 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, 1077 recv_msg->last); 1078 if (req_impl == NULL) { 1079 return NXT_UNIT_OK; 1080 } 1081 1082 req = &req_impl->req; 1083 1084 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1085 cb = &lib->callbacks; 1086 1087 if (cb->websocket_handler && recv_msg->size >= 2) { 1088 ws_impl = nxt_unit_websocket_frame_get(ctx); 1089 if (nxt_slow_path(ws_impl == NULL)) { 1090 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 1091 req_impl->stream); 1092 1093 return NXT_UNIT_ERROR; 1094 } 1095 1096 ws_impl->ws.req = req; 1097 1098 ws_impl->buf = NULL; 1099 1100 if (recv_msg->mmap) { 1101 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1102 b->req = req; 1103 } 1104 1105 /* "Move" incoming buffer list to ws_impl. */ 1106 ws_impl->buf = recv_msg->incoming_buf; 1107 ws_impl->buf->prev = &ws_impl->buf; 1108 recv_msg->incoming_buf = NULL; 1109 1110 b = ws_impl->buf; 1111 1112 } else { 1113 b = nxt_unit_mmap_buf_get(ctx); 1114 if (nxt_slow_path(b == NULL)) { 1115 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 1116 req_impl->stream); 1117 1118 nxt_unit_websocket_frame_release(&ws_impl->ws); 1119 1120 return NXT_UNIT_ERROR; 1121 } 1122 1123 b->req = req; 1124 b->buf.start = recv_msg->start; 1125 b->buf.free = b->buf.start; 1126 b->buf.end = b->buf.start + recv_msg->size; 1127 1128 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 1129 } 1130 1131 ws_impl->ws.header = (void *) b->buf.start; 1132 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1133 ws_impl->ws.header); 1134 1135 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1136 1137 if (ws_impl->ws.header->mask) { 1138 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1139 1140 } else { 1141 ws_impl->ws.mask = NULL; 1142 } 1143 1144 b->buf.free += hsize; 1145 1146 ws_impl->ws.content_buf = &b->buf; 1147 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1148 1149 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1150 "payload_len=%"PRIu64, 1151 ws_impl->ws.header->opcode, 1152 ws_impl->ws.payload_len); 1153 1154 cb->websocket_handler(&ws_impl->ws); 1155 } 1156 1157 if (recv_msg->last) { 1158 req_impl->websocket = 0; 1159 1160 if (cb->close_handler) { 1161 nxt_unit_req_debug(req, "close_handler"); 1162 1163 cb->close_handler(req); 1164 1165 } else { 1166 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1167 } 1168 } 1169 1170 return NXT_UNIT_OK; 1171 } 1172 1173 1174 static int 1175 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) 1176 { 1177 nxt_unit_impl_t *lib; 1178 nxt_unit_callbacks_t *cb; 1179 1180 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1181 cb = &lib->callbacks; 1182 1183 if (cb->shm_ack_handler != NULL) { 1184 cb->shm_ack_handler(ctx); 1185 } 1186 1187 return NXT_UNIT_OK; 1188 } 1189 1190 1191 static nxt_unit_request_info_impl_t * 1192 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1193 { 1194 nxt_unit_impl_t *lib; 1195 nxt_queue_link_t *lnk; 1196 nxt_unit_ctx_impl_t *ctx_impl; 1197 nxt_unit_request_info_impl_t *req_impl; 1198 1199 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1200 1201 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1202 1203 pthread_mutex_lock(&ctx_impl->mutex); 1204 1205 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1206 pthread_mutex_unlock(&ctx_impl->mutex); 1207 1208 req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) 1209 + lib->request_data_size); 1210 if (nxt_slow_path(req_impl == NULL)) { 1211 return NULL; 1212 } 1213 1214 req_impl->req.unit = ctx->unit; 1215 req_impl->req.ctx = ctx; 1216 1217 pthread_mutex_lock(&ctx_impl->mutex); 1218 1219 } else { 1220 lnk = nxt_queue_first(&ctx_impl->free_req); 1221 nxt_queue_remove(lnk); 1222 1223 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1224 } 1225 1226 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1227 1228 pthread_mutex_unlock(&ctx_impl->mutex); 1229 1230 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; 1231 1232 return req_impl; 1233 } 1234 1235 1236 static void 1237 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1238 { 1239 nxt_unit_ctx_impl_t *ctx_impl; 1240 nxt_unit_request_info_impl_t *req_impl; 1241 1242 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1243 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1244 1245 req->response = NULL; 1246 req->response_buf = NULL; 1247 1248 if (req_impl->websocket) { 1249 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); 1250 1251 req_impl->websocket = 0; 1252 } 1253 1254 while (req_impl->outgoing_buf != NULL) { 1255 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1256 } 1257 1258 while (req_impl->incoming_buf != NULL) { 1259 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1260 } 1261 1262 if (req->content_fd != -1) { 1263 close(req->content_fd); 1264 1265 req->content_fd = -1; 1266 } 1267 1268 /* 1269 * Process release should go after buffers release to guarantee mmap 1270 * existence. 1271 */ 1272 if (req_impl->process != NULL) { 1273 nxt_unit_process_release(req_impl->process); 1274 1275 req_impl->process = NULL; 1276 } 1277 1278 pthread_mutex_lock(&ctx_impl->mutex); 1279 1280 nxt_queue_remove(&req_impl->link); 1281 1282 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 1283 1284 pthread_mutex_unlock(&ctx_impl->mutex); 1285 1286 req_impl->state = NXT_UNIT_RS_RELEASED; 1287 } 1288 1289 1290 static void 1291 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1292 { 1293 nxt_unit_ctx_impl_t *ctx_impl; 1294 1295 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1296 1297 nxt_queue_remove(&req_impl->link); 1298 1299 if (req_impl != &ctx_impl->req) { 1300 free(req_impl); 1301 } 1302 } 1303 1304 1305 static nxt_unit_websocket_frame_impl_t * 1306 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1307 { 1308 nxt_queue_link_t *lnk; 1309 nxt_unit_ctx_impl_t *ctx_impl; 1310 nxt_unit_websocket_frame_impl_t *ws_impl; 1311 1312 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1313 1314 pthread_mutex_lock(&ctx_impl->mutex); 1315 1316 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1317 pthread_mutex_unlock(&ctx_impl->mutex); 1318 1319 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); 1320 if (nxt_slow_path(ws_impl == NULL)) { 1321 return NULL; 1322 } 1323 1324 } else { 1325 lnk = nxt_queue_first(&ctx_impl->free_ws); 1326 nxt_queue_remove(lnk); 1327 1328 pthread_mutex_unlock(&ctx_impl->mutex); 1329 1330 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1331 } 1332 1333 ws_impl->ctx_impl = ctx_impl; 1334 1335 return ws_impl; 1336 } 1337 1338 1339 static void 1340 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1341 { 1342 nxt_unit_websocket_frame_impl_t *ws_impl; 1343 1344 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 1345 1346 while (ws_impl->buf != NULL) { 1347 nxt_unit_mmap_buf_free(ws_impl->buf); 1348 } 1349 1350 ws->req = NULL; 1351 1352 pthread_mutex_lock(&ws_impl->ctx_impl->mutex); 1353 1354 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); 1355 1356 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); 1357 } 1358 1359 1360 static void 1361 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) 1362 { 1363 nxt_queue_remove(&ws_impl->link); 1364 1365 free(ws_impl); 1366 } 1367 1368 1369 uint16_t 1370 nxt_unit_field_hash(const char *name, size_t name_length) 1371 { 1372 u_char ch; 1373 uint32_t hash; 1374 const char *p, *end; 1375 1376 hash = 159406; /* Magic value copied from nxt_http_parse.c */ 1377 end = name + name_length; 1378 1379 for (p = name; p < end; p++) { 1380 ch = *p; 1381 hash = (hash << 4) + hash + nxt_lowcase(ch); 1382 } 1383 1384 hash = (hash >> 16) ^ hash; 1385 1386 return hash; 1387 } 1388 1389 1390 void 1391 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) 1392 { 1393 uint32_t i, j; 1394 nxt_unit_field_t *fields, f; 1395 nxt_unit_request_t *r; 1396 1397 nxt_unit_req_debug(req, "group_dup_fields"); 1398 1399 r = req->request; 1400 fields = r->fields; 1401 1402 for (i = 0; i < r->fields_count; i++) { 1403 1404 switch (fields[i].hash) { 1405 case NXT_UNIT_HASH_CONTENT_LENGTH: 1406 r->content_length_field = i; 1407 break; 1408 1409 case NXT_UNIT_HASH_CONTENT_TYPE: 1410 r->content_type_field = i; 1411 break; 1412 1413 case NXT_UNIT_HASH_COOKIE: 1414 r->cookie_field = i; 1415 break; 1416 }; 1417 1418 for (j = i + 1; j < r->fields_count; j++) { 1419 if (fields[i].hash != fields[j].hash) { 1420 continue; 1421 } 1422 1423 if (j == i + 1) { 1424 continue; 1425 } 1426 1427 f = fields[j]; 1428 f.name.offset += (j - (i + 1)) * sizeof(f); 1429 f.value.offset += (j - (i + 1)) * sizeof(f); 1430 1431 while (j > i + 1) { 1432 fields[j] = fields[j - 1]; 1433 fields[j].name.offset -= sizeof(f); 1434 fields[j].value.offset -= sizeof(f); 1435 j--; 1436 } 1437 1438 fields[j] = f; 1439 1440 i++; 1441 } 1442 } 1443 } 1444 1445 1446 int 1447 nxt_unit_response_init(nxt_unit_request_info_t *req, 1448 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 1449 { 1450 uint32_t buf_size; 1451 nxt_unit_buf_t *buf; 1452 nxt_unit_request_info_impl_t *req_impl; 1453 1454 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1455 1456 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1457 nxt_unit_req_warn(req, "init: response already sent"); 1458 1459 return NXT_UNIT_ERROR; 1460 } 1461 1462 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1463 (int) max_fields_count, (int) max_fields_size); 1464 1465 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1466 nxt_unit_req_debug(req, "duplicate response init"); 1467 } 1468 1469 /* 1470 * Each field name and value 0-terminated by libunit, 1471 * this is the reason of '+ 2' below. 1472 */ 1473 buf_size = sizeof(nxt_unit_response_t) 1474 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1475 + max_fields_size; 1476 1477 if (nxt_slow_path(req->response_buf != NULL)) { 1478 buf = req->response_buf; 1479 1480 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1481 goto init_response; 1482 } 1483 1484 nxt_unit_buf_free(buf); 1485 1486 req->response_buf = NULL; 1487 req->response = NULL; 1488 req->response_max_fields = 0; 1489 1490 req_impl->state = NXT_UNIT_RS_START; 1491 } 1492 1493 buf = nxt_unit_response_buf_alloc(req, buf_size); 1494 if (nxt_slow_path(buf == NULL)) { 1495 return NXT_UNIT_ERROR; 1496 } 1497 1498 init_response: 1499 1500 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 1501 1502 req->response_buf = buf; 1503 1504 req->response = (nxt_unit_response_t *) buf->start; 1505 req->response->status = status; 1506 1507 buf->free = buf->start + sizeof(nxt_unit_response_t) 1508 + max_fields_count * sizeof(nxt_unit_field_t); 1509 1510 req->response_max_fields = max_fields_count; 1511 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 1512 1513 return NXT_UNIT_OK; 1514 } 1515 1516 1517 int 1518 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1519 uint32_t max_fields_count, uint32_t max_fields_size) 1520 { 1521 char *p; 1522 uint32_t i, buf_size; 1523 nxt_unit_buf_t *buf; 1524 nxt_unit_field_t *f, *src; 1525 nxt_unit_response_t *resp; 1526 nxt_unit_request_info_impl_t *req_impl; 1527 1528 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1529 1530 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1531 nxt_unit_req_warn(req, "realloc: response not init"); 1532 1533 return NXT_UNIT_ERROR; 1534 } 1535 1536 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1537 nxt_unit_req_warn(req, "realloc: response already sent"); 1538 1539 return NXT_UNIT_ERROR; 1540 } 1541 1542 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1543 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 1544 1545 return NXT_UNIT_ERROR; 1546 } 1547 1548 /* 1549 * Each field name and value 0-terminated by libunit, 1550 * this is the reason of '+ 2' below. 1551 */ 1552 buf_size = sizeof(nxt_unit_response_t) 1553 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1554 + max_fields_size; 1555 1556 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 1557 1558 buf = nxt_unit_response_buf_alloc(req, buf_size); 1559 if (nxt_slow_path(buf == NULL)) { 1560 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 1561 return NXT_UNIT_ERROR; 1562 } 1563 1564 resp = (nxt_unit_response_t *) buf->start; 1565 1566 memset(resp, 0, sizeof(nxt_unit_response_t)); 1567 1568 resp->status = req->response->status; 1569 resp->content_length = req->response->content_length; 1570 1571 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1572 f = resp->fields; 1573 1574 for (i = 0; i < req->response->fields_count; i++) { 1575 src = req->response->fields + i; 1576 1577 if (nxt_slow_path(src->skip != 0)) { 1578 continue; 1579 } 1580 1581 if (nxt_slow_path(src->name_length + src->value_length + 2 1582 > (uint32_t) (buf->end - p))) 1583 { 1584 nxt_unit_req_warn(req, "realloc: not enough space for field" 1585 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 1586 i, src, src->name_length, src->value_length); 1587 1588 goto fail; 1589 } 1590 1591 nxt_unit_sptr_set(&f->name, p); 1592 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 1593 *p++ = '\0'; 1594 1595 nxt_unit_sptr_set(&f->value, p); 1596 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 1597 *p++ = '\0'; 1598 1599 f->hash = src->hash; 1600 f->skip = 0; 1601 f->name_length = src->name_length; 1602 f->value_length = src->value_length; 1603 1604 resp->fields_count++; 1605 f++; 1606 } 1607 1608 if (req->response->piggyback_content_length > 0) { 1609 if (nxt_slow_path(req->response->piggyback_content_length 1610 > (uint32_t) (buf->end - p))) 1611 { 1612 nxt_unit_req_warn(req, "realloc: not enought space for content" 1613 " #%"PRIu32", %"PRIu32" required", 1614 i, req->response->piggyback_content_length); 1615 1616 goto fail; 1617 } 1618 1619 resp->piggyback_content_length = 1620 req->response->piggyback_content_length; 1621 1622 nxt_unit_sptr_set(&resp->piggyback_content, p); 1623 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 1624 req->response->piggyback_content_length); 1625 } 1626 1627 buf->free = p; 1628 1629 nxt_unit_buf_free(req->response_buf); 1630 1631 req->response = resp; 1632 req->response_buf = buf; 1633 req->response_max_fields = max_fields_count; 1634 1635 return NXT_UNIT_OK; 1636 1637 fail: 1638 1639 nxt_unit_buf_free(buf); 1640 1641 return NXT_UNIT_ERROR; 1642 } 1643 1644 1645 int 1646 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 1647 { 1648 nxt_unit_request_info_impl_t *req_impl; 1649 1650 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1651 1652 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 1653 } 1654 1655 1656 int 1657 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 1658 const char *name, uint8_t name_length, 1659 const char *value, uint32_t value_length) 1660 { 1661 nxt_unit_buf_t *buf; 1662 nxt_unit_field_t *f; 1663 nxt_unit_response_t *resp; 1664 nxt_unit_request_info_impl_t *req_impl; 1665 1666 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1667 1668 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 1669 nxt_unit_req_warn(req, "add_field: response not initialized or " 1670 "already sent"); 1671 1672 return NXT_UNIT_ERROR; 1673 } 1674 1675 resp = req->response; 1676 1677 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 1678 nxt_unit_req_warn(req, "add_field: too many response fields"); 1679 1680 return NXT_UNIT_ERROR; 1681 } 1682 1683 buf = req->response_buf; 1684 1685 if (nxt_slow_path(name_length + value_length + 2 1686 > (uint32_t) (buf->end - buf->free))) 1687 { 1688 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 1689 1690 return NXT_UNIT_ERROR; 1691 } 1692 1693 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 1694 resp->fields_count, 1695 (int) name_length, name, 1696 (int) value_length, value); 1697 1698 f = resp->fields + resp->fields_count; 1699 1700 nxt_unit_sptr_set(&f->name, buf->free); 1701 buf->free = nxt_cpymem(buf->free, name, name_length); 1702 *buf->free++ = '\0'; 1703 1704 nxt_unit_sptr_set(&f->value, buf->free); 1705 buf->free = nxt_cpymem(buf->free, value, value_length); 1706 *buf->free++ = '\0'; 1707 1708 f->hash = nxt_unit_field_hash(name, name_length); 1709 f->skip = 0; 1710 f->name_length = name_length; 1711 f->value_length = value_length; 1712 1713 resp->fields_count++; 1714 1715 return NXT_UNIT_OK; 1716 } 1717 1718 1719 int 1720 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 1721 const void* src, uint32_t size) 1722 { 1723 nxt_unit_buf_t *buf; 1724 nxt_unit_response_t *resp; 1725 nxt_unit_request_info_impl_t *req_impl; 1726 1727 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1728 1729 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1730 nxt_unit_req_warn(req, "add_content: response not initialized yet"); 1731 1732 return NXT_UNIT_ERROR; 1733 } 1734 1735 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1736 nxt_unit_req_warn(req, "add_content: response already sent"); 1737 1738 return NXT_UNIT_ERROR; 1739 } 1740 1741 buf = req->response_buf; 1742 1743 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 1744 nxt_unit_req_warn(req, "add_content: buffer overflow"); 1745 1746 return NXT_UNIT_ERROR; 1747 } 1748 1749 resp = req->response; 1750 1751 if (resp->piggyback_content_length == 0) { 1752 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 1753 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 1754 } 1755 1756 resp->piggyback_content_length += size; 1757 1758 buf->free = nxt_cpymem(buf->free, src, size); 1759 1760 return NXT_UNIT_OK; 1761 } 1762 1763 1764 int 1765 nxt_unit_response_send(nxt_unit_request_info_t *req) 1766 { 1767 int rc; 1768 nxt_unit_mmap_buf_t *mmap_buf; 1769 nxt_unit_request_info_impl_t *req_impl; 1770 1771 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1772 1773 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1774 nxt_unit_req_warn(req, "send: response is not initialized yet"); 1775 1776 return NXT_UNIT_ERROR; 1777 } 1778 1779 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1780 nxt_unit_req_warn(req, "send: response already sent"); 1781 1782 return NXT_UNIT_ERROR; 1783 } 1784 1785 if (req->request->websocket_handshake && req->response->status == 101) { 1786 nxt_unit_response_upgrade(req); 1787 } 1788 1789 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 1790 req->response->fields_count, 1791 (int) (req->response_buf->free 1792 - req->response_buf->start)); 1793 1794 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 1795 1796 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); 1797 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 1798 req->response = NULL; 1799 req->response_buf = NULL; 1800 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 1801 1802 nxt_unit_mmap_buf_free(mmap_buf); 1803 } 1804 1805 return rc; 1806 } 1807 1808 1809 int 1810 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 1811 { 1812 nxt_unit_request_info_impl_t *req_impl; 1813 1814 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1815 1816 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 1817 } 1818 1819 1820 nxt_unit_buf_t * 1821 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 1822 { 1823 int rc; 1824 nxt_unit_mmap_buf_t *mmap_buf; 1825 nxt_unit_request_info_impl_t *req_impl; 1826 1827 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1828 nxt_unit_req_warn(req, "response_buf_alloc: " 1829 "requested buffer (%"PRIu32") too big", size); 1830 1831 return NULL; 1832 } 1833 1834 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1835 1836 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1837 1838 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1839 if (nxt_slow_path(mmap_buf == NULL)) { 1840 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 1841 1842 return NULL; 1843 } 1844 1845 mmap_buf->req = req; 1846 1847 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 1848 1849 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 1850 &req->response_port, size, size, mmap_buf, 1851 NULL); 1852 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1853 nxt_unit_mmap_buf_release(mmap_buf); 1854 1855 return NULL; 1856 } 1857 1858 return &mmap_buf->buf; 1859 } 1860 1861 1862 static nxt_unit_process_t * 1863 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1864 { 1865 nxt_unit_impl_t *lib; 1866 1867 if (recv_msg->process != NULL) { 1868 return recv_msg->process; 1869 } 1870 1871 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1872 1873 pthread_mutex_lock(&lib->mutex); 1874 1875 recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); 1876 1877 pthread_mutex_unlock(&lib->mutex); 1878 1879 if (recv_msg->process == NULL) { 1880 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", 1881 recv_msg->stream, (int) recv_msg->pid); 1882 } 1883 1884 return recv_msg->process; 1885 } 1886 1887 1888 static nxt_unit_mmap_buf_t * 1889 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1890 { 1891 nxt_unit_mmap_buf_t *mmap_buf; 1892 nxt_unit_ctx_impl_t *ctx_impl; 1893 1894 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1895 1896 pthread_mutex_lock(&ctx_impl->mutex); 1897 1898 if (ctx_impl->free_buf == NULL) { 1899 pthread_mutex_unlock(&ctx_impl->mutex); 1900 1901 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1902 if (nxt_slow_path(mmap_buf == NULL)) { 1903 return NULL; 1904 } 1905 1906 } else { 1907 mmap_buf = ctx_impl->free_buf; 1908 1909 nxt_unit_mmap_buf_unlink(mmap_buf); 1910 1911 pthread_mutex_unlock(&ctx_impl->mutex); 1912 } 1913 1914 mmap_buf->ctx_impl = ctx_impl; 1915 1916 mmap_buf->hdr = NULL; 1917 mmap_buf->free_ptr = NULL; 1918 1919 return mmap_buf; 1920 } 1921 1922 1923 static void 1924 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1925 { 1926 nxt_unit_mmap_buf_unlink(mmap_buf); 1927 1928 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 1929 1930 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 1931 1932 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 1933 } 1934 1935 1936 int 1937 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 1938 { 1939 return req->request->websocket_handshake; 1940 } 1941 1942 1943 int 1944 nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 1945 { 1946 int rc; 1947 nxt_unit_ctx_impl_t *ctx_impl; 1948 nxt_unit_request_info_impl_t *req_impl; 1949 1950 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1951 1952 if (nxt_slow_path(req_impl->websocket != 0)) { 1953 nxt_unit_req_debug(req, "upgrade: already upgraded"); 1954 1955 return NXT_UNIT_OK; 1956 } 1957 1958 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1959 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 1960 1961 return NXT_UNIT_ERROR; 1962 } 1963 1964 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1965 nxt_unit_req_warn(req, "upgrade: response already sent"); 1966 1967 return NXT_UNIT_ERROR; 1968 } 1969 1970 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1971 1972 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); 1973 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1974 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 1975 1976 return NXT_UNIT_ERROR; 1977 } 1978 1979 req_impl->websocket = 1; 1980 1981 req->response->status = 101; 1982 1983 return NXT_UNIT_OK; 1984 } 1985 1986 1987 int 1988 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 1989 { 1990 nxt_unit_request_info_impl_t *req_impl; 1991 1992 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1993 1994 return req_impl->websocket; 1995 } 1996 1997 1998 nxt_unit_request_info_t * 1999 nxt_unit_get_request_info_from_data(void *data) 2000 { 2001 nxt_unit_request_info_impl_t *req_impl; 2002 2003 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 2004 2005 return &req_impl->req; 2006 } 2007 2008 2009 int 2010 nxt_unit_buf_send(nxt_unit_buf_t *buf) 2011 { 2012 int rc; 2013 nxt_unit_mmap_buf_t *mmap_buf; 2014 nxt_unit_request_info_t *req; 2015 nxt_unit_request_info_impl_t *req_impl; 2016 2017 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2018 2019 req = mmap_buf->req; 2020 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2021 2022 nxt_unit_req_debug(req, "buf_send: %d bytes", 2023 (int) (buf->free - buf->start)); 2024 2025 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2026 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 2027 2028 return NXT_UNIT_ERROR; 2029 } 2030 2031 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2032 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 2033 2034 return NXT_UNIT_ERROR; 2035 } 2036 2037 if (nxt_fast_path(buf->free > buf->start)) { 2038 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); 2039 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2040 return rc; 2041 } 2042 } 2043 2044 nxt_unit_mmap_buf_free(mmap_buf); 2045 2046 return NXT_UNIT_OK; 2047 } 2048 2049 2050 static void 2051 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 2052 { 2053 int rc; 2054 nxt_unit_mmap_buf_t *mmap_buf; 2055 nxt_unit_request_info_t *req; 2056 nxt_unit_request_info_impl_t *req_impl; 2057 2058 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2059 2060 req = mmap_buf->req; 2061 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2062 2063 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); 2064 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 2065 nxt_unit_mmap_buf_free(mmap_buf); 2066 2067 nxt_unit_request_info_release(req); 2068 2069 } else { 2070 nxt_unit_request_done(req, rc); 2071 } 2072 } 2073 2074 2075 static int 2076 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 2077 nxt_unit_mmap_buf_t *mmap_buf, int last) 2078 { 2079 struct { 2080 nxt_port_msg_t msg; 2081 nxt_port_mmap_msg_t mmap_msg; 2082 } m; 2083 2084 int rc; 2085 u_char *last_used, *first_free; 2086 ssize_t res; 2087 nxt_chunk_id_t first_free_chunk; 2088 nxt_unit_buf_t *buf; 2089 nxt_unit_impl_t *lib; 2090 nxt_port_mmap_header_t *hdr; 2091 2092 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2093 2094 buf = &mmap_buf->buf; 2095 hdr = mmap_buf->hdr; 2096 2097 m.mmap_msg.size = buf->free - buf->start; 2098 2099 m.msg.stream = stream; 2100 m.msg.pid = lib->pid; 2101 m.msg.reply_port = 0; 2102 m.msg.type = _NXT_PORT_MSG_DATA; 2103 m.msg.last = last != 0; 2104 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2105 m.msg.nf = 0; 2106 m.msg.mf = 0; 2107 m.msg.tracking = 0; 2108 2109 rc = NXT_UNIT_ERROR; 2110 2111 if (m.msg.mmap) { 2112 m.mmap_msg.mmap_id = hdr->id; 2113 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2114 (u_char *) buf->start); 2115 2116 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2117 stream, 2118 (int) m.mmap_msg.mmap_id, 2119 (int) m.mmap_msg.chunk_id, 2120 (int) m.mmap_msg.size); 2121 2122 res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), 2123 NULL, 0); 2124 if (nxt_slow_path(res != sizeof(m))) { 2125 goto free_buf; 2126 } 2127 2128 last_used = (u_char *) buf->free - 1; 2129 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2130 2131 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2132 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2133 2134 buf->start = (char *) first_free; 2135 buf->free = buf->start; 2136 2137 if (buf->end < buf->start) { 2138 buf->end = buf->start; 2139 } 2140 2141 } else { 2142 buf->start = NULL; 2143 buf->free = NULL; 2144 buf->end = NULL; 2145 2146 mmap_buf->hdr = NULL; 2147 } 2148 2149 nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, 2150 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2151 2152 nxt_unit_debug(ctx, "process %d allocated_chunks %d", 2153 mmap_buf->process->pid, 2154 (int) mmap_buf->process->outgoing.allocated_chunks); 2155 2156 } else { 2157 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2158 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2159 { 2160 nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" 2161 ": no space reserved for message header", stream); 2162 2163 goto free_buf; 2164 } 2165 2166 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); 2167 2168 nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", 2169 stream, 2170 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2171 2172 res = nxt_unit_port_send(ctx, &mmap_buf->port_id, 2173 buf->start - sizeof(m.msg), 2174 m.mmap_msg.size + sizeof(m.msg), 2175 NULL, 0); 2176 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2177 goto free_buf; 2178 } 2179 } 2180 2181 rc = NXT_UNIT_OK; 2182 2183 free_buf: 2184 2185 nxt_unit_free_outgoing_buf(mmap_buf); 2186 2187 return rc; 2188 } 2189 2190 2191 void 2192 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2193 { 2194 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2195 } 2196 2197 2198 static void 2199 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2200 { 2201 nxt_unit_free_outgoing_buf(mmap_buf); 2202 2203 nxt_unit_mmap_buf_release(mmap_buf); 2204 } 2205 2206 2207 static void 2208 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2209 { 2210 if (mmap_buf->hdr != NULL) { 2211 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2212 mmap_buf->process, 2213 mmap_buf->hdr, mmap_buf->buf.start, 2214 mmap_buf->buf.end - mmap_buf->buf.start); 2215 2216 mmap_buf->hdr = NULL; 2217 2218 return; 2219 } 2220 2221 if (mmap_buf->free_ptr != NULL) { 2222 free(mmap_buf->free_ptr); 2223 2224 mmap_buf->free_ptr = NULL; 2225 } 2226 } 2227 2228 2229 static nxt_unit_read_buf_t * 2230 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2231 { 2232 nxt_unit_ctx_impl_t *ctx_impl; 2233 2234 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2235 2236 pthread_mutex_lock(&ctx_impl->mutex); 2237 2238 return nxt_unit_read_buf_get_impl(ctx_impl); 2239 } 2240 2241 2242 static nxt_unit_read_buf_t * 2243 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2244 { 2245 nxt_unit_read_buf_t *rbuf; 2246 2247 if (ctx_impl->free_read_buf != NULL) { 2248 rbuf = ctx_impl->free_read_buf; 2249 ctx_impl->free_read_buf = rbuf->next; 2250 2251 pthread_mutex_unlock(&ctx_impl->mutex); 2252 2253 return rbuf; 2254 } 2255 2256 pthread_mutex_unlock(&ctx_impl->mutex); 2257 2258 rbuf = malloc(sizeof(nxt_unit_read_buf_t)); 2259 2260 return rbuf; 2261 } 2262 2263 2264 static void 2265 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2266 nxt_unit_read_buf_t *rbuf) 2267 { 2268 nxt_unit_ctx_impl_t *ctx_impl; 2269 2270 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2271 2272 pthread_mutex_lock(&ctx_impl->mutex); 2273 2274 rbuf->next = ctx_impl->free_read_buf; 2275 ctx_impl->free_read_buf = rbuf; 2276 2277 pthread_mutex_unlock(&ctx_impl->mutex); 2278 } 2279 2280 2281 nxt_unit_buf_t * 2282 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2283 { 2284 nxt_unit_mmap_buf_t *mmap_buf; 2285 2286 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2287 2288 if (mmap_buf->next == NULL) { 2289 return NULL; 2290 } 2291 2292 return &mmap_buf->next->buf; 2293 } 2294 2295 2296 uint32_t 2297 nxt_unit_buf_max(void) 2298 { 2299 return PORT_MMAP_DATA_SIZE; 2300 } 2301 2302 2303 uint32_t 2304 nxt_unit_buf_min(void) 2305 { 2306 return PORT_MMAP_CHUNK_SIZE; 2307 } 2308 2309 2310 int 2311 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2312 size_t size) 2313 { 2314 ssize_t res; 2315 2316 res = nxt_unit_response_write_nb(req, start, size, size); 2317 2318 return res < 0 ? -res : NXT_UNIT_OK; 2319 } 2320 2321 2322 ssize_t 2323 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, 2324 size_t size, size_t min_size) 2325 { 2326 int rc; 2327 ssize_t sent; 2328 uint32_t part_size, min_part_size, buf_size; 2329 const char *part_start; 2330 nxt_unit_mmap_buf_t mmap_buf; 2331 nxt_unit_request_info_impl_t *req_impl; 2332 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2333 2334 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2335 2336 part_start = start; 2337 sent = 0; 2338 2339 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2340 nxt_unit_req_warn(req, "write: response not initialized yet"); 2341 2342 return -NXT_UNIT_ERROR; 2343 } 2344 2345 /* Check if response is not send yet. */ 2346 if (nxt_slow_path(req->response_buf != NULL)) { 2347 part_size = req->response_buf->end - req->response_buf->free; 2348 part_size = nxt_min(size, part_size); 2349 2350 rc = nxt_unit_response_add_content(req, part_start, part_size); 2351 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2352 return -rc; 2353 } 2354 2355 rc = nxt_unit_response_send(req); 2356 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2357 return -rc; 2358 } 2359 2360 size -= part_size; 2361 part_start += part_size; 2362 sent += part_size; 2363 2364 min_size -= nxt_min(min_size, part_size); 2365 } 2366 2367 while (size > 0) { 2368 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2369 min_part_size = nxt_min(min_size, part_size); 2370 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); 2371 2372 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2373 &req->response_port, part_size, 2374 min_part_size, &mmap_buf, local_buf); 2375 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2376 return -rc; 2377 } 2378 2379 buf_size = mmap_buf.buf.end - mmap_buf.buf.free; 2380 if (nxt_slow_path(buf_size == 0)) { 2381 return sent; 2382 } 2383 part_size = nxt_min(buf_size, part_size); 2384 2385 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, 2386 part_start, part_size); 2387 2388 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); 2389 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2390 return -rc; 2391 } 2392 2393 size -= part_size; 2394 part_start += part_size; 2395 sent += part_size; 2396 2397 min_size -= nxt_min(min_size, part_size); 2398 } 2399 2400 return sent; 2401 } 2402 2403 2404 int 2405 nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 2406 nxt_unit_read_info_t *read_info) 2407 { 2408 int rc; 2409 ssize_t n; 2410 uint32_t buf_size; 2411 nxt_unit_buf_t *buf; 2412 nxt_unit_mmap_buf_t mmap_buf; 2413 nxt_unit_request_info_impl_t *req_impl; 2414 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2415 2416 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2417 2418 /* Check if response is not send yet. */ 2419 if (nxt_slow_path(req->response_buf)) { 2420 2421 /* Enable content in headers buf. */ 2422 rc = nxt_unit_response_add_content(req, "", 0); 2423 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2424 nxt_unit_req_error(req, "Failed to add piggyback content"); 2425 2426 return rc; 2427 } 2428 2429 buf = req->response_buf; 2430 2431 while (buf->end - buf->free > 0) { 2432 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2433 if (nxt_slow_path(n < 0)) { 2434 nxt_unit_req_error(req, "Read error"); 2435 2436 return NXT_UNIT_ERROR; 2437 } 2438 2439 /* Manually increase sizes. */ 2440 buf->free += n; 2441 req->response->piggyback_content_length += n; 2442 2443 if (read_info->eof) { 2444 break; 2445 } 2446 } 2447 2448 rc = nxt_unit_response_send(req); 2449 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2450 nxt_unit_req_error(req, "Failed to send headers with content"); 2451 2452 return rc; 2453 } 2454 2455 if (read_info->eof) { 2456 return NXT_UNIT_OK; 2457 } 2458 } 2459 2460 while (!read_info->eof) { 2461 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 2462 read_info->buf_size); 2463 2464 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 2465 2466 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2467 &req->response_port, 2468 buf_size, buf_size, 2469 &mmap_buf, local_buf); 2470 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2471 return rc; 2472 } 2473 2474 buf = &mmap_buf.buf; 2475 2476 while (!read_info->eof && buf->end > buf->free) { 2477 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2478 if (nxt_slow_path(n < 0)) { 2479 nxt_unit_req_error(req, "Read error"); 2480 2481 nxt_unit_free_outgoing_buf(&mmap_buf); 2482 2483 return NXT_UNIT_ERROR; 2484 } 2485 2486 buf->free += n; 2487 } 2488 2489 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); 2490 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2491 nxt_unit_req_error(req, "Failed to send content"); 2492 2493 return rc; 2494 } 2495 } 2496 2497 return NXT_UNIT_OK; 2498 } 2499 2500 2501 ssize_t 2502 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 2503 { 2504 ssize_t buf_res, res; 2505 2506 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 2507 dst, size); 2508 2509 if (buf_res < (ssize_t) size && req->content_fd != -1) { 2510 res = read(req->content_fd, dst, size); 2511 if (res < 0) { 2512 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 2513 strerror(errno), errno); 2514 2515 return res; 2516 } 2517 2518 if (res < (ssize_t) size) { 2519 close(req->content_fd); 2520 2521 req->content_fd = -1; 2522 } 2523 2524 req->content_length -= res; 2525 size -= res; 2526 2527 dst = nxt_pointer_to(dst, res); 2528 2529 } else { 2530 res = 0; 2531 } 2532 2533 return buf_res + res; 2534 } 2535 2536 2537 ssize_t 2538 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 2539 { 2540 char *p; 2541 size_t l_size, b_size; 2542 nxt_unit_buf_t *b; 2543 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 2544 2545 if (req->content_length == 0) { 2546 return 0; 2547 } 2548 2549 l_size = 0; 2550 2551 b = req->content_buf; 2552 2553 while (b != NULL) { 2554 b_size = b->end - b->free; 2555 p = memchr(b->free, '\n', b_size); 2556 2557 if (p != NULL) { 2558 p++; 2559 l_size += p - b->free; 2560 break; 2561 } 2562 2563 l_size += b_size; 2564 2565 if (max_size <= l_size) { 2566 break; 2567 } 2568 2569 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 2570 if (mmap_buf->next == NULL 2571 && req->content_fd != -1 2572 && l_size < req->content_length) 2573 { 2574 preread_buf = nxt_unit_request_preread(req, 16384); 2575 if (nxt_slow_path(preread_buf == NULL)) { 2576 return -1; 2577 } 2578 2579 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 2580 } 2581 2582 b = nxt_unit_buf_next(b); 2583 } 2584 2585 return nxt_min(max_size, l_size); 2586 } 2587 2588 2589 static nxt_unit_mmap_buf_t * 2590 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) 2591 { 2592 ssize_t res; 2593 nxt_unit_mmap_buf_t *mmap_buf; 2594 2595 if (req->content_fd == -1) { 2596 nxt_unit_req_alert(req, "preread: content_fd == -1"); 2597 return NULL; 2598 } 2599 2600 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2601 if (nxt_slow_path(mmap_buf == NULL)) { 2602 nxt_unit_req_alert(req, "preread: failed to allocate buf"); 2603 return NULL; 2604 } 2605 2606 mmap_buf->free_ptr = malloc(size); 2607 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 2608 nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); 2609 nxt_unit_mmap_buf_release(mmap_buf); 2610 return NULL; 2611 } 2612 2613 mmap_buf->plain_ptr = mmap_buf->free_ptr; 2614 2615 mmap_buf->hdr = NULL; 2616 mmap_buf->buf.start = mmap_buf->free_ptr; 2617 mmap_buf->buf.free = mmap_buf->buf.start; 2618 mmap_buf->buf.end = mmap_buf->buf.start + size; 2619 mmap_buf->process = NULL; 2620 2621 res = read(req->content_fd, mmap_buf->free_ptr, size); 2622 if (res < 0) { 2623 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 2624 strerror(errno), errno); 2625 2626 nxt_unit_mmap_buf_free(mmap_buf); 2627 2628 return NULL; 2629 } 2630 2631 if (res < (ssize_t) size) { 2632 close(req->content_fd); 2633 2634 req->content_fd = -1; 2635 } 2636 2637 nxt_unit_req_debug(req, "preread: read %d", (int) res); 2638 2639 mmap_buf->buf.end = mmap_buf->buf.free + res; 2640 2641 return mmap_buf; 2642 } 2643 2644 2645 static ssize_t 2646 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) 2647 { 2648 u_char *p; 2649 size_t rest, copy, read; 2650 nxt_unit_buf_t *buf, *last_buf; 2651 2652 p = dst; 2653 rest = size; 2654 2655 buf = *b; 2656 last_buf = buf; 2657 2658 while (buf != NULL) { 2659 last_buf = buf; 2660 2661 copy = buf->end - buf->free; 2662 copy = nxt_min(rest, copy); 2663 2664 p = nxt_cpymem(p, buf->free, copy); 2665 2666 buf->free += copy; 2667 rest -= copy; 2668 2669 if (rest == 0) { 2670 if (buf->end == buf->free) { 2671 buf = nxt_unit_buf_next(buf); 2672 } 2673 2674 break; 2675 } 2676 2677 buf = nxt_unit_buf_next(buf); 2678 } 2679 2680 *b = last_buf; 2681 2682 read = size - rest; 2683 2684 *len -= read; 2685 2686 return read; 2687 } 2688 2689 2690 void 2691 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2692 { 2693 uint32_t size; 2694 nxt_port_msg_t msg; 2695 nxt_unit_impl_t *lib; 2696 nxt_unit_request_info_impl_t *req_impl; 2697 2698 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2699 2700 nxt_unit_req_debug(req, "done: %d", rc); 2701 2702 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2703 goto skip_response_send; 2704 } 2705 2706 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2707 2708 size = nxt_length("Content-Type") + nxt_length("text/plain"); 2709 2710 rc = nxt_unit_response_init(req, 200, 1, size); 2711 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2712 goto skip_response_send; 2713 } 2714 2715 rc = nxt_unit_response_add_field(req, "Content-Type", 2716 nxt_length("Content-Type"), 2717 "text/plain", nxt_length("text/plain")); 2718 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2719 goto skip_response_send; 2720 } 2721 } 2722 2723 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2724 2725 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2726 2727 nxt_unit_buf_send_done(req->response_buf); 2728 2729 return; 2730 } 2731 2732 skip_response_send: 2733 2734 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); 2735 2736 msg.stream = req_impl->stream; 2737 msg.pid = lib->pid; 2738 msg.reply_port = 0; 2739 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2740 : _NXT_PORT_MSG_RPC_ERROR; 2741 msg.last = 1; 2742 msg.mmap = 0; 2743 msg.nf = 0; 2744 msg.mf = 0; 2745 msg.tracking = 0; 2746 2747 (void) nxt_unit_port_send(req->ctx, &req->response_port, 2748 &msg, sizeof(msg), NULL, 0); 2749 2750 nxt_unit_request_info_release(req); 2751 } 2752 2753 2754 int 2755 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2756 uint8_t last, const void *start, size_t size) 2757 { 2758 const struct iovec iov = { (void *) start, size }; 2759 2760 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); 2761 } 2762 2763 2764 int 2765 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 2766 uint8_t last, const struct iovec *iov, int iovcnt) 2767 { 2768 int i, rc; 2769 size_t l, copy; 2770 uint32_t payload_len, buf_size, alloc_size; 2771 const uint8_t *b; 2772 nxt_unit_buf_t *buf; 2773 nxt_unit_mmap_buf_t mmap_buf; 2774 nxt_websocket_header_t *wh; 2775 nxt_unit_request_info_impl_t *req_impl; 2776 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2777 2778 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2779 2780 payload_len = 0; 2781 2782 for (i = 0; i < iovcnt; i++) { 2783 payload_len += iov[i].iov_len; 2784 } 2785 2786 buf_size = 10 + payload_len; 2787 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); 2788 2789 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2790 &req->response_port, 2791 alloc_size, alloc_size, 2792 &mmap_buf, local_buf); 2793 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2794 return rc; 2795 } 2796 2797 buf = &mmap_buf.buf; 2798 2799 buf->start[0] = 0; 2800 buf->start[1] = 0; 2801 2802 buf_size -= buf->end - buf->start; 2803 2804 wh = (void *) buf->free; 2805 2806 buf->free = nxt_websocket_frame_init(wh, payload_len); 2807 wh->fin = last; 2808 wh->opcode = opcode; 2809 2810 for (i = 0; i < iovcnt; i++) { 2811 b = iov[i].iov_base; 2812 l = iov[i].iov_len; 2813 2814 while (l > 0) { 2815 copy = buf->end - buf->free; 2816 copy = nxt_min(l, copy); 2817 2818 buf->free = nxt_cpymem(buf->free, b, copy); 2819 b += copy; 2820 l -= copy; 2821 2822 if (l > 0) { 2823 if (nxt_fast_path(buf->free > buf->start)) { 2824 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, 2825 &mmap_buf, 0); 2826 2827 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2828 return rc; 2829 } 2830 } 2831 2832 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); 2833 2834 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2835 &req->response_port, 2836 alloc_size, alloc_size, 2837 &mmap_buf, local_buf); 2838 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2839 return rc; 2840 } 2841 2842 buf_size -= buf->end - buf->start; 2843 } 2844 } 2845 } 2846 2847 if (buf->free > buf->start) { 2848 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, 2849 &mmap_buf, 0); 2850 } 2851 2852 return rc; 2853 } 2854 2855 2856 ssize_t 2857 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 2858 size_t size) 2859 { 2860 ssize_t res; 2861 uint8_t *b; 2862 uint64_t i, d; 2863 2864 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, 2865 dst, size); 2866 2867 if (ws->mask == NULL) { 2868 return res; 2869 } 2870 2871 b = dst; 2872 d = (ws->payload_len - ws->content_length - res) % 4; 2873 2874 for (i = 0; i < (uint64_t) res; i++) { 2875 b[i] ^= ws->mask[ (i + d) % 4 ]; 2876 } 2877 2878 return res; 2879 } 2880 2881 2882 int 2883 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) 2884 { 2885 char *b; 2886 size_t size; 2887 nxt_unit_websocket_frame_impl_t *ws_impl; 2888 2889 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 2890 2891 if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) { 2892 return NXT_UNIT_OK; 2893 } 2894 2895 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; 2896 2897 b = malloc(size); 2898 if (nxt_slow_path(b == NULL)) { 2899 return NXT_UNIT_ERROR; 2900 } 2901 2902 memcpy(b, ws_impl->buf->buf.start, size); 2903 2904 ws_impl->buf->buf.start = b; 2905 ws_impl->buf->buf.free = b; 2906 ws_impl->buf->buf.end = b + size; 2907 2908 ws_impl->buf->free_ptr = b; 2909 2910 return NXT_UNIT_OK; 2911 } 2912 2913 2914 void 2915 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) 2916 { 2917 nxt_unit_websocket_frame_release(ws); 2918 } 2919 2920 2921 static nxt_port_mmap_header_t * 2922 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2923 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) 2924 { 2925 int res, nchunks, i; 2926 uint32_t outgoing_size; 2927 nxt_unit_mmap_t *mm, *mm_end; 2928 nxt_unit_impl_t *lib; 2929 nxt_port_mmap_header_t *hdr; 2930 2931 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2932 2933 pthread_mutex_lock(&process->outgoing.mutex); 2934 2935 retry: 2936 2937 outgoing_size = process->outgoing.size; 2938 2939 mm_end = process->outgoing.elts + outgoing_size; 2940 2941 for (mm = process->outgoing.elts; mm < mm_end; mm++) { 2942 hdr = mm->hdr; 2943 2944 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { 2945 continue; 2946 } 2947 2948 *c = 0; 2949 2950 while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { 2951 nchunks = 1; 2952 2953 while (nchunks < *n) { 2954 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, 2955 *c + nchunks); 2956 2957 if (res == 0) { 2958 if (nchunks >= min_n) { 2959 *n = nchunks; 2960 2961 goto unlock; 2962 } 2963 2964 for (i = 0; i < nchunks; i++) { 2965 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); 2966 } 2967 2968 *c += nchunks + 1; 2969 nchunks = 0; 2970 break; 2971 } 2972 2973 nchunks++; 2974 } 2975 2976 if (nchunks >= min_n) { 2977 *n = nchunks; 2978 2979 goto unlock; 2980 } 2981 } 2982 2983 hdr->oosm = 1; 2984 } 2985 2986 if (outgoing_size >= lib->shm_mmap_limit) { 2987 /* Cannot allocate more shared memory. */ 2988 pthread_mutex_unlock(&process->outgoing.mutex); 2989 2990 if (min_n == 0) { 2991 *n = 0; 2992 } 2993 2994 if (nxt_slow_path(process->outgoing.allocated_chunks + min_n 2995 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) 2996 { 2997 /* Memory allocated by application, but not send to router. */ 2998 return NULL; 2999 } 3000 3001 /* Notify router about OOSM condition. */ 3002 3003 res = nxt_unit_send_oosm(ctx, port_id); 3004 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3005 return NULL; 3006 } 3007 3008 /* Return if caller can handle OOSM condition. Non-blocking mode. */ 3009 3010 if (min_n == 0) { 3011 return NULL; 3012 } 3013 3014 nxt_unit_debug(ctx, "oosm: waiting for ACK"); 3015 3016 res = nxt_unit_wait_shm_ack(ctx); 3017 if (nxt_slow_path(res != NXT_UNIT_OK)) { 3018 return NULL; 3019 } 3020 3021 nxt_unit_debug(ctx, "oosm: retry"); 3022 3023 pthread_mutex_lock(&process->outgoing.mutex); 3024 3025 goto retry; 3026 } 3027 3028 *c = 0; 3029 hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); 3030 3031 unlock: 3032 3033 nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); 3034 3035 nxt_unit_debug(ctx, "process %d allocated_chunks %d", 3036 process->pid, 3037 (int) process->outgoing.allocated_chunks); 3038 3039 pthread_mutex_unlock(&process->outgoing.mutex); 3040 3041 return hdr; 3042 } 3043 3044 3045 static int 3046 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 3047 { 3048 ssize_t res; 3049 nxt_port_msg_t msg; 3050 nxt_unit_impl_t *lib; 3051 3052 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3053 3054 msg.stream = 0; 3055 msg.pid = lib->pid; 3056 msg.reply_port = 0; 3057 msg.type = _NXT_PORT_MSG_OOSM; 3058 msg.last = 0; 3059 msg.mmap = 0; 3060 msg.nf = 0; 3061 msg.mf = 0; 3062 msg.tracking = 0; 3063 3064 res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 3065 if (nxt_slow_path(res != sizeof(msg))) { 3066 return NXT_UNIT_ERROR; 3067 } 3068 3069 return NXT_UNIT_OK; 3070 } 3071 3072 3073 static int 3074 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) 3075 { 3076 nxt_port_msg_t *port_msg; 3077 nxt_unit_ctx_impl_t *ctx_impl; 3078 nxt_unit_read_buf_t *rbuf; 3079 3080 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3081 3082 while (1) { 3083 rbuf = nxt_unit_read_buf_get(ctx); 3084 if (nxt_slow_path(rbuf == NULL)) { 3085 return NXT_UNIT_ERROR; 3086 } 3087 3088 nxt_unit_read_buf(ctx, rbuf); 3089 3090 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 3091 nxt_unit_read_buf_release(ctx, rbuf); 3092 3093 return NXT_UNIT_ERROR; 3094 } 3095 3096 port_msg = (nxt_port_msg_t *) rbuf->buf; 3097 3098 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { 3099 nxt_unit_read_buf_release(ctx, rbuf); 3100 3101 break; 3102 } 3103 3104 pthread_mutex_lock(&ctx_impl->mutex); 3105 3106 *ctx_impl->pending_read_tail = rbuf; 3107 ctx_impl->pending_read_tail = &rbuf->next; 3108 rbuf->next = NULL; 3109 3110 pthread_mutex_unlock(&ctx_impl->mutex); 3111 3112 if (port_msg->type == _NXT_PORT_MSG_QUIT) { 3113 nxt_unit_debug(ctx, "oosm: quit received"); 3114 3115 return NXT_UNIT_ERROR; 3116 } 3117 } 3118 3119 return NXT_UNIT_OK; 3120 } 3121 3122 3123 static nxt_unit_mmap_t * 3124 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3125 { 3126 uint32_t cap; 3127 3128 cap = mmaps->cap; 3129 3130 if (cap == 0) { 3131 cap = i + 1; 3132 } 3133 3134 while (i + 1 > cap) { 3135 3136 if (cap < 16) { 3137 cap = cap * 2; 3138 3139 } else { 3140 cap = cap + cap / 2; 3141 } 3142 } 3143 3144 if (cap != mmaps->cap) { 3145 3146 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 3147 if (nxt_slow_path(mmaps->elts == NULL)) { 3148 return NULL; 3149 } 3150 3151 memset(mmaps->elts + mmaps->cap, 0, 3152 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 3153 3154 mmaps->cap = cap; 3155 } 3156 3157 if (i + 1 > mmaps->size) { 3158 mmaps->size = i + 1; 3159 } 3160 3161 return mmaps->elts + i; 3162 } 3163 3164 3165 static nxt_port_mmap_header_t * 3166 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3167 nxt_unit_port_id_t *port_id, int n) 3168 { 3169 int i, fd, rc; 3170 void *mem; 3171 char name[64]; 3172 nxt_unit_mmap_t *mm; 3173 nxt_unit_impl_t *lib; 3174 nxt_port_mmap_header_t *hdr; 3175 3176 lib = process->lib; 3177 3178 mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); 3179 if (nxt_slow_path(mm == NULL)) { 3180 nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); 3181 3182 return NULL; 3183 } 3184 3185 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 3186 lib->pid, (void *) pthread_self()); 3187 3188 #if (NXT_HAVE_MEMFD_CREATE) 3189 3190 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 3191 if (nxt_slow_path(fd == -1)) { 3192 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3193 strerror(errno), errno); 3194 3195 goto remove_fail; 3196 } 3197 3198 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3199 3200 #elif (NXT_HAVE_SHM_OPEN_ANON) 3201 3202 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3203 if (nxt_slow_path(fd == -1)) { 3204 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3205 strerror(errno), errno); 3206 3207 goto remove_fail; 3208 } 3209 3210 #elif (NXT_HAVE_SHM_OPEN) 3211 3212 /* Just in case. */ 3213 shm_unlink(name); 3214 3215 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3216 if (nxt_slow_path(fd == -1)) { 3217 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3218 strerror(errno), errno); 3219 3220 goto remove_fail; 3221 } 3222 3223 if (nxt_slow_path(shm_unlink(name) == -1)) { 3224 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3225 strerror(errno), errno); 3226 } 3227 3228 #else 3229 3230 #error No working shared memory implementation. 3231 3232 #endif 3233 3234 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 3235 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3236 strerror(errno), errno); 3237 3238 goto remove_fail; 3239 } 3240 3241 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 3242 if (nxt_slow_path(mem == MAP_FAILED)) { 3243 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 3244 strerror(errno), errno); 3245 3246 goto remove_fail; 3247 } 3248 3249 mm->hdr = mem; 3250 hdr = mem; 3251 3252 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 3253 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 3254 3255 hdr->id = process->outgoing.size - 1; 3256 hdr->src_pid = lib->pid; 3257 hdr->dst_pid = process->pid; 3258 hdr->sent_over = port_id->id; 3259 3260 /* Mark first n chunk(s) as busy */ 3261 for (i = 0; i < n; i++) { 3262 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 3263 } 3264 3265 /* Mark as busy chunk followed the last available chunk. */ 3266 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 3267 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 3268 3269 pthread_mutex_unlock(&process->outgoing.mutex); 3270 3271 rc = nxt_unit_send_mmap(ctx, port_id, fd); 3272 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3273 munmap(mem, PORT_MMAP_SIZE); 3274 hdr = NULL; 3275 3276 } else { 3277 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 3278 hdr->id, (int) lib->pid, (int) process->pid); 3279 } 3280 3281 close(fd); 3282 3283 pthread_mutex_lock(&process->outgoing.mutex); 3284 3285 if (nxt_fast_path(hdr != NULL)) { 3286 return hdr; 3287 } 3288 3289 remove_fail: 3290 3291 process->outgoing.size--; 3292 3293 return NULL; 3294 } 3295 3296 3297 static int 3298 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) 3299 { 3300 ssize_t res; 3301 nxt_port_msg_t msg; 3302 nxt_unit_impl_t *lib; 3303 union { 3304 struct cmsghdr cm; 3305 char space[CMSG_SPACE(sizeof(int))]; 3306 } cmsg; 3307 3308 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3309 3310 msg.stream = 0; 3311 msg.pid = lib->pid; 3312 msg.reply_port = 0; 3313 msg.type = _NXT_PORT_MSG_MMAP; 3314 msg.last = 0; 3315 msg.mmap = 0; 3316 msg.nf = 0; 3317 msg.mf = 0; 3318 msg.tracking = 0; 3319 3320 /* 3321 * Fill all padding fields with 0. 3322 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 3323 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 3324 */ 3325 memset(&cmsg, 0, sizeof(cmsg)); 3326 3327 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3328 cmsg.cm.cmsg_level = SOL_SOCKET; 3329 cmsg.cm.cmsg_type = SCM_RIGHTS; 3330 3331 /* 3332 * memcpy() is used instead of simple 3333 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3334 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3335 * dereferencing type-punned pointer will break strict-aliasing rules 3336 * 3337 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3338 * in the same simple assignment as in the code above. 3339 */ 3340 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3341 3342 res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), 3343 &cmsg, sizeof(cmsg)); 3344 if (nxt_slow_path(res != sizeof(msg))) { 3345 return NXT_UNIT_ERROR; 3346 } 3347 3348 return NXT_UNIT_OK; 3349 } 3350 3351 3352 static int 3353 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3354 nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, 3355 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) 3356 { 3357 int nchunks, min_nchunks; 3358 nxt_chunk_id_t c; 3359 nxt_port_mmap_header_t *hdr; 3360 3361 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { 3362 if (local_buf != NULL) { 3363 mmap_buf->free_ptr = NULL; 3364 mmap_buf->plain_ptr = local_buf; 3365 3366 } else { 3367 mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); 3368 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3369 return NXT_UNIT_ERROR; 3370 } 3371 3372 mmap_buf->plain_ptr = mmap_buf->free_ptr; 3373 } 3374 3375 mmap_buf->hdr = NULL; 3376 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); 3377 mmap_buf->buf.free = mmap_buf->buf.start; 3378 mmap_buf->buf.end = mmap_buf->buf.start + size; 3379 mmap_buf->port_id = *port_id; 3380 mmap_buf->process = process; 3381 3382 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", 3383 mmap_buf->buf.start, (int) size); 3384 3385 return NXT_UNIT_OK; 3386 } 3387 3388 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3389 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3390 3391 hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); 3392 if (nxt_slow_path(hdr == NULL)) { 3393 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3394 mmap_buf->hdr = NULL; 3395 mmap_buf->buf.start = NULL; 3396 mmap_buf->buf.free = NULL; 3397 mmap_buf->buf.end = NULL; 3398 mmap_buf->free_ptr = NULL; 3399 3400 return NXT_UNIT_OK; 3401 } 3402 3403 return NXT_UNIT_ERROR; 3404 } 3405 3406 mmap_buf->hdr = hdr; 3407 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3408 mmap_buf->buf.free = mmap_buf->buf.start; 3409 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3410 mmap_buf->port_id = *port_id; 3411 mmap_buf->process = process; 3412 mmap_buf->free_ptr = NULL; 3413 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3414 3415 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3416 (int) hdr->id, (int) c, 3417 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3418 3419 return NXT_UNIT_OK; 3420 } 3421 3422 3423 static int 3424 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3425 { 3426 int rc; 3427 void *mem; 3428 struct stat mmap_stat; 3429 nxt_unit_mmap_t *mm; 3430 nxt_unit_impl_t *lib; 3431 nxt_unit_process_t *process; 3432 nxt_port_mmap_header_t *hdr; 3433 3434 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3435 3436 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3437 3438 pthread_mutex_lock(&lib->mutex); 3439 3440 process = nxt_unit_process_find(lib, pid, 0); 3441 3442 pthread_mutex_unlock(&lib->mutex); 3443 3444 if (nxt_slow_path(process == NULL)) { 3445 nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", 3446 (int) pid, fd); 3447 3448 return NXT_UNIT_ERROR; 3449 } 3450 3451 rc = NXT_UNIT_ERROR; 3452 3453 if (fstat(fd, &mmap_stat) == -1) { 3454 nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3455 strerror(errno), errno); 3456 3457 goto fail; 3458 } 3459 3460 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3461 MAP_SHARED, fd, 0); 3462 if (nxt_slow_path(mem == MAP_FAILED)) { 3463 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3464 strerror(errno), errno); 3465 3466 goto fail; 3467 } 3468 3469 hdr = mem; 3470 3471 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { 3472 3473 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 3474 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3475 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3476 3477 munmap(mem, PORT_MMAP_SIZE); 3478 3479 goto fail; 3480 } 3481 3482 pthread_mutex_lock(&process->incoming.mutex); 3483 3484 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 3485 if (nxt_slow_path(mm == NULL)) { 3486 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 3487 3488 munmap(mem, PORT_MMAP_SIZE); 3489 3490 } else { 3491 mm->hdr = hdr; 3492 3493 hdr->sent_over = 0xFFFFu; 3494 3495 rc = NXT_UNIT_OK; 3496 } 3497 3498 pthread_mutex_unlock(&process->incoming.mutex); 3499 3500 fail: 3501 3502 nxt_unit_process_release(process); 3503 3504 return rc; 3505 } 3506 3507 3508 static void 3509 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 3510 { 3511 pthread_mutex_init(&mmaps->mutex, NULL); 3512 3513 mmaps->size = 0; 3514 mmaps->cap = 0; 3515 mmaps->elts = NULL; 3516 mmaps->allocated_chunks = 0; 3517 } 3518 3519 3520 nxt_inline void 3521 nxt_unit_process_use(nxt_unit_process_t *process) 3522 { 3523 nxt_atomic_fetch_add(&process->use_count, 1); 3524 } 3525 3526 3527 nxt_inline void 3528 nxt_unit_process_release(nxt_unit_process_t *process) 3529 { 3530 long c; 3531 3532 c = nxt_atomic_fetch_add(&process->use_count, -1); 3533 3534 if (c == 1) { 3535 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); 3536 3537 nxt_unit_mmaps_destroy(&process->incoming); 3538 nxt_unit_mmaps_destroy(&process->outgoing); 3539 3540 free(process); 3541 } 3542 } 3543 3544 3545 static void 3546 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) 3547 { 3548 nxt_unit_mmap_t *mm, *end; 3549 3550 if (mmaps->elts != NULL) { 3551 end = mmaps->elts + mmaps->size; 3552 3553 for (mm = mmaps->elts; mm < end; mm++) { 3554 munmap(mm->hdr, PORT_MMAP_SIZE); 3555 } 3556 3557 free(mmaps->elts); 3558 } 3559 3560 pthread_mutex_destroy(&mmaps->mutex); 3561 } 3562 3563 3564 static nxt_port_mmap_header_t * 3565 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3566 uint32_t id) 3567 { 3568 nxt_port_mmap_header_t *hdr; 3569 3570 if (nxt_fast_path(process->incoming.size > id)) { 3571 hdr = process->incoming.elts[id].hdr; 3572 3573 } else { 3574 hdr = NULL; 3575 } 3576 3577 return hdr; 3578 } 3579 3580 3581 static int 3582 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 3583 { 3584 int rc; 3585 nxt_chunk_id_t c; 3586 nxt_unit_process_t *process; 3587 nxt_port_mmap_header_t *hdr; 3588 nxt_port_mmap_tracking_msg_t *tracking_msg; 3589 3590 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 3591 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", 3592 recv_msg->stream, (int) recv_msg->size); 3593 3594 return 0; 3595 } 3596 3597 tracking_msg = recv_msg->start; 3598 3599 recv_msg->start = tracking_msg + 1; 3600 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); 3601 3602 process = nxt_unit_msg_get_process(ctx, recv_msg); 3603 if (nxt_slow_path(process == NULL)) { 3604 return 0; 3605 } 3606 3607 pthread_mutex_lock(&process->incoming.mutex); 3608 3609 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 3610 if (nxt_slow_path(hdr == NULL)) { 3611 pthread_mutex_unlock(&process->incoming.mutex); 3612 3613 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 3614 "invalid mmap id %d,%"PRIu32, 3615 recv_msg->stream, (int) process->pid, 3616 tracking_msg->mmap_id); 3617 3618 return 0; 3619 } 3620 3621 c = tracking_msg->tracking_id; 3622 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); 3623 3624 if (rc == 0) { 3625 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 3626 recv_msg->stream); 3627 3628 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 3629 } 3630 3631 pthread_mutex_unlock(&process->incoming.mutex); 3632 3633 return rc; 3634 } 3635 3636 3637 static int 3638 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 3639 { 3640 void *start; 3641 uint32_t size; 3642 nxt_unit_process_t *process; 3643 nxt_unit_mmap_buf_t *b, **incoming_tail; 3644 nxt_port_mmap_msg_t *mmap_msg, *end; 3645 nxt_port_mmap_header_t *hdr; 3646 3647 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 3648 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 3649 recv_msg->stream, (int) recv_msg->size); 3650 3651 return NXT_UNIT_ERROR; 3652 } 3653 3654 process = nxt_unit_msg_get_process(ctx, recv_msg); 3655 if (nxt_slow_path(process == NULL)) { 3656 return NXT_UNIT_ERROR; 3657 } 3658 3659 mmap_msg = recv_msg->start; 3660 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3661 3662 incoming_tail = &recv_msg->incoming_buf; 3663 3664 for (; mmap_msg < end; mmap_msg++) { 3665 b = nxt_unit_mmap_buf_get(ctx); 3666 if (nxt_slow_path(b == NULL)) { 3667 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3668 recv_msg->stream); 3669 3670 return NXT_UNIT_ERROR; 3671 } 3672 3673 nxt_unit_mmap_buf_insert(incoming_tail, b); 3674 incoming_tail = &b->next; 3675 } 3676 3677 b = recv_msg->incoming_buf; 3678 mmap_msg = recv_msg->start; 3679 3680 pthread_mutex_lock(&process->incoming.mutex); 3681 3682 for (; mmap_msg < end; mmap_msg++) { 3683 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 3684 if (nxt_slow_path(hdr == NULL)) { 3685 pthread_mutex_unlock(&process->incoming.mutex); 3686 3687 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 3688 "invalid mmap id %d,%"PRIu32, 3689 recv_msg->stream, (int) process->pid, 3690 mmap_msg->mmap_id); 3691 3692 return NXT_UNIT_ERROR; 3693 } 3694 3695 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3696 size = mmap_msg->size; 3697 3698 if (recv_msg->start == mmap_msg) { 3699 recv_msg->start = start; 3700 recv_msg->size = size; 3701 } 3702 3703 b->buf.start = start; 3704 b->buf.free = start; 3705 b->buf.end = b->buf.start + size; 3706 b->hdr = hdr; 3707 b->process = process; 3708 3709 b = b->next; 3710 3711 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3712 recv_msg->stream, 3713 start, (int) size, 3714 (int) hdr->src_pid, (int) hdr->dst_pid, 3715 (int) hdr->id, (int) mmap_msg->chunk_id, 3716 (int) mmap_msg->size); 3717 } 3718 3719 pthread_mutex_unlock(&process->incoming.mutex); 3720 3721 return NXT_UNIT_OK; 3722 } 3723 3724 3725 static void 3726 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 3727 nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, 3728 void *start, uint32_t size) 3729 { 3730 int freed_chunks; 3731 u_char *p, *end; 3732 nxt_chunk_id_t c; 3733 nxt_unit_impl_t *lib; 3734 3735 memset(start, 0xA5, size); 3736 3737 p = start; 3738 end = p + size; 3739 c = nxt_port_mmap_chunk_id(hdr, p); 3740 freed_chunks = 0; 3741 3742 while (p < end) { 3743 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 3744 3745 p += PORT_MMAP_CHUNK_SIZE; 3746 c++; 3747 freed_chunks++; 3748 } 3749 3750 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3751 3752 if (hdr->src_pid == lib->pid && freed_chunks != 0) { 3753 nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, 3754 -freed_chunks); 3755 3756 nxt_unit_debug(ctx, "process %d allocated_chunks %d", 3757 process->pid, 3758 (int) process->outgoing.allocated_chunks); 3759 } 3760 3761 if (hdr->dst_pid == lib->pid 3762 && freed_chunks != 0 3763 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 3764 { 3765 nxt_unit_send_shm_ack(ctx, hdr->src_pid); 3766 } 3767 } 3768 3769 3770 static int 3771 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) 3772 { 3773 ssize_t res; 3774 nxt_port_msg_t msg; 3775 nxt_unit_impl_t *lib; 3776 nxt_unit_port_id_t port_id; 3777 3778 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3779 3780 nxt_unit_port_id_init(&port_id, pid, 0); 3781 3782 msg.stream = 0; 3783 msg.pid = lib->pid; 3784 msg.reply_port = 0; 3785 msg.type = _NXT_PORT_MSG_SHM_ACK; 3786 msg.last = 0; 3787 msg.mmap = 0; 3788 msg.nf = 0; 3789 msg.mf = 0; 3790 msg.tracking = 0; 3791 3792 res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); 3793 if (nxt_slow_path(res != sizeof(msg))) { 3794 return NXT_UNIT_ERROR; 3795 } 3796 3797 return NXT_UNIT_OK; 3798 } 3799 3800 3801 static nxt_int_t 3802 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 3803 { 3804 nxt_process_t *process; 3805 3806 process = data; 3807 3808 if (lhq->key.length == sizeof(pid_t) 3809 && *(pid_t *) lhq->key.start == process->pid) 3810 { 3811 return NXT_OK; 3812 } 3813 3814 return NXT_DECLINED; 3815 } 3816 3817 3818 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 3819 NXT_LVLHSH_DEFAULT, 3820 nxt_unit_lvlhsh_pid_test, 3821 nxt_lvlhsh_alloc, 3822 nxt_lvlhsh_free, 3823 }; 3824 3825 3826 static inline void 3827 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 3828 { 3829 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 3830 lhq->key.length = sizeof(*pid); 3831 lhq->key.start = (u_char *) pid; 3832 lhq->proto = &lvlhsh_processes_proto; 3833 } 3834 3835 3836 static nxt_unit_process_t * 3837 nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) 3838 { 3839 nxt_unit_process_t *process; 3840 nxt_lvlhsh_query_t lhq; 3841 3842 nxt_unit_process_lhq_pid(&lhq, &pid); 3843 3844 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 3845 process = lhq.value; 3846 nxt_unit_process_use(process); 3847 3848 return process; 3849 } 3850 3851 process = malloc(sizeof(nxt_unit_process_t)); 3852 if (nxt_slow_path(process == NULL)) { 3853 nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); 3854 3855 return NULL; 3856 } 3857 3858 process->pid = pid; 3859 process->use_count = 1; 3860 process->next_port_id = 0; 3861 process->lib = lib; 3862 3863 nxt_queue_init(&process->ports); 3864 3865 nxt_unit_mmaps_init(&process->incoming); 3866 nxt_unit_mmaps_init(&process->outgoing); 3867 3868 lhq.replace = 0; 3869 lhq.value = process; 3870 3871 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 3872 3873 case NXT_OK: 3874 break; 3875 3876 default: 3877 nxt_unit_alert(NULL, "process %d insert failed", (int) pid); 3878 3879 pthread_mutex_destroy(&process->outgoing.mutex); 3880 pthread_mutex_destroy(&process->incoming.mutex); 3881 free(process); 3882 process = NULL; 3883 break; 3884 } 3885 3886 nxt_unit_process_use(process); 3887 3888 return process; 3889 } 3890 3891 3892 static nxt_unit_process_t * 3893 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) 3894 { 3895 int rc; 3896 nxt_unit_process_t *process; 3897 nxt_lvlhsh_query_t lhq; 3898 3899 nxt_unit_process_lhq_pid(&lhq, &pid); 3900 3901 if (remove) { 3902 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 3903 3904 } else { 3905 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 3906 } 3907 3908 if (rc == NXT_OK) { 3909 process = lhq.value; 3910 3911 if (!remove) { 3912 nxt_unit_process_use(process); 3913 } 3914 3915 return process; 3916 } 3917 3918 return NULL; 3919 } 3920 3921 3922 static nxt_unit_process_t * 3923 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 3924 { 3925 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 3926 } 3927 3928 3929 int 3930 nxt_unit_run(nxt_unit_ctx_t *ctx) 3931 { 3932 int rc; 3933 nxt_unit_impl_t *lib; 3934 nxt_unit_ctx_impl_t *ctx_impl; 3935 3936 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3937 3938 nxt_unit_ctx_use(ctx_impl); 3939 3940 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3941 rc = NXT_UNIT_OK; 3942 3943 while (nxt_fast_path(lib->online)) { 3944 rc = nxt_unit_run_once(ctx); 3945 3946 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3947 break; 3948 } 3949 } 3950 3951 nxt_unit_ctx_release(ctx_impl); 3952 3953 return rc; 3954 } 3955 3956 3957 int 3958 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 3959 { 3960 int rc; 3961 nxt_unit_ctx_impl_t *ctx_impl; 3962 nxt_unit_read_buf_t *rbuf; 3963 3964 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3965 3966 nxt_unit_ctx_use(ctx_impl); 3967 3968 pthread_mutex_lock(&ctx_impl->mutex); 3969 3970 if (ctx_impl->pending_read_head != NULL) { 3971 rbuf = ctx_impl->pending_read_head; 3972 ctx_impl->pending_read_head = rbuf->next; 3973 3974 if (ctx_impl->pending_read_tail == &rbuf->next) { 3975 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 3976 } 3977 3978 pthread_mutex_unlock(&ctx_impl->mutex); 3979 3980 } else { 3981 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 3982 if (nxt_slow_path(rbuf == NULL)) { 3983 3984 nxt_unit_ctx_release(ctx_impl); 3985 3986 return NXT_UNIT_ERROR; 3987 } 3988 3989 nxt_unit_read_buf(ctx, rbuf); 3990 } 3991 3992 if (nxt_fast_path(rbuf->size > 0)) { 3993 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, 3994 rbuf->buf, rbuf->size, 3995 rbuf->oob, sizeof(rbuf->oob)); 3996 3997 #if (NXT_DEBUG) 3998 memset(rbuf->buf, 0xAC, rbuf->size); 3999 #endif 4000 4001 } else { 4002 rc = NXT_UNIT_ERROR; 4003 } 4004 4005 nxt_unit_read_buf_release(ctx, rbuf); 4006 4007 nxt_unit_ctx_release(ctx_impl); 4008 4009 return rc; 4010 } 4011 4012 4013 static void 4014 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) 4015 { 4016 nxt_unit_impl_t *lib; 4017 nxt_unit_ctx_impl_t *ctx_impl; 4018 4019 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4020 4021 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 4022 4023 if (ctx_impl->read_port_fd != -1) { 4024 rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, 4025 rbuf->buf, sizeof(rbuf->buf), 4026 rbuf->oob, sizeof(rbuf->oob)); 4027 4028 } else { 4029 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4030 4031 rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 4032 rbuf->buf, sizeof(rbuf->buf), 4033 rbuf->oob, sizeof(rbuf->oob)); 4034 } 4035 } 4036 4037 4038 void 4039 nxt_unit_done(nxt_unit_ctx_t *ctx) 4040 { 4041 nxt_unit_ctx_impl_t *ctx_impl; 4042 4043 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4044 4045 nxt_unit_ctx_release(ctx_impl); 4046 } 4047 4048 4049 nxt_unit_ctx_t * 4050 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) 4051 { 4052 int rc, fd; 4053 nxt_unit_impl_t *lib; 4054 nxt_unit_port_id_t new_port_id; 4055 nxt_unit_ctx_impl_t *new_ctx; 4056 4057 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4058 4059 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); 4060 if (nxt_slow_path(new_ctx == NULL)) { 4061 nxt_unit_warn(ctx, "failed to allocate context"); 4062 4063 return NULL; 4064 } 4065 4066 rc = nxt_unit_create_port(ctx, &new_port_id, &fd); 4067 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4068 free(new_ctx); 4069 4070 return NULL; 4071 } 4072 4073 rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); 4074 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4075 nxt_unit_remove_port(lib, &new_port_id); 4076 4077 close(fd); 4078 4079 free(new_ctx); 4080 4081 return NULL; 4082 } 4083 4084 close(fd); 4085 4086 rc = nxt_unit_ctx_init(lib, new_ctx, data); 4087 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4088 nxt_unit_remove_port(lib, &new_port_id); 4089 4090 free(new_ctx); 4091 4092 return NULL; 4093 } 4094 4095 new_ctx->read_port_id = new_port_id; 4096 4097 return &new_ctx->ctx; 4098 } 4099 4100 4101 static void 4102 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) 4103 { 4104 nxt_unit_impl_t *lib; 4105 nxt_unit_mmap_buf_t *mmap_buf; 4106 nxt_unit_request_info_impl_t *req_impl; 4107 nxt_unit_websocket_frame_impl_t *ws_impl; 4108 4109 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); 4110 4111 nxt_queue_each(req_impl, &ctx_impl->active_req, 4112 nxt_unit_request_info_impl_t, link) 4113 { 4114 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 4115 4116 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 4117 4118 } nxt_queue_loop; 4119 4120 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]); 4121 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]); 4122 4123 while (ctx_impl->free_buf != NULL) { 4124 mmap_buf = ctx_impl->free_buf; 4125 nxt_unit_mmap_buf_unlink(mmap_buf); 4126 free(mmap_buf); 4127 } 4128 4129 nxt_queue_each(req_impl, &ctx_impl->free_req, 4130 nxt_unit_request_info_impl_t, link) 4131 { 4132 nxt_unit_request_info_free(req_impl); 4133 4134 } nxt_queue_loop; 4135 4136 nxt_queue_each(ws_impl, &ctx_impl->free_ws, 4137 nxt_unit_websocket_frame_impl_t, link) 4138 { 4139 nxt_unit_websocket_frame_free(ws_impl); 4140 4141 } nxt_queue_loop; 4142 4143 pthread_mutex_destroy(&ctx_impl->mutex); 4144 4145 nxt_queue_remove(&ctx_impl->link); 4146 4147 if (ctx_impl != &lib->main_ctx) { 4148 free(ctx_impl); 4149 } 4150 4151 nxt_unit_lib_release(lib); 4152 } 4153 4154 4155 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */ 4156 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET) 4157 #define NXT_UNIX_SOCKET SOCK_SEQPACKET 4158 #else 4159 #define NXT_UNIX_SOCKET SOCK_DGRAM 4160 #endif 4161 4162 4163 void 4164 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) 4165 { 4166 nxt_unit_port_hash_id_t port_hash_id; 4167 4168 port_hash_id.pid = pid; 4169 port_hash_id.id = id; 4170 4171 port_id->pid = pid; 4172 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id)); 4173 port_id->id = id; 4174 } 4175 4176 4177 static int 4178 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) 4179 { 4180 int rc, port_sockets[2]; 4181 nxt_unit_impl_t *lib; 4182 nxt_unit_port_t new_port; 4183 nxt_unit_process_t *process; 4184 4185 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4186 4187 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 4188 if (nxt_slow_path(rc != 0)) { 4189 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 4190 strerror(errno), errno); 4191 4192 return NXT_UNIT_ERROR; 4193 } 4194 4195 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 4196 port_sockets[0], port_sockets[1]); 4197 4198 pthread_mutex_lock(&lib->mutex); 4199 4200 process = nxt_unit_process_get(lib, lib->pid); 4201 if (nxt_slow_path(process == NULL)) { 4202 pthread_mutex_unlock(&lib->mutex); 4203 4204 close(port_sockets[0]); 4205 close(port_sockets[1]); 4206 4207 return NXT_UNIT_ERROR; 4208 } 4209 4210 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 4211 4212 new_port.in_fd = port_sockets[0]; 4213 new_port.out_fd = -1; 4214 new_port.data = NULL; 4215 4216 pthread_mutex_unlock(&lib->mutex); 4217 4218 nxt_unit_process_release(process); 4219 4220 rc = nxt_unit_add_port(ctx, &new_port); 4221 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4222 nxt_unit_warn(ctx, "create_port: add_port() failed"); 4223 4224 close(port_sockets[0]); 4225 close(port_sockets[1]); 4226 4227 return rc; 4228 } 4229 4230 *port_id = new_port.id; 4231 *fd = port_sockets[1]; 4232 4233 return rc; 4234 } 4235 4236 4237 static int 4238 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 4239 nxt_unit_port_id_t *new_port, int fd) 4240 { 4241 ssize_t res; 4242 nxt_unit_impl_t *lib; 4243 4244 struct { 4245 nxt_port_msg_t msg; 4246 nxt_port_msg_new_port_t new_port; 4247 } m; 4248 4249 union { 4250 struct cmsghdr cm; 4251 char space[CMSG_SPACE(sizeof(int))]; 4252 } cmsg; 4253 4254 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4255 4256 m.msg.stream = 0; 4257 m.msg.pid = lib->pid; 4258 m.msg.reply_port = 0; 4259 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 4260 m.msg.last = 0; 4261 m.msg.mmap = 0; 4262 m.msg.nf = 0; 4263 m.msg.mf = 0; 4264 m.msg.tracking = 0; 4265 4266 m.new_port.id = new_port->id; 4267 m.new_port.pid = new_port->pid; 4268 m.new_port.type = NXT_PROCESS_APP; 4269 m.new_port.max_size = 16 * 1024; 4270 m.new_port.max_share = 64 * 1024; 4271 4272 memset(&cmsg, 0, sizeof(cmsg)); 4273 4274 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 4275 cmsg.cm.cmsg_level = SOL_SOCKET; 4276 cmsg.cm.cmsg_type = SCM_RIGHTS; 4277 4278 /* 4279 * memcpy() is used instead of simple 4280 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 4281 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 4282 * dereferencing type-punned pointer will break strict-aliasing rules 4283 * 4284 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 4285 * in the same simple assignment as in the code above. 4286 */ 4287 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 4288 4289 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); 4290 4291 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; 4292 } 4293 4294 4295 static int 4296 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4297 { 4298 int rc; 4299 nxt_unit_impl_t *lib; 4300 nxt_unit_process_t *process; 4301 nxt_unit_port_impl_t *new_port, *old_port; 4302 4303 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4304 4305 pthread_mutex_lock(&lib->mutex); 4306 4307 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); 4308 4309 if (nxt_slow_path(old_port != NULL)) { 4310 nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", 4311 port->id.pid, port->id.id, 4312 port->in_fd, port->out_fd); 4313 4314 if (old_port->port.data == NULL) { 4315 old_port->port.data = port->data; 4316 port->data = NULL; 4317 } 4318 4319 if (old_port->port.in_fd == -1) { 4320 old_port->port.in_fd = port->in_fd; 4321 port->in_fd = -1; 4322 } 4323 4324 if (port->in_fd != -1) { 4325 close(port->in_fd); 4326 port->in_fd = -1; 4327 } 4328 4329 if (old_port->port.out_fd == -1) { 4330 old_port->port.out_fd = port->out_fd; 4331 port->out_fd = -1; 4332 } 4333 4334 if (port->out_fd != -1) { 4335 close(port->out_fd); 4336 port->out_fd = -1; 4337 } 4338 4339 *port = old_port->port; 4340 4341 pthread_mutex_unlock(&lib->mutex); 4342 4343 if (lib->callbacks.add_port != NULL 4344 && (port->in_fd != -1 || port->out_fd != -1)) 4345 { 4346 lib->callbacks.add_port(ctx, &old_port->port); 4347 } 4348 4349 return NXT_UNIT_OK; 4350 } 4351 4352 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 4353 port->id.pid, port->id.id, 4354 port->in_fd, port->out_fd); 4355 4356 process = nxt_unit_process_get(lib, port->id.pid); 4357 if (nxt_slow_path(process == NULL)) { 4358 rc = NXT_UNIT_ERROR; 4359 goto unlock; 4360 } 4361 4362 if (port->id.id >= process->next_port_id) { 4363 process->next_port_id = port->id.id + 1; 4364 } 4365 4366 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 4367 if (nxt_slow_path(new_port == NULL)) { 4368 rc = NXT_UNIT_ERROR; 4369 goto unlock; 4370 } 4371 4372 new_port->port = *port; 4373 4374 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 4375 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4376 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", 4377 port->id.pid, port->id.id); 4378 4379 goto unlock; 4380 } 4381 4382 nxt_queue_insert_tail(&process->ports, &new_port->link); 4383 4384 rc = NXT_UNIT_OK; 4385 4386 new_port->process = process; 4387 4388 unlock: 4389 4390 pthread_mutex_unlock(&lib->mutex); 4391 4392 if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { 4393 nxt_unit_process_release(process); 4394 } 4395 4396 if (lib->callbacks.add_port != NULL 4397 && rc == NXT_UNIT_OK 4398 && (port->in_fd != -1 || port->out_fd != -1)) 4399 { 4400 lib->callbacks.add_port(ctx, &new_port->port); 4401 } 4402 4403 return rc; 4404 } 4405 4406 4407 static int 4408 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) 4409 { 4410 int res; 4411 nxt_unit_port_t *port; 4412 nxt_unit_process_t *process; 4413 4414 port = NULL; 4415 process = NULL; 4416 4417 pthread_mutex_lock(&lib->mutex); 4418 4419 res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); 4420 4421 pthread_mutex_unlock(&lib->mutex); 4422 4423 if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { 4424 lib->callbacks.remove_port(&lib->unit, port); 4425 } 4426 4427 if (nxt_fast_path(port != NULL)) { 4428 if (port->in_fd != -1) { 4429 close(port->in_fd); 4430 } 4431 4432 if (port->out_fd != -1) { 4433 close(port->out_fd); 4434 } 4435 } 4436 4437 if (nxt_slow_path(process != NULL)) { 4438 nxt_unit_process_release(process); 4439 } 4440 4441 if (nxt_fast_path(port != NULL)) { 4442 free(port); 4443 } 4444 4445 return res; 4446 } 4447 4448 4449 static int 4450 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, 4451 nxt_unit_port_t **r_port, nxt_unit_process_t **process) 4452 { 4453 nxt_unit_port_impl_t *port; 4454 4455 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 4456 if (nxt_slow_path(port == NULL)) { 4457 nxt_unit_debug(NULL, "remove_port: port %d,%d not found", 4458 (int) port_id->pid, (int) port_id->id); 4459 4460 return NXT_UNIT_ERROR; 4461 } 4462 4463 nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", 4464 (int) port_id->pid, (int) port_id->id, 4465 port->port.in_fd, port->port.out_fd, port->port.data); 4466 4467 if (port->process != NULL) { 4468 nxt_queue_remove(&port->link); 4469 } 4470 4471 if (process != NULL) { 4472 *process = port->process; 4473 } 4474 4475 if (r_port != NULL) { 4476 *r_port = &port->port; 4477 } 4478 4479 return NXT_UNIT_OK; 4480 } 4481 4482 4483 static void 4484 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) 4485 { 4486 nxt_unit_process_t *process; 4487 4488 pthread_mutex_lock(&lib->mutex); 4489 4490 process = nxt_unit_process_find(lib, pid, 1); 4491 if (nxt_slow_path(process == NULL)) { 4492 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); 4493 4494 pthread_mutex_unlock(&lib->mutex); 4495 4496 return; 4497 } 4498 4499 nxt_unit_remove_process(lib, process); 4500 4501 if (lib->callbacks.remove_pid != NULL) { 4502 lib->callbacks.remove_pid(&lib->unit, pid); 4503 } 4504 } 4505 4506 4507 static void 4508 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) 4509 { 4510 nxt_queue_t ports; 4511 nxt_unit_port_impl_t *port; 4512 4513 nxt_queue_init(&ports); 4514 4515 nxt_queue_add(&ports, &process->ports); 4516 4517 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4518 4519 nxt_unit_process_release(process); 4520 4521 /* To avoid unlink port. */ 4522 port->process = NULL; 4523 4524 nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); 4525 4526 } nxt_queue_loop; 4527 4528 pthread_mutex_unlock(&lib->mutex); 4529 4530 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4531 4532 nxt_queue_remove(&port->link); 4533 4534 if (lib->callbacks.remove_port != NULL) { 4535 lib->callbacks.remove_port(&lib->unit, &port->port); 4536 } 4537 4538 if (port->port.in_fd != -1) { 4539 close(port->port.in_fd); 4540 } 4541 4542 if (port->port.out_fd != -1) { 4543 close(port->port.out_fd); 4544 } 4545 4546 free(port); 4547 4548 } nxt_queue_loop; 4549 4550 nxt_unit_process_release(process); 4551 } 4552 4553 4554 static void 4555 nxt_unit_quit(nxt_unit_ctx_t *ctx) 4556 { 4557 nxt_unit_impl_t *lib; 4558 4559 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4560 4561 lib->online = 0; 4562 4563 if (lib->callbacks.quit != NULL) { 4564 lib->callbacks.quit(ctx); 4565 } 4566 } 4567 4568 4569 static ssize_t 4570 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 4571 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 4572 { 4573 int fd; 4574 nxt_unit_impl_t *lib; 4575 nxt_unit_port_impl_t *port; 4576 4577 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4578 4579 pthread_mutex_lock(&lib->mutex); 4580 4581 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 4582 4583 if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { 4584 fd = port->port.out_fd; 4585 4586 pthread_mutex_unlock(&lib->mutex); 4587 4588 } else { 4589 pthread_mutex_unlock(&lib->mutex); 4590 4591 nxt_unit_alert(ctx, "port_send: port %d,%d not found", 4592 (int) port_id->pid, (int) port_id->id); 4593 4594 return -NXT_UNIT_ERROR; 4595 } 4596 4597 nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", 4598 (int) port_id->pid, (int) port_id->id, fd); 4599 4600 if (lib->callbacks.port_send == NULL) { 4601 return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); 4602 4603 } else { 4604 return lib->callbacks.port_send(ctx, port_id, buf, buf_size, 4605 oob, oob_size); 4606 } 4607 } 4608 4609 4610 static ssize_t 4611 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 4612 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 4613 { 4614 ssize_t res; 4615 struct iovec iov[1]; 4616 struct msghdr msg; 4617 4618 iov[0].iov_base = (void *) buf; 4619 iov[0].iov_len = buf_size; 4620 4621 msg.msg_name = NULL; 4622 msg.msg_namelen = 0; 4623 msg.msg_iov = iov; 4624 msg.msg_iovlen = 1; 4625 msg.msg_flags = 0; 4626 msg.msg_control = (void *) oob; 4627 msg.msg_controllen = oob_size; 4628 4629 retry: 4630 4631 res = sendmsg(fd, &msg, 0); 4632 4633 if (nxt_slow_path(res == -1)) { 4634 if (errno == EINTR) { 4635 goto retry; 4636 } 4637 4638 /* 4639 * FIXME: This should be "alert" after router graceful shutdown 4640 * implementation. 4641 */ 4642 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", 4643 fd, (int) buf_size, strerror(errno), errno); 4644 4645 } else { 4646 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, 4647 (int) res); 4648 } 4649 4650 return res; 4651 } 4652 4653 4654 static ssize_t 4655 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 4656 void *buf, size_t buf_size, void *oob, size_t oob_size) 4657 { 4658 int fd; 4659 nxt_unit_impl_t *lib; 4660 nxt_unit_ctx_impl_t *ctx_impl; 4661 nxt_unit_port_impl_t *port; 4662 4663 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4664 4665 pthread_mutex_lock(&lib->mutex); 4666 4667 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 4668 4669 if (nxt_fast_path(port != NULL)) { 4670 fd = port->port.in_fd; 4671 4672 } else { 4673 nxt_unit_debug(ctx, "port_recv: port %d,%d not found", 4674 (int) port_id->pid, (int) port_id->id); 4675 fd = -1; 4676 } 4677 4678 pthread_mutex_unlock(&lib->mutex); 4679 4680 if (nxt_slow_path(fd == -1)) { 4681 return -1; 4682 } 4683 4684 nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d", 4685 (int) port_id->pid, (int) port_id->id, fd); 4686 4687 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 4688 4689 if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) { 4690 ctx_impl->read_port_fd = fd; 4691 } 4692 4693 return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size); 4694 } 4695 4696 4697 ssize_t 4698 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, 4699 void *oob, size_t oob_size) 4700 { 4701 ssize_t res; 4702 struct iovec iov[1]; 4703 struct msghdr msg; 4704 4705 iov[0].iov_base = buf; 4706 iov[0].iov_len = buf_size; 4707 4708 msg.msg_name = NULL; 4709 msg.msg_namelen = 0; 4710 msg.msg_iov = iov; 4711 msg.msg_iovlen = 1; 4712 msg.msg_flags = 0; 4713 msg.msg_control = oob; 4714 msg.msg_controllen = oob_size; 4715 4716 retry: 4717 4718 res = recvmsg(fd, &msg, 0); 4719 4720 if (nxt_slow_path(res == -1)) { 4721 if (errno == EINTR) { 4722 goto retry; 4723 } 4724 4725 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", 4726 fd, strerror(errno), errno); 4727 4728 } else { 4729 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); 4730 } 4731 4732 return res; 4733 } 4734 4735 4736 static nxt_int_t 4737 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 4738 { 4739 nxt_unit_port_t *port; 4740 nxt_unit_port_hash_id_t *port_id; 4741 4742 port = data; 4743 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start; 4744 4745 if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t) 4746 && port_id->pid == port->id.pid 4747 && port_id->id == port->id.id) 4748 { 4749 return NXT_OK; 4750 } 4751 4752 return NXT_DECLINED; 4753 } 4754 4755 4756 static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { 4757 NXT_LVLHSH_DEFAULT, 4758 nxt_unit_port_hash_test, 4759 nxt_lvlhsh_alloc, 4760 nxt_lvlhsh_free, 4761 }; 4762 4763 4764 static inline void 4765 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq, 4766 nxt_unit_port_hash_id_t *port_hash_id, 4767 nxt_unit_port_id_t *port_id) 4768 { 4769 port_hash_id->pid = port_id->pid; 4770 port_hash_id->id = port_id->id; 4771 4772 if (nxt_fast_path(port_id->hash != 0)) { 4773 lhq->key_hash = port_id->hash; 4774 4775 } else { 4776 lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id)); 4777 4778 port_id->hash = lhq->key_hash; 4779 4780 nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X", 4781 (int) port_id->pid, (int) port_id->id, 4782 (int) port_id->hash); 4783 } 4784 4785 lhq->key.length = sizeof(nxt_unit_port_hash_id_t); 4786 lhq->key.start = (u_char *) port_hash_id; 4787 lhq->proto = &lvlhsh_ports_proto; 4788 lhq->pool = NULL; 4789 } 4790 4791 4792 static int 4793 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) 4794 { 4795 nxt_int_t res; 4796 nxt_lvlhsh_query_t lhq; 4797 nxt_unit_port_hash_id_t port_hash_id; 4798 4799 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id); 4800 lhq.replace = 0; 4801 lhq.value = port; 4802 4803 res = nxt_lvlhsh_insert(port_hash, &lhq); 4804 4805 switch (res) { 4806 4807 case NXT_OK: 4808 return NXT_UNIT_OK; 4809 4810 default: 4811 return NXT_UNIT_ERROR; 4812 } 4813 } 4814 4815 4816 static nxt_unit_port_impl_t * 4817 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, 4818 int remove) 4819 { 4820 nxt_int_t res; 4821 nxt_lvlhsh_query_t lhq; 4822 nxt_unit_port_hash_id_t port_hash_id; 4823 4824 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id); 4825 4826 if (remove) { 4827 res = nxt_lvlhsh_delete(port_hash, &lhq); 4828 4829 } else { 4830 res = nxt_lvlhsh_find(port_hash, &lhq); 4831 } 4832 4833 switch (res) { 4834 4835 case NXT_OK: 4836 return lhq.value; 4837 4838 default: 4839 return NULL; 4840 } 4841 } 4842 4843 4844 static nxt_int_t 4845 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) 4846 { 4847 return NXT_OK; 4848 } 4849 4850 4851 static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { 4852 NXT_LVLHSH_DEFAULT, 4853 nxt_unit_request_hash_test, 4854 nxt_lvlhsh_alloc, 4855 nxt_lvlhsh_free, 4856 }; 4857 4858 4859 static int 4860 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 4861 nxt_unit_request_info_impl_t *req_impl) 4862 { 4863 uint32_t *stream; 4864 nxt_int_t res; 4865 nxt_lvlhsh_query_t lhq; 4866 4867 stream = &req_impl->stream; 4868 4869 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 4870 lhq.key.length = sizeof(*stream); 4871 lhq.key.start = (u_char *) stream; 4872 lhq.proto = &lvlhsh_requests_proto; 4873 lhq.pool = NULL; 4874 lhq.replace = 0; 4875 lhq.value = req_impl; 4876 4877 res = nxt_lvlhsh_insert(request_hash, &lhq); 4878 4879 switch (res) { 4880 4881 case NXT_OK: 4882 return NXT_UNIT_OK; 4883 4884 default: 4885 return NXT_UNIT_ERROR; 4886 } 4887 } 4888 4889 4890 static nxt_unit_request_info_impl_t * 4891 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, 4892 int remove) 4893 { 4894 nxt_int_t res; 4895 nxt_lvlhsh_query_t lhq; 4896 4897 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); 4898 lhq.key.length = sizeof(stream); 4899 lhq.key.start = (u_char *) &stream; 4900 lhq.proto = &lvlhsh_requests_proto; 4901 lhq.pool = NULL; 4902 4903 if (remove) { 4904 res = nxt_lvlhsh_delete(request_hash, &lhq); 4905 4906 } else { 4907 res = nxt_lvlhsh_find(request_hash, &lhq); 4908 } 4909 4910 switch (res) { 4911 4912 case NXT_OK: 4913 return lhq.value; 4914 4915 default: 4916 return NULL; 4917 } 4918 } 4919 4920 4921 void 4922 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) 4923 { 4924 int log_fd, n; 4925 char msg[NXT_MAX_ERROR_STR], *p, *end; 4926 pid_t pid; 4927 va_list ap; 4928 nxt_unit_impl_t *lib; 4929 4930 if (nxt_fast_path(ctx != NULL)) { 4931 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4932 4933 pid = lib->pid; 4934 log_fd = lib->log_fd; 4935 4936 } else { 4937 pid = getpid(); 4938 log_fd = STDERR_FILENO; 4939 } 4940 4941 p = msg; 4942 end = p + sizeof(msg) - 1; 4943 4944 p = nxt_unit_snprint_prefix(p, end, pid, level); 4945 4946 va_start(ap, fmt); 4947 p += vsnprintf(p, end - p, fmt, ap); 4948 va_end(ap); 4949 4950 if (nxt_slow_path(p > end)) { 4951 memcpy(end - 5, "[...]", 5); 4952 p = end; 4953 } 4954 4955 *p++ = '\n'; 4956 4957 n = write(log_fd, msg, p - msg); 4958 if (nxt_slow_path(n < 0)) { 4959 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg); 4960 } 4961 } 4962 4963 4964 void 4965 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) 4966 { 4967 int log_fd, n; 4968 char msg[NXT_MAX_ERROR_STR], *p, *end; 4969 pid_t pid; 4970 va_list ap; 4971 nxt_unit_impl_t *lib; 4972 nxt_unit_request_info_impl_t *req_impl; 4973 4974 if (nxt_fast_path(req != NULL)) { 4975 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 4976 4977 pid = lib->pid; 4978 log_fd = lib->log_fd; 4979 4980 } else { 4981 pid = getpid(); 4982 log_fd = STDERR_FILENO; 4983 } 4984 4985 p = msg; 4986 end = p + sizeof(msg) - 1; 4987 4988 p = nxt_unit_snprint_prefix(p, end, pid, level); 4989 4990 if (nxt_fast_path(req != NULL)) { 4991 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 4992 4993 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); 4994 } 4995 4996 va_start(ap, fmt); 4997 p += vsnprintf(p, end - p, fmt, ap); 4998 va_end(ap); 4999 5000 if (nxt_slow_path(p > end)) { 5001 memcpy(end - 5, "[...]", 5); 5002 p = end; 5003 } 5004 5005 *p++ = '\n'; 5006 5007 n = write(log_fd, msg, p - msg); 5008 if (nxt_slow_path(n < 0)) { 5009 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg); 5010 } 5011 } 5012 5013 5014 static const char * nxt_unit_log_levels[] = { 5015 "alert", 5016 "error", 5017 "warn", 5018 "notice", 5019 "info", 5020 "debug", 5021 }; 5022 5023 5024 static char * 5025 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 5026 { 5027 struct tm tm; 5028 struct timespec ts; 5029 5030 (void) clock_gettime(CLOCK_REALTIME, &ts); 5031 5032 #if (NXT_HAVE_LOCALTIME_R) 5033 (void) localtime_r(&ts.tv_sec, &tm); 5034 #else 5035 tm = *localtime(&ts.tv_sec); 5036 #endif 5037 5038 #if (NXT_DEBUG) 5039 p += snprintf(p, end - p, 5040 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 5041 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 5042 tm.tm_hour, tm.tm_min, tm.tm_sec, 5043 (int) ts.tv_nsec / 1000000); 5044 #else 5045 p += snprintf(p, end - p, 5046 "%4d/%02d/%02d %02d:%02d:%02d ", 5047 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 5048 tm.tm_hour, tm.tm_min, tm.tm_sec); 5049 #endif 5050 5051 p += snprintf(p, end - p, 5052 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 5053 (int) pid, 5054 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 5055 5056 return p; 5057 } 5058 5059 5060 /* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */ 5061 5062 void * 5063 nxt_memalign(size_t alignment, size_t size) 5064 { 5065 void *p; 5066 nxt_err_t err; 5067 5068 err = posix_memalign(&p, alignment, size); 5069 5070 if (nxt_fast_path(err == 0)) { 5071 return p; 5072 } 5073 5074 return NULL; 5075 } 5076 5077 #if (NXT_DEBUG) 5078 5079 void 5080 nxt_free(void *p) 5081 { 5082 free(p); 5083 } 5084 5085 #endif 5086