1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * A listen socket handler calls an event facility specific io_accept() 12 * method. The method accept()s a new connection and then calls 13 * nxt_event_conn_accept() to handle the new connection and to prepare 14 * for a next connection to avoid just dropping next accept()ed socket 15 * if no more connections allowed. If there are no available connections 16 * an idle connection would be closed. If there are no idle connections 17 * then new connections will not be accept()ed for 1 second. 18 */ 19 20 21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, 22 nxt_listen_event_t *lev); 23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, 26 nxt_listen_event_t *lev); 27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, 28 nxt_listen_event_t *lev); 29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, 30 void *data); 31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, 32 void *data); 33 34 35 nxt_listen_event_t * 36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) 37 { 38 nxt_listen_event_t *lev; 39 nxt_event_engine_t *engine; 40 41 lev = nxt_zalloc(sizeof(nxt_listen_event_t)); 42 43 if (nxt_fast_path(lev != NULL)) { 44 lev->socket.fd = ls->socket; 45 46 engine = task->thread->engine; 47 lev->batch = engine->batch; 48 49 lev->socket.read_work_queue = &engine->accept_work_queue; 50 lev->socket.read_handler = nxt_conn_listen_handler; 51 lev->socket.error_handler = nxt_conn_listen_event_error; 52 lev->socket.log = &nxt_main_log; 53 54 lev->accept = engine->event.io->accept; 55 56 lev->listen = ls; 57 lev->work_queue = &engine->read_work_queue; 58 59 lev->timer.work_queue = &engine->fast_work_queue; 60 lev->timer.handler = nxt_conn_listen_timer_handler; 61 lev->timer.log = &nxt_main_log; 62 63 lev->task.thread = task->thread; 64 lev->task.log = &nxt_main_log; 65 lev->task.ident = nxt_task_next_ident(); 66 lev->socket.task = &lev->task; 67 lev->timer.task = &lev->task; 68 69 if (nxt_conn_accept_alloc(task, lev) != NULL) { 70 nxt_fd_event_enable_accept(engine, &lev->socket); 71 72 nxt_queue_insert_tail(&engine->listen_connections, &lev->link); 73 } 74 75 return lev; 76 } 77 78 return NULL; 79 } 80 81 82 static nxt_conn_t * 83 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) 84 { 85 nxt_mp_t *mp; 86 nxt_conn_t *c; 87 nxt_sockaddr_t *sa, *remote; 88 nxt_event_engine_t *engine; 89 nxt_listen_socket_t *ls; 90 91 engine = task->thread->engine; 92 93 if (engine->connections < engine->max_connections) { 94 95 mp = nxt_mp_create(1024, 128, 256, 32); 96 97 if (nxt_fast_path(mp != NULL)) { 98 c = nxt_conn_create(mp, lev->socket.task); 99 if (nxt_slow_path(c == NULL)) { 100 goto fail; 101 } 102 103 lev->next = c; 104 c->socket.read_work_queue = lev->socket.read_work_queue; 105 c->socket.write_ready = 1; 106 c->listen = lev; 107 108 ls = lev->listen; 109 110 remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); 111 if (nxt_slow_path(remote == NULL)) { 112 goto fail; 113 } 114 115 c->remote = remote; 116 117 sa = ls->sockaddr; 118 remote->type = sa->type; 119 /* 120 * Set address family for unspecified Unix domain, 121 * because these sockaddr's are not be passed to accept(). 122 */ 123 remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; 124 125 return c; 126 } 127 128 fail: 129 130 nxt_mp_destroy(mp); 131 } 132 133 return NULL; 134 } 135 136 137 static void 138 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 139 { 140 nxt_listen_event_t *lev; 141 142 lev = obj; 143 lev->ready = lev->batch; 144 145 lev->accept(task, lev, data); 146 } 147 148 149 void 150 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 151 { 152 socklen_t len; 153 nxt_conn_t *c; 154 nxt_socket_t s; 155 struct sockaddr *sa; 156 nxt_listen_event_t *lev; 157 158 lev = obj; 159 c = lev->next; 160 161 lev->ready--; 162 lev->socket.read_ready = (lev->ready != 0); 163 164 len = c->remote->socklen; 165 166 if (len >= sizeof(struct sockaddr)) { 167 sa = &c->remote->u.sockaddr; 168 169 } else { 170 sa = NULL; 171 len = 0; 172 } 173 174 s = accept(lev->socket.fd, sa, &len); 175 176 if (s == -1) { 177 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 178 return; 179 } 180 181 c->socket.fd = s; 182 183 #if (NXT_LINUX) 184 /* 185 * Linux does not inherit non-blocking mode 186 * from listen socket for accept()ed socket. 187 */ 188 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 189 nxt_socket_close(task, s); 190 } 191 192 #endif 193 194 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 195 196 nxt_conn_accept(task, lev, c); 197 } 198 199 200 void 201 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 202 { 203 nxt_conn_t *next; 204 205 nxt_sockaddr_text(c->remote); 206 207 nxt_debug(task, "client: %*s", 208 c->remote->address_length, nxt_sockaddr_address(c->remote)); 209 210 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 211 212 c->read_work_queue = lev->work_queue; 213 c->write_work_queue = lev->work_queue; 214 215 if (lev->listen->read_after_accept) { 216 217 //c->socket.read_ready = 1; 218 // lev->listen->handler(task, c, lev->socket.data); 219 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 220 task, c, lev->socket.data); 221 222 } else { 223 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 224 task, c, lev->socket.data); 225 } 226 227 next = nxt_conn_accept_next(task, lev); 228 229 if (next != NULL && lev->socket.read_ready) { 230 nxt_work_queue_add(lev->socket.read_work_queue, 231 lev->accept, task, lev, next); 232 } 233 } 234 235 236 static nxt_conn_t * 237 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 238 { 239 nxt_conn_t *c; 240 241 lev->next = NULL; 242 243 do { 244 c = nxt_conn_accept_alloc(task, lev); 245 246 if (nxt_fast_path(c != NULL)) { 247 return c; 248 } 249 250 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); 251 252 nxt_log(task, NXT_LOG_CRIT, "no available connections, " 253 "new connections are not accepted within 1s"); 254 255 return NULL; 256 } 257 258 259 static nxt_int_t 260 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 261 { 262 nxt_conn_t *c; 263 nxt_queue_t *idle; 264 nxt_queue_link_t *link; 265 nxt_event_engine_t *engine; 266 267 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 268 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 269 }; 270 271 engine = task->thread->engine; 272 273 idle = &engine->idle_connections; 274 275 for (link = nxt_queue_last(idle); 276 link != nxt_queue_head(idle); 277 link = nxt_queue_next(link)) 278 { 279 c = nxt_queue_link_data(link, nxt_conn_t, link); 280 281 if (!c->socket.read_ready) { 282 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 283 task->log, "no available connections, " 284 "close idle connection"); 285 nxt_queue_remove(link); 286 nxt_conn_close(engine, c); 287 288 return NXT_OK; 289 } 290 } 291 292 nxt_timer_add(engine, &lev->timer, 1000); 293 294 nxt_fd_event_disable_read(engine, &lev->socket); 295 296 return NXT_DECLINED; 297 } 298 299 300 void 301 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 302 const char *accept_syscall, nxt_err_t err) 303 { 304 static nxt_log_moderation_t nxt_accept_log_moderation = { 305 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 306 }; 307 308 lev->socket.read_ready = 0; 309 310 switch (err) { 311 312 case NXT_EAGAIN: 313 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 314 return; 315 316 case ECONNABORTED: 317 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 318 task->log, "%s(%d) failed %E", 319 accept_syscall, lev->socket.fd, err); 320 return; 321 322 case EMFILE: 323 case ENFILE: 324 case ENOBUFS: 325 case ENOMEM: 326 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { 327 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " 328 "new connections are not accepted within 1s", 329 accept_syscall, lev->socket.fd, err); 330 } 331 332 return; 333 334 default: 335 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", 336 accept_syscall, lev->socket.fd, err); 337 return; 338 } 339 } 340 341 342 static void 343 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 344 { 345 nxt_conn_t *c; 346 nxt_timer_t *timer; 347 nxt_listen_event_t *lev; 348 349 timer = obj; 350 351 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 352 c = lev->next; 353 354 if (c == NULL) { 355 c = nxt_conn_accept_next(task, lev); 356 357 if (c == NULL) { 358 return; 359 } 360 } 361 362 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 363 364 lev->accept(task, lev, c); 365 } 366 367 368 static void 369 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 370 { 371 nxt_fd_event_t *ev; 372 373 ev = obj; 374 375 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); 376 } 377