nxt_kqueue_engine.c (56:92b4984ca3c1) nxt_kqueue_engine.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 64 unchanged lines hidden (view full) ---

73 nxt_fd_event_t *ev);
74static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
75 nxt_fd_event_t *ev);
76static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
77 nxt_fd_event_t *ev);
78static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
79 nxt_fd_event_t *ev);
80static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 64 unchanged lines hidden (view full) ---

73 nxt_fd_event_t *ev);
74static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
75 nxt_fd_event_t *ev);
76static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
77 nxt_fd_event_t *ev);
78static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
79 nxt_fd_event_t *ev);
80static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
81 nxt_event_file_t *ev);
81 nxt_file_event_t *ev);
82static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
82static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
83 nxt_event_file_t *ev);
83 nxt_file_event_t *ev);
84static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
85 nxt_int_t filter, nxt_uint_t flags);
86static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
87static void nxt_kqueue_error(nxt_event_engine_t *engine);
88static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
89 void *data);
90static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
91 void *data);
92static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
93 const nxt_sig_event_t *sigev);
94#if (NXT_HAVE_EVFILT_USER)
95static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
96 nxt_work_handler_t handler);
97static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
98#endif
99static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
100
84static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
85 nxt_int_t filter, nxt_uint_t flags);
86static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
87static void nxt_kqueue_error(nxt_event_engine_t *engine);
88static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
89 void *data);
90static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
91 void *data);
92static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
93 const nxt_sig_event_t *sigev);
94#if (NXT_HAVE_EVFILT_USER)
95static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
96 nxt_work_handler_t handler);
97static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
98#endif
99static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
100
101static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj,
101static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
102 void *data);
102 void *data);
103static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj,
103static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
104 void *data);
105static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
104 void *data);
105static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
106static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj,
106static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
107 void *data);
107 void *data);
108static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj,
108static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
109 void *data);
109 void *data);
110static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c,
111 nxt_buf_t *b);
110static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
112
113
111
112
114static nxt_event_conn_io_t nxt_kqueue_event_conn_io = {
115 nxt_kqueue_event_conn_io_connect,
116 nxt_kqueue_event_conn_io_accept,
113static nxt_conn_io_t nxt_kqueue_conn_io = {
114 nxt_kqueue_conn_io_connect,
115 nxt_kqueue_conn_io_accept,
117
116
118 nxt_kqueue_event_conn_io_read,
119 nxt_kqueue_event_conn_io_recvbuf,
120 nxt_event_conn_io_recv,
117 nxt_kqueue_conn_io_read,
118 nxt_kqueue_conn_io_recvbuf,
119 nxt_conn_io_recv,
121
122 nxt_conn_io_write,
123 nxt_event_conn_io_write_chunk,
124
125#if (NXT_HAVE_FREEBSD_SENDFILE)
126 nxt_freebsd_event_conn_io_sendfile,
127#elif (NXT_HAVE_MACOSX_SENDFILE)
128 nxt_macosx_event_conn_io_sendfile,
129#else
130 nxt_event_conn_io_sendbuf,
131#endif
132
133 nxt_event_conn_io_writev,
134 nxt_event_conn_io_send,
135
120
121 nxt_conn_io_write,
122 nxt_event_conn_io_write_chunk,
123
124#if (NXT_HAVE_FREEBSD_SENDFILE)
125 nxt_freebsd_event_conn_io_sendfile,
126#elif (NXT_HAVE_MACOSX_SENDFILE)
127 nxt_macosx_event_conn_io_sendfile,
128#else
129 nxt_event_conn_io_sendbuf,
130#endif
131
132 nxt_event_conn_io_writev,
133 nxt_event_conn_io_send,
134
136 nxt_event_conn_io_shutdown,
135 nxt_conn_io_shutdown,
137};
138
139
140const nxt_event_interface_t nxt_kqueue_engine = {
141 "kqueue",
142 nxt_kqueue_create,
143 nxt_kqueue_free,
144 nxt_kqueue_enable,

--- 15 unchanged lines hidden (view full) ---

160 nxt_kqueue_enable_post,
161 nxt_kqueue_signal,
162#else
163 NULL,
164 NULL,
165#endif
166 nxt_kqueue_poll,
167
136};
137
138
139const nxt_event_interface_t nxt_kqueue_engine = {
140 "kqueue",
141 nxt_kqueue_create,
142 nxt_kqueue_free,
143 nxt_kqueue_enable,

--- 15 unchanged lines hidden (view full) ---

159 nxt_kqueue_enable_post,
160 nxt_kqueue_signal,
161#else
162 NULL,
163 NULL,
164#endif
165 nxt_kqueue_poll,
166
168 &nxt_kqueue_event_conn_io,
167 &nxt_kqueue_conn_io,
169
170 NXT_FILE_EVENTS,
171 NXT_SIGNAL_EVENTS,
172};
173
174
175static nxt_int_t
176nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,

--- 232 unchanged lines hidden (view full) ---

409 ev->read = NXT_EVENT_ACTIVE;
410 ev->read_handler = nxt_kqueue_listen_handler;
411
412 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
413}
414
415
416static void
168
169 NXT_FILE_EVENTS,
170 NXT_SIGNAL_EVENTS,
171};
172
173
174static nxt_int_t
175nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,

--- 232 unchanged lines hidden (view full) ---

408 ev->read = NXT_EVENT_ACTIVE;
409 ev->read_handler = nxt_kqueue_listen_handler;
410
411 nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
412}
413
414
415static void
417nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
416nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
418{
419 struct kevent *kev;
420
421 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
422 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
423 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
424
425 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",

--- 6 unchanged lines hidden (view full) ---

432 kev->flags = flags;
433 kev->fflags = fflags;
434 kev->data = 0;
435 kev->udata = nxt_kevent_set_udata(ev);
436}
437
438
439static void
417{
418 struct kevent *kev;
419
420 const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
421 const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
422 | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
423
424 nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",

--- 6 unchanged lines hidden (view full) ---

431 kev->flags = flags;
432 kev->fflags = fflags;
433 kev->data = 0;
434 kev->udata = nxt_kevent_set_udata(ev);
435}
436
437
438static void
440nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
439nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
441{
442 /* TODO: pending event. */
443}
444
445
446static void
447nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
448 nxt_int_t filter, nxt_uint_t flags)

--- 43 unchanged lines hidden (view full) ---

492}
493
494
495static void
496nxt_kqueue_error(nxt_event_engine_t *engine)
497{
498 struct kevent *kev, *end;
499 nxt_fd_event_t *ev;
440{
441 /* TODO: pending event. */
442}
443
444
445static void
446nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
447 nxt_int_t filter, nxt_uint_t flags)

--- 43 unchanged lines hidden (view full) ---

491}
492
493
494static void
495nxt_kqueue_error(nxt_event_engine_t *engine)
496{
497 struct kevent *kev, *end;
498 nxt_fd_event_t *ev;
500 nxt_event_file_t *fev;
499 nxt_file_event_t *fev;
501 nxt_work_queue_t *wq;
502
503 wq = &engine->fast_work_queue;
504 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
505
506 for (kev = engine->u.kqueue.changes; kev < end; kev++) {
507
508 switch (kev->filter) {

--- 37 unchanged lines hidden (view full) ---

546
547 ev->error_handler(task, ev, data);
548}
549
550
551static void
552nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
553{
500 nxt_work_queue_t *wq;
501
502 wq = &engine->fast_work_queue;
503 end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
504
505 for (kev = engine->u.kqueue.changes; kev < end; kev++) {
506
507 switch (kev->filter) {

--- 37 unchanged lines hidden (view full) ---

545
546 ev->error_handler(task, ev, data);
547}
548
549
550static void
551nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
552{
554 nxt_event_file_t *ev;
553 nxt_file_event_t *ev;
555
556 ev = obj;
557
558 nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);
559
560 ev->handler(task, ev, data);
561}
562

--- 110 unchanged lines hidden (view full) ---

673 nxt_err_t err;
674 nxt_uint_t level;
675 nxt_bool_t error, eof;
676 nxt_task_t *task;
677 struct kevent *kev;
678 nxt_fd_event_t *ev;
679 nxt_sig_event_t *sigev;
680 struct timespec ts, *tp;
554
555 ev = obj;
556
557 nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);
558
559 ev->handler(task, ev, data);
560}
561

--- 110 unchanged lines hidden (view full) ---

672 nxt_err_t err;
673 nxt_uint_t level;
674 nxt_bool_t error, eof;
675 nxt_task_t *task;
676 struct kevent *kev;
677 nxt_fd_event_t *ev;
678 nxt_sig_event_t *sigev;
679 struct timespec ts, *tp;
681 nxt_event_file_t *fev;
680 nxt_file_event_t *fev;
682 nxt_work_queue_t *wq;
683 nxt_work_handler_t handler;
684
685 if (timeout == NXT_INFINITE_MSEC) {
686 tp = NULL;
687
688 } else {
689 ts.tv_sec = timeout / 1000;

--- 155 unchanged lines hidden (view full) ---

845
846
847/*
848 * nxt_kqueue_event_conn_io_connect() eliminates the
849 * getsockopt() syscall to test pending connect() error.
850 */
851
852static void
681 nxt_work_queue_t *wq;
682 nxt_work_handler_t handler;
683
684 if (timeout == NXT_INFINITE_MSEC) {
685 tp = NULL;
686
687 } else {
688 ts.tv_sec = timeout / 1000;

--- 155 unchanged lines hidden (view full) ---

844
845
846/*
847 * nxt_kqueue_event_conn_io_connect() eliminates the
848 * getsockopt() syscall to test pending connect() error.
849 */
850
851static void
853nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
852nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
854{
853{
855 nxt_event_conn_t *c;
854 nxt_conn_t *c;
856 nxt_event_engine_t *engine;
857 nxt_work_handler_t handler;
858 const nxt_event_conn_state_t *state;
859
860 c = obj;
861
862 state = c->write_state;
863
864 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
865
866 case NXT_OK:
867 c->socket.write_ready = 1;
868 handler = state->ready_handler;
869 break;
870
871 case NXT_AGAIN:
855 nxt_event_engine_t *engine;
856 nxt_work_handler_t handler;
857 const nxt_event_conn_state_t *state;
858
859 c = obj;
860
861 state = c->write_state;
862
863 switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
864
865 case NXT_OK:
866 c->socket.write_ready = 1;
867 handler = state->ready_handler;
868 break;
869
870 case NXT_AGAIN:
872 c->socket.write_handler = nxt_kqueue_event_conn_connected;
873 c->socket.error_handler = nxt_event_conn_connect_error;
871 c->socket.write_handler = nxt_kqueue_conn_connected;
872 c->socket.error_handler = nxt_conn_connect_error;
874
875 engine = task->thread->engine;
873
874 engine = task->thread->engine;
876 nxt_event_conn_timer(engine, c, state, &c->write_timer);
875 nxt_conn_timer(engine, c, state, &c->write_timer);
877
878 nxt_kqueue_enable_write(engine, &c->socket);
879 return;
880
881 case NXT_DECLINED:
882 handler = state->close_handler;
883 break;
884
885 default: /* NXT_ERROR */
886 handler = state->error_handler;
887 break;
888 }
889
890 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
891}
892
893
894static void
876
877 nxt_kqueue_enable_write(engine, &c->socket);
878 return;
879
880 case NXT_DECLINED:
881 handler = state->close_handler;
882 break;
883
884 default: /* NXT_ERROR */
885 handler = state->error_handler;
886 break;
887 }
888
889 nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
890}
891
892
893static void
895nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
894nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
896{
895{
897 nxt_event_conn_t *c;
896 nxt_conn_t *c;
898
899 c = obj;
900
897
898 c = obj;
899
901 nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd);
900 nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);
902
903 c->socket.write = NXT_EVENT_BLOCKED;
904
905 if (c->write_state->timer_autoreset) {
906 nxt_timer_disable(task->thread->engine, &c->write_timer);
907 }
908
909 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
910 task, c, data);
911}
912
913
914static void
915nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
916{
901
902 c->socket.write = NXT_EVENT_BLOCKED;
903
904 if (c->write_state->timer_autoreset) {
905 nxt_timer_disable(task->thread->engine, &c->write_timer);
906 }
907
908 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
909 task, c, data);
910}
911
912
913static void
914nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
915{
917 nxt_event_conn_listen_t *cls;
916 nxt_listen_event_t *lev;
918
917
919 cls = obj;
918 lev = obj;
920
921 nxt_debug(task, "kevent fd:%d avail:%D",
919
920 nxt_debug(task, "kevent fd:%d avail:%D",
922 cls->socket.fd, cls->socket.kq_available);
921 lev->socket.fd, lev->socket.kq_available);
923
922
924 cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);
923 lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);
925
924
926 nxt_kqueue_event_conn_io_accept(task, cls, data);
925 nxt_kqueue_conn_io_accept(task, lev, data);
927}
928
929
930static void
926}
927
928
929static void
931nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
930nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
932{
931{
933 socklen_t len;
934 nxt_socket_t s;
935 struct sockaddr *sa;
936 nxt_event_conn_t *c;
937 nxt_event_conn_listen_t *cls;
932 socklen_t len;
933 nxt_conn_t *c;
934 nxt_socket_t s;
935 struct sockaddr *sa;
936 nxt_listen_event_t *lev;
938
937
939 cls = obj;
940 c = cls->next;
938 lev = obj;
939 c = lev->next;
941
940
942 cls->ready--;
943 cls->socket.read_ready = (cls->ready != 0);
941 lev->ready--;
942 lev->socket.read_ready = (lev->ready != 0);
944
943
945 cls->socket.kq_available--;
946 cls->socket.read_ready = (cls->socket.kq_available != 0);
944 lev->socket.kq_available--;
945 lev->socket.read_ready = (lev->socket.kq_available != 0);
947
948 len = c->remote->socklen;
949
950 if (len >= sizeof(struct sockaddr)) {
951 sa = &c->remote->u.sockaddr;
952
953 } else {
954 sa = NULL;
955 len = 0;
956 }
957
946
947 len = c->remote->socklen;
948
949 if (len >= sizeof(struct sockaddr)) {
950 sa = &c->remote->u.sockaddr;
951
952 } else {
953 sa = NULL;
954 len = 0;
955 }
956
958 s = accept(cls->socket.fd, sa, &len);
957 s = accept(lev->socket.fd, sa, &len);
959
960 if (s != -1) {
961 c->socket.fd = s;
962
958
959 if (s != -1) {
960 c->socket.fd = s;
961
963 nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);
962 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
964
963
965 nxt_event_conn_accept(task, cls, c);
964 nxt_conn_accept(task, lev, c);
966 return;
967 }
968
965 return;
966 }
967
969 nxt_event_conn_accept_error(task, cls, "accept", nxt_errno);
968 nxt_conn_accept_error(task, lev, "accept", nxt_errno);
970}
971
972
973/*
969}
970
971
972/*
974 * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the
973 * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the
975 * readv() or recv() syscall if a remote side just closed connection.
976 */
977
978static void
974 * readv() or recv() syscall if a remote side just closed connection.
975 */
976
977static void
979nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
978nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
980{
979{
981 nxt_event_conn_t *c;
980 nxt_conn_t *c;
982
983 c = obj;
984
981
982 c = obj;
983
985 nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd);
984 nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);
986
987 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
988 nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
989
990 c->socket.closed = 1;
991 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
992 task, c, data);
993 return;
994 }
995
985
986 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
987 nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
988
989 c->socket.closed = 1;
990 nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
991 task, c, data);
992 return;
993 }
994
996 nxt_event_conn_io_read(task, c, data);
995 nxt_conn_io_read(task, c, data);
997}
998
999
1000/*
996}
997
998
999/*
1001 * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard
1002 * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
1000 * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard
1001 * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
1003 * if there is no pending data or a remote side closed connection.
1004 */
1005
1006static ssize_t
1002 * if there is no pending data or a remote side closed connection.
1003 */
1004
1005static ssize_t
1007nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
1006nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
1008{
1009 ssize_t n;
1010
1011 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
1012 c->socket.closed = 1;
1013 return 0;
1014 }
1015
1007{
1008 ssize_t n;
1009
1010 if (c->socket.kq_available == 0 && c->socket.kq_eof) {
1011 c->socket.closed = 1;
1012 return 0;
1013 }
1014
1016 n = nxt_event_conn_io_recvbuf(c, b);
1015 n = nxt_conn_io_recvbuf(c, b);
1017
1018 if (n > 0) {
1019 c->socket.kq_available -= n;
1020
1021 if (c->socket.kq_available < 0) {
1022 c->socket.kq_available = 0;
1023 }
1024
1025 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
1026 c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
1027
1028 c->socket.read_ready = (c->socket.kq_available != 0
1029 || c->socket.kq_eof);
1030 }
1031
1032 return n;
1033}
1016
1017 if (n > 0) {
1018 c->socket.kq_available -= n;
1019
1020 if (c->socket.kq_available < 0) {
1021 c->socket.kq_available = 0;
1022 }
1023
1024 nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
1025 c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
1026
1027 c->socket.read_ready = (c->socket.kq_available != 0
1028 || c->socket.kq_eof);
1029 }
1030
1031 return n;
1032}