nxt_port_socket.c (194:6281674ecf4f) nxt_port_socket.c (197:ae8f843e1fd4)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 100 unchanged lines hidden (view full) ---

109 port->socket.log = &nxt_main_log;
110 port->socket.write_ready = 1;
111
112 port->engine = task->thread->engine;
113
114 port->socket.write_work_queue = &port->engine->fast_work_queue;
115 port->socket.write_handler = nxt_port_write_handler;
116 port->socket.error_handler = nxt_port_error_handler;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 100 unchanged lines hidden (view full) ---

109 port->socket.log = &nxt_main_log;
110 port->socket.write_ready = 1;
111
112 port->engine = task->thread->engine;
113
114 port->socket.write_work_queue = &port->engine->fast_work_queue;
115 port->socket.write_handler = nxt_port_write_handler;
116 port->socket.error_handler = nxt_port_error_handler;
117
118 if (port->iov == NULL) {
119 port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) *
120 NXT_IOBUF_MAX * 10);
121 port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 *
122 NXT_IOBUF_MAX * 10);
123 }
117}
118
119
120void
121nxt_port_write_close(nxt_port_t *port)
122{
123 nxt_socket_close(port->socket.task, port->pair[1]);
124 port->pair[1] = -1;

--- 94 unchanged lines hidden (view full) ---

219
220 return NXT_OK;
221}
222
223
224static void
225nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
226{
124}
125
126
127void
128nxt_port_write_close(nxt_port_t *port)
129{
130 nxt_socket_close(port->socket.task, port->pair[1]);
131 port->pair[1] = -1;

--- 94 unchanged lines hidden (view full) ---

226
227 return NXT_OK;
228}
229
230
231static void
232nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
233{
234 size_t plain_size;
227 ssize_t n;
228 nxt_port_t *port;
235 ssize_t n;
236 nxt_port_t *port;
229 struct iovec iov[NXT_IOBUF_MAX * 10];
237 struct iovec *iov;
230 nxt_work_queue_t *wq;
231 nxt_queue_link_t *link;
232 nxt_port_method_t m;
233 nxt_port_send_msg_t *msg;
234 nxt_sendbuf_coalesce_t sb;
235
238 nxt_work_queue_t *wq;
239 nxt_queue_link_t *link;
240 nxt_port_method_t m;
241 nxt_port_send_msg_t *msg;
242 nxt_sendbuf_coalesce_t sb;
243
236 size_t plain_size;
237 nxt_buf_t *plain_buf;
238
239 port = nxt_container_of(obj, nxt_port_t, socket);
240
244 port = nxt_container_of(obj, nxt_port_t, socket);
245
246 iov = port->iov;
247
241 do {
242 link = nxt_queue_first(&port->messages);
243
244 if (link == nxt_queue_tail(&port->messages)) {
245 nxt_fd_event_block_write(task->thread->engine, &port->socket);
246 return;
247 }
248

--- 15 unchanged lines hidden (view full) ---

264 if (m == NXT_PORT_METHOD_MMAP) {
265 sb.limit = (1ULL << 31) - 1;
266 sb.nmax = NXT_IOBUF_MAX * 10 - 1;
267 }
268
269 nxt_sendbuf_mem_coalesce(task, &sb);
270
271 plain_size = sb.size;
248 do {
249 link = nxt_queue_first(&port->messages);
250
251 if (link == nxt_queue_tail(&port->messages)) {
252 nxt_fd_event_block_write(task->thread->engine, &port->socket);
253 return;
254 }
255

--- 15 unchanged lines hidden (view full) ---

271 if (m == NXT_PORT_METHOD_MMAP) {
272 sb.limit = (1ULL << 31) - 1;
273 sb.nmax = NXT_IOBUF_MAX * 10 - 1;
274 }
275
276 nxt_sendbuf_mem_coalesce(task, &sb);
277
278 plain_size = sb.size;
272 plain_buf = msg->buf;
273
274 /*
275 * Send through mmap enabled only when payload
276 * is bigger than PORT_MMAP_MIN_SIZE.
277 */
278 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
279 nxt_port_mmap_write(task, port, msg, &sb);
280

--- 16 unchanged lines hidden (view full) ---

297 if (msg->fd != -1 && msg->close_fd != 0) {
298 nxt_fd_close(msg->fd);
299
300 msg->fd = -1;
301 }
302
303 wq = &task->thread->engine->fast_work_queue;
304
279
280 /*
281 * Send through mmap enabled only when payload
282 * is bigger than PORT_MMAP_MIN_SIZE.
283 */
284 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
285 nxt_port_mmap_write(task, port, msg, &sb);
286

--- 16 unchanged lines hidden (view full) ---

303 if (msg->fd != -1 && msg->close_fd != 0) {
304 nxt_fd_close(msg->fd);
305
306 msg->fd = -1;
307 }
308
309 wq = &task->thread->engine->fast_work_queue;
310
305 if (msg->buf != plain_buf) {
306 /*
307 * Complete crafted mmap_msgs buf and restore msg->buf
308 * for regular completion call.
309 */
310 nxt_port_mmap_completion(task, wq, msg->buf);
311
312 msg->buf = plain_buf;
313 }
314
315 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
316
317 if (msg->buf != NULL) {
318 /*
319 * A file descriptor is sent only
320 * in the first message of a stream.
321 */
322 msg->fd = -1;
323 msg->share += n;
324
325 if (msg->share >= port->max_share) {
326 msg->share = 0;
327 nxt_queue_remove(link);
328 nxt_queue_insert_tail(&port->messages, link);
329 }
330
331 } else {
332 nxt_queue_remove(link);
311 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
312
313 if (msg->buf != NULL) {
314 /*
315 * A file descriptor is sent only
316 * in the first message of a stream.
317 */
318 msg->fd = -1;
319 msg->share += n;
320
321 if (msg->share >= port->max_share) {
322 msg->share = 0;
323 nxt_queue_remove(link);
324 nxt_queue_insert_tail(&port->messages, link);
325 }
326
327 } else {
328 nxt_queue_remove(link);
333 nxt_port_release_send_msg(task, msg, msg->engine);
329 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
330 msg->engine);
334 }
335
336 } else if (nxt_slow_path(n == NXT_ERROR)) {
337 goto fail;
338 }
339
340 /* n == NXT_AGAIN */
341

--- 189 unchanged lines hidden (view full) ---

531 b->next = port->free_bufs;
532 port->free_bufs = b;
533}
534
535
536static void
537nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
538{
331 }
332
333 } else if (nxt_slow_path(n == NXT_ERROR)) {
334 goto fail;
335 }
336
337 /* n == NXT_AGAIN */
338

--- 189 unchanged lines hidden (view full) ---

528 b->next = port->free_bufs;
529 port->free_bufs = b;
530}
531
532
533static void
534nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
535{
536 nxt_buf_t *b;
537 nxt_port_t *port;
538 nxt_work_queue_t *wq;
539 nxt_port_send_msg_t *msg;
540
539 nxt_debug(task, "port error handler %p", obj);
540 /* TODO */
541 nxt_debug(task, "port error handler %p", obj);
542 /* TODO */
543
544 port = nxt_container_of(obj, nxt_port_t, socket);
545
546 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
547
548 wq = &task->thread->engine->fast_work_queue;
549
550 for(b = msg->buf; b != NULL; b = b->next) {
551 if (nxt_buf_is_sync(b)) {
552 continue;
553 }
554
555 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
556 }
557
558 nxt_queue_remove(&msg->link);
559 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
560 msg->engine);
561
562 } nxt_queue_loop;
541}
563}