1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 void 11 nxt_conn_wait(nxt_conn_t *c) 12 { 13 nxt_event_engine_t *engine; 14 const nxt_conn_state_t *state; 15 16 nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 17 c->socket.fd, c->socket.read_ready); 18 19 engine = c->socket.task->thread->engine; 20 state = c->read_state; 21 22 if (c->socket.read_ready) { 23 nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 24 c->socket.task, c, c->socket.data); 25 return; 26 } 27 28 c->socket.read_handler = state->ready_handler; 29 c->socket.error_handler = state->error_handler; 30 31 nxt_conn_timer(engine, c, state, &c->read_timer); 32 33 nxt_fd_event_enable_read(engine, &c->socket); 34 } 35 36 37 void 38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39 { 40 ssize_t n; 41 nxt_conn_t *c; 42 nxt_event_engine_t *engine; 43 nxt_work_handler_t handler; 44 const nxt_conn_state_t *state; 45 46 c = obj; 47 48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 49 c->socket.fd, c->socket.read_ready, c->socket.closed); 50 51 if (c->socket.error != 0) { 52 return; 53 } 54 55 engine = task->thread->engine; 56 57 /* 58 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 59 * because the function can be called by nxt_kqueue_conn_io_read(). 60 */ 61 c->socket.read_handler = c->io->read; 62 state = c->read_state; 63 c->socket.error_handler = state->error_handler; 64 65 if (c->socket.read_ready) { 66 67 if (state->io_read_handler == NULL) { 68 n = c->io->recvbuf(c, c->read); 69 70 } else { 71 n = state->io_read_handler(c); 72 /* The state can be changed by io_read_handler. */ 73 state = c->read_state; 74 } 75 76 if (n > 0) { 77 c->nbytes = n; 78 79 nxt_recvbuf_update(c->read, n); 80 81 nxt_fd_event_block_read(engine, &c->socket); 82 83 if (state->timer_autoreset) { 84 nxt_timer_disable(engine, &c->read_timer); 85 } 86 87 nxt_work_queue_add(c->read_work_queue, 88 state->ready_handler, task, c, data); 89 return; 90 } 91 92 if (n != NXT_AGAIN) { 93 /* n == 0 or n == NXT_ERROR. */ 94 handler = (n == 0) ? state->close_handler : state->error_handler; 95 96 nxt_fd_event_block_read(engine, &c->socket); 97 nxt_timer_disable(engine, &c->read_timer); 98 99 nxt_work_queue_add(&engine->fast_work_queue, 100 handler, task, c, data); 101 return; 102 } 103 104 /* n == NXT_AGAIN. */ 105 106 if (c->socket.read_ready) { 107 /* 108 * SSL/TLS library can return NXT_AGAIN if renegotiation 109 * occured during read operation, it toggled write event 110 * internally so only read timer should be set. 111 */ 112 if (!c->read_timer.enabled) { 113 nxt_conn_timer(engine, c, state, &c->read_timer); 114 } 115 116 return; 117 } 118 } 119 120 if (nxt_fd_event_is_disabled(c->socket.read)) { 121 nxt_fd_event_enable_read(engine, &c->socket); 122 } 123 124 if (state->timer_autoreset || !c->read_timer.enabled) { 125 nxt_conn_timer(engine, c, state, &c->read_timer); 126 } 127 } 128 129 130 ssize_t 131 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 132 { 133 ssize_t n; 134 nxt_err_t err; 135 nxt_uint_t niov; 136 struct iovec iov[NXT_IOBUF_MAX]; 137 nxt_recvbuf_coalesce_t rb; 138 139 rb.buf = b; 140 rb.iobuf = iov; 141 rb.nmax = NXT_IOBUF_MAX; 142 rb.size = 0; 143 144 niov = nxt_recvbuf_mem_coalesce(&rb); 145 146 if (niov == 1) { 147 /* Disposal of surplus kernel iovec copy-in operation. */ 148 return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 149 } 150 151 for ( ;; ) { 152 n = readv(c->socket.fd, iov, niov); 153 154 err = (n == -1) ? nxt_socket_errno : 0; 155 156 nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 157 158 if (n > 0) { 159 if ((size_t) n < rb.size) { 160 c->socket.read_ready = 0; 161 } 162 163 return n; 164 } 165 166 if (n == 0) { 167 c->socket.closed = 1; 168 c->socket.read_ready = 0; 169 return n; 170 } 171 172 /* n == -1 */ 173 174 switch (err) { 175 176 case NXT_EAGAIN: 177 nxt_debug(c->socket.task, "readv() %E", err); 178 c->socket.read_ready = 0; 179 return NXT_AGAIN; 180 181 case NXT_EINTR: 182 nxt_debug(c->socket.task, "readv() %E", err); 183 continue; 184 185 default: 186 c->socket.error = err; 187 nxt_log(c->socket.task, nxt_socket_error_level(err), 188 "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 189 190 return NXT_ERROR; 191 } 192 } 193 } 194 195 196 ssize_t 197 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 198 { 199 ssize_t n; 200 nxt_err_t err; 201 202 for ( ;; ) { 203 n = recv(c->socket.fd, buf, size, flags); 204 205 err = (n == -1) ? nxt_socket_errno : 0; 206 207 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 208 c->socket.fd, buf, size, flags, n); 209 210 if (n > 0) { 211 if ((size_t) n < size && (flags & MSG_PEEK) == 0) { 212 c->socket.read_ready = 0; 213 } 214 215 return n; 216 } 217 218 if (n == 0) { 219 c->socket.closed = 1; 220 221 if ((flags & MSG_PEEK) == 0) { 222 c->socket.read_ready = 0; 223 } 224 225 return n; 226 } 227 228 /* n == -1 */ 229 230 switch (err) { 231 232 case NXT_EAGAIN: 233 nxt_debug(c->socket.task, "recv() %E", err); 234 c->socket.read_ready = 0; 235 236 return NXT_AGAIN; 237 238 case NXT_EINTR: 239 nxt_debug(c->socket.task, "recv() %E", err); 240 continue; 241 242 default: 243 c->socket.error = err; 244 nxt_log(c->socket.task, nxt_socket_error_level(err), 245 "recv(%d, %p, %uz, %ui) failed %E", 246 c->socket.fd, buf, size, flags, err); 247 248 return NXT_ERROR; 249 } 250 } 251 } 252