nxt_port.c (342:82c2825a617a) nxt_port.c (343:9fa845db60fb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
11
12
13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
14
15static nxt_atomic_uint_t nxt_port_last_id = 1;
16
17
18static void
19nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
20{
21 nxt_mp_t *mp;
22 nxt_port_t *port;
23
24 port = obj;
25 mp = data;
26
27 nxt_assert(port->pair[0] == -1);
28 nxt_assert(port->pair[1] == -1);
29
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
11
12
13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
14
15static nxt_atomic_uint_t nxt_port_last_id = 1;
16
17
18static void
19nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
20{
21 nxt_mp_t *mp;
22 nxt_port_t *port;
23
24 port = obj;
25 mp = data;
26
27 nxt_assert(port->pair[0] == -1);
28 nxt_assert(port->pair[1] == -1);
29
30 nxt_assert(port->app_stream == 0);
30 nxt_assert(port->use_count == 0);
31 nxt_assert(port->app_link.next == NULL);
32
33 nxt_assert(nxt_queue_is_empty(&port->messages));
34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
36
31 nxt_assert(port->app_link.next == NULL);
32
33 nxt_assert(nxt_queue_is_empty(&port->messages));
34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
36
37 nxt_thread_mutex_destroy(&port->write_mutex);
38
37 nxt_mp_free(mp, port);
38}
39
40
41nxt_port_t *
42nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
43 nxt_process_type_t type)
44{
45 nxt_mp_t *mp;
46 nxt_port_t *port;
47
48 mp = nxt_mp_create(1024, 128, 256, 32);
49
50 if (nxt_slow_path(mp == NULL)) {
51 return NULL;
52 }
53
54 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
55
56 if (nxt_fast_path(port != NULL)) {
57 port->id = id;
58 port->pid = pid;
59 port->type = type;
60 port->mem_pool = mp;
39 nxt_mp_free(mp, port);
40}
41
42
43nxt_port_t *
44nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
45 nxt_process_type_t type)
46{
47 nxt_mp_t *mp;
48 nxt_port_t *port;
49
50 mp = nxt_mp_create(1024, 128, 256, 32);
51
52 if (nxt_slow_path(mp == NULL)) {
53 return NULL;
54 }
55
56 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
57
58 if (nxt_fast_path(port != NULL)) {
59 port->id = id;
60 port->pid = pid;
61 port->type = type;
62 port->mem_pool = mp;
63 port->use_count = 1;
61
62 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
63
64 nxt_queue_init(&port->messages);
64
65 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
66
67 nxt_queue_init(&port->messages);
68 nxt_thread_mutex_create(&port->write_mutex);
65
66 } else {
67 nxt_mp_destroy(mp);
68 }
69
70 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
71
72 return port;
73}
74
75
69
70 } else {
71 nxt_mp_destroy(mp);
72 }
73
74 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
75
76 return port;
77}
78
79
76nxt_bool_t
77nxt_port_release(nxt_port_t *port)
80void
81nxt_port_close(nxt_task_t *task, nxt_port_t *port)
78{
82{
79 nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
80 port->id, port->type);
83 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
84 port->id, port->type);
81
82 if (port->pair[0] != -1) {
83 nxt_fd_close(port->pair[0]);
84 port->pair[0] = -1;
85 }
86
87 if (port->pair[1] != -1) {
88 nxt_fd_close(port->pair[1]);
89 port->pair[1] = -1;
85
86 if (port->pair[0] != -1) {
87 nxt_fd_close(port->pair[0]);
88 port->pair[0] = -1;
89 }
90
91 if (port->pair[1] != -1) {
92 nxt_fd_close(port->pair[1]);
93 port->pair[1] = -1;
90 }
91
94
92 if (port->type == NXT_PROCESS_WORKER) {
93 if (nxt_router_app_remove_port(port) == 0) {
94 return 0;
95 if (port->app != NULL) {
96 nxt_router_app_port_close(task, port);
95 }
96 }
97 }
98 }
99}
97
100
101
102static void
103nxt_port_release(nxt_task_t *task, nxt_port_t *port)
104{
105 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
106 port->id, port->type);
107
108 if (port->app != NULL) {
109 nxt_router_app_use(task, port->app, -1);
110
111 port->app = NULL;
112 }
113
98 if (port->link.next != NULL) {
99 nxt_process_port_remove(port);
100 }
101
102 nxt_mp_release(port->mem_pool, NULL);
114 if (port->link.next != NULL) {
115 nxt_process_port_remove(port);
116 }
117
118 nxt_mp_release(port->mem_pool, NULL);
103
104 return 1;
105}
106
107
108nxt_port_id_t
109nxt_port_get_next_id()
110{
111 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
112}
113
114
115void
116nxt_port_reset_next_id()
117{
118 nxt_port_last_id = 1;
119}
120
121
122void
123nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
124 nxt_port_handlers_t *handlers)
125{
126 port->pid = nxt_pid;
127 port->handler = nxt_port_handler;
128 port->data = (nxt_port_handler_t *) (handlers);
129
130 nxt_port_read_enable(task, port);
131}
132
133
134static void
135nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
136{
137 nxt_port_handler_t *handlers;
138
139 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
140
141 nxt_debug(task, "port %d: message type:%uD",
142 msg->port->socket.fd, msg->port_msg.type);
143
144 handlers = msg->port->data;
145 handlers[msg->port_msg.type](task, msg);
146
147 return;
148 }
149
150 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
151 msg->port->socket.fd, msg->port_msg.type);
152}
153
154
155void
156nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
157{
158 nxt_runtime_quit(task);
159}
160
161
162void
163nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
164 nxt_port_t *new_port, uint32_t stream)
165{
166 nxt_port_t *port;
167 nxt_process_t *process;
168
169 nxt_debug(task, "new port %d for process %PI",
170 new_port->pair[1], new_port->pid);
171
172 nxt_runtime_process_each(rt, process) {
173
174 if (process->pid == new_port->pid || process->pid == nxt_pid) {
175 continue;
176 }
177
178 port = nxt_process_port_first(process);
179
180 if (port->type == NXT_PROCESS_MAIN
181 || port->type == NXT_PROCESS_CONTROLLER
182 || port->type == NXT_PROCESS_ROUTER)
183 {
184 (void) nxt_port_send_port(task, port, new_port, stream);
185 }
186
187 } nxt_runtime_process_loop;
188}
189
190
191nxt_int_t
192nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
193 uint32_t stream)
194{
195 nxt_buf_t *b;
196 nxt_port_msg_new_port_t *msg;
197
198 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
199 sizeof(nxt_port_data_t));
200 if (nxt_slow_path(b == NULL)) {
201 return NXT_ERROR;
202 }
203
204 nxt_debug(task, "send port %FD to process %PI",
205 new_port->pair[1], port->pid);
206
207 b->mem.free += sizeof(nxt_port_msg_new_port_t);
208 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
209
210 msg->id = new_port->id;
211 msg->pid = new_port->pid;
212 msg->max_size = port->max_size;
213 msg->max_share = port->max_share;
214 msg->type = new_port->type;
215
216 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
217 new_port->pair[1], stream, 0, b);
218}
219
220
221void
222nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
223{
224 nxt_port_t *port;
225 nxt_process_t *process;
226 nxt_runtime_t *rt;
227 nxt_port_msg_new_port_t *new_port_msg;
228
229 rt = task->thread->runtime;
230
231 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
232
233 nxt_debug(task, "new port %d received for process %PI:%d",
234 msg->fd, new_port_msg->pid, new_port_msg->id);
235
236 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
237 if (port != NULL) {
238 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
239 new_port_msg->id);
240
241 nxt_fd_close(msg->fd);
242 msg->fd = -1;
243 return;
244 }
245
246 process = nxt_runtime_process_get(rt, new_port_msg->pid);
247 if (nxt_slow_path(process == NULL)) {
248 return;
249 }
250
251 port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
252 new_port_msg->type);
253 if (nxt_slow_path(port == NULL)) {
254 return;
255 }
256
257 nxt_process_port_add(task, process, port);
258
259 port->pair[0] = -1;
260 port->pair[1] = msg->fd;
261 port->max_size = new_port_msg->max_size;
262 port->max_share = new_port_msg->max_share;
263
264 port->socket.task = task;
265
119}
120
121
122nxt_port_id_t
123nxt_port_get_next_id()
124{
125 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
126}
127
128
129void
130nxt_port_reset_next_id()
131{
132 nxt_port_last_id = 1;
133}
134
135
136void
137nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
138 nxt_port_handlers_t *handlers)
139{
140 port->pid = nxt_pid;
141 port->handler = nxt_port_handler;
142 port->data = (nxt_port_handler_t *) (handlers);
143
144 nxt_port_read_enable(task, port);
145}
146
147
148static void
149nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
150{
151 nxt_port_handler_t *handlers;
152
153 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
154
155 nxt_debug(task, "port %d: message type:%uD",
156 msg->port->socket.fd, msg->port_msg.type);
157
158 handlers = msg->port->data;
159 handlers[msg->port_msg.type](task, msg);
160
161 return;
162 }
163
164 nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
165 msg->port->socket.fd, msg->port_msg.type);
166}
167
168
169void
170nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
171{
172 nxt_runtime_quit(task);
173}
174
175
176void
177nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
178 nxt_port_t *new_port, uint32_t stream)
179{
180 nxt_port_t *port;
181 nxt_process_t *process;
182
183 nxt_debug(task, "new port %d for process %PI",
184 new_port->pair[1], new_port->pid);
185
186 nxt_runtime_process_each(rt, process) {
187
188 if (process->pid == new_port->pid || process->pid == nxt_pid) {
189 continue;
190 }
191
192 port = nxt_process_port_first(process);
193
194 if (port->type == NXT_PROCESS_MAIN
195 || port->type == NXT_PROCESS_CONTROLLER
196 || port->type == NXT_PROCESS_ROUTER)
197 {
198 (void) nxt_port_send_port(task, port, new_port, stream);
199 }
200
201 } nxt_runtime_process_loop;
202}
203
204
205nxt_int_t
206nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
207 uint32_t stream)
208{
209 nxt_buf_t *b;
210 nxt_port_msg_new_port_t *msg;
211
212 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
213 sizeof(nxt_port_data_t));
214 if (nxt_slow_path(b == NULL)) {
215 return NXT_ERROR;
216 }
217
218 nxt_debug(task, "send port %FD to process %PI",
219 new_port->pair[1], port->pid);
220
221 b->mem.free += sizeof(nxt_port_msg_new_port_t);
222 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
223
224 msg->id = new_port->id;
225 msg->pid = new_port->pid;
226 msg->max_size = port->max_size;
227 msg->max_share = port->max_share;
228 msg->type = new_port->type;
229
230 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
231 new_port->pair[1], stream, 0, b);
232}
233
234
235void
236nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
237{
238 nxt_port_t *port;
239 nxt_process_t *process;
240 nxt_runtime_t *rt;
241 nxt_port_msg_new_port_t *new_port_msg;
242
243 rt = task->thread->runtime;
244
245 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
246
247 nxt_debug(task, "new port %d received for process %PI:%d",
248 msg->fd, new_port_msg->pid, new_port_msg->id);
249
250 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
251 if (port != NULL) {
252 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
253 new_port_msg->id);
254
255 nxt_fd_close(msg->fd);
256 msg->fd = -1;
257 return;
258 }
259
260 process = nxt_runtime_process_get(rt, new_port_msg->pid);
261 if (nxt_slow_path(process == NULL)) {
262 return;
263 }
264
265 port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
266 new_port_msg->type);
267 if (nxt_slow_path(port == NULL)) {
268 return;
269 }
270
271 nxt_process_port_add(task, process, port);
272
273 port->pair[0] = -1;
274 port->pair[1] = msg->fd;
275 port->max_size = new_port_msg->max_size;
276 port->max_share = new_port_msg->max_share;
277
278 port->socket.task = task;
279
266 nxt_runtime_port_add(rt, port);
280 nxt_runtime_port_add(task, port);
267
281
282 nxt_port_use(task, port, -1);
283
268 nxt_port_write_enable(task, port);
269
270 msg->new_port = port;
271}
272
273
274void
275nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
276{
277 nxt_port_t *port;
278 nxt_process_t *process;
279 nxt_runtime_t *rt;
280
281 rt = task->thread->runtime;
282
283 nxt_assert(nxt_runtime_is_main(rt));
284
285 process = nxt_runtime_process_get(rt, msg->port_msg.pid);
286 if (nxt_slow_path(process == NULL)) {
287 return;
288 }
289
290 process->ready = 1;
291
292 port = nxt_process_port_first(process);
293 if (nxt_slow_path(port == NULL)) {
294 return;
295 }
296
297 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
298
299 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
300}
301
302
303void
304nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
305{
306 nxt_runtime_t *rt;
307 nxt_process_t *process;
308
309 rt = task->thread->runtime;
310
311 if (nxt_slow_path(msg->fd == -1)) {
312 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
313
314 return;
315 }
316
317 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
318 if (nxt_slow_path(process == NULL)) {
319 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
320 msg->port_msg.pid);
321
322 goto fail_close;
323 }
324
325 nxt_port_incoming_port_mmap(task, process, msg->fd);
326
327fail_close:
328
329 close(msg->fd);
330}
331
332
333void
334nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
335 nxt_fd_t fd)
336{
337 nxt_buf_t *b;
338 nxt_port_t *port;
339 nxt_process_t *process;
340
341 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
342
343 nxt_runtime_process_each(rt, process) {
344
345 if (nxt_pid == process->pid) {
346 continue;
347 }
348
349 port = nxt_process_port_first(process);
350
351 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
352 sizeof(nxt_port_data_t));
353 if (nxt_slow_path(b == NULL)) {
354 continue;
355 }
356
357 *(nxt_uint_t *) b->mem.pos = slot;
358 b->mem.free += sizeof(nxt_uint_t);
359
360 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
361 fd, 0, 0, b);
362
363 } nxt_runtime_process_loop;
364}
365
366
367void
368nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
369{
370 nxt_buf_t *b;
371 nxt_uint_t slot;
372 nxt_file_t *log_file;
373 nxt_runtime_t *rt;
374
375 rt = task->thread->runtime;
376
377 b = msg->buf;
378 slot = *(nxt_uint_t *) b->mem.pos;
379
380 log_file = nxt_list_elt(rt->log_files, slot);
381
382 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
383
384 /*
385 * The old log file descriptor must be closed at the moment when no
386 * other threads use it. dup2() allows to use the old file descriptor
387 * for new log file. This change is performed atomically in the kernel.
388 */
389 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
390 if (slot == 0) {
391 (void) nxt_file_stderr(log_file);
392 }
393 }
394}
395
396
397void
398nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
399{
400 size_t dump_size;
401 nxt_buf_t *b;
402
403 b = msg->buf;
404 dump_size = b->mem.free - b->mem.pos;
405
406 if (dump_size > 300) {
407 dump_size = 300;
408 }
409
410 nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
411}
412
413
414void
415nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
416{
417 nxt_buf_t *buf;
418 nxt_pid_t pid;
419 nxt_runtime_t *rt;
420 nxt_process_t *process;
421
422 buf = msg->buf;
423
424 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
425
426 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
427
428 nxt_debug(task, "port remove pid %PI handler", pid);
429
430 rt = task->thread->runtime;
431
432 nxt_port_rpc_remove_peer(task, msg->port, pid);
433
434 process = nxt_runtime_process_find(rt, pid);
435
436 if (process) {
284 nxt_port_write_enable(task, port);
285
286 msg->new_port = port;
287}
288
289
290void
291nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
292{
293 nxt_port_t *port;
294 nxt_process_t *process;
295 nxt_runtime_t *rt;
296
297 rt = task->thread->runtime;
298
299 nxt_assert(nxt_runtime_is_main(rt));
300
301 process = nxt_runtime_process_get(rt, msg->port_msg.pid);
302 if (nxt_slow_path(process == NULL)) {
303 return;
304 }
305
306 process->ready = 1;
307
308 port = nxt_process_port_first(process);
309 if (nxt_slow_path(port == NULL)) {
310 return;
311 }
312
313 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
314
315 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
316}
317
318
319void
320nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
321{
322 nxt_runtime_t *rt;
323 nxt_process_t *process;
324
325 rt = task->thread->runtime;
326
327 if (nxt_slow_path(msg->fd == -1)) {
328 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
329
330 return;
331 }
332
333 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
334 if (nxt_slow_path(process == NULL)) {
335 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
336 msg->port_msg.pid);
337
338 goto fail_close;
339 }
340
341 nxt_port_incoming_port_mmap(task, process, msg->fd);
342
343fail_close:
344
345 close(msg->fd);
346}
347
348
349void
350nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
351 nxt_fd_t fd)
352{
353 nxt_buf_t *b;
354 nxt_port_t *port;
355 nxt_process_t *process;
356
357 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
358
359 nxt_runtime_process_each(rt, process) {
360
361 if (nxt_pid == process->pid) {
362 continue;
363 }
364
365 port = nxt_process_port_first(process);
366
367 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
368 sizeof(nxt_port_data_t));
369 if (nxt_slow_path(b == NULL)) {
370 continue;
371 }
372
373 *(nxt_uint_t *) b->mem.pos = slot;
374 b->mem.free += sizeof(nxt_uint_t);
375
376 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
377 fd, 0, 0, b);
378
379 } nxt_runtime_process_loop;
380}
381
382
383void
384nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
385{
386 nxt_buf_t *b;
387 nxt_uint_t slot;
388 nxt_file_t *log_file;
389 nxt_runtime_t *rt;
390
391 rt = task->thread->runtime;
392
393 b = msg->buf;
394 slot = *(nxt_uint_t *) b->mem.pos;
395
396 log_file = nxt_list_elt(rt->log_files, slot);
397
398 nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
399
400 /*
401 * The old log file descriptor must be closed at the moment when no
402 * other threads use it. dup2() allows to use the old file descriptor
403 * for new log file. This change is performed atomically in the kernel.
404 */
405 if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
406 if (slot == 0) {
407 (void) nxt_file_stderr(log_file);
408 }
409 }
410}
411
412
413void
414nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
415{
416 size_t dump_size;
417 nxt_buf_t *b;
418
419 b = msg->buf;
420 dump_size = b->mem.free - b->mem.pos;
421
422 if (dump_size > 300) {
423 dump_size = 300;
424 }
425
426 nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
427}
428
429
430void
431nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
432{
433 nxt_buf_t *buf;
434 nxt_pid_t pid;
435 nxt_runtime_t *rt;
436 nxt_process_t *process;
437
438 buf = msg->buf;
439
440 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
441
442 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
443
444 nxt_debug(task, "port remove pid %PI handler", pid);
445
446 rt = task->thread->runtime;
447
448 nxt_port_rpc_remove_peer(task, msg->port, pid);
449
450 process = nxt_runtime_process_find(rt, pid);
451
452 if (process) {
437 nxt_runtime_process_remove(rt, process);
453 nxt_runtime_process_remove(task, process);
438 }
439}
440
441
442void
443nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
444{
445 nxt_debug(task, "port empty handler");
446}
454 }
455}
456
457
458void
459nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
460{
461 nxt_debug(task, "port empty handler");
462}
463
464
465typedef struct {
466 nxt_work_t work;
467 nxt_port_t *port;
468 nxt_port_post_handler_t handler;
469} nxt_port_work_t;
470
471
472static void
473nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
474{
475 nxt_port_t *port;
476 nxt_port_work_t *pw;
477 nxt_port_post_handler_t handler;
478
479 pw = obj;
480 port = pw->port;
481 handler = pw->handler;
482
483 nxt_free(pw);
484
485 handler(task, port, data);
486
487 nxt_port_use(task, port, -1);
488}
489
490
491nxt_int_t
492nxt_port_post(nxt_task_t *task, nxt_port_t *port,
493 nxt_port_post_handler_t handler, void *data)
494{
495 nxt_port_work_t *pw;
496
497 if (task->thread->engine == port->engine) {
498 handler(task, port, data);
499
500 return NXT_OK;
501 }
502
503 pw = nxt_zalloc(sizeof(nxt_port_work_t));
504
505 if (nxt_slow_path(pw == NULL)) {
506 return NXT_ERROR;
507 }
508
509 nxt_atomic_fetch_add(&port->use_count, 1);
510
511 pw->work.handler = nxt_port_post_handler;
512 pw->work.task = &port->engine->task;
513 pw->work.obj = pw;
514 pw->work.data = data;
515
516 pw->port = port;
517 pw->handler = handler;
518
519 nxt_event_engine_post(port->engine, &pw->work);
520
521 return NXT_OK;
522}
523
524
525static void
526nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
527{
528 /* no op */
529}
530
531
532void
533nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
534{
535 int c;
536
537 c = nxt_atomic_fetch_add(&port->use_count, i);
538
539 if (i < 0 && c == -i) {
540
541 if (task->thread->engine == port->engine) {
542 nxt_port_release(task, port);
543
544 return;
545 }
546
547 nxt_port_post(task, port, nxt_port_release_handler, NULL);
548 }
549}