1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> 11 12 13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 14 15static nxt_atomic_uint_t nxt_port_last_id = 1; 16 17 18static void 19nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) 20{ 21 nxt_mp_t *mp; 22 nxt_port_t *port; 23 24 port = obj; 25 mp = data; 26 27 nxt_assert(port->pair[0] == -1); 28 nxt_assert(port->pair[1] == -1); 29
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> 11 12 13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 14 15static nxt_atomic_uint_t nxt_port_last_id = 1; 16 17 18static void 19nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) 20{ 21 nxt_mp_t *mp; 22 nxt_port_t *port; 23 24 port = obj; 25 mp = data; 26 27 nxt_assert(port->pair[0] == -1); 28 nxt_assert(port->pair[1] == -1); 29
|
30 nxt_assert(port->app_stream == 0);
| 30 nxt_assert(port->use_count == 0);
|
31 nxt_assert(port->app_link.next == NULL); 32 33 nxt_assert(nxt_queue_is_empty(&port->messages)); 34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); 35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); 36
| 31 nxt_assert(port->app_link.next == NULL); 32 33 nxt_assert(nxt_queue_is_empty(&port->messages)); 34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); 35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); 36
|
| 37 nxt_thread_mutex_destroy(&port->write_mutex); 38
|
37 nxt_mp_free(mp, port); 38} 39 40 41nxt_port_t * 42nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, 43 nxt_process_type_t type) 44{ 45 nxt_mp_t *mp; 46 nxt_port_t *port; 47 48 mp = nxt_mp_create(1024, 128, 256, 32); 49 50 if (nxt_slow_path(mp == NULL)) { 51 return NULL; 52 } 53 54 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); 55 56 if (nxt_fast_path(port != NULL)) { 57 port->id = id; 58 port->pid = pid; 59 port->type = type; 60 port->mem_pool = mp;
| 39 nxt_mp_free(mp, port); 40} 41 42 43nxt_port_t * 44nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, 45 nxt_process_type_t type) 46{ 47 nxt_mp_t *mp; 48 nxt_port_t *port; 49 50 mp = nxt_mp_create(1024, 128, 256, 32); 51 52 if (nxt_slow_path(mp == NULL)) { 53 return NULL; 54 } 55 56 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); 57 58 if (nxt_fast_path(port != NULL)) { 59 port->id = id; 60 port->pid = pid; 61 port->type = type; 62 port->mem_pool = mp;
|
| 63 port->use_count = 1;
|
61 62 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 63 64 nxt_queue_init(&port->messages);
| 64 65 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 66 67 nxt_queue_init(&port->messages);
|
| 68 nxt_thread_mutex_create(&port->write_mutex);
|
65 66 } else { 67 nxt_mp_destroy(mp); 68 } 69 70 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 71 72 return port; 73} 74 75
| 69 70 } else { 71 nxt_mp_destroy(mp); 72 } 73 74 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 75 76 return port; 77} 78 79
|
76nxt_bool_t 77nxt_port_release(nxt_port_t *port)
| 80void 81nxt_port_close(nxt_task_t *task, nxt_port_t *port)
|
78{
| 82{
|
79 nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid, 80 port->id, port->type);
| 83 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, 84 port->id, port->type);
|
81 82 if (port->pair[0] != -1) { 83 nxt_fd_close(port->pair[0]); 84 port->pair[0] = -1; 85 } 86 87 if (port->pair[1] != -1) { 88 nxt_fd_close(port->pair[1]); 89 port->pair[1] = -1;
| 85 86 if (port->pair[0] != -1) { 87 nxt_fd_close(port->pair[0]); 88 port->pair[0] = -1; 89 } 90 91 if (port->pair[1] != -1) { 92 nxt_fd_close(port->pair[1]); 93 port->pair[1] = -1;
|
90 }
| |
91
| 94
|
92 if (port->type == NXT_PROCESS_WORKER) { 93 if (nxt_router_app_remove_port(port) == 0) { 94 return 0;
| 95 if (port->app != NULL) { 96 nxt_router_app_port_close(task, port);
|
95 } 96 }
| 97 } 98 }
|
| 99}
|
97
| 100
|
| 101 102static void 103nxt_port_release(nxt_task_t *task, nxt_port_t *port) 104{ 105 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, 106 port->id, port->type); 107 108 if (port->app != NULL) { 109 nxt_router_app_use(task, port->app, -1); 110 111 port->app = NULL; 112 } 113
|
98 if (port->link.next != NULL) { 99 nxt_process_port_remove(port); 100 } 101 102 nxt_mp_release(port->mem_pool, NULL);
| 114 if (port->link.next != NULL) { 115 nxt_process_port_remove(port); 116 } 117 118 nxt_mp_release(port->mem_pool, NULL);
|
103 104 return 1;
| |
105} 106 107 108nxt_port_id_t 109nxt_port_get_next_id() 110{ 111 return nxt_atomic_fetch_add(&nxt_port_last_id, 1); 112} 113 114 115void 116nxt_port_reset_next_id() 117{ 118 nxt_port_last_id = 1; 119} 120 121 122void 123nxt_port_enable(nxt_task_t *task, nxt_port_t *port, 124 nxt_port_handlers_t *handlers) 125{ 126 port->pid = nxt_pid; 127 port->handler = nxt_port_handler; 128 port->data = (nxt_port_handler_t *) (handlers); 129 130 nxt_port_read_enable(task, port); 131} 132 133 134static void 135nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 136{ 137 nxt_port_handler_t *handlers; 138 139 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { 140 141 nxt_debug(task, "port %d: message type:%uD", 142 msg->port->socket.fd, msg->port_msg.type); 143 144 handlers = msg->port->data; 145 handlers[msg->port_msg.type](task, msg); 146 147 return; 148 } 149 150 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", 151 msg->port->socket.fd, msg->port_msg.type); 152} 153 154 155void 156nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 157{ 158 nxt_runtime_quit(task); 159} 160 161 162void 163nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 164 nxt_port_t *new_port, uint32_t stream) 165{ 166 nxt_port_t *port; 167 nxt_process_t *process; 168 169 nxt_debug(task, "new port %d for process %PI", 170 new_port->pair[1], new_port->pid); 171 172 nxt_runtime_process_each(rt, process) { 173 174 if (process->pid == new_port->pid || process->pid == nxt_pid) { 175 continue; 176 } 177 178 port = nxt_process_port_first(process); 179 180 if (port->type == NXT_PROCESS_MAIN 181 || port->type == NXT_PROCESS_CONTROLLER 182 || port->type == NXT_PROCESS_ROUTER) 183 { 184 (void) nxt_port_send_port(task, port, new_port, stream); 185 } 186 187 } nxt_runtime_process_loop; 188} 189 190 191nxt_int_t 192nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, 193 uint32_t stream) 194{ 195 nxt_buf_t *b; 196 nxt_port_msg_new_port_t *msg; 197 198 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 199 sizeof(nxt_port_data_t)); 200 if (nxt_slow_path(b == NULL)) { 201 return NXT_ERROR; 202 } 203 204 nxt_debug(task, "send port %FD to process %PI", 205 new_port->pair[1], port->pid); 206 207 b->mem.free += sizeof(nxt_port_msg_new_port_t); 208 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 209 210 msg->id = new_port->id; 211 msg->pid = new_port->pid; 212 msg->max_size = port->max_size; 213 msg->max_share = port->max_share; 214 msg->type = new_port->type; 215 216 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, 217 new_port->pair[1], stream, 0, b); 218} 219 220 221void 222nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 223{ 224 nxt_port_t *port; 225 nxt_process_t *process; 226 nxt_runtime_t *rt; 227 nxt_port_msg_new_port_t *new_port_msg; 228 229 rt = task->thread->runtime; 230 231 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; 232 233 nxt_debug(task, "new port %d received for process %PI:%d", 234 msg->fd, new_port_msg->pid, new_port_msg->id); 235 236 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); 237 if (port != NULL) { 238 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, 239 new_port_msg->id); 240 241 nxt_fd_close(msg->fd); 242 msg->fd = -1; 243 return; 244 } 245 246 process = nxt_runtime_process_get(rt, new_port_msg->pid); 247 if (nxt_slow_path(process == NULL)) { 248 return; 249 } 250 251 port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, 252 new_port_msg->type); 253 if (nxt_slow_path(port == NULL)) { 254 return; 255 } 256 257 nxt_process_port_add(task, process, port); 258 259 port->pair[0] = -1; 260 port->pair[1] = msg->fd; 261 port->max_size = new_port_msg->max_size; 262 port->max_share = new_port_msg->max_share; 263 264 port->socket.task = task; 265
| 119} 120 121 122nxt_port_id_t 123nxt_port_get_next_id() 124{ 125 return nxt_atomic_fetch_add(&nxt_port_last_id, 1); 126} 127 128 129void 130nxt_port_reset_next_id() 131{ 132 nxt_port_last_id = 1; 133} 134 135 136void 137nxt_port_enable(nxt_task_t *task, nxt_port_t *port, 138 nxt_port_handlers_t *handlers) 139{ 140 port->pid = nxt_pid; 141 port->handler = nxt_port_handler; 142 port->data = (nxt_port_handler_t *) (handlers); 143 144 nxt_port_read_enable(task, port); 145} 146 147 148static void 149nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 150{ 151 nxt_port_handler_t *handlers; 152 153 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { 154 155 nxt_debug(task, "port %d: message type:%uD", 156 msg->port->socket.fd, msg->port_msg.type); 157 158 handlers = msg->port->data; 159 handlers[msg->port_msg.type](task, msg); 160 161 return; 162 } 163 164 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", 165 msg->port->socket.fd, msg->port_msg.type); 166} 167 168 169void 170nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 171{ 172 nxt_runtime_quit(task); 173} 174 175 176void 177nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 178 nxt_port_t *new_port, uint32_t stream) 179{ 180 nxt_port_t *port; 181 nxt_process_t *process; 182 183 nxt_debug(task, "new port %d for process %PI", 184 new_port->pair[1], new_port->pid); 185 186 nxt_runtime_process_each(rt, process) { 187 188 if (process->pid == new_port->pid || process->pid == nxt_pid) { 189 continue; 190 } 191 192 port = nxt_process_port_first(process); 193 194 if (port->type == NXT_PROCESS_MAIN 195 || port->type == NXT_PROCESS_CONTROLLER 196 || port->type == NXT_PROCESS_ROUTER) 197 { 198 (void) nxt_port_send_port(task, port, new_port, stream); 199 } 200 201 } nxt_runtime_process_loop; 202} 203 204 205nxt_int_t 206nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, 207 uint32_t stream) 208{ 209 nxt_buf_t *b; 210 nxt_port_msg_new_port_t *msg; 211 212 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 213 sizeof(nxt_port_data_t)); 214 if (nxt_slow_path(b == NULL)) { 215 return NXT_ERROR; 216 } 217 218 nxt_debug(task, "send port %FD to process %PI", 219 new_port->pair[1], port->pid); 220 221 b->mem.free += sizeof(nxt_port_msg_new_port_t); 222 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 223 224 msg->id = new_port->id; 225 msg->pid = new_port->pid; 226 msg->max_size = port->max_size; 227 msg->max_share = port->max_share; 228 msg->type = new_port->type; 229 230 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, 231 new_port->pair[1], stream, 0, b); 232} 233 234 235void 236nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 237{ 238 nxt_port_t *port; 239 nxt_process_t *process; 240 nxt_runtime_t *rt; 241 nxt_port_msg_new_port_t *new_port_msg; 242 243 rt = task->thread->runtime; 244 245 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; 246 247 nxt_debug(task, "new port %d received for process %PI:%d", 248 msg->fd, new_port_msg->pid, new_port_msg->id); 249 250 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); 251 if (port != NULL) { 252 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, 253 new_port_msg->id); 254 255 nxt_fd_close(msg->fd); 256 msg->fd = -1; 257 return; 258 } 259 260 process = nxt_runtime_process_get(rt, new_port_msg->pid); 261 if (nxt_slow_path(process == NULL)) { 262 return; 263 } 264 265 port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, 266 new_port_msg->type); 267 if (nxt_slow_path(port == NULL)) { 268 return; 269 } 270 271 nxt_process_port_add(task, process, port); 272 273 port->pair[0] = -1; 274 port->pair[1] = msg->fd; 275 port->max_size = new_port_msg->max_size; 276 port->max_share = new_port_msg->max_share; 277 278 port->socket.task = task; 279
|
266 nxt_runtime_port_add(rt, port);
| 280 nxt_runtime_port_add(task, port);
|
267
| 281
|
| 282 nxt_port_use(task, port, -1); 283
|
268 nxt_port_write_enable(task, port); 269 270 msg->new_port = port; 271} 272 273 274void 275nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 276{ 277 nxt_port_t *port; 278 nxt_process_t *process; 279 nxt_runtime_t *rt; 280 281 rt = task->thread->runtime; 282 283 nxt_assert(nxt_runtime_is_main(rt)); 284 285 process = nxt_runtime_process_get(rt, msg->port_msg.pid); 286 if (nxt_slow_path(process == NULL)) { 287 return; 288 } 289 290 process->ready = 1; 291 292 port = nxt_process_port_first(process); 293 if (nxt_slow_path(port == NULL)) { 294 return; 295 } 296 297 nxt_debug(task, "process %PI ready", msg->port_msg.pid); 298 299 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); 300} 301 302 303void 304nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 305{ 306 nxt_runtime_t *rt; 307 nxt_process_t *process; 308 309 rt = task->thread->runtime; 310 311 if (nxt_slow_path(msg->fd == -1)) { 312 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); 313 314 return; 315 } 316 317 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 318 if (nxt_slow_path(process == NULL)) { 319 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", 320 msg->port_msg.pid); 321 322 goto fail_close; 323 } 324 325 nxt_port_incoming_port_mmap(task, process, msg->fd); 326 327fail_close: 328 329 close(msg->fd); 330} 331 332 333void 334nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, 335 nxt_fd_t fd) 336{ 337 nxt_buf_t *b; 338 nxt_port_t *port; 339 nxt_process_t *process; 340 341 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); 342 343 nxt_runtime_process_each(rt, process) { 344 345 if (nxt_pid == process->pid) { 346 continue; 347 } 348 349 port = nxt_process_port_first(process); 350 351 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 352 sizeof(nxt_port_data_t)); 353 if (nxt_slow_path(b == NULL)) { 354 continue; 355 } 356 357 *(nxt_uint_t *) b->mem.pos = slot; 358 b->mem.free += sizeof(nxt_uint_t); 359 360 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 361 fd, 0, 0, b); 362 363 } nxt_runtime_process_loop; 364} 365 366 367void 368nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 369{ 370 nxt_buf_t *b; 371 nxt_uint_t slot; 372 nxt_file_t *log_file; 373 nxt_runtime_t *rt; 374 375 rt = task->thread->runtime; 376 377 b = msg->buf; 378 slot = *(nxt_uint_t *) b->mem.pos; 379 380 log_file = nxt_list_elt(rt->log_files, slot); 381 382 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); 383 384 /* 385 * The old log file descriptor must be closed at the moment when no 386 * other threads use it. dup2() allows to use the old file descriptor 387 * for new log file. This change is performed atomically in the kernel. 388 */ 389 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { 390 if (slot == 0) { 391 (void) nxt_file_stderr(log_file); 392 } 393 } 394} 395 396 397void 398nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 399{ 400 size_t dump_size; 401 nxt_buf_t *b; 402 403 b = msg->buf; 404 dump_size = b->mem.free - b->mem.pos; 405 406 if (dump_size > 300) { 407 dump_size = 300; 408 } 409 410 nxt_debug(task, "data: %*s", dump_size, b->mem.pos); 411} 412 413 414void 415nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 416{ 417 nxt_buf_t *buf; 418 nxt_pid_t pid; 419 nxt_runtime_t *rt; 420 nxt_process_t *process; 421 422 buf = msg->buf; 423 424 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); 425 426 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); 427 428 nxt_debug(task, "port remove pid %PI handler", pid); 429 430 rt = task->thread->runtime; 431 432 nxt_port_rpc_remove_peer(task, msg->port, pid); 433 434 process = nxt_runtime_process_find(rt, pid); 435 436 if (process) {
| 284 nxt_port_write_enable(task, port); 285 286 msg->new_port = port; 287} 288 289 290void 291nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 292{ 293 nxt_port_t *port; 294 nxt_process_t *process; 295 nxt_runtime_t *rt; 296 297 rt = task->thread->runtime; 298 299 nxt_assert(nxt_runtime_is_main(rt)); 300 301 process = nxt_runtime_process_get(rt, msg->port_msg.pid); 302 if (nxt_slow_path(process == NULL)) { 303 return; 304 } 305 306 process->ready = 1; 307 308 port = nxt_process_port_first(process); 309 if (nxt_slow_path(port == NULL)) { 310 return; 311 } 312 313 nxt_debug(task, "process %PI ready", msg->port_msg.pid); 314 315 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); 316} 317 318 319void 320nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 321{ 322 nxt_runtime_t *rt; 323 nxt_process_t *process; 324 325 rt = task->thread->runtime; 326 327 if (nxt_slow_path(msg->fd == -1)) { 328 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); 329 330 return; 331 } 332 333 process = nxt_runtime_process_find(rt, msg->port_msg.pid); 334 if (nxt_slow_path(process == NULL)) { 335 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", 336 msg->port_msg.pid); 337 338 goto fail_close; 339 } 340 341 nxt_port_incoming_port_mmap(task, process, msg->fd); 342 343fail_close: 344 345 close(msg->fd); 346} 347 348 349void 350nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, 351 nxt_fd_t fd) 352{ 353 nxt_buf_t *b; 354 nxt_port_t *port; 355 nxt_process_t *process; 356 357 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); 358 359 nxt_runtime_process_each(rt, process) { 360 361 if (nxt_pid == process->pid) { 362 continue; 363 } 364 365 port = nxt_process_port_first(process); 366 367 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 368 sizeof(nxt_port_data_t)); 369 if (nxt_slow_path(b == NULL)) { 370 continue; 371 } 372 373 *(nxt_uint_t *) b->mem.pos = slot; 374 b->mem.free += sizeof(nxt_uint_t); 375 376 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 377 fd, 0, 0, b); 378 379 } nxt_runtime_process_loop; 380} 381 382 383void 384nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 385{ 386 nxt_buf_t *b; 387 nxt_uint_t slot; 388 nxt_file_t *log_file; 389 nxt_runtime_t *rt; 390 391 rt = task->thread->runtime; 392 393 b = msg->buf; 394 slot = *(nxt_uint_t *) b->mem.pos; 395 396 log_file = nxt_list_elt(rt->log_files, slot); 397 398 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); 399 400 /* 401 * The old log file descriptor must be closed at the moment when no 402 * other threads use it. dup2() allows to use the old file descriptor 403 * for new log file. This change is performed atomically in the kernel. 404 */ 405 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { 406 if (slot == 0) { 407 (void) nxt_file_stderr(log_file); 408 } 409 } 410} 411 412 413void 414nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 415{ 416 size_t dump_size; 417 nxt_buf_t *b; 418 419 b = msg->buf; 420 dump_size = b->mem.free - b->mem.pos; 421 422 if (dump_size > 300) { 423 dump_size = 300; 424 } 425 426 nxt_debug(task, "data: %*s", dump_size, b->mem.pos); 427} 428 429 430void 431nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 432{ 433 nxt_buf_t *buf; 434 nxt_pid_t pid; 435 nxt_runtime_t *rt; 436 nxt_process_t *process; 437 438 buf = msg->buf; 439 440 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); 441 442 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); 443 444 nxt_debug(task, "port remove pid %PI handler", pid); 445 446 rt = task->thread->runtime; 447 448 nxt_port_rpc_remove_peer(task, msg->port, pid); 449 450 process = nxt_runtime_process_find(rt, pid); 451 452 if (process) {
|
437 nxt_runtime_process_remove(rt, process);
| 453 nxt_runtime_process_remove(task, process);
|
438 } 439} 440 441 442void 443nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 444{ 445 nxt_debug(task, "port empty handler"); 446}
| 454 } 455} 456 457 458void 459nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 460{ 461 nxt_debug(task, "port empty handler"); 462}
|
| 463 464 465typedef struct { 466 nxt_work_t work; 467 nxt_port_t *port; 468 nxt_port_post_handler_t handler; 469} nxt_port_work_t; 470 471 472static void 473nxt_port_post_handler(nxt_task_t *task, void *obj, void *data) 474{ 475 nxt_port_t *port; 476 nxt_port_work_t *pw; 477 nxt_port_post_handler_t handler; 478 479 pw = obj; 480 port = pw->port; 481 handler = pw->handler; 482 483 nxt_free(pw); 484 485 handler(task, port, data); 486 487 nxt_port_use(task, port, -1); 488} 489 490 491nxt_int_t 492nxt_port_post(nxt_task_t *task, nxt_port_t *port, 493 nxt_port_post_handler_t handler, void *data) 494{ 495 nxt_port_work_t *pw; 496 497 if (task->thread->engine == port->engine) { 498 handler(task, port, data); 499 500 return NXT_OK; 501 } 502 503 pw = nxt_zalloc(sizeof(nxt_port_work_t)); 504 505 if (nxt_slow_path(pw == NULL)) { 506 return NXT_ERROR; 507 } 508 509 nxt_atomic_fetch_add(&port->use_count, 1); 510 511 pw->work.handler = nxt_port_post_handler; 512 pw->work.task = &port->engine->task; 513 pw->work.obj = pw; 514 pw->work.data = data; 515 516 pw->port = port; 517 pw->handler = handler; 518 519 nxt_event_engine_post(port->engine, &pw->work); 520 521 return NXT_OK; 522} 523 524 525static void 526nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) 527{ 528 /* no op */ 529} 530 531 532void 533nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) 534{ 535 int c; 536 537 c = nxt_atomic_fetch_add(&port->use_count, i); 538 539 if (i < 0 && c == -i) { 540 541 if (task->thread->engine == port->engine) { 542 nxt_port_release(task, port); 543 544 return; 545 } 546 547 nxt_port_post(task, port, nxt_port_release_handler, NULL); 548 } 549}
|
| |