nxt_port_socket.c (1004:306ceaf8927d) nxt_port_socket.c (1005:7000543fffde)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 624 unchanged lines hidden (view full) ---

633
634 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
635 nxt_port_error_handler, task, &port->socket, NULL);
636 return;
637 }
638}
639
640
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 624 unchanged lines hidden (view full) ---

633
634 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
635 nxt_port_error_handler, task, &port->socket, NULL);
636 return;
637 }
638}
639
640
641typedef struct {
642 uint32_t stream;
643 uint32_t pid;
644} nxt_port_frag_key_t;
645
646
641static nxt_int_t
642nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
643{
644 nxt_port_recv_msg_t *fmsg;
647static nxt_int_t
648nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
649{
650 nxt_port_recv_msg_t *fmsg;
651 nxt_port_frag_key_t *frag_key;
645
646 fmsg = data;
652
653 fmsg = data;
654 frag_key = (nxt_port_frag_key_t *) lhq->key.start;
647
655
648 if (lhq->key.length == sizeof(uint32_t)
649 && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
656 if (lhq->key.length == sizeof(nxt_port_frag_key_t)
657 && frag_key->stream == fmsg->port_msg.stream
658 && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
650 {
651 return NXT_OK;
652 }
653
654 return NXT_DECLINED;
655}
656
657

--- 21 unchanged lines hidden (view full) ---

679
680static nxt_port_recv_msg_t *
681nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
682 nxt_port_recv_msg_t *msg)
683{
684 nxt_int_t res;
685 nxt_lvlhsh_query_t lhq;
686 nxt_port_recv_msg_t *fmsg;
659 {
660 return NXT_OK;
661 }
662
663 return NXT_DECLINED;
664}
665
666

--- 21 unchanged lines hidden (view full) ---

688
689static nxt_port_recv_msg_t *
690nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
691 nxt_port_recv_msg_t *msg)
692{
693 nxt_int_t res;
694 nxt_lvlhsh_query_t lhq;
695 nxt_port_recv_msg_t *fmsg;
696 nxt_port_frag_key_t frag_key;
687
688 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
689
690 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
691
692 if (nxt_slow_path(fmsg == NULL)) {
693 return NULL;
694 }
695
696 *fmsg = *msg;
697
697
698 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
699
700 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
701
702 if (nxt_slow_path(fmsg == NULL)) {
703 return NULL;
704 }
705
706 *fmsg = *msg;
707
698 lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
699 lhq.key.length = sizeof(uint32_t);
700 lhq.key.start = (u_char *) &fmsg->port_msg.stream;
708 frag_key.stream = fmsg->port_msg.stream;
709 frag_key.pid = fmsg->port_msg.pid;
710
711 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
712 lhq.key.length = sizeof(nxt_port_frag_key_t);
713 lhq.key.start = (u_char *) &frag_key;
701 lhq.proto = &lvlhsh_frag_proto;
702 lhq.replace = 0;
703 lhq.value = fmsg;
704 lhq.pool = port->mem_pool;
705
706 res = nxt_lvlhsh_insert(&port->frags, &lhq);
707
708 switch (res) {

--- 16 unchanged lines hidden (view full) ---

725
726 return NULL;
727
728 }
729}
730
731
732static nxt_port_recv_msg_t *
714 lhq.proto = &lvlhsh_frag_proto;
715 lhq.replace = 0;
716 lhq.value = fmsg;
717 lhq.pool = port->mem_pool;
718
719 res = nxt_lvlhsh_insert(&port->frags, &lhq);
720
721 switch (res) {

--- 16 unchanged lines hidden (view full) ---

738
739 return NULL;
740
741 }
742}
743
744
745static nxt_port_recv_msg_t *
733nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
734 nxt_bool_t last)
746nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
735{
747{
736 nxt_int_t res;
737 nxt_lvlhsh_query_t lhq;
748 nxt_int_t res;
749 nxt_bool_t last;
750 nxt_lvlhsh_query_t lhq;
751 nxt_port_frag_key_t frag_key;
738
752
739 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
753 last = msg->port_msg.mf == 0;
740
754
741 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
742 lhq.key.length = sizeof(uint32_t);
743 lhq.key.start = (u_char *) &stream;
755 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
756 msg->port_msg.stream);
757
758 frag_key.stream = msg->port_msg.stream;
759 frag_key.pid = msg->port_msg.pid;
760
761 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
762 lhq.key.length = sizeof(nxt_port_frag_key_t);
763 lhq.key.start = (u_char *) &frag_key;
744 lhq.proto = &lvlhsh_frag_proto;
745 lhq.pool = port->mem_pool;
746
747 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
748 nxt_lvlhsh_find(&port->frags, &lhq);
749
750 switch (res) {
751
752 case NXT_OK:
753 return lhq.value;
754
755 default:
764 lhq.proto = &lvlhsh_frag_proto;
765 lhq.pool = port->mem_pool;
766
767 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
768 nxt_lvlhsh_find(&port->frags, &lhq);
769
770 switch (res) {
771
772 case NXT_OK:
773 return lhq.value;
774
775 default:
756 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream);
776 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
777 frag_key.stream);
757
758 return NULL;
759 }
760}
761
762
763static void
764nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,

--- 23 unchanged lines hidden (view full) ---

788 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
789
790 } else {
791 msg->cancelled = 0;
792 }
793
794 if (nxt_slow_path(msg->port_msg.nf != 0)) {
795
778
779 return NULL;
780 }
781}
782
783
784static void
785nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,

--- 23 unchanged lines hidden (view full) ---

809 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
810
811 } else {
812 msg->cancelled = 0;
813 }
814
815 if (nxt_slow_path(msg->port_msg.nf != 0)) {
816
796 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
797 msg->port_msg.mf == 0);
817 fmsg = nxt_port_frag_find(task, port, msg);
798
799 if (nxt_slow_path(fmsg == NULL)) {
800 goto fmsg_failed;
801 }
802
803 if (nxt_fast_path(fmsg->cancelled == 0)) {
804
805 if (msg->port_msg.mmap) {

--- 173 unchanged lines hidden ---
818
819 if (nxt_slow_path(fmsg == NULL)) {
820 goto fmsg_failed;
821 }
822
823 if (nxt_fast_path(fmsg->cancelled == 0)) {
824
825 if (msg->port_msg.mmap) {

--- 173 unchanged lines hidden ---