1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 11 12 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 13 static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, 14 void *data); 15 16 17 void 18 nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, 19 nxt_port_handler_t *handlers) 20 { 21 port->pid = nxt_pid; 22 port->engine = thread->engine->id; 23 port->handler = nxt_port_handler; 24 port->data = handlers; 25 26 nxt_port_write_close(port); 27 nxt_port_read_enable(&thread->engine->task, port); 28 } 29 30 31 void 32 nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, 33 nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) 34 { 35 nxt_uint_t i, n, nprocesses, nports; 36 nxt_port_t *port; 37 nxt_process_t *process; 38 39 process = rt->processes->elts; 40 nprocesses = rt->processes->nelts; 41 42 for (i = 0; i < nprocesses; i++) { 43 44 if (nxt_pid != process[i].pid) { 45 port = process[i].ports->elts; 46 nports = process[i].ports->nelts; 47 48 for (n = 0; n < nports; n++) { 49 (void) nxt_port_socket_write(task, &port[n], type, 50 fd, stream, b); 51 } 52 } 53 } 54 } 55 56 57 static void 58 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 59 { 60 nxt_port_handler_t *handlers; 61 62 if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { 63 64 nxt_debug(task, "port %d: message type:%uD", 65 msg->port->socket.fd, msg->type); 66 67 handlers = msg->port->data; 68 handlers[msg->type](task, msg); 69 70 return; 71 } 72 73 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", 74 msg->port->socket.fd, msg->type); 75 } 76 77 78 void 79 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 80 { 81 nxt_runtime_quit(task); 82 } 83 84 85 void 86 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 87 nxt_port_t *new_port) 88 { 89 nxt_buf_t *b; 90 nxt_uint_t i, n; 91 nxt_port_t *port; 92 nxt_process_t *process; 93 nxt_port_msg_new_port_t *msg; 94 95 n = rt->processes->nelts; 96 if (n == 0) { 97 return; 98 } 99 100 nxt_debug(task, "new port %d for process %PI engine %uD", 101 new_port->socket.fd, new_port->pid, new_port->engine); 102 103 process = rt->processes->elts; 104 105 for (i = 0; i < n; i++) { 106 107 if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) { 108 continue; 109 } 110 111 port = process[i].ports->elts; 112 113 b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); 114 115 if (nxt_slow_path(b == NULL)) { 116 continue; 117 } 118 119 b->data = port; 120 b->completion_handler = nxt_port_new_port_buf_completion; 121 b->mem.free += sizeof(nxt_port_msg_new_port_t); 122 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 123 124 msg->pid = new_port->pid; 125 msg->engine = new_port->engine; 126 msg->max_size = port->max_size; 127 msg->max_share = port->max_share; 128 129 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, 130 new_port->socket.fd, 0, b); 131 } 132 } 133 134 135 static void 136 nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) 137 { 138 nxt_buf_t *b; 139 nxt_port_t *port; 140 141 b = obj; 142 port = b->data; 143 144 /* TODO: b->mem.pos */ 145 146 nxt_buf_free(port->mem_pool, b); 147 } 148 149 150 void 151 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 152 { 153 nxt_port_t *port; 154 nxt_process_t *process; 155 nxt_runtime_t *rt; 156 nxt_mem_pool_t *mp; 157 nxt_port_msg_new_port_t *new_port_msg; 158 159 rt = task->thread->runtime; 160 161 process = nxt_runtime_new_process(rt); 162 if (nxt_slow_path(process == NULL)) { 163 return; 164 } 165 166 port = nxt_array_zero_add(process->ports); 167 if (nxt_slow_path(port == NULL)) { 168 return; 169 } 170 171 mp = nxt_mem_pool_create(1024); 172 if (nxt_slow_path(mp == NULL)) { 173 return; 174 } 175 176 port->mem_pool = mp; 177 178 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; 179 msg->buf->mem.pos = msg->buf->mem.free; 180 181 nxt_debug(task, "new port %d received for process %PI engine %uD", 182 msg->fd, new_port_msg->pid, new_port_msg->engine); 183 184 process->pid = new_port_msg->pid; 185 186 port->pid = new_port_msg->pid; 187 port->engine = new_port_msg->engine; 188 port->pair[0] = -1; 189 port->pair[1] = msg->fd; 190 port->max_size = new_port_msg->max_size; 191 port->max_share = new_port_msg->max_share; 192 193 nxt_queue_init(&port->messages); 194 195 port->socket.task = task; 196 197 nxt_port_write_enable(task, port); 198 } 199 200 201 void 202 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, 203 nxt_fd_t fd) 204 { 205 nxt_buf_t *b; 206 nxt_uint_t i, n; 207 nxt_port_t *port; 208 nxt_process_t *process; 209 210 n = rt->processes->nelts; 211 if (n == 0) { 212 return; 213 } 214 215 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); 216 217 process = rt->processes->elts; 218 219 /* process[0] is master process. */ 220 221 for (i = 1; i < n; i++) { 222 port = process[i].ports->elts; 223 224 b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); 225 if (nxt_slow_path(b == NULL)) { 226 continue; 227 } 228 229 *(nxt_uint_t *) b->mem.pos = slot; 230 b->mem.free += sizeof(nxt_uint_t); 231 232 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 233 fd, 0, b); 234 } 235 } 236 237 238 void 239 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 240 { 241 nxt_buf_t *b; 242 nxt_uint_t slot; 243 nxt_file_t *log_file; 244 nxt_runtime_t *rt; 245 246 rt = task->thread->runtime; 247 248 b = msg->buf; 249 slot = *(nxt_uint_t *) b->mem.pos; 250 251 log_file = nxt_list_elt(rt->log_files, slot); 252 253 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); 254 255 /* 256 * The old log file descriptor must be closed at the moment when no 257 * other threads use it. dup2() allows to use the old file descriptor 258 * for new log file. This change is performed atomically in the kernel. 259 */ 260 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { 261 262 if (slot == 0) { 263 (void) nxt_file_stderr(log_file); 264 } 265 } 266 } 267 268 269 void 270 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 271 { 272 nxt_buf_t *b; 273 274 b = msg->buf; 275 276 nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); 277 278 b->mem.pos = b->mem.free; 279 } 280 281 282 void 283 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 284 { 285 nxt_debug(task, "port empty handler"); 286 } 287