Deleted
Added
nxt_conn_read.c (766:3ef71e325c4a) | nxt_conn_read.c (771:f349b2d68e75) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 25 unchanged lines hidden (view full) --- 34} 35 36 37void 38nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39{ 40 ssize_t n; 41 nxt_conn_t *c; | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 25 unchanged lines hidden (view full) --- 34} 35 36 37void 38nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39{ 40 ssize_t n; 41 nxt_conn_t *c; |
42 nxt_work_queue_t *wq; | |
43 nxt_event_engine_t *engine; 44 nxt_work_handler_t handler; 45 const nxt_conn_state_t *state; 46 47 c = obj; 48 49 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 50 c->socket.fd, c->socket.read_ready, c->socket.closed); 51 52 engine = task->thread->engine; 53 | 42 nxt_event_engine_t *engine; 43 nxt_work_handler_t handler; 44 const nxt_conn_state_t *state; 45 46 c = obj; 47 48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 49 c->socket.fd, c->socket.read_ready, c->socket.closed); 50 51 engine = task->thread->engine; 52 |
53 /* 54 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 55 * because the function can be called by nxt_kqueue_conn_io_read(). 56 */ 57 c->socket.read_handler = c->io->read; |
|
54 state = c->read_state; | 58 state = c->read_state; |
59 c->socket.error_handler = state->error_handler; |
|
55 56 if (c->socket.read_ready) { 57 58 if (state->io_read_handler == NULL) { 59 n = c->io->recvbuf(c, c->read); 60 61 } else { 62 n = state->io_read_handler(c); --- 7 unchanged lines hidden (view full) --- 70 nxt_recvbuf_update(c->read, n); 71 72 nxt_fd_event_block_read(engine, &c->socket); 73 74 if (state->timer_autoreset) { 75 nxt_timer_disable(engine, &c->read_timer); 76 } 77 | 60 61 if (c->socket.read_ready) { 62 63 if (state->io_read_handler == NULL) { 64 n = c->io->recvbuf(c, c->read); 65 66 } else { 67 n = state->io_read_handler(c); --- 7 unchanged lines hidden (view full) --- 75 nxt_recvbuf_update(c->read, n); 76 77 nxt_fd_event_block_read(engine, &c->socket); 78 79 if (state->timer_autoreset) { 80 nxt_timer_disable(engine, &c->read_timer); 81 } 82 |
78 wq = c->read_work_queue; 79 handler = state->ready_handler; 80 81 nxt_work_queue_add(wq, handler, task, c, data); 82 | 83 nxt_work_queue_add(c->read_work_queue, 84 state->ready_handler, task, c, data); |
83 return; 84 } 85 86 if (n != NXT_AGAIN) { | 85 return; 86 } 87 88 if (n != NXT_AGAIN) { |
89 /* n == 0 or n == NXT_ERROR. */ 90 handler = (n == 0) ? state->close_handler : state->error_handler; 91 |
|
87 nxt_fd_event_block_read(engine, &c->socket); 88 nxt_timer_disable(engine, &c->read_timer); 89 | 92 nxt_fd_event_block_read(engine, &c->socket); 93 nxt_timer_disable(engine, &c->read_timer); 94 |
90 wq = &engine->fast_work_queue; | 95 nxt_work_queue_add(&engine->fast_work_queue, 96 handler, task, c, data); 97 return; 98 } |
91 | 99 |
92 handler = (n == 0) ? state->close_handler : state->error_handler; | 100 /* n == NXT_AGAIN. */ |
93 | 101 |
94 nxt_work_queue_add(wq, handler, task, c, data); | 102 if (c->socket.read_ready) { 103 /* 104 * SSL/TLS library can return NXT_AGAIN if renegotiation 105 * occured during read operation, it toggled write event 106 * internally so only read timer should be set. 107 */ 108 if (c->read_timer.state == NXT_TIMER_DISABLED) { 109 nxt_conn_timer(engine, c, state, &c->read_timer); 110 } |
95 96 return; 97 } 98 } 99 | 111 112 return; 113 } 114 } 115 |
100 /* 101 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 102 * because the function can be called by nxt_kqueue_conn_io_read(). 103 */ 104 c->socket.read_handler = c->io->read; 105 c->socket.error_handler = state->error_handler; 106 | |
107 if (nxt_fd_event_is_disabled(c->socket.read)) { 108 nxt_fd_event_enable_read(engine, &c->socket); 109 } 110 111 if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) { 112 nxt_conn_timer(engine, c, state, &c->read_timer); 113 } 114} --- 75 unchanged lines hidden (view full) --- 190 n = recv(c->socket.fd, buf, size, flags); 191 192 err = (n == -1) ? nxt_socket_errno : 0; 193 194 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 195 c->socket.fd, buf, size, flags, n); 196 197 if (n > 0) { | 116 if (nxt_fd_event_is_disabled(c->socket.read)) { 117 nxt_fd_event_enable_read(engine, &c->socket); 118 } 119 120 if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) { 121 nxt_conn_timer(engine, c, state, &c->read_timer); 122 } 123} --- 75 unchanged lines hidden (view full) --- 199 n = recv(c->socket.fd, buf, size, flags); 200 201 err = (n == -1) ? nxt_socket_errno : 0; 202 203 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 204 c->socket.fd, buf, size, flags, n); 205 206 if (n > 0) { |
198 if ((size_t) n < size) { | 207 if ((size_t) n < size && (flags & MSG_PEEK) == 0) { |
199 c->socket.read_ready = 0; 200 } 201 202 return n; 203 } 204 205 if (n == 0) { 206 c->socket.closed = 1; | 208 c->socket.read_ready = 0; 209 } 210 211 return n; 212 } 213 214 if (n == 0) { 215 c->socket.closed = 1; |
207 c->socket.read_ready = 0; | |
208 | 216 |
217 if ((flags & MSG_PEEK) == 0) { 218 c->socket.read_ready = 0; 219 } 220 |
|
209 return n; 210 } 211 212 /* n == -1 */ 213 214 switch (err) { 215 216 case NXT_EAGAIN: --- 19 unchanged lines hidden --- | 221 return n; 222 } 223 224 /* n == -1 */ 225 226 switch (err) { 227 228 case NXT_EAGAIN: --- 19 unchanged lines hidden --- |