1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 void 11 nxt_conn_wait(nxt_conn_t *c) 12 { 13 nxt_event_engine_t *engine; 14 const nxt_conn_state_t *state; 15 16 nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 17 c->socket.fd, c->socket.read_ready); 18 19 engine = c->socket.task->thread->engine; 20 state = c->read_state; 21 22 if (c->socket.read_ready) { 23 nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 24 c->socket.task, c, c->socket.data); 25 return; 26 } 27 28 c->socket.read_handler = state->ready_handler; 29 c->socket.error_handler = state->error_handler; 30 31 nxt_conn_timer(engine, c, state, &c->read_timer); 32 33 nxt_fd_event_enable_read(engine, &c->socket); 34 } 35 36 37 void 38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39 { 40 ssize_t n; 41 nxt_conn_t *c; 42 nxt_work_queue_t *wq; 43 nxt_event_engine_t *engine; 44 nxt_work_handler_t handler; 45 const nxt_conn_state_t *state; 46 47 c = obj; 48 49 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 50 c->socket.fd, c->socket.read_ready, c->socket.closed); 51 52 engine = task->thread->engine; 53 54 state = c->read_state; 55 56 if (c->socket.read_ready) { 57 58 if (state->io_read_handler == NULL) { 59 n = c->io->recvbuf(c, c->read); 60 61 } else { 62 n = state->io_read_handler(c); 63 /* The state can be changed by io_read_handler. */ 64 state = c->read_state; 65 } 66 67 if (n > 0) { 68 c->nbytes = n; 69 70 nxt_recvbuf_update(c->read, n); 71 72 nxt_fd_event_block_read(engine, &c->socket); 73 74 if (state->timer_autoreset) { 75 nxt_timer_disable(engine, &c->read_timer); 76 } 77 78 wq = c->read_work_queue; 79 handler = state->ready_handler; 80 81 nxt_work_queue_add(wq, handler, task, c, data); 82 83 return; 84 } 85 86 if (n != NXT_AGAIN) { 87 nxt_fd_event_block_read(engine, &c->socket); 88 nxt_timer_disable(engine, &c->read_timer); 89 90 wq = &engine->fast_work_queue; 91 92 handler = (n == 0) ? state->close_handler : state->error_handler; 93 94 nxt_work_queue_add(wq, handler, task, c, data); 95 96 return; 97 } 98 } 99 100 /* 101 * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 102 * because the function can be called by nxt_kqueue_conn_io_read(). 103 */ 104 c->socket.read_handler = c->io->read; 105 c->socket.error_handler = state->error_handler; 106 107 if (c->read_timer.state == NXT_TIMER_DISABLED 108 || nxt_fd_event_is_disabled(c->socket.read)) 109 { 110 /* Timer may be set or reset. */ 111 nxt_conn_timer(engine, c, state, &c->read_timer); 112 113 if (nxt_fd_event_is_disabled(c->socket.read)) { 114 nxt_fd_event_enable_read(engine, &c->socket); 115 } 116 } 117 118 return; 119 } 120 121 122 ssize_t 123 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 124 { 125 ssize_t n; 126 nxt_err_t err; 127 nxt_uint_t niov; 128 struct iovec iov[NXT_IOBUF_MAX]; 129 nxt_recvbuf_coalesce_t rb; 130 131 rb.buf = b; 132 rb.iobuf = iov; 133 rb.nmax = NXT_IOBUF_MAX; 134 rb.size = 0; 135 136 niov = nxt_recvbuf_mem_coalesce(&rb); 137 138 if (niov == 1) { 139 /* Disposal of surplus kernel iovec copy-in operation. */ 140 return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 141 } 142 143 for ( ;; ) { 144 n = readv(c->socket.fd, iov, niov); 145 146 err = (n == -1) ? nxt_socket_errno : 0; 147 148 nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 149 150 if (n > 0) { 151 if ((size_t) n < rb.size) { 152 c->socket.read_ready = 0; 153 } 154 155 return n; 156 } 157 158 if (n == 0) { 159 c->socket.closed = 1; 160 c->socket.read_ready = 0; 161 return n; 162 } 163 164 /* n == -1 */ 165 166 switch (err) { 167 168 case NXT_EAGAIN: 169 nxt_debug(c->socket.task, "readv() %E", err); 170 c->socket.read_ready = 0; 171 return NXT_AGAIN; 172 173 case NXT_EINTR: 174 nxt_debug(c->socket.task, "readv() %E", err); 175 continue; 176 177 default: 178 c->socket.error = err; 179 nxt_log(c->socket.task, nxt_socket_error_level(err), 180 "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 181 182 return NXT_ERROR; 183 } 184 } 185 } 186 187 188 ssize_t 189 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 190 { 191 ssize_t n; 192 nxt_err_t err; 193 194 for ( ;; ) { 195 n = recv(c->socket.fd, buf, size, flags); 196 197 err = (n == -1) ? nxt_socket_errno : 0; 198 199 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 200 c->socket.fd, buf, size, flags, n); 201 202 if (n > 0) { 203 if ((size_t) n < size) { 204 c->socket.read_ready = 0; 205 } 206 207 return n; 208 } 209 210 if (n == 0) { 211 c->socket.closed = 1; 212 c->socket.read_ready = 0; 213 214 return n; 215 } 216 217 /* n == -1 */ 218 219 switch (err) { 220 221 case NXT_EAGAIN: 222 nxt_debug(c->socket.task, "recv() %E", err); 223 c->socket.read_ready = 0; 224 225 return NXT_AGAIN; 226 227 case NXT_EINTR: 228 nxt_debug(c->socket.task, "recv() %E", err); 229 continue; 230 231 default: 232 c->socket.error = err; 233 nxt_log(c->socket.task, nxt_socket_error_level(err), 234 "recv(%d, %p, %uz, %ui) failed %E", 235 c->socket.fd, buf, size, flags, err); 236 237 return NXT_ERROR; 238 } 239 } 240 } 241