Lines Matching refs:port

26     nxt_port_t  *port;  in nxt_port_mp_cleanup()  local
28 port = obj; in nxt_port_mp_cleanup()
31 nxt_assert(port->pair[0] == -1); in nxt_port_mp_cleanup()
32 nxt_assert(port->pair[1] == -1); in nxt_port_mp_cleanup()
34 nxt_assert(port->use_count == 0); in nxt_port_mp_cleanup()
35 nxt_assert(port->app_link.next == NULL); in nxt_port_mp_cleanup()
36 nxt_assert(port->idle_link.next == NULL); in nxt_port_mp_cleanup()
38 nxt_assert(nxt_queue_is_empty(&port->messages)); in nxt_port_mp_cleanup()
39 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); in nxt_port_mp_cleanup()
40 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); in nxt_port_mp_cleanup()
42 nxt_thread_mutex_destroy(&port->write_mutex); in nxt_port_mp_cleanup()
44 nxt_mp_free(mp, port); in nxt_port_mp_cleanup()
53 nxt_port_t *port; in nxt_port_new() local
61 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); in nxt_port_new()
63 if (nxt_fast_path(port != NULL)) { in nxt_port_new()
64 port->id = id; in nxt_port_new()
65 port->pid = pid; in nxt_port_new()
66 port->type = type; in nxt_port_new()
67 port->mem_pool = mp; in nxt_port_new()
68 port->use_count = 1; in nxt_port_new()
70 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); in nxt_port_new()
72 nxt_queue_init(&port->messages); in nxt_port_new()
73 nxt_thread_mutex_create(&port->write_mutex); in nxt_port_new()
75 port->queue_fd = -1; in nxt_port_new()
81 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); in nxt_port_new()
83 return port; in nxt_port_new()
88 nxt_port_close(nxt_task_t *task, nxt_port_t *port) in nxt_port_close() argument
92 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, in nxt_port_close()
93 port->id, port->type); in nxt_port_close()
95 if (port->pair[0] != -1) { in nxt_port_close()
96 nxt_port_rpc_close(task, port); in nxt_port_close()
98 nxt_fd_close(port->pair[0]); in nxt_port_close()
99 port->pair[0] = -1; in nxt_port_close()
102 if (port->pair[1] != -1) { in nxt_port_close()
103 nxt_fd_close(port->pair[1]); in nxt_port_close()
104 port->pair[1] = -1; in nxt_port_close()
106 if (port->app != NULL) { in nxt_port_close()
107 nxt_router_app_port_close(task, port); in nxt_port_close()
111 if (port->queue_fd != -1) { in nxt_port_close()
112 nxt_fd_close(port->queue_fd); in nxt_port_close()
113 port->queue_fd = -1; in nxt_port_close()
116 if (port->queue != NULL) { in nxt_port_close()
117 size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t) in nxt_port_close()
119 nxt_mem_munmap(port->queue, size); in nxt_port_close()
121 port->queue = NULL; in nxt_port_close()
127 nxt_port_release(nxt_task_t *task, nxt_port_t *port) in nxt_port_release() argument
129 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, in nxt_port_release()
130 port->id, port->type); in nxt_port_release()
132 port->app = NULL; in nxt_port_release()
134 if (port->link.next != NULL) { in nxt_port_release()
135 nxt_assert(port->process != NULL); in nxt_port_release()
137 nxt_process_port_remove(port); in nxt_port_release()
139 nxt_process_use(task, port->process, -1); in nxt_port_release()
142 nxt_mp_release(port->mem_pool); in nxt_port_release()
161 nxt_port_enable(nxt_task_t *task, nxt_port_t *port, in nxt_port_enable() argument
164 port->pid = nxt_pid; in nxt_port_enable()
165 port->handler = nxt_port_handler; in nxt_port_enable()
166 port->data = (nxt_port_handler_t *) (handlers); in nxt_port_enable()
168 nxt_port_read_enable(task, port); in nxt_port_enable()
180 msg->port->socket.fd, msg->port_msg.type, in nxt_port_handler()
183 handlers = msg->port->data; in nxt_port_handler()
190 msg->port->socket.fd, msg->port_msg.type); in nxt_port_handler()
206 nxt_port_t *port; in nxt_port_send_new_port() local
218 port = nxt_process_port_first(process); in nxt_port_send_new_port()
220 if (nxt_proc_send_matrix[port->type][new_port->type]) { in nxt_port_send_new_port()
221 (void) nxt_port_send_port(task, port, new_port, stream); in nxt_port_send_new_port()
229 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, in nxt_port_send_port() argument
242 new_port->pair[1], port->pid); in nxt_port_send_port()
249 msg->max_size = port->max_size; in nxt_port_send_port()
250 msg->max_share = port->max_share; in nxt_port_send_port()
253 return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, in nxt_port_send_port()
262 nxt_port_t *port; in nxt_port_new_port_handler() local
275 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); in nxt_port_new_port_handler()
276 if (port != NULL) { in nxt_port_new_port_handler()
280 msg->u.new_port = port; in nxt_port_new_port_handler()
287 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, in nxt_port_new_port_handler()
290 if (nxt_slow_path(port == NULL)) { in nxt_port_new_port_handler()
296 port->pair[0] = -1; in nxt_port_new_port_handler()
297 port->pair[1] = msg->fd[0]; in nxt_port_new_port_handler()
298 port->max_size = new_port_msg->max_size; in nxt_port_new_port_handler()
299 port->max_share = new_port_msg->max_share; in nxt_port_new_port_handler()
301 port->socket.task = task; in nxt_port_new_port_handler()
303 nxt_port_write_enable(task, port); in nxt_port_new_port_handler()
305 msg->u.new_port = port; in nxt_port_new_port_handler()
312 nxt_port_t *port; in nxt_port_process_ready_handler() local
329 port = nxt_process_port_first(process); in nxt_port_process_ready_handler()
334 port->queue_fd = msg->fd[0]; in nxt_port_process_ready_handler()
335 port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), in nxt_port_process_ready_handler()
340 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); in nxt_port_process_ready_handler()
379 nxt_port_t *port; in nxt_port_change_log_file() local
390 port = nxt_process_port_first(process); in nxt_port_change_log_file()
400 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, in nxt_port_change_log_file()
459 nxt_port_t *port; in nxt_port_remove_notify_others() local
479 port = nxt_process_port_first(p); in nxt_port_remove_notify_others()
481 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { in nxt_port_remove_notify_others()
494 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, in nxt_port_remove_notify_others()
530 nxt_port_rpc_remove_peer(task, msg->port, pid); in nxt_port_remove_pid()
549 nxt_port_t *port; member
557 nxt_port_t *port; in nxt_port_post_handler() local
562 port = pw->port; in nxt_port_post_handler()
567 handler(task, port, data); in nxt_port_post_handler()
569 nxt_port_use(task, port, -1); in nxt_port_post_handler()
574 nxt_port_post(nxt_task_t *task, nxt_port_t *port, in nxt_port_post() argument
579 if (task->thread->engine == port->engine) { in nxt_port_post()
580 handler(task, port, data); in nxt_port_post()
591 nxt_atomic_fetch_add(&port->use_count, 1); in nxt_port_post()
594 pw->work.task = &port->engine->task; in nxt_port_post()
598 pw->port = port; in nxt_port_post()
601 nxt_event_engine_post(port->engine, &pw->work); in nxt_port_post()
608 nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) in nxt_port_release_handler() argument
615 nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) in nxt_port_use() argument
619 c = nxt_atomic_fetch_add(&port->use_count, i); in nxt_port_use()
623 if (port->engine == NULL || task->thread->engine == port->engine) { in nxt_port_use()
624 nxt_port_release(task, port); in nxt_port_use()
629 nxt_port_post(task, port, nxt_port_release_handler, NULL); in nxt_port_use()