1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * A listen socket handler calls an event facility specific io_accept() 12 * method. The method accept()s a new connection and then calls 13 * nxt_event_conn_accept() to handle the new connection and to prepare 14 * for a next connection to avoid just dropping next accept()ed socket 15 * if no more connections allowed. If there are no available connections 16 * an idle connection would be closed. If there are no idle connections 17 * then new connections will not be accept()ed for 1 second. 18 */ 19 20 21 static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, 22 nxt_listen_event_t *lev); 23 static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, 24 void *data); 25 static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, 26 nxt_listen_event_t *lev); 27 static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, 28 nxt_listen_event_t *lev); 29 static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, 30 void *data); 31 static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, 32 void *data); 33 34 35 nxt_listen_event_t * 36 nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) 37 { 38 nxt_listen_event_t *lev; 39 nxt_event_engine_t *engine; 40 41 lev = nxt_zalloc(sizeof(nxt_listen_event_t)); 42 43 if (nxt_fast_path(lev != NULL)) { 44 lev->socket.fd = ls->socket; 45 46 engine = task->thread->engine; 47 lev->batch = engine->batch; 48 lev->count = 1; 49 50 lev->socket.read_work_queue = &engine->accept_work_queue; 51 lev->socket.read_handler = nxt_conn_listen_handler; 52 lev->socket.error_handler = nxt_conn_listen_event_error; 53 lev->socket.log = &nxt_main_log; 54 55 lev->accept = engine->event.io->accept; 56 57 lev->listen = ls; 58 lev->work_queue = &engine->read_work_queue; 59 60 lev->timer.work_queue = &engine->fast_work_queue; 61 lev->timer.handler = nxt_conn_listen_timer_handler; 62 lev->timer.log = &nxt_main_log; 63 64 lev->task.thread = task->thread; 65 lev->task.log = &nxt_main_log; 66 lev->task.ident = nxt_task_next_ident(); 67 lev->socket.task = &lev->task; 68 lev->timer.task = &lev->task; 69 70 if (nxt_conn_accept_alloc(task, lev) != NULL) { 71 nxt_fd_event_enable_accept(engine, &lev->socket); 72 73 nxt_queue_insert_tail(&engine->listen_connections, &lev->link); 74 } 75 76 return lev; 77 } 78 79 return NULL; 80 } 81 82 83 static nxt_conn_t * 84 nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) 85 { 86 nxt_mp_t *mp; 87 nxt_conn_t *c; 88 nxt_event_engine_t *engine; 89 90 engine = task->thread->engine; 91 92 if (engine->connections < engine->max_connections) { 93 94 mp = nxt_mp_create(1024, 128, 256, 32); 95 96 if (nxt_fast_path(mp != NULL)) { 97 c = nxt_conn_create(mp, lev->socket.task); 98 if (nxt_slow_path(c == NULL)) { 99 goto fail; 100 } 101 102 lev->next = c; 103 c->socket.read_work_queue = lev->socket.read_work_queue; 104 c->socket.write_ready = 1; 105 106 c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen); 107 if (nxt_fast_path(c->remote != NULL)) { 108 return c; 109 } 110 } 111 112 fail: 113 114 nxt_mp_destroy(mp); 115 } 116 117 return NULL; 118 } 119 120 121 static void 122 nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 123 { 124 nxt_listen_event_t *lev; 125 126 lev = obj; 127 lev->ready = lev->batch; 128 129 lev->accept(task, lev, data); 130 } 131 132 133 void 134 nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 135 { 136 socklen_t socklen; 137 nxt_conn_t *c; 138 nxt_socket_t s; 139 struct sockaddr *sa; 140 nxt_listen_event_t *lev; 141 142 lev = obj; 143 c = lev->next; 144 145 lev->ready--; 146 lev->socket.read_ready = (lev->ready != 0); 147 148 sa = &c->remote->u.sockaddr; 149 socklen = c->remote->socklen; 150 /* 151 * The returned socklen is ignored here, because sockaddr_in and 152 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un 153 * it is 3 byte length and already prepared, because old BSDs return zero 154 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte 155 * socklen and updates only the sa_family part; other systems copy 3 bytes 156 * and truncate surplus zero part. Only bound sockaddr_un will be really 157 * truncated here. 158 */ 159 s = accept(lev->socket.fd, sa, &socklen); 160 161 if (s == -1) { 162 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 163 return; 164 } 165 166 c->socket.fd = s; 167 168 #if (NXT_LINUX) 169 /* 170 * Linux does not inherit non-blocking mode 171 * from listen socket for accept()ed socket. 172 */ 173 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 174 nxt_socket_close(task, s); 175 } 176 177 #endif 178 179 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 180 181 nxt_conn_accept(task, lev, c); 182 } 183 184 185 void 186 nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 187 { 188 nxt_conn_t *next; 189 190 nxt_sockaddr_text(c->remote); 191 192 nxt_debug(task, "client: %*s", 193 (size_t) c->remote->address_length, 194 nxt_sockaddr_address(c->remote)); 195 196 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 197 198 c->listen = lev; 199 lev->count++; 200 c->socket.data = NULL; 201 202 c->read_work_queue = lev->work_queue; 203 c->write_work_queue = lev->work_queue; 204 205 if (lev->listen->read_after_accept) { 206 207 //c->socket.read_ready = 1; 208 // lev->listen->handler(task, c, lev); 209 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 210 &c->task, c, lev); 211 212 } else { 213 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 214 &c->task, c, lev); 215 } 216 217 next = nxt_conn_accept_next(task, lev); 218 219 if (next != NULL && lev->socket.read_ready) { 220 nxt_work_queue_add(lev->socket.read_work_queue, 221 lev->accept, task, lev, next); 222 } 223 } 224 225 226 static nxt_conn_t * 227 nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 228 { 229 nxt_conn_t *c; 230 231 lev->next = NULL; 232 233 do { 234 c = nxt_conn_accept_alloc(task, lev); 235 236 if (nxt_fast_path(c != NULL)) { 237 return c; 238 } 239 240 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); 241 242 nxt_alert(task, "no available connections, " 243 "new connections are not accepted within 1s"); 244 245 return NULL; 246 } 247 248 249 static nxt_int_t 250 nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 251 { 252 nxt_conn_t *c; 253 nxt_queue_t *idle; 254 nxt_queue_link_t *link; 255 nxt_event_engine_t *engine; 256 257 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 258 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 259 }; 260 261 engine = task->thread->engine; 262 263 idle = &engine->idle_connections; 264 265 for (link = nxt_queue_last(idle); 266 link != nxt_queue_head(idle); 267 link = nxt_queue_next(link)) 268 { 269 c = nxt_queue_link_data(link, nxt_conn_t, link); 270 271 if (!c->socket.read_ready) { 272 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 273 task->log, "no available connections, " 274 "close idle connection"); 275 nxt_queue_remove(link); 276 nxt_conn_close(engine, c); 277 278 return NXT_OK; 279 } 280 } 281 282 nxt_timer_add(engine, &lev->timer, 1000); 283 284 nxt_fd_event_disable_read(engine, &lev->socket); 285 286 return NXT_DECLINED; 287 } 288 289 290 void 291 nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 292 const char *accept_syscall, nxt_err_t err) 293 { 294 static nxt_log_moderation_t nxt_accept_log_moderation = { 295 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 296 }; 297 298 lev->socket.read_ready = 0; 299 300 switch (err) { 301 302 case NXT_EAGAIN: 303 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 304 return; 305 306 case ECONNABORTED: 307 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 308 task->log, "%s(%d) failed %E", 309 accept_syscall, lev->socket.fd, err); 310 return; 311 312 case EMFILE: 313 case ENFILE: 314 case ENOBUFS: 315 case ENOMEM: 316 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { 317 nxt_alert(task, "%s(%d) failed %E, " 318 "new connections are not accepted within 1s", 319 accept_syscall, lev->socket.fd, err); 320 } 321 322 return; 323 324 default: 325 nxt_alert(task, "%s(%d) failed %E", 326 accept_syscall, lev->socket.fd, err); 327 return; 328 } 329 } 330 331 332 static void 333 nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 334 { 335 nxt_conn_t *c; 336 nxt_timer_t *timer; 337 nxt_listen_event_t *lev; 338 339 timer = obj; 340 341 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 342 c = lev->next; 343 344 if (c == NULL) { 345 c = nxt_conn_accept_next(task, lev); 346 347 if (c == NULL) { 348 return; 349 } 350 } 351 352 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 353 354 lev->accept(task, lev, c); 355 } 356 357 358 static void 359 nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 360 { 361 nxt_fd_event_t *ev; 362 363 ev = obj; 364 365 nxt_alert(task, "accept(%d) event error", ev->fd); 366 } 367