1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) Igor Sysoev 5 * Copyright (C) Valentin V. Bartenev 6 * Copyright (C) NGINX, Inc. 7 */ 8 9 #include <nxt_main.h> 10 #include <nxt_runtime.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 #include <nxt_http.h> 14 #include <nxt_application.h> 15 16 #include <glob.h> 17 18 19 typedef struct { 20 nxt_app_type_t type; 21 nxt_str_t version; 22 nxt_str_t file; 23 } nxt_module_t; 24 25 26 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path); 27 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, 28 nxt_array_t *modules, const char *name); 29 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, 30 const char *name); 31 32 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data); 33 34 35 static uint32_t compat[] = { 36 NXT_VERNUM, NXT_DEBUG, 37 }; 38 39 40 static nxt_thread_mutex_t nxt_app_mutex; 41 static nxt_thread_cond_t nxt_app_cond; 42 43 static nxt_application_module_t *nxt_app; 44 45 46 nxt_int_t 47 nxt_discovery_start(nxt_task_t *task, void *data) 48 { 49 nxt_buf_t *b; 50 nxt_port_t *main_port; 51 nxt_runtime_t *rt; 52 53 nxt_debug(task, "DISCOVERY"); 54 55 rt = task->thread->runtime; 56 57 b = nxt_discovery_modules(task, rt->modules); 58 if (nxt_slow_path(b == NULL)) { 59 exit(1); 60 } 61 62 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 63 64 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1, 65 0, -1, b); 66 67 return NXT_OK; 68 } 69 70 71 static void 72 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data) 73 { 74 nxt_mp_t *mp; 75 nxt_buf_t *b; 76 77 b = obj; 78 mp = b->data; 79 80 nxt_mp_destroy(mp); 81 82 exit(0); 83 } 84 85 86 static nxt_buf_t * 87 nxt_discovery_modules(nxt_task_t *task, const char *path) 88 { 89 char *name; 90 u_char *p, *end; 91 size_t size; 92 glob_t glb; 93 nxt_mp_t *mp; 94 nxt_buf_t *b; 95 nxt_int_t ret; 96 nxt_uint_t i, n; 97 nxt_array_t *modules; 98 nxt_module_t *module; 99 100 b = NULL; 101 102 mp = nxt_mp_create(1024, 128, 256, 32); 103 if (mp == NULL) { 104 return b; 105 } 106 107 ret = glob(path, 0, NULL, &glb); 108 109 n = glb.gl_pathc; 110 111 if (ret != 0) { 112 nxt_log(task, NXT_LOG_NOTICE, 113 "no modules matching: \"%s\" found", path); 114 n = 0; 115 } 116 117 modules = nxt_array_create(mp, n, sizeof(nxt_module_t)); 118 if (modules == NULL) { 119 goto fail; 120 } 121 122 for (i = 0; i < n; i++) { 123 name = glb.gl_pathv[i]; 124 125 ret = nxt_discovery_module(task, mp, modules, name); 126 if (ret != NXT_OK) { 127 goto fail; 128 } 129 } 130 131 size = sizeof("[]") - 1; 132 module = modules->elts; 133 n = modules->nelts; 134 135 for (i = 0; i < n; i++) { 136 nxt_debug(task, "module: %d %V %V", 137 module[i].type, &module[i].version, &module[i].file); 138 139 size += sizeof("{\"type\": ,") - 1; 140 size += sizeof(" \"version\": \"\",") - 1; 141 size += sizeof(" \"file\": \"\"},") - 1; 142 143 size += NXT_INT_T_LEN 144 + module[i].version.length 145 + module[i].file.length; 146 } 147 148 b = nxt_buf_mem_alloc(mp, size, 0); 149 if (b == NULL) { 150 goto fail; 151 } 152 153 b->completion_handler = nxt_discovery_completion_handler; 154 155 p = b->mem.free; 156 end = b->mem.end; 157 *p++ = '['; 158 159 for (i = 0; i < n; i++) { 160 p = nxt_sprintf(p, end, 161 "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},", 162 module[i].type, &module[i].version, &module[i].file); 163 } 164 165 *p++ = ']'; 166 b->mem.free = p; 167 168 fail: 169 170 globfree(&glb); 171 172 return b; 173 } 174 175 176 static nxt_int_t 177 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, 178 const char *name) 179 { 180 void *dl; 181 nxt_str_t *s; 182 nxt_int_t ret; 183 nxt_uint_t i, n; 184 nxt_module_t *module; 185 nxt_app_type_t type; 186 nxt_application_module_t *app; 187 188 /* 189 * Only memory allocation failure should return NXT_ERROR. 190 * Any module processing errors are ignored. 191 */ 192 ret = NXT_ERROR; 193 194 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW); 195 196 if (dl == NULL) { 197 nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"", 198 name, dlerror()); 199 return NXT_OK; 200 } 201 202 app = dlsym(dl, "nxt_app_module"); 203 204 if (app != NULL) { 205 nxt_log(task, NXT_LOG_NOTICE, "module: %V %V \"%s\"", 206 &app->type, &app->version, name); 207 208 if (app->compat_length != sizeof(compat) 209 || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0) 210 { 211 nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name); 212 213 goto done; 214 } 215 216 type = nxt_app_parse_type(app->type.start, app->type.length); 217 218 if (type == NXT_APP_UNKNOWN) { 219 nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type); 220 221 goto done; 222 } 223 224 module = modules->elts; 225 n = modules->nelts; 226 227 for (i = 0; i < n; i++) { 228 if (type == module[i].type 229 && nxt_strstr_eq(&app->version, &module[i].version)) 230 { 231 nxt_log(task, NXT_LOG_NOTICE, 232 "ignoring %s module with the same " 233 "application language version %V %V as in %V", 234 name, &app->type, &app->version, 235 &module[i].file); 236 237 goto done; 238 } 239 } 240 241 module = nxt_array_add(modules); 242 if (module == NULL) { 243 goto fail; 244 } 245 246 module->type = type; 247 248 s = nxt_str_dup(mp, &module->version, &app->version); 249 if (s == NULL) { 250 goto fail; 251 } 252 253 module->file.length = nxt_strlen(name); 254 255 module->file.start = nxt_mp_alloc(mp, module->file.length); 256 if (module->file.start == NULL) { 257 goto fail; 258 } 259 260 nxt_memcpy(module->file.start, name, module->file.length); 261 262 } else { 263 nxt_log(task, NXT_LOG_CRIT, "dlsym(\"%s\"), failed: \"%s\"", 264 name, dlerror()); 265 } 266 267 done: 268 269 ret = NXT_OK; 270 271 fail: 272 273 if (dlclose(dl) != 0) { 274 nxt_log(task, NXT_LOG_CRIT, "dlclose(\"%s\"), failed: \"%s\"", 275 name, dlerror()); 276 } 277 278 return ret; 279 } 280 281 282 nxt_int_t 283 nxt_app_start(nxt_task_t *task, void *data) 284 { 285 nxt_int_t ret; 286 nxt_app_lang_module_t *lang; 287 nxt_common_app_conf_t *app_conf; 288 289 app_conf = data; 290 291 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); 292 if (nxt_slow_path(lang == NULL)) { 293 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 294 &app_conf->type); 295 return NXT_ERROR; 296 } 297 298 nxt_app = lang->module; 299 300 if (nxt_app == NULL) { 301 nxt_debug(task, "application language module: %s \"%s\"", 302 lang->version, lang->file); 303 304 nxt_app = nxt_app_module_load(task, lang->file); 305 } 306 307 if (app_conf->working_directory != NULL 308 && app_conf->working_directory[0] != 0) 309 { 310 ret = chdir(app_conf->working_directory); 311 312 if (nxt_slow_path(ret != 0)) { 313 nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E", 314 app_conf->working_directory, nxt_errno); 315 316 return NXT_ERROR; 317 } 318 } 319 320 if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { 321 return NXT_ERROR; 322 } 323 324 if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) { 325 return NXT_ERROR; 326 } 327 328 ret = nxt_app->init(task, data); 329 330 if (nxt_slow_path(ret != NXT_OK)) { 331 nxt_debug(task, "application init failed"); 332 333 } else { 334 nxt_debug(task, "application init done"); 335 } 336 337 return ret; 338 } 339 340 341 static nxt_app_module_t * 342 nxt_app_module_load(nxt_task_t *task, const char *name) 343 { 344 void *dl; 345 346 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY); 347 348 if (dl != NULL) { 349 return dlsym(dl, "nxt_app_module"); 350 } 351 352 nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"", 353 name, dlerror()); 354 355 return NULL; 356 } 357 358 359 void 360 nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 361 { 362 if (nxt_app->atexit != NULL) { 363 nxt_app->atexit(task); 364 } 365 366 nxt_worker_process_quit_handler(task, msg); 367 } 368 369 370 void 371 nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 372 { 373 size_t dump_size; 374 nxt_buf_t *b; 375 nxt_port_t *port; 376 nxt_app_rmsg_t rmsg = { msg->buf }; 377 nxt_app_wmsg_t wmsg; 378 379 b = msg->buf; 380 dump_size = b->mem.free - b->mem.pos; 381 382 if (dump_size > 300) { 383 dump_size = 300; 384 } 385 386 nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos); 387 388 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, 389 msg->port_msg.reply_port); 390 if (nxt_slow_path(port == NULL)) { 391 nxt_debug(task, "stream #%uD: reply port %d not found", 392 msg->port_msg.stream, msg->port_msg.reply_port); 393 return; 394 } 395 396 wmsg.port = port; 397 wmsg.write = NULL; 398 wmsg.buf = &wmsg.write; 399 wmsg.stream = msg->port_msg.stream; 400 401 nxt_app->run(task, &rmsg, &wmsg); 402 } 403 404 405 u_char * 406 nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) 407 { 408 size_t free_size; 409 u_char *res; 410 nxt_buf_t *b; 411 412 res = NULL; 413 414 do { 415 b = *msg->buf; 416 417 if (b == NULL) { 418 b = nxt_port_mmap_get_buf(task, msg->port, size); 419 if (nxt_slow_path(b == NULL)) { 420 return NULL; 421 } 422 423 *msg->buf = b; 424 425 free_size = nxt_buf_mem_free_size(&b->mem); 426 427 if (nxt_slow_path(free_size < size)) { 428 nxt_log(task, NXT_LOG_WARN, "requested buffer too big " 429 "(%z < %z)", free_size, size); 430 return NULL; 431 } 432 433 } 434 435 free_size = nxt_buf_mem_free_size(&b->mem); 436 437 if (free_size >= size) { 438 res = b->mem.free; 439 b->mem.free += size; 440 441 return res; 442 } 443 444 if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) { 445 res = b->mem.free; 446 b->mem.free += size; 447 448 return res; 449 } 450 451 msg->buf = &b->next; 452 } while(1); 453 } 454 455 456 nxt_int_t 457 nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size) 458 { 459 u_char *dst; 460 size_t dst_length; 461 462 if (c != NULL) { 463 dst_length = size + (size < 128 ? 1 : 4) + 1; 464 465 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 466 if (nxt_slow_path(dst == NULL)) { 467 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 468 dst_length); 469 return NXT_ERROR; 470 } 471 472 dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */ 473 474 nxt_memcpy(dst, c, size); 475 dst[size] = 0; 476 477 nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c); 478 479 } else { 480 dst_length = 1; 481 482 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 483 if (nxt_slow_path(dst == NULL)) { 484 nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", 485 dst_length); 486 return NXT_ERROR; 487 } 488 489 dst = nxt_app_msg_write_length(dst, 0); 490 491 nxt_debug(task, "nxt_app_msg_write: NULL"); 492 } 493 494 return NXT_OK; 495 } 496 497 498 nxt_int_t 499 nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, 500 const nxt_str_t *prefix, u_char *c, size_t size) 501 { 502 u_char *dst, *src; 503 size_t i, length, dst_length; 504 505 length = prefix->length + size; 506 507 dst_length = length + (length < 128 ? 1 : 4) + 1; 508 509 dst = nxt_app_msg_write_get_buf(task, msg, dst_length); 510 if (nxt_slow_path(dst == NULL)) { 511 return NXT_ERROR; 512 } 513 514 dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */ 515 516 nxt_memcpy(dst, prefix->start, prefix->length); 517 dst += prefix->length; 518 519 src = c; 520 for (i = 0; i < size; i++, dst++, src++) { 521 522 if (*src >= 'a' && *src <= 'z') { 523 *dst = *src & ~0x20; 524 continue; 525 } 526 527 if (*src == '-') { 528 *dst = '_'; 529 continue; 530 } 531 532 *dst = *src; 533 } 534 535 *dst = 0; 536 537 return NXT_OK; 538 } 539 540 541 nxt_inline nxt_int_t 542 nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) 543 { 544 nxt_buf_t *buf; 545 546 do { 547 buf = msg->buf; 548 549 if (nxt_slow_path(buf == NULL)) { 550 return NXT_DONE; 551 } 552 553 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { 554 if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 555 msg->buf = buf->next; 556 continue; 557 } 558 return NXT_ERROR; 559 } 560 561 if (buf->mem.pos[0] >= 128) { 562 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { 563 return NXT_ERROR; 564 } 565 } 566 567 break; 568 } while (1); 569 570 buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); 571 572 return NXT_OK; 573 } 574 575 576 nxt_int_t 577 nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) 578 { 579 size_t length; 580 nxt_int_t ret; 581 nxt_buf_t *buf; 582 583 ret = nxt_app_msg_read_size_(task, msg, &length); 584 if (ret != NXT_OK) { 585 return ret; 586 } 587 588 buf = msg->buf; 589 590 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { 591 return NXT_ERROR; 592 } 593 594 if (length > 0) { 595 str->start = buf->mem.pos; 596 str->length = length - 1; 597 598 buf->mem.pos += length; 599 600 nxt_debug(task, "nxt_read_str: %uz %*s", length - 1, 601 length - 1, str->start); 602 603 } else { 604 str->start = NULL; 605 str->length = 0; 606 607 nxt_debug(task, "nxt_read_str: NULL"); 608 } 609 610 return NXT_OK; 611 } 612 613 614 size_t 615 nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst, 616 size_t size) 617 { 618 size_t res, read_size; 619 nxt_buf_t *buf; 620 621 res = 0; 622 623 while (size > 0) { 624 buf = msg->buf; 625 626 if (nxt_slow_path(buf == NULL)) { 627 break; 628 } 629 630 if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { 631 msg->buf = buf->next; 632 continue; 633 } 634 635 read_size = nxt_buf_mem_used_size(&buf->mem); 636 read_size = nxt_min(read_size, size); 637 638 dst = nxt_cpymem(dst, buf->mem.pos, read_size); 639 640 size -= read_size; 641 buf->mem.pos += read_size; 642 res += read_size; 643 } 644 645 nxt_debug(task, "nxt_read_raw: %uz", res); 646 647 return res; 648 } 649 650 651 nxt_int_t 652 nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, 653 nxt_str_t *v) 654 { 655 nxt_int_t rc; 656 657 rc = nxt_app_msg_read_str(task, rmsg, n); 658 if (nxt_slow_path(rc != NXT_OK)) { 659 return rc; 660 } 661 662 rc = nxt_app_msg_read_str(task, rmsg, v); 663 if (nxt_slow_path(rc != NXT_OK)) { 664 return rc; 665 } 666 667 return rc; 668 } 669 670 671 nxt_int_t 672 nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) 673 { 674 nxt_int_t ret; 675 676 ret = nxt_app_msg_read_size_(task, msg, size); 677 678 nxt_debug(task, "nxt_read_size: %d", (int) *size); 679 680 return ret; 681 } 682 683 684 nxt_int_t 685 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar) 686 { 687 ar->timer.handler = nxt_app_http_release; 688 nxt_timer_add(task->thread->engine, &ar->timer, 0); 689 690 return NXT_OK; 691 } 692 693 694 static void 695 nxt_app_http_release(nxt_task_t *task, void *obj, void *data) 696 { 697 nxt_timer_t *timer; 698 nxt_app_parse_ctx_t *ar; 699 700 timer = obj; 701 702 nxt_debug(task, "http app release"); 703 704 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); 705 706 nxt_mp_release(ar->request->mem_pool); 707 } 708 709 710 nxt_int_t 711 nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) 712 { 713 nxt_int_t rc; 714 nxt_buf_t *b; 715 716 rc = NXT_OK; 717 718 if (nxt_slow_path(last == 1)) { 719 do { 720 b = *msg->buf; 721 722 if (b == NULL) { 723 b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST); 724 *msg->buf = b; 725 break; 726 } 727 728 msg->buf = &b->next; 729 } while(1); 730 } 731 732 if (nxt_slow_path(msg->write != NULL)) { 733 rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA, 734 -1, msg->stream, 0, msg->write); 735 736 msg->write = NULL; 737 msg->buf = &msg->write; 738 } 739 740 return rc; 741 } 742 743 744 nxt_int_t 745 nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, 746 size_t size) 747 { 748 size_t free_size, copy_size; 749 nxt_buf_t *b; 750 751 nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); 752 753 while (size > 0) { 754 b = *msg->buf; 755 756 if (b == NULL) { 757 b = nxt_port_mmap_get_buf(task, msg->port, size); 758 if (nxt_slow_path(b == NULL)) { 759 return NXT_ERROR; 760 } 761 762 *msg->buf = b; 763 } 764 765 do { 766 free_size = nxt_buf_mem_free_size(&b->mem); 767 768 if (free_size > 0) { 769 copy_size = nxt_min(free_size, size); 770 771 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size); 772 773 size -= copy_size; 774 c += copy_size; 775 776 if (size == 0) { 777 return NXT_OK; 778 } 779 } 780 } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK); 781 782 msg->buf = &b->next; 783 } 784 785 return NXT_OK; 786 } 787 788 789 nxt_app_lang_module_t * 790 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) 791 { 792 u_char *p, *end, *version; 793 size_t version_length; 794 nxt_uint_t i, n; 795 nxt_app_type_t type; 796 nxt_app_lang_module_t *lang; 797 798 end = name->start + name->length; 799 version = end; 800 801 for (p = name->start; p < end; p++) { 802 if (*p == ' ') { 803 version = p + 1; 804 break; 805 } 806 807 if (*p >= '0' && *p <= '9') { 808 version = p; 809 break; 810 } 811 } 812 813 type = nxt_app_parse_type(name->start, p - name->start); 814 815 if (type == NXT_APP_UNKNOWN) { 816 return NULL; 817 } 818 819 version_length = end - version; 820 821 lang = rt->languages->elts; 822 n = rt->languages->nelts; 823 824 for (i = 0; i < n; i++) { 825 826 /* 827 * Versions are sorted in descending order 828 * so first match chooses the highest version. 829 */ 830 831 if (lang[i].type == type 832 && nxt_strvers_match(lang[i].version, version, version_length)) 833 { 834 return &lang[i]; 835 } 836 } 837 838 return NULL; 839 } 840 841 842 nxt_app_type_t 843 nxt_app_parse_type(u_char *p, size_t length) 844 { 845 nxt_str_t str; 846 847 str.length = length; 848 str.start = p; 849 850 if (nxt_str_eq(&str, "python", 6)) { 851 return NXT_APP_PYTHON; 852 853 } else if (nxt_str_eq(&str, "php", 3)) { 854 return NXT_APP_PHP; 855 856 } else if (nxt_str_eq(&str, "go", 2)) { 857 return NXT_APP_GO; 858 859 } else if (nxt_str_eq(&str, "perl", 4)) { 860 return NXT_APP_PERL; 861 } 862 863 return NXT_APP_UNKNOWN; 864 } 865