nxt_conn_read.c (766:3ef71e325c4a) nxt_conn_read.c (771:f349b2d68e75)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 25 unchanged lines hidden (view full) ---

34}
35
36
37void
38nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39{
40 ssize_t n;
41 nxt_conn_t *c;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 25 unchanged lines hidden (view full) ---

34}
35
36
37void
38nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39{
40 ssize_t n;
41 nxt_conn_t *c;
42 nxt_work_queue_t *wq;
43 nxt_event_engine_t *engine;
44 nxt_work_handler_t handler;
45 const nxt_conn_state_t *state;
46
47 c = obj;
48
49 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
50 c->socket.fd, c->socket.read_ready, c->socket.closed);
51
52 engine = task->thread->engine;
53
42 nxt_event_engine_t *engine;
43 nxt_work_handler_t handler;
44 const nxt_conn_state_t *state;
45
46 c = obj;
47
48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
49 c->socket.fd, c->socket.read_ready, c->socket.closed);
50
51 engine = task->thread->engine;
52
53 /*
54 * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
55 * because the function can be called by nxt_kqueue_conn_io_read().
56 */
57 c->socket.read_handler = c->io->read;
54 state = c->read_state;
58 state = c->read_state;
59 c->socket.error_handler = state->error_handler;
55
56 if (c->socket.read_ready) {
57
58 if (state->io_read_handler == NULL) {
59 n = c->io->recvbuf(c, c->read);
60
61 } else {
62 n = state->io_read_handler(c);

--- 7 unchanged lines hidden (view full) ---

70 nxt_recvbuf_update(c->read, n);
71
72 nxt_fd_event_block_read(engine, &c->socket);
73
74 if (state->timer_autoreset) {
75 nxt_timer_disable(engine, &c->read_timer);
76 }
77
60
61 if (c->socket.read_ready) {
62
63 if (state->io_read_handler == NULL) {
64 n = c->io->recvbuf(c, c->read);
65
66 } else {
67 n = state->io_read_handler(c);

--- 7 unchanged lines hidden (view full) ---

75 nxt_recvbuf_update(c->read, n);
76
77 nxt_fd_event_block_read(engine, &c->socket);
78
79 if (state->timer_autoreset) {
80 nxt_timer_disable(engine, &c->read_timer);
81 }
82
78 wq = c->read_work_queue;
79 handler = state->ready_handler;
80
81 nxt_work_queue_add(wq, handler, task, c, data);
82
83 nxt_work_queue_add(c->read_work_queue,
84 state->ready_handler, task, c, data);
83 return;
84 }
85
86 if (n != NXT_AGAIN) {
85 return;
86 }
87
88 if (n != NXT_AGAIN) {
89 /* n == 0 or n == NXT_ERROR. */
90 handler = (n == 0) ? state->close_handler : state->error_handler;
91
87 nxt_fd_event_block_read(engine, &c->socket);
88 nxt_timer_disable(engine, &c->read_timer);
89
92 nxt_fd_event_block_read(engine, &c->socket);
93 nxt_timer_disable(engine, &c->read_timer);
94
90 wq = &engine->fast_work_queue;
95 nxt_work_queue_add(&engine->fast_work_queue,
96 handler, task, c, data);
97 return;
98 }
91
99
92 handler = (n == 0) ? state->close_handler : state->error_handler;
100 /* n == NXT_AGAIN. */
93
101
94 nxt_work_queue_add(wq, handler, task, c, data);
102 if (c->socket.read_ready) {
103 /*
104 * SSL/TLS library can return NXT_AGAIN if renegotiation
105 * occured during read operation, it toggled write event
106 * internally so only read timer should be set.
107 */
108 if (c->read_timer.state == NXT_TIMER_DISABLED) {
109 nxt_conn_timer(engine, c, state, &c->read_timer);
110 }
95
96 return;
97 }
98 }
99
111
112 return;
113 }
114 }
115
100 /*
101 * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
102 * because the function can be called by nxt_kqueue_conn_io_read().
103 */
104 c->socket.read_handler = c->io->read;
105 c->socket.error_handler = state->error_handler;
106
107 if (nxt_fd_event_is_disabled(c->socket.read)) {
108 nxt_fd_event_enable_read(engine, &c->socket);
109 }
110
111 if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) {
112 nxt_conn_timer(engine, c, state, &c->read_timer);
113 }
114}

--- 75 unchanged lines hidden (view full) ---

190 n = recv(c->socket.fd, buf, size, flags);
191
192 err = (n == -1) ? nxt_socket_errno : 0;
193
194 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
195 c->socket.fd, buf, size, flags, n);
196
197 if (n > 0) {
116 if (nxt_fd_event_is_disabled(c->socket.read)) {
117 nxt_fd_event_enable_read(engine, &c->socket);
118 }
119
120 if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) {
121 nxt_conn_timer(engine, c, state, &c->read_timer);
122 }
123}

--- 75 unchanged lines hidden (view full) ---

199 n = recv(c->socket.fd, buf, size, flags);
200
201 err = (n == -1) ? nxt_socket_errno : 0;
202
203 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
204 c->socket.fd, buf, size, flags, n);
205
206 if (n > 0) {
198 if ((size_t) n < size) {
207 if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
199 c->socket.read_ready = 0;
200 }
201
202 return n;
203 }
204
205 if (n == 0) {
206 c->socket.closed = 1;
208 c->socket.read_ready = 0;
209 }
210
211 return n;
212 }
213
214 if (n == 0) {
215 c->socket.closed = 1;
207 c->socket.read_ready = 0;
208
216
217 if ((flags & MSG_PEEK) == 0) {
218 c->socket.read_ready = 0;
219 }
220
209 return n;
210 }
211
212 /* n == -1 */
213
214 switch (err) {
215
216 case NXT_EAGAIN:

--- 19 unchanged lines hidden ---
221 return n;
222 }
223
224 /* n == -1 */
225
226 switch (err) {
227
228 case NXT_EAGAIN:

--- 19 unchanged lines hidden ---