30c30
< nxt_assert(port->app_stream == 0);
---
> nxt_assert(port->use_count == 0);
36a37,38
> nxt_thread_mutex_destroy(&port->write_mutex);
>
60a63
> port->use_count = 1;
64a68
> nxt_thread_mutex_create(&port->write_mutex);
76,77c80,81
< nxt_bool_t
< nxt_port_release(nxt_port_t *port)
---
> void
> nxt_port_close(nxt_task_t *task, nxt_port_t *port)
79,80c83,84
< nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
< port->id, port->type);
---
> nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
> port->id, port->type);
90d93
< }
92,94c95,96
< if (port->type == NXT_PROCESS_WORKER) {
< if (nxt_router_app_remove_port(port) == 0) {
< return 0;
---
> if (port->app != NULL) {
> nxt_router_app_port_close(task, port);
96a99
> }
97a101,113
>
> static void
> nxt_port_release(nxt_task_t *task, nxt_port_t *port)
> {
> nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
> port->id, port->type);
>
> if (port->app != NULL) {
> nxt_router_app_use(task, port->app, -1);
>
> port->app = NULL;
> }
>
103,104d118
<
< return 1;
266c280
< nxt_runtime_port_add(rt, port);
---
> nxt_runtime_port_add(task, port);
267a282,283
> nxt_port_use(task, port, -1);
>
437c453
< nxt_runtime_process_remove(rt, process);
---
> nxt_runtime_process_remove(task, process);
446a463,549
>
>
> typedef struct {
> nxt_work_t work;
> nxt_port_t *port;
> nxt_port_post_handler_t handler;
> } nxt_port_work_t;
>
>
> static void
> nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
> {
> nxt_port_t *port;
> nxt_port_work_t *pw;
> nxt_port_post_handler_t handler;
>
> pw = obj;
> port = pw->port;
> handler = pw->handler;
>
> nxt_free(pw);
>
> handler(task, port, data);
>
> nxt_port_use(task, port, -1);
> }
>
>
> nxt_int_t
> nxt_port_post(nxt_task_t *task, nxt_port_t *port,
> nxt_port_post_handler_t handler, void *data)
> {
> nxt_port_work_t *pw;
>
> if (task->thread->engine == port->engine) {
> handler(task, port, data);
>
> return NXT_OK;
> }
>
> pw = nxt_zalloc(sizeof(nxt_port_work_t));
>
> if (nxt_slow_path(pw == NULL)) {
> return NXT_ERROR;
> }
>
> nxt_atomic_fetch_add(&port->use_count, 1);
>
> pw->work.handler = nxt_port_post_handler;
> pw->work.task = &port->engine->task;
> pw->work.obj = pw;
> pw->work.data = data;
>
> pw->port = port;
> pw->handler = handler;
>
> nxt_event_engine_post(port->engine, &pw->work);
>
> return NXT_OK;
> }
>
>
> static void
> nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
> {
> /* no op */
> }
>
>
> void
> nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
> {
> int c;
>
> c = nxt_atomic_fetch_add(&port->use_count, i);
>
> if (i < 0 && c == -i) {
>
> if (task->thread->engine == port->engine) {
> nxt_port_release(task, port);
>
> return;
> }
>
> nxt_port_post(task, port, nxt_port_release_handler, NULL);
> }
> }