1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 #include "nxt_unit_websocket.h" 15 16 #include "nxt_websocket.h" 17 18 #if (NXT_HAVE_MEMFD_CREATE) 19 #include <linux/memfd.h> 20 #endif 21 22 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 23 #define NXT_UNIT_LOCAL_BUF_SIZE \ 24 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 25 26 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 27 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 28 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 29 typedef struct nxt_unit_process_s nxt_unit_process_t; 30 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 31 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 32 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 33 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 34 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 35 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 36 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 37 38 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 39 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 40 nxt_unit_ctx_impl_t *ctx_impl, void *data); 41 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 42 nxt_unit_mmap_buf_t *mmap_buf); 43 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 44 nxt_unit_mmap_buf_t *mmap_buf); 45 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 46 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 47 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, 48 uint32_t *shm_limit); 49 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 50 uint32_t stream); 51 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 52 nxt_unit_recv_msg_t *recv_msg); 53 static int 
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 54 nxt_unit_recv_msg_t *recv_msg); 55 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 56 nxt_unit_recv_msg_t *recv_msg); 57 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 58 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 59 nxt_unit_ctx_t *ctx); 60 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 61 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 62 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 63 nxt_unit_ctx_t *ctx); 64 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 65 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); 66 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 67 nxt_unit_recv_msg_t *recv_msg); 68 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 69 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 70 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 71 nxt_unit_mmap_buf_t *mmap_buf, int last); 72 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 73 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 74 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 75 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 76 nxt_unit_ctx_impl_t *ctx_impl); 77 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 78 nxt_unit_read_buf_t *rbuf); 79 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 80 nxt_unit_request_info_t *req, size_t size); 81 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 82 size_t size); 83 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 84 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 85 nxt_chunk_id_t *c, int *n, int min_n); 86 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 
87 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 88 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 89 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 90 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 91 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 92 int fd); 93 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 94 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, 95 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 96 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 97 98 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 99 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 100 nxt_unit_process_t *process, int i); 101 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 102 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 103 nxt_unit_process_t *process, uint32_t id); 104 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 105 nxt_unit_recv_msg_t *recv_msg); 106 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 107 nxt_unit_recv_msg_t *recv_msg); 108 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 109 nxt_unit_process_t *process, 110 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 111 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 112 113 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 114 pid_t pid); 115 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 116 pid_t pid, int remove); 117 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 118 static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, 119 nxt_unit_read_buf_t *rbuf); 120 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, 121 nxt_unit_port_id_t *port_id, int *fd); 122 123 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 124 nxt_unit_port_id_t 
*new_port, int fd); 125 126 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, 127 nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, 128 nxt_unit_process_t **process); 129 static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, 130 nxt_unit_process_t *process); 131 132 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, 133 nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, 134 const void *oob, size_t oob_size); 135 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, 136 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 137 void *oob, size_t oob_size); 138 139 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 140 nxt_unit_port_t *port); 141 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 142 nxt_unit_port_id_t *port_id, int remove); 143 144 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 145 nxt_unit_request_info_impl_t *req_impl); 146 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 147 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 148 149 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 150 151 152 struct nxt_unit_mmap_buf_s { 153 nxt_unit_buf_t buf; 154 155 nxt_unit_mmap_buf_t *next; 156 nxt_unit_mmap_buf_t **prev; 157 158 nxt_port_mmap_header_t *hdr; 159 nxt_unit_port_id_t port_id; 160 nxt_unit_request_info_t *req; 161 nxt_unit_ctx_impl_t *ctx_impl; 162 nxt_unit_process_t *process; 163 char *free_ptr; 164 char *plain_ptr; 165 }; 166 167 168 struct nxt_unit_recv_msg_s { 169 uint32_t stream; 170 nxt_pid_t pid; 171 nxt_port_id_t reply_port; 172 173 uint8_t last; /* 1 bit */ 174 uint8_t mmap; /* 1 bit */ 175 176 void *start; 177 uint32_t size; 178 179 int fd; 180 nxt_unit_process_t *process; 181 182 nxt_unit_mmap_buf_t *incoming_buf; 183 }; 184 185 186 typedef enum { 187 NXT_UNIT_RS_START = 0, 188 NXT_UNIT_RS_RESPONSE_INIT, 189 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 190 NXT_UNIT_RS_RESPONSE_SENT, 191 
NXT_UNIT_RS_RELEASED, 192 } nxt_unit_req_state_t; 193 194 195 struct nxt_unit_request_info_impl_s { 196 nxt_unit_request_info_t req; 197 198 uint32_t stream; 199 200 nxt_unit_process_t *process; 201 202 nxt_unit_mmap_buf_t *outgoing_buf; 203 nxt_unit_mmap_buf_t *incoming_buf; 204 205 nxt_unit_req_state_t state; 206 uint8_t websocket; 207 208 nxt_queue_link_t link; 209 210 char extra_data[]; 211 }; 212 213 214 struct nxt_unit_websocket_frame_impl_s { 215 nxt_unit_websocket_frame_t ws; 216 217 nxt_unit_mmap_buf_t *buf; 218 219 nxt_queue_link_t link; 220 221 nxt_unit_ctx_impl_t *ctx_impl; 222 }; 223 224 225 struct nxt_unit_read_buf_s { 226 nxt_unit_read_buf_t *next; 227 ssize_t size; 228 char buf[16384]; 229 char oob[256]; 230 }; 231 232 233 struct nxt_unit_ctx_impl_s { 234 nxt_unit_ctx_t ctx; 235 236 pthread_mutex_t mutex; 237 238 nxt_unit_port_id_t read_port_id; 239 int read_port_fd; 240 241 nxt_queue_link_t link; 242 243 nxt_unit_mmap_buf_t *free_buf; 244 245 /* of nxt_unit_request_info_impl_t */ 246 nxt_queue_t free_req; 247 248 /* of nxt_unit_websocket_frame_impl_t */ 249 nxt_queue_t free_ws; 250 251 /* of nxt_unit_request_info_impl_t */ 252 nxt_queue_t active_req; 253 254 /* of nxt_unit_request_info_impl_t */ 255 nxt_lvlhsh_t requests; 256 257 nxt_unit_read_buf_t *pending_read_head; 258 nxt_unit_read_buf_t **pending_read_tail; 259 nxt_unit_read_buf_t *free_read_buf; 260 261 nxt_unit_mmap_buf_t ctx_buf[2]; 262 nxt_unit_read_buf_t ctx_read_buf; 263 264 nxt_unit_request_info_impl_t req; 265 }; 266 267 268 struct nxt_unit_impl_s { 269 nxt_unit_t unit; 270 nxt_unit_callbacks_t callbacks; 271 272 uint32_t request_data_size; 273 uint32_t shm_mmap_limit; 274 275 pthread_mutex_t mutex; 276 277 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 278 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 279 280 nxt_unit_port_id_t ready_port_id; 281 282 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 283 284 pid_t pid; 285 int log_fd; 286 int online; 287 288 
nxt_unit_ctx_impl_t main_ctx; 289 }; 290 291 292 struct nxt_unit_port_impl_s { 293 nxt_unit_port_t port; 294 295 nxt_queue_link_t link; 296 nxt_unit_process_t *process; 297 }; 298 299 300 struct nxt_unit_mmap_s { 301 nxt_port_mmap_header_t *hdr; 302 }; 303 304 305 struct nxt_unit_mmaps_s { 306 pthread_mutex_t mutex; 307 uint32_t size; 308 uint32_t cap; 309 nxt_atomic_t allocated_chunks; 310 nxt_unit_mmap_t *elts; 311 }; 312 313 314 struct nxt_unit_process_s { 315 pid_t pid; 316 317 nxt_queue_t ports; 318 319 nxt_unit_mmaps_t incoming; 320 nxt_unit_mmaps_t outgoing; 321 322 nxt_unit_impl_t *lib; 323 324 nxt_atomic_t use_count; 325 326 uint32_t next_port_id; 327 }; 328 329 330 /* Explicitly using 32 bit types to avoid possible alignment. */ 331 typedef struct { 332 int32_t pid; 333 uint32_t id; 334 } nxt_unit_port_hash_id_t; 335 336 337 nxt_unit_ctx_t * 338 nxt_unit_init(nxt_unit_init_t *init) 339 { 340 int rc; 341 uint32_t ready_stream, shm_limit; 342 nxt_unit_ctx_t *ctx; 343 nxt_unit_impl_t *lib; 344 nxt_unit_port_t ready_port, read_port; 345 346 lib = nxt_unit_create(init); 347 if (nxt_slow_path(lib == NULL)) { 348 return NULL; 349 } 350 351 if (init->ready_port.id.pid != 0 352 && init->ready_stream != 0 353 && init->read_port.id.pid != 0) 354 { 355 ready_port = init->ready_port; 356 ready_stream = init->ready_stream; 357 read_port = init->read_port; 358 lib->log_fd = init->log_fd; 359 360 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 361 ready_port.id.id); 362 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 363 read_port.id.id); 364 365 } else { 366 rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, 367 &ready_stream, &shm_limit); 368 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 369 goto fail; 370 } 371 372 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) 373 / PORT_MMAP_DATA_SIZE; 374 } 375 376 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { 377 lib->shm_mmap_limit = 1; 378 } 379 380 lib->pid = read_port.id.pid; 381 ctx = 
&lib->main_ctx.ctx; 382 383 rc = lib->callbacks.add_port(ctx, &ready_port); 384 if (rc != NXT_UNIT_OK) { 385 nxt_unit_alert(NULL, "failed to add ready_port"); 386 387 goto fail; 388 } 389 390 rc = lib->callbacks.add_port(ctx, &read_port); 391 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 392 nxt_unit_alert(NULL, "failed to add read_port"); 393 394 goto fail; 395 } 396 397 lib->main_ctx.read_port_id = read_port.id; 398 lib->ready_port_id = ready_port.id; 399 400 rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); 401 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 402 nxt_unit_alert(NULL, "failed to send READY message"); 403 404 goto fail; 405 } 406 407 return ctx; 408 409 fail: 410 411 free(lib); 412 413 return NULL; 414 } 415 416 417 static nxt_unit_impl_t * 418 nxt_unit_create(nxt_unit_init_t *init) 419 { 420 int rc; 421 nxt_unit_impl_t *lib; 422 nxt_unit_callbacks_t *cb; 423 424 lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size); 425 if (nxt_slow_path(lib == NULL)) { 426 nxt_unit_alert(NULL, "failed to allocate unit struct"); 427 428 return NULL; 429 } 430 431 rc = pthread_mutex_init(&lib->mutex, NULL); 432 if (nxt_slow_path(rc != 0)) { 433 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 434 435 goto fail; 436 } 437 438 lib->unit.data = init->data; 439 lib->callbacks = init->callbacks; 440 441 lib->request_data_size = init->request_data_size; 442 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) 443 / PORT_MMAP_DATA_SIZE; 444 445 lib->processes.slot = NULL; 446 lib->ports.slot = NULL; 447 448 lib->log_fd = STDERR_FILENO; 449 lib->online = 1; 450 451 nxt_queue_init(&lib->contexts); 452 453 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 454 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 455 goto fail; 456 } 457 458 cb = &lib->callbacks; 459 460 if (cb->request_handler == NULL) { 461 nxt_unit_alert(NULL, "request_handler is NULL"); 462 463 goto fail; 464 } 465 466 if (cb->add_port == NULL) { 467 cb->add_port = 
nxt_unit_add_port; 468 } 469 470 if (cb->remove_port == NULL) { 471 cb->remove_port = nxt_unit_remove_port; 472 } 473 474 if (cb->remove_pid == NULL) { 475 cb->remove_pid = nxt_unit_remove_pid; 476 } 477 478 if (cb->quit == NULL) { 479 cb->quit = nxt_unit_quit; 480 } 481 482 if (cb->port_send == NULL) { 483 cb->port_send = nxt_unit_port_send_default; 484 } 485 486 if (cb->port_recv == NULL) { 487 cb->port_recv = nxt_unit_port_recv_default; 488 } 489 490 return lib; 491 492 fail: 493 494 free(lib); 495 496 return NULL; 497 } 498 499 500 static int 501 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 502 void *data) 503 { 504 int rc; 505 506 ctx_impl->ctx.data = data; 507 ctx_impl->ctx.unit = &lib->unit; 508 509 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 510 511 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 512 if (nxt_slow_path(rc != 0)) { 513 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 514 515 return NXT_UNIT_ERROR; 516 } 517 518 nxt_queue_init(&ctx_impl->free_req); 519 nxt_queue_init(&ctx_impl->free_ws); 520 nxt_queue_init(&ctx_impl->active_req); 521 522 ctx_impl->free_buf = NULL; 523 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 524 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 525 526 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 527 528 ctx_impl->pending_read_head = NULL; 529 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 530 ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; 531 ctx_impl->ctx_read_buf.next = NULL; 532 533 ctx_impl->req.req.ctx = &ctx_impl->ctx; 534 ctx_impl->req.req.unit = &lib->unit; 535 536 ctx_impl->read_port_fd = -1; 537 ctx_impl->requests.slot = 0; 538 539 return NXT_UNIT_OK; 540 } 541 542 543 nxt_inline void 544 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 545 nxt_unit_mmap_buf_t *mmap_buf) 546 { 547 mmap_buf->next = *head; 548 549 if (mmap_buf->next != NULL) { 550 mmap_buf->next->prev = 
&mmap_buf->next; 551 } 552 553 *head = mmap_buf; 554 mmap_buf->prev = head; 555 } 556 557 558 nxt_inline void 559 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 560 nxt_unit_mmap_buf_t *mmap_buf) 561 { 562 while (*prev != NULL) { 563 prev = &(*prev)->next; 564 } 565 566 nxt_unit_mmap_buf_insert(prev, mmap_buf); 567 } 568 569 570 nxt_inline void 571 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) 572 { 573 nxt_unit_mmap_buf_t **prev; 574 575 prev = mmap_buf->prev; 576 577 if (mmap_buf->next != NULL) { 578 mmap_buf->next->prev = prev; 579 } 580 581 if (prev != NULL) { 582 *prev = mmap_buf->next; 583 } 584 } 585 586 587 static int 588 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, 589 int *log_fd, uint32_t *stream, uint32_t *shm_limit) 590 { 591 int rc; 592 int ready_fd, read_fd; 593 char *unit_init, *version_end; 594 long version_length; 595 int64_t ready_pid, read_pid; 596 uint32_t ready_stream, ready_id, read_id; 597 598 unit_init = getenv(NXT_UNIT_INIT_ENV); 599 if (nxt_slow_path(unit_init == NULL)) { 600 nxt_unit_alert(NULL, "%s is not in the current environment", 601 NXT_UNIT_INIT_ENV); 602 603 return NXT_UNIT_ERROR; 604 } 605 606 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 607 608 version_length = nxt_length(NXT_VERSION); 609 610 version_end = strchr(unit_init, ';'); 611 if (version_end == NULL 612 || version_end - unit_init != version_length 613 || memcmp(unit_init, NXT_VERSION, version_length) != 0) 614 { 615 nxt_unit_alert(NULL, "version check error"); 616 617 return NXT_UNIT_ERROR; 618 } 619 620 rc = sscanf(version_end + 1, 621 "%"PRIu32";" 622 "%"PRId64",%"PRIu32",%d;" 623 "%"PRId64",%"PRIu32",%d;" 624 "%d,%"PRIu32, 625 &ready_stream, 626 &ready_pid, &ready_id, &ready_fd, 627 &read_pid, &read_id, &read_fd, 628 log_fd, shm_limit); 629 630 if (nxt_slow_path(rc != 9)) { 631 nxt_unit_alert(NULL, "failed to scan variables: %d", rc); 632 633 return NXT_UNIT_ERROR; 634 } 635 636 
nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 637 638 ready_port->in_fd = -1; 639 ready_port->out_fd = ready_fd; 640 ready_port->data = NULL; 641 642 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 643 644 read_port->in_fd = read_fd; 645 read_port->out_fd = -1; 646 read_port->data = NULL; 647 648 *stream = ready_stream; 649 650 return NXT_UNIT_OK; 651 } 652 653 654 static int 655 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 656 uint32_t stream) 657 { 658 ssize_t res; 659 nxt_port_msg_t msg; 660 nxt_unit_impl_t *lib; 661 662 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 663 664 msg.stream = stream; 665 msg.pid = lib->pid; 666 msg.reply_port = 0; 667 msg.type = _NXT_PORT_MSG_PROCESS_READY; 668 msg.last = 1; 669 msg.mmap = 0; 670 msg.nf = 0; 671 msg.mf = 0; 672 msg.tracking = 0; 673 674 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 675 if (res != sizeof(msg)) { 676 return NXT_UNIT_ERROR; 677 } 678 679 return NXT_UNIT_OK; 680 } 681 682 683 int 684 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 685 void *buf, size_t buf_size, void *oob, size_t oob_size) 686 { 687 int rc; 688 pid_t pid; 689 struct cmsghdr *cm; 690 nxt_port_msg_t *port_msg; 691 nxt_unit_impl_t *lib; 692 nxt_unit_recv_msg_t recv_msg; 693 nxt_unit_callbacks_t *cb; 694 695 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 696 697 rc = NXT_UNIT_ERROR; 698 recv_msg.fd = -1; 699 recv_msg.process = NULL; 700 port_msg = buf; 701 cm = oob; 702 703 if (oob_size >= CMSG_SPACE(sizeof(int)) 704 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 705 && cm->cmsg_level == SOL_SOCKET 706 && cm->cmsg_type == SCM_RIGHTS) 707 { 708 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 709 } 710 711 recv_msg.incoming_buf = NULL; 712 713 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 714 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 715 goto fail; 716 } 717 718 recv_msg.stream = 
port_msg->stream; 719 recv_msg.pid = port_msg->pid; 720 recv_msg.reply_port = port_msg->reply_port; 721 recv_msg.last = port_msg->last; 722 recv_msg.mmap = port_msg->mmap; 723 724 recv_msg.start = port_msg + 1; 725 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 726 727 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 728 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 729 port_msg->stream, (int) port_msg->type); 730 goto fail; 731 } 732 733 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 734 rc = NXT_UNIT_OK; 735 736 goto fail; 737 } 738 739 /* Fragmentation is unsupported. */ 740 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 741 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 742 port_msg->stream, (int) port_msg->type); 743 goto fail; 744 } 745 746 if (port_msg->mmap) { 747 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { 748 goto fail; 749 } 750 } 751 752 cb = &lib->callbacks; 753 754 switch (port_msg->type) { 755 756 case _NXT_PORT_MSG_QUIT: 757 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 758 759 cb->quit(ctx); 760 rc = NXT_UNIT_OK; 761 break; 762 763 case _NXT_PORT_MSG_NEW_PORT: 764 rc = nxt_unit_process_new_port(ctx, &recv_msg); 765 break; 766 767 case _NXT_PORT_MSG_CHANGE_FILE: 768 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 769 port_msg->stream, recv_msg.fd); 770 771 if (dup2(recv_msg.fd, lib->log_fd) == -1) { 772 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", 773 port_msg->stream, recv_msg.fd, lib->log_fd, 774 strerror(errno), errno); 775 776 goto fail; 777 } 778 779 rc = NXT_UNIT_OK; 780 break; 781 782 case _NXT_PORT_MSG_MMAP: 783 if (nxt_slow_path(recv_msg.fd < 0)) { 784 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 785 port_msg->stream, recv_msg.fd); 786 787 goto fail; 788 } 789 790 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); 791 break; 792 793 case _NXT_PORT_MSG_REQ_HEADERS: 794 rc = 
nxt_unit_process_req_headers(ctx, &recv_msg); 795 break; 796 797 case _NXT_PORT_MSG_WEBSOCKET: 798 rc = nxt_unit_process_websocket(ctx, &recv_msg); 799 break; 800 801 case _NXT_PORT_MSG_REMOVE_PID: 802 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 803 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 804 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 805 (int) sizeof(pid)); 806 807 goto fail; 808 } 809 810 memcpy(&pid, recv_msg.start, sizeof(pid)); 811 812 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 813 port_msg->stream, (int) pid); 814 815 cb->remove_pid(ctx, pid); 816 817 rc = NXT_UNIT_OK; 818 break; 819 820 case _NXT_PORT_MSG_SHM_ACK: 821 rc = nxt_unit_process_shm_ack(ctx); 822 break; 823 824 default: 825 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 826 port_msg->stream, (int) port_msg->type); 827 828 goto fail; 829 } 830 831 fail: 832 833 if (recv_msg.fd != -1) { 834 close(recv_msg.fd); 835 } 836 837 while (recv_msg.incoming_buf != NULL) { 838 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 839 } 840 841 if (recv_msg.process != NULL) { 842 nxt_unit_process_use(ctx, recv_msg.process, -1); 843 } 844 845 return rc; 846 } 847 848 849 static int 850 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 851 { 852 int nb; 853 nxt_unit_impl_t *lib; 854 nxt_unit_port_t new_port; 855 nxt_port_msg_new_port_t *new_port_msg; 856 857 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 858 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 859 "invalid message size (%d)", 860 recv_msg->stream, (int) recv_msg->size); 861 862 return NXT_UNIT_ERROR; 863 } 864 865 if (nxt_slow_path(recv_msg->fd < 0)) { 866 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 867 recv_msg->stream, recv_msg->fd); 868 869 return NXT_UNIT_ERROR; 870 } 871 872 new_port_msg = recv_msg->start; 873 874 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 875 recv_msg->stream, (int) new_port_msg->pid, 876 
(int) new_port_msg->id, recv_msg->fd); 877 878 nb = 0; 879 880 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { 881 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 882 "failed: %s (%d)", 883 recv_msg->stream, recv_msg->fd, strerror(errno), errno); 884 885 return NXT_UNIT_ERROR; 886 } 887 888 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 889 new_port_msg->id); 890 891 new_port.in_fd = -1; 892 new_port.out_fd = recv_msg->fd; 893 new_port.data = NULL; 894 895 recv_msg->fd = -1; 896 897 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 898 899 return lib->callbacks.add_port(ctx, &new_port); 900 } 901 902 903 static int 904 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 905 { 906 nxt_unit_impl_t *lib; 907 nxt_unit_request_t *r; 908 nxt_unit_mmap_buf_t *b; 909 nxt_unit_request_info_t *req; 910 nxt_unit_request_info_impl_t *req_impl; 911 912 if (nxt_slow_path(recv_msg->mmap == 0)) { 913 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 914 recv_msg->stream); 915 916 return NXT_UNIT_ERROR; 917 } 918 919 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 920 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 921 "%d expected", recv_msg->stream, (int) recv_msg->size, 922 (int) sizeof(nxt_unit_request_t)); 923 924 return NXT_UNIT_ERROR; 925 } 926 927 req_impl = nxt_unit_request_info_get(ctx); 928 if (nxt_slow_path(req_impl == NULL)) { 929 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 930 recv_msg->stream); 931 932 return NXT_UNIT_ERROR; 933 } 934 935 req = &req_impl->req; 936 937 nxt_unit_port_id_init(&req->response_port, recv_msg->pid, 938 recv_msg->reply_port); 939 940 req->request = recv_msg->start; 941 942 b = recv_msg->incoming_buf; 943 944 req->request_buf = &b->buf; 945 req->response = NULL; 946 req->response_buf = NULL; 947 948 r = req->request; 949 950 req->content_length = r->content_length; 951 952 req->content_buf = 
req->request_buf; 953 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 954 955 /* "Move" process reference to req_impl. */ 956 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); 957 if (nxt_slow_path(req_impl->process == NULL)) { 958 return NXT_UNIT_ERROR; 959 } 960 961 recv_msg->process = NULL; 962 963 req_impl->stream = recv_msg->stream; 964 965 req_impl->outgoing_buf = NULL; 966 967 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 968 b->req = req; 969 } 970 971 /* "Move" incoming buffer list to req_impl. */ 972 req_impl->incoming_buf = recv_msg->incoming_buf; 973 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 974 recv_msg->incoming_buf = NULL; 975 976 req->content_fd = recv_msg->fd; 977 recv_msg->fd = -1; 978 979 req->response_max_fields = 0; 980 req_impl->state = NXT_UNIT_RS_START; 981 req_impl->websocket = 0; 982 983 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 984 (int) r->method_length, 985 (char *) nxt_unit_sptr_get(&r->method), 986 (int) r->target_length, 987 (char *) nxt_unit_sptr_get(&r->target), 988 (int) r->content_length); 989 990 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 991 992 lib->callbacks.request_handler(req); 993 994 return NXT_UNIT_OK; 995 } 996 997 998 static int 999 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1000 { 1001 size_t hsize; 1002 nxt_unit_impl_t *lib; 1003 nxt_unit_mmap_buf_t *b; 1004 nxt_unit_ctx_impl_t *ctx_impl; 1005 nxt_unit_callbacks_t *cb; 1006 nxt_unit_request_info_t *req; 1007 nxt_unit_request_info_impl_t *req_impl; 1008 nxt_unit_websocket_frame_impl_t *ws_impl; 1009 1010 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1011 1012 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, 1013 recv_msg->last); 1014 if (req_impl == NULL) { 1015 return NXT_UNIT_OK; 1016 } 1017 1018 req = &req_impl->req; 1019 1020 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1021 cb = 
&lib->callbacks; 1022 1023 if (cb->websocket_handler && recv_msg->size >= 2) { 1024 ws_impl = nxt_unit_websocket_frame_get(ctx); 1025 if (nxt_slow_path(ws_impl == NULL)) { 1026 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 1027 req_impl->stream); 1028 1029 return NXT_UNIT_ERROR; 1030 } 1031 1032 ws_impl->ws.req = req; 1033 1034 ws_impl->buf = NULL; 1035 1036 if (recv_msg->mmap) { 1037 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1038 b->req = req; 1039 } 1040 1041 /* "Move" incoming buffer list to ws_impl. */ 1042 ws_impl->buf = recv_msg->incoming_buf; 1043 ws_impl->buf->prev = &ws_impl->buf; 1044 recv_msg->incoming_buf = NULL; 1045 1046 b = ws_impl->buf; 1047 1048 } else { 1049 b = nxt_unit_mmap_buf_get(ctx); 1050 if (nxt_slow_path(b == NULL)) { 1051 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 1052 req_impl->stream); 1053 1054 nxt_unit_websocket_frame_release(&ws_impl->ws); 1055 1056 return NXT_UNIT_ERROR; 1057 } 1058 1059 b->req = req; 1060 b->buf.start = recv_msg->start; 1061 b->buf.free = b->buf.start; 1062 b->buf.end = b->buf.start + recv_msg->size; 1063 1064 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 1065 } 1066 1067 ws_impl->ws.header = (void *) b->buf.start; 1068 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1069 ws_impl->ws.header); 1070 1071 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1072 1073 if (ws_impl->ws.header->mask) { 1074 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1075 1076 } else { 1077 ws_impl->ws.mask = NULL; 1078 } 1079 1080 b->buf.free += hsize; 1081 1082 ws_impl->ws.content_buf = &b->buf; 1083 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1084 1085 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1086 "payload_len=%"PRIu64, 1087 ws_impl->ws.header->opcode, 1088 ws_impl->ws.payload_len); 1089 1090 cb->websocket_handler(&ws_impl->ws); 1091 } 1092 1093 if (recv_msg->last) { 1094 req_impl->websocket = 0; 1095 1096 if 
        (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            /* No close callback registered: finish the request with error. */
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a shared-memory ACK message: forward it to the application's
 * shm_ack_handler callback, if one is registered.  Always reports success.
 */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->shm_ack_handler != NULL) {
        cb->shm_ack_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Get a request-info structure for a new request: reuse one from the
 * context's free list when possible, otherwise allocate a fresh one
 * (including lib->request_data_size bytes of per-request user data).
 * The structure is linked into ctx_impl->active_req under the context mutex.
 * Returns NULL only on allocation failure.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Drop the lock across malloc(); re-acquire before queue insert. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
                          + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    /* Point user data at the trailing extra_data area, if any was requested. */
    req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Return a finished request to the context's free list: drop websocket hash
 * membership, free all outgoing/incoming buffers, close a spilled-content fd,
 * release the process reference, and mark the state RELEASED.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->websocket) {
        /* Third argument 1: remove the entry from the request hash. */
        nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);

        req_impl->websocket = 0;
    }

    /* Each free call unlinks the head, so these loops drain the lists. */
    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        close(req->content_fd);

        req->content_fd = -1;
    }

    /*
     * Process release should go after buffers release to guarantee mmap
     * existence.
     */
    if (req_impl->process != NULL) {
        nxt_unit_process_use(req->ctx, req_impl->process, -1);

        req_impl->process = NULL;
    }

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->state = NXT_UNIT_RS_RELEASED;
}


/*
 * Destroy a request-info structure: unlink it from whatever list it is on
 * and free it unless it is the one embedded in the context itself.
 * Caller is expected to hold the appropriate lock for the queue unlink.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    /* The context embeds one request-info; that one is never freed. */
    if (req_impl != &ctx_impl->req) {
        free(req_impl);
    }
}


/*
 * Get a websocket frame structure: reuse from the context free list or
 * allocate a new one.  Returns NULL only on allocation failure.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Return a websocket frame to the context free list, releasing any
 * buffers still attached to it.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* Each free call unlinks the head, so this loop drains the list. */
    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/* Destroy a websocket frame structure: unlink and free it. */
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    free(ws_impl);
}


/*
 * Case-insensitive hash of a header field name, folded to 16 bits.
 * Must stay in sync with the router's header hashing (same seed and mix)
 * so that NXT_UNIT_HASH_* constants match.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    /* Fold the 32-bit hash into 16 bits (implicit truncation on return). */
    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorder request fields so that fields with equal hashes (duplicate
 * header names) become adjacent, preserving relative order, and record
 * the indexes of well-known fields (Content-Length, Content-Type, Cookie).
 * The sptr offsets are self-relative, so each moved field's name/value
 * offsets are adjusted by the distance it travels.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            r->content_length_field = i;
            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            r->content_type_field = i;
            break;

        case NXT_UNIT_HASH_COOKIE:
            r->cookie_field = i;
            break;
        };

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash) {
                continue;
            }

            /* Already adjacent to the group: nothing to move. */
            if (j == i + 1) {
                continue;
            }

            /* Move fields[j] down to slot i+1, shifting the rest up. */
            f = fields[j];
            f.name.offset += (j - (i + 1)) * sizeof(f);
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Grown group: advance i past the newly placed duplicate. */
            i++;
        }
    }
}


/*
 * Initialize (or re-initialize) the response for a request: allocate a
 * response buffer sized for the header struct, max_fields_count field
 * slots, NUL terminators, and max_fields_size bytes of name/value data.
 * Reuses an existing response buffer when it is large enough.
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR (already sent / allocation failure).
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        /* Existing buffer too small: release it and start over. */
        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Reserve space for the field array; name/value data follows it. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);

    req->response_max_fields = max_fields_count;
    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;

    return
NXT_UNIT_OK; 1450 } 1451 1452 1453 int 1454 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1455 uint32_t max_fields_count, uint32_t max_fields_size) 1456 { 1457 char *p; 1458 uint32_t i, buf_size; 1459 nxt_unit_buf_t *buf; 1460 nxt_unit_field_t *f, *src; 1461 nxt_unit_response_t *resp; 1462 nxt_unit_request_info_impl_t *req_impl; 1463 1464 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1465 1466 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1467 nxt_unit_req_warn(req, "realloc: response not init"); 1468 1469 return NXT_UNIT_ERROR; 1470 } 1471 1472 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1473 nxt_unit_req_warn(req, "realloc: response already sent"); 1474 1475 return NXT_UNIT_ERROR; 1476 } 1477 1478 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1479 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 1480 1481 return NXT_UNIT_ERROR; 1482 } 1483 1484 /* 1485 * Each field name and value 0-terminated by libunit, 1486 * this is the reason of '+ 2' below. 
1487 */ 1488 buf_size = sizeof(nxt_unit_response_t) 1489 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 1490 + max_fields_size; 1491 1492 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 1493 1494 buf = nxt_unit_response_buf_alloc(req, buf_size); 1495 if (nxt_slow_path(buf == NULL)) { 1496 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 1497 return NXT_UNIT_ERROR; 1498 } 1499 1500 resp = (nxt_unit_response_t *) buf->start; 1501 1502 memset(resp, 0, sizeof(nxt_unit_response_t)); 1503 1504 resp->status = req->response->status; 1505 resp->content_length = req->response->content_length; 1506 1507 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1508 f = resp->fields; 1509 1510 for (i = 0; i < req->response->fields_count; i++) { 1511 src = req->response->fields + i; 1512 1513 if (nxt_slow_path(src->skip != 0)) { 1514 continue; 1515 } 1516 1517 if (nxt_slow_path(src->name_length + src->value_length + 2 1518 > (uint32_t) (buf->end - p))) 1519 { 1520 nxt_unit_req_warn(req, "realloc: not enough space for field" 1521 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 1522 i, src, src->name_length, src->value_length); 1523 1524 goto fail; 1525 } 1526 1527 nxt_unit_sptr_set(&f->name, p); 1528 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 1529 *p++ = '\0'; 1530 1531 nxt_unit_sptr_set(&f->value, p); 1532 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 1533 *p++ = '\0'; 1534 1535 f->hash = src->hash; 1536 f->skip = 0; 1537 f->name_length = src->name_length; 1538 f->value_length = src->value_length; 1539 1540 resp->fields_count++; 1541 f++; 1542 } 1543 1544 if (req->response->piggyback_content_length > 0) { 1545 if (nxt_slow_path(req->response->piggyback_content_length 1546 > (uint32_t) (buf->end - p))) 1547 { 1548 nxt_unit_req_warn(req, "realloc: not enought space for content" 1549 " #%"PRIu32", %"PRIu32" required", 1550 i, req->response->piggyback_content_length); 1551 1552 goto fail; 1553 } 
1554 1555 resp->piggyback_content_length = 1556 req->response->piggyback_content_length; 1557 1558 nxt_unit_sptr_set(&resp->piggyback_content, p); 1559 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 1560 req->response->piggyback_content_length); 1561 } 1562 1563 buf->free = p; 1564 1565 nxt_unit_buf_free(req->response_buf); 1566 1567 req->response = resp; 1568 req->response_buf = buf; 1569 req->response_max_fields = max_fields_count; 1570 1571 return NXT_UNIT_OK; 1572 1573 fail: 1574 1575 nxt_unit_buf_free(buf); 1576 1577 return NXT_UNIT_ERROR; 1578 } 1579 1580 1581 int 1582 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 1583 { 1584 nxt_unit_request_info_impl_t *req_impl; 1585 1586 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1587 1588 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 1589 } 1590 1591 1592 int 1593 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 1594 const char *name, uint8_t name_length, 1595 const char *value, uint32_t value_length) 1596 { 1597 nxt_unit_buf_t *buf; 1598 nxt_unit_field_t *f; 1599 nxt_unit_response_t *resp; 1600 nxt_unit_request_info_impl_t *req_impl; 1601 1602 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1603 1604 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 1605 nxt_unit_req_warn(req, "add_field: response not initialized or " 1606 "already sent"); 1607 1608 return NXT_UNIT_ERROR; 1609 } 1610 1611 resp = req->response; 1612 1613 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 1614 nxt_unit_req_warn(req, "add_field: too many response fields"); 1615 1616 return NXT_UNIT_ERROR; 1617 } 1618 1619 buf = req->response_buf; 1620 1621 if (nxt_slow_path(name_length + value_length + 2 1622 > (uint32_t) (buf->end - buf->free))) 1623 { 1624 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 1625 1626 return NXT_UNIT_ERROR; 1627 } 1628 1629 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: 
%.*s", 1630 resp->fields_count, 1631 (int) name_length, name, 1632 (int) value_length, value); 1633 1634 f = resp->fields + resp->fields_count; 1635 1636 nxt_unit_sptr_set(&f->name, buf->free); 1637 buf->free = nxt_cpymem(buf->free, name, name_length); 1638 *buf->free++ = '\0'; 1639 1640 nxt_unit_sptr_set(&f->value, buf->free); 1641 buf->free = nxt_cpymem(buf->free, value, value_length); 1642 *buf->free++ = '\0'; 1643 1644 f->hash = nxt_unit_field_hash(name, name_length); 1645 f->skip = 0; 1646 f->name_length = name_length; 1647 f->value_length = value_length; 1648 1649 resp->fields_count++; 1650 1651 return NXT_UNIT_OK; 1652 } 1653 1654 1655 int 1656 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 1657 const void* src, uint32_t size) 1658 { 1659 nxt_unit_buf_t *buf; 1660 nxt_unit_response_t *resp; 1661 nxt_unit_request_info_impl_t *req_impl; 1662 1663 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1664 1665 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1666 nxt_unit_req_warn(req, "add_content: response not initialized yet"); 1667 1668 return NXT_UNIT_ERROR; 1669 } 1670 1671 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1672 nxt_unit_req_warn(req, "add_content: response already sent"); 1673 1674 return NXT_UNIT_ERROR; 1675 } 1676 1677 buf = req->response_buf; 1678 1679 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 1680 nxt_unit_req_warn(req, "add_content: buffer overflow"); 1681 1682 return NXT_UNIT_ERROR; 1683 } 1684 1685 resp = req->response; 1686 1687 if (resp->piggyback_content_length == 0) { 1688 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 1689 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 1690 } 1691 1692 resp->piggyback_content_length += size; 1693 1694 buf->free = nxt_cpymem(buf->free, src, size); 1695 1696 return NXT_UNIT_OK; 1697 } 1698 1699 1700 int 1701 nxt_unit_response_send(nxt_unit_request_info_t *req) 1702 { 1703 int rc; 1704 
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    /* Implicit websocket upgrade on a 101 response to a handshake request. */
    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_free(mmap_buf);
    }

    return rc;
}


/* Return non-zero if the response headers have already been sent. */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocate a response buffer of 'size' bytes backed by shared memory
 * (at most PORT_MMAP_DATA_SIZE).  The buffer is linked into the request's
 * outgoing list; on failure it is released and NULL is returned.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    mmap_buf->req = req;

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);

    /* min == max: require the full 'size' bytes in one buffer. */
    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port, size, size, mmap_buf,
                                   NULL);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_release(mmap_buf);

        return NULL;
    }

    return &mmap_buf->buf;
}


/*
 * Resolve the process a received message came from, caching the result in
 * recv_msg->process.  Looks up by pid under the library mutex; logs a
 * warning and returns NULL if the process is unknown.
 */
static nxt_unit_process_t *
nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t  *lib;

    if (recv_msg->process != NULL) {
        return recv_msg->process;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (recv_msg->process == NULL) {
        nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
                      recv_msg->stream, (int) recv_msg->pid);
    }

    return recv_msg->process;
}


/*
 * Get an mmap buffer wrapper: reuse from the context free list or malloc
 * a new one.  Both paths leave the context mutex unlocked on return.
 * Returns NULL only on allocation failure.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *mmap_buf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->free_buf == NULL) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(mmap_buf == NULL)) {
            return NULL;
        }

    } else {
        mmap_buf = ctx_impl->free_buf;

        nxt_unit_mmap_buf_unlink(mmap_buf);

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    mmap_buf->ctx_impl = ctx_impl;

    mmap_buf->hdr = NULL;
    mmap_buf->free_ptr = NULL;

    return mmap_buf;
}


/* Unlink a buffer wrapper and return it to its context's free list. */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_unlink(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* Length-prefixed constant string, initialized via nxt_unit_str(). */
typedef struct {
    size_t      len;
    const char  *str;
} nxt_unit_str_t;


#define nxt_unit_str(str)  { nxt_length(str), str }


/* Return non-zero if the request carries a websocket handshake. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Switch the request to websocket mode: register it in the context's
 * request hash (so incoming frames can find it) and force status 101.
 * Idempotent; fails if the response is uninitialized or already sent.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);

    rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


int
/* Return non-zero if the request has been upgraded to a websocket. */
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/*
 * Map a per-request user-data pointer (req->data) back to its owning
 * request info.  'data' must be the extra_data area of a request impl.
 */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Send a response body buffer on the request's stream (not the last one).
 * Headers must already have been sent.  Empty buffers are not transmitted.
 * The buffer is consumed (freed) on success.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    return NXT_UNIT_OK;
}


/*
 * Send a buffer as the final ('last') message of the response and release
 * the request on success; on failure, finish the request with the error.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
    if (nxt_slow_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(mmap_buf);

        nxt_unit_request_info_release(req);

    } else {
        nxt_unit_request_done(req, rc);
    }
}


/*
 * Transmit one buffer to the router, either as an mmap reference message
 * (shared-memory backed buffers) or as a plain copied message.  After a
 * successful mmap send, unused trailing chunks of the buffer are retained
 * for reuse and the shared-memory chunk accounting is adjusted.
 * The buffer's backing storage is always released before returning.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                     rc;
    u_char                  *last_used, *first_free;
    ssize_t                 res;
    nxt_chunk_id_t          first_free_chunk;
    nxt_unit_buf_t          *buf;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Only use the mmap path when there is a header and non-empty payload. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
                                       NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* First chunk past the data that was just handed to the router. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /* Keep the remaining whole chunks for the next send. */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Nothing left worth keeping: detach from the mmap entirely. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * Sent chunks are now owned by the receiver; subtract them from
         * this process's outgoing allocation counter (the delta is
         * negative or zero).
         */
        nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id
                            - (int) first_free_chunk);

        nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                       mmap_buf->process->pid,
                       (int) mmap_buf->process->outgoing.allocated_chunks);

    } else {
        /*
         * Plain path: the message header is written into the reserved
         * space directly before the payload, then sent as one blob.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
                          ": no space reserved for message header", stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
                       stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
                                       buf->start - sizeof(m.msg),
                                       m.mmap_msg.size + sizeof(m.msg),
                                       NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}


/* Public wrapper: release a buffer's storage and return it to the pool. */
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
}


/* Release a buffer's backing storage, then the wrapper itself. */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}


/*
 * Release whatever backs an outgoing buffer: shared-memory chunks when
 * hdr is set, otherwise the malloc'ed plain-memory block (if any).
 */
static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
    if (mmap_buf->hdr != NULL) {
        nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
                              mmap_buf->process,
                              mmap_buf->hdr, mmap_buf->buf.start,
                              mmap_buf->buf.end - mmap_buf->buf.start);

        mmap_buf->hdr = NULL;

        return;
    }

    if (mmap_buf->free_ptr != NULL) {
        free(mmap_buf->free_ptr);

        mmap_buf->free_ptr = NULL;
    }
}


/* Get a read buffer; locks the context mutex for the _impl helper below. */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    return nxt_unit_read_buf_get_impl(ctx_impl);
}


/*
 * Pop a read buffer from the free list or malloc a new one.
 * Called with ctx_impl->mutex held; unlocks it on every path.
 * May return NULL (malloc failure) — callers must check.
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_read_buf_t  *rbuf;

    if (ctx_impl->free_read_buf != NULL) {
        rbuf = ctx_impl->free_read_buf;
        ctx_impl->free_read_buf = rbuf->next;

        pthread_mutex_unlock(&ctx_impl->mutex);

        return rbuf;
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    rbuf = malloc(sizeof(nxt_unit_read_buf_t));

    return rbuf;
}


/* Push a read buffer back onto the context's free list. */
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf->next = ctx_impl->free_read_buf;
    ctx_impl->free_read_buf = rbuf;

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/* Return the next buffer in a chain, or NULL at the end. */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *mmap_buf;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    if (mmap_buf->next == NULL) {
        return NULL;
    }

    return &mmap_buf->next->buf;
}


/* Largest single buffer the shared-memory transport can carry. */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}


/* Smallest shared-memory allocation unit (one chunk). */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}


int
/*
 * Blocking body write: sends all 'size' bytes (min_size == size) and
 * collapses the byte count into OK/ERROR for the caller.
 */
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    ssize_t  res;

    res = nxt_unit_response_write_nb(req, start, size, size);

    /* write_nb returns a negative error code or the byte count. */
    return res < 0 ? -res : NXT_UNIT_OK;
}


/*
 * Non-blocking body write: sends at least 'min_size' and at most 'size'
 * bytes of 'start'.  If headers have not been sent yet, the first part is
 * piggybacked onto the header buffer and the headers are sent.  Returns
 * the number of bytes written, or a negated NXT_UNIT_* error code.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check if response is not send yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Fill the remaining space of the header buffer, then send it. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /*
             * NOTE(review): this early return does not release the
             * just-acquired buffer — confirm get_outgoing_buf cannot
             * hand out chunks with a zero-size buffer.
             */
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Stream a response body from a user callback: first fills the pending
 * header buffer (if any), then repeatedly allocates outgoing buffers of
 * read_info->buf_size and fills them via read_info->read() until EOF.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* Check if response is not send yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes.
*/ 2385 buf->free += n; 2386 req->response->piggyback_content_length += n; 2387 2388 if (read_info->eof) { 2389 break; 2390 } 2391 } 2392 2393 rc = nxt_unit_response_send(req); 2394 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2395 nxt_unit_req_error(req, "Failed to send headers with content"); 2396 2397 return rc; 2398 } 2399 2400 if (read_info->eof) { 2401 return NXT_UNIT_OK; 2402 } 2403 } 2404 2405 while (!read_info->eof) { 2406 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 2407 read_info->buf_size); 2408 2409 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 2410 2411 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 2412 &req->response_port, 2413 buf_size, buf_size, 2414 &mmap_buf, local_buf); 2415 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2416 return rc; 2417 } 2418 2419 buf = &mmap_buf.buf; 2420 2421 while (!read_info->eof && buf->end > buf->free) { 2422 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2423 if (nxt_slow_path(n < 0)) { 2424 nxt_unit_req_error(req, "Read error"); 2425 2426 nxt_unit_free_outgoing_buf(&mmap_buf); 2427 2428 return NXT_UNIT_ERROR; 2429 } 2430 2431 buf->free += n; 2432 } 2433 2434 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); 2435 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2436 nxt_unit_req_error(req, "Failed to send content"); 2437 2438 return rc; 2439 } 2440 } 2441 2442 return NXT_UNIT_OK; 2443 } 2444 2445 2446 ssize_t 2447 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 2448 { 2449 ssize_t buf_res, res; 2450 2451 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 2452 dst, size); 2453 2454 if (buf_res < (ssize_t) size && req->content_fd != -1) { 2455 res = read(req->content_fd, dst, size); 2456 if (res < 0) { 2457 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 2458 strerror(errno), errno); 2459 2460 return res; 2461 } 2462 2463 if (res < (ssize_t) size) { 2464 close(req->content_fd); 2465 2466 req->content_fd = 
-1;
        }

        req->content_length -= res;
        size -= res;

        dst = nxt_pointer_to(dst, res);

    } else {
        res = 0;
    }

    return buf_res + res;
}


/*
 * Return the size of the next line (up to and including '\n') available in
 * the request body, limited to max_size.  If the in-memory buffer chain ends
 * before a newline is found and more body remains in the spool file,
 * additional data is pre-read from content_fd and appended to the chain.
 *
 * Returns 0 for an empty body, -1 on pre-read failure, otherwise
 * min(max_size, line size).
 */
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
    char                 *p;
    size_t               l_size, b_size;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;

    if (req->content_length == 0) {
        return 0;
    }

    l_size = 0;

    b = req->content_buf;

    while (b != NULL) {
        b_size = b->end - b->free;
        p = memchr(b->free, '\n', b_size);

        if (p != NULL) {
            /* Include the newline itself in the reported line size. */
            p++;
            l_size += p - b->free;
            break;
        }

        l_size += b_size;

        if (max_size <= l_size) {
            break;
        }

        mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);

        /* Last buffer and more body in the spool file: pull in 16K more. */
        if (mmap_buf->next == NULL
            && req->content_fd != -1
            && l_size < req->content_length)
        {
            preread_buf = nxt_unit_request_preread(req, 16384);
            if (nxt_slow_path(preread_buf == NULL)) {
                return -1;
            }

            nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
        }

        b = nxt_unit_buf_next(b);
    }

    return nxt_min(max_size, l_size);
}


/*
 * Read up to 'size' bytes from the body spool file (req->content_fd) into a
 * newly malloc'ed plain buffer so the data can be scanned in memory.  On a
 * short read the fd is considered exhausted and closed.
 *
 * Returns the new buffer or NULL on allocation/read failure.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
{
    ssize_t              res;
    nxt_unit_mmap_buf_t  *mmap_buf;

    if (req->content_fd == -1) {
        nxt_unit_req_alert(req, "preread: content_fd == -1");
        return NULL;
    }

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf");
        return NULL;
    }

    mmap_buf->free_ptr = malloc(size);
    if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
        nxt_unit_mmap_buf_release(mmap_buf);
        return NULL;
    }

    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    /* Plain (non-shared-memory) buffer: no mmap header, no owner process. */
    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;
    mmap_buf->process = NULL;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (res < (ssize_t) size) {
        close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Shrink the buffer to the bytes actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copy up to 'size' bytes from the buffer chain *b into dst, advancing each
 * buffer's 'free' pointer and decrementing *len by the amount copied.
 *
 * Returns the number of bytes copied.  NOTE(review): *b is left pointing at
 * the last buffer visited (last_buf) even when that buffer was fully
 * consumed and 'buf' had already advanced past it — confirm this is
 * intentional (e.g. to keep the chain reachable for later release) rather
 * than a missed '*b = buf'.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;  /* 'read' shadows read(2) here. */
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Finish a request: make sure some response was initialized and sent (a
 * default 200 "text/plain" response is synthesized if the application never
 * started one), or report an error to the router, then release the request.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    /* No response started yet: synthesize a default empty 200. */
    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    /* Headers prepared but not yet sent: send them as the last message. */
    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    /* Tell the router the stream is finished (DATA/last or RPC_ERROR). */
    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(req->ctx, &req->response_port,
                                   &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_req_alert(req, "last message send failed: %s (%d)",
                           strerror(errno), errno);
    }

    nxt_unit_request_info_release(req);
}


/* Convenience wrapper: send a single-span websocket frame. */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Send one websocket frame whose payload is gathered from iov[0..iovcnt).
 * The frame header is written into the first outgoing buffer; when the
 * payload does not fit, full buffers are flushed and new ones allocated
 * until all iovec data is copied out.
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                           i, rc;
    size_t                        l, copy;
    uint32_t                      payload_len, buf_size, alloc_size;
    const uint8_t                 *b;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_websocket_header_t        *wh;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    /* 10 bytes is the maximum websocket frame header size (no mask). */
    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    /* Zero the first header bytes before frame init fills them in. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    /* buf_size now tracks bytes still needing space in later buffers. */
    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* Current buffer full: flush it and allocate the next. */
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
                                                &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                               &req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    /* Flush the final (possibly partial) buffer. */
    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
                                    &mmap_buf, 0);
    }

    return rc;
}


ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    /*
     * Read frame payload bytes into dst, unmasking them with the 4-byte
     * client mask when present.  'd' is the payload offset of the first
     * byte read, so the mask index stays aligned across partial reads.
     */
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[ (i + d) % 4 ];
    }

    return res;
}


/*
 * Detach a websocket frame from shared memory by copying its buffer into
 * malloc'ed memory, so the frame can outlive message processing.  Already
 * plain (free_ptr) or mmap-backed (hdr) buffers are handled/kept as-is.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = malloc(size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    return NXT_UNIT_OK;
}


/* Release a processed websocket frame back to the context pool. */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Find *n contiguous free chunks (at least min_n) in one of the outgoing
 * shared-memory segments usable with port_id, or create a new segment.
 * On out-of-shared-memory (OOSM) it notifies the router and, unless the
 * caller accepts a NULL result (min_n == 0), blocks for a SHM_ACK and
 * retries.  Returns with outgoing.mutex held across the scan; *c receives
 * the first chunk id and *n the number of chunks actually reserved.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&process->outgoing.mutex);

retry:

    outgoing_size = process->outgoing.size;

    mm_end = process->outgoing.elts + outgoing_size;

    for (mm = process->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Segment must be shareable (0xFFFF) or bound to this port. */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to grow the run to *n chunks by claiming neighbors. */
            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run blocked; accept it if it meets the minimum. */
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Too short: release claimed chunks and scan on. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment exhausted: flag it so consumers send SHM_ACK on free. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&process->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by application, but not send to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port_id);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition.  Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&process->outgoing.mutex);

        goto retry;
    }

    /* Under the segment limit: create a fresh mmap with *n chunks busy. */
    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);

unlock:

    nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                   process->pid,
                   (int) process->outgoing.allocated_chunks);

    pthread_mutex_unlock(&process->outgoing.mutex);

    return hdr;
}


/* Send an out-of-shared-memory notification to the router port. */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
                      (int) port_id->pid, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Block reading port messages until a SHM_ACK arrives (shared memory was
 * freed by the peer).  Non-ACK messages are queued on the context's pending
 * list for later processing; QUIT aborts the wait.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_port_msg_t       *port_msg;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_read_buf(ctx, rbuf);
        if (nxt_slow_path(rbuf->size < (ssize_t)
sizeof(nxt_port_msg_t))) { 3043 nxt_unit_read_buf_release(ctx, rbuf); 3044 3045 return NXT_UNIT_ERROR; 3046 } 3047 3048 port_msg = (nxt_port_msg_t *) rbuf->buf; 3049 3050 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { 3051 nxt_unit_read_buf_release(ctx, rbuf); 3052 3053 break; 3054 } 3055 3056 pthread_mutex_lock(&ctx_impl->mutex); 3057 3058 *ctx_impl->pending_read_tail = rbuf; 3059 ctx_impl->pending_read_tail = &rbuf->next; 3060 rbuf->next = NULL; 3061 3062 pthread_mutex_unlock(&ctx_impl->mutex); 3063 3064 if (port_msg->type == _NXT_PORT_MSG_QUIT) { 3065 nxt_unit_debug(ctx, "oosm: quit received"); 3066 3067 return NXT_UNIT_ERROR; 3068 } 3069 } 3070 3071 return NXT_UNIT_OK; 3072 } 3073 3074 3075 static nxt_unit_mmap_t * 3076 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 3077 { 3078 uint32_t cap; 3079 3080 cap = mmaps->cap; 3081 3082 if (cap == 0) { 3083 cap = i + 1; 3084 } 3085 3086 while (i + 1 > cap) { 3087 3088 if (cap < 16) { 3089 cap = cap * 2; 3090 3091 } else { 3092 cap = cap + cap / 2; 3093 } 3094 } 3095 3096 if (cap != mmaps->cap) { 3097 3098 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 3099 if (nxt_slow_path(mmaps->elts == NULL)) { 3100 return NULL; 3101 } 3102 3103 memset(mmaps->elts + mmaps->cap, 0, 3104 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 3105 3106 mmaps->cap = cap; 3107 } 3108 3109 if (i + 1 > mmaps->size) { 3110 mmaps->size = i + 1; 3111 } 3112 3113 return mmaps->elts + i; 3114 } 3115 3116 3117 static nxt_port_mmap_header_t * 3118 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 3119 nxt_unit_port_id_t *port_id, int n) 3120 { 3121 int i, fd, rc; 3122 void *mem; 3123 char name[64]; 3124 nxt_unit_mmap_t *mm; 3125 nxt_unit_impl_t *lib; 3126 nxt_port_mmap_header_t *hdr; 3127 3128 lib = process->lib; 3129 3130 mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); 3131 if (nxt_slow_path(mm == NULL)) { 3132 nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); 3133 3134 return NULL; 
3135 } 3136 3137 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 3138 lib->pid, (void *) pthread_self()); 3139 3140 #if (NXT_HAVE_MEMFD_CREATE) 3141 3142 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 3143 if (nxt_slow_path(fd == -1)) { 3144 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3145 strerror(errno), errno); 3146 3147 goto remove_fail; 3148 } 3149 3150 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3151 3152 #elif (NXT_HAVE_SHM_OPEN_ANON) 3153 3154 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3155 if (nxt_slow_path(fd == -1)) { 3156 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3157 strerror(errno), errno); 3158 3159 goto remove_fail; 3160 } 3161 3162 #elif (NXT_HAVE_SHM_OPEN) 3163 3164 /* Just in case. */ 3165 shm_unlink(name); 3166 3167 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3168 if (nxt_slow_path(fd == -1)) { 3169 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3170 strerror(errno), errno); 3171 3172 goto remove_fail; 3173 } 3174 3175 if (nxt_slow_path(shm_unlink(name) == -1)) { 3176 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3177 strerror(errno), errno); 3178 } 3179 3180 #else 3181 3182 #error No working shared memory implementation. 
3183 3184 #endif 3185 3186 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 3187 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3188 strerror(errno), errno); 3189 3190 goto remove_fail; 3191 } 3192 3193 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 3194 if (nxt_slow_path(mem == MAP_FAILED)) { 3195 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 3196 strerror(errno), errno); 3197 3198 goto remove_fail; 3199 } 3200 3201 mm->hdr = mem; 3202 hdr = mem; 3203 3204 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 3205 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 3206 3207 hdr->id = process->outgoing.size - 1; 3208 hdr->src_pid = lib->pid; 3209 hdr->dst_pid = process->pid; 3210 hdr->sent_over = port_id->id; 3211 3212 /* Mark first n chunk(s) as busy */ 3213 for (i = 0; i < n; i++) { 3214 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 3215 } 3216 3217 /* Mark as busy chunk followed the last available chunk. */ 3218 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 3219 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 3220 3221 pthread_mutex_unlock(&process->outgoing.mutex); 3222 3223 rc = nxt_unit_send_mmap(ctx, port_id, fd); 3224 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3225 munmap(mem, PORT_MMAP_SIZE); 3226 hdr = NULL; 3227 3228 } else { 3229 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 3230 hdr->id, (int) lib->pid, (int) process->pid); 3231 } 3232 3233 close(fd); 3234 3235 pthread_mutex_lock(&process->outgoing.mutex); 3236 3237 if (nxt_fast_path(hdr != NULL)) { 3238 return hdr; 3239 } 3240 3241 remove_fail: 3242 3243 process->outgoing.size--; 3244 3245 return NULL; 3246 } 3247 3248 3249 static int 3250 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) 3251 { 3252 ssize_t res; 3253 nxt_port_msg_t msg; 3254 nxt_unit_impl_t *lib; 3255 union { 3256 struct cmsghdr cm; 3257 char 
                        space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_MMAP;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    /*
     * Fill all padding fields with 0.
     * Code in Go 1.11 validate cmsghdr using padding field as part of len.
     * See Cmsghdr definition and socketControlMessageHeaderAndData function.
     */
    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     * *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     * dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
                                   &cmsg, sizeof(cmsg));
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
                      (int) port_id->pid, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Prepare an outgoing buffer of 'size' bytes (accepting at least min_size)
 * in mmap_buf.  Small payloads (<= NXT_UNIT_MAX_PLAIN_SIZE) use a plain
 * buffer — the caller-provided local_buf when given, otherwise malloc —
 * with room reserved in front for the port message header.  Larger payloads
 * are carved out of a shared-memory segment via nxt_unit_mmap_get().
 *
 * Returns NXT_UNIT_OK (possibly with an all-NULL buffer when min_size == 0
 * and no memory was available) or NXT_UNIT_ERROR.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
    nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
    int                     nchunks, min_nchunks;
    nxt_chunk_id_t          c;
    nxt_port_mmap_header_t  *hdr;

    if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
        if (local_buf != NULL) {
            mmap_buf->free_ptr = NULL;
            mmap_buf->plain_ptr = local_buf;

        } else {
            mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
            if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
                return NXT_UNIT_ERROR;
            }

            mmap_buf->plain_ptr = mmap_buf->free_ptr;
        }

        mmap_buf->hdr = NULL;
        /* Leave space for the port message header before the data. */
        mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
        mmap_buf->buf.free = mmap_buf->buf.start;
        mmap_buf->buf.end = mmap_buf->buf.start + size;
        mmap_buf->port_id = *port_id;
        mmap_buf->process = process;

        nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
                       mmap_buf->buf.start, (int) size);

        return NXT_UNIT_OK;
    }

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
    min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
    if (nxt_slow_path(hdr == NULL)) {
        /* Non-blocking OOSM: hand back an empty buffer, not an error. */
        if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
            mmap_buf->hdr = NULL;
            mmap_buf->buf.start = NULL;
            mmap_buf->buf.free = NULL;
            mmap_buf->buf.end = NULL;
            mmap_buf->free_ptr = NULL;

            return NXT_UNIT_OK;
        }

        return NXT_UNIT_ERROR;
    }

    mmap_buf->hdr = hdr;
    mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
    mmap_buf->port_id = *port_id;
    mmap_buf->process = process;
    mmap_buf->free_ptr = NULL;
    mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
                   (int) hdr->id, (int) c,
                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));

    return NXT_UNIT_OK;
}


/*
 * Map a shared-memory segment received (as an fd) from peer process 'pid'
 * and register it in that process's incoming array under the id recorded in
 * the segment header.  Validates that the header's src/dst pids match.
 */
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
    int                     rc;
    void                    *mem;
    struct stat             mmap_stat;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_unit_process_t      *process;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(ctx, pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
                      (int) pid, fd);

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_ERROR;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
                      strerror(errno), errno);

        goto fail;
    }

    mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
               MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
                      strerror(errno), errno);

        goto fail;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid
                      != lib->pid)) {

        nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
                      "detected: %d != %d or %d != %d", (int) hdr->src_pid,
                      (int) pid, (int) hdr->dst_pid, (int) lib->pid);

        /*
         * NOTE(review): unmapped with PORT_MMAP_SIZE although the mapping
         * above used mmap_stat.st_size — confirm segments are always
         * exactly PORT_MMAP_SIZE, otherwise this munmap() is wrong.
         */
        munmap(mem, PORT_MMAP_SIZE);

        goto fail;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");

        munmap(mem, PORT_MMAP_SIZE);

    } else {
        mm->hdr = hdr;

        /* Incoming segments are usable with any port. */
        hdr->sent_over = 0xFFFFu;

        rc = NXT_UNIT_OK;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

fail:

    /* Drop the reference taken by nxt_unit_process_find(). */
    nxt_unit_process_use(ctx, process, -1);

    return rc;
}


/* Initialize an (incoming or outgoing) mmaps array to the empty state. */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
    mmaps->allocated_chunks = 0;
}


/*
 * Adjust the process reference count by i; when the count drops to zero
 * (previous value c equals -i for a negative i), destroy the process and
 * both of its mmaps arrays.
 */
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
    long  c;

    c = nxt_atomic_fetch_add(&process->use_count, i);

    if (i < 0 && c == -i) {
        nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);

        nxt_unit_mmaps_destroy(&process->incoming);
        nxt_unit_mmaps_destroy(&process->outgoing);

        free(process);
    }
}


/* Unmap every registered segment and free the mmaps array itself. */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        free(mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Look up an incoming segment header by id.  Caller must hold
 * process->incoming.mutex.  Returns NULL for an out-of-range id.
 */
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    uint32_t id)
{
    nxt_port_mmap_header_t  *hdr;

    if (nxt_fast_path(process->incoming.size > id)) {
        hdr = process->incoming.elts[id].hdr;

    } else {
        hdr = NULL;
    }

    return hdr;
}


/*
 * Consume the tracking prefix of a received message and claim the stream:
 * atomically swap the tracking slot from this stream id to 0.  Returns
 * non-zero when the message should be processed, 0 when it was cancelled
 * (or malformed), in which case the tracking chunk is released.
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           rc;
    nxt_chunk_id_t                c;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return 0;
    }

    tracking_msg = recv_msg->start;

    /* Advance past the tracking header to the payload. */
    recv_msg->start = tracking_msg + 1;
    recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return 0;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
    if (nxt_slow_path(hdr == NULL)) {
        pthread_mutex_unlock(&process->incoming.mutex);

        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
                      "invalid mmap id %d,%"PRIu32,
                      recv_msg->stream, (int) process->pid,
                      tracking_msg->mmap_id);

        return 0;
    }

    c = tracking_msg->tracking_id;
    rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);

    if (rc == 0) {
        nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
                       recv_msg->stream);

        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return rc;
}


/*
 * Translate an array of nxt_port_mmap_msg_t references in a received
 * message into a chain of incoming buffers pointing at the corresponding
 * shared-memory chunks, attached to recv_msg->incoming_buf.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* First pass: allocate one buffer descriptor per mmap message. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    pthread_mutex_lock(&process->incoming.mutex);

    /* Second pass: resolve each reference to its shared-memory chunk. */
    for (; mmap_msg < end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->stream, (int) process->pid,
                          mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* Redirect the message payload to the first chunk's data. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;
        b->process = process;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}


/*
 * Return the chunks covering [start, start+size) in segment 'hdr' to the
 * free map.  The region is poisoned with 0xA5 to catch use-after-free.
 * For our own outgoing segments the allocated-chunks counter is adjusted;
 * for incoming segments flagged OOSM, a SHM_ACK is sent to the producer.
 */
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
    void *start, uint32_t size)
{
    int              freed_chunks;
    u_char           *p, *end;
    nxt_chunk_id_t   c;
    nxt_unit_impl_t  *lib;

    memset(start, 0xA5, size);

    p = start;
    end = p + size;
    c = nxt_port_mmap_chunk_id(hdr, p);
    freed_chunks = 0;

    while (p < end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
        freed_chunks++;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (hdr->src_pid == lib->pid && freed_chunks != 0) {
        nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
                             -freed_chunks);

        nxt_unit_debug(ctx, "process %d allocated_chunks %d",
                       process->pid,
                       (int) process->outgoing.allocated_chunks);
    }

    /* Wake a producer that stalled on this segment (oosm flag set). */
    if (hdr->dst_pid == lib->pid
        && freed_chunks != 0
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        nxt_unit_send_shm_ack(ctx, hdr->src_pid);
    }
}


/* Send a SHM_ACK message to process 'pid' to unblock its OOSM wait. */
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
    ssize_t             res;
    nxt_port_msg_t      msg;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  port_id;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_port_id_init(&port_id, pid, 0);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_SHM_ACK;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
    if
(nxt_slow_path(res != sizeof(msg))) { 3742 nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", 3743 (int) port_id.pid, strerror(errno), errno); 3744 3745 return NXT_UNIT_ERROR; 3746 } 3747 3748 return NXT_UNIT_OK; 3749 } 3750 3751 3752 static nxt_int_t 3753 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 3754 { 3755 nxt_process_t *process; 3756 3757 process = data; 3758 3759 if (lhq->key.length == sizeof(pid_t) 3760 && *(pid_t *) lhq->key.start == process->pid) 3761 { 3762 return NXT_OK; 3763 } 3764 3765 return NXT_DECLINED; 3766 } 3767 3768 3769 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 3770 NXT_LVLHSH_DEFAULT, 3771 nxt_unit_lvlhsh_pid_test, 3772 nxt_lvlhsh_alloc, 3773 nxt_lvlhsh_free, 3774 }; 3775 3776 3777 static inline void 3778 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 3779 { 3780 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 3781 lhq->key.length = sizeof(*pid); 3782 lhq->key.start = (u_char *) pid; 3783 lhq->proto = &lvlhsh_processes_proto; 3784 } 3785 3786 3787 static nxt_unit_process_t * 3788 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 3789 { 3790 nxt_unit_impl_t *lib; 3791 nxt_unit_process_t *process; 3792 nxt_lvlhsh_query_t lhq; 3793 3794 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3795 3796 nxt_unit_process_lhq_pid(&lhq, &pid); 3797 3798 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 3799 process = lhq.value; 3800 nxt_unit_process_use(ctx, process, 1); 3801 3802 return process; 3803 } 3804 3805 process = malloc(sizeof(nxt_unit_process_t)); 3806 if (nxt_slow_path(process == NULL)) { 3807 nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); 3808 3809 return NULL; 3810 } 3811 3812 process->pid = pid; 3813 process->use_count = 1; 3814 process->next_port_id = 0; 3815 process->lib = lib; 3816 3817 nxt_queue_init(&process->ports); 3818 3819 nxt_unit_mmaps_init(&process->incoming); 3820 nxt_unit_mmaps_init(&process->outgoing); 3821 3822 
lhq.replace = 0; 3823 lhq.value = process; 3824 3825 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 3826 3827 case NXT_OK: 3828 break; 3829 3830 default: 3831 nxt_unit_warn(ctx, "process %d insert failed", (int) pid); 3832 3833 pthread_mutex_destroy(&process->outgoing.mutex); 3834 pthread_mutex_destroy(&process->incoming.mutex); 3835 free(process); 3836 process = NULL; 3837 break; 3838 } 3839 3840 nxt_unit_process_use(ctx, process, 1); 3841 3842 return process; 3843 } 3844 3845 3846 static nxt_unit_process_t * 3847 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) 3848 { 3849 int rc; 3850 nxt_unit_impl_t *lib; 3851 nxt_unit_process_t *process; 3852 nxt_lvlhsh_query_t lhq; 3853 3854 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3855 3856 nxt_unit_process_lhq_pid(&lhq, &pid); 3857 3858 if (remove) { 3859 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 3860 3861 } else { 3862 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 3863 } 3864 3865 if (rc == NXT_OK) { 3866 process = lhq.value; 3867 3868 if (!remove) { 3869 nxt_unit_process_use(ctx, process, 1); 3870 } 3871 3872 return process; 3873 } 3874 3875 return NULL; 3876 } 3877 3878 3879 static nxt_unit_process_t * 3880 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 3881 { 3882 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 3883 } 3884 3885 3886 int 3887 nxt_unit_run(nxt_unit_ctx_t *ctx) 3888 { 3889 int rc; 3890 nxt_unit_impl_t *lib; 3891 3892 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3893 rc = NXT_UNIT_OK; 3894 3895 while (nxt_fast_path(lib->online)) { 3896 rc = nxt_unit_run_once(ctx); 3897 } 3898 3899 return rc; 3900 } 3901 3902 3903 int 3904 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 3905 { 3906 int rc; 3907 nxt_unit_ctx_impl_t *ctx_impl; 3908 nxt_unit_read_buf_t *rbuf; 3909 3910 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3911 3912 pthread_mutex_lock(&ctx_impl->mutex); 3913 3914 if (ctx_impl->pending_read_head != NULL) { 3915 
rbuf = ctx_impl->pending_read_head; 3916 ctx_impl->pending_read_head = rbuf->next; 3917 3918 if (ctx_impl->pending_read_tail == &rbuf->next) { 3919 ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; 3920 } 3921 3922 pthread_mutex_unlock(&ctx_impl->mutex); 3923 3924 } else { 3925 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 3926 if (nxt_slow_path(rbuf == NULL)) { 3927 return NXT_UNIT_ERROR; 3928 } 3929 3930 nxt_unit_read_buf(ctx, rbuf); 3931 } 3932 3933 if (nxt_fast_path(rbuf->size > 0)) { 3934 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, 3935 rbuf->buf, rbuf->size, 3936 rbuf->oob, sizeof(rbuf->oob)); 3937 3938 #if (NXT_DEBUG) 3939 memset(rbuf->buf, 0xAC, rbuf->size); 3940 #endif 3941 3942 } else { 3943 rc = NXT_UNIT_ERROR; 3944 } 3945 3946 nxt_unit_read_buf_release(ctx, rbuf); 3947 3948 return rc; 3949 } 3950 3951 3952 static void 3953 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) 3954 { 3955 nxt_unit_impl_t *lib; 3956 nxt_unit_ctx_impl_t *ctx_impl; 3957 3958 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3959 3960 memset(rbuf->oob, 0, sizeof(struct cmsghdr)); 3961 3962 if (ctx_impl->read_port_fd != -1) { 3963 rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, 3964 rbuf->buf, sizeof(rbuf->buf), 3965 rbuf->oob, sizeof(rbuf->oob)); 3966 3967 } else { 3968 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3969 3970 rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 3971 rbuf->buf, sizeof(rbuf->buf), 3972 rbuf->oob, sizeof(rbuf->oob)); 3973 } 3974 } 3975 3976 3977 void 3978 nxt_unit_done(nxt_unit_ctx_t *ctx) 3979 { 3980 nxt_unit_impl_t *lib; 3981 nxt_unit_process_t *process; 3982 nxt_unit_ctx_impl_t *ctx_impl; 3983 3984 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3985 3986 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { 3987 3988 nxt_unit_ctx_free(&ctx_impl->ctx); 3989 3990 } nxt_queue_loop; 3991 3992 for ( ;; ) { 3993 
        pthread_mutex_lock(&lib->mutex);

        process = nxt_unit_process_pop_first(lib);
        if (process == NULL) {
            pthread_mutex_unlock(&lib->mutex);

            break;
        }

        /* Unlocks lib->mutex. */
        nxt_unit_remove_process(ctx, process);
    }

    pthread_mutex_destroy(&lib->mutex);

    free(lib);
}


/*
 * Create an additional library context (e.g. for a worker thread): allocate
 * the context structure, create a fresh port, announce it through the ready
 * port, and initialize the context to read from it.  Returns NULL on any
 * failure, undoing the steps already taken.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                  rc, fd;
    nxt_unit_impl_t      *lib;
    nxt_unit_port_id_t   new_port_id;
    nxt_unit_ctx_impl_t  *new_ctx;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Request-data area of lib->request_data_size bytes follows the
     * context structure. */
    new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        free(new_ctx);

        return NULL;
    }

    rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &new_port_id);

        close(fd);

        free(new_ctx);

        return NULL;
    }

    /* The descriptor has been passed to the peer; our copy is no longer
     * needed. */
    close(fd);

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &new_port_id);

        free(new_ctx);

        return NULL;
    }

    new_ctx->read_port_id = new_port_id;

    return &new_ctx->ctx;
}


/*
 * Release a context: abort still-active requests, free cached mmap buffers,
 * request structures and websocket frames, and finally the context itself
 * (unless it is the statically embedded main context).
 */
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Requests still in flight are completed with an error. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        free(mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    nxt_queue_remove(&ctx_impl->link);

    /* The main context is embedded in lib and must not be freed. */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/*
 * Initialize a port id and precompute its hash from the (pid, id) pair so
 * later hash-table lookups can reuse it.
 */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Create a new port and announce it to 'dst'.  On success the new port's
 * id is stored in *port_id; on failure the port is removed again.  The
 * local copy of the transferred descriptor is closed in both cases.
 */
int
nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *port_id)
{
    int                 rc, fd;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  new_port_id;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);

    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        *port_id = new_port_id;

    } else {
        lib->callbacks.remove_port(ctx, &new_port_id);
    }

    close(fd);

    return rc;
}


/*
 * Create a new port for this process: a socketpair whose first end becomes
 * the port's in_fd (registered via the add_port callback) and whose second
 * end is returned in *fd, to be handed over to the peer.
 */
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    pthread_mutex_lock(&lib->mutex);

    /* Our own process entry provides the next free port id. */
    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        close(port_sockets[0]);
        close(port_sockets[1]);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    new_port.in_fd = port_sockets[0];
    new_port.out_fd = -1;
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    /* Drop the reference taken by nxt_unit_process_get(). */
    nxt_unit_process_use(ctx, process, -1);

    rc = lib->callbacks.add_port(ctx, &new_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_warn(ctx, "create_port: add_port() failed");

        close(port_sockets[0]);
        close(port_sockets[1]);

        return rc;
    }

    *port_id = new_port.id;
    *fd = port_sockets[1];

    return rc;
}


/*
 * Send a NEW_PORT message for 'new_port' to the 'dst' port, transferring
 * the descriptor 'fd' via an SCM_RIGHTS control message.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_new_port_t  new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    m.msg.stream = 0;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    m.new_port.id = new_port->id;
    m.new_port.pid = new_port->pid;
    m.new_port.type = NXT_PROCESS_WORKER;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     * *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     * dereferencing type-punned
pointer will break strict-aliasing rules 4275 * 4276 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 4277 * in the same simple assignment as in the code above. 4278 */ 4279 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 4280 4281 res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), 4282 &cmsg, sizeof(cmsg)); 4283 4284 return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; 4285 } 4286 4287 4288 int 4289 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 4290 { 4291 int rc; 4292 nxt_unit_impl_t *lib; 4293 nxt_unit_process_t *process; 4294 nxt_unit_port_impl_t *new_port; 4295 4296 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4297 4298 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 4299 port->id.pid, port->id.id, 4300 port->in_fd, port->out_fd); 4301 4302 pthread_mutex_lock(&lib->mutex); 4303 4304 process = nxt_unit_process_get(ctx, port->id.pid); 4305 if (nxt_slow_path(process == NULL)) { 4306 rc = NXT_UNIT_ERROR; 4307 goto unlock; 4308 } 4309 4310 if (port->id.id >= process->next_port_id) { 4311 process->next_port_id = port->id.id + 1; 4312 } 4313 4314 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 4315 if (nxt_slow_path(new_port == NULL)) { 4316 rc = NXT_UNIT_ERROR; 4317 goto unlock; 4318 } 4319 4320 new_port->port = *port; 4321 4322 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 4323 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 4324 goto unlock; 4325 } 4326 4327 nxt_queue_insert_tail(&process->ports, &new_port->link); 4328 4329 rc = NXT_UNIT_OK; 4330 4331 new_port->process = process; 4332 4333 unlock: 4334 4335 pthread_mutex_unlock(&lib->mutex); 4336 4337 if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { 4338 nxt_unit_process_use(ctx, process, -1); 4339 } 4340 4341 return rc; 4342 } 4343 4344 4345 void 4346 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 4347 { 4348 nxt_unit_find_remove_port(ctx, port_id, NULL); 4349 } 4350 4351 4352 void 4353 
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 4354 nxt_unit_port_t *r_port) 4355 { 4356 nxt_unit_impl_t *lib; 4357 nxt_unit_process_t *process; 4358 4359 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4360 4361 pthread_mutex_lock(&lib->mutex); 4362 4363 process = NULL; 4364 4365 nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); 4366 4367 pthread_mutex_unlock(&lib->mutex); 4368 4369 if (nxt_slow_path(process != NULL)) { 4370 nxt_unit_process_use(ctx, process, -1); 4371 } 4372 } 4373 4374 4375 static void 4376 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 4377 nxt_unit_port_t *r_port, nxt_unit_process_t **process) 4378 { 4379 nxt_unit_impl_t *lib; 4380 nxt_unit_port_impl_t *port; 4381 4382 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4383 4384 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 4385 if (nxt_slow_path(port == NULL)) { 4386 nxt_unit_debug(ctx, "remove_port: port %d,%d not found", 4387 (int) port_id->pid, (int) port_id->id); 4388 4389 return; 4390 } 4391 4392 nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", 4393 (int) port_id->pid, (int) port_id->id, 4394 port->port.in_fd, port->port.out_fd, port->port.data); 4395 4396 if (port->port.in_fd != -1) { 4397 close(port->port.in_fd); 4398 } 4399 4400 if (port->port.out_fd != -1) { 4401 close(port->port.out_fd); 4402 } 4403 4404 if (port->process != NULL) { 4405 nxt_queue_remove(&port->link); 4406 } 4407 4408 if (process != NULL) { 4409 *process = port->process; 4410 } 4411 4412 if (r_port != NULL) { 4413 *r_port = port->port; 4414 } 4415 4416 free(port); 4417 } 4418 4419 4420 void 4421 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) 4422 { 4423 nxt_unit_impl_t *lib; 4424 nxt_unit_process_t *process; 4425 4426 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4427 4428 pthread_mutex_lock(&lib->mutex); 4429 4430 process = nxt_unit_process_find(ctx, pid, 1); 4431 if 
(nxt_slow_path(process == NULL)) { 4432 nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); 4433 4434 pthread_mutex_unlock(&lib->mutex); 4435 4436 return; 4437 } 4438 4439 nxt_unit_remove_process(ctx, process); 4440 } 4441 4442 4443 static void 4444 nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) 4445 { 4446 nxt_queue_t ports; 4447 nxt_unit_impl_t *lib; 4448 nxt_unit_port_impl_t *port; 4449 4450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4451 4452 nxt_queue_init(&ports); 4453 4454 nxt_queue_add(&ports, &process->ports); 4455 4456 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4457 4458 nxt_unit_process_use(ctx, process, -1); 4459 port->process = NULL; 4460 4461 /* Shortcut for default callback. */ 4462 if (lib->callbacks.remove_port == nxt_unit_remove_port) { 4463 nxt_queue_remove(&port->link); 4464 4465 nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); 4466 } 4467 4468 } nxt_queue_loop; 4469 4470 pthread_mutex_unlock(&lib->mutex); 4471 4472 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 4473 4474 nxt_queue_remove(&port->link); 4475 4476 lib->callbacks.remove_port(ctx, &port->port.id); 4477 4478 } nxt_queue_loop; 4479 4480 nxt_unit_process_use(ctx, process, -1); 4481 } 4482 4483 4484 void 4485 nxt_unit_quit(nxt_unit_ctx_t *ctx) 4486 { 4487 nxt_unit_impl_t *lib; 4488 4489 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4490 4491 lib->online = 0; 4492 } 4493 4494 4495 static ssize_t 4496 nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 4497 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 4498 { 4499 int fd; 4500 nxt_unit_impl_t *lib; 4501 nxt_unit_port_impl_t *port; 4502 4503 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 4504 4505 pthread_mutex_lock(&lib->mutex); 4506 4507 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 4508 4509 if (nxt_fast_path(port != NULL)) { 4510 fd = port->port.out_fd; 4511 

    } else {
        nxt_unit_warn(ctx, "port_send: port %d,%d not found",
                      (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        /* Distinguish "port exists but has no out_fd" from "not found"
         * (already reported above). */
        if (port != NULL) {
            nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
                          (int) port_id->pid, (int) port_id->id);
        }

        return -1;
    }

    nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
}


/*
 * Send one message plus optional ancillary (out-of-band) data through the
 * descriptor with sendmsg().  Returns the sendmsg() result; -1 on error.
 */
ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

    res = sendmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(errno), errno);

    } else {
        nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Default port_recv callback: look up the source port's in_fd under
 * lib->mutex and receive from it.  The fd of the context's own read port
 * is cached in ctx_impl->read_port_fd so later reads can skip the lookup.
 */
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_fast_path(port != NULL)) {
        fd = port->port.in_fd;

    } else {
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Pointer comparison: cache only for the context's own read port. */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = fd;
    }

    return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
}


/*
 * Receive one message plus optional ancillary (out-of-band) data from the
 * descriptor with recvmsg().  Returns the recvmsg() result; -1 on error.
 */
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
    void *oob, size_t oob_size)
{
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = oob;
    msg.msg_controllen = oob_size;

    res = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
                      fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
    }

    return res;
}


/* lvlhsh test callback: match a port entry against a (pid, id) key. */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Fill an lvlhsh query keyed by (pid, id) for the ports hash.  The hash
 * value cached in port_id is reused when present and computed (and stored
 * back) otherwise.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/* Insert a port into the ports hash; duplicates are not replaced. */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Find a port by id; with remove != 0 the entry is also deleted from the
 * hash.  Returns NULL when the port is not registered.
 */
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * lvlhsh test callback for the requests hash.  Always matches: a key-hash
 * hit is treated as a match — presumably stream ids are unique enough that
 * no key comparison is needed; verify against the stream id generator.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/* Insert a request into the requests hash, keyed by its stream id. */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    uint32_t            *stream;
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    res = nxt_lvlhsh_insert(request_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Find a request by stream id; with remove != 0 the entry is also deleted
 * from the hash.  Returns NULL when the stream is unknown.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    if (remove) {
        res = nxt_lvlhsh_delete(request_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(request_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Write a formatted log line to the library's log descriptor (or stderr
 * when no context is available).  Overlong messages are truncated and
 * marked with "[...]".
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        /* No context: fall back to the current pid and stderr. */
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* vsnprintf() returns the untruncated length, so p may point past
     * end; clamp and mark the truncation. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Like nxt_unit_log(), but prefixes the message with the request's stream
 * id ("#N: ") when a request is given.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* Clamp and mark truncation (see nxt_unit_log()). */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/* Indexed by log level; order matches the NXT_UNIT_* level constants. */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Print the common log prefix ("date time [level] pid#tid [unit] ") into
 * [p, end) and return the new write position.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    struct tm        tm;
    struct timespec  ts;

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    tm = *localtime(&ts.tv_sec);
#endif

    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */

void *
nxt_memalign(size_t alignment, size_t size)
{
    void       *p;
    nxt_err_t  err;

    err = posix_memalign(&p, alignment, size);

    if (nxt_fast_path(err == 0)) {
        return p;
    }

    return NULL;
}

#if (NXT_DEBUG)

void
nxt_free(void *p)
{
    free(p);
}

#endif