1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * A listen socket handler calls an event facility specific io_accept() 12 * method. The method accept()s a new connection and then calls 13 * nxt_event_conn_accept() to handle the new connection and to prepare 14 * for a next connection to avoid just dropping next accept()ed socket 15 * if no more connections allowed. If there are no available connections 16 * an idle connection would be closed. If there are no idle connections 17 * then new connections will not be accept()ed for 1 second. 18 */ 19 20 21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, 22 nxt_listen_event_t *lev); 23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, 26 nxt_listen_event_t *lev); 27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, 28 nxt_listen_event_t *lev); 29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, 30 void *data); 31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, 32 void *data); 33 34 35 nxt_listen_event_t * 36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) 37 { 38 nxt_listen_event_t *lev; 39 nxt_event_engine_t *engine; 40 41 lev = nxt_zalloc(sizeof(nxt_listen_event_t)); 42 43 if (nxt_fast_path(lev != NULL)) { 44 lev->socket.fd = ls->socket; 45 46 engine = task->thread->engine; 47 lev->batch = engine->batch; 48 49 lev->socket.read_work_queue = &engine->accept_work_queue; 50 lev->socket.read_handler = nxt_conn_listen_handler; 51 lev->socket.error_handler = nxt_conn_listen_event_error; 52 lev->socket.log = &nxt_main_log; 53 54 lev->accept = engine->event.io->accept; 55 56 lev->listen = ls; 57 lev->work_queue = &engine->read_work_queue; 58 59 lev->timer.work_queue = &engine->fast_work_queue; 60 lev->timer.handler = nxt_conn_listen_timer_handler; 61 lev->timer.log = &nxt_main_log; 62 63 lev->task.thread = task->thread; 64 lev->task.log = &nxt_main_log; 65 lev->task.ident = nxt_task_next_ident(); 66 lev->socket.task = &lev->task; 67 lev->timer.task = &lev->task; 68 69 if (nxt_conn_accept_alloc(task, lev) != NULL) { 70 nxt_fd_event_enable_accept(engine, &lev->socket); 71 72 nxt_queue_insert_tail(&engine->listen_connections, &lev->link); 73 } 74 75 return lev; 76 } 77 78 return NULL; 79 } 80 81 82 static nxt_conn_t * 83 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) 84 { 85 nxt_mp_t *mp; 86 nxt_conn_t *c; 87 nxt_event_engine_t *engine; 88 89 engine = task->thread->engine; 90 91 if (engine->connections < engine->max_connections) { 92 93 mp = nxt_mp_create(1024, 128, 256, 32); 94 95 if (nxt_fast_path(mp != NULL)) { 96 c = nxt_conn_create(mp, lev->socket.task); 97 if (nxt_slow_path(c == NULL)) { 98 goto fail; 99 } 100 101 lev->next = c; 102 c->socket.read_work_queue = lev->socket.read_work_queue; 103 c->socket.write_ready = 1; 104 105 c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen); 106 if (nxt_fast_path(c->remote != NULL)) { 107 return c; 108 } 109 } 110 111 fail: 112 113 nxt_mp_destroy(mp); 114 } 115 116 return NULL; 117 } 118 119 120 static void 121 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 122 { 123 nxt_listen_event_t *lev; 124 125 lev = obj; 126 lev->ready = lev->batch; 127 128 lev->accept(task, lev, data); 129 } 130 131 132 void 133 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 134 { 135 socklen_t socklen; 136 nxt_conn_t *c; 137 nxt_socket_t s; 138 struct sockaddr *sa; 139 nxt_listen_event_t *lev; 140 141 lev = obj; 142 c = lev->next; 143 144 lev->ready--; 145 lev->socket.read_ready = (lev->ready != 0); 146 147 sa = &c->remote->u.sockaddr; 148 socklen = c->remote->socklen; 149 /* 150 * The returned socklen is ignored here, because sockaddr_in and 151 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un 152 * it is 3 byte length and already prepared, because old BSDs return zero 153 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte 154 * socklen and updates only the sa_family part; other systems copy 3 bytes 155 * and truncate surplus zero part. Only bound sockaddr_un will be really 156 * truncated here. 157 */ 158 s = accept(lev->socket.fd, sa, &socklen); 159 160 if (s == -1) { 161 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 162 return; 163 } 164 165 c->socket.fd = s; 166 167 #if (NXT_LINUX) 168 /* 169 * Linux does not inherit non-blocking mode 170 * from listen socket for accept()ed socket. 171 */ 172 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 173 nxt_socket_close(task, s); 174 } 175 176 #endif 177 178 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 179 180 nxt_conn_accept(task, lev, c); 181 } 182 183 184 void 185 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 186 { 187 nxt_conn_t *next; 188 189 nxt_sockaddr_text(c->remote); 190 191 nxt_debug(task, "client: %*s", 192 c->remote->address_length, nxt_sockaddr_address(c->remote)); 193 194 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 195 196 c->read_work_queue = lev->work_queue; 197 c->write_work_queue = lev->work_queue; 198 199 if (lev->listen->read_after_accept) { 200 201 //c->socket.read_ready = 1; 202 // lev->listen->handler(task, c, lev->socket.data); 203 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 204 &c->task, c, lev->socket.data); 205 206 } else { 207 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 208 &c->task, c, lev->socket.data); 209 } 210 211 next = nxt_conn_accept_next(task, lev); 212 213 if (next != NULL && lev->socket.read_ready) { 214 nxt_work_queue_add(lev->socket.read_work_queue, 215 lev->accept, task, lev, next); 216 } 217 } 218 219 220 static nxt_conn_t * 221 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 222 { 223 nxt_conn_t *c; 224 225 lev->next = NULL; 226 227 do { 228 c = nxt_conn_accept_alloc(task, lev); 229 230 if (nxt_fast_path(c != NULL)) { 231 return c; 232 } 233 234 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); 235 236 nxt_log(task, NXT_LOG_CRIT, "no available connections, " 237 "new connections are not accepted within 1s"); 238 239 return NULL; 240 } 241 242 243 static nxt_int_t 244 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 245 { 246 nxt_conn_t *c; 247 nxt_queue_t *idle; 248 nxt_queue_link_t *link; 249 nxt_event_engine_t *engine; 250 251 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 252 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 253 }; 254 255 engine = task->thread->engine; 256 257 idle = &engine->idle_connections; 258 259 for (link = nxt_queue_last(idle); 260 link != nxt_queue_head(idle); 261 link = nxt_queue_next(link)) 262 { 263 c = nxt_queue_link_data(link, nxt_conn_t, link); 264 265 if (!c->socket.read_ready) { 266 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 267 task->log, "no available connections, " 268 "close idle connection"); 269 nxt_queue_remove(link); 270 nxt_conn_close(engine, c); 271 272 return NXT_OK; 273 } 274 } 275 276 nxt_timer_add(engine, &lev->timer, 1000); 277 278 nxt_fd_event_disable_read(engine, &lev->socket); 279 280 return NXT_DECLINED; 281 } 282 283 284 void 285 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 286 const char *accept_syscall, nxt_err_t err) 287 { 288 static nxt_log_moderation_t nxt_accept_log_moderation = { 289 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 290 }; 291 292 lev->socket.read_ready = 0; 293 294 switch (err) { 295 296 case NXT_EAGAIN: 297 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 298 return; 299 300 case ECONNABORTED: 301 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 302 task->log, "%s(%d) failed %E", 303 accept_syscall, lev->socket.fd, err); 304 return; 305 306 case EMFILE: 307 case ENFILE: 308 case ENOBUFS: 309 case ENOMEM: 310 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { 311 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " 312 "new connections are not accepted within 1s", 313 accept_syscall, lev->socket.fd, err); 314 } 315 316 return; 317 318 default: 319 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", 320 accept_syscall, lev->socket.fd, err); 321 return; 322 } 323 } 324 325 326 static void 327 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 328 { 329 nxt_conn_t *c; 330 nxt_timer_t *timer; 331 nxt_listen_event_t *lev; 332 333 timer = obj; 334 335 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 336 c = lev->next; 337 338 if (c == NULL) { 339 c = nxt_conn_accept_next(task, lev); 340 341 if (c == NULL) { 342 return; 343 } 344 } 345 346 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 347 348 lev->accept(task, lev, c); 349 } 350 351 352 static void 353 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 354 { 355 nxt_fd_event_t *ev; 356 357 ev = obj; 358 359 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); 360 } 361