Deleted Added
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 4 unchanged lines hidden (view full) ---

13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
18#include <nxt_app_queue.h>
19#include <nxt_port_queue.h>
20
21#define NXT_SHARED_PORT_ID 0xFFFFu
22
23typedef struct {
24 nxt_str_t type;
25 uint32_t processes;
26 uint32_t max_processes;
27 uint32_t spare_processes;
28 nxt_msec_t timeout;
29 nxt_msec_t idle_timeout;
30 uint32_t requests;

--- 33 unchanged lines hidden (view full) ---

64
65
66typedef struct {
67 nxt_app_t *app;
68 nxt_router_temp_conf_t *temp_conf;
69} nxt_app_rpc_t;
70
71
72typedef struct {
73 nxt_app_joint_t *app_joint;
74 uint32_t generation;
75} nxt_app_joint_rpc_t;
76
77
78static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
79 nxt_mp_t *mp);
80static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
81static void nxt_router_greet_controller(nxt_task_t *task,
82 nxt_port_t *controller_port);
83
84static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
85
86static void nxt_router_new_port_handler(nxt_task_t *task,
87 nxt_port_recv_msg_t *msg);
88static void nxt_router_conf_data_handler(nxt_task_t *task,
89 nxt_port_recv_msg_t *msg);
90static void nxt_router_app_restart_handler(nxt_task_t *task,
91 nxt_port_recv_msg_t *msg);
92static void nxt_router_remove_pid_handler(nxt_task_t *task,
93 nxt_port_recv_msg_t *msg);
94static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
95 nxt_port_recv_msg_t *msg);
96
97static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
98static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
99static void nxt_router_conf_ready(nxt_task_t *task,

--- 186 unchanged lines hidden (view full) ---

286static const nxt_port_handlers_t nxt_router_process_port_handlers = {
287 .quit = nxt_signal_quit_handler,
288 .new_port = nxt_router_new_port_handler,
289 .get_port = nxt_router_get_port_handler,
290 .change_file = nxt_port_change_log_file_handler,
291 .mmap = nxt_port_mmap_handler,
292 .get_mmap = nxt_router_get_mmap_handler,
293 .data = nxt_router_conf_data_handler,
294 .app_restart = nxt_router_app_restart_handler,
295 .remove_pid = nxt_router_remove_pid_handler,
296 .access_log = nxt_router_access_log_reopen_handler,
297 .rpc_ready = nxt_port_rpc_handler,
298 .rpc_error = nxt_port_rpc_handler,
299 .oosm = nxt_router_oosm_handler,
300};
301
302

--- 82 unchanged lines hidden (view full) ---

385 -1, 0, 0, NULL);
386}
387
388
389static void
390nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
391 void *data)
392{
393 size_t size;
394 uint32_t stream;
395 nxt_mp_t *mp;
396 nxt_int_t ret;
397 nxt_app_t *app;
398 nxt_buf_t *b;
399 nxt_port_t *main_port;
400 nxt_runtime_t *rt;
401 nxt_app_joint_rpc_t *app_joint_rpc;
402
403 app = data;
404
405 rt = task->thread->runtime;
406 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
407
408 nxt_debug(task, "app '%V' %p start process", &app->name, app);
409

--- 4 unchanged lines hidden (view full) ---

414 if (nxt_slow_path(b == NULL)) {
415 goto failed;
416 }
417
418 nxt_buf_cpystr(b, &app->name);
419 *b->mem.free++ = '\0';
420 nxt_buf_cpystr(b, &app->conf);
421
422 app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
423 nxt_router_app_port_ready,
424 nxt_router_app_port_error,
425 sizeof(nxt_app_joint_rpc_t));
426 if (nxt_slow_path(app_joint_rpc == NULL)) {
427 goto failed;
428 }
429
430 stream = nxt_port_rpc_ex_stream(app_joint_rpc);
431
432 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
433 -1, stream, port->id, b);
434 if (nxt_slow_path(ret != NXT_OK)) {
435 nxt_port_rpc_cancel(task, port, stream);
436
437 goto failed;
438 }
439
440 app_joint_rpc->app_joint = app->joint;
441 app_joint_rpc->generation = app->generation;
442
443 nxt_router_app_joint_use(task, app->joint, 1);
444
445 nxt_router_app_use(task, app, -1);
446
447 return;
448
449failed:
450
451 if (b != NULL) {
452 mp = b->data;

--- 57 unchanged lines hidden (view full) ---

510}
511
512
513nxt_inline nxt_bool_t
514nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515{
516 nxt_buf_t *b, *next;
517 nxt_bool_t cancelled;
518 nxt_port_t *app_port;
519 nxt_msg_info_t *msg_info;
520
521 msg_info = &req_rpc_data->msg_info;
522
523 if (msg_info->buf == NULL) {
524 return 0;
525 }
526
527 app_port = req_rpc_data->app_port;
528
529 if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530 cancelled = nxt_app_queue_cancel(app_port->queue,
531 msg_info->tracking_cookie,
532 req_rpc_data->stream);
533
534 if (cancelled) {
535 nxt_debug(task, "stream #%uD: cancelled by router",
536 req_rpc_data->stream);
537 }
538
539 } else {
540 cancelled = 0;
541 }
542
543 for (b = msg_info->buf; b != NULL; b = next) {
544 next = b->next;
545 b->next = NULL;
546
547 if (b->is_port_mmap_sent) {
548 b->is_port_mmap_sent = cancelled == 0;

--- 259 unchanged lines hidden (view full) ---

808 if (msg->fd[0] != -1) {
809 nxt_fd_close(msg->fd[0]);
810 msg->fd[0] = -1;
811 }
812}
813
814
815static void
816nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
817{
818 nxt_app_t *app;
819 nxt_int_t ret;
820 nxt_str_t app_name;
821 nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
822 nxt_port_msg_type_t reply;
823
824 reply_port = nxt_runtime_port_find(task->thread->runtime,
825 msg->port_msg.pid,
826 msg->port_msg.reply_port);
827 if (nxt_slow_path(reply_port == NULL)) {
828 nxt_alert(task, "app_restart_handler: reply port not found");
829 return;
830 }
831
832 app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
833 app_name.start = msg->buf->mem.pos;
834
835 nxt_debug(task, "app_restart_handler: %V", &app_name);
836
837 app = nxt_router_app_find(&nxt_router->apps, &app_name);
838
839 if (nxt_fast_path(app != NULL)) {
840 shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
841 NXT_PROCESS_APP);
842 if (nxt_slow_path(shared_port == NULL)) {
843 goto fail;
844 }
845
846 ret = nxt_port_socket_init(task, shared_port, 0);
847 if (nxt_slow_path(ret != NXT_OK)) {
848 nxt_port_use(task, shared_port, -1);
849 goto fail;
850 }
851
852 ret = nxt_router_app_queue_init(task, shared_port);
853 if (nxt_slow_path(ret != NXT_OK)) {
854 nxt_port_write_close(shared_port);
855 nxt_port_read_close(shared_port);
856 nxt_port_use(task, shared_port, -1);
857 goto fail;
858 }
859
860 nxt_port_write_enable(task, shared_port);
861
862 nxt_thread_mutex_lock(&app->mutex);
863
864 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
865
866 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
867 0, 0, NULL);
868
869 } nxt_queue_loop;
870
871 app->generation++;
872
873 shared_port->app = app;
874
875 old_shared_port = app->shared_port;
876 old_shared_port->app = NULL;
877
878 app->shared_port = shared_port;
879
880 nxt_thread_mutex_unlock(&app->mutex);
881
882 nxt_port_close(task, old_shared_port);
883 nxt_port_use(task, old_shared_port, -1);
884
885 reply = NXT_PORT_MSG_RPC_READY_LAST;
886
887 } else {
888
889fail:
890
891 reply = NXT_PORT_MSG_RPC_ERROR;
892 }
893
894 nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
895 0, NULL);
896}
897
898
899static void
900nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
901 void *data)
902{
903 union {
904 nxt_pid_t removed_pid;
905 void *data;
906 } u;
907

--- 797 unchanged lines hidden (view full) ---

1705 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1706 app_joint->idle_timer.task = &engine->task;
1707 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1708
1709 app_joint->free_app_work.handler = nxt_router_free_app;
1710 app_joint->free_app_work.task = &engine->task;
1711 app_joint->free_app_work.obj = app_joint;
1712
1713 port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1714 NXT_PROCESS_APP);
1715 if (nxt_slow_path(port == NULL)) {
1716 return NXT_ERROR;
1717 }
1718
1719 ret = nxt_port_socket_init(task, port, 0);
1720 if (nxt_slow_path(ret != NXT_OK)) {
1721 nxt_port_use(task, port, -1);

--- 2609 unchanged lines hidden (view full) ---

4331 nxt_request_rpc_data_unlink(task, req_rpc_data);
4332}
4333
4334
4335static void
4336nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4337 void *data)
4338{
4339 nxt_app_t *app;
4340 nxt_bool_t start_process;
4341 nxt_port_t *port;
4342 nxt_app_joint_t *app_joint;
4343 nxt_app_joint_rpc_t *app_joint_rpc;
4344
4345 nxt_assert(data != NULL);
4346
4347 app_joint_rpc = data;
4348 app_joint = app_joint_rpc->app_joint;
4349 port = msg->u.new_port;
4350
4351 nxt_assert(app_joint != NULL);
4352 nxt_assert(port != NULL);
4353 nxt_assert(port->type == NXT_PROCESS_APP);
4354 nxt_assert(port->id == 0);
4355
4356 app = app_joint->app;
4357
4358 nxt_router_app_joint_use(task, app_joint, -1);
4359
4360 if (nxt_slow_path(app == NULL)) {
4361 nxt_debug(task, "new port ready for released app, send QUIT");
4362
4363 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4364
4365 return;
4366 }
4367
4368 nxt_thread_mutex_lock(&app->mutex);
4369
4370 nxt_assert(app->pending_processes != 0);
4371
4372 app->pending_processes--;
4373
4374 if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4375 nxt_debug(task, "new port ready for restarted app, send QUIT");
4376
4377 start_process = !task->thread->engine->shutdown
4378 && nxt_router_app_can_start(app)
4379 && nxt_router_app_need_start(app);
4380
4381 if (start_process) {
4382 app->pending_processes++;
4383 }
4384
4385 nxt_thread_mutex_unlock(&app->mutex);
4386
4387 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4388
4389 if (start_process) {
4390 nxt_router_start_app_process(task, app);
4391 }
4392
4393 return;
4394 }
4395
4396 port->app = app;
4397 port->main_app_port = port;
4398
4399 app->processes++;
4400 nxt_port_hash_add(&app->port_hash, port);
4401 app->port_hash_count++;
4402
4403 nxt_thread_mutex_unlock(&app->mutex);
4404
4405 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4406 &app->name, port->pid, app->processes, app->pending_processes);

--- 37 unchanged lines hidden (view full) ---

4444 0, 0, b);
4445}
4446
4447
4448static void
4449nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4450 void *data)
4451{
4452 nxt_app_t *app;
4453 nxt_app_joint_t *app_joint;
4454 nxt_queue_link_t *link;
4455 nxt_http_request_t *r;
4456 nxt_app_joint_rpc_t *app_joint_rpc;
4457
4458 nxt_assert(data != NULL);
4459
4460 app_joint_rpc = data;
4461 app_joint = app_joint_rpc->app_joint;
4462
4463 nxt_assert(app_joint != NULL);
4464
4465 app = app_joint->app;
4466
4467 nxt_router_app_joint_use(task, app_joint, -1);
4468
4469 if (nxt_slow_path(app == NULL)) {
4470 nxt_debug(task, "start error for released app");

--- 149 unchanged lines hidden (view full) ---

4620 inc_use = -1;
4621 break;
4622 }
4623
4624 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4625 port->pid, port->id,
4626 (int) inc_use, (int) got_response);
4627
4628 if (port->id == NXT_SHARED_PORT_ID) {
4629 nxt_thread_mutex_lock(&app->mutex);
4630
4631 app->active_requests -= got_response + dec_requests;
4632
4633 nxt_thread_mutex_unlock(&app->mutex);
4634
4635 goto adjust_use;
4636 }

--- 353 unchanged lines hidden (view full) ---

4990 nxt_port_mmaps_destroy(&app->outgoing, 1);
4991
4992 nxt_thread_mutex_destroy(&app->outgoing.mutex);
4993
4994 if (app->shared_port != NULL) {
4995 app->shared_port->app = NULL;
4996 nxt_port_close(task, app->shared_port);
4997 nxt_port_use(task, app->shared_port, -1);
4998
4999 app->shared_port = NULL;
5000 }
5001
5002 nxt_thread_mutex_destroy(&app->mutex);
5003 nxt_mp_destroy(app->mem_pool);
5004
5005 app_joint->app = NULL;
5006
5007 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {

--- 798 unchanged lines hidden ---