1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 void 11 nxt_conn_wait(nxt_conn_t *c) 12 { 13 nxt_event_engine_t *engine; 14 const nxt_conn_state_t *state; 15 16 nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 17 c->socket.fd, c->socket.read_ready); 18 19 engine = c->socket.task->thread->engine; 20 state = c->read_state; 21 22 if (c->socket.read_ready) { 23 nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 24 c->socket.task, c, c->socket.data); 25 return; 26 } 27 28 c->socket.read_handler = state->ready_handler; 29 c->socket.error_handler = state->error_handler; 30 31 nxt_conn_timer(engine, c, state, &c->read_timer); 32 33 nxt_fd_event_enable_read(engine, &c->socket); 34 } 35 36 37 void 38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39 { 40 ssize_t n; 41 nxt_conn_t *c; 42 nxt_event_engine_t *engine; 43 nxt_work_handler_t handler; 44 const nxt_conn_state_t *state; 45 46 c = obj; 47 48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d", 49 c->socket.fd, c->socket.read_ready, c->socket.closed, 50 c->socket.error, c->block_read); 51 52 if (c->socket.error != 0 || c->block_read) { 53 return; 54 } 55 56 engine = task->thread->engine; 57 58 /* 59 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 60 * because the function can be called by nxt_kqueue_conn_io_read(). 61 */ 62 c->socket.read_handler = c->io->read; 63 state = c->read_state; 64 c->socket.error_handler = state->error_handler; 65 66 if (c->socket.read_ready) { 67 68 if (state->io_read_handler == NULL) { 69 n = c->io->recvbuf(c, c->read); 70 71 } else { 72 n = state->io_read_handler(c); 73 /* The state can be changed by io_read_handler. */ 74 state = c->read_state; 75 } 76 77 if (n > 0) { 78 c->nbytes = n; 79 80 nxt_recvbuf_update(c->read, n); 81 82 nxt_fd_event_block_read(engine, &c->socket); 83 84 if (state->timer_autoreset) { 85 nxt_timer_disable(engine, &c->read_timer); 86 } 87 88 nxt_work_queue_add(c->read_work_queue, 89 state->ready_handler, task, c, data); 90 return; 91 } 92 93 if (n != NXT_AGAIN) { 94 /* n == 0 or n == NXT_ERROR. */ 95 handler = (n == 0) ? state->close_handler : state->error_handler; 96 97 nxt_fd_event_block_read(engine, &c->socket); 98 nxt_timer_disable(engine, &c->read_timer); 99 100 nxt_work_queue_add(&engine->fast_work_queue, 101 handler, task, c, data); 102 return; 103 } 104 105 /* n == NXT_AGAIN. */ 106 107 if (c->socket.read_ready) { 108 /* 109 * SSL/TLS library can return NXT_AGAIN if renegotiation 110 * occured during read operation, it toggled write event 111 * internally so only read timer should be set. 112 */ 113 if (!c->read_timer.enabled) { 114 nxt_conn_timer(engine, c, state, &c->read_timer); 115 } 116 117 return; 118 } 119 } 120 121 if (nxt_fd_event_is_disabled(c->socket.read)) { 122 nxt_fd_event_enable_read(engine, &c->socket); 123 } 124 125 if (state->timer_autoreset || !c->read_timer.enabled) { 126 nxt_conn_timer(engine, c, state, &c->read_timer); 127 } 128 } 129 130 131 ssize_t 132 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 133 { 134 ssize_t n; 135 nxt_err_t err; 136 nxt_uint_t niov; 137 struct iovec iov[NXT_IOBUF_MAX]; 138 nxt_recvbuf_coalesce_t rb; 139 140 rb.buf = b; 141 rb.iobuf = iov; 142 rb.nmax = NXT_IOBUF_MAX; 143 rb.size = 0; 144 145 niov = nxt_recvbuf_mem_coalesce(&rb); 146 147 if (niov == 1) { 148 /* Disposal of surplus kernel iovec copy-in operation. */ 149 return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 150 } 151 152 for ( ;; ) { 153 n = readv(c->socket.fd, iov, niov); 154 155 err = (n == -1) ? nxt_socket_errno : 0; 156 157 nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 158 159 if (n > 0) { 160 if ((size_t) n < rb.size) { 161 c->socket.read_ready = 0; 162 } 163 164 return n; 165 } 166 167 if (n == 0) { 168 c->socket.closed = 1; 169 c->socket.read_ready = 0; 170 return n; 171 } 172 173 /* n == -1 */ 174 175 switch (err) { 176 177 case NXT_EAGAIN: 178 nxt_debug(c->socket.task, "readv() %E", err); 179 c->socket.read_ready = 0; 180 return NXT_AGAIN; 181 182 case NXT_EINTR: 183 nxt_debug(c->socket.task, "readv() %E", err); 184 continue; 185 186 default: 187 c->socket.error = err; 188 nxt_log(c->socket.task, nxt_socket_error_level(err), 189 "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 190 191 return NXT_ERROR; 192 } 193 } 194 } 195 196 197 ssize_t 198 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 199 { 200 ssize_t n; 201 nxt_err_t err; 202 203 for ( ;; ) { 204 n = recv(c->socket.fd, buf, size, flags); 205 206 err = (n == -1) ? nxt_socket_errno : 0; 207 208 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 209 c->socket.fd, buf, size, flags, n); 210 211 if (n > 0) { 212 if ((size_t) n < size && (flags & MSG_PEEK) == 0) { 213 c->socket.read_ready = 0; 214 } 215 216 return n; 217 } 218 219 if (n == 0) { 220 c->socket.closed = 1; 221 222 if ((flags & MSG_PEEK) == 0) { 223 c->socket.read_ready = 0; 224 } 225 226 return n; 227 } 228 229 /* n == -1 */ 230 231 switch (err) { 232 233 case NXT_EAGAIN: 234 nxt_debug(c->socket.task, "recv() %E", err); 235 c->socket.read_ready = 0; 236 237 return NXT_AGAIN; 238 239 case NXT_EINTR: 240 nxt_debug(c->socket.task, "recv() %E", err); 241 continue; 242 243 default: 244 c->socket.error = err; 245 nxt_log(c->socket.task, nxt_socket_error_level(err), 246 "recv(%d, %p, %uz, %ui) failed %E", 247 c->socket.fd, buf, size, flags, err); 248 249 return NXT_ERROR; 250 } 251 } 252 } 253