1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) Igor Sysoev 5 * Copyright (C) Valentin V. Bartenev 6 * Copyright (C) NGINX, Inc. 7 */ 8 9 #include <nxt_main.h> 10 #include <nxt_runtime.h> 11 #include <nxt_main_process.h> 12 #include <nxt_router.h> 13 #include <nxt_http.h> 14 #include <nxt_application.h> 15 #include <nxt_unit.h> 16 #include <nxt_port_memory_int.h> 17 18 #include <glob.h> 19 20 21 typedef struct { 22 nxt_app_type_t type; 23 nxt_str_t version; 24 nxt_str_t file; 25 } nxt_module_t; 26 27 28 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path); 29 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, 30 nxt_array_t *modules, const char *name); 31 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj, 32 void *data); 33 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, 34 void *data); 35 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, 36 const char *name); 37 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); 38 39 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data); 40 41 42 static uint32_t compat[] = { 43 NXT_VERNUM, NXT_DEBUG, 44 }; 45 46 47 nxt_str_t nxt_server = nxt_string(NXT_SERVER); 48 49 50 static nxt_app_module_t *nxt_app; 51 52 53 nxt_int_t 54 nxt_discovery_start(nxt_task_t *task, void *data) 55 { 56 uint32_t stream; 57 nxt_buf_t *b; 58 nxt_int_t ret; 59 nxt_port_t *main_port, *discovery_port; 60 nxt_runtime_t *rt; 61 62 nxt_debug(task, "DISCOVERY"); 63 64 rt = task->thread->runtime; 65 66 b = nxt_discovery_modules(task, rt->modules); 67 if (nxt_slow_path(b == NULL)) { 68 return NXT_ERROR; 69 } 70 71 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 72 discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY]; 73 74 stream = nxt_port_rpc_register_handler(task, discovery_port, 75 nxt_discovery_quit, 76 nxt_discovery_quit, 77 main_port->pid, NULL); 78 79 if (nxt_slow_path(stream == 0)) { 80 return NXT_ERROR; 81 } 82 83 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1, 84 stream, discovery_port->id, b); 85 86 if (nxt_slow_path(ret != NXT_OK)) { 87 nxt_port_rpc_cancel(task, discovery_port, stream); 88 return NXT_ERROR; 89 } 90 91 return NXT_OK; 92 } 93 94 95 static nxt_buf_t * 96 nxt_discovery_modules(nxt_task_t *task, const char *path) 97 { 98 char *name; 99 u_char *p, *end; 100 size_t size; 101 glob_t glb; 102 nxt_mp_t *mp; 103 nxt_buf_t *b; 104 nxt_int_t ret; 105 nxt_uint_t i, n; 106 nxt_array_t *modules; 107 nxt_module_t *module; 108 109 b = NULL; 110 111 mp = nxt_mp_create(1024, 128, 256, 32); 112 if (mp == NULL) { 113 return b; 114 } 115 116 ret = glob(path, 0, NULL, &glb); 117 118 n = glb.gl_pathc; 119 120 if (ret != 0) { 121 nxt_log(task, NXT_LOG_NOTICE, 122 "no modules matching: \"%s\" found", path); 123 n = 0; 124 } 125 126 modules = nxt_array_create(mp, n, sizeof(nxt_module_t)); 127 if (modules == NULL) { 128 goto fail; 129 } 130 131 for (i = 0; i < n; i++) { 132 name = glb.gl_pathv[i]; 133 134 ret = nxt_discovery_module(task, mp, modules, name); 135 if (ret != NXT_OK) { 136 goto fail; 137 } 138 } 139 140 size = nxt_length("[]"); 141 module = modules->elts; 142 n = modules->nelts; 143 144 for (i = 0; i < n; i++) { 145 nxt_debug(task, "module: %d %V %V", 146 module[i].type, &module[i].version, &module[i].file); 147 148 size += nxt_length("{\"type\": ,"); 149 size += nxt_length(" \"version\": \"\","); 150 size += nxt_length(" \"file\": \"\"},"); 151 152 size += NXT_INT_T_LEN 153 + module[i].version.length 154 + module[i].file.length; 155 } 156 157 b = nxt_buf_mem_alloc(mp, size, 0); 158 if (b == NULL) { 159 goto fail; 160 } 161 162 b->completion_handler = nxt_discovery_completion_handler; 163 164 p = b->mem.free; 165 end = b->mem.end; 166 *p++ = '['; 167 168 for (i = 0; i < n; i++) { 169 p = nxt_sprintf(p, end, 170 "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},", 171 module[i].type, &module[i].version, &module[i].file); 172 } 173 174 *p++ = ']'; 175 b->mem.free = p; 176 177 fail: 178 179 globfree(&glb); 180 181 return b; 182 } 183 184 185 static nxt_int_t 186 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, 187 const char *name) 188 { 189 void *dl; 190 nxt_str_t version; 191 nxt_int_t ret; 192 nxt_uint_t i, n; 193 nxt_module_t *module; 194 nxt_app_type_t type; 195 nxt_app_module_t *app; 196 197 /* 198 * Only memory allocation failure should return NXT_ERROR. 199 * Any module processing errors are ignored. 200 */ 201 ret = NXT_ERROR; 202 203 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW); 204 205 if (dl == NULL) { 206 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror()); 207 return NXT_OK; 208 } 209 210 app = dlsym(dl, "nxt_app_module"); 211 212 if (app != NULL) { 213 nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"", 214 &app->type, app->version, name); 215 216 if (app->compat_length != sizeof(compat) 217 || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0) 218 { 219 nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name); 220 221 goto done; 222 } 223 224 type = nxt_app_parse_type(app->type.start, app->type.length); 225 226 if (type == NXT_APP_UNKNOWN) { 227 nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type); 228 229 goto done; 230 } 231 232 module = modules->elts; 233 n = modules->nelts; 234 235 version.start = (u_char *) app->version; 236 version.length = nxt_strlen(app->version); 237 238 for (i = 0; i < n; i++) { 239 if (type == module[i].type 240 && nxt_strstr_eq(&module[i].version, &version)) 241 { 242 nxt_log(task, NXT_LOG_NOTICE, 243 "ignoring %s module with the same " 244 "application language version %V %V as in %V", 245 name, &app->type, &version, &module[i].file); 246 247 goto done; 248 } 249 } 250 251 module = nxt_array_add(modules); 252 if (module == NULL) { 253 goto fail; 254 } 255 256 module->type = type; 257 258 nxt_str_dup(mp, &module->version, &version); 259 if (module->version.start == NULL) { 260 goto fail; 261 } 262 263 module->file.length = nxt_strlen(name); 264 265 module->file.start = nxt_mp_alloc(mp, module->file.length); 266 if (module->file.start == NULL) { 267 goto fail; 268 } 269 270 nxt_memcpy(module->file.start, name, module->file.length); 271 272 } else { 273 nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror()); 274 } 275 276 done: 277 278 ret = NXT_OK; 279 280 fail: 281 282 if (dlclose(dl) != 0) { 283 nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror()); 284 } 285 286 return ret; 287 } 288 289 290 static void 291 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data) 292 { 293 nxt_mp_t *mp; 294 nxt_buf_t *b; 295 296 b = obj; 297 mp = b->data; 298 299 nxt_mp_destroy(mp); 300 } 301 302 303 static void 304 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) 305 { 306 nxt_worker_process_quit_handler(task, msg); 307 } 308 309 310 nxt_int_t 311 nxt_app_start(nxt_task_t *task, void *data) 312 { 313 nxt_int_t ret; 314 nxt_app_lang_module_t *lang; 315 nxt_common_app_conf_t *app_conf; 316 317 app_conf = data; 318 319 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); 320 if (nxt_slow_path(lang == NULL)) { 321 nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type); 322 return NXT_ERROR; 323 } 324 325 nxt_app = lang->module; 326 327 if (nxt_app == NULL) { 328 nxt_debug(task, "application language module: %s \"%s\"", 329 lang->version, lang->file); 330 331 nxt_app = nxt_app_module_load(task, lang->file); 332 } 333 334 if (app_conf->working_directory != NULL 335 && app_conf->working_directory[0] != 0) 336 { 337 ret = chdir(app_conf->working_directory); 338 339 if (nxt_slow_path(ret != 0)) { 340 nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E", 341 app_conf->working_directory, nxt_errno); 342 343 return NXT_ERROR; 344 } 345 } 346 347 if (nxt_slow_path(nxt_app_set_environment(app_conf->environment) 348 != NXT_OK)) 349 { 350 nxt_alert(task, "failed to set environment"); 351 return NXT_ERROR; 352 } 353 354 ret = nxt_app->init(task, data); 355 356 if (nxt_slow_path(ret != NXT_OK)) { 357 nxt_debug(task, "application init failed"); 358 359 } else { 360 nxt_debug(task, "application init done"); 361 } 362 363 return ret; 364 } 365 366 367 static nxt_app_module_t * 368 nxt_app_module_load(nxt_task_t *task, const char *name) 369 { 370 void *dl; 371 372 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY); 373 374 if (dl != NULL) { 375 return dlsym(dl, "nxt_app_module"); 376 } 377 378 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror()); 379 380 return NULL; 381 } 382 383 384 static nxt_int_t 385 nxt_app_set_environment(nxt_conf_value_t *environment) 386 { 387 char *env, *p; 388 uint32_t next; 389 nxt_str_t name, value; 390 nxt_conf_value_t *value_obj; 391 392 if (environment != NULL) { 393 next = 0; 394 395 for ( ;; ) { 396 value_obj = nxt_conf_next_object_member(environment, &name, &next); 397 if (value_obj == NULL) { 398 break; 399 } 400 401 nxt_conf_get_string(value_obj, &value); 402 403 env = nxt_malloc(name.length + value.length + 2); 404 if (nxt_slow_path(env == NULL)) { 405 return NXT_ERROR; 406 } 407 408 p = nxt_cpymem(env, name.start, name.length); 409 *p++ = '='; 410 p = nxt_cpymem(p, value.start, value.length); 411 *p = '\0'; 412 413 if (nxt_slow_path(putenv(env) != 0)) { 414 return NXT_ERROR; 415 } 416 } 417 } 418 419 return NXT_OK; 420 } 421 422 423 nxt_int_t 424 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar) 425 { 426 ar->timer.handler = nxt_app_http_release; 427 nxt_timer_add(task->thread->engine, &ar->timer, 0); 428 429 return NXT_OK; 430 } 431 432 433 static void 434 nxt_app_http_release(nxt_task_t *task, void *obj, void *data) 435 { 436 nxt_timer_t *timer; 437 nxt_app_parse_ctx_t *ar; 438 439 timer = obj; 440 441 nxt_debug(task, "http app release"); 442 443 ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); 444 445 nxt_mp_release(ar->request->mem_pool); 446 } 447 448 449 nxt_app_lang_module_t * 450 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) 451 { 452 u_char *p, *end, *version; 453 size_t version_length; 454 nxt_uint_t i, n; 455 nxt_app_type_t type; 456 nxt_app_lang_module_t *lang; 457 458 end = name->start + name->length; 459 version = end; 460 461 for (p = name->start; p < end; p++) { 462 if (*p == ' ') { 463 version = p + 1; 464 break; 465 } 466 467 if (*p >= '0' && *p <= '9') { 468 version = p; 469 break; 470 } 471 } 472 473 type = nxt_app_parse_type(name->start, p - name->start); 474 475 if (type == NXT_APP_UNKNOWN) { 476 return NULL; 477 } 478 479 version_length = end - version; 480 481 lang = rt->languages->elts; 482 n = rt->languages->nelts; 483 484 for (i = 0; i < n; i++) { 485 486 /* 487 * Versions are sorted in descending order 488 * so first match chooses the highest version. 489 */ 490 491 if (lang[i].type == type 492 && nxt_strvers_match(lang[i].version, version, version_length)) 493 { 494 return &lang[i]; 495 } 496 } 497 498 return NULL; 499 } 500 501 502 nxt_app_type_t 503 nxt_app_parse_type(u_char *p, size_t length) 504 { 505 nxt_str_t str; 506 507 str.length = length; 508 str.start = p; 509 510 if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) { 511 return NXT_APP_EXTERNAL; 512 513 } else if (nxt_str_eq(&str, "python", 6)) { 514 return NXT_APP_PYTHON; 515 516 } else if (nxt_str_eq(&str, "php", 3)) { 517 return NXT_APP_PHP; 518 519 } else if (nxt_str_eq(&str, "perl", 4)) { 520 return NXT_APP_PERL; 521 522 } else if (nxt_str_eq(&str, "ruby", 4)) { 523 return NXT_APP_RUBY; 524 } 525 526 return NXT_APP_UNKNOWN; 527 } 528 529 530 nxt_int_t 531 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) 532 { 533 nxt_port_t *my_port, *main_port; 534 nxt_runtime_t *rt; 535 536 nxt_memzero(init, sizeof(nxt_unit_init_t)); 537 538 rt = task->thread->runtime; 539 540 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 541 if (nxt_slow_path(main_port == NULL)) { 542 return NXT_ERROR; 543 } 544 545 my_port = nxt_runtime_port_find(rt, nxt_pid, 0); 546 if (nxt_slow_path(my_port == NULL)) { 547 return NXT_ERROR; 548 } 549 550 init->ready_port.id.pid = main_port->pid; 551 init->ready_port.id.id = main_port->id; 552 init->ready_port.out_fd = main_port->pair[1]; 553 554 nxt_fd_blocking(task, main_port->pair[1]); 555 556 init->ready_stream = my_port->process->init->stream; 557 558 init->read_port.id.pid = my_port->pid; 559 init->read_port.id.id = my_port->id; 560 init->read_port.in_fd = my_port->pair[0]; 561 562 nxt_fd_blocking(task, my_port->pair[0]); 563 564 init->log_fd = 2; 565 566 return NXT_OK; 567 } 568