1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 11 #include "nxt_unit.h" 12 #include "nxt_unit_request.h" 13 #include "nxt_unit_response.h" 14 15 #if (NXT_HAVE_MEMFD_CREATE) 16 #include <linux/memfd.h> 17 #endif 18 19 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 20 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 21 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 22 typedef struct nxt_unit_process_s nxt_unit_process_t; 23 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 24 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 25 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 26 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 27 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 28 29 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 30 static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, 31 nxt_unit_ctx_impl_t *ctx_impl, void *data); 32 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 33 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); 34 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 35 uint32_t stream); 36 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 37 nxt_unit_ctx_t *ctx); 38 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 39 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 40 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, 41 nxt_unit_recv_msg_t *recv_msg); 42 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 43 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 44 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 45 nxt_unit_mmap_buf_t *mmap_buf, int last); 46 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 47 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, 48 nxt_chunk_id_t 
*c, int n); 49 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 50 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 51 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); 52 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 53 int fd); 54 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 55 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, 56 nxt_unit_mmap_buf_t *mmap_buf); 57 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 58 59 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 60 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, 61 nxt_unit_process_t *process, int i); 62 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 63 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, 64 nxt_unit_process_t *process, uint32_t id); 65 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, 66 nxt_unit_recv_msg_t *recv_msg); 67 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 68 nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf); 69 static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, 70 uint32_t size); 71 72 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, 73 pid_t pid); 74 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, 75 pid_t pid, int remove); 76 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 77 static int nxt_unit_run_once(nxt_unit_ctx_t *ctx); 78 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, 79 nxt_unit_port_id_t *port_id, int *fd); 80 81 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 82 nxt_unit_port_id_t *new_port, int fd); 83 84 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, 85 nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, 86 nxt_unit_process_t **process); 87 static void nxt_unit_remove_process(nxt_unit_ctx_t 
*ctx, 88 nxt_unit_process_t *process); 89 90 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, 91 nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, 92 const void *oob, size_t oob_size); 93 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, 94 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, 95 void *oob, size_t oob_size); 96 97 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 98 nxt_unit_port_t *port); 99 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 100 nxt_unit_port_id_t *port_id, int remove); 101 102 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 103 104 105 struct nxt_unit_mmap_buf_s { 106 nxt_unit_buf_t buf; 107 108 nxt_port_mmap_header_t *hdr; 109 nxt_queue_link_t link; 110 nxt_unit_port_id_t port_id; 111 nxt_unit_request_info_t *req; 112 nxt_unit_ctx_impl_t *ctx_impl; 113 }; 114 115 116 struct nxt_unit_recv_msg_s { 117 nxt_port_msg_t port_msg; 118 119 void *start; 120 uint32_t size; 121 122 nxt_unit_process_t *process; 123 }; 124 125 126 typedef enum { 127 NXT_UNIT_RS_START = 0, 128 NXT_UNIT_RS_RESPONSE_INIT, 129 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 130 NXT_UNIT_RS_RESPONSE_SENT, 131 NXT_UNIT_RS_DONE, 132 } nxt_unit_req_state_t; 133 134 135 struct nxt_unit_request_info_impl_s { 136 nxt_unit_request_info_t req; 137 138 nxt_unit_recv_msg_t recv_msg; 139 nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */ 140 nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */ 141 142 nxt_unit_req_state_t state; 143 144 nxt_queue_link_t link; 145 146 char extra_data[]; 147 }; 148 149 150 struct nxt_unit_ctx_impl_s { 151 nxt_unit_ctx_t ctx; 152 153 nxt_unit_port_id_t read_port_id; 154 int read_port_fd; 155 156 nxt_queue_link_t link; 157 158 nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */ 159 160 /* of nxt_unit_request_info_impl_t */ 161 nxt_queue_t free_req; 162 163 /* of nxt_unit_request_info_impl_t */ 164 nxt_queue_t active_req; 165 166 
nxt_unit_mmap_buf_t ctx_buf[2]; 167 168 nxt_unit_request_info_impl_t req; 169 }; 170 171 172 struct nxt_unit_impl_s { 173 nxt_unit_t unit; 174 nxt_unit_callbacks_t callbacks; 175 176 uint32_t request_data_size; 177 178 pthread_mutex_t mutex; 179 180 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 181 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 182 183 nxt_unit_port_id_t ready_port_id; 184 185 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 186 187 pid_t pid; 188 int log_fd; 189 int online; 190 191 nxt_unit_ctx_impl_t main_ctx; 192 }; 193 194 195 struct nxt_unit_port_impl_s { 196 nxt_unit_port_t port; 197 198 nxt_queue_link_t link; 199 nxt_unit_process_t *process; 200 }; 201 202 203 struct nxt_unit_mmap_s { 204 nxt_port_mmap_header_t *hdr; 205 }; 206 207 208 struct nxt_unit_mmaps_s { 209 pthread_mutex_t mutex; 210 uint32_t size; 211 uint32_t cap; 212 nxt_unit_mmap_t *elts; 213 }; 214 215 216 struct nxt_unit_process_s { 217 pid_t pid; 218 219 nxt_queue_t ports; 220 221 nxt_unit_mmaps_t incoming; 222 nxt_unit_mmaps_t outgoing; 223 224 nxt_unit_impl_t *lib; 225 226 nxt_atomic_t use_count; 227 228 uint32_t next_port_id; 229 }; 230 231 232 /* Explicitly using 32 bit types to avoid possible alignment. 
*/ 233 typedef struct { 234 int32_t pid; 235 uint32_t id; 236 } nxt_unit_port_hash_id_t; 237 238 239 nxt_unit_ctx_t * 240 nxt_unit_init(nxt_unit_init_t *init) 241 { 242 int rc; 243 uint32_t ready_stream; 244 nxt_unit_ctx_t *ctx; 245 nxt_unit_impl_t *lib; 246 nxt_unit_port_t ready_port, read_port; 247 248 lib = nxt_unit_create(init); 249 if (nxt_slow_path(lib == NULL)) { 250 return NULL; 251 } 252 253 if (init->ready_port.id.pid != 0 254 && init->ready_stream != 0 255 && init->read_port.id.pid != 0) 256 { 257 ready_port = init->ready_port; 258 ready_stream = init->ready_stream; 259 read_port = init->read_port; 260 lib->log_fd = init->log_fd; 261 262 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 263 ready_port.id.id); 264 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 265 read_port.id.id); 266 } else { 267 rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, 268 &ready_stream); 269 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 270 goto fail; 271 } 272 } 273 274 ctx = &lib->main_ctx.ctx; 275 276 rc = lib->callbacks.add_port(ctx, &ready_port); 277 if (rc != NXT_UNIT_OK) { 278 nxt_unit_alert(NULL, "failed to add ready_port"); 279 280 goto fail; 281 } 282 283 rc = lib->callbacks.add_port(ctx, &read_port); 284 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 285 nxt_unit_alert(NULL, "failed to add read_port"); 286 287 goto fail; 288 } 289 290 lib->main_ctx.read_port_id = read_port.id; 291 lib->ready_port_id = ready_port.id; 292 293 rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); 294 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 295 nxt_unit_alert(NULL, "failed to send READY message"); 296 297 goto fail; 298 } 299 300 return ctx; 301 302 fail: 303 304 free(lib); 305 306 return NULL; 307 } 308 309 310 static nxt_unit_impl_t * 311 nxt_unit_create(nxt_unit_init_t *init) 312 { 313 int rc; 314 nxt_unit_impl_t *lib; 315 nxt_unit_callbacks_t *cb; 316 317 lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size); 318 if (nxt_slow_path(lib == NULL)) { 319 
nxt_unit_alert(NULL, "failed to allocate unit struct"); 320 321 return NULL; 322 } 323 324 rc = pthread_mutex_init(&lib->mutex, NULL); 325 if (nxt_slow_path(rc != 0)) { 326 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 327 328 goto fail; 329 } 330 331 lib->unit.data = init->data; 332 lib->callbacks = init->callbacks; 333 334 lib->request_data_size = init->request_data_size; 335 336 lib->processes.slot = NULL; 337 lib->ports.slot = NULL; 338 339 lib->pid = getpid(); 340 lib->log_fd = STDERR_FILENO; 341 lib->online = 1; 342 343 nxt_queue_init(&lib->contexts); 344 345 nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 346 347 cb = &lib->callbacks; 348 349 if (cb->request_handler == NULL) { 350 nxt_unit_alert(NULL, "request_handler is NULL"); 351 352 goto fail; 353 } 354 355 if (cb->add_port == NULL) { 356 cb->add_port = nxt_unit_add_port; 357 } 358 359 if (cb->remove_port == NULL) { 360 cb->remove_port = nxt_unit_remove_port; 361 } 362 363 if (cb->remove_pid == NULL) { 364 cb->remove_pid = nxt_unit_remove_pid; 365 } 366 367 if (cb->quit == NULL) { 368 cb->quit = nxt_unit_quit; 369 } 370 371 if (cb->port_send == NULL) { 372 cb->port_send = nxt_unit_port_send_default; 373 } 374 375 if (cb->port_recv == NULL) { 376 cb->port_recv = nxt_unit_port_recv_default; 377 } 378 379 return lib; 380 381 fail: 382 383 free(lib); 384 385 return NULL; 386 } 387 388 389 static void 390 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 391 void *data) 392 { 393 ctx_impl->ctx.data = data; 394 ctx_impl->ctx.unit = &lib->unit; 395 396 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 397 398 nxt_queue_init(&ctx_impl->free_buf); 399 nxt_queue_init(&ctx_impl->free_req); 400 nxt_queue_init(&ctx_impl->active_req); 401 402 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link); 403 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link); 404 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 405 406 
ctx_impl->req.req.ctx = &ctx_impl->ctx; 407 ctx_impl->req.req.unit = &lib->unit; 408 409 ctx_impl->read_port_fd = -1; 410 } 411 412 413 static int 414 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, 415 int *log_fd, uint32_t *stream) 416 { 417 int rc; 418 int ready_fd, read_fd; 419 char *unit_init, *version_end; 420 long version_length; 421 int64_t ready_pid, read_pid; 422 uint32_t ready_stream, ready_id, read_id; 423 424 unit_init = getenv(NXT_UNIT_INIT_ENV); 425 if (nxt_slow_path(unit_init == NULL)) { 426 nxt_unit_alert(NULL, "%s is not in the current environment", 427 NXT_UNIT_INIT_ENV); 428 429 return NXT_UNIT_ERROR; 430 } 431 432 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 433 434 version_length = nxt_length(NXT_VERSION); 435 436 version_end = strchr(unit_init, ';'); 437 if (version_end == NULL 438 || version_end - unit_init != version_length 439 || memcmp(unit_init, NXT_VERSION, version_length) != 0) 440 { 441 nxt_unit_alert(NULL, "version check error"); 442 443 return NXT_UNIT_ERROR; 444 } 445 446 rc = sscanf(version_end + 1, 447 "%"PRIu32";" 448 "%"PRId64",%"PRIu32",%d;" 449 "%"PRId64",%"PRIu32",%d;" 450 "%d", 451 &ready_stream, 452 &ready_pid, &ready_id, &ready_fd, 453 &read_pid, &read_id, &read_fd, 454 log_fd); 455 456 if (nxt_slow_path(rc != 8)) { 457 nxt_unit_alert(NULL, "failed to scan variables"); 458 459 return NXT_UNIT_ERROR; 460 } 461 462 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 463 464 ready_port->in_fd = -1; 465 ready_port->out_fd = ready_fd; 466 ready_port->data = NULL; 467 468 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 469 470 read_port->in_fd = read_fd; 471 read_port->out_fd = -1; 472 read_port->data = NULL; 473 474 *stream = ready_stream; 475 476 return NXT_UNIT_OK; 477 } 478 479 480 static int 481 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 482 uint32_t stream) 483 { 484 ssize_t res; 485 nxt_port_msg_t msg; 486 
nxt_unit_impl_t *lib; 487 488 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 489 490 msg.stream = stream; 491 msg.pid = lib->pid; 492 msg.reply_port = 0; 493 msg.type = _NXT_PORT_MSG_PROCESS_READY; 494 msg.last = 1; 495 msg.mmap = 0; 496 msg.nf = 0; 497 msg.mf = 0; 498 msg.tracking = 0; 499 500 res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); 501 if (res != sizeof(msg)) { 502 return NXT_UNIT_ERROR; 503 } 504 505 return NXT_UNIT_OK; 506 } 507 508 509 int 510 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 511 void *buf, size_t buf_size, void *oob, size_t oob_size) 512 { 513 int fd, rc; 514 pid_t pid; 515 nxt_queue_t incoming_buf; 516 struct cmsghdr *cm; 517 nxt_port_msg_t *port_msg; 518 nxt_unit_impl_t *lib; 519 nxt_unit_port_t new_port; 520 nxt_queue_link_t *lnk; 521 nxt_unit_request_t *r; 522 nxt_unit_mmap_buf_t *b; 523 nxt_unit_recv_msg_t recv_msg; 524 nxt_unit_callbacks_t *cb; 525 nxt_port_msg_new_port_t *new_port_msg; 526 nxt_unit_request_info_t *req; 527 nxt_unit_request_info_impl_t *req_impl; 528 529 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 530 531 rc = NXT_UNIT_ERROR; 532 fd = -1; 533 recv_msg.process = NULL; 534 port_msg = buf; 535 cm = oob; 536 537 if (oob_size >= CMSG_SPACE(sizeof(int)) 538 && cm->cmsg_len == CMSG_LEN(sizeof(int)) 539 && cm->cmsg_level == SOL_SOCKET 540 && cm->cmsg_type == SCM_RIGHTS) 541 { 542 memcpy(&fd, CMSG_DATA(cm), sizeof(int)); 543 } 544 545 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { 546 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); 547 goto fail; 548 } 549 550 recv_msg.port_msg = *port_msg; 551 recv_msg.start = port_msg + 1; 552 recv_msg.size = buf_size - sizeof(nxt_port_msg_t); 553 554 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 555 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", 556 port_msg->stream, (int) port_msg->type); 557 goto fail; 558 } 559 560 if (port_msg->tracking && 
nxt_unit_tracking_read(ctx, &recv_msg) == 0) { 561 rc = NXT_UNIT_OK; 562 563 goto fail; 564 } 565 566 /* Fragmentation is unsupported. */ 567 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 568 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", 569 port_msg->stream, (int) port_msg->type); 570 goto fail; 571 } 572 573 if (port_msg->mmap) { 574 nxt_queue_init(&incoming_buf); 575 576 if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) { 577 goto fail; 578 } 579 } 580 581 cb = &lib->callbacks; 582 583 switch (port_msg->type) { 584 585 case _NXT_PORT_MSG_QUIT: 586 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); 587 588 cb->quit(ctx); 589 rc = NXT_UNIT_OK; 590 break; 591 592 case _NXT_PORT_MSG_NEW_PORT: 593 if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) { 594 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 595 "invalid message size (%d)", 596 port_msg->stream, (int) recv_msg.size); 597 598 goto fail; 599 } 600 601 new_port_msg = recv_msg.start; 602 603 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", 604 port_msg->stream, (int) new_port_msg->pid, 605 (int) new_port_msg->id, fd); 606 607 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, 608 new_port_msg->id); 609 610 new_port.in_fd = -1; 611 new_port.out_fd = fd; 612 new_port.data = NULL; 613 614 fd = -1; 615 616 rc = cb->add_port(ctx, &new_port); 617 break; 618 619 case _NXT_PORT_MSG_CHANGE_FILE: 620 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 621 port_msg->stream, fd); 622 break; 623 624 case _NXT_PORT_MSG_MMAP: 625 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd); 626 break; 627 628 case _NXT_PORT_MSG_DATA: 629 if (nxt_slow_path(port_msg->mmap == 0)) { 630 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 631 port_msg->stream); 632 633 goto fail; 634 } 635 636 if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) { 637 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 638 "%d 
expected", port_msg->stream, (int) recv_msg.size, 639 (int) sizeof(nxt_unit_request_t)); 640 641 goto fail; 642 } 643 644 req_impl = nxt_unit_request_info_get(ctx); 645 if (nxt_slow_path(req_impl == NULL)) { 646 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 647 port_msg->stream); 648 649 goto fail; 650 } 651 652 req = &req_impl->req; 653 654 req->request_port = *port_id; 655 656 nxt_unit_port_id_init(&req->response_port, port_msg->pid, 657 port_msg->reply_port); 658 659 req->request = recv_msg.start; 660 661 lnk = nxt_queue_first(&incoming_buf); 662 b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); 663 664 req->request_buf = &b->buf; 665 req->response = NULL; 666 req->response_buf = NULL; 667 668 r = req->request; 669 670 req->content_length = r->content_length; 671 672 req->content_buf = req->request_buf; 673 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 674 675 /* Move process to req_impl. */ 676 req_impl->recv_msg = recv_msg; 677 678 recv_msg.process = NULL; 679 680 nxt_queue_init(&req_impl->outgoing_buf); 681 nxt_queue_init(&req_impl->incoming_buf); 682 683 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) 684 { 685 b->req = req; 686 } nxt_queue_loop; 687 688 nxt_queue_add(&req_impl->incoming_buf, &incoming_buf); 689 nxt_queue_init(&incoming_buf); 690 691 req->response_max_fields = 0; 692 req_impl->state = NXT_UNIT_RS_START; 693 694 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream, 695 (int) r->method_length, nxt_unit_sptr_get(&r->method), 696 (int) r->target_length, nxt_unit_sptr_get(&r->target), 697 (int) r->content_length); 698 699 cb->request_handler(req); 700 701 rc = NXT_UNIT_OK; 702 break; 703 704 case _NXT_PORT_MSG_REMOVE_PID: 705 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 706 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " 707 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 708 (int) sizeof(pid)); 709 710 goto fail; 711 } 712 713 memcpy(&pid, 
recv_msg.start, sizeof(pid)); 714 715 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 716 port_msg->stream, (int) pid); 717 718 cb->remove_pid(ctx, pid); 719 720 rc = NXT_UNIT_OK; 721 break; 722 723 default: 724 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", 725 port_msg->stream, (int) port_msg->type); 726 727 goto fail; 728 } 729 730 fail: 731 732 if (fd != -1) { 733 close(fd); 734 } 735 736 if (port_msg->mmap) { 737 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) 738 { 739 nxt_unit_mmap_release(b->hdr, b->buf.start, 740 b->buf.end - b->buf.start); 741 742 nxt_unit_mmap_buf_release(b); 743 } nxt_queue_loop; 744 } 745 746 if (recv_msg.process != NULL) { 747 nxt_unit_process_use(ctx, recv_msg.process, -1); 748 } 749 750 return rc; 751 } 752 753 754 static nxt_unit_request_info_impl_t * 755 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 756 { 757 nxt_unit_impl_t *lib; 758 nxt_queue_link_t *lnk; 759 nxt_unit_ctx_impl_t *ctx_impl; 760 nxt_unit_request_info_impl_t *req_impl; 761 762 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 763 764 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 765 766 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 767 req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) 768 + lib->request_data_size); 769 if (nxt_slow_path(req_impl == NULL)) { 770 nxt_unit_warn(ctx, "request info allocation failed"); 771 772 return NULL; 773 } 774 775 req_impl->req.unit = ctx->unit; 776 req_impl->req.ctx = ctx; 777 778 } else { 779 lnk = nxt_queue_first(&ctx_impl->free_req); 780 nxt_queue_remove(lnk); 781 782 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 783 } 784 785 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 786 787 req_impl->req.data = lib->request_data_size ? 
req_impl->extra_data : NULL; 788 789 return req_impl; 790 } 791 792 793 static void 794 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 795 { 796 nxt_unit_mmap_buf_t *b; 797 nxt_unit_ctx_impl_t *ctx_impl; 798 nxt_unit_recv_msg_t *recv_msg; 799 nxt_unit_request_info_impl_t *req_impl; 800 801 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 802 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 803 804 req->response = NULL; 805 req->response_buf = NULL; 806 807 recv_msg = &req_impl->recv_msg; 808 809 if (recv_msg->process != NULL) { 810 nxt_unit_process_use(req->ctx, recv_msg->process, -1); 811 812 recv_msg->process = NULL; 813 } 814 815 nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) { 816 817 nxt_unit_buf_free(&b->buf); 818 819 } nxt_queue_loop; 820 821 nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) { 822 823 nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start); 824 nxt_unit_mmap_buf_release(b); 825 826 } nxt_queue_loop; 827 828 nxt_queue_remove(&req_impl->link); 829 830 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 831 } 832 833 834 static void 835 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 836 { 837 nxt_unit_ctx_impl_t *ctx_impl; 838 839 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 840 841 nxt_queue_remove(&req_impl->link); 842 843 if (req_impl != &ctx_impl->req) { 844 free(req_impl); 845 } 846 } 847 848 849 uint16_t 850 nxt_unit_field_hash(const char *name, size_t name_length) 851 { 852 u_char ch; 853 uint32_t hash; 854 const char *p, *end; 855 856 hash = 159406; /* Magic value copied from nxt_http_parse.c */ 857 end = name + name_length; 858 859 for (p = name; p < end; p++) { 860 ch = *p; 861 hash = (hash << 4) + hash + nxt_lowcase(ch); 862 } 863 864 hash = (hash >> 16) ^ hash; 865 866 return hash; 867 } 868 869 870 void 871 nxt_unit_split_host(char *host, uint32_t host_length, 872 char 
**name, uint32_t *name_length, char **port, uint32_t *port_length) 873 { 874 char *cpos; 875 876 static char default_host[] = "localhost"; 877 static char default_port[] = "80"; 878 879 if (nxt_slow_path(host == NULL || host_length == 0)) { 880 *name = default_host; 881 *name_length = nxt_length(default_host); 882 883 *port = default_port; 884 *port_length = nxt_length(default_port); 885 886 return; 887 } 888 889 cpos = memchr(host, ':', host_length); 890 891 if (nxt_slow_path(cpos == NULL)) { 892 *name = host; 893 *name_length = host_length; 894 895 *port = default_port; 896 *port_length = nxt_length(default_port); 897 898 return; 899 } 900 901 if (nxt_slow_path(cpos == host)) { 902 *name = default_host; 903 *name_length = nxt_length(default_host); 904 905 } else { 906 *name = host; 907 *name_length = cpos - host; 908 } 909 910 cpos++; 911 912 if (nxt_slow_path(host + host_length == cpos)) { 913 *port = default_port; 914 *port_length = nxt_length(default_port); 915 916 } else { 917 *port = cpos; 918 *port_length = host_length - (cpos - host); 919 } 920 } 921 922 923 void 924 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) 925 { 926 uint32_t i, j; 927 nxt_unit_field_t *fields, f; 928 nxt_unit_request_t *r; 929 930 nxt_unit_req_debug(req, "group_dup_fields"); 931 932 r = req->request; 933 fields = r->fields; 934 935 for (i = 0; i < r->fields_count; i++) { 936 937 switch (fields[i].hash) { 938 case NXT_UNIT_HASH_HOST: 939 r->host_field = i; 940 break; 941 942 case NXT_UNIT_HASH_CONTENT_LENGTH: 943 r->content_length_field = i; 944 break; 945 946 case NXT_UNIT_HASH_CONTENT_TYPE: 947 r->content_type_field = i; 948 break; 949 950 case NXT_UNIT_HASH_COOKIE: 951 r->cookie_field = i; 952 break; 953 }; 954 955 for (j = i + 1; j < r->fields_count; j++) { 956 if (fields[i].hash != fields[j].hash) { 957 continue; 958 } 959 960 if (j == i + 1) { 961 continue; 962 } 963 964 f = fields[j]; 965 f.name.offset += (j - (i + 1)) * sizeof(f); 966 f.value.offset += (j - 
(i + 1)) * sizeof(f); 967 968 while (j > i + 1) { 969 fields[j] = fields[j - 1]; 970 fields[j].name.offset -= sizeof(f); 971 fields[j].value.offset -= sizeof(f); 972 j--; 973 } 974 975 fields[j] = f; 976 977 i++; 978 } 979 } 980 } 981 982 983 int 984 nxt_unit_response_init(nxt_unit_request_info_t *req, 985 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 986 { 987 uint32_t buf_size; 988 nxt_unit_buf_t *buf; 989 nxt_unit_request_info_impl_t *req_impl; 990 991 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 992 993 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 994 nxt_unit_req_warn(req, "init: response already sent"); 995 996 return NXT_UNIT_ERROR; 997 } 998 999 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 1000 (int) max_fields_count, (int) max_fields_size); 1001 1002 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 1003 nxt_unit_req_debug(req, "duplicate response init"); 1004 } 1005 1006 buf_size = sizeof(nxt_unit_response_t) 1007 + max_fields_count * sizeof(nxt_unit_field_t) 1008 + max_fields_size; 1009 1010 if (nxt_slow_path(req->response_buf != NULL)) { 1011 buf = req->response_buf; 1012 1013 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 1014 goto init_response; 1015 } 1016 1017 nxt_unit_buf_free(buf); 1018 1019 req->response_buf = NULL; 1020 req->response = NULL; 1021 req->response_max_fields = 0; 1022 1023 req_impl->state = NXT_UNIT_RS_START; 1024 } 1025 1026 buf = nxt_unit_response_buf_alloc(req, buf_size); 1027 if (nxt_slow_path(buf == NULL)) { 1028 return NXT_UNIT_ERROR; 1029 } 1030 1031 init_response: 1032 1033 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 1034 1035 req->response_buf = buf; 1036 1037 req->response = (nxt_unit_response_t *) buf->start; 1038 req->response->status = status; 1039 1040 buf->free = buf->start + sizeof(nxt_unit_response_t) 1041 + max_fields_count * sizeof(nxt_unit_field_t); 1042 1043 
req->response_max_fields = max_fields_count; 1044 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 1045 1046 return NXT_UNIT_OK; 1047 } 1048 1049 1050 int 1051 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 1052 uint32_t max_fields_count, uint32_t max_fields_size) 1053 { 1054 char *p; 1055 uint32_t i, buf_size; 1056 nxt_unit_buf_t *buf; 1057 nxt_unit_field_t *f, *src; 1058 nxt_unit_response_t *resp; 1059 nxt_unit_request_info_impl_t *req_impl; 1060 1061 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1062 1063 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1064 nxt_unit_req_warn(req, "realloc: response not init"); 1065 1066 return NXT_UNIT_ERROR; 1067 } 1068 1069 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 1070 nxt_unit_req_warn(req, "realloc: response already sent"); 1071 1072 return NXT_UNIT_ERROR; 1073 } 1074 1075 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 1076 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 1077 1078 return NXT_UNIT_ERROR; 1079 } 1080 1081 buf_size = sizeof(nxt_unit_response_t) 1082 + max_fields_count * sizeof(nxt_unit_field_t) 1083 + max_fields_size; 1084 1085 buf = nxt_unit_response_buf_alloc(req, buf_size); 1086 if (nxt_slow_path(buf == NULL)) { 1087 return NXT_UNIT_ERROR; 1088 } 1089 1090 resp = (nxt_unit_response_t *) buf->start; 1091 1092 memset(resp, 0, sizeof(nxt_unit_response_t)); 1093 1094 resp->status = req->response->status; 1095 resp->content_length = req->response->content_length; 1096 1097 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 1098 f = resp->fields; 1099 1100 for (i = 0; i < req->response->fields_count; i++) { 1101 src = req->request->fields + i; 1102 1103 if (nxt_slow_path(src->skip != 0)) { 1104 continue; 1105 } 1106 1107 if (nxt_slow_path(src->name_length + src->value_length 1108 > (uint32_t) (buf->end - p))) 1109 { 1110 goto fail; 1111 } 1112 1113 nxt_unit_sptr_set(&f->name, p); 1114 p = 
nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);

        nxt_unit_sptr_set(&f->value, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);

        f->hash = src->hash;
        f->skip = 0;
        f->name_length = src->name_length;
        f->value_length = src->value_length;

        resp->fields_count++;
        f++;
    }

    if (req->response->piggyback_content_length > 0) {
        /* Fail if the already-accumulated content does not fit the new buf. */
        if (nxt_slow_path(req->response->piggyback_content_length
                          > (uint32_t) (buf->end - p)))
        {
            goto fail;
        }

        resp->piggyback_content_length = req->response->piggyback_content_length;

        nxt_unit_sptr_set(&resp->piggyback_content, p);
        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
                       req->response->piggyback_content_length);
    }

    buf->free = p;

    /* Release the old response buffer and switch the request to the new one. */
    nxt_unit_buf_free(req->response_buf);

    req->response = resp;
    req->response_buf = buf;
    req->response_max_fields = max_fields_count;

    return NXT_UNIT_OK;

fail:

    nxt_unit_buf_free(buf);

    return NXT_UNIT_ERROR;
}


/* Returns non-zero if nxt_unit_response_init() has been called for "req". */
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
}


/*
 * Appends one header field to the initialized, not-yet-sent response.
 * Name and value bytes are copied into the response buffer; fails when the
 * field count limit or the remaining buffer space is exceeded.
 */
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
    const char *name, uint8_t name_length,
    const char *value, uint32_t value_length)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_field_t              *f;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    /* Fields may be added only between response_init() and send(). */
    if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_field: response not initialized or "
                          "already sent");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
        nxt_unit_req_warn(req, "add_field: too many response fields");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(name_length + value_length
                      > (uint32_t) (buf->end - buf->free)))
    {
        nxt_unit_req_warn(req, "add_field: response buffer overflow");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    /* Name and value are stored as offsets into the response buffer. */
    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Appends "size" bytes of body content to the response headers buffer
 * (piggyback content sent together with the headers).
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    /* First chunk of content: remember where it starts. */
    if (resp->piggyback_content_length == 0) {
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Sends the response headers (plus any piggyback content) to the router.
 * On success the headers buffer is released and the request state advances
 * to RESPONSE_SENT.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req->ctx,
                                req_impl->recv_msg.port_msg.stream,
                                mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_release(mmap_buf);
    }

    return rc;
}


/* Returns non-zero if the response headers have already been sent. */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocates a shared memory buffer of up to "size" bytes for additional
 * response content.  Returns NULL on failure.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
1334 int rc; 1335 nxt_unit_process_t *process; 1336 nxt_unit_mmap_buf_t *mmap_buf; 1337 nxt_unit_request_info_impl_t *req_impl; 1338 1339 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 1340 nxt_unit_req_warn(req, "response_buf_alloc: " 1341 "requested buffer (%"PRIu32") too big", size); 1342 1343 return NULL; 1344 } 1345 1346 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 1347 1348 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1349 1350 process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); 1351 if (nxt_slow_path(process == NULL)) { 1352 return NULL; 1353 } 1354 1355 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 1356 if (nxt_slow_path(mmap_buf == NULL)) { 1357 return NULL; 1358 } 1359 1360 mmap_buf->req = req; 1361 1362 nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link); 1363 1364 rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, 1365 size, mmap_buf); 1366 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1367 nxt_unit_mmap_buf_release(mmap_buf); 1368 1369 return NULL; 1370 } 1371 1372 return &mmap_buf->buf; 1373 } 1374 1375 1376 static nxt_unit_process_t * 1377 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1378 { 1379 nxt_unit_impl_t *lib; 1380 1381 if (recv_msg->process != NULL) { 1382 return recv_msg->process; 1383 } 1384 1385 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1386 1387 pthread_mutex_lock(&lib->mutex); 1388 1389 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0); 1390 1391 pthread_mutex_unlock(&lib->mutex); 1392 1393 if (recv_msg->process == NULL) { 1394 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", 1395 recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid); 1396 } 1397 1398 return recv_msg->process; 1399 } 1400 1401 1402 static nxt_unit_mmap_buf_t * 1403 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 1404 { 1405 nxt_queue_link_t *lnk; 1406 nxt_unit_mmap_buf_t *mmap_buf; 1407 nxt_unit_ctx_impl_t 
*ctx_impl; 1408 1409 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1410 1411 if (nxt_queue_is_empty(&ctx_impl->free_buf)) { 1412 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); 1413 if (nxt_slow_path(mmap_buf == NULL)) { 1414 nxt_unit_warn(ctx, "failed to allocate buf"); 1415 } 1416 1417 } else { 1418 lnk = nxt_queue_first(&ctx_impl->free_buf); 1419 nxt_queue_remove(lnk); 1420 1421 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); 1422 } 1423 1424 mmap_buf->ctx_impl = ctx_impl; 1425 1426 return mmap_buf; 1427 } 1428 1429 1430 static void 1431 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 1432 { 1433 nxt_queue_remove(&mmap_buf->link); 1434 1435 nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link); 1436 } 1437 1438 1439 int 1440 nxt_unit_buf_send(nxt_unit_buf_t *buf) 1441 { 1442 int rc; 1443 nxt_unit_mmap_buf_t *mmap_buf; 1444 nxt_unit_request_info_t *req; 1445 nxt_unit_request_info_impl_t *req_impl; 1446 1447 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1448 1449 req = mmap_buf->req; 1450 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1451 1452 nxt_unit_req_debug(req, "buf_send: %d bytes", 1453 (int) (buf->free - buf->start)); 1454 1455 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 1456 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 1457 1458 return NXT_UNIT_ERROR; 1459 } 1460 1461 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 1462 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 1463 1464 return NXT_UNIT_ERROR; 1465 } 1466 1467 if (nxt_fast_path(buf->free > buf->start)) { 1468 rc = nxt_unit_mmap_buf_send(req->ctx, 1469 req_impl->recv_msg.port_msg.stream, 1470 mmap_buf, 0); 1471 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1472 return rc; 1473 } 1474 } 1475 1476 nxt_unit_mmap_buf_release(mmap_buf); 1477 1478 return NXT_UNIT_OK; 1479 } 1480 1481 1482 static void 1483 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 1484 { 
1485 int rc; 1486 nxt_unit_mmap_buf_t *mmap_buf; 1487 nxt_unit_request_info_t *req; 1488 nxt_unit_request_info_impl_t *req_impl; 1489 1490 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1491 1492 req = mmap_buf->req; 1493 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1494 1495 rc = nxt_unit_mmap_buf_send(req->ctx, 1496 req_impl->recv_msg.port_msg.stream, 1497 mmap_buf, 1); 1498 1499 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 1500 nxt_unit_mmap_buf_release(mmap_buf); 1501 1502 nxt_unit_request_info_release(req); 1503 1504 } else { 1505 nxt_unit_request_done(req, rc); 1506 } 1507 } 1508 1509 1510 static int 1511 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, 1512 nxt_unit_mmap_buf_t *mmap_buf, int last) 1513 { 1514 struct { 1515 nxt_port_msg_t msg; 1516 nxt_port_mmap_msg_t mmap_msg; 1517 } m; 1518 1519 u_char *end, *last_used, *first_free; 1520 ssize_t res; 1521 nxt_chunk_id_t first_free_chunk; 1522 nxt_unit_buf_t *buf; 1523 nxt_unit_impl_t *lib; 1524 nxt_port_mmap_header_t *hdr; 1525 1526 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1527 1528 buf = &mmap_buf->buf; 1529 1530 m.mmap_msg.size = buf->free - buf->start; 1531 1532 m.msg.stream = stream; 1533 m.msg.pid = lib->pid; 1534 m.msg.reply_port = 0; 1535 m.msg.type = _NXT_PORT_MSG_DATA; 1536 m.msg.last = last != 0; 1537 m.msg.mmap = m.mmap_msg.size > 0; 1538 m.msg.nf = 0; 1539 m.msg.mf = 0; 1540 m.msg.tracking = 0; 1541 1542 hdr = mmap_buf->hdr; 1543 1544 m.mmap_msg.mmap_id = hdr->id; 1545 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); 1546 1547 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 1548 stream, 1549 (int) m.mmap_msg.mmap_id, 1550 (int) m.mmap_msg.chunk_id, 1551 (int) m.mmap_msg.size); 1552 1553 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, 1554 m.mmap_msg.size > 0 ? 
sizeof(m) 1555 : sizeof(m.msg), 1556 NULL, 0); 1557 if (nxt_slow_path(res != sizeof(m))) { 1558 return NXT_UNIT_ERROR; 1559 } 1560 1561 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 1562 last_used = (u_char *) buf->free - 1; 1563 1564 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 1565 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 1566 end = (u_char *) buf->end; 1567 1568 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); 1569 1570 buf->end = (char *) first_free; 1571 } 1572 1573 return NXT_UNIT_OK; 1574 } 1575 1576 1577 void 1578 nxt_unit_buf_free(nxt_unit_buf_t *buf) 1579 { 1580 nxt_unit_mmap_buf_t *mmap_buf; 1581 1582 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1583 1584 nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start); 1585 1586 nxt_unit_mmap_buf_release(mmap_buf); 1587 } 1588 1589 1590 nxt_unit_buf_t * 1591 nxt_unit_buf_next(nxt_unit_buf_t *buf) 1592 { 1593 nxt_queue_link_t *lnk; 1594 nxt_unit_mmap_buf_t *mmap_buf; 1595 nxt_unit_request_info_impl_t *req_impl; 1596 1597 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 1598 req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t, 1599 req); 1600 1601 lnk = &mmap_buf->link; 1602 1603 if (lnk == nxt_queue_last(&req_impl->incoming_buf)) { 1604 return NULL; 1605 } 1606 1607 lnk = nxt_queue_next(lnk); 1608 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); 1609 1610 return &mmap_buf->buf; 1611 } 1612 1613 1614 uint32_t 1615 nxt_unit_buf_max(void) 1616 { 1617 return PORT_MMAP_DATA_SIZE; 1618 } 1619 1620 1621 uint32_t 1622 nxt_unit_buf_min(void) 1623 { 1624 return PORT_MMAP_CHUNK_SIZE; 1625 } 1626 1627 1628 int 1629 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 1630 size_t size) 1631 { 1632 int rc; 1633 uint32_t part_size; 1634 const char *part_start; 1635 nxt_unit_process_t *process; 1636 nxt_unit_mmap_buf_t mmap_buf; 1637 nxt_unit_request_info_impl_t 
*req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    /* Stream the rest of the body in PORT_MMAP_DATA_SIZE sized parts. */
    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
                                       part_size, &mmap_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx,
                                    req_impl->recv_msg.port_msg.stream,
                                    &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* Return the unsent chunks to the shared memory free map. */
            nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
                                  mmap_buf.buf.end - mmap_buf.buf.start);

            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    return NXT_UNIT_OK;
}


/*
 * Streams response content produced by the "read_info" callback.  Content
 * is first piggybacked onto the headers buffer, then sent in separately
 * allocated shared memory buffers until the callback reports EOF.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int             rc;
    ssize_t         n;
    nxt_unit_buf_t  *buf;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
                                                       PORT_MMAP_DATA_SIZE));
        if (nxt_slow_path(buf == NULL)) {
            nxt_unit_req_error(req, "Failed to allocate buf for content");

            return NXT_UNIT_ERROR;
        }

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_buf_free(buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_buf_send(buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Copies up to "size" bytes of request body from the chain of content
 * buffers into "dst"; returns the number of bytes actually read.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf;

    p = dst;
    rest = size;

    buf = req->content_buf;

    while (buf != NULL) {
        copy =
buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            /* Destination full; advance past the buffer only if drained. */
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    req->content_buf = buf;

    read = size - rest;

    req->content_length -= read;

    return read;
}


/*
 * Finishes request processing: initializes a default 200 response if none
 * was started, flushes the headers buffer as the last message, or sends a
 * final (possibly error) port message, then releases the request.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the headers buffer with "last" set and releases the req. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->recv_msg.port_msg.stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(req->ctx, &req->response_port,
                                   &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_req_alert(req, "last message send failed: %s (%d)",
                           strerror(errno), errno);
    }

    nxt_unit_request_info_release(req);
}


/*
 * Finds (or creates) an outgoing shared memory segment with "n" contiguous
 * free chunks usable for "port_id"; stores the first chunk id in "*c".
 * Returns with the outgoing mutex released.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
{
    int                     res, nchunks, i;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_port_mmap_header_t  *hdr;

    pthread_mutex_lock(&process->outgoing.mutex);

    mm_end = process->outgoing.elts + process->outgoing.size;

    for (mm = process->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* 0xFFFF means the segment is not bound to a particular port. */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run too short: roll back and restart past the gap. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock;
            }
        }
    }

    /* No existing segment has room; create a fresh one. */
    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, process, port_id, n);

unlock:

    pthread_mutex_unlock(&process->outgoing.mutex);

    return hdr;
}


/*
 * Ensures the mmaps array can hold index "i" and returns a pointer to
 * that element; returns NULL on allocation failure.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t  cap;

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    /* Grow capacity geometrically until it covers index "i". */
    while (i + 1 > cap) {

        if (cap < 16) {
            cap =
cap * 2; 1965 1966 } else { 1967 cap = cap + cap / 2; 1968 } 1969 } 1970 1971 if (cap != mmaps->cap) { 1972 1973 mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); 1974 if (nxt_slow_path(mmaps->elts == NULL)) { 1975 return NULL; 1976 } 1977 1978 memset(mmaps->elts + mmaps->cap, 0, 1979 sizeof(*mmaps->elts) * (cap - mmaps->cap)); 1980 1981 mmaps->cap = cap; 1982 } 1983 1984 if (i + 1 > mmaps->size) { 1985 mmaps->size = i + 1; 1986 } 1987 1988 return mmaps->elts + i; 1989 } 1990 1991 1992 static nxt_port_mmap_header_t * 1993 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, 1994 nxt_unit_port_id_t *port_id, int n) 1995 { 1996 int i, fd, rc; 1997 void *mem; 1998 char name[64]; 1999 nxt_unit_mmap_t *mm; 2000 nxt_unit_impl_t *lib; 2001 nxt_port_mmap_header_t *hdr; 2002 2003 lib = process->lib; 2004 2005 mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); 2006 if (nxt_slow_path(mm == NULL)) { 2007 nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); 2008 2009 return NULL; 2010 } 2011 2012 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", 2013 lib->pid, (void *) pthread_self()); 2014 2015 #if (NXT_HAVE_MEMFD_CREATE) 2016 2017 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 2018 if (nxt_slow_path(fd == -1)) { 2019 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 2020 strerror(errno), errno); 2021 2022 goto remove_fail; 2023 } 2024 2025 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 2026 2027 #elif (NXT_HAVE_SHM_OPEN_ANON) 2028 2029 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 2030 if (nxt_slow_path(fd == -1)) { 2031 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 2032 strerror(errno), errno); 2033 2034 goto remove_fail; 2035 } 2036 2037 #elif (NXT_HAVE_SHM_OPEN) 2038 2039 /* Just in case. 
*/ 2040 shm_unlink(name); 2041 2042 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 2043 if (nxt_slow_path(fd == -1)) { 2044 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 2045 strerror(errno), errno); 2046 2047 goto remove_fail; 2048 } 2049 2050 if (nxt_slow_path(shm_unlink(name) == -1)) { 2051 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, 2052 strerror(errno), errno); 2053 } 2054 2055 #else 2056 2057 #error No working shared memory implementation. 2058 2059 #endif 2060 2061 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 2062 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 2063 strerror(errno), errno); 2064 2065 goto remove_fail; 2066 } 2067 2068 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 2069 if (nxt_slow_path(mem == MAP_FAILED)) { 2070 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, 2071 strerror(errno), errno); 2072 2073 goto remove_fail; 2074 } 2075 2076 mm->hdr = mem; 2077 hdr = mem; 2078 2079 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 2080 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 2081 2082 hdr->id = process->outgoing.size - 1; 2083 hdr->src_pid = lib->pid; 2084 hdr->dst_pid = process->pid; 2085 hdr->sent_over = port_id->id; 2086 2087 /* Mark first n chunk(s) as busy */ 2088 for (i = 0; i < n; i++) { 2089 nxt_port_mmap_set_chunk_busy(hdr->free_map, i); 2090 } 2091 2092 /* Mark as busy chunk followed the last available chunk. 
*/ 2093 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 2094 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 2095 2096 pthread_mutex_unlock(&process->outgoing.mutex); 2097 2098 rc = nxt_unit_send_mmap(ctx, port_id, fd); 2099 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2100 munmap(mem, PORT_MMAP_SIZE); 2101 hdr = NULL; 2102 2103 } else { 2104 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", 2105 hdr->id, (int) lib->pid, (int) process->pid); 2106 } 2107 2108 close(fd); 2109 2110 pthread_mutex_lock(&process->outgoing.mutex); 2111 2112 if (nxt_fast_path(hdr != NULL)) { 2113 return hdr; 2114 } 2115 2116 remove_fail: 2117 2118 process->outgoing.size--; 2119 2120 return NULL; 2121 } 2122 2123 2124 static int 2125 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) 2126 { 2127 ssize_t res; 2128 nxt_port_msg_t msg; 2129 nxt_unit_impl_t *lib; 2130 union { 2131 struct cmsghdr cm; 2132 char space[CMSG_SPACE(sizeof(int))]; 2133 } cmsg; 2134 2135 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2136 2137 msg.stream = 0; 2138 msg.pid = lib->pid; 2139 msg.reply_port = 0; 2140 msg.type = _NXT_PORT_MSG_MMAP; 2141 msg.last = 0; 2142 msg.mmap = 0; 2143 msg.nf = 0; 2144 msg.mf = 0; 2145 msg.tracking = 0; 2146 2147 #if (NXT_VALGRIND) 2148 memset(&cmsg, 0, sizeof(cmsg)); 2149 #endif 2150 2151 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 2152 cmsg.cm.cmsg_level = SOL_SOCKET; 2153 cmsg.cm.cmsg_type = SCM_RIGHTS; 2154 2155 /* 2156 * memcpy() is used instead of simple 2157 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 2158 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 2159 * dereferencing type-punned pointer will break strict-aliasing rules 2160 * 2161 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 2162 * in the same simple assignment as in the code above. 
*/
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
                                   &cmsg, sizeof(cmsg));
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
                      (int) port_id->pid, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Reserves enough shared memory chunks for "size" bytes and initializes
 * "mmap_buf" to point at the reserved region.
 */
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, uint32_t size,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    uint32_t                nchunks;
    nxt_chunk_id_t          c;
    nxt_port_mmap_header_t  *hdr;

    /* Round the requested size up to whole chunks. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
    if (nxt_slow_path(hdr == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_buf->hdr = hdr;
    mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
    mmap_buf->port_id = *port_id;

    nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
                   (int) hdr->id, (int) c,
                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));

    return NXT_UNIT_OK;
}


/*
 * Maps a shared memory segment received from peer process "pid" and
 * registers it in that process' incoming mmaps array.
 */
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
    int                     rc;
    void                    *mem;
    struct stat             mmap_stat;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_unit_process_t      *process;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(ctx, pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
                      (int) pid, fd);

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_ERROR;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
                      strerror(errno), errno);

        goto fail;
    }

    /*
     * NOTE(review): maps st_size bytes here but unmaps PORT_MMAP_SIZE
     * below; presumably the sender always truncates the segment to
     * PORT_MMAP_SIZE — confirm.
     */
    mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
               MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
                      strerror(errno), errno);

        goto fail;
    }

    hdr = mem;

    /* Reject segments whose header does not match this peer pair. */
    if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {

        nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
                      "detected: %d != %d or %d != %d", (int) hdr->src_pid,
                      (int) pid, (int) hdr->dst_pid, (int) lib->pid);

        munmap(mem, PORT_MMAP_SIZE);

        goto fail;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");

        munmap(mem, PORT_MMAP_SIZE);

    } else {
        mm->hdr = hdr;

        /* 0xFFFF: incoming segments may be used for any port. */
        hdr->sent_over = 0xFFFFu;

        rc = NXT_UNIT_OK;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

fail:

    /* Drop the reference taken by nxt_unit_process_find(). */
    nxt_unit_process_use(ctx, process, -1);

    return rc;
}


/* Initializes an empty mmaps array and its mutex. */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
}


/*
 * Adjusts the process reference count by "i"; destroys the process when
 * the count drops to zero.
 */
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
    long  c;

    /* nxt_atomic_fetch_add() returns the previous value. */
    c = nxt_atomic_fetch_add(&process->use_count, i);

    if (i < 0 && c == -i) {
        nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);

nxt_unit_mmaps_destroy(&process->incoming);
        nxt_unit_mmaps_destroy(&process->outgoing);

        free(process);
    }
}


/* Unmaps all segments, frees the array, and destroys the mutex. */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        free(mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Looks up an incoming shared memory segment by id.  The incoming mutex
 * is expected to be held by the caller.
 */
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    uint32_t id)
{
    nxt_port_mmap_header_t  *hdr;

    if (nxt_fast_path(process->incoming.size > id)) {
        hdr = process->incoming.elts[id].hdr;

    } else {
        hdr = NULL;
    }

    return hdr;
}


/*
 * Processes the tracking part of a request message and consumes it from
 * "recv_msg".  Returns non-zero if the request is still wanted; zero if
 * the router cancelled it (or on any parse failure), in which case the
 * tracking chunk is released.
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           rc;
    nxt_chunk_id_t                c;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->port_msg.stream, (int) recv_msg->size);

        return 0;
    }

    tracking_msg = recv_msg->start;

    /* Skip over the consumed tracking header. */
    recv_msg->start = tracking_msg + 1;
    recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return 0;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
    if (nxt_slow_path(hdr == NULL)) {
        pthread_mutex_unlock(&process->incoming.mutex);

        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
                      "invalid mmap id %d,%"PRIu32,
                      recv_msg->port_msg.stream,
                      (int) process->pid, tracking_msg->mmap_id);

        return 0;
    }

    c = tracking_msg->tracking_id;

    /* Zero result means the slot no longer holds our stream: cancelled. */
    rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);

    if (rc == 0) {
        nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
                       recv_msg->port_msg.stream);

        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return rc;
}


/*
 * Translates the mmap messages of an incoming request into a list of
 * readable buffers appended to "incoming_buf"; redirects recv_msg->start
 * to the shared memory of the first fragment.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_queue_t *incoming_buf)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->port_msg.stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    pthread_mutex_lock(&process->incoming.mutex);

    for (; mmap_msg < end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->port_msg.stream,
                          (int) process->pid, mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* First fragment: redirect the parser to the shared memory. */
        if (recv_msg->start == mmap_msg) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "move start %p -> %p",
                          recv_msg->port_msg.stream,
                          recv_msg->start, start);

            recv_msg->start = start;
            recv_msg->size = size;
        }

        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "failed to allocate buf",
                          recv_msg->port_msg.stream);

            nxt_unit_mmap_release(hdr, start, size);

            return NXT_UNIT_ERROR;
        }

        nxt_queue_insert_tail(incoming_buf, &b->link);

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)\n"
                       "%.*s",
                       recv_msg->port_msg.stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size,
                       (int) size, (char *) start);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}


/*
 * Marks the chunks covering [start, start + size) as free again.
 * The region is poisoned with 0xA5 to help catch use-after-release.
 */
static int
nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
{
    u_char          *p, *end;
    nxt_chunk_id_t  c;

    memset(start, 0xA5, size);

    p = start;
    end = p + size;
    c = nxt_port_mmap_chunk_id(hdr, p);

    while (p < end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    return NXT_UNIT_OK;
}


/* Level hash test callback: matches an entry by pid. */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_process_t  *process;

    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
nxt_lvlhsh_alloc, 2549 nxt_lvlhsh_free, 2550 }; 2551 2552 2553 static inline void 2554 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) 2555 { 2556 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); 2557 lhq->key.length = sizeof(*pid); 2558 lhq->key.start = (u_char *) pid; 2559 lhq->proto = &lvlhsh_processes_proto; 2560 } 2561 2562 2563 static nxt_unit_process_t * 2564 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) 2565 { 2566 nxt_unit_impl_t *lib; 2567 nxt_unit_process_t *process; 2568 nxt_lvlhsh_query_t lhq; 2569 2570 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2571 2572 nxt_unit_process_lhq_pid(&lhq, &pid); 2573 2574 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { 2575 process = lhq.value; 2576 nxt_unit_process_use(ctx, process, 1); 2577 2578 return process; 2579 } 2580 2581 process = malloc(sizeof(nxt_unit_process_t)); 2582 if (nxt_slow_path(process == NULL)) { 2583 nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); 2584 2585 return NULL; 2586 } 2587 2588 process->pid = pid; 2589 process->use_count = 1; 2590 process->next_port_id = 0; 2591 process->lib = lib; 2592 2593 nxt_queue_init(&process->ports); 2594 2595 nxt_unit_mmaps_init(&process->incoming); 2596 nxt_unit_mmaps_init(&process->outgoing); 2597 2598 lhq.replace = 0; 2599 lhq.value = process; 2600 2601 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) { 2602 2603 case NXT_OK: 2604 break; 2605 2606 default: 2607 nxt_unit_warn(ctx, "process %d insert failed", (int) pid); 2608 2609 pthread_mutex_destroy(&process->outgoing.mutex); 2610 pthread_mutex_destroy(&process->incoming.mutex); 2611 free(process); 2612 process = NULL; 2613 break; 2614 } 2615 2616 nxt_unit_process_use(ctx, process, 1); 2617 2618 return process; 2619 } 2620 2621 2622 static nxt_unit_process_t * 2623 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) 2624 { 2625 int rc; 2626 nxt_unit_impl_t *lib; 2627 nxt_unit_process_t *process; 2628 nxt_lvlhsh_query_t lhq; 2629 
2630 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2631 2632 nxt_unit_process_lhq_pid(&lhq, &pid); 2633 2634 if (remove) { 2635 rc = nxt_lvlhsh_delete(&lib->processes, &lhq); 2636 2637 } else { 2638 rc = nxt_lvlhsh_find(&lib->processes, &lhq); 2639 } 2640 2641 if (rc == NXT_OK) { 2642 process = lhq.value; 2643 2644 if (!remove) { 2645 nxt_unit_process_use(ctx, process, 1); 2646 } 2647 2648 return process; 2649 } 2650 2651 return NULL; 2652 } 2653 2654 2655 static nxt_unit_process_t * 2656 nxt_unit_process_pop_first(nxt_unit_impl_t *lib) 2657 { 2658 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL); 2659 } 2660 2661 2662 int 2663 nxt_unit_run(nxt_unit_ctx_t *ctx) 2664 { 2665 int rc; 2666 nxt_unit_impl_t *lib; 2667 2668 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2669 rc = NXT_UNIT_OK; 2670 2671 while (nxt_fast_path(lib->online)) { 2672 rc = nxt_unit_run_once(ctx); 2673 } 2674 2675 return rc; 2676 } 2677 2678 2679 static int 2680 nxt_unit_run_once(nxt_unit_ctx_t *ctx) 2681 { 2682 int rc; 2683 char buf[4096]; 2684 char oob[256]; 2685 ssize_t rsize; 2686 nxt_unit_impl_t *lib; 2687 nxt_unit_ctx_impl_t *ctx_impl; 2688 2689 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2690 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2691 2692 memset(oob, 0, sizeof(struct cmsghdr)); 2693 2694 if (ctx_impl->read_port_fd != -1) { 2695 rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, 2696 buf, sizeof(buf), 2697 oob, sizeof(oob)); 2698 } else { 2699 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, 2700 buf, sizeof(buf), 2701 oob, sizeof(oob)); 2702 } 2703 2704 if (nxt_fast_path(rsize > 0)) { 2705 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, 2706 oob, sizeof(oob)); 2707 } else { 2708 rc = NXT_UNIT_ERROR; 2709 } 2710 2711 return rc; 2712 } 2713 2714 2715 void 2716 nxt_unit_done(nxt_unit_ctx_t *ctx) 2717 { 2718 nxt_unit_impl_t *lib; 2719 nxt_unit_process_t *process; 2720 
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Free every context registered with the library. */
    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        nxt_unit_ctx_free(&ctx_impl->ctx);

    } nxt_queue_loop;

    /* Drain and remove all remaining process entries. */
    for ( ;; ) {
        process = nxt_unit_process_pop_first(lib);
        if (process == NULL) {
            break;
        }

        nxt_unit_remove_process(ctx, process);
    }

    free(lib);
}


/*
 * Create an additional library context with its own read port.  The
 * new port's read fd is registered locally, and its write end is
 * announced to the ready port.  'data' is stored as the context's
 * user data.  Returns the new context, or NULL on allocation,
 * port-creation, or send failure.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                  rc, fd;
    nxt_unit_impl_t      *lib;
    nxt_unit_port_id_t   new_port_id;
    nxt_unit_ctx_impl_t  *new_ctx;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_warn(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        free(new_ctx);

        return NULL;
    }

    rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        lib->callbacks.remove_port(ctx, &new_port_id);

        close(fd);

        free(new_ctx);

        return NULL;
    }

    /* The fd was duplicated to the peer via SCM_RIGHTS; close ours. */
    close(fd);

    nxt_unit_ctx_init(lib, new_ctx, data);

    new_ctx->read_port_id = new_port_id;

    return &new_ctx->ctx;
}


/*
 * Release a context: fail any still-active requests, free cached
 * buffers and request structures, and unlink the context from the
 * library.  The main context is embedded in the library object and is
 * not freed here.
 */
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /*
     * The two ctx_buf entries are embedded in ctx_impl, so they are
     * only unlinked, never freed individually.
     */
    nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
    nxt_queue_remove(&ctx_impl->ctx_buf[1].link);

    nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {

        nxt_queue_remove(&mmap_buf->link);
        free(mmap_buf);

    } nxt_queue_loop;

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_remove(&ctx_impl->link);

    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/*
 * Initialize a port id; the hash is derived from the (pid, id) pair
 * so ports can be found by a single hash lookup.
 */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Create a new port and pass its write end to 'dst'.  On success the
 * new port's id is stored in '*port_id'; on send failure the port is
 * removed again.  The local copy of the sent fd is closed on both
 * paths.
 */
int
nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *port_id)
{
    int                 rc, fd;
    nxt_unit_impl_t     *lib;
    nxt_unit_port_id_t  new_port_id;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);

    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        *port_id = new_port_id;

    } else {
        lib->callbacks.remove_port(ctx, &new_port_id);
    }

    close(fd);

return rc; 2882 } 2883 2884 2885 static int 2886 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) 2887 { 2888 int rc, port_sockets[2]; 2889 nxt_unit_impl_t *lib; 2890 nxt_unit_port_t new_port; 2891 nxt_unit_process_t *process; 2892 2893 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2894 2895 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets); 2896 if (nxt_slow_path(rc != 0)) { 2897 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", 2898 strerror(errno), errno); 2899 2900 return NXT_UNIT_ERROR; 2901 } 2902 2903 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", 2904 port_sockets[0], port_sockets[1]); 2905 2906 pthread_mutex_lock(&lib->mutex); 2907 2908 process = nxt_unit_process_get(ctx, lib->pid); 2909 if (nxt_slow_path(process == NULL)) { 2910 pthread_mutex_unlock(&lib->mutex); 2911 2912 close(port_sockets[0]); 2913 close(port_sockets[1]); 2914 2915 return NXT_UNIT_ERROR; 2916 } 2917 2918 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); 2919 2920 new_port.in_fd = port_sockets[0]; 2921 new_port.out_fd = -1; 2922 new_port.data = NULL; 2923 2924 pthread_mutex_unlock(&lib->mutex); 2925 2926 nxt_unit_process_use(ctx, process, -1); 2927 2928 rc = lib->callbacks.add_port(ctx, &new_port); 2929 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2930 nxt_unit_warn(ctx, "create_port: add_port() failed"); 2931 2932 close(port_sockets[0]); 2933 close(port_sockets[1]); 2934 2935 return rc; 2936 } 2937 2938 *port_id = new_port.id; 2939 *fd = port_sockets[1]; 2940 2941 return rc; 2942 } 2943 2944 2945 static int 2946 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, 2947 nxt_unit_port_id_t *new_port, int fd) 2948 { 2949 ssize_t res; 2950 nxt_unit_impl_t *lib; 2951 2952 struct { 2953 nxt_port_msg_t msg; 2954 nxt_port_msg_new_port_t new_port; 2955 } m; 2956 2957 union { 2958 struct cmsghdr cm; 2959 char space[CMSG_SPACE(sizeof(int))]; 2960 } cmsg; 2961 2962 lib = 
nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 2963 2964 m.msg.stream = 0; 2965 m.msg.pid = lib->pid; 2966 m.msg.reply_port = 0; 2967 m.msg.type = _NXT_PORT_MSG_NEW_PORT; 2968 m.msg.last = 0; 2969 m.msg.mmap = 0; 2970 m.msg.nf = 0; 2971 m.msg.mf = 0; 2972 m.msg.tracking = 0; 2973 2974 m.new_port.id = new_port->id; 2975 m.new_port.pid = new_port->pid; 2976 m.new_port.type = NXT_PROCESS_WORKER; 2977 m.new_port.max_size = 16 * 1024; 2978 m.new_port.max_share = 64 * 1024; 2979 2980 #if (NXT_VALGRIND) 2981 memset(&cmsg, 0, sizeof(cmsg)); 2982 #endif 2983 2984 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 2985 cmsg.cm.cmsg_level = SOL_SOCKET; 2986 cmsg.cm.cmsg_type = SCM_RIGHTS; 2987 2988 /* 2989 * memcpy() is used instead of simple 2990 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 2991 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 2992 * dereferencing type-punned pointer will break strict-aliasing rules 2993 * 2994 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 2995 * in the same simple assignment as in the code above. 2996 */ 2997 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 2998 2999 res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), 3000 &cmsg, sizeof(cmsg)); 3001 3002 return res == sizeof(m) ? 
NXT_UNIT_OK : NXT_UNIT_ERROR; 3003 } 3004 3005 3006 int 3007 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 3008 { 3009 int rc; 3010 nxt_unit_impl_t *lib; 3011 nxt_unit_process_t *process; 3012 nxt_unit_port_impl_t *new_port; 3013 3014 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3015 3016 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", 3017 port->id.pid, port->id.id, 3018 port->in_fd, port->out_fd); 3019 3020 pthread_mutex_lock(&lib->mutex); 3021 3022 process = nxt_unit_process_get(ctx, port->id.pid); 3023 if (nxt_slow_path(process == NULL)) { 3024 rc = NXT_UNIT_ERROR; 3025 goto unlock; 3026 } 3027 3028 if (port->id.id >= process->next_port_id) { 3029 process->next_port_id = port->id.id + 1; 3030 } 3031 3032 new_port = malloc(sizeof(nxt_unit_port_impl_t)); 3033 if (nxt_slow_path(new_port == NULL)) { 3034 rc = NXT_UNIT_ERROR; 3035 goto unlock; 3036 } 3037 3038 new_port->port = *port; 3039 3040 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); 3041 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3042 goto unlock; 3043 } 3044 3045 nxt_queue_insert_tail(&process->ports, &new_port->link); 3046 3047 rc = NXT_UNIT_OK; 3048 3049 new_port->process = process; 3050 3051 unlock: 3052 3053 pthread_mutex_unlock(&lib->mutex); 3054 3055 if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { 3056 nxt_unit_process_use(ctx, process, -1); 3057 } 3058 3059 return rc; 3060 } 3061 3062 3063 void 3064 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 3065 { 3066 nxt_unit_find_remove_port(ctx, port_id, NULL); 3067 } 3068 3069 3070 void 3071 nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, 3072 nxt_unit_port_t *r_port) 3073 { 3074 nxt_unit_impl_t *lib; 3075 nxt_unit_process_t *process; 3076 3077 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3078 3079 pthread_mutex_lock(&lib->mutex); 3080 3081 process = NULL; 3082 3083 nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); 3084 3085 
    pthread_mutex_unlock(&lib->mutex);

    /* Release the removed port's reference on its owning process. */
    if (nxt_slow_path(process != NULL)) {
        nxt_unit_process_use(ctx, process, -1);
    }
}


/*
 * Remove a port from the hash and close its fds.  Must be called with
 * lib->mutex held.  The owning process (if any) is returned through
 * '*process' so the caller can drop its reference after unlocking;
 * the removed port data is copied to 'r_port' when non-NULL.
 */
static void
nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port, nxt_unit_process_t **process)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

        return;
    }

    nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   port->port.in_fd, port->port.out_fd, port->port.data);

    if (port->port.in_fd != -1) {
        close(port->port.in_fd);
    }

    if (port->port.out_fd != -1) {
        close(port->port.out_fd);
    }

    /* Only ports attached to a process sit on a process's port list. */
    if (port->process != NULL) {
        nxt_queue_remove(&port->link);
    }

    if (process != NULL) {
        *process = port->process;
    }

    if (r_port != NULL) {
        *r_port = port->port;
    }

    free(port);
}


/*
 * Detach the process entry for 'pid' and remove it with all of its
 * ports.  Locks lib->mutex; on the success path the lock is released
 * inside nxt_unit_remove_process().
 */
void
nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(ctx, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /* nxt_unit_remove_process() unlocks lib->mutex. */
    nxt_unit_remove_process(ctx, process);
}


/*
 * Remove all ports of 'process' and drop the caller's reference on
 * the process entry.  Must be entered with lib->mutex held; the mutex
 * is unlocked here before non-default remove_port callbacks run.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_queue_init(&ports);

    /* Steal the process's whole port list. */
    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_process_use(ctx, process, -1);
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /* Custom remove_port callbacks are invoked without the lock. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    nxt_unit_process_use(ctx, process, -1);
}


void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);