nxt_port_socket.c (342:82c2825a617a) nxt_port_socket.c (343:9fa845db60fb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 155 unchanged lines hidden (view full) ---

164
165
166nxt_int_t
167nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
168 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
169{
170 nxt_port_send_msg_t *msg;
171
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 155 unchanged lines hidden (view full) ---

164
165
166nxt_int_t
167nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
168 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
169{
170 nxt_port_send_msg_t *msg;
171
172 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
173
174 if ((type & NXT_PORT_MSG_SYNC) != 0) {
175 msg->opened = 0;
176 continue;
177 }
178
179 if (msg->port_msg.stream == stream
180 && msg->port_msg.reply_port == reply_port
181 && msg->port_msg.last == 0
182 && msg->opened) {
183
184 /*
185 * An fd is ignored since a file descriptor
186 * must be sent only in the first message of a stream.
187 */
188 nxt_buf_chain_add(&msg->buf, b);
189
190 msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
191
192 return NXT_OK;
193 }
194
195 } nxt_queue_loop;
196
197 msg = nxt_mp_retain(task->thread->engine->mem_pool,
198 sizeof(nxt_port_send_msg_t));
199 if (nxt_slow_path(msg == NULL)) {
200 return NXT_ERROR;
201 }
202
203 msg->link.next = NULL;
204 msg->link.prev = NULL;
205
206 msg->buf = b;
207 msg->fd = fd;
208 msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
209 msg->share = 0;
172 msg = nxt_mp_retain(task->thread->engine->mem_pool,
173 sizeof(nxt_port_send_msg_t));
174 if (nxt_slow_path(msg == NULL)) {
175 return NXT_ERROR;
176 }
177
178 msg->link.next = NULL;
179 msg->link.prev = NULL;
180
181 msg->buf = b;
182 msg->fd = fd;
183 msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
184 msg->share = 0;
210 msg->opened = 1;
211
212 msg->work.next = NULL;
213 msg->work.handler = nxt_port_release_send_msg;
214 msg->work.task = task;
215 msg->work.obj = msg;
216 msg->work.data = task->thread->engine;
217
218 msg->engine = task->thread->engine;
219 msg->mem_pool = msg->engine->mem_pool;
220
221 msg->port_msg.stream = stream;
222 msg->port_msg.pid = nxt_pid;
223 msg->port_msg.reply_port = reply_port;
224 msg->port_msg.type = type & NXT_PORT_MSG_MASK;
225 msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
226 msg->port_msg.mmap = 0;
227
185
186 msg->work.next = NULL;
187 msg->work.handler = nxt_port_release_send_msg;
188 msg->work.task = task;
189 msg->work.obj = msg;
190 msg->work.data = task->thread->engine;
191
192 msg->engine = task->thread->engine;
193 msg->mem_pool = msg->engine->mem_pool;
194
195 msg->port_msg.stream = stream;
196 msg->port_msg.pid = nxt_pid;
197 msg->port_msg.reply_port = reply_port;
198 msg->port_msg.type = type & NXT_PORT_MSG_MASK;
199 msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
200 msg->port_msg.mmap = 0;
201
202 nxt_thread_mutex_lock(&port->write_mutex);
203
228 nxt_queue_insert_tail(&port->messages, &msg->link);
229
204 nxt_queue_insert_tail(&port->messages, &msg->link);
205
206 nxt_thread_mutex_unlock(&port->write_mutex);
207
208 nxt_port_use(task, port, 1);
209
230 if (port->socket.write_ready) {
231 nxt_port_write_handler(task, &port->socket, NULL);
232 }
233
234 return NXT_OK;
235}
236
237
238static void
210 if (port->socket.write_ready) {
211 nxt_port_write_handler(task, &port->socket, NULL);
212 }
213
214 return NXT_OK;
215}
216
217
218static void
219nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
220{
221 nxt_fd_event_block_write(task->thread->engine, &port->socket);
222}
223
224
225static void
226nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
227{
228 nxt_fd_event_enable_write(task->thread->engine, &port->socket);
229}
230
231
232static void
239nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
240{
233nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
234{
235 int use_delta;
241 size_t plain_size;
242 ssize_t n;
236 size_t plain_size;
237 ssize_t n;
238 nxt_bool_t block_write, enable_write;
243 nxt_port_t *port;
244 struct iovec *iov;
245 nxt_work_queue_t *wq;
246 nxt_queue_link_t *link;
247 nxt_port_method_t m;
248 nxt_port_send_msg_t *msg;
249 nxt_sendbuf_coalesce_t sb;
250
251 port = nxt_container_of(obj, nxt_port_t, socket);
252
239 nxt_port_t *port;
240 struct iovec *iov;
241 nxt_work_queue_t *wq;
242 nxt_queue_link_t *link;
243 nxt_port_method_t m;
244 nxt_port_send_msg_t *msg;
245 nxt_sendbuf_coalesce_t sb;
246
247 port = nxt_container_of(obj, nxt_port_t, socket);
248
249 block_write = 0;
250 enable_write = 0;
251 use_delta = 0;
252
253 nxt_thread_mutex_lock(&port->write_mutex);
254
253 iov = port->iov;
254
255 do {
256 link = nxt_queue_first(&port->messages);
257
258 if (link == nxt_queue_tail(&port->messages)) {
255 iov = port->iov;
256
257 do {
258 link = nxt_queue_first(&port->messages);
259
260 if (link == nxt_queue_tail(&port->messages)) {
259 nxt_fd_event_block_write(task->thread->engine, &port->socket);
260 return;
261 block_write = 1;
262 goto unlock_mutex;
261 }
262
263 msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
264
265 iov[0].iov_base = &msg->port_msg;
266 iov[0].iov_len = sizeof(nxt_port_msg_t);
267
268 sb.buf = msg->buf;

--- 60 unchanged lines hidden (view full) ---

329 if (msg->share >= port->max_share) {
330 msg->share = 0;
331 nxt_queue_remove(link);
332 nxt_queue_insert_tail(&port->messages, link);
333 }
334
335 } else {
336 nxt_queue_remove(link);
263 }
264
265 msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
266
267 iov[0].iov_base = &msg->port_msg;
268 iov[0].iov_len = sizeof(nxt_port_msg_t);
269
270 sb.buf = msg->buf;

--- 60 unchanged lines hidden (view full) ---

331 if (msg->share >= port->max_share) {
332 msg->share = 0;
333 nxt_queue_remove(link);
334 nxt_queue_insert_tail(&port->messages, link);
335 }
336
337 } else {
338 nxt_queue_remove(link);
339 use_delta--;
337 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
338 msg->engine);
339 }
340
341 } else if (nxt_slow_path(n == NXT_ERROR)) {
342 goto fail;
343 }
344
345 /* n == NXT_AGAIN */
346
347 } while (port->socket.write_ready);
348
349 if (nxt_fd_event_is_disabled(port->socket.write)) {
340 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
341 msg->engine);
342 }
343
344 } else if (nxt_slow_path(n == NXT_ERROR)) {
345 goto fail;
346 }
347
348 /* n == NXT_AGAIN */
349
350 } while (port->socket.write_ready);
351
352 if (nxt_fd_event_is_disabled(port->socket.write)) {
350 /* TODO task->thread->engine or port->engine ? */
351 nxt_fd_event_enable_write(task->thread->engine, &port->socket);
353 enable_write = 1;
352 }
353
354 }
355
354 return;
356 goto unlock_mutex;
355
356fail:
357
357
358fail:
359
360 use_delta++;
361
358 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
362 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
359 nxt_port_error_handler, task, &port->socket, NULL);
363 nxt_port_error_handler, task, &port->socket,
364 &port->socket);
365
366unlock_mutex:
367 nxt_thread_mutex_unlock(&port->write_mutex);
368
369 if (block_write && nxt_fd_event_is_active(port->socket.write)) {
370 nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
371 }
372
373 if (enable_write) {
374 nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
375 }
376
377 if (use_delta != 0) {
378 nxt_port_use(task, port, use_delta);
379 }
360}
361
362
363void
364nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
365{
366 port->socket.fd = port->pair[0];
367 port->socket.log = &nxt_main_log;

--- 168 unchanged lines hidden (view full) ---

536 b->next = port->free_bufs;
537 port->free_bufs = b;
538}
539
540
541static void
542nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
543{
380}
381
382
383void
384nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
385{
386 port->socket.fd = port->pair[0];
387 port->socket.log = &nxt_main_log;

--- 168 unchanged lines hidden (view full) ---

556 b->next = port->free_bufs;
557 port->free_bufs = b;
558}
559
560
561static void
562nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
563{
564 int use_delta;
544 nxt_buf_t *b;
545 nxt_port_t *port;
546 nxt_work_queue_t *wq;
547 nxt_port_send_msg_t *msg;
548
549 nxt_debug(task, "port error handler %p", obj);
550 /* TODO */
551
552 port = nxt_container_of(obj, nxt_port_t, socket);
553
565 nxt_buf_t *b;
566 nxt_port_t *port;
567 nxt_work_queue_t *wq;
568 nxt_port_send_msg_t *msg;
569
570 nxt_debug(task, "port error handler %p", obj);
571 /* TODO */
572
573 port = nxt_container_of(obj, nxt_port_t, socket);
574
554 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
575 use_delta = 0;
555
576
556 wq = &task->thread->engine->fast_work_queue;
577 if (obj == data) {
578 use_delta--;
579 }
557
580
581 wq = &task->thread->engine->fast_work_queue;
582
583 nxt_thread_mutex_lock(&port->write_mutex);
584
585 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
586
558 for(b = msg->buf; b != NULL; b = b->next) {
559 if (nxt_buf_is_sync(b)) {
560 continue;
561 }
562
563 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
564 }
565
566 nxt_queue_remove(&msg->link);
587 for(b = msg->buf; b != NULL; b = b->next) {
588 if (nxt_buf_is_sync(b)) {
589 continue;
590 }
591
592 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
593 }
594
595 nxt_queue_remove(&msg->link);
596 use_delta--;
567 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
568 msg->engine);
569
570 } nxt_queue_loop;
597 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
598 msg->engine);
599
600 } nxt_queue_loop;
601
602 nxt_thread_mutex_unlock(&port->write_mutex);
603
604 if (use_delta != 0) {
605 nxt_port_use(task, port, use_delta);
606 }
571}
607}