1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 #include "nxt_unit_websocket.h" 15 16 #include "nxt_websocket.h" 17 18 #if (NXT_HAVE_MEMFD_CREATE) 19 #include <linux/memfd.h> 20 #endif 21 22 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 23 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 24 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 25 typedef struct nxt_unit_process_s nxt_unit_process_t; 26 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 27 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 28 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 29 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 30 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 31 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 32 33 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 34 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 35 nxt_unit_ctx_impl_t *ctx_impl, void *data); 36 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 37 nxt_unit_mmap_buf_t *mmap_buf); 38 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 39 nxt_unit_mmap_buf_t *mmap_buf); 40 nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); 41 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 42 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); 43 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 44 uint32_t stream); 45 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 46 nxt_unit_recv_msg_t *recv_msg); 47 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 48 nxt_unit_recv_msg_t *recv_msg); 49 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 50 nxt_unit_recv_msg_t *recv_msg); 51 static nxt_unit_request_info_impl_t 
*nxt_unit_request_info_get( 52 nxt_unit_ctx_t *ctx); 53 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 54 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 55 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 56 nxt_unit_ctx_t *ctx); 57 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 58 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); 59 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 62 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 63 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 64 nxt_unit_mmap_buf_t *mmap_buf, int last); 65 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 66 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 67 size_t size); 68 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 69 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 70 nxt_chunk_id_t *c, int n); 71 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 72 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 73 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 74 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 75 int fd); 76 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 77 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, 78 nxt_unit_mmap_buf_t *mmap_buf); 79 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 80 81 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 82 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 83 nxt_unit_process_t *process, int i); 84 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 85 static 
nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 86 nxt_unit_process_t *process, uint32_t id); 87 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 88 nxt_unit_recv_msg_t *recv_msg); 89 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 90 nxt_unit_recv_msg_t *recv_msg); 91 static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, 92 uint32_t size); 93 94 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 95 pid_t pid); 96 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 97 pid_t pid, int remove); 98 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 99 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, 100 nxt_unit_port_id_t *port_id, int *fd); 101 102 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 103 nxt_unit_port_id_t *new_port, int fd); 104 105 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, 106 nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, 107 nxt_unit_process_t **process); 108 static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, 109 nxt_unit_process_t *process); 110 111 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, 112 nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, 113 const void *oob, size_t oob_size); 114 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, 115 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 116 void *oob, size_t oob_size); 117 118 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 119 nxt_unit_port_t *port); 120 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 121 nxt_unit_port_id_t *port_id, int remove); 122 123 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, 124 nxt_unit_request_info_impl_t *req_impl); 125 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( 126 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); 127 128 static char * 
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 129 130 131 struct nxt_unit_mmap_buf_s { 132 nxt_unit_buf_t buf; 133 134 nxt_unit_mmap_buf_t *next; 135 nxt_unit_mmap_buf_t **prev; 136 137 nxt_port_mmap_header_t *hdr; 138 // nxt_queue_link_t link; 139 nxt_unit_port_id_t port_id; 140 nxt_unit_request_info_t *req; 141 nxt_unit_ctx_impl_t *ctx_impl; 142 }; 143 144 145 struct nxt_unit_recv_msg_s { 146 uint32_t stream; 147 nxt_pid_t pid; 148 nxt_port_id_t reply_port; 149 150 uint8_t last; /* 1 bit */ 151 uint8_t mmap; /* 1 bit */ 152 153 void *start; 154 uint32_t size; 155 156 int fd; 157 nxt_unit_process_t *process; 158 159 nxt_unit_mmap_buf_t *incoming_buf; 160 }; 161 162 163 typedef enum { 164 NXT_UNIT_RS_START = 0, 165 NXT_UNIT_RS_RESPONSE_INIT, 166 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 167 NXT_UNIT_RS_RESPONSE_SENT, 168 NXT_UNIT_RS_RELEASED, 169 } nxt_unit_req_state_t; 170 171 172 struct nxt_unit_request_info_impl_s { 173 nxt_unit_request_info_t req; 174 175 uint32_t stream; 176 177 nxt_unit_process_t *process; 178 179 nxt_unit_mmap_buf_t *outgoing_buf; 180 nxt_unit_mmap_buf_t *incoming_buf; 181 182 nxt_unit_req_state_t state; 183 uint8_t websocket; 184 185 nxt_queue_link_t link; 186 187 char extra_data[]; 188 }; 189 190 191 struct nxt_unit_websocket_frame_impl_s { 192 nxt_unit_websocket_frame_t ws; 193 194 nxt_unit_mmap_buf_t *buf; 195 196 nxt_queue_link_t link; 197 198 nxt_unit_ctx_impl_t *ctx_impl; 199 200 void *retain_buf; 201 }; 202 203 204 struct nxt_unit_ctx_impl_s { 205 nxt_unit_ctx_t ctx; 206 207 pthread_mutex_t mutex; 208 209 nxt_unit_port_id_t read_port_id; 210 int read_port_fd; 211 212 nxt_queue_link_t link; 213 214 nxt_unit_mmap_buf_t *free_buf; 215 216 /* of nxt_unit_request_info_impl_t */ 217 nxt_queue_t free_req; 218 219 /* of nxt_unit_websocket_frame_impl_t */ 220 nxt_queue_t free_ws; 221 222 /* of nxt_unit_request_info_impl_t */ 223 nxt_queue_t active_req; 224 225 /* of nxt_unit_request_info_impl_t */ 226 nxt_lvlhsh_t requests; 227 
228 nxt_unit_mmap_buf_t ctx_buf[2]; 229 230 nxt_unit_request_info_impl_t req; 231 }; 232 233 234 struct nxt_unit_impl_s { 235 nxt_unit_t unit; 236 nxt_unit_callbacks_t callbacks; 237 238 uint32_t request_data_size; 239 240 pthread_mutex_t mutex; 241 242 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 243 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 244 245 nxt_unit_port_id_t ready_port_id; 246 247 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 248 249 pid_t pid; 250 int log_fd; 251 int online; 252 253 nxt_unit_ctx_impl_t main_ctx; 254 }; 255 256 257 struct nxt_unit_port_impl_s { 258 nxt_unit_port_t port; 259 260 nxt_queue_link_t link; 261 nxt_unit_process_t *process; 262 }; 263 264 265 struct nxt_unit_mmap_s { 266 nxt_port_mmap_header_t *hdr; 267 }; 268 269 270 struct nxt_unit_mmaps_s { 271 pthread_mutex_t mutex; 272 uint32_t size; 273 uint32_t cap; 274 nxt_unit_mmap_t *elts; 275 }; 276 277 278 struct nxt_unit_process_s { 279 pid_t pid; 280 281 nxt_queue_t ports; 282 283 nxt_unit_mmaps_t incoming; 284 nxt_unit_mmaps_t outgoing; 285 286 nxt_unit_impl_t *lib; 287 288 nxt_atomic_t use_count; 289 290 uint32_t next_port_id; 291 }; 292 293 294 /* Explicitly using 32 bit types to avoid possible alignment. 
*/ 295 typedef struct { 296 int32_t pid; 297 uint32_t id; 298 } nxt_unit_port_hash_id_t; 299 300 301 nxt_unit_ctx_t * 302 nxt_unit_init(nxt_unit_init_t *init) 303 { 304 int rc; 305 uint32_t ready_stream; 306 nxt_unit_ctx_t *ctx; 307 nxt_unit_impl_t *lib; 308 nxt_unit_port_t ready_port, read_port; 309 310 lib = nxt_unit_create(init); 311 if (nxt_slow_path(lib == NULL)) { 312 return NULL; 313 } 314 315 if (init->ready_port.id.pid != 0 316 && init->ready_stream != 0 317 && init->read_port.id.pid != 0) 318 { 319 ready_port = init->ready_port; 320 ready_stream = init->ready_stream; 321 read_port = init->read_port; 322 lib->log_fd = init->log_fd; 323 324 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 325 ready_port.id.id); 326 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 327 read_port.id.id); 328 } else { 329 rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, 330 &ready_stream); 331 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 332 goto fail; 333 } 334 } 335 336 lib->pid = read_port.id.pid; 337 ctx = &lib->main_ctx.ctx; 338 339 rc = lib->callbacks.add_port(ctx, &ready_port); 340 if (rc != NXT_UNIT_OK) { 341 nxt_unit_alert(NULL, "failed to add ready_port"); 342 343 goto fail; 344 } 345 346 rc = lib->callbacks.add_port(ctx, &read_port); 347 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 348 nxt_unit_alert(NULL, "failed to add read_port"); 349 350 goto fail; 351 } 352 353 lib->main_ctx.read_port_id = read_port.id; 354 lib->ready_port_id = ready_port.id; 355 356 rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); 357 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 358 nxt_unit_alert(NULL, "failed to send READY message"); 359 360 goto fail; 361 } 362 363 return ctx; 364 365 fail: 366 367 free(lib); 368 369 return NULL; 370 } 371 372 373 static nxt_unit_impl_t * 374 nxt_unit_create(nxt_unit_init_t *init) 375 { 376 int rc; 377 nxt_unit_impl_t *lib; 378 nxt_unit_callbacks_t *cb; 379 380 lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size); 381 if 
(nxt_slow_path(lib == NULL)) { 382 nxt_unit_alert(NULL, "failed to allocate unit struct"); 383 384 return NULL; 385 } 386 387 rc = pthread_mutex_init(&lib->mutex, NULL); 388 if (nxt_slow_path(rc != 0)) { 389 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 390 391 goto fail; 392 } 393 394 lib->unit.data = init->data; 395 lib->callbacks = init->callbacks; 396 397 lib->request_data_size = init->request_data_size; 398 399 lib->processes.slot = NULL; 400 lib->ports.slot = NULL; 401 402 lib->log_fd = STDERR_FILENO; 403 lib->online = 1; 404 405 nxt_queue_init(&lib->contexts); 406 407 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 408 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 409 goto fail; 410 } 411 412 cb = &lib->callbacks; 413 414 if (cb->request_handler == NULL) { 415 nxt_unit_alert(NULL, "request_handler is NULL"); 416 417 goto fail; 418 } 419 420 if (cb->add_port == NULL) { 421 cb->add_port = nxt_unit_add_port; 422 } 423 424 if (cb->remove_port == NULL) { 425 cb->remove_port = nxt_unit_remove_port; 426 } 427 428 if (cb->remove_pid == NULL) { 429 cb->remove_pid = nxt_unit_remove_pid; 430 } 431 432 if (cb->quit == NULL) { 433 cb->quit = nxt_unit_quit; 434 } 435 436 if (cb->port_send == NULL) { 437 cb->port_send = nxt_unit_port_send_default; 438 } 439 440 if (cb->port_recv == NULL) { 441 cb->port_recv = nxt_unit_port_recv_default; 442 } 443 444 return lib; 445 446 fail: 447 448 free(lib); 449 450 return NULL; 451 } 452 453 454 static int 455 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 456 void *data) 457 { 458 int rc; 459 460 ctx_impl->ctx.data = data; 461 ctx_impl->ctx.unit = &lib->unit; 462 463 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 464 465 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 466 if (nxt_slow_path(rc != 0)) { 467 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 468 469 return NXT_UNIT_ERROR; 470 } 471 472 nxt_queue_init(&ctx_impl->free_req); 473 
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);

    /*
     * Seed the free buffer list with the two statically allocated
     * context buffers; further buffers are allocated on demand.
     */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    /* The statically allocated request info starts out on the free list. */
    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port_fd = -1;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/*
 * Push mmap_buf at the head of the singly linked list rooted at *head.
 *
 * The list is intrusive: each element's "prev" field is a pointer to the
 * location that points at the element (either the head pointer itself or
 * the previous element's "next" field), which lets an element unlink
 * itself in O(1) without knowing the list head.
 */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        /* The old head is now pointed at by our "next" field. */
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/*
 * Append mmap_buf at the tail of the list.  "prev" may point at any link
 * in the chain (typically the head); the loop walks forward to the final
 * NULL "next" slot and inserts there.  O(n) in the list length.
 */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/*
 * Unlink mmap_buf from whatever list it is on, using the back-pointer
 * stored in "prev".  Safe to call only while the element is linked
 * (or while "prev" is NULL, in which case only "next" is patched).
 */
nxt_inline void
nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parse the NXT_UNIT_INIT environment variable published by the Unit
 * master process.  The value has the form
 *
 *   VERSION;READY_STREAM;READY_PID,READY_ID,READY_FD;
 *   READ_PID,READ_ID,READ_FD;LOG_FD
 *
 * On success fills in ready_port (write-only), read_port (read-only),
 * *log_fd and *stream, and returns NXT_UNIT_OK; otherwise logs an alert
 * and returns NXT_UNIT_ERROR.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
    int *log_fd, uint32_t *stream)
{
    int       rc;
    int       ready_fd, read_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, read_pid;
    uint32_t  ready_stream, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

version_length = nxt_length(NXT_VERSION); 558 559 version_end = strchr(unit_init, ';'); 560 if (version_end == NULL 561 || version_end - unit_init != version_length 562 || memcmp(unit_init, NXT_VERSION, version_length) != 0) 563 { 564 nxt_unit_alert(NULL, "version check error"); 565 566 return NXT_UNIT_ERROR; 567 } 568 569 rc = sscanf(version_end + 1, 570 "%"PRIu32";" 571 "%"PRId64",%"PRIu32",%d;" 572 "%"PRId64",%"PRIu32",%d;" 573 "%d", 574 &ready_stream, 575 &ready_pid, &ready_id, &ready_fd, 576 &read_pid, &read_id, &read_fd, 577 log_fd); 578 579 if (nxt_slow_path(rc != 8)) { 580 nxt_unit_alert(NULL, "failed to scan variables"); 581 582 return NXT_UNIT_ERROR; 583 } 584 585 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 586 587 ready_port->in_fd = -1; 588 ready_port->out_fd = ready_fd; 589 ready_port->data = NULL; 590 591 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 592 593 read_port->in_fd = read_fd; 594 read_port->out_fd = -1; 595 read_port->data = NULL; 596 597 *stream = ready_stream; 598 599 return NXT_UNIT_OK; 600 } 601 602 603 static int 604 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 605 uint32_t stream) 606 { 607 ssize_t res; 608 nxt_port_msg_t msg; 609 nxt_unit_impl_t *lib; 610 611 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 612 613 msg.stream = stream; 614 msg.pid = lib->pid; 615 msg.reply_port = 0; 616 msg.type = _NXT_PORT_MSG_PROCESS_READY; 617 msg.last = 1; 618 msg.mmap = 0; 619 msg.nf = 0; 620 msg.mf = 0; 621 msg.tracking = 0; 622 623 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 624 if (res != sizeof(msg)) { 625 return NXT_UNIT_ERROR; 626 } 627 628 return NXT_UNIT_OK; 629 } 630 631 632 int 633 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 634 void *buf, size_t buf_size, void *oob, size_t oob_size) 635 { 636 int rc; 637 pid_t pid; 638 struct cmsghdr *cm; 639 nxt_port_msg_t *port_msg; 640 nxt_unit_impl_t *lib; 641 
nxt_unit_recv_msg_t recv_msg; 642 nxt_unit_callbacks_t *cb; 643 644 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 645 646 rc = NXT_UNIT_ERROR; 647 recv_msg.fd = -1; 648 recv_msg.process = NULL; 649 port_msg = buf; 650 cm = oob; 651 652 if (oob_size >= CMSG_SPACE(sizeof(int)) 653 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 654 && cm->cmsg_level == SOL_SOCKET 655 && cm->cmsg_type == SCM_RIGHTS) 656 { 657 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); 658 } 659 660 recv_msg.incoming_buf = NULL; 661 662 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 663 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 664 goto fail; 665 } 666 667 recv_msg.stream = port_msg->stream; 668 recv_msg.pid = port_msg->pid; 669 recv_msg.reply_port = port_msg->reply_port; 670 recv_msg.last = port_msg->last; 671 recv_msg.mmap = port_msg->mmap; 672 673 recv_msg.start = port_msg + 1; 674 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 675 676 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 677 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 678 port_msg->stream, (int) port_msg->type); 679 goto fail; 680 } 681 682 if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 683 rc = NXT_UNIT_OK; 684 685 goto fail; 686 } 687 688 /* Fragmentation is unsupported. 
*/ 689 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 690 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 691 port_msg->stream, (int) port_msg->type); 692 goto fail; 693 } 694 695 if (port_msg->mmap) { 696 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { 697 goto fail; 698 } 699 } 700 701 cb = &lib->callbacks; 702 703 switch (port_msg->type) { 704 705 case _NXT_PORT_MSG_QUIT: 706 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 707 708 cb->quit(ctx); 709 rc = NXT_UNIT_OK; 710 break; 711 712 case _NXT_PORT_MSG_NEW_PORT: 713 rc = nxt_unit_process_new_port(ctx, &recv_msg); 714 break; 715 716 case _NXT_PORT_MSG_CHANGE_FILE: 717 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 718 port_msg->stream, recv_msg.fd); 719 break; 720 721 case _NXT_PORT_MSG_MMAP: 722 if (nxt_slow_path(recv_msg.fd < 0)) { 723 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 724 port_msg->stream, recv_msg.fd); 725 726 goto fail; 727 } 728 729 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); 730 break; 731 732 case _NXT_PORT_MSG_REQ_HEADERS: 733 rc = nxt_unit_process_req_headers(ctx, &recv_msg); 734 break; 735 736 case _NXT_PORT_MSG_WEBSOCKET: 737 rc = nxt_unit_process_websocket(ctx, &recv_msg); 738 break; 739 740 case _NXT_PORT_MSG_REMOVE_PID: 741 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 742 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 743 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 744 (int) sizeof(pid)); 745 746 goto fail; 747 } 748 749 memcpy(&pid, recv_msg.start, sizeof(pid)); 750 751 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 752 port_msg->stream, (int) pid); 753 754 cb->remove_pid(ctx, pid); 755 756 rc = NXT_UNIT_OK; 757 break; 758 759 default: 760 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 761 port_msg->stream, (int) port_msg->type); 762 763 goto fail; 764 } 765 766 fail: 767 768 if (recv_msg.fd != -1) { 769 close(recv_msg.fd); 770 } 771 772 while 
(recv_msg.incoming_buf != NULL) { 773 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 774 } 775 776 if (recv_msg.process != NULL) { 777 nxt_unit_process_use(ctx, recv_msg.process, -1); 778 } 779 780 return rc; 781 } 782 783 784 static int 785 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 786 { 787 int nb; 788 nxt_unit_impl_t *lib; 789 nxt_unit_port_t new_port; 790 nxt_port_msg_new_port_t *new_port_msg; 791 792 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 793 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 794 "invalid message size (%d)", 795 recv_msg->stream, (int) recv_msg->size); 796 797 return NXT_UNIT_ERROR; 798 } 799 800 if (nxt_slow_path(recv_msg->fd < 0)) { 801 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 802 recv_msg->stream, recv_msg->fd); 803 804 return NXT_UNIT_ERROR; 805 } 806 807 new_port_msg = recv_msg->start; 808 809 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 810 recv_msg->stream, (int) new_port_msg->pid, 811 (int) new_port_msg->id, recv_msg->fd); 812 813 nb = 0; 814 815 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { 816 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " 817 "failed: %s (%d)", recv_msg->fd, strerror(errno), errno); 818 819 return NXT_UNIT_ERROR; 820 } 821 822 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 823 new_port_msg->id); 824 825 new_port.in_fd = -1; 826 new_port.out_fd = recv_msg->fd; 827 new_port.data = NULL; 828 829 recv_msg->fd = -1; 830 831 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 832 833 return lib->callbacks.add_port(ctx, &new_port); 834 } 835 836 837 static int 838 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 839 { 840 nxt_unit_impl_t *lib; 841 nxt_unit_request_t *r; 842 nxt_unit_mmap_buf_t *b; 843 nxt_unit_request_info_t *req; 844 nxt_unit_request_info_impl_t *req_impl; 845 846 if (nxt_slow_path(recv_msg->mmap == 0)) { 847 nxt_unit_warn(ctx, 
"#%"PRIu32": data is not in shared memory", 848 recv_msg->stream); 849 850 return NXT_UNIT_ERROR; 851 } 852 853 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 854 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 855 "%d expected", recv_msg->stream, (int) recv_msg->size, 856 (int) sizeof(nxt_unit_request_t)); 857 858 return NXT_UNIT_ERROR; 859 } 860 861 req_impl = nxt_unit_request_info_get(ctx); 862 if (nxt_slow_path(req_impl == NULL)) { 863 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 864 recv_msg->stream); 865 866 return NXT_UNIT_ERROR; 867 } 868 869 req = &req_impl->req; 870 871 nxt_unit_port_id_init(&req->response_port, recv_msg->pid, 872 recv_msg->reply_port); 873 874 req->request = recv_msg->start; 875 876 b = recv_msg->incoming_buf; 877 878 req->request_buf = &b->buf; 879 req->response = NULL; 880 req->response_buf = NULL; 881 882 r = req->request; 883 884 req->content_length = r->content_length; 885 886 req->content_buf = req->request_buf; 887 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 888 889 /* "Move" process reference to req_impl. */ 890 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); 891 if (nxt_slow_path(req_impl->process == NULL)) { 892 return NXT_UNIT_ERROR; 893 } 894 895 recv_msg->process = NULL; 896 897 req_impl->stream = recv_msg->stream; 898 899 req_impl->outgoing_buf = NULL; 900 901 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 902 b->req = req; 903 } 904 905 /* "Move" incoming buffer list to req_impl. 
*/ 906 req_impl->incoming_buf = recv_msg->incoming_buf; 907 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 908 recv_msg->incoming_buf = NULL; 909 910 req->response_max_fields = 0; 911 req_impl->state = NXT_UNIT_RS_START; 912 req_impl->websocket = 0; 913 914 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 915 (int) r->method_length, nxt_unit_sptr_get(&r->method), 916 (int) r->target_length, nxt_unit_sptr_get(&r->target), 917 (int) r->content_length); 918 919 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 920 921 lib->callbacks.request_handler(req); 922 923 return NXT_UNIT_OK; 924 } 925 926 927 static int 928 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 929 { 930 size_t hsize; 931 nxt_unit_impl_t *lib; 932 nxt_unit_mmap_buf_t *b; 933 nxt_unit_ctx_impl_t *ctx_impl; 934 nxt_unit_callbacks_t *cb; 935 nxt_unit_request_info_t *req; 936 nxt_unit_request_info_impl_t *req_impl; 937 nxt_unit_websocket_frame_impl_t *ws_impl; 938 939 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 940 941 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, 942 recv_msg->last); 943 if (req_impl == NULL) { 944 return NXT_UNIT_OK; 945 } 946 947 req = &req_impl->req; 948 949 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 950 cb = &lib->callbacks; 951 952 if (cb->websocket_handler && recv_msg->size >= 2) { 953 ws_impl = nxt_unit_websocket_frame_get(ctx); 954 if (nxt_slow_path(ws_impl == NULL)) { 955 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 956 req_impl->stream); 957 958 return NXT_UNIT_ERROR; 959 } 960 961 ws_impl->ws.req = req; 962 963 ws_impl->buf = NULL; 964 ws_impl->retain_buf = NULL; 965 966 if (recv_msg->mmap) { 967 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 968 b->req = req; 969 } 970 971 /* "Move" incoming buffer list to ws_impl. 
*/ 972 ws_impl->buf = recv_msg->incoming_buf; 973 ws_impl->buf->prev = &ws_impl->buf; 974 recv_msg->incoming_buf = NULL; 975 976 b = ws_impl->buf; 977 978 } else { 979 b = nxt_unit_mmap_buf_get(ctx); 980 if (nxt_slow_path(b == NULL)) { 981 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 982 req_impl->stream); 983 984 nxt_unit_websocket_frame_release(&ws_impl->ws); 985 986 return NXT_UNIT_ERROR; 987 } 988 989 b->hdr = NULL; 990 b->req = req; 991 b->buf.start = recv_msg->start; 992 b->buf.free = b->buf.start; 993 b->buf.end = b->buf.start + recv_msg->size; 994 995 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 996 } 997 998 ws_impl->ws.header = (void *) b->buf.start; 999 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1000 ws_impl->ws.header); 1001 1002 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1003 1004 if (ws_impl->ws.header->mask) { 1005 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1006 1007 } else { 1008 ws_impl->ws.mask = NULL; 1009 } 1010 1011 b->buf.free += hsize; 1012 1013 ws_impl->ws.content_buf = &b->buf; 1014 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1015 1016 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1017 "payload_len=%"PRIu64, 1018 ws_impl->ws.header->opcode, 1019 ws_impl->ws.payload_len); 1020 1021 cb->websocket_handler(&ws_impl->ws); 1022 } 1023 1024 if (recv_msg->last) { 1025 req_impl->websocket = 0; 1026 1027 if (cb->close_handler) { 1028 nxt_unit_req_debug(req, "close_handler"); 1029 1030 cb->close_handler(req); 1031 1032 } else { 1033 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1034 } 1035 } 1036 1037 return NXT_UNIT_OK; 1038 } 1039 1040 1041 static nxt_unit_request_info_impl_t * 1042 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1043 { 1044 nxt_unit_impl_t *lib; 1045 nxt_queue_link_t *lnk; 1046 nxt_unit_ctx_impl_t *ctx_impl; 1047 nxt_unit_request_info_impl_t *req_impl; 1048 1049 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1050 1051 lib = 
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1052 1053 pthread_mutex_lock(&ctx_impl->mutex); 1054 1055 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1056 pthread_mutex_unlock(&ctx_impl->mutex); 1057 1058 req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) 1059 + lib->request_data_size); 1060 if (nxt_slow_path(req_impl == NULL)) { 1061 return NULL; 1062 } 1063 1064 req_impl->req.unit = ctx->unit; 1065 req_impl->req.ctx = ctx; 1066 1067 pthread_mutex_lock(&ctx_impl->mutex); 1068 1069 } else { 1070 lnk = nxt_queue_first(&ctx_impl->free_req); 1071 nxt_queue_remove(lnk); 1072 1073 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1074 } 1075 1076 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1077 1078 pthread_mutex_unlock(&ctx_impl->mutex); 1079 1080 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; 1081 1082 return req_impl; 1083 } 1084 1085 1086 static void 1087 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1088 { 1089 nxt_unit_ctx_impl_t *ctx_impl; 1090 nxt_unit_request_info_impl_t *req_impl; 1091 1092 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1093 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1094 1095 req->response = NULL; 1096 req->response_buf = NULL; 1097 1098 if (req_impl->websocket) { 1099 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); 1100 1101 req_impl->websocket = 0; 1102 } 1103 1104 while (req_impl->outgoing_buf != NULL) { 1105 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1106 } 1107 1108 while (req_impl->incoming_buf != NULL) { 1109 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1110 } 1111 1112 /* 1113 * Process release should go after buffers release to guarantee mmap 1114 * existence. 
1115 */ 1116 if (req_impl->process != NULL) { 1117 nxt_unit_process_use(req->ctx, req_impl->process, -1); 1118 1119 req_impl->process = NULL; 1120 } 1121 1122 pthread_mutex_lock(&ctx_impl->mutex); 1123 1124 nxt_queue_remove(&req_impl->link); 1125 1126 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 1127 1128 pthread_mutex_unlock(&ctx_impl->mutex); 1129 1130 req_impl->state = NXT_UNIT_RS_RELEASED; 1131 } 1132 1133 1134 static void 1135 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1136 { 1137 nxt_unit_ctx_impl_t *ctx_impl; 1138 1139 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1140 1141 nxt_queue_remove(&req_impl->link); 1142 1143 if (req_impl != &ctx_impl->req) { 1144 free(req_impl); 1145 } 1146 } 1147 1148 1149 static nxt_unit_websocket_frame_impl_t * 1150 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1151 { 1152 nxt_queue_link_t *lnk; 1153 nxt_unit_ctx_impl_t *ctx_impl; 1154 nxt_unit_websocket_frame_impl_t *ws_impl; 1155 1156 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1157 1158 pthread_mutex_lock(&ctx_impl->mutex); 1159 1160 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1161 pthread_mutex_unlock(&ctx_impl->mutex); 1162 1163 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); 1164 if (nxt_slow_path(ws_impl == NULL)) { 1165 return NULL; 1166 } 1167 1168 } else { 1169 lnk = nxt_queue_first(&ctx_impl->free_ws); 1170 nxt_queue_remove(lnk); 1171 1172 pthread_mutex_unlock(&ctx_impl->mutex); 1173 1174 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1175 } 1176 1177 ws_impl->ctx_impl = ctx_impl; 1178 1179 return ws_impl; 1180 } 1181 1182 1183 static void 1184 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1185 { 1186 nxt_unit_websocket_frame_impl_t *ws_impl; 1187 1188 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 1189 1190 while (ws_impl->buf != NULL) { 1191 nxt_unit_mmap_buf_free(ws_impl->buf); 1192 } 1193 1194 
    ws->req = NULL;

    if (ws_impl->retain_buf != NULL) {
        free(ws_impl->retain_buf);

        ws_impl->retain_buf = NULL;
    }

    /* Return the frame to the per-context free list for reuse. */
    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/*
 * Permanently destroy a pooled websocket frame: unlink it from the free
 * list and release its heap storage.  Counterpart of
 * nxt_unit_websocket_frame_get() for frames that were malloc'ed.
 */
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    free(ws_impl);
}


/*
 * Case-insensitive hash of a header field name, folded to 16 bits.
 * Must produce the same values as the parser in nxt_http_parse.c so
 * that precomputed NXT_UNIT_HASH_* constants match.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406;  /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        /* hash = hash * 17 + lowercase(ch) */
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    /* Fold the upper half in before truncating to uint16_t. */
    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorder the request's field array so that fields with equal name hashes
 * become adjacent, preserving relative order otherwise, and record the
 * indexes of well-known fields (Content-Length, Content-Type, Cookie).
 * Sptr name/value offsets are relative to each field slot, so they are
 * adjusted whenever a field is moved.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {

        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            r->content_length_field = i;
            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            r->content_type_field = i;
            break;

        case NXT_UNIT_HASH_COOKIE:
            r->cookie_field = i;
            break;
        };

        /* Pull every later field with the same hash up next to field i. */
        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash) {
                continue;
            }

            if (j == i + 1) {
                /* Already adjacent; nothing to move. */
                continue;
            }

            /* Save field j, adjusting offsets for its destination slot. */
            f = fields[j];
            f.name.offset += (j - (i + 1)) * sizeof(f);
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /* Shift the intervening fields one slot down. */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
fields[j].value.offset -= sizeof(f); 1285 j--; 1286 } 1287 1288 fields[j] = f; 1289 1290 i++; 1291 } 1292 } 1293 } 1294 1295 1296 int 1297 nxt_unit_response_init(nxt_unit_request_info_t *req, 1298 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 1299 { 1300 uint32_t buf_size; 1301 nxt_unit_buf_t *buf; 1302 nxt_unit_request_info_impl_t *req_impl; 1303 1304 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1305 1306 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1307 nxt_unit_req_warn(req, "init: response already sent"); 1308 1309 return NXT_UNIT_ERROR; 1310 } 1311 1312 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1313 (int) max_fields_count, (int) max_fields_size); 1314 1315 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1316 nxt_unit_req_debug(req, "duplicate response init"); 1317 } 1318 1319 buf_size = sizeof(nxt_unit_response_t) 1320 + max_fields_count * sizeof(nxt_unit_field_t) 1321 + max_fields_size; 1322 1323 if (nxt_slow_path(req->response_buf != NULL)) { 1324 buf = req->response_buf; 1325 1326 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1327 goto init_response; 1328 } 1329 1330 nxt_unit_buf_free(buf); 1331 1332 req->response_buf = NULL; 1333 req->response = NULL; 1334 req->response_max_fields = 0; 1335 1336 req_impl->state = NXT_UNIT_RS_START; 1337 } 1338 1339 buf = nxt_unit_response_buf_alloc(req, buf_size); 1340 if (nxt_slow_path(buf == NULL)) { 1341 return NXT_UNIT_ERROR; 1342 } 1343 1344 init_response: 1345 1346 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 1347 1348 req->response_buf = buf; 1349 1350 req->response = (nxt_unit_response_t *) buf->start; 1351 req->response->status = status; 1352 1353 buf->free = buf->start + sizeof(nxt_unit_response_t) 1354 + max_fields_count * sizeof(nxt_unit_field_t); 1355 1356 req->response_max_fields = max_fields_count; 1357 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 1358 1359 return 
NXT_UNIT_OK; 1360 } 1361 1362 1363 int 1364 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1365 uint32_t max_fields_count, uint32_t max_fields_size) 1366 { 1367 char *p; 1368 uint32_t i, buf_size; 1369 nxt_unit_buf_t *buf; 1370 nxt_unit_field_t *f, *src; 1371 nxt_unit_response_t *resp; 1372 nxt_unit_request_info_impl_t *req_impl; 1373 1374 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1375 1376 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1377 nxt_unit_req_warn(req, "realloc: response not init"); 1378 1379 return NXT_UNIT_ERROR; 1380 } 1381 1382 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1383 nxt_unit_req_warn(req, "realloc: response already sent"); 1384 1385 return NXT_UNIT_ERROR; 1386 } 1387 1388 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1389 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 1390 1391 return NXT_UNIT_ERROR; 1392 } 1393 1394 buf_size = sizeof(nxt_unit_response_t) 1395 + max_fields_count * sizeof(nxt_unit_field_t) 1396 + max_fields_size; 1397 1398 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 1399 1400 buf = nxt_unit_response_buf_alloc(req, buf_size); 1401 if (nxt_slow_path(buf == NULL)) { 1402 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 1403 return NXT_UNIT_ERROR; 1404 } 1405 1406 resp = (nxt_unit_response_t *) buf->start; 1407 1408 memset(resp, 0, sizeof(nxt_unit_response_t)); 1409 1410 resp->status = req->response->status; 1411 resp->content_length = req->response->content_length; 1412 1413 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1414 f = resp->fields; 1415 1416 for (i = 0; i < req->response->fields_count; i++) { 1417 src = req->response->fields + i; 1418 1419 if (nxt_slow_path(src->skip != 0)) { 1420 continue; 1421 } 1422 1423 if (nxt_slow_path(src->name_length + src->value_length + 2 1424 > (uint32_t) (buf->end - p))) 1425 { 1426 nxt_unit_req_warn(req, "realloc: not enough 
space for field" 1427 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 1428 i, src, src->name_length, src->value_length); 1429 1430 goto fail; 1431 } 1432 1433 nxt_unit_sptr_set(&f->name, p); 1434 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 1435 *p++ = '\0'; 1436 1437 nxt_unit_sptr_set(&f->value, p); 1438 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 1439 *p++ = '\0'; 1440 1441 f->hash = src->hash; 1442 f->skip = 0; 1443 f->name_length = src->name_length; 1444 f->value_length = src->value_length; 1445 1446 resp->fields_count++; 1447 f++; 1448 } 1449 1450 if (req->response->piggyback_content_length > 0) { 1451 if (nxt_slow_path(req->response->piggyback_content_length 1452 > (uint32_t) (buf->end - p))) 1453 { 1454 nxt_unit_req_warn(req, "realloc: not enought space for content" 1455 " #%"PRIu32", %"PRIu32" required", 1456 i, req->response->piggyback_content_length); 1457 1458 goto fail; 1459 } 1460 1461 resp->piggyback_content_length = 1462 req->response->piggyback_content_length; 1463 1464 nxt_unit_sptr_set(&resp->piggyback_content, p); 1465 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 1466 req->response->piggyback_content_length); 1467 } 1468 1469 buf->free = p; 1470 1471 nxt_unit_buf_free(req->response_buf); 1472 1473 req->response = resp; 1474 req->response_buf = buf; 1475 req->response_max_fields = max_fields_count; 1476 1477 return NXT_UNIT_OK; 1478 1479 fail: 1480 1481 nxt_unit_buf_free(buf); 1482 1483 return NXT_UNIT_ERROR; 1484 } 1485 1486 1487 int 1488 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 1489 { 1490 nxt_unit_request_info_impl_t *req_impl; 1491 1492 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1493 1494 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 1495 } 1496 1497 1498 int 1499 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 1500 const char *name, uint8_t name_length, 1501 const char *value, uint32_t value_length) 1502 
{ 1503 nxt_unit_buf_t *buf; 1504 nxt_unit_field_t *f; 1505 nxt_unit_response_t *resp; 1506 nxt_unit_request_info_impl_t *req_impl; 1507 1508 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1509 1510 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 1511 nxt_unit_req_warn(req, "add_field: response not initialized or " 1512 "already sent"); 1513 1514 return NXT_UNIT_ERROR; 1515 } 1516 1517 resp = req->response; 1518 1519 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 1520 nxt_unit_req_warn(req, "add_field: too many response fields"); 1521 1522 return NXT_UNIT_ERROR; 1523 } 1524 1525 buf = req->response_buf; 1526 1527 if (nxt_slow_path(name_length + value_length + 2 1528 > (uint32_t) (buf->end - buf->free))) 1529 { 1530 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 1531 1532 return NXT_UNIT_ERROR; 1533 } 1534 1535 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 1536 resp->fields_count, 1537 (int) name_length, name, 1538 (int) value_length, value); 1539 1540 f = resp->fields + resp->fields_count; 1541 1542 nxt_unit_sptr_set(&f->name, buf->free); 1543 buf->free = nxt_cpymem(buf->free, name, name_length); 1544 *buf->free++ = '\0'; 1545 1546 nxt_unit_sptr_set(&f->value, buf->free); 1547 buf->free = nxt_cpymem(buf->free, value, value_length); 1548 *buf->free++ = '\0'; 1549 1550 f->hash = nxt_unit_field_hash(name, name_length); 1551 f->skip = 0; 1552 f->name_length = name_length; 1553 f->value_length = value_length; 1554 1555 resp->fields_count++; 1556 1557 return NXT_UNIT_OK; 1558 } 1559 1560 1561 int 1562 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 1563 const void* src, uint32_t size) 1564 { 1565 nxt_unit_buf_t *buf; 1566 nxt_unit_response_t *resp; 1567 nxt_unit_request_info_impl_t *req_impl; 1568 1569 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1570 1571 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1572 
nxt_unit_req_warn(req, "add_content: response not initialized yet"); 1573 1574 return NXT_UNIT_ERROR; 1575 } 1576 1577 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1578 nxt_unit_req_warn(req, "add_content: response already sent"); 1579 1580 return NXT_UNIT_ERROR; 1581 } 1582 1583 buf = req->response_buf; 1584 1585 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 1586 nxt_unit_req_warn(req, "add_content: buffer overflow"); 1587 1588 return NXT_UNIT_ERROR; 1589 } 1590 1591 resp = req->response; 1592 1593 if (resp->piggyback_content_length == 0) { 1594 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 1595 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 1596 } 1597 1598 resp->piggyback_content_length += size; 1599 1600 buf->free = nxt_cpymem(buf->free, src, size); 1601 1602 return NXT_UNIT_OK; 1603 } 1604 1605 1606 int 1607 nxt_unit_response_send(nxt_unit_request_info_t *req) 1608 { 1609 int rc; 1610 nxt_unit_mmap_buf_t *mmap_buf; 1611 nxt_unit_request_info_impl_t *req_impl; 1612 1613 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1614 1615 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1616 nxt_unit_req_warn(req, "send: response is not initialized yet"); 1617 1618 return NXT_UNIT_ERROR; 1619 } 1620 1621 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1622 nxt_unit_req_warn(req, "send: response already sent"); 1623 1624 return NXT_UNIT_ERROR; 1625 } 1626 1627 if (req->request->websocket_handshake && req->response->status == 101) { 1628 nxt_unit_response_upgrade(req); 1629 } 1630 1631 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 1632 req->response->fields_count, 1633 (int) (req->response_buf->free 1634 - req->response_buf->start)); 1635 1636 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 1637 1638 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); 1639 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 1640 
req->response = NULL; 1641 req->response_buf = NULL; 1642 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 1643 1644 nxt_unit_mmap_buf_release(mmap_buf); 1645 } 1646 1647 return rc; 1648 } 1649 1650 1651 int 1652 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 1653 { 1654 nxt_unit_request_info_impl_t *req_impl; 1655 1656 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1657 1658 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 1659 } 1660 1661 1662 nxt_unit_buf_t * 1663 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 1664 { 1665 int rc; 1666 nxt_unit_mmap_buf_t *mmap_buf; 1667 nxt_unit_request_info_impl_t *req_impl; 1668 1669 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1670 nxt_unit_req_warn(req, "response_buf_alloc: " 1671 "requested buffer (%"PRIu32") too big", size); 1672 1673 return NULL; 1674 } 1675 1676 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1677 1678 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1679 1680 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1681 if (nxt_slow_path(mmap_buf == NULL)) { 1682 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 1683 1684 return NULL; 1685 } 1686 1687 mmap_buf->req = req; 1688 1689 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 1690 1691 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, 1692 &req->response_port, size, mmap_buf); 1693 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1694 nxt_unit_mmap_buf_release(mmap_buf); 1695 1696 return NULL; 1697 } 1698 1699 return &mmap_buf->buf; 1700 } 1701 1702 1703 static nxt_unit_process_t * 1704 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1705 { 1706 nxt_unit_impl_t *lib; 1707 1708 if (recv_msg->process != NULL) { 1709 return recv_msg->process; 1710 } 1711 1712 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1713 1714 pthread_mutex_lock(&lib->mutex); 1715 1716 recv_msg->process = 
nxt_unit_process_find(ctx, recv_msg->pid, 0); 1717 1718 pthread_mutex_unlock(&lib->mutex); 1719 1720 if (recv_msg->process == NULL) { 1721 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", 1722 recv_msg->stream, (int) recv_msg->pid); 1723 } 1724 1725 return recv_msg->process; 1726 } 1727 1728 1729 static nxt_unit_mmap_buf_t * 1730 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1731 { 1732 nxt_unit_mmap_buf_t *mmap_buf; 1733 nxt_unit_ctx_impl_t *ctx_impl; 1734 1735 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1736 1737 pthread_mutex_lock(&ctx_impl->mutex); 1738 1739 if (ctx_impl->free_buf == NULL) { 1740 pthread_mutex_unlock(&ctx_impl->mutex); 1741 1742 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1743 if (nxt_slow_path(mmap_buf == NULL)) { 1744 return NULL; 1745 } 1746 1747 } else { 1748 mmap_buf = ctx_impl->free_buf; 1749 1750 nxt_unit_mmap_buf_remove(mmap_buf); 1751 1752 pthread_mutex_unlock(&ctx_impl->mutex); 1753 } 1754 1755 mmap_buf->ctx_impl = ctx_impl; 1756 1757 return mmap_buf; 1758 } 1759 1760 1761 static void 1762 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1763 { 1764 nxt_unit_mmap_buf_remove(mmap_buf); 1765 1766 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 1767 1768 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 1769 1770 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 1771 } 1772 1773 1774 typedef struct { 1775 size_t len; 1776 const char *str; 1777 } nxt_unit_str_t; 1778 1779 1780 #define nxt_unit_str(str) { nxt_length(str), str } 1781 1782 1783 int 1784 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 1785 { 1786 return req->request->websocket_handshake; 1787 } 1788 1789 1790 int 1791 nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 1792 { 1793 int rc; 1794 nxt_unit_ctx_impl_t *ctx_impl; 1795 nxt_unit_request_info_impl_t *req_impl; 1796 1797 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1798 1799 if (nxt_slow_path(req_impl->websocket != 0)) { 
1800 nxt_unit_req_debug(req, "upgrade: already upgraded"); 1801 1802 return NXT_UNIT_OK; 1803 } 1804 1805 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1806 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 1807 1808 return NXT_UNIT_ERROR; 1809 } 1810 1811 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1812 nxt_unit_req_warn(req, "upgrade: response already sent"); 1813 1814 return NXT_UNIT_ERROR; 1815 } 1816 1817 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1818 1819 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); 1820 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1821 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 1822 1823 return NXT_UNIT_ERROR; 1824 } 1825 1826 req_impl->websocket = 1; 1827 1828 req->response->status = 101; 1829 1830 return NXT_UNIT_OK; 1831 } 1832 1833 1834 int 1835 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 1836 { 1837 nxt_unit_request_info_impl_t *req_impl; 1838 1839 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1840 1841 return req_impl->websocket; 1842 } 1843 1844 1845 nxt_unit_request_info_t * 1846 nxt_unit_get_request_info_from_data(void *data) 1847 { 1848 nxt_unit_request_info_impl_t *req_impl; 1849 1850 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 1851 1852 return &req_impl->req; 1853 } 1854 1855 1856 int 1857 nxt_unit_buf_send(nxt_unit_buf_t *buf) 1858 { 1859 int rc; 1860 nxt_unit_mmap_buf_t *mmap_buf; 1861 nxt_unit_request_info_t *req; 1862 nxt_unit_request_info_impl_t *req_impl; 1863 1864 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1865 1866 req = mmap_buf->req; 1867 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1868 1869 nxt_unit_req_debug(req, "buf_send: %d bytes", 1870 (int) (buf->free - buf->start)); 1871 1872 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1873 nxt_unit_req_warn(req, "buf_send: 
response not initialized yet"); 1874 1875 return NXT_UNIT_ERROR; 1876 } 1877 1878 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 1879 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 1880 1881 return NXT_UNIT_ERROR; 1882 } 1883 1884 if (nxt_fast_path(buf->free > buf->start)) { 1885 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); 1886 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1887 return rc; 1888 } 1889 } 1890 1891 nxt_unit_mmap_buf_release(mmap_buf); 1892 1893 return NXT_UNIT_OK; 1894 } 1895 1896 1897 static void 1898 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 1899 { 1900 int rc; 1901 nxt_unit_mmap_buf_t *mmap_buf; 1902 nxt_unit_request_info_t *req; 1903 nxt_unit_request_info_impl_t *req_impl; 1904 1905 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1906 1907 req = mmap_buf->req; 1908 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1909 1910 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); 1911 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 1912 nxt_unit_mmap_buf_release(mmap_buf); 1913 1914 nxt_unit_request_info_release(req); 1915 1916 } else { 1917 nxt_unit_request_done(req, rc); 1918 } 1919 } 1920 1921 1922 static int 1923 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 1924 nxt_unit_mmap_buf_t *mmap_buf, int last) 1925 { 1926 struct { 1927 nxt_port_msg_t msg; 1928 nxt_port_mmap_msg_t mmap_msg; 1929 } m; 1930 1931 u_char *end, *last_used, *first_free; 1932 ssize_t res; 1933 nxt_chunk_id_t first_free_chunk; 1934 nxt_unit_buf_t *buf; 1935 nxt_unit_impl_t *lib; 1936 nxt_port_mmap_header_t *hdr; 1937 1938 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1939 1940 buf = &mmap_buf->buf; 1941 hdr = mmap_buf->hdr; 1942 1943 m.mmap_msg.size = buf->free - buf->start; 1944 1945 m.msg.stream = stream; 1946 m.msg.pid = lib->pid; 1947 m.msg.reply_port = 0; 1948 m.msg.type = _NXT_PORT_MSG_DATA; 1949 m.msg.last = last != 0; 1950 m.msg.mmap = hdr != NULL && 
m.mmap_msg.size > 0; 1951 m.msg.nf = 0; 1952 m.msg.mf = 0; 1953 m.msg.tracking = 0; 1954 1955 if (hdr != NULL) { 1956 m.mmap_msg.mmap_id = hdr->id; 1957 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 1958 (u_char *) buf->start); 1959 } 1960 1961 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 1962 stream, 1963 (int) m.mmap_msg.mmap_id, 1964 (int) m.mmap_msg.chunk_id, 1965 (int) m.mmap_msg.size); 1966 1967 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, 1968 m.msg.mmap ? sizeof(m) : sizeof(m.msg), 1969 NULL, 0); 1970 if (nxt_slow_path(res != sizeof(m))) { 1971 return NXT_UNIT_ERROR; 1972 } 1973 1974 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { 1975 last_used = (u_char *) buf->free - 1; 1976 1977 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 1978 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 1979 end = (u_char *) buf->end; 1980 1981 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); 1982 1983 buf->end = (char *) first_free; 1984 } 1985 1986 return NXT_UNIT_OK; 1987 } 1988 1989 1990 void 1991 nxt_unit_buf_free(nxt_unit_buf_t *buf) 1992 { 1993 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 1994 } 1995 1996 1997 static void 1998 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 1999 { 2000 if (nxt_fast_path(mmap_buf->hdr != NULL)) { 2001 nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, 2002 mmap_buf->buf.end - mmap_buf->buf.start); 2003 } 2004 2005 nxt_unit_mmap_buf_release(mmap_buf); 2006 } 2007 2008 2009 nxt_unit_buf_t * 2010 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2011 { 2012 nxt_unit_mmap_buf_t *mmap_buf; 2013 2014 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2015 2016 if (mmap_buf->next == NULL) { 2017 return NULL; 2018 } 2019 2020 return &mmap_buf->next->buf; 2021 } 2022 2023 2024 uint32_t 2025 nxt_unit_buf_max(void) 2026 { 2027 return PORT_MMAP_DATA_SIZE; 2028 } 2029 2030 2031 uint32_t 2032 nxt_unit_buf_min(void) 
{
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Write 'size' bytes of response body from 'start'.
 *
 * If the response headers have not been sent yet (response_buf is still
 * present), as much data as fits is piggybacked onto the headers buffer
 * and the headers are sent first; the remainder is streamed in
 * PORT_MMAP_DATA_SIZE-sized shared-memory buffers.
 *
 * Returns NXT_UNIT_OK or an error code from the failing step.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    int                           rc;
    uint32_t                      part_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {
        /* Fill the remaining space in the headers buffer first. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    /* Stream the rest in shared-memory sized parts. */
    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

        /* NOTE: mmap_buf lives on the stack; it is sent synchronously. */
        rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
                                       &req->response_port, part_size,
                                       &mmap_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* Send failed: return the chunks to the mmap free map. */
            nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
                                  mmap_buf.buf.end - mmap_buf.buf.start);

            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    return NXT_UNIT_OK;
}


/*
 * Pull-style response writer: repeatedly calls read_info->read() to fill
 * buffers and sends them until read_info->eof is set.  Headers (with any
 * content that fits) are sent first if they are still pending.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int             rc;
    ssize_t         n;
    nxt_unit_buf_t  *buf;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    /* Stream remaining content in caller-suggested buffer sizes. */
    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
                                                       PORT_MMAP_DATA_SIZE));
        if (nxt_slow_path(buf == NULL)) {
            nxt_unit_req_error(req, "Failed to allocate buf for content");

            return NXT_UNIT_ERROR;
        }

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_buf_free(buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_buf_send(buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Read up to 'size' bytes of request body into 'dst'; returns the number
 * of bytes copied and advances the request's content buffer chain.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    return nxt_unit_buf_read(&req->content_buf, &req->content_length,
                             dst, size);
}
/*
 * Copy up to 'size' bytes from the buffer chain '*b' into 'dst'.
 *
 * Fully drained buffers are skipped via nxt_unit_buf_next(); '*b' is
 * updated to the first buffer with unread data (or NULL) and '*len' is
 * decremented by the number of bytes copied.  Returns bytes copied.
 * NOTE(review): drained buffers are not freed here — presumably the
 * caller releases them; confirm against buffer ownership rules.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf;

    p = dst;
    rest = size;

    buf = *b;

    while (buf != NULL) {
        /* Unread bytes in this buffer lie between 'free' and 'end'. */
        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            /* Destination full; advance past this buffer if drained. */
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Finish processing of the request with result 'rc'.
 *
 * On NXT_UNIT_OK, ensures a minimal response has been initialized and
 * sends any pending response buffer as the last message.  Otherwise (or
 * if the response was already sent) a bare last/error port message is
 * sent and the request is released.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* No response was prepared; synthesize an empty 200 text/plain. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the headers buffer as the last message and releases req. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
msg.stream = req_impl->stream; 2285 msg.pid = lib->pid; 2286 msg.reply_port = 0; 2287 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA 2288 : _NXT_PORT_MSG_RPC_ERROR; 2289 msg.last = 1; 2290 msg.mmap = 0; 2291 msg.nf = 0; 2292 msg.mf = 0; 2293 msg.tracking = 0; 2294 2295 res = lib->callbacks.port_send(req->ctx, &req->response_port, 2296 &msg, sizeof(msg), NULL, 0); 2297 if (nxt_slow_path(res != sizeof(msg))) { 2298 nxt_unit_req_alert(req, "last message send failed: %s (%d)", 2299 strerror(errno), errno); 2300 } 2301 2302 nxt_unit_request_info_release(req); 2303 } 2304 2305 2306 int 2307 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 2308 uint8_t last, const void *start, size_t size) 2309 { 2310 const struct iovec iov = { (void *) start, size }; 2311 2312 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); 2313 } 2314 2315 2316 int 2317 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 2318 uint8_t last, const struct iovec *iov, int iovcnt) 2319 { 2320 int i, rc; 2321 size_t l, copy; 2322 uint32_t payload_len, buf_size; 2323 const uint8_t *b; 2324 nxt_unit_buf_t *buf; 2325 nxt_websocket_header_t *wh; 2326 2327 payload_len = 0; 2328 2329 for (i = 0; i < iovcnt; i++) { 2330 payload_len += iov[i].iov_len; 2331 } 2332 2333 buf_size = 10 + payload_len; 2334 2335 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2336 PORT_MMAP_DATA_SIZE)); 2337 if (nxt_slow_path(buf == NULL)) { 2338 nxt_unit_req_error(req, "Failed to allocate buf for content"); 2339 2340 return NXT_UNIT_ERROR; 2341 } 2342 2343 buf->start[0] = 0; 2344 buf->start[1] = 0; 2345 2346 wh = (void *) buf->free; 2347 2348 buf->free = nxt_websocket_frame_init(wh, payload_len); 2349 wh->fin = last; 2350 wh->opcode = opcode; 2351 2352 for (i = 0; i < iovcnt; i++) { 2353 b = iov[i].iov_base; 2354 l = iov[i].iov_len; 2355 2356 while (l > 0) { 2357 copy = buf->end - buf->free; 2358 copy = nxt_min(l, copy); 2359 2360 buf->free = nxt_cpymem(buf->free, b, 
copy); 2361 b += copy; 2362 l -= copy; 2363 2364 if (l > 0) { 2365 buf_size -= buf->end - buf->start; 2366 2367 rc = nxt_unit_buf_send(buf); 2368 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2369 nxt_unit_req_error(req, "Failed to send content"); 2370 2371 return NXT_UNIT_ERROR; 2372 } 2373 2374 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, 2375 PORT_MMAP_DATA_SIZE)); 2376 if (nxt_slow_path(buf == NULL)) { 2377 nxt_unit_req_error(req, 2378 "Failed to allocate buf for content"); 2379 2380 return NXT_UNIT_ERROR; 2381 } 2382 } 2383 } 2384 } 2385 2386 if (buf->free > buf->start) { 2387 rc = nxt_unit_buf_send(buf); 2388 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2389 nxt_unit_req_error(req, "Failed to send content"); 2390 } 2391 } 2392 2393 return rc; 2394 } 2395 2396 2397 ssize_t 2398 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 2399 size_t size) 2400 { 2401 ssize_t res; 2402 uint8_t *b; 2403 uint64_t i, d; 2404 2405 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, 2406 dst, size); 2407 2408 if (ws->mask == NULL) { 2409 return res; 2410 } 2411 2412 b = dst; 2413 d = (ws->payload_len - ws->content_length - res) % 4; 2414 2415 for (i = 0; i < (uint64_t) res; i++) { 2416 b[i] ^= ws->mask[ (i + d) % 4 ]; 2417 } 2418 2419 return res; 2420 } 2421 2422 2423 int 2424 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) 2425 { 2426 char *b; 2427 size_t size; 2428 nxt_unit_websocket_frame_impl_t *ws_impl; 2429 2430 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 2431 2432 if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { 2433 return NXT_UNIT_OK; 2434 } 2435 2436 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; 2437 2438 b = malloc(size); 2439 if (nxt_slow_path(b == NULL)) { 2440 return NXT_UNIT_ERROR; 2441 } 2442 2443 memcpy(b, ws_impl->buf->buf.start, size); 2444 2445 ws_impl->buf->buf.start = b; 2446 ws_impl->buf->buf.free = b; 2447 ws_impl->buf->buf.end = b + size; 2448 2449 
ws_impl->retain_buf = b; 2450 2451 return NXT_UNIT_OK; 2452 } 2453 2454 2455 void 2456 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) 2457 { 2458 nxt_unit_websocket_frame_release(ws); 2459 } 2460 2461 2462 static nxt_port_mmap_header_t * 2463 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2464 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) 2465 { 2466 int res, nchunks, i; 2467 nxt_unit_mmap_t *mm, *mm_end; 2468 nxt_port_mmap_header_t *hdr; 2469 2470 pthread_mutex_lock(&process->outgoing.mutex); 2471 2472 mm_end = process->outgoing.elts + process->outgoing.size; 2473 2474 for (mm = process->outgoing.elts; mm < mm_end; mm++) { 2475 hdr = mm->hdr; 2476 2477 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { 2478 continue; 2479 } 2480 2481 *c = 0; 2482 2483 while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { 2484 nchunks = 1; 2485 2486 while (nchunks < n) { 2487 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, 2488 *c + nchunks); 2489 2490 if (res == 0) { 2491 for (i = 0; i < nchunks; i++) { 2492 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); 2493 } 2494 2495 *c += nchunks + 1; 2496 nchunks = 0; 2497 break; 2498 } 2499 2500 nchunks++; 2501 } 2502 2503 if (nchunks == n) { 2504 goto unlock; 2505 } 2506 } 2507 } 2508 2509 *c = 0; 2510 hdr = nxt_unit_new_mmap(ctx, process, port_id, n); 2511 2512 unlock: 2513 2514 pthread_mutex_unlock(&process->outgoing.mutex); 2515 2516 return hdr; 2517 } 2518 2519 2520 static nxt_unit_mmap_t * 2521 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) 2522 { 2523 uint32_t cap; 2524 2525 cap = mmaps->cap; 2526 2527 if (cap == 0) { 2528 cap = i + 1; 2529 } 2530 2531 while (i + 1 > cap) { 2532 2533 if (cap < 16) { 2534 cap = cap * 2; 2535 2536 } else { 2537 cap = cap + cap / 2; 2538 } 2539 } 2540 2541 if (cap != mmaps->cap) { 2542 2543 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 2544 if (nxt_slow_path(mmaps->elts == NULL)) { 2545 return NULL; 2546 } 2547 
2548 memset(mmaps->elts + mmaps->cap, 0, 2549 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 2550 2551 mmaps->cap = cap; 2552 } 2553 2554 if (i + 1 > mmaps->size) { 2555 mmaps->size = i + 1; 2556 } 2557 2558 return mmaps->elts + i; 2559 } 2560 2561 2562 static nxt_port_mmap_header_t * 2563 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2564 nxt_unit_port_id_t *port_id, int n) 2565 { 2566 int i, fd, rc; 2567 void *mem; 2568 char name[64]; 2569 nxt_unit_mmap_t *mm; 2570 nxt_unit_impl_t *lib; 2571 nxt_port_mmap_header_t *hdr; 2572 2573 lib = process->lib; 2574 2575 mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); 2576 if (nxt_slow_path(mm == NULL)) { 2577 nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); 2578 2579 return NULL; 2580 } 2581 2582 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 2583 lib->pid, (void *) pthread_self()); 2584 2585 #if (NXT_HAVE_MEMFD_CREATE) 2586 2587 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 2588 if (nxt_slow_path(fd == -1)) { 2589 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 2590 strerror(errno), errno); 2591 2592 goto remove_fail; 2593 } 2594 2595 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 2596 2597 #elif (NXT_HAVE_SHM_OPEN_ANON) 2598 2599 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 2600 if (nxt_slow_path(fd == -1)) { 2601 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 2602 strerror(errno), errno); 2603 2604 goto remove_fail; 2605 } 2606 2607 #elif (NXT_HAVE_SHM_OPEN) 2608 2609 /* Just in case. 
*/ 2610 shm_unlink(name); 2611 2612 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 2613 if (nxt_slow_path(fd == -1)) { 2614 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 2615 strerror(errno), errno); 2616 2617 goto remove_fail; 2618 } 2619 2620 if (nxt_slow_path(shm_unlink(name) == -1)) { 2621 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 2622 strerror(errno), errno); 2623 } 2624 2625 #else 2626 2627 #error No working shared memory implementation. 2628 2629 #endif 2630 2631 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 2632 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 2633 strerror(errno), errno); 2634 2635 goto remove_fail; 2636 } 2637 2638 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2639 if (nxt_slow_path(mem == MAP_FAILED)) { 2640 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 2641 strerror(errno), errno); 2642 2643 goto remove_fail; 2644 } 2645 2646 mm->hdr = mem; 2647 hdr = mem; 2648 2649 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 2650 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 2651 2652 hdr->id = process->outgoing.size - 1; 2653 hdr->src_pid = lib->pid; 2654 hdr->dst_pid = process->pid; 2655 hdr->sent_over = port_id->id; 2656 2657 /* Mark first n chunk(s) as busy */ 2658 for (i = 0; i < n; i++) { 2659 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 2660 } 2661 2662 /* Mark as busy chunk followed the last available chunk. 
*/ 2663 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 2664 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 2665 2666 pthread_mutex_unlock(&process->outgoing.mutex); 2667 2668 rc = nxt_unit_send_mmap(ctx, port_id, fd); 2669 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2670 munmap(mem, PORT_MMAP_SIZE); 2671 hdr = NULL; 2672 2673 } else { 2674 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 2675 hdr->id, (int) lib->pid, (int) process->pid); 2676 } 2677 2678 close(fd); 2679 2680 pthread_mutex_lock(&process->outgoing.mutex); 2681 2682 if (nxt_fast_path(hdr != NULL)) { 2683 return hdr; 2684 } 2685 2686 remove_fail: 2687 2688 process->outgoing.size--; 2689 2690 return NULL; 2691 } 2692 2693 2694 static int 2695 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) 2696 { 2697 ssize_t res; 2698 nxt_port_msg_t msg; 2699 nxt_unit_impl_t *lib; 2700 union { 2701 struct cmsghdr cm; 2702 char space[CMSG_SPACE(sizeof(int))]; 2703 } cmsg; 2704 2705 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2706 2707 msg.stream = 0; 2708 msg.pid = lib->pid; 2709 msg.reply_port = 0; 2710 msg.type = _NXT_PORT_MSG_MMAP; 2711 msg.last = 0; 2712 msg.mmap = 0; 2713 msg.nf = 0; 2714 msg.mf = 0; 2715 msg.tracking = 0; 2716 2717 /* 2718 * Fill all padding fields with 0. 2719 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 2720 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 
2721 */ 2722 memset(&cmsg, 0, sizeof(cmsg)); 2723 2724 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 2725 cmsg.cm.cmsg_level = SOL_SOCKET; 2726 cmsg.cm.cmsg_type = SCM_RIGHTS; 2727 2728 /* 2729 * memcpy() is used instead of simple 2730 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 2731 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 2732 * dereferencing type-punned pointer will break strict-aliasing rules 2733 * 2734 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 2735 * in the same simple assignment as in the code above. 2736 */ 2737 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 2738 2739 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), 2740 &cmsg, sizeof(cmsg)); 2741 if (nxt_slow_path(res != sizeof(msg))) { 2742 nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)", 2743 (int) port_id->pid, strerror(errno), errno); 2744 2745 return NXT_UNIT_ERROR; 2746 } 2747 2748 return NXT_UNIT_OK; 2749 } 2750 2751 2752 static int 2753 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2754 nxt_unit_port_id_t *port_id, uint32_t size, 2755 nxt_unit_mmap_buf_t *mmap_buf) 2756 { 2757 uint32_t nchunks; 2758 nxt_chunk_id_t c; 2759 nxt_port_mmap_header_t *hdr; 2760 2761 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 2762 2763 hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); 2764 if (nxt_slow_path(hdr == NULL)) { 2765 return NXT_UNIT_ERROR; 2766 } 2767 2768 mmap_buf->hdr = hdr; 2769 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 2770 mmap_buf->buf.free = mmap_buf->buf.start; 2771 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 2772 mmap_buf->port_id = *port_id; 2773 2774 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 2775 (int) hdr->id, (int) c, 2776 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 2777 2778 return NXT_UNIT_OK; 2779 } 2780 2781 2782 static int 2783 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 2784 { 2785 int rc; 2786 
void *mem; 2787 struct stat mmap_stat; 2788 nxt_unit_mmap_t *mm; 2789 nxt_unit_impl_t *lib; 2790 nxt_unit_process_t *process; 2791 nxt_port_mmap_header_t *hdr; 2792 2793 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2794 2795 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 2796 2797 pthread_mutex_lock(&lib->mutex); 2798 2799 process = nxt_unit_process_find(ctx, pid, 0); 2800 2801 pthread_mutex_unlock(&lib->mutex); 2802 2803 if (nxt_slow_path(process == NULL)) { 2804 nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", 2805 (int) pid, fd); 2806 2807 return NXT_UNIT_ERROR; 2808 } 2809 2810 rc = NXT_UNIT_ERROR; 2811 2812 if (fstat(fd, &mmap_stat) == -1) { 2813 nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 2814 strerror(errno), errno); 2815 2816 goto fail; 2817 } 2818 2819 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 2820 MAP_SHARED, fd, 0); 2821 if (nxt_slow_path(mem == MAP_FAILED)) { 2822 nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", 2823 strerror(errno), errno); 2824 2825 goto fail; 2826 } 2827 2828 hdr = mem; 2829 2830 if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { 2831 2832 nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " 2833 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 2834 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 2835 2836 munmap(mem, PORT_MMAP_SIZE); 2837 2838 goto fail; 2839 } 2840 2841 pthread_mutex_lock(&process->incoming.mutex); 2842 2843 mm = nxt_unit_mmap_at(&process->incoming, hdr->id); 2844 if (nxt_slow_path(mm == NULL)) { 2845 nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); 2846 2847 munmap(mem, PORT_MMAP_SIZE); 2848 2849 } else { 2850 mm->hdr = hdr; 2851 2852 hdr->sent_over = 0xFFFFu; 2853 2854 rc = NXT_UNIT_OK; 2855 } 2856 2857 pthread_mutex_unlock(&process->incoming.mutex); 2858 2859 fail: 2860 2861 nxt_unit_process_use(ctx, process, -1); 2862 2863 return rc; 2864 } 2865 
2866 2867 static void 2868 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) 2869 { 2870 pthread_mutex_init(&mmaps->mutex, NULL); 2871 2872 mmaps->size = 0; 2873 mmaps->cap = 0; 2874 mmaps->elts = NULL; 2875 } 2876 2877 2878 static void 2879 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i) 2880 { 2881 long c; 2882 2883 c = nxt_atomic_fetch_add(&process->use_count, i); 2884 2885 if (i < 0 && c == -i) { 2886 nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid); 2887 2888 nxt_unit_mmaps_destroy(&process->incoming); 2889 nxt_unit_mmaps_destroy(&process->outgoing); 2890 2891 free(process); 2892 } 2893 } 2894 2895 2896 static void 2897 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) 2898 { 2899 nxt_unit_mmap_t *mm, *end; 2900 2901 if (mmaps->elts != NULL) { 2902 end = mmaps->elts + mmaps->size; 2903 2904 for (mm = mmaps->elts; mm < end; mm++) { 2905 munmap(mm->hdr, PORT_MMAP_SIZE); 2906 } 2907 2908 free(mmaps->elts); 2909 } 2910 2911 pthread_mutex_destroy(&mmaps->mutex); 2912 } 2913 2914 2915 static nxt_port_mmap_header_t * 2916 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 2917 uint32_t id) 2918 { 2919 nxt_port_mmap_header_t *hdr; 2920 2921 if (nxt_fast_path(process->incoming.size > id)) { 2922 hdr = process->incoming.elts[id].hdr; 2923 2924 } else { 2925 hdr = NULL; 2926 } 2927 2928 return hdr; 2929 } 2930 2931 2932 static int 2933 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 2934 { 2935 int rc; 2936 nxt_chunk_id_t c; 2937 nxt_unit_process_t *process; 2938 nxt_port_mmap_header_t *hdr; 2939 nxt_port_mmap_tracking_msg_t *tracking_msg; 2940 2941 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 2942 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", 2943 recv_msg->stream, (int) recv_msg->size); 2944 2945 return 0; 2946 } 2947 2948 tracking_msg = recv_msg->start; 2949 2950 recv_msg->start = tracking_msg + 1; 2951 recv_msg->size -= 
sizeof(nxt_port_mmap_tracking_msg_t); 2952 2953 process = nxt_unit_msg_get_process(ctx, recv_msg); 2954 if (nxt_slow_path(process == NULL)) { 2955 return 0; 2956 } 2957 2958 pthread_mutex_lock(&process->incoming.mutex); 2959 2960 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); 2961 if (nxt_slow_path(hdr == NULL)) { 2962 pthread_mutex_unlock(&process->incoming.mutex); 2963 2964 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " 2965 "invalid mmap id %d,%"PRIu32, 2966 recv_msg->stream, (int) process->pid, 2967 tracking_msg->mmap_id); 2968 2969 return 0; 2970 } 2971 2972 c = tracking_msg->tracking_id; 2973 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); 2974 2975 if (rc == 0) { 2976 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", 2977 recv_msg->stream); 2978 2979 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 2980 } 2981 2982 pthread_mutex_unlock(&process->incoming.mutex); 2983 2984 return rc; 2985 } 2986 2987 2988 static int 2989 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 2990 { 2991 void *start; 2992 uint32_t size; 2993 nxt_unit_process_t *process; 2994 nxt_unit_mmap_buf_t *b, **incoming_tail; 2995 nxt_port_mmap_msg_t *mmap_msg, *end; 2996 nxt_port_mmap_header_t *hdr; 2997 2998 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { 2999 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", 3000 recv_msg->stream, (int) recv_msg->size); 3001 3002 return NXT_UNIT_ERROR; 3003 } 3004 3005 process = nxt_unit_msg_get_process(ctx, recv_msg); 3006 if (nxt_slow_path(process == NULL)) { 3007 return NXT_UNIT_ERROR; 3008 } 3009 3010 mmap_msg = recv_msg->start; 3011 end = nxt_pointer_to(recv_msg->start, recv_msg->size); 3012 3013 incoming_tail = &recv_msg->incoming_buf; 3014 3015 pthread_mutex_lock(&process->incoming.mutex); 3016 3017 for (; mmap_msg < end; mmap_msg++) { 3018 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); 3019 if (nxt_slow_path(hdr == 
NULL)) { 3020 pthread_mutex_unlock(&process->incoming.mutex); 3021 3022 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " 3023 "invalid mmap id %d,%"PRIu32, 3024 recv_msg->stream, (int) process->pid, 3025 mmap_msg->mmap_id); 3026 3027 return NXT_UNIT_ERROR; 3028 } 3029 3030 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 3031 size = mmap_msg->size; 3032 3033 if (recv_msg->start == mmap_msg) { 3034 recv_msg->start = start; 3035 recv_msg->size = size; 3036 } 3037 3038 b = nxt_unit_mmap_buf_get(ctx); 3039 if (nxt_slow_path(b == NULL)) { 3040 pthread_mutex_unlock(&process->incoming.mutex); 3041 3042 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", 3043 recv_msg->stream); 3044 3045 nxt_unit_mmap_release(hdr, start, size); 3046 3047 return NXT_UNIT_ERROR; 3048 } 3049 3050 nxt_unit_mmap_buf_insert(incoming_tail, b); 3051 incoming_tail = &b->next; 3052 3053 b->buf.start = start; 3054 b->buf.free = start; 3055 b->buf.end = b->buf.start + size; 3056 b->hdr = hdr; 3057 3058 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", 3059 recv_msg->stream, 3060 start, (int) size, 3061 (int) hdr->src_pid, (int) hdr->dst_pid, 3062 (int) hdr->id, (int) mmap_msg->chunk_id, 3063 (int) mmap_msg->size); 3064 } 3065 3066 pthread_mutex_unlock(&process->incoming.mutex); 3067 3068 return NXT_UNIT_OK; 3069 } 3070 3071 3072 static int 3073 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size) 3074 { 3075 u_char *p, *end; 3076 nxt_chunk_id_t c; 3077 3078 memset(start, 0xA5, size); 3079 3080 p = start; 3081 end = p + size; 3082 c = nxt_port_mmap_chunk_id(hdr, p); 3083 3084 while (p < end) { 3085 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 3086 3087 p += PORT_MMAP_CHUNK_SIZE; 3088 c++; 3089 } 3090 3091 return NXT_UNIT_OK; 3092 } 3093 3094 3095 static nxt_int_t 3096 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) 3097 { 3098 nxt_process_t *process; 3099 3100 process = data; 3101 3102 if (lhq->key.length == 
sizeof(pid_t) 3103 && *(pid_t *) lhq->key.start == process->pid) 3104 { 3105 return NXT_OK; 3106 } 3107 3108 return NXT_DECLINED; 3109 } 3110 3111 3112 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { 3113 NXT_LVLHSH_DEFAULT, 3114 nxt_unit_lvlhsh_pid_test, 3115 nxt_lvlhsh_alloc, 3116 nxt_lvlhsh_free, 3117 }; 3118 3119 3120 static inline void 3121 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 3122 { 3123 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 3124 lhq->key.length = sizeof(*pid); 3125 lhq->key.start = (u_char *) pid; 3126 lhq->proto = &lvlhsh_processes_proto; 3127 } 3128 3129 3130 static nxt_unit_process_t * 3131 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 3132 { 3133 nxt_unit_impl_t *lib; 3134 nxt_unit_process_t *process; 3135 nxt_lvlhsh_query_t lhq; 3136 3137 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3138 3139 nxt_unit_process_lhq_pid(&lhq, &pid); 3140 3141 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 3142 process = lhq.value; 3143 nxt_unit_process_use(ctx, process, 1); 3144 3145 return process; 3146 } 3147 3148 process = malloc(sizeof(nxt_unit_process_t)); 3149 if (nxt_slow_path(process == NULL)) { 3150 nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); 3151 3152 return NULL; 3153 } 3154 3155 process->pid = pid; 3156 process->use_count = 1; 3157 process->next_port_id = 0; 3158 process->lib = lib; 3159 3160 nxt_queue_init(&process->ports); 3161 3162 nxt_unit_mmaps_init(&process->incoming); 3163 nxt_unit_mmaps_init(&process->outgoing); 3164 3165 lhq.replace = 0; 3166 lhq.value = process; 3167 3168 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 3169 3170 case NXT_OK: 3171 break; 3172 3173 default: 3174 nxt_unit_warn(ctx, "process %d insert failed", (int) pid); 3175 3176 pthread_mutex_destroy(&process->outgoing.mutex); 3177 pthread_mutex_destroy(&process->incoming.mutex); 3178 free(process); 3179 process = NULL; 3180 break; 3181 } 3182 3183 
nxt_unit_process_use(ctx, process, 1); 3184 3185 return process; 3186 } 3187 3188 3189 static nxt_unit_process_t * 3190 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) 3191 { 3192 int rc; 3193 nxt_unit_impl_t *lib; 3194 nxt_unit_process_t *process; 3195 nxt_lvlhsh_query_t lhq; 3196 3197 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3198 3199 nxt_unit_process_lhq_pid(&lhq, &pid); 3200 3201 if (remove) { 3202 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 3203 3204 } else { 3205 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 3206 } 3207 3208 if (rc == NXT_OK) { 3209 process = lhq.value; 3210 3211 if (!remove) { 3212 nxt_unit_process_use(ctx, process, 1); 3213 } 3214 3215 return process; 3216 } 3217 3218 return NULL; 3219 } 3220 3221 3222 static nxt_unit_process_t * 3223 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 3224 { 3225 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 3226 } 3227 3228 3229 int 3230 nxt_unit_run(nxt_unit_ctx_t *ctx) 3231 { 3232 int rc; 3233 nxt_unit_impl_t *lib; 3234 3235 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3236 rc = NXT_UNIT_OK; 3237 3238 while (nxt_fast_path(lib->online)) { 3239 rc = nxt_unit_run_once(ctx); 3240 } 3241 3242 return rc; 3243 } 3244 3245 3246 int 3247 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 3248 { 3249 int rc; 3250 char buf[4096]; 3251 char oob[256]; 3252 ssize_t rsize; 3253 nxt_unit_impl_t *lib; 3254 nxt_unit_ctx_impl_t *ctx_impl; 3255 3256 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3257 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3258 3259 memset(oob, 0, sizeof(struct cmsghdr)); 3260 3261 if (ctx_impl->read_port_fd != -1) { 3262 rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, 3263 buf, sizeof(buf), 3264 oob, sizeof(oob)); 3265 } else { 3266 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 3267 buf, sizeof(buf), 3268 oob, sizeof(oob)); 3269 } 3270 3271 if (nxt_fast_path(rsize > 0)) { 3272 rc = 
nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, 3273 oob, sizeof(oob)); 3274 3275 #if (NXT_DEBUG) 3276 memset(buf, 0xAC, rsize); 3277 #endif 3278 3279 } else { 3280 rc = NXT_UNIT_ERROR; 3281 } 3282 3283 return rc; 3284 } 3285 3286 3287 void 3288 nxt_unit_done(nxt_unit_ctx_t *ctx) 3289 { 3290 nxt_unit_impl_t *lib; 3291 nxt_unit_process_t *process; 3292 nxt_unit_ctx_impl_t *ctx_impl; 3293 3294 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3295 3296 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { 3297 3298 nxt_unit_ctx_free(&ctx_impl->ctx); 3299 3300 } nxt_queue_loop; 3301 3302 for ( ;; ) { 3303 pthread_mutex_lock(&lib->mutex); 3304 3305 process = nxt_unit_process_pop_first(lib); 3306 if (process == NULL) { 3307 pthread_mutex_unlock(&lib->mutex); 3308 3309 break; 3310 } 3311 3312 nxt_unit_remove_process(ctx, process); 3313 } 3314 3315 pthread_mutex_destroy(&lib->mutex); 3316 3317 free(lib); 3318 } 3319 3320 3321 nxt_unit_ctx_t * 3322 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) 3323 { 3324 int rc, fd; 3325 nxt_unit_impl_t *lib; 3326 nxt_unit_port_id_t new_port_id; 3327 nxt_unit_ctx_impl_t *new_ctx; 3328 3329 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3330 3331 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); 3332 if (nxt_slow_path(new_ctx == NULL)) { 3333 nxt_unit_warn(ctx, "failed to allocate context"); 3334 3335 return NULL; 3336 } 3337 3338 rc = nxt_unit_create_port(ctx, &new_port_id, &fd); 3339 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3340 free(new_ctx); 3341 3342 return NULL; 3343 } 3344 3345 rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); 3346 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3347 lib->callbacks.remove_port(ctx, &new_port_id); 3348 3349 close(fd); 3350 3351 free(new_ctx); 3352 3353 return NULL; 3354 } 3355 3356 close(fd); 3357 3358 rc = nxt_unit_ctx_init(lib, new_ctx, data); 3359 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3360 
lib->callbacks.remove_port(ctx, &new_port_id); 3361 3362 free(new_ctx); 3363 3364 return NULL; 3365 } 3366 3367 new_ctx->read_port_id = new_port_id; 3368 3369 return &new_ctx->ctx; 3370 } 3371 3372 3373 void 3374 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) 3375 { 3376 nxt_unit_impl_t *lib; 3377 nxt_unit_ctx_impl_t *ctx_impl; 3378 nxt_unit_mmap_buf_t *mmap_buf; 3379 nxt_unit_request_info_impl_t *req_impl; 3380 nxt_unit_websocket_frame_impl_t *ws_impl; 3381 3382 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3383 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3384 3385 nxt_queue_each(req_impl, &ctx_impl->active_req, 3386 nxt_unit_request_info_impl_t, link) 3387 { 3388 nxt_unit_req_warn(&req_impl->req, "active request on ctx free"); 3389 3390 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR); 3391 3392 } nxt_queue_loop; 3393 3394 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); 3395 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); 3396 3397 while (ctx_impl->free_buf != NULL) { 3398 mmap_buf = ctx_impl->free_buf; 3399 nxt_unit_mmap_buf_remove(mmap_buf); 3400 free(mmap_buf); 3401 } 3402 3403 nxt_queue_each(req_impl, &ctx_impl->free_req, 3404 nxt_unit_request_info_impl_t, link) 3405 { 3406 nxt_unit_request_info_free(req_impl); 3407 3408 } nxt_queue_loop; 3409 3410 nxt_queue_each(ws_impl, &ctx_impl->free_ws, 3411 nxt_unit_websocket_frame_impl_t, link) 3412 { 3413 nxt_unit_websocket_frame_free(ws_impl); 3414 3415 } nxt_queue_loop; 3416 3417 pthread_mutex_destroy(&ctx_impl->mutex); 3418 3419 nxt_queue_remove(&ctx_impl->link); 3420 3421 if (ctx_impl != &lib->main_ctx) { 3422 free(ctx_impl); 3423 } 3424 } 3425 3426 3427 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. 
*/ 3428 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET) 3429 #define NXT_UNIX_SOCKET SOCK_SEQPACKET 3430 #else 3431 #define NXT_UNIX_SOCKET SOCK_DGRAM 3432 #endif 3433 3434 3435 void 3436 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) 3437 { 3438 nxt_unit_port_hash_id_t port_hash_id; 3439 3440 port_hash_id.pid = pid; 3441 port_hash_id.id = id; 3442 3443 port_id->pid = pid; 3444 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id)); 3445 port_id->id = id; 3446 } 3447 3448 3449 int 3450 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 3451 nxt_unit_port_id_t *port_id) 3452 { 3453 int rc, fd; 3454 nxt_unit_impl_t *lib; 3455 nxt_unit_port_id_t new_port_id; 3456 3457 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3458 3459 rc = nxt_unit_create_port(ctx, &new_port_id, &fd); 3460 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3461 return rc; 3462 } 3463 3464 rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); 3465 3466 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 3467 *port_id = new_port_id; 3468 3469 } else { 3470 lib->callbacks.remove_port(ctx, &new_port_id); 3471 } 3472 3473 close(fd); 3474 3475 return rc; 3476 } 3477 3478 3479 static int 3480 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) 3481 { 3482 int rc, port_sockets[2]; 3483 nxt_unit_impl_t *lib; 3484 nxt_unit_port_t new_port; 3485 nxt_unit_process_t *process; 3486 3487 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3488 3489 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 3490 if (nxt_slow_path(rc != 0)) { 3491 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 3492 strerror(errno), errno); 3493 3494 return NXT_UNIT_ERROR; 3495 } 3496 3497 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 3498 port_sockets[0], port_sockets[1]); 3499 3500 pthread_mutex_lock(&lib->mutex); 3501 3502 process = nxt_unit_process_get(ctx, lib->pid); 3503 if (nxt_slow_path(process == NULL)) { 3504 
pthread_mutex_unlock(&lib->mutex); 3505 3506 close(port_sockets[0]); 3507 close(port_sockets[1]); 3508 3509 return NXT_UNIT_ERROR; 3510 } 3511 3512 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 3513 3514 new_port.in_fd = port_sockets[0]; 3515 new_port.out_fd = -1; 3516 new_port.data = NULL; 3517 3518 pthread_mutex_unlock(&lib->mutex); 3519 3520 nxt_unit_process_use(ctx, process, -1); 3521 3522 rc = lib->callbacks.add_port(ctx, &new_port); 3523 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3524 nxt_unit_warn(ctx, "create_port: add_port() failed"); 3525 3526 close(port_sockets[0]); 3527 close(port_sockets[1]); 3528 3529 return rc; 3530 } 3531 3532 *port_id = new_port.id; 3533 *fd = port_sockets[1]; 3534 3535 return rc; 3536 } 3537 3538 3539 static int 3540 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 3541 nxt_unit_port_id_t *new_port, int fd) 3542 { 3543 ssize_t res; 3544 nxt_unit_impl_t *lib; 3545 3546 struct { 3547 nxt_port_msg_t msg; 3548 nxt_port_msg_new_port_t new_port; 3549 } m; 3550 3551 union { 3552 struct cmsghdr cm; 3553 char space[CMSG_SPACE(sizeof(int))]; 3554 } cmsg; 3555 3556 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3557 3558 m.msg.stream = 0; 3559 m.msg.pid = lib->pid; 3560 m.msg.reply_port = 0; 3561 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 3562 m.msg.last = 0; 3563 m.msg.mmap = 0; 3564 m.msg.nf = 0; 3565 m.msg.mf = 0; 3566 m.msg.tracking = 0; 3567 3568 m.new_port.id = new_port->id; 3569 m.new_port.pid = new_port->pid; 3570 m.new_port.type = NXT_PROCESS_WORKER; 3571 m.new_port.max_size = 16 * 1024; 3572 m.new_port.max_share = 64 * 1024; 3573 3574 memset(&cmsg, 0, sizeof(cmsg)); 3575 3576 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3577 cmsg.cm.cmsg_level = SOL_SOCKET; 3578 cmsg.cm.cmsg_type = SCM_RIGHTS; 3579 3580 /* 3581 * memcpy() is used instead of simple 3582 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3583 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3584 * dereferencing type-punned 
pointer will break strict-aliasing rules 3585 * 3586 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3587 * in the same simple assignment as in the code above. 3588 */ 3589 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3590 3591 res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), 3592 &cmsg, sizeof(cmsg)); 3593 3594 return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; 3595 } 3596 3597 3598 int 3599 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 3600 { 3601 int rc; 3602 nxt_unit_impl_t *lib; 3603 nxt_unit_process_t *process; 3604 nxt_unit_port_impl_t *new_port; 3605 3606 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3607 3608 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 3609 port->id.pid, port->id.id, 3610 port->in_fd, port->out_fd); 3611 3612 pthread_mutex_lock(&lib->mutex); 3613 3614 process = nxt_unit_process_get(ctx, port->id.pid); 3615 if (nxt_slow_path(process == NULL)) { 3616 rc = NXT_UNIT_ERROR; 3617 goto unlock; 3618 } 3619 3620 if (port->id.id >= process->next_port_id) { 3621 process->next_port_id = port->id.id + 1; 3622 } 3623 3624 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 3625 if (nxt_slow_path(new_port == NULL)) { 3626 rc = NXT_UNIT_ERROR; 3627 goto unlock; 3628 } 3629 3630 new_port->port = *port; 3631 3632 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 3633 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3634 goto unlock; 3635 } 3636 3637 nxt_queue_insert_tail(&process->ports, &new_port->link); 3638 3639 rc = NXT_UNIT_OK; 3640 3641 new_port->process = process; 3642 3643 unlock: 3644 3645 pthread_mutex_unlock(&lib->mutex); 3646 3647 if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { 3648 nxt_unit_process_use(ctx, process, -1); 3649 } 3650 3651 return rc; 3652 } 3653 3654 3655 void 3656 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 3657 { 3658 nxt_unit_find_remove_port(ctx, port_id, NULL); 3659 } 3660 3661 3662 void 3663 
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3664 nxt_unit_port_t *r_port) 3665 { 3666 nxt_unit_impl_t *lib; 3667 nxt_unit_process_t *process; 3668 3669 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3670 3671 pthread_mutex_lock(&lib->mutex); 3672 3673 process = NULL; 3674 3675 nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); 3676 3677 pthread_mutex_unlock(&lib->mutex); 3678 3679 if (nxt_slow_path(process != NULL)) { 3680 nxt_unit_process_use(ctx, process, -1); 3681 } 3682 } 3683 3684 3685 static void 3686 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3687 nxt_unit_port_t *r_port, nxt_unit_process_t **process) 3688 { 3689 nxt_unit_impl_t *lib; 3690 nxt_unit_port_impl_t *port; 3691 3692 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3693 3694 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); 3695 if (nxt_slow_path(port == NULL)) { 3696 nxt_unit_debug(ctx, "remove_port: port %d,%d not found", 3697 (int) port_id->pid, (int) port_id->id); 3698 3699 return; 3700 } 3701 3702 nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", 3703 (int) port_id->pid, (int) port_id->id, 3704 port->port.in_fd, port->port.out_fd, port->port.data); 3705 3706 if (port->port.in_fd != -1) { 3707 close(port->port.in_fd); 3708 } 3709 3710 if (port->port.out_fd != -1) { 3711 close(port->port.out_fd); 3712 } 3713 3714 if (port->process != NULL) { 3715 nxt_queue_remove(&port->link); 3716 } 3717 3718 if (process != NULL) { 3719 *process = port->process; 3720 } 3721 3722 if (r_port != NULL) { 3723 *r_port = port->port; 3724 } 3725 3726 free(port); 3727 } 3728 3729 3730 void 3731 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) 3732 { 3733 nxt_unit_impl_t *lib; 3734 nxt_unit_process_t *process; 3735 3736 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3737 3738 pthread_mutex_lock(&lib->mutex); 3739 3740 process = nxt_unit_process_find(ctx, pid, 1); 3741 if 
(nxt_slow_path(process == NULL)) {
        nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    nxt_unit_remove_process(ctx, process);
}


/*
 * Detach every port from 'process' and release the process references.
 *
 * Called with lib->mutex held by the caller (see the unlock in the
 * not-found path of nxt_unit_remove_pid() above); this function releases
 * the mutex itself, before running the (possibly application-supplied)
 * remove_port callback, so that the callback executes unlocked.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_init(&ports);

    /* Move the whole port list onto a local queue while still locked. */
    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        /*
         * NOTE(review): one reference is dropped per port — presumably
         * each port holds a reference on its process; confirm against
         * the port-creation path.
         */
        nxt_unit_process_use(ctx, process, -1);
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    /* Mutex was taken by the caller; release before user callbacks. */
    pthread_mutex_unlock(&lib->mutex);

    /* Ports remaining on the local queue need the custom callback. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    /* Drop the caller's reference; this may free 'process'. */
    nxt_unit_process_use(ctx, process, -1);
}


/*
 * Public API: request library shutdown by clearing the online flag.
 * No locking — a plain store on an int field.
 */
void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    lib->online = 0;
}


/*
 * Default port_send callback: resolve 'port_id' to a file descriptor via
 * the library port hash (under lib->mutex), then delegate the actual
 * transmission to nxt_unit_port_send().
 *
 * Returns the number of bytes sent, or -1 when the port is unknown, has
 * no out_fd, or sendmsg() fails.
 */
static ssize_t
nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_fast_path(port != NULL)) {
        fd = port->port.out_fd;

    } else {
        nxt_unit_warn(ctx, "port_send: port %d,%d not found",
                      (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        /*
         * Distinguish "port found but out_fd == -1" from "not found"
         * (the latter was already reported above).  Only the pointer
         * value is inspected here; 'port' is not dereferenced after
         * the mutex is released.
         */
        if (port != NULL) {
            nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
                          (int) port_id->pid, (int) port_id->id);
        }

        return -1;
    }

    nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
}


/*
 * Send 'buf' plus optional ancillary (out-of-band control) data over 'fd'
 * with a single sendmsg() call.
 *
 * Returns the sendmsg() result: bytes sent, or -1 with errno set (the
 * failure is also logged).
 */
ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    /* Casts drop const: msghdr fields are non-const by definition. */
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

    res = sendmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(errno), errno);

    } else {
        nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Default port_recv callback: resolve 'port_id' to its in_fd under
 * lib->mutex, cache the descriptor on the context when receiving on the
 * context's own read port, then delegate to nxt_unit_port_recv().
 *
 * Returns bytes received, or -1 when the port is unknown / has no in_fd
 * or recvmsg() fails.
 */
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_fast_path(port != NULL)) {
        fd = port->port.in_fd;

    } else {
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /*
     * Pointer comparison: only when the caller passed the context's own
     * read_port_id is the fd cached on the context.
     */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = fd;
    }

    return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
}


/*
 * Receive a message and optional ancillary (control) data from 'fd' with
 * a single recvmsg() call.
 *
 * Returns the recvmsg() result: bytes received, or -1 with errno set
 * (the failure is also logged).
 */
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
    void *oob, size_t oob_size)
{
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = oob;
    msg.msg_controllen = oob_size;

    res = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
                      fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
    }

    return res;
}


/*
 * lvlhsh key-equality test for the port hash: compares the (pid, id)
 * pair stored in the query key against the port's own id.
 */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/* lvlhsh protocol for the library-wide port hash. */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Fill an lvlhsh query for a port lookup: copies (pid, id) into the
 * caller-provided 'port_hash_id' key storage and computes (or reuses)
 * the murmur hash, caching it back into port_id->hash.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        /* Cache: later lookups of the same port_id skip the hashing. */
        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/*
 * Insert 'port' into 'port_hash' keyed by its (pid, id).
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise (including
 * when an entry with the same key already exists: replace is 0).
 */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Look up (and, when 'remove' is non-zero, also delete) the port with
 * the given id.  Returns the port implementation or NULL when absent.
 */
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        /* On delete, lhq.value is the removed entry. */
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * lvlhsh key test for the request hash: always reports a match.
 * NOTE(review): presumably the full 32-bit stream id is folded into the
 * key hash, making an explicit key comparison redundant — confirm that
 * murmur collisions between distinct streams cannot occur here.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


/* lvlhsh protocol for the per-context request hash. */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Insert a request into 'request_hash' keyed by its stream id.
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 */
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl)
{
    uint32_t            *stream;
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    res = nxt_lvlhsh_insert(request_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Look up (and optionally delete, when 'remove' is non-zero) the request
 * registered under 'stream'.  Returns the request or NULL when absent.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
    int remove)
{
    nxt_int_t           res;
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    if (remove) {
        res = nxt_lvlhsh_delete(request_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(request_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        return NULL;
    }
}


/*
 * Public logging entry point: format a single prefixed line into a
 * stack buffer and write it to the library log fd (or stderr when no
 * context is available).  Overlong messages are truncated with a
 * trailing "[...]" marker.
 */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* Reserve the last byte for the trailing '\n'. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /*
     * vsnprintf() returns the would-be length, so p can point past
     * 'end' on truncation; mark it and clamp.
     */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/*
 * Per-request logging: identical to nxt_unit_log(), but additionally
 * prefixes the message with the request's stream id ("#<stream>: ").
 */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    /* Reserve the last byte for the trailing '\n'. */
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* Truncation marker, as in nxt_unit_log(). */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/* Level names indexed by the 'level' argument of the log functions. */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};


/*
 * Write the standard log-line prefix
 * "YYYY/MM/DD hh:mm:ss.mmm [level] pid#tid [unit] " into [p, end)
 * and return the advanced pointer.
 */
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
    struct tm        tm;
    struct timespec  ts;

    (void) clock_gettime(CLOCK_REALTIME, &ts);

#if (NXT_HAVE_LOCALTIME_R)
    (void) localtime_r(&ts.tv_sec, &tm);
#else
    tm = *localtime(&ts.tv_sec);
#endif

    p += snprintf(p, end - p,
                  "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
                  tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
                  tm.tm_hour, tm.tm_min, tm.tm_sec,
                  (int) ts.tv_nsec / 1000000);

    p += snprintf(p, end - p,
                  "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
                  (int) pid,
                  (uint64_t) (uintptr_t) nxt_thread_get_tid());

    return p;
}


/* The function required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */

/*
 * Aligned allocation wrapper over posix_memalign(); returns NULL on
 * failure instead of an error code.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void       *p;
    nxt_err_t  err;

    err = posix_memalign(&p, alignment, size);

    if (nxt_fast_path(err == 0)) {
        return p;
    }

    return NULL;
}

#if (NXT_DEBUG)

/* Debug-build counterpart of nxt_memalign(); plain free(). */
void
nxt_free(void *p)
{
    free(p);
}

#endif