1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 #include "nxt_unit_websocket.h" 15 16 #include "nxt_websocket.h" 17 18 #if (NXT_HAVE_MEMFD_CREATE) 19 #include <linux/memfd.h> 20 #endif 21 22 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 23 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 24 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 25 typedef struct nxt_unit_process_s nxt_unit_process_t; 26 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 27 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 28 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 29 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 30 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 31 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 32 33 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 34 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 35 nxt_unit_ctx_impl_t *ctx_impl, void *data); 36 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 37 nxt_unit_mmap_buf_t *mmap_buf); 38 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 39 nxt_unit_mmap_buf_t *mmap_buf); 40 nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); 41 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 42 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); 43 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 44 uint32_t stream); 45 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 46 nxt_unit_recv_msg_t *recv_msg); 47 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 48 nxt_unit_recv_msg_t *recv_msg); 49 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 50 nxt_unit_recv_msg_t *recv_msg); 51 static nxt_unit_request_info_impl_t 
*nxt_unit_request_info_get( 52 nxt_unit_ctx_t *ctx); 53 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 54 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 55 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 56 nxt_unit_ctx_t *ctx); 57 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 58 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); 59 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 62 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 63 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 64 nxt_unit_mmap_buf_t *mmap_buf, int last); 65 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 66 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 67 size_t size); 68 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 69 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 70 nxt_chunk_id_t *c, int n); 71 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 72 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 73 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 74 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 75 int fd); 76 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 77 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, 78 nxt_unit_mmap_buf_t *mmap_buf); 79 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 80 81 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 82 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 83 nxt_unit_process_t *process, int i); 84 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 85 static 
nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 86 nxt_unit_process_t *process, uint32_t id); 87 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 88 nxt_unit_recv_msg_t *recv_msg); 89 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 90 nxt_unit_recv_msg_t *recv_msg); 91 static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, 92 uint32_t size); 93 94 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 95 pid_t pid); 96 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 97 pid_t pid, int remove); 98 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 99 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, 100 nxt_unit_port_id_t *port_id, int *fd); 101 102 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 103 nxt_unit_port_id_t *new_port, int fd); 104 105 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, 106 nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, 107 nxt_unit_process_t **process); 108 static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, 109 nxt_unit_process_t *process); 110 111 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, 112 nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, 113 const void *oob, size_t oob_size); 114 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, 115 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 116 void *oob, size_t oob_size); 117 118 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 119 nxt_unit_port_t *port); 120 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 121 nxt_unit_port_id_t *port_id, int remove); 122 123 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 124 nxt_unit_request_info_impl_t *req_impl); 125 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 126 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 127 128 static char * 
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);


/*
 * A buffer that lives in (or wraps) a shared memory segment.  The first
 * member is the public nxt_unit_buf_t, so the implementation can convert
 * between the two with nxt_container_of().
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;        /* public part handed to the app */

    /*
     * Intrusive doubly-linked list: 'prev' points at the previous node's
     * 'next' field (or at the list head pointer), which makes unlink O(1)
     * without a dedicated head sentinel.
     */
    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;       /* owning mmap header; NULL for
                                            plain (non-shared) buffers */
    nxt_unit_port_id_t       port_id;
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
};


/* Decoded view of one message received from the router/main process. */
struct nxt_unit_recv_msg_s {
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;       /* 1 bit */
    uint8_t                  mmap;       /* 1 bit */

    void                     *start;     /* payload after nxt_port_msg_t */
    uint32_t                 size;       /* payload size in bytes */

    int                      fd;         /* fd from SCM_RIGHTS, or -1 */
    nxt_unit_process_t       *process;   /* referenced sender process */

    nxt_unit_mmap_buf_t      *incoming_buf;
};


/* Response lifecycle states of a request; only moves forward. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Library-private state behind the public nxt_unit_request_info_t. */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;        /* public part; must be first */

    uint32_t                 stream;

    nxt_unit_process_t       *process;   /* owned reference, released with
                                            the request */

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;  /* request upgraded to websocket */

    nxt_queue_link_t         link;       /* in ctx free_req or active_req */

    char                     extra_data[];  /* request_data_size bytes of
                                               per-request app data */
};


/* Library-private state behind the public nxt_unit_websocket_frame_t. */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;      /* public part; must be first */

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;    /* in ctx_impl free_ws */

    nxt_unit_ctx_impl_t         *ctx_impl;

    void                        *retain_buf;  /* malloc'ed copy made by a
                                                 retain operation, if any */
};


/* Per-context (per-thread) state behind the public nxt_unit_ctx_t. */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;   /* public part; must be first */

    pthread_mutex_t               mutex; /* guards the free/active lists */

    nxt_unit_port_id_t            read_port_id;
    int                           read_port_fd;

    nxt_queue_link_t              link;  /* in nxt_unit_impl_t contexts */

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* Two preallocated buffers so simple cases need no malloc. */
    nxt_unit_mmap_buf_t           ctx_buf[2];

    /* Preallocated request; its extra_data[] is allocated with the lib. */
    nxt_unit_request_info_impl_t  req;
};


/* Whole-library singleton state behind the public nxt_unit_t. */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;       /* public part; must be first */
    nxt_unit_callbacks_t     callbacks;

    uint32_t                 request_data_size;

    pthread_mutex_t          mutex;      /* guards processes and ports */

    nxt_lvlhsh_t             processes;  /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;      /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;   /* of nxt_unit_ctx_impl_t */

    pid_t                    pid;
    int                      log_fd;
    int                      online;

    nxt_unit_ctx_impl_t      main_ctx;   /* embedded main context */
};


struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;       /* public part; must be first */

    nxt_queue_link_t         link;       /* in owning process 'ports' */
    nxt_unit_process_t       *process;
};


struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};


/* Growable array of mmap segments, guarded by its own mutex. */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;       /* elements in use */
    uint32_t                 cap;        /* allocated capacity */
    nxt_unit_mmap_t          *elts;
};


/* Reference-counted peer process with its shared memory maps. */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;      /* of nxt_unit_port_impl_t */

    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};


/* Explicitly using 32 bit types to avoid possible alignment.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Library entry point: create the library singleton, pick up the ready
 * and read ports either from 'init' or from the NXT_UNIT_INIT environment
 * variable, register both ports via the add_port callback, and send the
 * READY message.  Returns the main context, or NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc;
    uint32_t         ready_stream;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    /*
     * A nonzero pid in both port ids plus a nonzero stream means the
     * caller supplied the ports directly; otherwise parse the environment.
     */
    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);
    } else {
        rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
                               &ready_stream);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }
    }

    lib->pid = read_port.id.pid;
    ctx = &lib->main_ctx.ctx;

    rc = lib->callbacks.add_port(ctx, &ready_port);
    if (rc != NXT_UNIT_OK) {
        nxt_unit_alert(NULL, "failed to add ready_port");

        goto fail;
    }

    rc = lib->callbacks.add_port(ctx, &read_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    lib->main_ctx.read_port_id = read_port.id;
    lib->ready_port_id = ready_port.id;

    rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    return ctx;

fail:

    /* NOTE(review): frees the struct only; mutexes/ports added above are
       not torn down here — presumably acceptable at startup, verify. */
    free(lib);

    return NULL;
}


/*
 * Allocate and initialize the library singleton: mutex, hashes, the main
 * context, and the callback table (defaulting every optional callback).
 * The request_data_size tail feeds main_ctx.req.extra_data[].
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
    if
(nxt_slow_path(lib == NULL)) { 382 nxt_unit_alert(NULL, "failed to allocate unit struct"); 383 384 return NULL; 385 } 386 387 rc = pthread_mutex_init(&lib->mutex, NULL); 388 if (nxt_slow_path(rc != 0)) { 389 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 390 391 goto fail; 392 } 393 394 lib->unit.data = init->data; 395 lib->callbacks = init->callbacks; 396 397 lib->request_data_size = init->request_data_size; 398 399 lib->processes.slot = NULL; 400 lib->ports.slot = NULL; 401 402 lib->log_fd = STDERR_FILENO; 403 lib->online = 1; 404 405 nxt_queue_init(&lib->contexts); 406 407 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 408 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 409 goto fail; 410 } 411 412 cb = &lib->callbacks; 413 414 if (cb->request_handler == NULL) { 415 nxt_unit_alert(NULL, "request_handler is NULL"); 416 417 goto fail; 418 } 419 420 if (cb->add_port == NULL) { 421 cb->add_port = nxt_unit_add_port; 422 } 423 424 if (cb->remove_port == NULL) { 425 cb->remove_port = nxt_unit_remove_port; 426 } 427 428 if (cb->remove_pid == NULL) { 429 cb->remove_pid = nxt_unit_remove_pid; 430 } 431 432 if (cb->quit == NULL) { 433 cb->quit = nxt_unit_quit; 434 } 435 436 if (cb->port_send == NULL) { 437 cb->port_send = nxt_unit_port_send_default; 438 } 439 440 if (cb->port_recv == NULL) { 441 cb->port_recv = nxt_unit_port_recv_default; 442 } 443 444 return lib; 445 446 fail: 447 448 free(lib); 449 450 return NULL; 451 } 452 453 454 static int 455 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 456 void *data) 457 { 458 int rc; 459 460 ctx_impl->ctx.data = data; 461 ctx_impl->ctx.unit = &lib->unit; 462 463 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 464 465 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 466 if (nxt_slow_path(rc != 0)) { 467 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 468 469 return NXT_UNIT_ERROR; 470 } 471 472 nxt_queue_init(&ctx_impl->free_req); 473 
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);

    /* Seed the free-buffer list with the two embedded buffers. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    /* Seed the free-request queue with the embedded request. */
    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port_fd = -1;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/*
 * Push mmap_buf onto the front of the list at *head.  'prev' always
 * points at the pointer that points at this node (the head pointer or
 * the previous node's 'next'), so removal needs no head argument.
 */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/* Append mmap_buf at the end of the list: walk to the final NULL 'next'
 * slot and insert there. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/* Unlink mmap_buf from whatever list it is on (no-op links are NULL). */
nxt_inline void
nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse the NXT_UNIT_INIT environment variable into the ready/read ports,
 * the log fd and the ready stream id.  The value starts with the Unit
 * version string (checked against NXT_VERSION), followed by
 * "<stream>;<pid>,<id>,<fd>;<pid>,<id>,<fd>;<log_fd>".
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
    int *log_fd, uint32_t *stream)
{
    int       rc;
    int       ready_fd, read_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, read_pid;
    uint32_t  ready_stream, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
    version_length = nxt_length(NXT_VERSION);

    /* The value must begin with exactly NXT_VERSION followed by ';'. */
    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    /* stream; ready pid,id,fd; read pid,id,fd; log fd — 8 fields total. */
    rc = sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%d",
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &read_pid, &read_id, &read_fd,
                log_fd);

    if (nxt_slow_path(rc != 8)) {
        nxt_unit_alert(NULL, "failed to scan variables");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    /* The ready port is write-only from our side. */
    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    /* The read port is read-only from our side. */
    read_port->in_fd = read_fd;
    read_port->out_fd = -1;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Send the PROCESS_READY message on the given port, answering the given
 * stream.  Returns NXT_UNIT_OK only when the whole message was written.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Decode one port message (header, optional SCM_RIGHTS fd, optional mmap
 * payload) and dispatch it by type to the configured callbacks.
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int              rc;
    pid_t            pid;
    struct cmsghdr   *cm;
    nxt_port_msg_t   *port_msg;
    nxt_unit_impl_t  *lib;
nxt_unit_recv_msg_t recv_msg; 642 nxt_unit_callbacks_t *cb; 643 644 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 645 646 rc = NXT_UNIT_ERROR; 647 recv_msg.fd = -1; 648 recv_msg.process = NULL; 649 port_msg = buf; 650 cm = oob; 651 652 if (oob_size >= CMSG_SPACE(sizeof(int)) 653 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 654 && cm->cmsg_level == SOL_SOCKET 655 && cm->cmsg_type == SCM_RIGHTS) 656 { 657 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 658 } 659 660 recv_msg.incoming_buf = NULL; 661 662 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 663 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 664 goto fail; 665 } 666 667 recv_msg.stream = port_msg->stream; 668 recv_msg.pid = port_msg->pid; 669 recv_msg.reply_port = port_msg->reply_port; 670 recv_msg.last = port_msg->last; 671 recv_msg.mmap = port_msg->mmap; 672 673 recv_msg.start = port_msg + 1; 674 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 675 676 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 677 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 678 port_msg->stream, (int) port_msg->type); 679 goto fail; 680 } 681 682 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 683 rc = NXT_UNIT_OK; 684 685 goto fail; 686 } 687 688 /* Fragmentation is unsupported. 
*/ 689 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 690 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 691 port_msg->stream, (int) port_msg->type); 692 goto fail; 693 } 694 695 if (port_msg->mmap) { 696 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { 697 goto fail; 698 } 699 } 700 701 cb = &lib->callbacks; 702 703 switch (port_msg->type) { 704 705 case _NXT_PORT_MSG_QUIT: 706 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 707 708 cb->quit(ctx); 709 rc = NXT_UNIT_OK; 710 break; 711 712 case _NXT_PORT_MSG_NEW_PORT: 713 rc = nxt_unit_process_new_port(ctx, &recv_msg); 714 break; 715 716 case _NXT_PORT_MSG_CHANGE_FILE: 717 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 718 port_msg->stream, recv_msg.fd); 719 break; 720 721 case _NXT_PORT_MSG_MMAP: 722 if (nxt_slow_path(recv_msg.fd < 0)) { 723 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 724 port_msg->stream, recv_msg.fd); 725 726 goto fail; 727 } 728 729 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); 730 break; 731 732 case _NXT_PORT_MSG_REQ_HEADERS: 733 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 734 break; 735 736 case _NXT_PORT_MSG_WEBSOCKET: 737 rc = nxt_unit_process_websocket(ctx, &recv_msg); 738 break; 739 740 case _NXT_PORT_MSG_REMOVE_PID: 741 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 742 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 743 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 744 (int) sizeof(pid)); 745 746 goto fail; 747 } 748 749 memcpy(&pid, recv_msg.start, sizeof(pid)); 750 751 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 752 port_msg->stream, (int) pid); 753 754 cb->remove_pid(ctx, pid); 755 756 rc = NXT_UNIT_OK; 757 break; 758 759 default: 760 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 761 port_msg->stream, (int) port_msg->type); 762 763 goto fail; 764 } 765 766 fail: 767 768 if (recv_msg.fd != -1) { 769 close(recv_msg.fd); 770 } 771 772 while 
(recv_msg.incoming_buf != NULL) { 773 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 774 } 775 776 if (recv_msg.process != NULL) { 777 nxt_unit_process_use(ctx, recv_msg.process, -1); 778 } 779 780 return rc; 781 } 782 783 784 static int 785 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 786 { 787 int nb; 788 nxt_unit_impl_t *lib; 789 nxt_unit_port_t new_port; 790 nxt_port_msg_new_port_t *new_port_msg; 791 792 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 793 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 794 "invalid message size (%d)", 795 recv_msg->stream, (int) recv_msg->size); 796 797 return NXT_UNIT_ERROR; 798 } 799 800 if (nxt_slow_path(recv_msg->fd < 0)) { 801 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 802 recv_msg->stream, recv_msg->fd); 803 804 return NXT_UNIT_ERROR; 805 } 806 807 new_port_msg = recv_msg->start; 808 809 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 810 recv_msg->stream, (int) new_port_msg->pid, 811 (int) new_port_msg->id, recv_msg->fd); 812 813 nb = 0; 814 815 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { 816 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 817 "failed: %s (%d)", recv_msg->fd, strerror(errno), errno); 818 819 return NXT_UNIT_ERROR; 820 } 821 822 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 823 new_port_msg->id); 824 825 new_port.in_fd = -1; 826 new_port.out_fd = recv_msg->fd; 827 new_port.data = NULL; 828 829 recv_msg->fd = -1; 830 831 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 832 833 return lib->callbacks.add_port(ctx, &new_port); 834 } 835 836 837 static int 838 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 839 { 840 nxt_unit_impl_t *lib; 841 nxt_unit_request_t *r; 842 nxt_unit_mmap_buf_t *b; 843 nxt_unit_request_info_t *req; 844 nxt_unit_request_info_impl_t *req_impl; 845 846 if (nxt_slow_path(recv_msg->mmap == 0)) { 847 nxt_unit_warn(ctx, 
"#%"PRIu32": data is not in shared memory", 848 recv_msg->stream); 849 850 return NXT_UNIT_ERROR; 851 } 852 853 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 854 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 855 "%d expected", recv_msg->stream, (int) recv_msg->size, 856 (int) sizeof(nxt_unit_request_t)); 857 858 return NXT_UNIT_ERROR; 859 } 860 861 req_impl = nxt_unit_request_info_get(ctx); 862 if (nxt_slow_path(req_impl == NULL)) { 863 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 864 recv_msg->stream); 865 866 return NXT_UNIT_ERROR; 867 } 868 869 req = &req_impl->req; 870 871 nxt_unit_port_id_init(&req->response_port, recv_msg->pid, 872 recv_msg->reply_port); 873 874 req->request = recv_msg->start; 875 876 b = recv_msg->incoming_buf; 877 878 req->request_buf = &b->buf; 879 req->response = NULL; 880 req->response_buf = NULL; 881 882 r = req->request; 883 884 req->content_length = r->content_length; 885 886 req->content_buf = req->request_buf; 887 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 888 889 /* "Move" process reference to req_impl. */ 890 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); 891 if (nxt_slow_path(req_impl->process == NULL)) { 892 return NXT_UNIT_ERROR; 893 } 894 895 recv_msg->process = NULL; 896 897 req_impl->stream = recv_msg->stream; 898 899 req_impl->outgoing_buf = NULL; 900 901 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 902 b->req = req; 903 } 904 905 /* "Move" incoming buffer list to req_impl. 
*/ 906 req_impl->incoming_buf = recv_msg->incoming_buf; 907 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 908 recv_msg->incoming_buf = NULL; 909 910 req->response_max_fields = 0; 911 req_impl->state = NXT_UNIT_RS_START; 912 req_impl->websocket = 0; 913 914 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 915 (int) r->method_length, nxt_unit_sptr_get(&r->method), 916 (int) r->target_length, nxt_unit_sptr_get(&r->target), 917 (int) r->content_length); 918 919 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 920 921 lib->callbacks.request_handler(req); 922 923 return NXT_UNIT_OK; 924 } 925 926 927 static int 928 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 929 { 930 size_t hsize; 931 nxt_unit_impl_t *lib; 932 nxt_unit_mmap_buf_t *b; 933 nxt_unit_ctx_impl_t *ctx_impl; 934 nxt_unit_callbacks_t *cb; 935 nxt_unit_request_info_t *req; 936 nxt_unit_request_info_impl_t *req_impl; 937 nxt_unit_websocket_frame_impl_t *ws_impl; 938 939 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 940 941 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, 942 recv_msg->last); 943 if (req_impl == NULL) { 944 return NXT_UNIT_OK; 945 } 946 947 req = &req_impl->req; 948 949 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 950 cb = &lib->callbacks; 951 952 if (cb->websocket_handler && recv_msg->size >= 2) { 953 ws_impl = nxt_unit_websocket_frame_get(ctx); 954 if (nxt_slow_path(ws_impl == NULL)) { 955 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 956 req_impl->stream); 957 958 return NXT_UNIT_ERROR; 959 } 960 961 ws_impl->ws.req = req; 962 963 ws_impl->buf = NULL; 964 ws_impl->retain_buf = NULL; 965 966 if (recv_msg->mmap) { 967 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 968 b->req = req; 969 } 970 971 /* "Move" incoming buffer list to ws_impl. 
*/ 972 ws_impl->buf = recv_msg->incoming_buf; 973 ws_impl->buf->prev = &ws_impl->buf; 974 recv_msg->incoming_buf = NULL; 975 976 b = ws_impl->buf; 977 978 } else { 979 b = nxt_unit_mmap_buf_get(ctx); 980 if (nxt_slow_path(b == NULL)) { 981 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 982 req_impl->stream); 983 984 nxt_unit_websocket_frame_release(&ws_impl->ws); 985 986 return NXT_UNIT_ERROR; 987 } 988 989 b->hdr = NULL; 990 b->req = req; 991 b->buf.start = recv_msg->start; 992 b->buf.free = b->buf.start; 993 b->buf.end = b->buf.start + recv_msg->size; 994 995 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 996 } 997 998 ws_impl->ws.header = (void *) b->buf.start; 999 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1000 ws_impl->ws.header); 1001 1002 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1003 1004 if (ws_impl->ws.header->mask) { 1005 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1006 1007 } else { 1008 ws_impl->ws.mask = NULL; 1009 } 1010 1011 b->buf.free += hsize; 1012 1013 ws_impl->ws.content_buf = &b->buf; 1014 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1015 1016 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1017 "payload_len=%"PRIu64, 1018 ws_impl->ws.header->opcode, 1019 ws_impl->ws.payload_len); 1020 1021 cb->websocket_handler(&ws_impl->ws); 1022 } 1023 1024 if (recv_msg->last) { 1025 req_impl->websocket = 0; 1026 1027 if (cb->close_handler) { 1028 nxt_unit_req_debug(req, "close_handler"); 1029 1030 cb->close_handler(req); 1031 1032 } else { 1033 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1034 } 1035 } 1036 1037 return NXT_UNIT_OK; 1038 } 1039 1040 1041 static nxt_unit_request_info_impl_t * 1042 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1043 { 1044 nxt_unit_impl_t *lib; 1045 nxt_queue_link_t *lnk; 1046 nxt_unit_ctx_impl_t *ctx_impl; 1047 nxt_unit_request_info_impl_t *req_impl; 1048 1049 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1050 1051 lib = 
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1052 1053 pthread_mutex_lock(&ctx_impl->mutex); 1054 1055 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1056 pthread_mutex_unlock(&ctx_impl->mutex); 1057 1058 req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) 1059 + lib->request_data_size); 1060 if (nxt_slow_path(req_impl == NULL)) { 1061 return NULL; 1062 } 1063 1064 req_impl->req.unit = ctx->unit; 1065 req_impl->req.ctx = ctx; 1066 1067 pthread_mutex_lock(&ctx_impl->mutex); 1068 1069 } else { 1070 lnk = nxt_queue_first(&ctx_impl->free_req); 1071 nxt_queue_remove(lnk); 1072 1073 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1074 } 1075 1076 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1077 1078 pthread_mutex_unlock(&ctx_impl->mutex); 1079 1080 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; 1081 1082 return req_impl; 1083 } 1084 1085 1086 static void 1087 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1088 { 1089 nxt_unit_ctx_impl_t *ctx_impl; 1090 nxt_unit_request_info_impl_t *req_impl; 1091 1092 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1093 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1094 1095 req->response = NULL; 1096 req->response_buf = NULL; 1097 1098 if (req_impl->websocket) { 1099 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); 1100 1101 req_impl->websocket = 0; 1102 } 1103 1104 while (req_impl->outgoing_buf != NULL) { 1105 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1106 } 1107 1108 while (req_impl->incoming_buf != NULL) { 1109 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1110 } 1111 1112 /* 1113 * Process release should go after buffers release to guarantee mmap 1114 * existence. 
 */
    if (req_impl->process != NULL) {
        nxt_unit_process_use(req->ctx, req_impl->process, -1);

        req_impl->process = NULL;
    }

    pthread_mutex_lock(&ctx_impl->mutex);

    /* Move the request from the active list back to the free list. */
    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->state = NXT_UNIT_RS_RELEASED;
}


/*
 * Destroy a request info object: unlink it and free it, unless it is the
 * context's embedded request, which lives and dies with the context.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        free(req_impl);
    }
}


/*
 * Get a websocket frame object: reuse one from the context free list or
 * malloc a fresh one.  Note the asymmetric locking: the empty branch
 * unlocks before malloc, the non-empty branch unlocks after dequeuing.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Return a websocket frame to its context free list, releasing any
 * attached buffers and the retained copy first.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* Each free call unlinks the head, so the loop terminates. */
    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }
    ws->req = NULL;

    if (ws_impl->retain_buf != NULL) {
        free(ws_impl->retain_buf);

        ws_impl->retain_buf = NULL;
    }

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/* Unlink a websocket frame from its free list and release its memory. */
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    free(ws_impl);
}


/*
 * Case-insensitive header-name hash: hash = hash * 17 + lowercase(ch)
 * (via the shift-add), seeded and folded the same way as the server-side
 * parser so field hashes match across the protocol.  The 32-bit value is
 * folded to 16 bits by XOR-ing the halves and truncating on return.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406;  /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorder the request's field array so that fields with equal hashes
 * (duplicate header names) become adjacent, rotating the in-between
 * fields forward; sptr offsets are relative, so each moved field's
 * name/value offsets are adjusted by the distance moved.  Also records
 * the indexes of the Content-Length, Content-Type and Cookie fields.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            r->content_length_field = i;
            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            r->content_type_field = i;
            break;

        case NXT_UNIT_HASH_COOKIE:
            r->cookie_field = i;
            break;
        };

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash) {
                continue;
            }

            /* Already adjacent: nothing to move. */
            if (j == i + 1) {
                continue;
            }

            /* Relocate fields[j] to slot i+1, shifting the rest up. */
            f = fields[j];
            f.name.offset += (j - (i + 1)) * sizeof(f);
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
fields[j].value.offset -= sizeof(f); 1285 j--; 1286 } 1287 1288 fields[j] = f; 1289 1290 i++; 1291 } 1292 } 1293 } 1294 1295 1296 int 1297 nxt_unit_response_init(nxt_unit_request_info_t *req, 1298 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 1299 { 1300 uint32_t buf_size; 1301 nxt_unit_buf_t *buf; 1302 nxt_unit_request_info_impl_t *req_impl; 1303 1304 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1305 1306 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1307 nxt_unit_req_warn(req, "init: response already sent"); 1308 1309 return NXT_UNIT_ERROR; 1310 } 1311 1312 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1313 (int) max_fields_count, (int) max_fields_size); 1314 1315 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1316 nxt_unit_req_debug(req, "duplicate response init"); 1317 } 1318 1319 buf_size = sizeof(nxt_unit_response_t) 1320 + max_fields_count * sizeof(nxt_unit_field_t) 1321 + max_fields_size; 1322 1323 if (nxt_slow_path(req->response_buf != NULL)) { 1324 buf = req->response_buf; 1325 1326 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1327 goto init_response; 1328 } 1329 1330 nxt_unit_buf_free(buf); 1331 1332 req->response_buf = NULL; 1333 req->response = NULL; 1334 req->response_max_fields = 0; 1335 1336 req_impl->state = NXT_UNIT_RS_START; 1337 } 1338 1339 buf = nxt_unit_response_buf_alloc(req, buf_size); 1340 if (nxt_slow_path(buf == NULL)) { 1341 return NXT_UNIT_ERROR; 1342 } 1343 1344 init_response: 1345 1346 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 1347 1348 req->response_buf = buf; 1349 1350 req->response = (nxt_unit_response_t *) buf->start; 1351 req->response->status = status; 1352 1353 buf->free = buf->start + sizeof(nxt_unit_response_t) 1354 + max_fields_count * sizeof(nxt_unit_field_t); 1355 1356 req->response_max_fields = max_fields_count; 1357 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 1358 1359 return 
NXT_UNIT_OK; 1360 } 1361 1362 1363 int 1364 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1365 uint32_t max_fields_count, uint32_t max_fields_size) 1366 { 1367 char *p; 1368 uint32_t i, buf_size; 1369 nxt_unit_buf_t *buf; 1370 nxt_unit_field_t *f, *src; 1371 nxt_unit_response_t *resp; 1372 nxt_unit_request_info_impl_t *req_impl; 1373 1374 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1375 1376 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1377 nxt_unit_req_warn(req, "realloc: response not init"); 1378 1379 return NXT_UNIT_ERROR; 1380 } 1381 1382 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1383 nxt_unit_req_warn(req, "realloc: response already sent"); 1384 1385 return NXT_UNIT_ERROR; 1386 } 1387 1388 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1389 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 1390 1391 return NXT_UNIT_ERROR; 1392 } 1393 1394 buf_size = sizeof(nxt_unit_response_t) 1395 + max_fields_count * sizeof(nxt_unit_field_t) 1396 + max_fields_size; 1397 1398 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 1399 1400 buf = nxt_unit_response_buf_alloc(req, buf_size); 1401 if (nxt_slow_path(buf == NULL)) { 1402 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 1403 return NXT_UNIT_ERROR; 1404 } 1405 1406 resp = (nxt_unit_response_t *) buf->start; 1407 1408 memset(resp, 0, sizeof(nxt_unit_response_t)); 1409 1410 resp->status = req->response->status; 1411 resp->content_length = req->response->content_length; 1412 1413 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1414 f = resp->fields; 1415 1416 for (i = 0; i < req->response->fields_count; i++) { 1417 src = req->response->fields + i; 1418 1419 if (nxt_slow_path(src->skip != 0)) { 1420 continue; 1421 } 1422 1423 if (nxt_slow_path(src->name_length + src->value_length + 2 1424 > (uint32_t) (buf->end - p))) 1425 { 1426 nxt_unit_req_warn(req, "realloc: not enough 
space for field" 1427 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 1428 i, src, src->name_length, src->value_length); 1429 1430 goto fail; 1431 } 1432 1433 nxt_unit_sptr_set(&f->name, p); 1434 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 1435 *p++ = '\0'; 1436 1437 nxt_unit_sptr_set(&f->value, p); 1438 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 1439 *p++ = '\0'; 1440 1441 f->hash = src->hash; 1442 f->skip = 0; 1443 f->name_length = src->name_length; 1444 f->value_length = src->value_length; 1445 1446 resp->fields_count++; 1447 f++; 1448 } 1449 1450 if (req->response->piggyback_content_length > 0) { 1451 if (nxt_slow_path(req->response->piggyback_content_length 1452 > (uint32_t) (buf->end - p))) 1453 { 1454 nxt_unit_req_warn(req, "realloc: not enought space for content" 1455 " #%"PRIu32", %"PRIu32" required", 1456 i, req->response->piggyback_content_length); 1457 1458 goto fail; 1459 } 1460 1461 resp->piggyback_content_length = req->response->piggyback_content_length; 1462 1463 nxt_unit_sptr_set(&resp->piggyback_content, p); 1464 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 1465 req->response->piggyback_content_length); 1466 } 1467 1468 buf->free = p; 1469 1470 nxt_unit_buf_free(req->response_buf); 1471 1472 req->response = resp; 1473 req->response_buf = buf; 1474 req->response_max_fields = max_fields_count; 1475 1476 return NXT_UNIT_OK; 1477 1478 fail: 1479 1480 nxt_unit_buf_free(buf); 1481 1482 return NXT_UNIT_ERROR; 1483 } 1484 1485 1486 int 1487 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 1488 { 1489 nxt_unit_request_info_impl_t *req_impl; 1490 1491 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1492 1493 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 1494 } 1495 1496 1497 int 1498 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 1499 const char *name, uint8_t name_length, 1500 const char *value, uint32_t value_length) 1501 { 
1502 nxt_unit_buf_t *buf; 1503 nxt_unit_field_t *f; 1504 nxt_unit_response_t *resp; 1505 nxt_unit_request_info_impl_t *req_impl; 1506 1507 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1508 1509 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 1510 nxt_unit_req_warn(req, "add_field: response not initialized or " 1511 "already sent"); 1512 1513 return NXT_UNIT_ERROR; 1514 } 1515 1516 resp = req->response; 1517 1518 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 1519 nxt_unit_req_warn(req, "add_field: too many response fields"); 1520 1521 return NXT_UNIT_ERROR; 1522 } 1523 1524 buf = req->response_buf; 1525 1526 if (nxt_slow_path(name_length + value_length + 2 1527 > (uint32_t) (buf->end - buf->free))) 1528 { 1529 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 1530 1531 return NXT_UNIT_ERROR; 1532 } 1533 1534 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 1535 resp->fields_count, 1536 (int) name_length, name, 1537 (int) value_length, value); 1538 1539 f = resp->fields + resp->fields_count; 1540 1541 nxt_unit_sptr_set(&f->name, buf->free); 1542 buf->free = nxt_cpymem(buf->free, name, name_length); 1543 *buf->free++ = '\0'; 1544 1545 nxt_unit_sptr_set(&f->value, buf->free); 1546 buf->free = nxt_cpymem(buf->free, value, value_length); 1547 *buf->free++ = '\0'; 1548 1549 f->hash = nxt_unit_field_hash(name, name_length); 1550 f->skip = 0; 1551 f->name_length = name_length; 1552 f->value_length = value_length; 1553 1554 resp->fields_count++; 1555 1556 return NXT_UNIT_OK; 1557 } 1558 1559 1560 int 1561 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 1562 const void* src, uint32_t size) 1563 { 1564 nxt_unit_buf_t *buf; 1565 nxt_unit_response_t *resp; 1566 nxt_unit_request_info_impl_t *req_impl; 1567 1568 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1569 1570 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1571 
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    /* First chunk of content: record where it starts. */
    if (resp->piggyback_content_length == 0) {
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Send the response header buffer (with any piggyback content) to the
 * router.  A status-101 response to a websocket handshake is upgraded
 * automatically.  On success the buffer is released and the request
 * moves to the RESPONSE_SENT state; on failure the response remains
 * owned by the caller.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_release(mmap_buf);
    }

    return rc;
}


/* Return non-zero once the response headers have been sent. */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocate a shared-memory response buffer of up to PORT_MMAP_DATA_SIZE
 * bytes, linked into the request's outgoing list.  Returns NULL when the
 * size is too big or allocation fails.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    mmap_buf->req = req;

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                   &req->response_port, size, mmap_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_release(mmap_buf);

        return NULL;
    }

    return &mmap_buf->buf;
}


/*
 * Resolve and cache the sender process of a received message.  Looks the
 * pid up under the library mutex; logs a warning when unknown.
 */
static nxt_unit_process_t *
nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    nxt_unit_impl_t  *lib;

    if (recv_msg->process != NULL) {
        return recv_msg->process;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    recv_msg->process =
nxt_unit_process_find(ctx, recv_msg->pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (recv_msg->process == NULL) {
        nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
                      recv_msg->stream, (int) recv_msg->pid);
    }

    return recv_msg->process;
}


/*
 * Get an mmap buffer structure: reuse one from the context free list
 * when available, otherwise malloc() a new one.  Returns NULL on OOM.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *mmap_buf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->free_buf == NULL) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(mmap_buf == NULL)) {
            return NULL;
        }

    } else {
        mmap_buf = ctx_impl->free_buf;

        nxt_unit_mmap_buf_remove(mmap_buf);

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    mmap_buf->ctx_impl = ctx_impl;

    return mmap_buf;
}


/* Unlink an mmap buffer and put it back on its context's free list. */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_remove(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* Length-prefixed string literal helper. */
typedef struct {
    size_t      len;
    const char  *str;
} nxt_unit_str_t;


#define nxt_unit_str(str)  { nxt_length(str), str }


/* Return non-zero when the request is a websocket handshake. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    return req->request->websocket_handshake;
}


/*
 * Upgrade an initialized, not-yet-sent response to a websocket
 * connection: register the request in the context hash (so incoming
 * frames can find it) and force status 101.  Idempotent — returns
 * NXT_UNIT_OK when already upgraded.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);

    rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


/* Return non-zero once the request has been upgraded to websocket. */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/* Map a pointer to the per-request extra_data area back to its request. */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Send a body buffer for a request whose headers are already sent;
 * empty buffers are silently released without sending.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: 
response not initialized yet"); 1873 1874 return NXT_UNIT_ERROR; 1875 } 1876 1877 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 1878 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 1879 1880 return NXT_UNIT_ERROR; 1881 } 1882 1883 if (nxt_fast_path(buf->free > buf->start)) { 1884 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); 1885 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1886 return rc; 1887 } 1888 } 1889 1890 nxt_unit_mmap_buf_release(mmap_buf); 1891 1892 return NXT_UNIT_OK; 1893 } 1894 1895 1896 static void 1897 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 1898 { 1899 int rc; 1900 nxt_unit_mmap_buf_t *mmap_buf; 1901 nxt_unit_request_info_t *req; 1902 nxt_unit_request_info_impl_t *req_impl; 1903 1904 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1905 1906 req = mmap_buf->req; 1907 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1908 1909 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); 1910 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 1911 nxt_unit_mmap_buf_release(mmap_buf); 1912 1913 nxt_unit_request_info_release(req); 1914 1915 } else { 1916 nxt_unit_request_done(req, rc); 1917 } 1918 } 1919 1920 1921 static int 1922 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 1923 nxt_unit_mmap_buf_t *mmap_buf, int last) 1924 { 1925 struct { 1926 nxt_port_msg_t msg; 1927 nxt_port_mmap_msg_t mmap_msg; 1928 } m; 1929 1930 u_char *end, *last_used, *first_free; 1931 ssize_t res; 1932 nxt_chunk_id_t first_free_chunk; 1933 nxt_unit_buf_t *buf; 1934 nxt_unit_impl_t *lib; 1935 nxt_port_mmap_header_t *hdr; 1936 1937 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1938 1939 buf = &mmap_buf->buf; 1940 hdr = mmap_buf->hdr; 1941 1942 m.mmap_msg.size = buf->free - buf->start; 1943 1944 m.msg.stream = stream; 1945 m.msg.pid = lib->pid; 1946 m.msg.reply_port = 0; 1947 m.msg.type = _NXT_PORT_MSG_DATA; 1948 m.msg.last = last != 0; 1949 m.msg.mmap = hdr != NULL && 
m.mmap_msg.size > 0; 1950 m.msg.nf = 0; 1951 m.msg.mf = 0; 1952 m.msg.tracking = 0; 1953 1954 if (hdr != NULL) { 1955 m.mmap_msg.mmap_id = hdr->id; 1956 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); 1957 } 1958 1959 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 1960 stream, 1961 (int) m.mmap_msg.mmap_id, 1962 (int) m.mmap_msg.chunk_id, 1963 (int) m.mmap_msg.size); 1964 1965 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, 1966 m.msg.mmap ? sizeof(m) : sizeof(m.msg), 1967 NULL, 0); 1968 if (nxt_slow_path(res != sizeof(m))) { 1969 return NXT_UNIT_ERROR; 1970 } 1971 1972 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { 1973 last_used = (u_char *) buf->free - 1; 1974 1975 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 1976 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 1977 end = (u_char *) buf->end; 1978 1979 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); 1980 1981 buf->end = (char *) first_free; 1982 } 1983 1984 return NXT_UNIT_OK; 1985 } 1986 1987 1988 void 1989 nxt_unit_buf_free(nxt_unit_buf_t *buf) 1990 { 1991 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 1992 } 1993 1994 1995 static void 1996 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 1997 { 1998 if (nxt_fast_path(mmap_buf->hdr != NULL)) { 1999 nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, 2000 mmap_buf->buf.end - mmap_buf->buf.start); 2001 } 2002 2003 nxt_unit_mmap_buf_release(mmap_buf); 2004 } 2005 2006 2007 nxt_unit_buf_t * 2008 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2009 { 2010 nxt_unit_mmap_buf_t *mmap_buf; 2011 2012 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2013 2014 if (mmap_buf->next == NULL) { 2015 return NULL; 2016 } 2017 2018 return &mmap_buf->next->buf; 2019 } 2020 2021 2022 uint32_t 2023 nxt_unit_buf_max(void) 2024 { 2025 return PORT_MMAP_DATA_SIZE; 2026 } 2027 2028 2029 uint32_t 2030 nxt_unit_buf_min(void) 2031 
{
    /* Smallest useful buffer: one shared-memory chunk. */
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Write `size` bytes of response body.  If the headers are not sent yet,
 * as much as fits is piggybacked onto the header buffer, which is then
 * sent; the remainder goes out in PORT_MMAP_DATA_SIZE-sized buffers
 * allocated on the stack (not linked to the request, so they are
 * released explicitly on error).
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    int                           rc;
    uint32_t                      part_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port, part_size,
                                       &mmap_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* Stack-allocated buf: return its chunks manually. */
            nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
                                  mmap_buf.buf.end - mmap_buf.buf.start);

            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    return NXT_UNIT_OK;
}


/*
 * Stream a response body from a caller-supplied read callback until it
 * reports EOF.  Reads fill the header buffer first (piggyback content),
 * then freshly allocated body buffers of read_info->buf_size bytes.
 * A negative callback return aborts with NXT_UNIT_ERROR.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int             rc;
    ssize_t         n;
    nxt_unit_buf_t  *buf;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
                                                       PORT_MMAP_DATA_SIZE));
        if (nxt_slow_path(buf == NULL)) {
            nxt_unit_req_error(req, "Failed to allocate buf for content");

            return NXT_UNIT_ERROR;
        }

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_buf_free(buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_buf_send(buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/* Read up to `size` bytes of request body into dst; returns bytes read. */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    return nxt_unit_buf_read(&req->content_buf, &req->content_length,
                             dst, size);
}
2192 2193 static ssize_t 2194 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) 2195 { 2196 u_char *p; 2197 size_t rest, copy, read; 2198 nxt_unit_buf_t *buf; 2199 2200 p = dst; 2201 rest = size; 2202 2203 buf = *b; 2204 2205 while (buf != NULL) { 2206 copy = buf->end - buf->free; 2207 copy = nxt_min(rest, copy); 2208 2209 p = nxt_cpymem(p, buf->free, copy); 2210 2211 buf->free += copy; 2212 rest -= copy; 2213 2214 if (rest == 0) { 2215 if (buf->end == buf->free) { 2216 buf = nxt_unit_buf_next(buf); 2217 } 2218 2219 break; 2220 } 2221 2222 buf = nxt_unit_buf_next(buf); 2223 } 2224 2225 *b = buf; 2226 2227 read = size - rest; 2228 2229 *len -= read; 2230 2231 return read; 2232 } 2233 2234 2235 void 2236 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) 2237 { 2238 ssize_t res; 2239 uint32_t size; 2240 nxt_port_msg_t msg; 2241 nxt_unit_impl_t *lib; 2242 nxt_unit_request_info_impl_t *req_impl; 2243 2244 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2245 2246 nxt_unit_req_debug(req, "done: %d", rc); 2247 2248 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2249 goto skip_response_send; 2250 } 2251 2252 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2253 2254 size = nxt_length("Content-Type") + nxt_length("text/plain"); 2255 2256 rc = nxt_unit_response_init(req, 200, 1, size); 2257 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2258 goto skip_response_send; 2259 } 2260 2261 rc = nxt_unit_response_add_field(req, "Content-Type", 2262 nxt_length("Content-Type"), 2263 "text/plain", nxt_length("text/plain")); 2264 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2265 goto skip_response_send; 2266 } 2267 } 2268 2269 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2270 2271 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2272 2273 nxt_unit_buf_send_done(req->response_buf); 2274 2275 return; 2276 } 2277 2278 skip_response_send: 2279 2280 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); 2281 2282 
msg.stream = req_impl->stream; 2283 msg.pid = lib->pid; 2284 msg.reply_port = 0; 2285 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2286 : _NXT_PORT_MSG_RPC_ERROR; 2287 msg.last = 1; 2288 msg.mmap = 0; 2289 msg.nf = 0; 2290 msg.mf = 0; 2291 msg.tracking = 0; 2292 2293 res = lib->callbacks.port_send(req->ctx, &req->response_port, 2294 &msg, sizeof(msg), NULL, 0); 2295 if (nxt_slow_path(res != sizeof(msg))) { 2296 nxt_unit_req_alert(req, "last message send failed: %s (%d)", 2297 strerror(errno), errno); 2298 } 2299 2300 nxt_unit_request_info_release(req); 2301 } 2302 2303 2304 int 2305 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2306 uint8_t last, const void *start, size_t size) 2307 { 2308 const struct iovec iov = { (void *) start, size }; 2309 2310 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); 2311 } 2312 2313 2314 int 2315 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 2316 uint8_t last, const struct iovec *iov, int iovcnt) 2317 { 2318 int i, rc; 2319 size_t l, copy; 2320 uint32_t payload_len, buf_size; 2321 const uint8_t *b; 2322 nxt_unit_buf_t *buf; 2323 nxt_websocket_header_t *wh; 2324 2325 payload_len = 0; 2326 2327 for (i = 0; i < iovcnt; i++) { 2328 payload_len += iov[i].iov_len; 2329 } 2330 2331 buf_size = 10 + payload_len; 2332 2333 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2334 PORT_MMAP_DATA_SIZE)); 2335 if (nxt_slow_path(buf == NULL)) { 2336 nxt_unit_req_error(req, "Failed to allocate buf for content"); 2337 2338 return NXT_UNIT_ERROR; 2339 } 2340 2341 buf->start[0] = 0; 2342 buf->start[1] = 0; 2343 2344 wh = (void *) buf->free; 2345 2346 buf->free = nxt_websocket_frame_init(wh, payload_len); 2347 wh->fin = last; 2348 wh->opcode = opcode; 2349 2350 for (i = 0; i < iovcnt; i++) { 2351 b = iov[i].iov_base; 2352 l = iov[i].iov_len; 2353 2354 while (l > 0) { 2355 copy = buf->end - buf->free; 2356 copy = nxt_min(l, copy); 2357 2358 buf->free = nxt_cpymem(buf->free, b, 
copy); 2359 b += copy; 2360 l -= copy; 2361 2362 if (l > 0) { 2363 buf_size -= buf->end - buf->start; 2364 2365 rc = nxt_unit_buf_send(buf); 2366 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2367 nxt_unit_req_error(req, "Failed to send content"); 2368 2369 return NXT_UNIT_ERROR; 2370 } 2371 2372 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2373 PORT_MMAP_DATA_SIZE)); 2374 if (nxt_slow_path(buf == NULL)) { 2375 nxt_unit_req_error(req, 2376 "Failed to allocate buf for content"); 2377 2378 return NXT_UNIT_ERROR; 2379 } 2380 } 2381 } 2382 } 2383 2384 if (buf->free > buf->start) { 2385 rc = nxt_unit_buf_send(buf); 2386 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2387 nxt_unit_req_error(req, "Failed to send content"); 2388 } 2389 } 2390 2391 return rc; 2392 } 2393 2394 2395 ssize_t 2396 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 2397 size_t size) 2398 { 2399 ssize_t res; 2400 uint8_t *b; 2401 uint64_t i, d; 2402 2403 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, 2404 dst, size); 2405 2406 if (ws->mask == NULL) { 2407 return res; 2408 } 2409 2410 b = dst; 2411 d = (ws->payload_len - ws->content_length - res) % 4; 2412 2413 for (i = 0; i < (uint64_t) res; i++) { 2414 b[i] ^= ws->mask[ (i + d) % 4 ]; 2415 } 2416 2417 return res; 2418 } 2419 2420 2421 int 2422 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) 2423 { 2424 char *b; 2425 size_t size; 2426 nxt_unit_websocket_frame_impl_t *ws_impl; 2427 2428 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 2429 2430 if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { 2431 return NXT_UNIT_OK; 2432 } 2433 2434 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; 2435 2436 b = malloc(size); 2437 if (nxt_slow_path(b == NULL)) { 2438 return NXT_UNIT_ERROR; 2439 } 2440 2441 memcpy(b, ws_impl->buf->buf.start, size); 2442 2443 ws_impl->buf->buf.start = b; 2444 ws_impl->buf->buf.free = b; 2445 ws_impl->buf->buf.end = b + size; 2446 2447 
ws_impl->retain_buf = b; 2448 2449 return NXT_UNIT_OK; 2450 } 2451 2452 2453 void 2454 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) 2455 { 2456 nxt_unit_websocket_frame_release(ws); 2457 } 2458 2459 2460 static nxt_port_mmap_header_t * 2461 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2462 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) 2463 { 2464 int res, nchunks, i; 2465 nxt_unit_mmap_t *mm, *mm_end; 2466 nxt_port_mmap_header_t *hdr; 2467 2468 pthread_mutex_lock(&process->outgoing.mutex); 2469 2470 mm_end = process->outgoing.elts + process->outgoing.size; 2471 2472 for (mm = process->outgoing.elts; mm < mm_end; mm++) { 2473 hdr = mm->hdr; 2474 2475 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { 2476 continue; 2477 } 2478 2479 *c = 0; 2480 2481 while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { 2482 nchunks = 1; 2483 2484 while (nchunks < n) { 2485 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, 2486 *c + nchunks); 2487 2488 if (res == 0) { 2489 for (i = 0; i < nchunks; i++) { 2490 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); 2491 } 2492 2493 *c += nchunks + 1; 2494 nchunks = 0; 2495 break; 2496 } 2497 2498 nchunks++; 2499 } 2500 2501 if (nchunks == n) { 2502 goto unlock; 2503 } 2504 } 2505 } 2506 2507 *c = 0; 2508 hdr = nxt_unit_new_mmap(ctx, process, port_id, n); 2509 2510 unlock: 2511 2512 pthread_mutex_unlock(&process->outgoing.mutex); 2513 2514 return hdr; 2515 } 2516 2517 2518 static nxt_unit_mmap_t * 2519 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 2520 { 2521 uint32_t cap; 2522 2523 cap = mmaps->cap; 2524 2525 if (cap == 0) { 2526 cap = i + 1; 2527 } 2528 2529 while (i + 1 > cap) { 2530 2531 if (cap < 16) { 2532 cap = cap * 2; 2533 2534 } else { 2535 cap = cap + cap / 2; 2536 } 2537 } 2538 2539 if (cap != mmaps->cap) { 2540 2541 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 2542 if (nxt_slow_path(mmaps->elts == NULL)) { 2543 return NULL; 2544 } 2545 
2546 memset(mmaps->elts + mmaps->cap, 0, 2547 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 2548 2549 mmaps->cap = cap; 2550 } 2551 2552 if (i + 1 > mmaps->size) { 2553 mmaps->size = i + 1; 2554 } 2555 2556 return mmaps->elts + i; 2557 } 2558 2559 2560 static nxt_port_mmap_header_t * 2561 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2562 nxt_unit_port_id_t *port_id, int n) 2563 { 2564 int i, fd, rc; 2565 void *mem; 2566 char name[64]; 2567 nxt_unit_mmap_t *mm; 2568 nxt_unit_impl_t *lib; 2569 nxt_port_mmap_header_t *hdr; 2570 2571 lib = process->lib; 2572 2573 mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); 2574 if (nxt_slow_path(mm == NULL)) { 2575 nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); 2576 2577 return NULL; 2578 } 2579 2580 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 2581 lib->pid, (void *) pthread_self()); 2582 2583 #if (NXT_HAVE_MEMFD_CREATE) 2584 2585 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 2586 if (nxt_slow_path(fd == -1)) { 2587 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 2588 strerror(errno), errno); 2589 2590 goto remove_fail; 2591 } 2592 2593 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 2594 2595 #elif (NXT_HAVE_SHM_OPEN_ANON) 2596 2597 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 2598 if (nxt_slow_path(fd == -1)) { 2599 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 2600 strerror(errno), errno); 2601 2602 goto remove_fail; 2603 } 2604 2605 #elif (NXT_HAVE_SHM_OPEN) 2606 2607 /* Just in case. 
*/ 2608 shm_unlink(name); 2609 2610 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 2611 if (nxt_slow_path(fd == -1)) { 2612 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 2613 strerror(errno), errno); 2614 2615 goto remove_fail; 2616 } 2617 2618 if (nxt_slow_path(shm_unlink(name) == -1)) { 2619 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 2620 strerror(errno), errno); 2621 } 2622 2623 #else 2624 2625 #error No working shared memory implementation. 2626 2627 #endif 2628 2629 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 2630 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 2631 strerror(errno), errno); 2632 2633 goto remove_fail; 2634 } 2635 2636 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2637 if (nxt_slow_path(mem == MAP_FAILED)) { 2638 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 2639 strerror(errno), errno); 2640 2641 goto remove_fail; 2642 } 2643 2644 mm->hdr = mem; 2645 hdr = mem; 2646 2647 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 2648 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 2649 2650 hdr->id = process->outgoing.size - 1; 2651 hdr->src_pid = lib->pid; 2652 hdr->dst_pid = process->pid; 2653 hdr->sent_over = port_id->id; 2654 2655 /* Mark first n chunk(s) as busy */ 2656 for (i = 0; i < n; i++) { 2657 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 2658 } 2659 2660 /* Mark as busy chunk followed the last available chunk. 
*/ 2661 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 2662 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 2663 2664 pthread_mutex_unlock(&process->outgoing.mutex); 2665 2666 rc = nxt_unit_send_mmap(ctx, port_id, fd); 2667 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2668 munmap(mem, PORT_MMAP_SIZE); 2669 hdr = NULL; 2670 2671 } else { 2672 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 2673 hdr->id, (int) lib->pid, (int) process->pid); 2674 } 2675 2676 close(fd); 2677 2678 pthread_mutex_lock(&process->outgoing.mutex); 2679 2680 if (nxt_fast_path(hdr != NULL)) { 2681 return hdr; 2682 } 2683 2684 remove_fail: 2685 2686 process->outgoing.size--; 2687 2688 return NULL; 2689 } 2690 2691 2692 static int 2693 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) 2694 { 2695 ssize_t res; 2696 nxt_port_msg_t msg; 2697 nxt_unit_impl_t *lib; 2698 union { 2699 struct cmsghdr cm; 2700 char space[CMSG_SPACE(sizeof(int))]; 2701 } cmsg; 2702 2703 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2704 2705 msg.stream = 0; 2706 msg.pid = lib->pid; 2707 msg.reply_port = 0; 2708 msg.type = _NXT_PORT_MSG_MMAP; 2709 msg.last = 0; 2710 msg.mmap = 0; 2711 msg.nf = 0; 2712 msg.mf = 0; 2713 msg.tracking = 0; 2714 2715 /* 2716 * Fill all padding fields with 0. 2717 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 2718 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 
2719 */ 2720 memset(&cmsg, 0, sizeof(cmsg)); 2721 2722 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 2723 cmsg.cm.cmsg_level = SOL_SOCKET; 2724 cmsg.cm.cmsg_type = SCM_RIGHTS; 2725 2726 /* 2727 * memcpy() is used instead of simple 2728 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 2729 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 2730 * dereferencing type-punned pointer will break strict-aliasing rules 2731 * 2732 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 2733 * in the same simple assignment as in the code above. 2734 */ 2735 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 2736 2737 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), 2738 &cmsg, sizeof(cmsg)); 2739 if (nxt_slow_path(res != sizeof(msg))) { 2740 nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)", 2741 (int) port_id->pid, strerror(errno), errno); 2742 2743 return NXT_UNIT_ERROR; 2744 } 2745 2746 return NXT_UNIT_OK; 2747 } 2748 2749 2750 static int 2751 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2752 nxt_unit_port_id_t *port_id, uint32_t size, 2753 nxt_unit_mmap_buf_t *mmap_buf) 2754 { 2755 uint32_t nchunks; 2756 nxt_chunk_id_t c; 2757 nxt_port_mmap_header_t *hdr; 2758 2759 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 2760 2761 hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); 2762 if (nxt_slow_path(hdr == NULL)) { 2763 return NXT_UNIT_ERROR; 2764 } 2765 2766 mmap_buf->hdr = hdr; 2767 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 2768 mmap_buf->buf.free = mmap_buf->buf.start; 2769 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 2770 mmap_buf->port_id = *port_id; 2771 2772 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 2773 (int) hdr->id, (int) c, 2774 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 2775 2776 return NXT_UNIT_OK; 2777 } 2778 2779 2780 static int 2781 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 2782 { 2783 int rc; 2784 
void *mem; 2785 struct stat mmap_stat; 2786 nxt_unit_mmap_t *mm; 2787 nxt_unit_impl_t *lib; 2788 nxt_unit_process_t *process; 2789 nxt_port_mmap_header_t *hdr; 2790 2791 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2792 2793 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 2794 2795 pthread_mutex_lock(&lib->mutex); 2796 2797 process = nxt_unit_process_find(ctx, pid, 0); 2798 2799 pthread_mutex_unlock(&lib->mutex); 2800 2801 if (nxt_slow_path(process == NULL)) { 2802 nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", 2803 (int) pid, fd); 2804 2805 return NXT_UNIT_ERROR; 2806 } 2807 2808 rc = NXT_UNIT_ERROR; 2809 2810 if (fstat(fd, &mmap_stat) == -1) { 2811 nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 2812 strerror(errno), errno); 2813 2814 goto fail; 2815 } 2816 2817 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 2818 MAP_SHARED, fd, 0); 2819 if (nxt_slow_path(mem == MAP_FAILED)) { 2820 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 2821 strerror(errno), errno); 2822 2823 goto fail; 2824 } 2825 2826 hdr = mem; 2827 2828 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { 2829 2830 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 2831 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 2832 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 2833 2834 munmap(mem, PORT_MMAP_SIZE); 2835 2836 goto fail; 2837 } 2838 2839 pthread_mutex_lock(&process->incoming.mutex); 2840 2841 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 2842 if (nxt_slow_path(mm == NULL)) { 2843 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 2844 2845 munmap(mem, PORT_MMAP_SIZE); 2846 2847 } else { 2848 mm->hdr = hdr; 2849 2850 hdr->sent_over = 0xFFFFu; 2851 2852 rc = NXT_UNIT_OK; 2853 } 2854 2855 pthread_mutex_unlock(&process->incoming.mutex); 2856 2857 fail: 2858 2859 nxt_unit_process_use(ctx, process, -1); 2860 2861 return rc; 2862 } 2863 
/* Initialize an empty mmaps array and the mutex guarding it. */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
}


/*
 * Adjust the process reference count by i (may be negative).  When a
 * release drops the count to zero, both mmaps arrays are destroyed and
 * the process object is freed.
 */
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
    long  c;

    c = nxt_atomic_fetch_add(&process->use_count, i);

    /* c is the value before the add, so c == -i means the count is now 0. */
    if (i < 0 && c == -i) {
        nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);

        nxt_unit_mmaps_destroy(&process->incoming);
        nxt_unit_mmaps_destroy(&process->outgoing);

        free(process);
    }
}


/* Unmap every segment, free the array, and destroy the mutex. */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        free(mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Return the header of incoming segment `id`, or NULL when the id is out
 * of range.  NOTE(review): callers appear to hold incoming.mutex — verify.
 */
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    uint32_t id)
{
    nxt_port_mmap_header_t  *hdr;

    if (nxt_fast_path(process->incoming.size > id)) {
        hdr = process->incoming.elts[id].hdr;

    } else {
        hdr = NULL;
    }

    return hdr;
}


/*
 * Handle the tracking prologue of a received message: atomically claim
 * the tracking slot for this stream.  Returns non-zero when the message
 * should be processed further, 0 when it was cancelled or malformed.
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           rc;
    nxt_chunk_id_t                c;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return 0;
    }

    tracking_msg = recv_msg->start;

    /* Consume the tracking prologue from the message payload. */
    recv_msg->start = tracking_msg + 1;
    recv_msg->size -=
sizeof(nxt_port_mmap_tracking_msg_t); 2950 2951 process = nxt_unit_msg_get_process(ctx, recv_msg); 2952 if (nxt_slow_path(process == NULL)) { 2953 return 0; 2954 } 2955 2956 pthread_mutex_lock(&process->incoming.mutex); 2957 2958 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 2959 if (nxt_slow_path(hdr == NULL)) { 2960 pthread_mutex_unlock(&process->incoming.mutex); 2961 2962 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 2963 "invalid mmap id %d,%"PRIu32, 2964 recv_msg->stream, (int) process->pid, 2965 tracking_msg->mmap_id); 2966 2967 return 0; 2968 } 2969 2970 c = tracking_msg->tracking_id; 2971 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); 2972 2973 if (rc == 0) { 2974 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 2975 recv_msg->stream); 2976 2977 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 2978 } 2979 2980 pthread_mutex_unlock(&process->incoming.mutex); 2981 2982 return rc; 2983 } 2984 2985 2986 static int 2987 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 2988 { 2989 void *start; 2990 uint32_t size; 2991 nxt_unit_process_t *process; 2992 nxt_unit_mmap_buf_t *b, **incoming_tail; 2993 nxt_port_mmap_msg_t *mmap_msg, *end; 2994 nxt_port_mmap_header_t *hdr; 2995 2996 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 2997 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 2998 recv_msg->stream, (int) recv_msg->size); 2999 3000 return NXT_UNIT_ERROR; 3001 } 3002 3003 process = nxt_unit_msg_get_process(ctx, recv_msg); 3004 if (nxt_slow_path(process == NULL)) { 3005 return NXT_UNIT_ERROR; 3006 } 3007 3008 mmap_msg = recv_msg->start; 3009 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3010 3011 incoming_tail = &recv_msg->incoming_buf; 3012 3013 pthread_mutex_lock(&process->incoming.mutex); 3014 3015 for (; mmap_msg < end; mmap_msg++) { 3016 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 3017 if (nxt_slow_path(hdr == 
NULL)) { 3018 pthread_mutex_unlock(&process->incoming.mutex); 3019 3020 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 3021 "invalid mmap id %d,%"PRIu32, 3022 recv_msg->stream, (int) process->pid, 3023 mmap_msg->mmap_id); 3024 3025 return NXT_UNIT_ERROR; 3026 } 3027 3028 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3029 size = mmap_msg->size; 3030 3031 if (recv_msg->start == mmap_msg) { 3032 recv_msg->start = start; 3033 recv_msg->size = size; 3034 } 3035 3036 b = nxt_unit_mmap_buf_get(ctx); 3037 if (nxt_slow_path(b == NULL)) { 3038 pthread_mutex_unlock(&process->incoming.mutex); 3039 3040 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3041 recv_msg->stream); 3042 3043 nxt_unit_mmap_release(hdr, start, size); 3044 3045 return NXT_UNIT_ERROR; 3046 } 3047 3048 nxt_unit_mmap_buf_insert(incoming_tail, b); 3049 incoming_tail = &b->next; 3050 3051 b->buf.start = start; 3052 b->buf.free = start; 3053 b->buf.end = b->buf.start + size; 3054 b->hdr = hdr; 3055 3056 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3057 recv_msg->stream, 3058 start, (int) size, 3059 (int) hdr->src_pid, (int) hdr->dst_pid, 3060 (int) hdr->id, (int) mmap_msg->chunk_id, 3061 (int) mmap_msg->size); 3062 } 3063 3064 pthread_mutex_unlock(&process->incoming.mutex); 3065 3066 return NXT_UNIT_OK; 3067 } 3068 3069 3070 static int 3071 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size) 3072 { 3073 u_char *p, *end; 3074 nxt_chunk_id_t c; 3075 3076 memset(start, 0xA5, size); 3077 3078 p = start; 3079 end = p + size; 3080 c = nxt_port_mmap_chunk_id(hdr, p); 3081 3082 while (p < end) { 3083 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 3084 3085 p += PORT_MMAP_CHUNK_SIZE; 3086 c++; 3087 } 3088 3089 return NXT_UNIT_OK; 3090 } 3091 3092 3093 static nxt_int_t 3094 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 3095 { 3096 nxt_process_t *process; 3097 3098 process = data; 3099 3100 if (lhq->key.length == 
sizeof(pid_t) 3101 && *(pid_t *) lhq->key.start == process->pid) 3102 { 3103 return NXT_OK; 3104 } 3105 3106 return NXT_DECLINED; 3107 } 3108 3109 3110 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 3111 NXT_LVLHSH_DEFAULT, 3112 nxt_unit_lvlhsh_pid_test, 3113 nxt_lvlhsh_alloc, 3114 nxt_lvlhsh_free, 3115 }; 3116 3117 3118 static inline void 3119 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 3120 { 3121 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 3122 lhq->key.length = sizeof(*pid); 3123 lhq->key.start = (u_char *) pid; 3124 lhq->proto = &lvlhsh_processes_proto; 3125 } 3126 3127 3128 static nxt_unit_process_t * 3129 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 3130 { 3131 nxt_unit_impl_t *lib; 3132 nxt_unit_process_t *process; 3133 nxt_lvlhsh_query_t lhq; 3134 3135 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3136 3137 nxt_unit_process_lhq_pid(&lhq, &pid); 3138 3139 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 3140 process = lhq.value; 3141 nxt_unit_process_use(ctx, process, 1); 3142 3143 return process; 3144 } 3145 3146 process = malloc(sizeof(nxt_unit_process_t)); 3147 if (nxt_slow_path(process == NULL)) { 3148 nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); 3149 3150 return NULL; 3151 } 3152 3153 process->pid = pid; 3154 process->use_count = 1; 3155 process->next_port_id = 0; 3156 process->lib = lib; 3157 3158 nxt_queue_init(&process->ports); 3159 3160 nxt_unit_mmaps_init(&process->incoming); 3161 nxt_unit_mmaps_init(&process->outgoing); 3162 3163 lhq.replace = 0; 3164 lhq.value = process; 3165 3166 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 3167 3168 case NXT_OK: 3169 break; 3170 3171 default: 3172 nxt_unit_warn(ctx, "process %d insert failed", (int) pid); 3173 3174 pthread_mutex_destroy(&process->outgoing.mutex); 3175 pthread_mutex_destroy(&process->incoming.mutex); 3176 free(process); 3177 process = NULL; 3178 break; 3179 } 3180 3181 
nxt_unit_process_use(ctx, process, 1); 3182 3183 return process; 3184 } 3185 3186 3187 static nxt_unit_process_t * 3188 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) 3189 { 3190 int rc; 3191 nxt_unit_impl_t *lib; 3192 nxt_unit_process_t *process; 3193 nxt_lvlhsh_query_t lhq; 3194 3195 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3196 3197 nxt_unit_process_lhq_pid(&lhq, &pid); 3198 3199 if (remove) { 3200 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 3201 3202 } else { 3203 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 3204 } 3205 3206 if (rc == NXT_OK) { 3207 process = lhq.value; 3208 3209 if (!remove) { 3210 nxt_unit_process_use(ctx, process, 1); 3211 } 3212 3213 return process; 3214 } 3215 3216 return NULL; 3217 } 3218 3219 3220 static nxt_unit_process_t * 3221 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 3222 { 3223 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 3224 } 3225 3226 3227 int 3228 nxt_unit_run(nxt_unit_ctx_t *ctx) 3229 { 3230 int rc; 3231 nxt_unit_impl_t *lib; 3232 3233 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3234 rc = NXT_UNIT_OK; 3235 3236 while (nxt_fast_path(lib->online)) { 3237 rc = nxt_unit_run_once(ctx); 3238 } 3239 3240 return rc; 3241 } 3242 3243 3244 int 3245 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 3246 { 3247 int rc; 3248 char buf[4096]; 3249 char oob[256]; 3250 ssize_t rsize; 3251 nxt_unit_impl_t *lib; 3252 nxt_unit_ctx_impl_t *ctx_impl; 3253 3254 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3255 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3256 3257 memset(oob, 0, sizeof(struct cmsghdr)); 3258 3259 if (ctx_impl->read_port_fd != -1) { 3260 rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, 3261 buf, sizeof(buf), 3262 oob, sizeof(oob)); 3263 } else { 3264 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 3265 buf, sizeof(buf), 3266 oob, sizeof(oob)); 3267 } 3268 3269 if (nxt_fast_path(rsize > 0)) { 3270 rc = 
nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, 3271 oob, sizeof(oob)); 3272 3273 #if (NXT_DEBUG) 3274 memset(buf, 0xAC, rsize); 3275 #endif 3276 3277 } else { 3278 rc = NXT_UNIT_ERROR; 3279 } 3280 3281 return rc; 3282 } 3283 3284 3285 void 3286 nxt_unit_done(nxt_unit_ctx_t *ctx) 3287 { 3288 nxt_unit_impl_t *lib; 3289 nxt_unit_process_t *process; 3290 nxt_unit_ctx_impl_t *ctx_impl; 3291 3292 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3293 3294 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { 3295 3296 nxt_unit_ctx_free(&ctx_impl->ctx); 3297 3298 } nxt_queue_loop; 3299 3300 for ( ;; ) { 3301 pthread_mutex_lock(&lib->mutex); 3302 3303 process = nxt_unit_process_pop_first(lib); 3304 if (process == NULL) { 3305 pthread_mutex_unlock(&lib->mutex); 3306 3307 break; 3308 } 3309 3310 nxt_unit_remove_process(ctx, process); 3311 } 3312 3313 pthread_mutex_destroy(&lib->mutex); 3314 3315 free(lib); 3316 } 3317 3318 3319 nxt_unit_ctx_t * 3320 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) 3321 { 3322 int rc, fd; 3323 nxt_unit_impl_t *lib; 3324 nxt_unit_port_id_t new_port_id; 3325 nxt_unit_ctx_impl_t *new_ctx; 3326 3327 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3328 3329 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); 3330 if (nxt_slow_path(new_ctx == NULL)) { 3331 nxt_unit_warn(ctx, "failed to allocate context"); 3332 3333 return NULL; 3334 } 3335 3336 rc = nxt_unit_create_port(ctx, &new_port_id, &fd); 3337 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3338 free(new_ctx); 3339 3340 return NULL; 3341 } 3342 3343 rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); 3344 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3345 lib->callbacks.remove_port(ctx, &new_port_id); 3346 3347 close(fd); 3348 3349 free(new_ctx); 3350 3351 return NULL; 3352 } 3353 3354 close(fd); 3355 3356 rc = nxt_unit_ctx_init(lib, new_ctx, data); 3357 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3358 
lib->callbacks.remove_port(ctx, &new_port_id); 3359 3360 free(new_ctx); 3361 3362 return NULL; 3363 } 3364 3365 new_ctx->read_port_id = new_port_id; 3366 3367 return &new_ctx->ctx; 3368 } 3369 3370 3371 void 3372 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) 3373 { 3374 nxt_unit_impl_t *lib; 3375 nxt_unit_ctx_impl_t *ctx_impl; 3376 nxt_unit_mmap_buf_t *mmap_buf; 3377 nxt_unit_request_info_impl_t *req_impl; 3378 nxt_unit_websocket_frame_impl_t *ws_impl; 3379 3380 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3381 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3382 3383 nxt_queue_each(req_impl, &ctx_impl->active_req, 3384 nxt_unit_request_info_impl_t, link) 3385 { 3386 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 3387 3388 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 3389 3390 } nxt_queue_loop; 3391 3392 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); 3393 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); 3394 3395 while (ctx_impl->free_buf != NULL) { 3396 mmap_buf = ctx_impl->free_buf; 3397 nxt_unit_mmap_buf_remove(mmap_buf); 3398 free(mmap_buf); 3399 } 3400 3401 nxt_queue_each(req_impl, &ctx_impl->free_req, 3402 nxt_unit_request_info_impl_t, link) 3403 { 3404 nxt_unit_request_info_free(req_impl); 3405 3406 } nxt_queue_loop; 3407 3408 nxt_queue_each(ws_impl, &ctx_impl->free_ws, 3409 nxt_unit_websocket_frame_impl_t, link) 3410 { 3411 nxt_unit_websocket_frame_free(ws_impl); 3412 3413 } nxt_queue_loop; 3414 3415 pthread_mutex_destroy(&ctx_impl->mutex); 3416 3417 nxt_queue_remove(&ctx_impl->link); 3418 3419 if (ctx_impl != &lib->main_ctx) { 3420 free(ctx_impl); 3421 } 3422 } 3423 3424 3425 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. 
*/ 3426 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET) 3427 #define NXT_UNIX_SOCKET SOCK_SEQPACKET 3428 #else 3429 #define NXT_UNIX_SOCKET SOCK_DGRAM 3430 #endif 3431 3432 3433 void 3434 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) 3435 { 3436 nxt_unit_port_hash_id_t port_hash_id; 3437 3438 port_hash_id.pid = pid; 3439 port_hash_id.id = id; 3440 3441 port_id->pid = pid; 3442 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id)); 3443 port_id->id = id; 3444 } 3445 3446 3447 int 3448 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 3449 nxt_unit_port_id_t *port_id) 3450 { 3451 int rc, fd; 3452 nxt_unit_impl_t *lib; 3453 nxt_unit_port_id_t new_port_id; 3454 3455 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3456 3457 rc = nxt_unit_create_port(ctx, &new_port_id, &fd); 3458 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3459 return rc; 3460 } 3461 3462 rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); 3463 3464 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 3465 *port_id = new_port_id; 3466 3467 } else { 3468 lib->callbacks.remove_port(ctx, &new_port_id); 3469 } 3470 3471 close(fd); 3472 3473 return rc; 3474 } 3475 3476 3477 static int 3478 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) 3479 { 3480 int rc, port_sockets[2]; 3481 nxt_unit_impl_t *lib; 3482 nxt_unit_port_t new_port; 3483 nxt_unit_process_t *process; 3484 3485 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3486 3487 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 3488 if (nxt_slow_path(rc != 0)) { 3489 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 3490 strerror(errno), errno); 3491 3492 return NXT_UNIT_ERROR; 3493 } 3494 3495 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 3496 port_sockets[0], port_sockets[1]); 3497 3498 pthread_mutex_lock(&lib->mutex); 3499 3500 process = nxt_unit_process_get(ctx, lib->pid); 3501 if (nxt_slow_path(process == NULL)) { 3502 
pthread_mutex_unlock(&lib->mutex); 3503 3504 close(port_sockets[0]); 3505 close(port_sockets[1]); 3506 3507 return NXT_UNIT_ERROR; 3508 } 3509 3510 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 3511 3512 new_port.in_fd = port_sockets[0]; 3513 new_port.out_fd = -1; 3514 new_port.data = NULL; 3515 3516 pthread_mutex_unlock(&lib->mutex); 3517 3518 nxt_unit_process_use(ctx, process, -1); 3519 3520 rc = lib->callbacks.add_port(ctx, &new_port); 3521 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3522 nxt_unit_warn(ctx, "create_port: add_port() failed"); 3523 3524 close(port_sockets[0]); 3525 close(port_sockets[1]); 3526 3527 return rc; 3528 } 3529 3530 *port_id = new_port.id; 3531 *fd = port_sockets[1]; 3532 3533 return rc; 3534 } 3535 3536 3537 static int 3538 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 3539 nxt_unit_port_id_t *new_port, int fd) 3540 { 3541 ssize_t res; 3542 nxt_unit_impl_t *lib; 3543 3544 struct { 3545 nxt_port_msg_t msg; 3546 nxt_port_msg_new_port_t new_port; 3547 } m; 3548 3549 union { 3550 struct cmsghdr cm; 3551 char space[CMSG_SPACE(sizeof(int))]; 3552 } cmsg; 3553 3554 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3555 3556 m.msg.stream = 0; 3557 m.msg.pid = lib->pid; 3558 m.msg.reply_port = 0; 3559 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 3560 m.msg.last = 0; 3561 m.msg.mmap = 0; 3562 m.msg.nf = 0; 3563 m.msg.mf = 0; 3564 m.msg.tracking = 0; 3565 3566 m.new_port.id = new_port->id; 3567 m.new_port.pid = new_port->pid; 3568 m.new_port.type = NXT_PROCESS_WORKER; 3569 m.new_port.max_size = 16 * 1024; 3570 m.new_port.max_share = 64 * 1024; 3571 3572 memset(&cmsg, 0, sizeof(cmsg)); 3573 3574 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3575 cmsg.cm.cmsg_level = SOL_SOCKET; 3576 cmsg.cm.cmsg_type = SCM_RIGHTS; 3577 3578 /* 3579 * memcpy() is used instead of simple 3580 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3581 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3582 * dereferencing type-punned 
pointer will break strict-aliasing rules 3583 * 3584 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3585 * in the same simple assignment as in the code above. 3586 */ 3587 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3588 3589 res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), 3590 &cmsg, sizeof(cmsg)); 3591 3592 return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; 3593 } 3594 3595 3596 int 3597 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 3598 { 3599 int rc; 3600 nxt_unit_impl_t *lib; 3601 nxt_unit_process_t *process; 3602 nxt_unit_port_impl_t *new_port; 3603 3604 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3605 3606 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 3607 port->id.pid, port->id.id, 3608 port->in_fd, port->out_fd); 3609 3610 pthread_mutex_lock(&lib->mutex); 3611 3612 process = nxt_unit_process_get(ctx, port->id.pid); 3613 if (nxt_slow_path(process == NULL)) { 3614 rc = NXT_UNIT_ERROR; 3615 goto unlock; 3616 } 3617 3618 if (port->id.id >= process->next_port_id) { 3619 process->next_port_id = port->id.id + 1; 3620 } 3621 3622 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 3623 if (nxt_slow_path(new_port == NULL)) { 3624 rc = NXT_UNIT_ERROR; 3625 goto unlock; 3626 } 3627 3628 new_port->port = *port; 3629 3630 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 3631 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3632 goto unlock; 3633 } 3634 3635 nxt_queue_insert_tail(&process->ports, &new_port->link); 3636 3637 rc = NXT_UNIT_OK; 3638 3639 new_port->process = process; 3640 3641 unlock: 3642 3643 pthread_mutex_unlock(&lib->mutex); 3644 3645 if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { 3646 nxt_unit_process_use(ctx, process, -1); 3647 } 3648 3649 return rc; 3650 } 3651 3652 3653 void 3654 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 3655 { 3656 nxt_unit_find_remove_port(ctx, port_id, NULL); 3657 } 3658 3659 3660 void 3661 
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3662 nxt_unit_port_t *r_port) 3663 { 3664 nxt_unit_impl_t *lib; 3665 nxt_unit_process_t *process; 3666 3667 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3668 3669 pthread_mutex_lock(&lib->mutex); 3670 3671 process = NULL; 3672 3673 nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); 3674 3675 pthread_mutex_unlock(&lib->mutex); 3676 3677 if (nxt_slow_path(process != NULL)) { 3678 nxt_unit_process_use(ctx, process, -1); 3679 } 3680 } 3681 3682 3683 static void 3684 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3685 nxt_unit_port_t *r_port, nxt_unit_process_t **process) 3686 { 3687 nxt_unit_impl_t *lib; 3688 nxt_unit_port_impl_t *port; 3689 3690 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3691 3692 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 3693 if (nxt_slow_path(port == NULL)) { 3694 nxt_unit_debug(ctx, "remove_port: port %d,%d not found", 3695 (int) port_id->pid, (int) port_id->id); 3696 3697 return; 3698 } 3699 3700 nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", 3701 (int) port_id->pid, (int) port_id->id, 3702 port->port.in_fd, port->port.out_fd, port->port.data); 3703 3704 if (port->port.in_fd != -1) { 3705 close(port->port.in_fd); 3706 } 3707 3708 if (port->port.out_fd != -1) { 3709 close(port->port.out_fd); 3710 } 3711 3712 if (port->process != NULL) { 3713 nxt_queue_remove(&port->link); 3714 } 3715 3716 if (process != NULL) { 3717 *process = port->process; 3718 } 3719 3720 if (r_port != NULL) { 3721 *r_port = port->port; 3722 } 3723 3724 free(port); 3725 } 3726 3727 3728 void 3729 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) 3730 { 3731 nxt_unit_impl_t *lib; 3732 nxt_unit_process_t *process; 3733 3734 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3735 3736 pthread_mutex_lock(&lib->mutex); 3737 3738 process = nxt_unit_process_find(ctx, pid, 1); 3739 if 
(nxt_slow_path(process == NULL)) { 3740 nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); 3741 3742 pthread_mutex_unlock(&lib->mutex); 3743 3744 return; 3745 } 3746 3747 nxt_unit_remove_process(ctx, process); 3748 } 3749 3750 3751 static void 3752 nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) 3753 { 3754 nxt_queue_t ports; 3755 nxt_unit_impl_t *lib; 3756 nxt_unit_port_impl_t *port; 3757 3758 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3759 3760 nxt_queue_init(&ports); 3761 3762 nxt_queue_add(&ports, &process->ports); 3763 3764 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 3765 3766 nxt_unit_process_use(ctx, process, -1); 3767 port->process = NULL; 3768 3769 /* Shortcut for default callback. */ 3770 if (lib->callbacks.remove_port == nxt_unit_remove_port) { 3771 nxt_queue_remove(&port->link); 3772 3773 nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); 3774 } 3775 3776 } nxt_queue_loop; 3777 3778 pthread_mutex_unlock(&lib->mutex); 3779 3780 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { 3781 3782 nxt_queue_remove(&port->link); 3783 3784 lib->callbacks.remove_port(ctx, &port->port.id); 3785 3786 } nxt_queue_loop; 3787 3788 nxt_unit_process_use(ctx, process, -1); 3789 } 3790 3791 3792 void 3793 nxt_unit_quit(nxt_unit_ctx_t *ctx) 3794 { 3795 nxt_unit_impl_t *lib; 3796 3797 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3798 3799 lib->online = 0; 3800 } 3801 3802 3803 static ssize_t 3804 nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3805 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 3806 { 3807 int fd; 3808 nxt_unit_impl_t *lib; 3809 nxt_unit_port_impl_t *port; 3810 3811 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3812 3813 pthread_mutex_lock(&lib->mutex); 3814 3815 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 3816 3817 if (nxt_fast_path(port != NULL)) { 3818 fd = port->port.out_fd; 3819 
3820 } else { 3821 nxt_unit_warn(ctx, "port_send: port %d,%d not found", 3822 (int) port_id->pid, (int) port_id->id); 3823 fd = -1; 3824 } 3825 3826 pthread_mutex_unlock(&lib->mutex); 3827 3828 if (nxt_slow_path(fd == -1)) { 3829 if (port != NULL) { 3830 nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1", 3831 (int) port_id->pid, (int) port_id->id); 3832 } 3833 3834 return -1; 3835 } 3836 3837 nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", 3838 (int) port_id->pid, (int) port_id->id, fd); 3839 3840 return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size); 3841 } 3842 3843 3844 ssize_t 3845 nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, 3846 const void *buf, size_t buf_size, const void *oob, size_t oob_size) 3847 { 3848 ssize_t res; 3849 struct iovec iov[1]; 3850 struct msghdr msg; 3851 3852 iov[0].iov_base = (void *) buf; 3853 iov[0].iov_len = buf_size; 3854 3855 msg.msg_name = NULL; 3856 msg.msg_namelen = 0; 3857 msg.msg_iov = iov; 3858 msg.msg_iovlen = 1; 3859 msg.msg_flags = 0; 3860 msg.msg_control = (void *) oob; 3861 msg.msg_controllen = oob_size; 3862 3863 res = sendmsg(fd, &msg, 0); 3864 3865 if (nxt_slow_path(res == -1)) { 3866 nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)", 3867 fd, (int) buf_size, strerror(errno), errno); 3868 3869 } else { 3870 nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size, 3871 (int) res); 3872 } 3873 3874 return res; 3875 } 3876 3877 3878 static ssize_t 3879 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3880 void *buf, size_t buf_size, void *oob, size_t oob_size) 3881 { 3882 int fd; 3883 nxt_unit_impl_t *lib; 3884 nxt_unit_ctx_impl_t *ctx_impl; 3885 nxt_unit_port_impl_t *port; 3886 3887 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3888 3889 pthread_mutex_lock(&lib->mutex); 3890 3891 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 3892 3893 if (nxt_fast_path(port != NULL)) { 3894 fd = port->port.in_fd; 3895 3896 } else { 3897 
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Cache the fd when reading from the context's own read port. */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = fd;
    }

    return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
}


/*
 * Receives one message from "fd" via recvmsg(): regular data into buf,
 * ancillary data into oob.  Returns the recvmsg() result: bytes
 * received, or -1 with errno set.
 */
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
    void *oob, size_t oob_size)
{
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = oob;
    msg.msg_controllen = oob_size;

    res = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
                      fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
    }

    return res;
}


/*
 * lvlhsh "test" callback for the port hash: a stored port matches when
 * both the pid and the port id of the queried key are equal.
 */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/* lvlhsh protocol for the library ports hash. */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Fills the lvlhsh query "lhq" with the (pid, id) key for a port hash
 * operation.  The murmur hash of the key is computed once and cached
 * in port_id->hash for subsequent lookups.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/*
 * Inserts "port" into the port hash keyed by (pid, id).  Returns
 * NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise (replace is 0, so
 * an existing key is not overwritten).
 */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Looks up a port by id; when "remove" is non-zero the entry is also
 * deleted from the hash.  Returns the port or NULL if not found.
 */
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/* lvlhsh "test" callback for the request hash. */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    /*
     * Any candidate with a matching key hash is accepted; presumably
     * stream ids do not collide — confirm against stream allocation.
     */
    return NXT_OK;
}


/* lvlhsh protocol for the request hash. */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Registers a request in the hash keyed by its 32-bit stream id.
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    uint32_t            *stream;
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    res = nxt_lvlhsh_insert(request_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Finds the request registered for "stream"; when "remove" is non-zero
 * the entry is also deleted.  Returns the request or NULL.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    if (remove) {
        res = nxt_lvlhsh_delete(request_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(request_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Formats one log line (prefix + user message + '\n') and writes it to
 * the library log fd, or to stderr with getpid() when ctx is NULL.
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        /* No context yet: best effort to stderr. */
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* Reserve the last byte of msg for the trailing '\n'. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /*
     * vsnprintf() returns the length the message would have had, so
     * p > end means truncation: mark it with a "[...]" tail.
     */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Same as nxt_unit_log(), but prefixes the message with the request's
 * "#stream: " id when req is non-NULL.
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* Reserve the last byte of msg for the trailing '\n'. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* Truncation marker, as in nxt_unit_log(). */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: 
%.*s", (int) (p - msg), msg);
    }
}


/* Log level names indexed by the "level" argument of the loggers. */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Writes the "YYYY/MM/DD hh:mm:ss.mmm [level] pid#tid [unit] " log
 * prefix into [p, end) and returns the advanced pointer.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    struct tm        tm;
    struct timespec  ts;

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    tm = *localtime(&ts.tv_sec);
#endif

    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/*
 * The allocator required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free():
 * returns memory aligned to "alignment", or NULL on failure.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void       *p;
    nxt_err_t  err;

    err = posix_memalign(&p, alignment, size);

    if (nxt_fast_path(err == 0)) {
        return p;
    }

    return NULL;
}

#if (NXT_DEBUG)

/* Debug-build wrapper around libc free(). */
void
nxt_free(void *p)
{
    free(p);
}

#endif