nxt_port_socket.c (89:c3532440470d) nxt_port_socket.c (122:d18727e877c6)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 121 unchanged lines hidden (view full) ---

130void
131nxt_port_write_close(nxt_port_t *port)
132{
133 nxt_socket_close(port->socket.task, port->pair[1]);
134 port->pair[1] = -1;
135}
136
137
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 121 unchanged lines hidden (view full) ---

130void
131nxt_port_write_close(nxt_port_t *port)
132{
133 nxt_socket_close(port->socket.task, port->pair[1]);
134 port->pair[1] = -1;
135}
136
137
138static void
139nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
140{
141 nxt_event_engine_t *engine;
142 nxt_port_send_msg_t *msg;
143
144 msg = obj;
145 engine = data;
146
147#if (NXT_DEBUG)
148 if (nxt_slow_path(data != msg->engine)) {
149 nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
150 data, msg->engine);
151 nxt_abort();
152 }
153#endif
154
155 if (engine != task->thread->engine) {
156
157 nxt_debug(task, "current thread is %PT, expected %PT",
158 task->thread->tid, engine->task.thread->tid);
159
160 nxt_event_engine_post(engine, &msg->work);
161
162 return;
163 }
164
165 nxt_mp_release(msg->mem_pool, obj);
166}
167
168
138nxt_int_t
139nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
140 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
141{
142 nxt_queue_link_t *link;
143 nxt_port_send_msg_t *msg;
144
145 for (link = nxt_queue_first(&port->messages);

--- 9 unchanged lines hidden (view full) ---

155 * must be sent only in the first message of a stream.
156 */
157 nxt_buf_chain_add(&msg->buf, b);
158
159 return NXT_OK;
160 }
161 }
162
169nxt_int_t
170nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
171 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
172{
173 nxt_queue_link_t *link;
174 nxt_port_send_msg_t *msg;
175
176 for (link = nxt_queue_first(&port->messages);

--- 9 unchanged lines hidden (view full) ---

186 * must be sent only in the first message of a stream.
187 */
188 nxt_buf_chain_add(&msg->buf, b);
189
190 return NXT_OK;
191 }
192 }
193
163 msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t));
194 msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t));
164 if (nxt_slow_path(msg == NULL)) {
165 return NXT_ERROR;
166 }
167
195 if (nxt_slow_path(msg == NULL)) {
196 return NXT_ERROR;
197 }
198
199 msg->link.next = NULL;
200 msg->link.prev = NULL;
201
168 msg->buf = b;
169 msg->fd = fd;
170 msg->share = 0;
202 msg->buf = b;
203 msg->fd = fd;
204 msg->share = 0;
205
206 msg->work.next = NULL;
207 msg->work.handler = nxt_port_release_send_msg;
208 msg->work.task = task;
209 msg->work.obj = msg;
210 msg->work.data = task->thread->engine;
211
212 msg->engine = task->thread->engine;
171 msg->mem_pool = port->mem_pool;
172
173 msg->port_msg.stream = stream;
174 msg->port_msg.pid = nxt_pid;
175 msg->port_msg.reply_port = reply_port;
176 msg->port_msg.type = type;
177 msg->port_msg.last = 0;
178 msg->port_msg.mmap = 0;

--- 108 unchanged lines hidden (view full) ---

287 if (msg->share >= port->max_share) {
288 msg->share = 0;
289 nxt_queue_remove(link);
290 nxt_queue_insert_tail(&port->messages, link);
291 }
292
293 } else {
294 nxt_queue_remove(link);
213 msg->mem_pool = port->mem_pool;
214
215 msg->port_msg.stream = stream;
216 msg->port_msg.pid = nxt_pid;
217 msg->port_msg.reply_port = reply_port;
218 msg->port_msg.type = type;
219 msg->port_msg.last = 0;
220 msg->port_msg.mmap = 0;

--- 108 unchanged lines hidden (view full) ---

329 if (msg->share >= port->max_share) {
330 msg->share = 0;
331 nxt_queue_remove(link);
332 nxt_queue_insert_tail(&port->messages, link);
333 }
334
335 } else {
336 nxt_queue_remove(link);
295 nxt_mp_free(msg->mem_pool, msg);
337 nxt_port_release_send_msg(task, msg, msg->engine);
296 }
297
298 } else if (nxt_slow_path(n == NXT_ERROR)) {
299 goto fail;
300 }
301
302 /* n == NXT_AGAIN */
303

--- 181 unchanged lines hidden ---
338 }
339
340 } else if (nxt_slow_path(n == NXT_ERROR)) {
341 goto fail;
342 }
343
344 /* n == NXT_AGAIN */
345

--- 181 unchanged lines hidden ---