1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_cycle.h> 9 #include <nxt_port.h> 10 11 12 static void nxt_process_port_handler(nxt_task_t *task, 13 nxt_port_recv_msg_t *msg); 14 static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, 15 void *data); 16 17 18 void 19 nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc, 20 nxt_process_port_handler_t *handlers) 21 { 22 proc->pid = nxt_pid; 23 proc->engine = thr->engine->id; 24 proc->port->handler = nxt_process_port_handler; 25 proc->port->data = handlers; 26 27 nxt_port_write_close(proc->port); 28 nxt_port_read_enable(&thr->engine->task, proc->port); 29 } 30 31 32 void 33 nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type, 34 nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) 35 { 36 nxt_uint_t i, n; 37 nxt_process_port_t *proc; 38 39 proc = cycle->processes->elts; 40 n = cycle->processes->nelts; 41 42 for (i = 0; i < n; i++) { 43 if (nxt_pid != proc[i].pid) { 44 (void) nxt_port_write(task, proc[i].port, type, fd, stream, b); 45 } 46 } 47 } 48 49 50 static void 51 nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 52 { 53 nxt_process_port_handler_t *handlers; 54 55 if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { 56 57 nxt_debug(task, "port %d: message type:%uD", 58 msg->port->socket.fd, msg->type); 59 60 handlers = msg->port->data; 61 handlers[msg->type](task, msg); 62 63 return; 64 } 65 66 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", 67 msg->port->socket.fd, msg->type); 68 } 69 70 71 void 72 nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 73 { 74 nxt_cycle_quit(task, NULL); 75 } 76 77 78 void 79 nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle, 80 nxt_process_port_t *proc) 81 { 82 nxt_buf_t *b; 83 nxt_uint_t i, n; 84 nxt_process_port_t *p; 85 nxt_proc_msg_new_port_t *new_port; 86 87 n = cycle->processes->nelts; 88 if (n == 0) { 89 return; 90 } 91 92 nxt_thread_log_debug("new port %d for process %PI engine %uD", 93 proc->port->socket.fd, proc->pid, proc->engine); 94 95 p = cycle->processes->elts; 96 97 for (i = 0; i < n; i++) { 98 99 if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) { 100 continue; 101 } 102 103 b = nxt_buf_mem_alloc(p[i].port->mem_pool, 104 sizeof(nxt_process_port_data_t), 0); 105 106 if (nxt_slow_path(b == NULL)) { 107 continue; 108 } 109 110 b->data = p[i].port; 111 b->completion_handler = nxt_process_new_port_buf_completion; 112 b->mem.free += sizeof(nxt_proc_msg_new_port_t); 113 new_port = (nxt_proc_msg_new_port_t *) b->mem.pos; 114 115 new_port->pid = proc->pid; 116 new_port->engine = proc->engine; 117 new_port->max_size = p[i].port->max_size; 118 new_port->max_share = p[i].port->max_share; 119 120 (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT, 121 proc->port->socket.fd, 0, b); 122 } 123 } 124 125 126 static void 127 nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) 128 { 129 nxt_buf_t *b; 130 nxt_port_t *port; 131 132 b = obj; 133 port = b->data; 134 135 /* TODO: b->mem.pos */ 136 137 nxt_buf_free(port->mem_pool, b); 138 } 139 140 141 void 142 nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 143 { 144 nxt_port_t *port; 145 nxt_cycle_t *cycle; 146 nxt_process_port_t *proc; 147 nxt_proc_msg_new_port_t *new_port; 148 149 cycle = nxt_thread_cycle(); 150 151 proc = nxt_array_add(cycle->processes); 152 if (nxt_slow_path(proc == NULL)) { 153 return; 154 } 155 156 port = nxt_port_alloc(task); 157 if (nxt_slow_path(port == NULL)) { 158 return; 159 } 160 161 proc->port = port; 162 163 new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos; 164 msg->buf->mem.pos = msg->buf->mem.free; 165 166 nxt_debug(task, "new port %d received for process %PI engine %uD", 167 msg->fd, new_port->pid, new_port->engine); 168 169 proc->pid = new_port->pid; 170 proc->engine = new_port->engine; 171 port->pair[1] = msg->fd; 172 port->max_size = new_port->max_size; 173 port->max_share = new_port->max_share; 174 175 /* A read port is not passed at all. */ 176 nxt_port_write_enable(task, port); 177 } 178 179 180 void 181 nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle, 182 nxt_uint_t slot, nxt_fd_t fd) 183 { 184 nxt_buf_t *b; 185 nxt_uint_t i, n; 186 nxt_process_port_t *p; 187 188 n = cycle->processes->nelts; 189 if (n == 0) { 190 return; 191 } 192 193 nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd); 194 195 p = cycle->processes->elts; 196 197 /* p[0] is master process. */ 198 199 for (i = 1; i < n; i++) { 200 b = nxt_buf_mem_alloc(p[i].port->mem_pool, 201 sizeof(nxt_process_port_data_t), 0); 202 203 if (nxt_slow_path(b == NULL)) { 204 continue; 205 } 206 207 *(nxt_uint_t *) b->mem.pos = slot; 208 b->mem.free += sizeof(nxt_uint_t); 209 210 (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE, 211 fd, 0, b); 212 } 213 } 214 215 216 void 217 nxt_process_port_change_log_file_handler(nxt_task_t *task, 218 nxt_port_recv_msg_t *msg) 219 { 220 nxt_buf_t *b; 221 nxt_uint_t slot; 222 nxt_file_t *log_file; 223 nxt_cycle_t *cycle; 224 225 cycle = nxt_thread_cycle(); 226 227 b = msg->buf; 228 slot = *(nxt_uint_t *) b->mem.pos; 229 230 log_file = nxt_list_elt(cycle->log_files, slot); 231 232 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); 233 234 /* 235 * The old log file descriptor must be closed at the moment when no 236 * other threads use it. dup2() allows to use the old file descriptor 237 * for new log file. This change is performed atomically in the kernel. 238 */ 239 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { 240 241 if (slot == 0) { 242 (void) nxt_file_stderr(log_file); 243 } 244 } 245 } 246 247 248 void 249 nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 250 { 251 nxt_buf_t *b; 252 253 b = msg->buf; 254 255 nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); 256 257 b->mem.pos = b->mem.free; 258 } 259 260 261 void 262 nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 263 { 264 nxt_debug(task, "port empty handler"); 265 } 266