1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_runtime.h> 9 #include <nxt_port.h> 10 #include <nxt_router.h> 11 12 13 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 14 15 static nxt_atomic_uint_t nxt_port_last_id = 1; 16 17 18 static void 19 nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) 20 { 21 nxt_mp_t *mp; 22 nxt_port_t *port; 23 24 port = obj; 25 mp = data; 26 27 nxt_assert(port->pair[0] == -1); 28 nxt_assert(port->pair[1] == -1); 29 30 nxt_assert(port->app_req_id == 0); 31 nxt_assert(port->app_link.next == NULL); 32 33 nxt_assert(nxt_queue_is_empty(&port->messages)); 34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); 35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); 36 37 nxt_mp_free(mp, port); 38 } 39 40 41 nxt_port_t * 42 nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, 43 nxt_process_type_t type) 44 { 45 nxt_mp_t *mp; 46 nxt_port_t *port; 47 48 mp = nxt_mp_create(1024, 128, 256, 32); 49 50 if (nxt_slow_path(mp == NULL)) { 51 return NULL; 52 } 53 54 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); 55 56 if (nxt_fast_path(port != NULL)) { 57 port->id = id; 58 port->pid = pid; 59 port->type = type; 60 port->mem_pool = mp; 61 port->next_stream = 1; 62 63 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 64 65 nxt_queue_init(&port->messages); 66 67 } else { 68 nxt_mp_destroy(mp); 69 } 70 71 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 72 73 return port; 74 } 75 76 77 nxt_bool_t 78 nxt_port_release(nxt_port_t *port) 79 { 80 nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid, 81 port->id, port->type); 82 83 if (port->pair[0] != -1) { 84 nxt_fd_close(port->pair[0]); 85 port->pair[0] = -1; 86 } 87 88 if (port->pair[1] != -1) { 89 nxt_fd_close(port->pair[1]); 90 port->pair[1] = -1; 91 } 92 93 if (port->type == NXT_PROCESS_WORKER) { 94 if (nxt_router_app_remove_port(port) == 0) { 95 return 0; 96 } 97 } 98 99 if (port->link.next != NULL) { 100 nxt_process_port_remove(port); 101 } 102 103 nxt_mp_release(port->mem_pool, NULL); 104 105 return 1; 106 } 107 108 109 nxt_port_id_t 110 nxt_port_get_next_id() 111 { 112 return nxt_atomic_fetch_add(&nxt_port_last_id, 1); 113 } 114 115 116 void 117 nxt_port_reset_next_id() 118 { 119 nxt_port_last_id = 1; 120 } 121 122 123 void 124 nxt_port_enable(nxt_task_t *task, nxt_port_t *port, 125 nxt_port_handler_t *handlers) 126 { 127 port->pid = nxt_pid; 128 port->handler = nxt_port_handler; 129 port->data = handlers; 130 131 nxt_port_read_enable(task, port); 132 } 133 134 135 static void 136 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 137 { 138 nxt_port_handler_t *handlers; 139 140 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { 141 142 nxt_debug(task, "port %d: message type:%uD", 143 msg->port->socket.fd, msg->port_msg.type); 144 145 handlers = msg->port->data; 146 handlers[msg->port_msg.type](task, msg); 147 148 return; 149 } 150 151 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", 152 msg->port->socket.fd, msg->port_msg.type); 153 } 154 155 156 void 157 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 158 { 159 nxt_runtime_quit(task); 160 } 161 162 163 void 164 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 165 nxt_port_t *new_port, uint32_t stream) 166 { 167 nxt_port_t *port; 168 nxt_process_t *process; 169 170 nxt_debug(task, "new port %d for process %PI", 171 new_port->pair[1], new_port->pid); 172 173 nxt_runtime_process_each(rt, process) { 174 175 if (process->pid == new_port->pid || process->pid == nxt_pid) { 176 continue; 177 } 178 179 port = nxt_process_port_first(process); 180 181 if (port->type == NXT_PROCESS_MAIN 182 || port->type == NXT_PROCESS_CONTROLLER 183 || port->type == NXT_PROCESS_ROUTER) 184 { 185 (void) nxt_port_send_port(task, port, new_port, stream); 186 } 187 188 } nxt_runtime_process_loop; 189 } 190 191 192 nxt_int_t 193 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, 194 uint32_t stream) 195 { 196 nxt_buf_t *b; 197 nxt_port_msg_new_port_t *msg; 198 199 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t)); 200 if (nxt_slow_path(b == NULL)) { 201 return NXT_ERROR; 202 } 203 204 nxt_debug(task, "send port %FD to process %PI", 205 new_port->pair[1], port->pid); 206 207 b->mem.free += sizeof(nxt_port_msg_new_port_t); 208 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 209 210 msg->id = new_port->id; 211 msg->pid = new_port->pid; 212 msg->max_size = port->max_size; 213 msg->max_share = port->max_share; 214 msg->type = new_port->type; 215 216 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, 217 new_port->pair[1], stream, 0, b); 218 } 219 220 221 void 222 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 223 { 224 nxt_port_t *port; 225 nxt_process_t *process; 226 nxt_runtime_t *rt; 227 nxt_port_msg_new_port_t *new_port_msg; 228 229 rt = task->thread->runtime; 230 231 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; 232 233 nxt_debug(task, "new port %d received for process %PI:%d", 234 msg->fd, new_port_msg->pid, new_port_msg->id); 235 236 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); 237 if (port != NULL) { 238 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, 239 new_port_msg->id); 240 241 nxt_fd_close(msg->fd); 242 msg->fd = -1; 243 return; 244 } 245 246 process = nxt_runtime_process_get(rt, new_port_msg->pid); 247 if (nxt_slow_path(process == NULL)) { 248 return; 249 } 250 251 port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, 252 new_port_msg->type); 253 if (nxt_slow_path(port == NULL)) { 254 return; 255 } 256 257 nxt_process_port_add(task, process, port); 258 259 port->pair[0] = -1; 260 port->pair[1] = msg->fd; 261 port->max_size = new_port_msg->max_size; 262 port->max_share = new_port_msg->max_share; 263 264 port->socket.task = task; 265 266 nxt_runtime_port_add(rt, port); 267 268 nxt_port_write_enable(task, port); 269 270 msg->new_port = port; 271 } 272 273 274 void 275 nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 276 { 277 nxt_port_t *port; 278 nxt_process_t *process; 279 nxt_runtime_t *rt; 280 281 rt = task->thread->runtime; 282 283 nxt_assert(nxt_runtime_is_main(rt)); 284 285 process = nxt_runtime_process_get(rt, msg->port_msg.pid); 286 if (nxt_slow_path(process == NULL)) { 287 return; 288 } 289 290 process->ready = 1; 291 292 port = nxt_process_port_first(process); 293 if (nxt_slow_path(port == NULL)) { 294 return; 295 } 296 297 nxt_debug(task, "process %PI ready", msg->port_msg.pid); 298 299 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); 300 } 301 302 303 void 304 nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 305 { 306 nxt_runtime_t *rt; 307 nxt_process_t *process; 308 309 rt = task->thread->runtime; 310 311 if (nxt_slow_path(msg->fd == -1)) { 312 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); 313 314 return; 315 } 316 317 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 318 if (nxt_slow_path(process == NULL)) { 319 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", 320 msg->port_msg.pid); 321 322 goto fail_close; 323 } 324 325 nxt_port_incoming_port_mmap(task, process, msg->fd); 326 327 fail_close: 328 329 close(msg->fd); 330 } 331 332 333 void 334 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, 335 nxt_fd_t fd) 336 { 337 nxt_buf_t *b; 338 nxt_port_t *port; 339 nxt_process_t *process; 340 341 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); 342 343 nxt_runtime_process_each(rt, process) { 344 345 if (nxt_pid == process->pid) { 346 continue; 347 } 348 349 port = nxt_process_port_first(process); 350 351 b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); 352 if (nxt_slow_path(b == NULL)) { 353 continue; 354 } 355 356 *(nxt_uint_t *) b->mem.pos = slot; 357 b->mem.free += sizeof(nxt_uint_t); 358 359 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 360 fd, 0, 0, b); 361 362 } nxt_runtime_process_loop; 363 } 364 365 366 void 367 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 368 { 369 nxt_buf_t *b; 370 nxt_uint_t slot; 371 nxt_file_t *log_file; 372 nxt_runtime_t *rt; 373 374 rt = task->thread->runtime; 375 376 b = msg->buf; 377 slot = *(nxt_uint_t *) b->mem.pos; 378 379 log_file = nxt_list_elt(rt->log_files, slot); 380 381 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); 382 383 /* 384 * The old log file descriptor must be closed at the moment when no 385 * other threads use it. dup2() allows to use the old file descriptor 386 * for new log file. This change is performed atomically in the kernel. 387 */ 388 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { 389 if (slot == 0) { 390 (void) nxt_file_stderr(log_file); 391 } 392 } 393 } 394 395 396 void 397 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 398 { 399 size_t dump_size; 400 nxt_buf_t *b; 401 402 b = msg->buf; 403 dump_size = b->mem.free - b->mem.pos; 404 405 if (dump_size > 300) { 406 dump_size = 300; 407 } 408 409 nxt_debug(task, "data: %*s", dump_size, b->mem.pos); 410 } 411 412 413 void 414 nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 415 { 416 nxt_buf_t *buf; 417 nxt_pid_t pid; 418 nxt_runtime_t *rt; 419 nxt_process_t *process; 420 421 buf = msg->buf; 422 423 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); 424 425 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); 426 427 nxt_debug(task, "port remove pid %PI handler", pid); 428 429 rt = task->thread->runtime; 430 431 nxt_port_rpc_remove_peer(task, msg->port, pid); 432 433 process = nxt_runtime_process_find(rt, pid); 434 435 if (process) { 436 nxt_runtime_process_remove(rt, process); 437 } 438 } 439 440 441 void 442 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 443 { 444 nxt_debug(task, "port empty handler"); 445 } 446