nxt_port.c (342:82c2825a617a) nxt_port.c (343:9fa845db60fb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>

--- 13 unchanged lines hidden (view full) ---

22 nxt_port_t *port;
23
24 port = obj;
25 mp = data;
26
27 nxt_assert(port->pair[0] == -1);
28 nxt_assert(port->pair[1] == -1);
29
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>

--- 13 unchanged lines hidden (view full) ---

22 nxt_port_t *port;
23
24 port = obj;
25 mp = data;
26
27 nxt_assert(port->pair[0] == -1);
28 nxt_assert(port->pair[1] == -1);
29
30 nxt_assert(port->app_stream == 0);
30 nxt_assert(port->use_count == 0);
31 nxt_assert(port->app_link.next == NULL);
32
33 nxt_assert(nxt_queue_is_empty(&port->messages));
34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
36
31 nxt_assert(port->app_link.next == NULL);
32
33 nxt_assert(nxt_queue_is_empty(&port->messages));
34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
36
37 nxt_thread_mutex_destroy(&port->write_mutex);
38
37 nxt_mp_free(mp, port);
38}
39
40
41nxt_port_t *
42nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
43 nxt_process_type_t type)
44{

--- 8 unchanged lines hidden (view full) ---

53
54 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
55
56 if (nxt_fast_path(port != NULL)) {
57 port->id = id;
58 port->pid = pid;
59 port->type = type;
60 port->mem_pool = mp;
39 nxt_mp_free(mp, port);
40}
41
42
43nxt_port_t *
44nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
45 nxt_process_type_t type)
46{

--- 8 unchanged lines hidden (view full) ---

55
56 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
57
58 if (nxt_fast_path(port != NULL)) {
59 port->id = id;
60 port->pid = pid;
61 port->type = type;
62 port->mem_pool = mp;
63 port->use_count = 1;
61
62 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
63
64 nxt_queue_init(&port->messages);
64
65 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
66
67 nxt_queue_init(&port->messages);
68 nxt_thread_mutex_create(&port->write_mutex);
65
66 } else {
67 nxt_mp_destroy(mp);
68 }
69
70 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
71
72 return port;
73}
74
75
69
70 } else {
71 nxt_mp_destroy(mp);
72 }
73
74 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
75
76 return port;
77}
78
79
76nxt_bool_t
77nxt_port_release(nxt_port_t *port)
80void
81nxt_port_close(nxt_task_t *task, nxt_port_t *port)
78{
82{
79 nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
80 port->id, port->type);
83 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
84 port->id, port->type);
81
82 if (port->pair[0] != -1) {
83 nxt_fd_close(port->pair[0]);
84 port->pair[0] = -1;
85 }
86
87 if (port->pair[1] != -1) {
88 nxt_fd_close(port->pair[1]);
89 port->pair[1] = -1;
85
86 if (port->pair[0] != -1) {
87 nxt_fd_close(port->pair[0]);
88 port->pair[0] = -1;
89 }
90
91 if (port->pair[1] != -1) {
92 nxt_fd_close(port->pair[1]);
93 port->pair[1] = -1;
90 }
91
94
92 if (port->type == NXT_PROCESS_WORKER) {
93 if (nxt_router_app_remove_port(port) == 0) {
94 return 0;
95 if (port->app != NULL) {
96 nxt_router_app_port_close(task, port);
95 }
96 }
97 }
98 }
99}
97
100
101
102static void
103nxt_port_release(nxt_task_t *task, nxt_port_t *port)
104{
105 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
106 port->id, port->type);
107
108 if (port->app != NULL) {
109 nxt_router_app_use(task, port->app, -1);
110
111 port->app = NULL;
112 }
113
98 if (port->link.next != NULL) {
99 nxt_process_port_remove(port);
100 }
101
102 nxt_mp_release(port->mem_pool, NULL);
114 if (port->link.next != NULL) {
115 nxt_process_port_remove(port);
116 }
117
118 nxt_mp_release(port->mem_pool, NULL);
103
104 return 1;
105}
106
107
108nxt_port_id_t
109nxt_port_get_next_id()
110{
111 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
112}

--- 145 unchanged lines hidden (view full) ---

258
259 port->pair[0] = -1;
260 port->pair[1] = msg->fd;
261 port->max_size = new_port_msg->max_size;
262 port->max_share = new_port_msg->max_share;
263
264 port->socket.task = task;
265
119}
120
121
122nxt_port_id_t
123nxt_port_get_next_id()
124{
125 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
126}

--- 145 unchanged lines hidden (view full) ---

272
273 port->pair[0] = -1;
274 port->pair[1] = msg->fd;
275 port->max_size = new_port_msg->max_size;
276 port->max_share = new_port_msg->max_share;
277
278 port->socket.task = task;
279
266 nxt_runtime_port_add(rt, port);
280 nxt_runtime_port_add(task, port);
267
281
282 nxt_port_use(task, port, -1);
283
268 nxt_port_write_enable(task, port);
269
270 msg->new_port = port;
271}
272
273
274void
275nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)

--- 153 unchanged lines hidden (view full) ---

429
430 rt = task->thread->runtime;
431
432 nxt_port_rpc_remove_peer(task, msg->port, pid);
433
434 process = nxt_runtime_process_find(rt, pid);
435
436 if (process) {
284 nxt_port_write_enable(task, port);
285
286 msg->new_port = port;
287}
288
289
290void
291nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)

--- 153 unchanged lines hidden (view full) ---

445
446 rt = task->thread->runtime;
447
448 nxt_port_rpc_remove_peer(task, msg->port, pid);
449
450 process = nxt_runtime_process_find(rt, pid);
451
452 if (process) {
437 nxt_runtime_process_remove(rt, process);
453 nxt_runtime_process_remove(task, process);
438 }
439}
440
441
442void
443nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
444{
445 nxt_debug(task, "port empty handler");
446}
454 }
455}
456
457
458void
459nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
460{
461 nxt_debug(task, "port empty handler");
462}
463
464
465typedef struct {
466 nxt_work_t work;
467 nxt_port_t *port;
468 nxt_port_post_handler_t handler;
469} nxt_port_work_t;
470
471
472static void
473nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
474{
475 nxt_port_t *port;
476 nxt_port_work_t *pw;
477 nxt_port_post_handler_t handler;
478
479 pw = obj;
480 port = pw->port;
481 handler = pw->handler;
482
483 nxt_free(pw);
484
485 handler(task, port, data);
486
487 nxt_port_use(task, port, -1);
488}
489
490
491nxt_int_t
492nxt_port_post(nxt_task_t *task, nxt_port_t *port,
493 nxt_port_post_handler_t handler, void *data)
494{
495 nxt_port_work_t *pw;
496
497 if (task->thread->engine == port->engine) {
498 handler(task, port, data);
499
500 return NXT_OK;
501 }
502
503 pw = nxt_zalloc(sizeof(nxt_port_work_t));
504
505 if (nxt_slow_path(pw == NULL)) {
506 return NXT_ERROR;
507 }
508
509 nxt_atomic_fetch_add(&port->use_count, 1);
510
511 pw->work.handler = nxt_port_post_handler;
512 pw->work.task = &port->engine->task;
513 pw->work.obj = pw;
514 pw->work.data = data;
515
516 pw->port = port;
517 pw->handler = handler;
518
519 nxt_event_engine_post(port->engine, &pw->work);
520
521 return NXT_OK;
522}
523
524
525static void
526nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
527{
528 /* no op */
529}
530
531
532void
533nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
534{
535 int c;
536
537 c = nxt_atomic_fetch_add(&port->use_count, i);
538
539 if (i < 0 && c == -i) {
540
541 if (task->thread->engine == port->engine) {
542 nxt_port_release(task, port);
543
544 return;
545 }
546
547 nxt_port_post(task, port, nxt_port_release_handler, NULL);
548 }
549}