nxt_port_socket.c (1125:f92f3cd41257) nxt_port_socket.c (1269:41331471eee7)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 464 unchanged lines hidden (view full) ---

473 return msg;
474}
475
476
477static nxt_buf_t *
478nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
479 size_t sent, nxt_bool_t mmap_mode)
480{
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 464 unchanged lines hidden (view full) ---

473 return msg;
474}
475
476
477static nxt_buf_t *
478nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
479 size_t sent, nxt_bool_t mmap_mode)
480{
481 size_t size;
481 size_t size;
482 nxt_buf_t *next;
482
483 while (b != NULL) {
484
485 nxt_prefetch(b->next);
486
487 if (!nxt_buf_is_sync(b)) {
488
489 size = nxt_buf_used_size(b);

--- 33 unchanged lines hidden (view full) ---

523 }
524
525 sent -= size;
526 }
527 }
528
529 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
530
483
484 while (b != NULL) {
485
486 nxt_prefetch(b->next);
487
488 if (!nxt_buf_is_sync(b)) {
489
490 size = nxt_buf_used_size(b);

--- 33 unchanged lines hidden (view full) ---

524 }
525
526 sent -= size;
527 }
528 }
529
530 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
531
531 b = b->next;
532 next = b->next;
533 b->next = NULL;
534 b = next;
532 }
533
534 return b;
535}
536
537
538static nxt_port_send_msg_t *
539nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)

--- 251 unchanged lines hidden (view full) ---

791 }
792}
793
794
795static void
796nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
797 nxt_port_recv_msg_t *msg)
798{
535 }
536
537 return b;
538}
539
540
541static nxt_port_send_msg_t *
542nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)

--- 251 unchanged lines hidden (view full) ---

794 }
795}
796
797
798static void
799nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
800 nxt_port_recv_msg_t *msg)
801{
799 nxt_buf_t *b, *orig_b;
802 nxt_buf_t *b, *orig_b, *next;
800 nxt_port_recv_msg_t *fmsg;
801
802 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
803 nxt_alert(task, "port %d: too small message:%uz",
804 port->socket.fd, msg->size);
805
806 if (msg->fd != -1) {
807 nxt_fd_close(msg->fd);

--- 102 unchanged lines hidden (view full) ---

910 if (msg->port_msg.mmap && orig_b != b) {
911
912 /*
913 * To disable instant buffer completion,
914 * handler should reset 'msg->buf'.
915 */
916 if (msg->buf == b) {
917 /* complete mmap buffers */
803 nxt_port_recv_msg_t *fmsg;
804
805 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
806 nxt_alert(task, "port %d: too small message:%uz",
807 port->socket.fd, msg->size);
808
809 if (msg->fd != -1) {
810 nxt_fd_close(msg->fd);

--- 102 unchanged lines hidden (view full) ---

913 if (msg->port_msg.mmap && orig_b != b) {
914
915 /*
916 * To disable instant buffer completion,
917 * handler should reset 'msg->buf'.
918 */
919 if (msg->buf == b) {
920 /* complete mmap buffers */
918 for (; b != NULL; b = b->next) {
921 while (b != NULL) {
919 nxt_debug(task, "complete buffer %p", b);
920
921 nxt_work_queue_add(port->socket.read_work_queue,
922 b->completion_handler, task, b, b->parent);
922 nxt_debug(task, "complete buffer %p", b);
923
924 nxt_work_queue_add(port->socket.read_work_queue,
925 b->completion_handler, task, b, b->parent);
926
927 next = b->next;
928 b->next = NULL;
929 b = next;
923 }
924 }
925
926 /* restore original buf */
927 msg->buf = orig_b;
928 }
929}
930

--- 28 unchanged lines hidden (view full) ---

959 port->free_bufs = b;
960}
961
962
963static void
964nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
965{
966 int use_delta;
930 }
931 }
932
933 /* restore original buf */
934 msg->buf = orig_b;
935 }
936}
937

--- 28 unchanged lines hidden (view full) ---

966 port->free_bufs = b;
967}
968
969
970static void
971nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
972{
973 int use_delta;
967 nxt_buf_t *b;
974 nxt_buf_t *b, *next;
968 nxt_port_t *port;
969 nxt_work_queue_t *wq;
970 nxt_port_send_msg_t *msg;
971
972 nxt_debug(task, "port error handler %p", obj);
973 /* TODO */
974
975 port = nxt_container_of(obj, nxt_port_t, socket);

--- 5 unchanged lines hidden (view full) ---

981 }
982
983 wq = &task->thread->engine->fast_work_queue;
984
985 nxt_thread_mutex_lock(&port->write_mutex);
986
987 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
988
975 nxt_port_t *port;
976 nxt_work_queue_t *wq;
977 nxt_port_send_msg_t *msg;
978
979 nxt_debug(task, "port error handler %p", obj);
980 /* TODO */
981
982 port = nxt_container_of(obj, nxt_port_t, socket);

--- 5 unchanged lines hidden (view full) ---

988 }
989
990 wq = &task->thread->engine->fast_work_queue;
991
992 nxt_thread_mutex_lock(&port->write_mutex);
993
994 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
995
989 for (b = msg->buf; b != NULL; b = b->next) {
996 for (b = msg->buf; b != NULL; b = next) {
997 next = b->next;
998 b->next = NULL;
999
990 if (nxt_buf_is_sync(b)) {
991 continue;
992 }
993
994 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
995 }
996
997 nxt_queue_remove(&msg->link);

--- 12 unchanged lines hidden ---
1000 if (nxt_buf_is_sync(b)) {
1001 continue;
1002 }
1003
1004 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1005 }
1006
1007 nxt_queue_remove(&msg->link);

--- 12 unchanged lines hidden ---