111 c->remote = nxt_sockaddr_cache_alloc(engine, lev); 112 if (nxt_fast_path(c->remote != NULL)) { 113 return c; 114 } 115 } 116 117 fail: 118 119 nxt_mp_destroy(mp); 120 } 121 122 return NULL; 123} 124 125 126static void 127nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 128{ 129 nxt_listen_event_t *lev; 130 131 lev = obj; 132 lev->ready = lev->batch; 133 134 lev->accept(task, lev, data); 135} 136 137 138void 139nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 140{ 141 socklen_t socklen; 142 nxt_conn_t *c; 143 nxt_socket_t s; 144 struct sockaddr *sa; 145 nxt_listen_event_t *lev; 146 147 lev = obj; 148 c = lev->next; 149 150 lev->ready--; 151 lev->socket.read_ready = (lev->ready != 0); 152 153 sa = &c->remote->u.sockaddr; 154 socklen = c->remote->socklen; 155 /* 156 * The returned socklen is ignored here, because sockaddr_in and 157 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un 158 * it is 3 byte length and already prepared, because old BSDs return zero 159 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte 160 * socklen and updates only the sa_family part; other systems copy 3 bytes 161 * and truncate surplus zero part. Only bound sockaddr_un will be really 162 * truncated here. 163 */ 164 s = accept(lev->socket.fd, sa, &socklen); 165 166 if (s == -1) { 167 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 168 return; 169 } 170 171 c->socket.fd = s; 172 173#if (NXT_LINUX) 174 /* 175 * Linux does not inherit non-blocking mode 176 * from listen socket for accept()ed socket. 177 */ 178 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 179 nxt_socket_close(task, s); 180 } 181 182#endif 183 184 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 185 186 nxt_conn_accept(task, lev, c); 187} 188 189 190void 191nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 192{ 193 nxt_conn_t *next; 194 195 nxt_sockaddr_text(c->remote); 196 197 nxt_debug(task, "client: %*s", 198 c->remote->address_length, nxt_sockaddr_address(c->remote)); 199 200 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 201 202 c->read_work_queue = lev->work_queue; 203 c->write_work_queue = lev->work_queue; 204 205 if (lev->listen->read_after_accept) { 206 207 //c->socket.read_ready = 1; 208// lev->listen->handler(task, c, lev->socket.data); 209 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 210 &c->task, c, lev->socket.data); 211 212 } else { 213 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 214 &c->task, c, lev->socket.data); 215 } 216 217 next = nxt_conn_accept_next(task, lev); 218 219 if (next != NULL && lev->socket.read_ready) { 220 nxt_work_queue_add(lev->socket.read_work_queue, 221 lev->accept, task, lev, next); 222 } 223} 224 225 226static nxt_conn_t * 227nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 228{ 229 nxt_conn_t *c; 230 231 lev->next = NULL; 232 233 do { 234 c = nxt_conn_accept_alloc(task, lev); 235 236 if (nxt_fast_path(c != NULL)) { 237 return c; 238 } 239 240 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); 241 242 nxt_log(task, NXT_LOG_CRIT, "no available connections, " 243 "new connections are not accepted within 1s"); 244 245 return NULL; 246} 247 248 249static nxt_int_t 250nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 251{ 252 nxt_conn_t *c; 253 nxt_queue_t *idle; 254 nxt_queue_link_t *link; 255 nxt_event_engine_t *engine; 256 257 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 258 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 259 }; 260 261 engine = task->thread->engine; 262 263 idle = &engine->idle_connections; 264 265 for (link = nxt_queue_last(idle); 266 link != nxt_queue_head(idle); 267 link = nxt_queue_next(link)) 268 { 269 c = nxt_queue_link_data(link, nxt_conn_t, link); 270 271 if (!c->socket.read_ready) { 272 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 273 task->log, "no available connections, " 274 "close idle connection"); 275 nxt_queue_remove(link); 276 nxt_conn_close(engine, c); 277 278 return NXT_OK; 279 } 280 } 281 282 nxt_timer_add(engine, &lev->timer, 1000); 283 284 nxt_fd_event_disable_read(engine, &lev->socket); 285 286 return NXT_DECLINED; 287} 288 289 290void 291nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 292 const char *accept_syscall, nxt_err_t err) 293{ 294 static nxt_log_moderation_t nxt_accept_log_moderation = { 295 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 296 }; 297 298 lev->socket.read_ready = 0; 299 300 switch (err) { 301 302 case NXT_EAGAIN: 303 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 304 return; 305 306 case ECONNABORTED: 307 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 308 task->log, "%s(%d) failed %E", 309 accept_syscall, lev->socket.fd, err); 310 return; 311 312 case EMFILE: 313 case ENFILE: 314 case ENOBUFS: 315 case ENOMEM: 316 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { 317 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " 318 "new connections are not accepted within 1s", 319 accept_syscall, lev->socket.fd, err); 320 } 321 322 return; 323 324 default: 325 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", 326 accept_syscall, lev->socket.fd, err); 327 return; 328 } 329} 330 331 332static void 333nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 334{ 335 nxt_conn_t *c; 336 nxt_timer_t *timer; 337 nxt_listen_event_t *lev; 338 339 timer = obj; 340 341 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 342 c = lev->next; 343 344 if (c == NULL) { 345 c = nxt_conn_accept_next(task, lev); 346 347 if (c == NULL) { 348 return; 349 } 350 } 351 352 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 353 354 lev->accept(task, lev, c); 355} 356 357 358static void 359nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 360{ 361 nxt_fd_event_t *ev; 362 363 ev = obj; 364 365 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); 366}
| 108 c->remote = nxt_sockaddr_cache_alloc(engine, lev); 109 if (nxt_fast_path(c->remote != NULL)) { 110 return c; 111 } 112 } 113 114 fail: 115 116 nxt_mp_destroy(mp); 117 } 118 119 return NULL; 120} 121 122 123static void 124nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) 125{ 126 nxt_listen_event_t *lev; 127 128 lev = obj; 129 lev->ready = lev->batch; 130 131 lev->accept(task, lev, data); 132} 133 134 135void 136nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) 137{ 138 socklen_t socklen; 139 nxt_conn_t *c; 140 nxt_socket_t s; 141 struct sockaddr *sa; 142 nxt_listen_event_t *lev; 143 144 lev = obj; 145 c = lev->next; 146 147 lev->ready--; 148 lev->socket.read_ready = (lev->ready != 0); 149 150 sa = &c->remote->u.sockaddr; 151 socklen = c->remote->socklen; 152 /* 153 * The returned socklen is ignored here, because sockaddr_in and 154 * sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un 155 * it is 3 byte length and already prepared, because old BSDs return zero 156 * socklen and do not update the sockaddr_un at all; Linux returns 2 byte 157 * socklen and updates only the sa_family part; other systems copy 3 bytes 158 * and truncate surplus zero part. Only bound sockaddr_un will be really 159 * truncated here. 160 */ 161 s = accept(lev->socket.fd, sa, &socklen); 162 163 if (s == -1) { 164 nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); 165 return; 166 } 167 168 c->socket.fd = s; 169 170#if (NXT_LINUX) 171 /* 172 * Linux does not inherit non-blocking mode 173 * from listen socket for accept()ed socket. 174 */ 175 if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { 176 nxt_socket_close(task, s); 177 } 178 179#endif 180 181 nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); 182 183 nxt_conn_accept(task, lev, c); 184} 185 186 187void 188nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) 189{ 190 nxt_conn_t *next; 191 192 nxt_sockaddr_text(c->remote); 193 194 nxt_debug(task, "client: %*s", 195 c->remote->address_length, nxt_sockaddr_address(c->remote)); 196 197 nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); 198 199 c->read_work_queue = lev->work_queue; 200 c->write_work_queue = lev->work_queue; 201 202 if (lev->listen->read_after_accept) { 203 204 //c->socket.read_ready = 1; 205// lev->listen->handler(task, c, lev->socket.data); 206 nxt_work_queue_add(c->read_work_queue, lev->listen->handler, 207 &c->task, c, lev->socket.data); 208 209 } else { 210 nxt_work_queue_add(c->write_work_queue, lev->listen->handler, 211 &c->task, c, lev->socket.data); 212 } 213 214 next = nxt_conn_accept_next(task, lev); 215 216 if (next != NULL && lev->socket.read_ready) { 217 nxt_work_queue_add(lev->socket.read_work_queue, 218 lev->accept, task, lev, next); 219 } 220} 221 222 223static nxt_conn_t * 224nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) 225{ 226 nxt_conn_t *c; 227 228 lev->next = NULL; 229 230 do { 231 c = nxt_conn_accept_alloc(task, lev); 232 233 if (nxt_fast_path(c != NULL)) { 234 return c; 235 } 236 237 } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); 238 239 nxt_log(task, NXT_LOG_CRIT, "no available connections, " 240 "new connections are not accepted within 1s"); 241 242 return NULL; 243} 244 245 246static nxt_int_t 247nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) 248{ 249 nxt_conn_t *c; 250 nxt_queue_t *idle; 251 nxt_queue_link_t *link; 252 nxt_event_engine_t *engine; 253 254 static nxt_log_moderation_t nxt_idle_close_log_moderation = { 255 NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION 256 }; 257 258 engine = task->thread->engine; 259 260 idle = &engine->idle_connections; 261 262 for (link = nxt_queue_last(idle); 263 link != nxt_queue_head(idle); 264 link = nxt_queue_next(link)) 265 { 266 c = nxt_queue_link_data(link, nxt_conn_t, link); 267 268 if (!c->socket.read_ready) { 269 nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, 270 task->log, "no available connections, " 271 "close idle connection"); 272 nxt_queue_remove(link); 273 nxt_conn_close(engine, c); 274 275 return NXT_OK; 276 } 277 } 278 279 nxt_timer_add(engine, &lev->timer, 1000); 280 281 nxt_fd_event_disable_read(engine, &lev->socket); 282 283 return NXT_DECLINED; 284} 285 286 287void 288nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, 289 const char *accept_syscall, nxt_err_t err) 290{ 291 static nxt_log_moderation_t nxt_accept_log_moderation = { 292 NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION 293 }; 294 295 lev->socket.read_ready = 0; 296 297 switch (err) { 298 299 case NXT_EAGAIN: 300 nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); 301 return; 302 303 case ECONNABORTED: 304 nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, 305 task->log, "%s(%d) failed %E", 306 accept_syscall, lev->socket.fd, err); 307 return; 308 309 case EMFILE: 310 case ENFILE: 311 case ENOBUFS: 312 case ENOMEM: 313 if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { 314 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " 315 "new connections are not accepted within 1s", 316 accept_syscall, lev->socket.fd, err); 317 } 318 319 return; 320 321 default: 322 nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", 323 accept_syscall, lev->socket.fd, err); 324 return; 325 } 326} 327 328 329static void 330nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) 331{ 332 nxt_conn_t *c; 333 nxt_timer_t *timer; 334 nxt_listen_event_t *lev; 335 336 timer = obj; 337 338 lev = nxt_timer_data(timer, nxt_listen_event_t, timer); 339 c = lev->next; 340 341 if (c == NULL) { 342 c = nxt_conn_accept_next(task, lev); 343 344 if (c == NULL) { 345 return; 346 } 347 } 348 349 nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); 350 351 lev->accept(task, lev, c); 352} 353 354 355static void 356nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) 357{ 358 nxt_fd_event_t *ev; 359 360 ev = obj; 361 362 nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); 363}
|