nxt_port_socket.c (350:f50a43b89b5c) nxt_port_socket.c (352:47649fbbcb53)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 228 unchanged lines hidden (view full) ---

237 msg.share = 0;
238
239 msg.port_msg.stream = stream;
240 msg.port_msg.pid = nxt_pid;
241 msg.port_msg.reply_port = reply_port;
242 msg.port_msg.type = type & NXT_PORT_MSG_MASK;
243 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
244 msg.port_msg.mmap = 0;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 228 unchanged lines hidden (view full) ---

237 msg.share = 0;
238
239 msg.port_msg.stream = stream;
240 msg.port_msg.pid = nxt_pid;
241 msg.port_msg.reply_port = reply_port;
242 msg.port_msg.type = type & NXT_PORT_MSG_MASK;
243 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
244 msg.port_msg.mmap = 0;
245 msg.port_msg.nf = 0;
246 msg.port_msg.mf = 0;
245
246 msg.work.data = NULL;
247
248 if (port->socket.write_ready) {
249 nxt_port_write_handler(task, &port->socket, &msg);
250 } else {
251 nxt_thread_mutex_lock(&port->write_mutex);
252

--- 66 unchanged lines hidden (view full) ---

319 sb.buf = msg->buf;
320 sb.iobuf = &iov[1];
321 sb.nmax = NXT_IOBUF_MAX - 1;
322 sb.sync = 0;
323 sb.last = 0;
324 sb.size = 0;
325 sb.limit = port->max_size;
326
247
248 msg.work.data = NULL;
249
250 if (port->socket.write_ready) {
251 nxt_port_write_handler(task, &port->socket, &msg);
252 } else {
253 nxt_thread_mutex_lock(&port->write_mutex);
254

--- 66 unchanged lines hidden (view full) ---

321 sb.buf = msg->buf;
322 sb.iobuf = &iov[1];
323 sb.nmax = NXT_IOBUF_MAX - 1;
324 sb.sync = 0;
325 sb.last = 0;
326 sb.size = 0;
327 sb.limit = port->max_size;
328
329 sb.limit_reached = 0;
330 sb.nmax_reached = 0;
331
327 m = nxt_port_mmap_get_method(task, port, msg->buf);
328
329 if (m == NXT_PORT_METHOD_MMAP) {
330 sb.limit = (1ULL << 31) - 1;
332 m = nxt_port_mmap_get_method(task, port, msg->buf);
333
334 if (m == NXT_PORT_METHOD_MMAP) {
335 sb.limit = (1ULL << 31) - 1;
331 sb.nmax = NXT_IOBUF_MAX * 10 - 1;
336 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
337 port->max_size / PORT_MMAP_MIN_SIZE);
332 }
333
334 nxt_sendbuf_mem_coalesce(task, &sb);
335
336 plain_size = sb.size;
337
338 /*
339 * Send through mmap enabled only when payload
340 * is bigger than PORT_MMAP_MIN_SIZE.
341 */
342 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
343 nxt_port_mmap_write(task, port, msg, &sb);
344
345 } else {
346 m = NXT_PORT_METHOD_PLAIN;
347 }
348
349 msg->port_msg.last |= sb.last;
338 }
339
340 nxt_sendbuf_mem_coalesce(task, &sb);
341
342 plain_size = sb.size;
343
344 /*
345 * Send through mmap enabled only when payload
346 * is bigger than PORT_MMAP_MIN_SIZE.
347 */
348 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
349 nxt_port_mmap_write(task, port, msg, &sb);
350
351 } else {
352 m = NXT_PORT_METHOD_PLAIN;
353 }
354
355 msg->port_msg.last |= sb.last;
356 msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
350
351 n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
352
353 if (n > 0) {
354 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
355 nxt_log(task, NXT_LOG_CRIT,
356 "port %d: short write: %z instead of %uz",
357 port->socket.fd, n, sb.size + iov[0].iov_len);

--- 5 unchanged lines hidden (view full) ---

363
364 msg->fd = -1;
365 }
366
367 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
368 m == NXT_PORT_METHOD_MMAP);
369
370 if (msg->buf != NULL) {
357
358 n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
359
360 if (n > 0) {
361 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
362 nxt_log(task, NXT_LOG_CRIT,
363 "port %d: short write: %z instead of %uz",
364 port->socket.fd, n, sb.size + iov[0].iov_len);

--- 5 unchanged lines hidden (view full) ---

370
371 msg->fd = -1;
372 }
373
374 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
375 m == NXT_PORT_METHOD_MMAP);
376
377 if (msg->buf != NULL) {
378 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
379 msg->port_msg.stream);
380
371 /*
372 * A file descriptor is sent only
373 * in the first message of a stream.
374 */
375 msg->fd = -1;
376 msg->share += n;
381 /*
382 * A file descriptor is sent only
383 * in the first message of a stream.
384 */
385 msg->fd = -1;
386 msg->share += n;
387 msg->port_msg.nf = 1;
377
378 if (msg->share >= port->max_share) {
379 msg->share = 0;
380
381 if (msg->link.next != NULL) {
382 nxt_queue_remove(&msg->link);
383 use_delta--;
384 }

--- 144 unchanged lines hidden (view full) ---

529
530 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
531 nxt_port_error_handler, task, &port->socket, NULL);
532 return;
533 }
534}
535
536
388
389 if (msg->share >= port->max_share) {
390 msg->share = 0;
391
392 if (msg->link.next != NULL) {
393 nxt_queue_remove(&msg->link);
394 use_delta--;
395 }

--- 144 unchanged lines hidden (view full) ---

540
541 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
542 nxt_port_error_handler, task, &port->socket, NULL);
543 return;
544 }
545}
546
547
548static nxt_int_t
549nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
550{
551 nxt_port_recv_msg_t *fmsg;
552
553 fmsg = data;
554
555 if (lhq->key.length == sizeof(uint32_t)
556 && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
557 {
558 return NXT_OK;
559 }
560
561 return NXT_DECLINED;
562}
563
564
565static void *
566nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
567{
568 return nxt_mp_alloc(ctx, size);
569}
570
571
537static void
572static void
573nxt_port_lvlhsh_frag_free(void *ctx, void *p)
574{
575 return nxt_mp_free(ctx, p);
576}
577
578
579static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = {
580 NXT_LVLHSH_DEFAULT,
581 nxt_port_lvlhsh_frag_test,
582 nxt_port_lvlhsh_frag_alloc,
583 nxt_port_lvlhsh_frag_free,
584};
585
586
587static nxt_port_recv_msg_t *
588nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
589 nxt_port_recv_msg_t *msg)
590{
591 nxt_int_t res;
592 nxt_lvlhsh_query_t lhq;
593 nxt_port_recv_msg_t *fmsg;
594
595 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
596
597 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
598
599 if (nxt_slow_path(fmsg == NULL)) {
600 return NULL;
601 }
602
603 *fmsg = *msg;
604
605 lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
606 lhq.key.length = sizeof(uint32_t);
607 lhq.key.start = (u_char *) &fmsg->port_msg.stream;
608 lhq.proto = &lvlhsh_frag_proto;
609 lhq.replace = 0;
610 lhq.value = fmsg;
611 lhq.pool = port->mem_pool;
612
613 res = nxt_lvlhsh_insert(&port->frags, &lhq);
614
615 switch (res) {
616
617 case NXT_OK:
618 return fmsg;
619
620 case NXT_DECLINED:
621 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
622 fmsg->port_msg.stream);
623 nxt_mp_free(port->mem_pool, fmsg);
624
625 return NULL;
626
627 default:
628 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
629 fmsg->port_msg.stream);
630
631 nxt_mp_free(port->mem_pool, fmsg);
632
633 return NULL;
634
635 }
636}
637
638
639static nxt_port_recv_msg_t *
640nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
641 nxt_bool_t last)
642{
643 nxt_int_t res;
644 nxt_lvlhsh_query_t lhq;
645
646 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
647
648 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
649 lhq.key.length = sizeof(uint32_t);
650 lhq.key.start = (u_char *) &stream;
651 lhq.proto = &lvlhsh_frag_proto;
652 lhq.pool = port->mem_pool;
653
654 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
655 nxt_lvlhsh_find(&port->frags, &lhq);
656
657 switch (res) {
658
659 case NXT_OK:
660 return lhq.value;
661
662 default:
663 nxt_log(task, NXT_LOG_WARN, "frag stream #%uD not found", stream);
664
665 return NULL;
666 }
667}
668
669
670static void
538nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
539 nxt_port_recv_msg_t *msg)
540{
671nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
672 nxt_port_recv_msg_t *msg)
673{
541 nxt_buf_t *b;
542 nxt_buf_t *orig_b;
674 nxt_buf_t *b, *orig_b;
675 nxt_port_recv_msg_t *fmsg;
543
544 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
545 nxt_log(task, NXT_LOG_CRIT,
546 "port %d: too small message:%uz", port->socket.fd, msg->size);
547 goto fail;
548 }
549
550 /* adjust size to actual buffer used size */
551 msg->size -= sizeof(nxt_port_msg_t);
552
553 b = orig_b = msg->buf;
554 b->mem.free += msg->size;
555
556 if (msg->port_msg.mmap) {
557 nxt_port_mmap_read(task, port, msg);
558 b = msg->buf;
559 }
560
676
677 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
678 nxt_log(task, NXT_LOG_CRIT,
679 "port %d: too small message:%uz", port->socket.fd, msg->size);
680 goto fail;
681 }
682
683 /* adjust size to actual buffer used size */
684 msg->size -= sizeof(nxt_port_msg_t);
685
686 b = orig_b = msg->buf;
687 b->mem.free += msg->size;
688
689 if (msg->port_msg.mmap) {
690 nxt_port_mmap_read(task, port, msg);
691 b = msg->buf;
692 }
693
561 port->handler(task, msg);
694 if (nxt_slow_path(msg->port_msg.nf != 0)) {
695 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
696 msg->port_msg.mf == 0);
562
697
698 if (nxt_slow_path(fmsg == NULL)) {
699 nxt_assert(fmsg != NULL);
700 }
701
702 nxt_buf_chain_add(&fmsg->buf, msg->buf);
703
704 fmsg->size += msg->size;
705
706 msg->buf = NULL;
707 b = NULL;
708
709 if (nxt_fast_path(msg->port_msg.mf == 0)) {
710 b = fmsg->buf;
711
712 port->handler(task, fmsg);
713
714 msg->buf = fmsg->buf;
715 msg->fd = fmsg->fd;
716
717 nxt_mp_free(port->mem_pool, fmsg);
718 }
719 } else {
720 if (nxt_slow_path(msg->port_msg.mf != 0)) {
721 fmsg = nxt_port_frag_start(task, port, msg);
722
723 if (nxt_slow_path(fmsg == NULL)) {
724 nxt_assert(fmsg != NULL);
725 }
726
727 fmsg->port_msg.nf = 0;
728 fmsg->port_msg.mf = 0;
729
730 msg->buf = NULL;
731 msg->fd = -1;
732 b = NULL;
733 } else {
734 port->handler(task, msg);
735 }
736 }
737
563 if (msg->port_msg.mmap && orig_b != b) {
564
565 /*
566 * To disable instant buffer completion,
567 * handler should reset 'msg->buf'.
568 */
569 if (msg->buf == b) {
570 /* complete mmap buffers */

--- 100 unchanged lines hidden ---
738 if (msg->port_msg.mmap && orig_b != b) {
739
740 /*
741 * To disable instant buffer completion,
742 * handler should reset 'msg->buf'.
743 */
744 if (msg->buf == b) {
745 /* complete mmap buffers */

--- 100 unchanged lines hidden ---