Deleted
Added
nxt_port_socket.c (1125:f92f3cd41257) | nxt_port_socket.c (1269:41331471eee7) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 464 unchanged lines hidden (view full) --- 473 return msg; 474} 475 476 477static nxt_buf_t * 478nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 479 size_t sent, nxt_bool_t mmap_mode) 480{ | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 464 unchanged lines hidden (view full) --- 473 return msg; 474} 475 476 477static nxt_buf_t * 478nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 479 size_t sent, nxt_bool_t mmap_mode) 480{ |
481 size_t size; | 481 size_t size; 482 nxt_buf_t *next; |
482 483 while (b != NULL) { 484 485 nxt_prefetch(b->next); 486 487 if (!nxt_buf_is_sync(b)) { 488 489 size = nxt_buf_used_size(b); --- 33 unchanged lines hidden (view full) --- 523 } 524 525 sent -= size; 526 } 527 } 528 529 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 530 | 483 484 while (b != NULL) { 485 486 nxt_prefetch(b->next); 487 488 if (!nxt_buf_is_sync(b)) { 489 490 size = nxt_buf_used_size(b); --- 33 unchanged lines hidden (view full) --- 524 } 525 526 sent -= size; 527 } 528 } 529 530 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 531 |
531 b = b->next; | 532 next = b->next; 533 b->next = NULL; 534 b = next; |
532 } 533 534 return b; 535} 536 537 538static nxt_port_send_msg_t * 539nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) --- 251 unchanged lines hidden (view full) --- 791 } 792} 793 794 795static void 796nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 797 nxt_port_recv_msg_t *msg) 798{ | 535 } 536 537 return b; 538} 539 540 541static nxt_port_send_msg_t * 542nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) --- 251 unchanged lines hidden (view full) --- 794 } 795} 796 797 798static void 799nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 800 nxt_port_recv_msg_t *msg) 801{ |
799 nxt_buf_t *b, *orig_b; | 802 nxt_buf_t *b, *orig_b, *next; |
800 nxt_port_recv_msg_t *fmsg; 801 802 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 803 nxt_alert(task, "port %d: too small message:%uz", 804 port->socket.fd, msg->size); 805 806 if (msg->fd != -1) { 807 nxt_fd_close(msg->fd); --- 102 unchanged lines hidden (view full) --- 910 if (msg->port_msg.mmap && orig_b != b) { 911 912 /* 913 * To disable instant buffer completion, 914 * handler should reset 'msg->buf'. 915 */ 916 if (msg->buf == b) { 917 /* complete mmap buffers */ | 803 nxt_port_recv_msg_t *fmsg; 804 805 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 806 nxt_alert(task, "port %d: too small message:%uz", 807 port->socket.fd, msg->size); 808 809 if (msg->fd != -1) { 810 nxt_fd_close(msg->fd); --- 102 unchanged lines hidden (view full) --- 913 if (msg->port_msg.mmap && orig_b != b) { 914 915 /* 916 * To disable instant buffer completion, 917 * handler should reset 'msg->buf'. 918 */ 919 if (msg->buf == b) { 920 /* complete mmap buffers */ |
918 for (; b != NULL; b = b->next) { | 921 while (b != NULL) { |
919 nxt_debug(task, "complete buffer %p", b); 920 921 nxt_work_queue_add(port->socket.read_work_queue, 922 b->completion_handler, task, b, b->parent); | 922 nxt_debug(task, "complete buffer %p", b); 923 924 nxt_work_queue_add(port->socket.read_work_queue, 925 b->completion_handler, task, b, b->parent); |
926 927 next = b->next; 928 b->next = NULL; 929 b = next; |
|
923 } 924 } 925 926 /* restore original buf */ 927 msg->buf = orig_b; 928 } 929} 930 --- 28 unchanged lines hidden (view full) --- 959 port->free_bufs = b; 960} 961 962 963static void 964nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 965{ 966 int use_delta; | 930 } 931 } 932 933 /* restore original buf */ 934 msg->buf = orig_b; 935 } 936} 937 --- 28 unchanged lines hidden (view full) --- 966 port->free_bufs = b; 967} 968 969 970static void 971nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 972{ 973 int use_delta; |
967 nxt_buf_t *b; | 974 nxt_buf_t *b, *next; |
968 nxt_port_t *port; 969 nxt_work_queue_t *wq; 970 nxt_port_send_msg_t *msg; 971 972 nxt_debug(task, "port error handler %p", obj); 973 /* TODO */ 974 975 port = nxt_container_of(obj, nxt_port_t, socket); --- 5 unchanged lines hidden (view full) --- 981 } 982 983 wq = &task->thread->engine->fast_work_queue; 984 985 nxt_thread_mutex_lock(&port->write_mutex); 986 987 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 988 | 975 nxt_port_t *port; 976 nxt_work_queue_t *wq; 977 nxt_port_send_msg_t *msg; 978 979 nxt_debug(task, "port error handler %p", obj); 980 /* TODO */ 981 982 port = nxt_container_of(obj, nxt_port_t, socket); --- 5 unchanged lines hidden (view full) --- 988 } 989 990 wq = &task->thread->engine->fast_work_queue; 991 992 nxt_thread_mutex_lock(&port->write_mutex); 993 994 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 995 |
989 for (b = msg->buf; b != NULL; b = b->next) { | 996 for (b = msg->buf; b != NULL; b = next) { 997 next = b->next; 998 b->next = NULL; 999 |
990 if (nxt_buf_is_sync(b)) { 991 continue; 992 } 993 994 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 995 } 996 997 nxt_queue_remove(&msg->link); --- 12 unchanged lines hidden --- | 1000 if (nxt_buf_is_sync(b)) { 1001 continue; 1002 } 1003 1004 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1005 } 1006 1007 nxt_queue_remove(&msg->link); --- 12 unchanged lines hidden --- |