1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 void 11 nxt_conn_wait(nxt_conn_t *c) 12 { 13 nxt_event_engine_t *engine; 14 const nxt_conn_state_t *state; 15 16 nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 17 c->socket.fd, c->socket.read_ready); 18 19 engine = c->socket.task->thread->engine; 20 state = c->read_state; 21 22 if (c->socket.read_ready) { 23 nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 24 c->socket.task, c, c->socket.data); 25 return; 26 } 27 28 c->socket.read_handler = state->ready_handler; 29 c->socket.error_handler = state->error_handler; 30 31 nxt_conn_timer(engine, c, state, &c->read_timer); 32 33 nxt_fd_event_enable_read(engine, &c->socket); 34 } 35 36 37 void 38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39 { 40 ssize_t n; 41 nxt_conn_t *c; 42 nxt_event_engine_t *engine; 43 nxt_work_handler_t handler; 44 const nxt_conn_state_t *state; 45 46 c = obj; 47 48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 49 c->socket.fd, c->socket.read_ready, c->socket.closed); 50 51 engine = task->thread->engine; 52 53 /* 54 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 55 * because the function can be called by nxt_kqueue_conn_io_read(). 56 */ 57 c->socket.read_handler = c->io->read; 58 state = c->read_state; 59 c->socket.error_handler = state->error_handler; 60 61 if (c->socket.read_ready) { 62 63 if (state->io_read_handler == NULL) { 64 n = c->io->recvbuf(c, c->read); 65 66 } else { 67 n = state->io_read_handler(c); 68 /* The state can be changed by io_read_handler. */ 69 state = c->read_state; 70 } 71 72 if (n > 0) { 73 c->nbytes = n; 74 75 nxt_recvbuf_update(c->read, n); 76 77 nxt_fd_event_block_read(engine, &c->socket); 78 79 if (state->timer_autoreset) { 80 nxt_timer_disable(engine, &c->read_timer); 81 } 82 83 nxt_work_queue_add(c->read_work_queue, 84 state->ready_handler, task, c, data); 85 return; 86 } 87 88 if (n != NXT_AGAIN) { 89 /* n == 0 or n == NXT_ERROR. */ 90 handler = (n == 0) ? state->close_handler : state->error_handler; 91 92 nxt_fd_event_block_read(engine, &c->socket); 93 nxt_timer_disable(engine, &c->read_timer); 94 95 nxt_work_queue_add(&engine->fast_work_queue, 96 handler, task, c, data); 97 return; 98 } 99 100 /* n == NXT_AGAIN. */ 101 102 if (c->socket.read_ready) { 103 /* 104 * SSL/TLS library can return NXT_AGAIN if renegotiation 105 * occured during read operation, it toggled write event 106 * internally so only read timer should be set. 107 */ 108 if (!c->read_timer.enabled) { 109 nxt_conn_timer(engine, c, state, &c->read_timer); 110 } 111 112 return; 113 } 114 } 115 116 if (nxt_fd_event_is_disabled(c->socket.read)) { 117 nxt_fd_event_enable_read(engine, &c->socket); 118 } 119 120 if (state->timer_autoreset || !c->read_timer.enabled) { 121 nxt_conn_timer(engine, c, state, &c->read_timer); 122 } 123 } 124 125 126 ssize_t 127 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 128 { 129 ssize_t n; 130 nxt_err_t err; 131 nxt_uint_t niov; 132 struct iovec iov[NXT_IOBUF_MAX]; 133 nxt_recvbuf_coalesce_t rb; 134 135 rb.buf = b; 136 rb.iobuf = iov; 137 rb.nmax = NXT_IOBUF_MAX; 138 rb.size = 0; 139 140 niov = nxt_recvbuf_mem_coalesce(&rb); 141 142 if (niov == 1) { 143 /* Disposal of surplus kernel iovec copy-in operation. */ 144 return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 145 } 146 147 for ( ;; ) { 148 n = readv(c->socket.fd, iov, niov); 149 150 err = (n == -1) ? nxt_socket_errno : 0; 151 152 nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 153 154 if (n > 0) { 155 if ((size_t) n < rb.size) { 156 c->socket.read_ready = 0; 157 } 158 159 return n; 160 } 161 162 if (n == 0) { 163 c->socket.closed = 1; 164 c->socket.read_ready = 0; 165 return n; 166 } 167 168 /* n == -1 */ 169 170 switch (err) { 171 172 case NXT_EAGAIN: 173 nxt_debug(c->socket.task, "readv() %E", err); 174 c->socket.read_ready = 0; 175 return NXT_AGAIN; 176 177 case NXT_EINTR: 178 nxt_debug(c->socket.task, "readv() %E", err); 179 continue; 180 181 default: 182 c->socket.error = err; 183 nxt_log(c->socket.task, nxt_socket_error_level(err), 184 "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 185 186 return NXT_ERROR; 187 } 188 } 189 } 190 191 192 ssize_t 193 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 194 { 195 ssize_t n; 196 nxt_err_t err; 197 198 for ( ;; ) { 199 n = recv(c->socket.fd, buf, size, flags); 200 201 err = (n == -1) ? nxt_socket_errno : 0; 202 203 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 204 c->socket.fd, buf, size, flags, n); 205 206 if (n > 0) { 207 if ((size_t) n < size && (flags & MSG_PEEK) == 0) { 208 c->socket.read_ready = 0; 209 } 210 211 return n; 212 } 213 214 if (n == 0) { 215 c->socket.closed = 1; 216 217 if ((flags & MSG_PEEK) == 0) { 218 c->socket.read_ready = 0; 219 } 220 221 return n; 222 } 223 224 /* n == -1 */ 225 226 switch (err) { 227 228 case NXT_EAGAIN: 229 nxt_debug(c->socket.task, "recv() %E", err); 230 c->socket.read_ready = 0; 231 232 return NXT_AGAIN; 233 234 case NXT_EINTR: 235 nxt_debug(c->socket.task, "recv() %E", err); 236 continue; 237 238 default: 239 c->socket.error = err; 240 nxt_log(c->socket.task, nxt_socket_error_level(err), 241 "recv(%d, %p, %uz, %ui) failed %E", 242 c->socket.fd, buf, size, flags, err); 243 244 return NXT_ERROR; 245 } 246 } 247 } 248