1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * A listen socket handler calls an event facility specific io_accept() 12 * method. The method accept()s a new connection and then calls 13 * nxt_event_conn_accept() to handle the new connection and to prepare 14 * for a next connection to avoid just dropping next accept()ed socket 15 * if no more connections allowed. If there are no available connections 16 * an idle connection would be closed. If there are no idle connections 17 * then new connections will not be accept()ed for 1 second. 18 */ 19 20 21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, 22 nxt_listen_event_t *lev); 23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, 26 nxt_listen_event_t *lev); 27 static void nxt_conn_accept_close_idle(nxt_task_t *task, 28 nxt_listen_event_t *lev); 29 static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, 30 void *data); 31 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, 32 void *data); 33 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, 34 void *data); 35 36 37 nxt_listen_event_t * 38 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) 39 { 40 nxt_listen_event_t *lev; 41 nxt_event_engine_t *engine; 42 43 lev = nxt_zalloc(sizeof(nxt_listen_event_t)); 44 45 if (nxt_fast_path(lev != NULL)) { 46 lev->socket.fd = ls->socket; 47 48 engine = task->thread->engine; 49 lev->batch = engine->batch; 50 lev->count = 1; 51 52 lev->socket.read_work_queue = &engine->accept_work_queue; 53 lev->socket.read_handler = nxt_conn_listen_handler; 54 lev->socket.error_handler = nxt_conn_listen_event_error; 55 lev->socket.log = &nxt_main_log; 56 57 lev->accept = engine->event.io->accept; 58 59 lev->listen = ls; 60 lev->work_queue = &engine->read_work_queue; 61 62 lev->timer.work_queue = &engine->fast_work_queue; 63 lev->timer.handler = nxt_conn_listen_timer_handler; 64 lev->timer.log = &nxt_main_log; 65 66 lev->task.thread = task->thread; 67 lev->task.log = &nxt_main_log; 68 lev->task.ident = nxt_task_next_ident(); 69 lev->socket.task = &lev->task; 70 lev->timer.task = &lev->task; 71 72 if (nxt_conn_accept_alloc(task, lev) != NULL) { 73 nxt_fd_event_enable_accept(engine, &lev->socket); 74 75 nxt_queue_insert_tail(&engine->listen_connections, &lev->link); 76 } 77 78 return lev; 79 } 80 81 return NULL; 82 } 83 84 85 static nxt_conn_t * 86 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) 87 { 88 nxt_mp_t *mp; 89 nxt_conn_t *c; 90 nxt_event_engine_t *engine; 91 92 engine = task->thread->engine; 93 94 if (engine->connections < engine->max_connections) { 95 96 mp = nxt_mp_create(1024, 128, 256, 32); 97 98 if (nxt_fast_path(mp != NULL)) { 99 c = nxt_conn_create(mp, lev->socket.task); 100 if (nxt_slow_path(c == NULL)) { 101 goto fail; 102 } 103 104 c->socket.read_work_queue = lev->socket.read_work_queue; 105 c->socket.write_ready = 1; 106 107 c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen); 108 if (nxt_fast_path(c->remote != NULL)) { 109 lev->next = c; 110 return c; 111 } 112 } 113 114 fail: 115 116 nxt_mp_destroy(mp); 117 } 118 119 return NULL; 120 } 121 122 123 static void 124 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 125 { 126 nxt_listen_event_t *lev; 127 128 lev = obj; 129 lev->ready = lev->batch; 130 131 lev->accept(task, lev, data); 132 } 133 134 135 void 136 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 137 { 138 socklen_t socklen; 139 nxt_conn_t *c; 140 nxt_socket_t s; 141 struct sockaddr *sa; 142 nxt_listen_event_t *lev; 143 144 lev = obj; 145 c = lev->next; 146 147 lev->ready--; 148 lev->socket.read_ready = (lev->ready != 0); 149 150 sa = &c->remote->u.sockaddr; 151 socklen = c->remote->socklen; 152 /* 153 * The returned socklen is ignored here, because sockaddr_in and 154 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un 155 * it is 3 byte length and already prepared, because old BSDs return zero 156 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte 157 * socklen and updates only the sa_family part; other systems copy 3 bytes 158 * and truncate surplus zero part. Only bound sockaddr_un will be really 159 * truncated here. 160 */ 161 s = accept(lev->socket.fd, sa, &socklen); 162 163 if (s == -1) { 164 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 165 return; 166 } 167 168 c->socket.fd = s; 169 170 #if (NXT_LINUX) 171 /* 172 * Linux does not inherit non-blocking mode 173 * from listen socket for accept()ed socket. 174 */ 175 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 176 nxt_socket_close(task, s); 177 } 178 179 #endif 180 181 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 182 183 nxt_conn_accept(task, lev, c); 184 } 185 186 187 void 188 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 189 { 190 nxt_conn_t *next; 191 192 nxt_sockaddr_text(c->remote); 193 194 nxt_debug(task, "client: %*s", 195 (size_t) c->remote->address_length, 196 nxt_sockaddr_address(c->remote)); 197 198 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 199 200 c->listen = lev; 201 lev->count++; 202 lev->next = NULL; 203 c->socket.data = NULL; 204 205 c->read_work_queue = lev->work_queue; 206 c->write_work_queue = lev->work_queue; 207 208 if (lev->listen->read_after_accept) { 209 210 //c->socket.read_ready = 1; 211 // lev->listen->handler(task, c, lev); 212 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 213 &c->task, c, lev); 214 215 } else { 216 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 217 &c->task, c, lev); 218 } 219 220 next = nxt_conn_accept_next(task, lev); 221 222 if (next != NULL && lev->socket.read_ready) { 223 nxt_work_queue_add(lev->socket.read_work_queue, 224 lev->accept, task, lev, next); 225 } 226 } 227 228 229 static nxt_conn_t * 230 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 231 { 232 nxt_conn_t *c; 233 234 c = lev->next; 235 236 if (c == NULL) { 237 c = nxt_conn_accept_alloc(task, lev); 238 239 if (nxt_slow_path(c == NULL)) { 240 nxt_conn_accept_close_idle(task, lev); 241 } 242 } 243 244 return c; 245 } 246 247 248 static void 249 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 250 { 251 nxt_event_engine_t *engine; 252 253 engine = task->thread->engine; 254 255 nxt_work_queue_add(&engine->close_work_queue, 256 nxt_conn_accept_close_idle_handler, task, NULL, NULL); 257 258 nxt_timer_add(engine, &lev->timer, 100); 259 260 nxt_fd_event_disable_read(engine, &lev->socket); 261 262 nxt_alert(task, "new connections are not accepted within 100ms"); 263 } 264 265 266 static void 267 nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data) 268 { 269 nxt_uint_t times; 270 nxt_conn_t *c; 271 nxt_queue_t *idle; 272 nxt_queue_link_t *link, *next; 273 nxt_event_engine_t *engine; 274 275 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 276 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 277 }; 278 279 times = 10; 280 engine = task->thread->engine; 281 idle = &engine->idle_connections; 282 283 for (link = nxt_queue_last(idle); 284 link != nxt_queue_head(idle); 285 link = next) 286 { 287 next = nxt_queue_next(link); 288 289 c = nxt_queue_link_data(link, nxt_conn_t, link); 290 291 nxt_debug(c->socket.task, "idle connection: %d rdy:%d", 292 c->socket.fd, c->socket.read_ready); 293 294 if (!c->socket.read_ready) { 295 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 296 task->log, "no available connections, " 297 "close idle connection"); 298 299 c->read_state->close_handler(c->socket.task, c, c->socket.data); 300 301 times--; 302 303 if (times == 0) { 304 break; 305 } 306 } 307 } 308 } 309 310 311 void 312 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 313 const char *accept_syscall, nxt_err_t err) 314 { 315 static nxt_log_moderation_t nxt_accept_log_moderation = { 316 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 317 }; 318 319 lev->socket.read_ready = 0; 320 321 switch (err) { 322 323 case NXT_EAGAIN: 324 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 325 return; 326 327 case ECONNABORTED: 328 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 329 task->log, "%s(%d) failed %E", 330 accept_syscall, lev->socket.fd, err); 331 return; 332 333 case EMFILE: 334 case ENFILE: 335 case ENOBUFS: 336 case ENOMEM: 337 nxt_alert(task, "%s(%d) failed %E", 338 accept_syscall, lev->socket.fd, err); 339 340 nxt_conn_accept_close_idle(task, lev); 341 return; 342 343 default: 344 nxt_alert(task, "%s(%d) failed %E", 345 accept_syscall, lev->socket.fd, err); 346 return; 347 } 348 } 349 350 351 static void 352 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 353 { 354 nxt_conn_t *c; 355 nxt_timer_t *timer; 356 nxt_listen_event_t *lev; 357 358 timer = obj; 359 360 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 361 362 c = nxt_conn_accept_next(task, lev); 363 if (c == NULL) { 364 return; 365 } 366 367 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 368 369 lev->accept(task, lev, c); 370 } 371 372 373 static void 374 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 375 { 376 nxt_fd_event_t *ev; 377 378 ev = obj; 379 380 nxt_alert(task, "accept(%d) event error", ev->fd); 381 } 382