1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * pollset has been introduced in AIX 5L 5.3. 12 * 13 * pollset_create() returns a pollset_t descriptor which is not 14 * a file descriptor, so it cannot be added to another pollset. 15 * The first pollset_create() call returns 0. 16 */ 17 18 19 #define NXT_POLLSET_ADD 0 20 #define NXT_POLLSET_UPDATE 1 21 #define NXT_POLLSET_CHANGE 2 22 #define NXT_POLLSET_DELETE 3 23 24 25 static nxt_int_t nxt_pollset_create(nxt_event_engine_t *engine, 26 nxt_uint_t mchanges, nxt_uint_t mevents); 27 static void nxt_pollset_free(nxt_event_engine_t *engine); 28 static void nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 29 static void nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); 30 static nxt_bool_t nxt_pollset_close(nxt_event_engine_t *engine, 31 nxt_fd_event_t *ev); 32 static void nxt_pollset_enable_read(nxt_event_engine_t *engine, 33 nxt_fd_event_t *ev); 34 static void nxt_pollset_enable_write(nxt_event_engine_t *engine, 35 nxt_fd_event_t *ev); 36 static void nxt_pollset_disable_read(nxt_event_engine_t *engine, 37 nxt_fd_event_t *ev); 38 static void nxt_pollset_disable_write(nxt_event_engine_t *engine, 39 nxt_fd_event_t *ev); 40 static void nxt_pollset_block_read(nxt_event_engine_t *engine, 41 nxt_fd_event_t *ev); 42 static void nxt_pollset_block_write(nxt_event_engine_t *engine, 43 nxt_fd_event_t *ev); 44 static void nxt_pollset_oneshot_read(nxt_event_engine_t *engine, 45 nxt_fd_event_t *ev); 46 static void nxt_pollset_oneshot_write(nxt_event_engine_t *engine, 47 nxt_fd_event_t *ev); 48 static void nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 49 nxt_uint_t op, nxt_uint_t events); 50 static nxt_int_t nxt_pollset_commit_changes(nxt_event_engine_t *engine); 51 static void nxt_pollset_change_error(nxt_event_engine_t *engine, 52 nxt_fd_event_t *ev); 53 static void nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd); 54 static nxt_int_t nxt_pollset_write(nxt_event_engine_t *engine, 55 struct poll_ctl *ctl, int n); 56 static void nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); 57 58 59 const nxt_event_interface_t nxt_pollset_engine = { 60 "pollset", 61 nxt_pollset_create, 62 nxt_pollset_free, 63 nxt_pollset_enable, 64 nxt_pollset_disable, 65 nxt_pollset_disable, 66 nxt_pollset_close, 67 nxt_pollset_enable_read, 68 nxt_pollset_enable_write, 69 nxt_pollset_disable_read, 70 nxt_pollset_disable_write, 71 nxt_pollset_block_read, 72 nxt_pollset_block_write, 73 nxt_pollset_oneshot_read, 74 nxt_pollset_oneshot_write, 75 nxt_pollset_enable_read, 76 NULL, 77 NULL, 78 NULL, 79 NULL, 80 nxt_pollset_poll, 81 82 &nxt_unix_event_conn_io, 83 84 NXT_NO_FILE_EVENTS, 85 NXT_NO_SIGNAL_EVENTS, 86 }; 87 88 89 static nxt_int_t 90 nxt_pollset_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 91 nxt_uint_t mevents) 92 { 93 void *changes; 94 95 engine->u.pollset.ps = -1; 96 engine->u.pollset.mchanges = mchanges; 97 engine->u.pollset.mevents = mevents; 98 99 changes = nxt_malloc(sizeof(nxt_pollset_change_t) * mchanges); 100 if (changes == NULL) { 101 goto fail; 102 } 103 104 engine->u.pollset.changes = changes; 105 106 /* 107 * NXT_POLLSET_CHANGE requires two struct poll_ctl's 108 * for PS_DELETE and subsequent PS_ADD. 109 */ 110 changes = nxt_malloc(2 * sizeof(struct poll_ctl) * mchanges); 111 if (changes == NULL) { 112 goto fail; 113 } 114 115 engine->u.pollset.write_changes = changes; 116 117 engine->u.pollset.events = nxt_malloc(sizeof(struct pollfd) * mevents); 118 if (engine->u.pollset.events == NULL) { 119 goto fail; 120 } 121 122 engine->u.pollset.ps = pollset_create(-1); 123 124 if (engine->u.pollset.ps == -1) { 125 nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_create() failed %E", 126 nxt_errno); 127 goto fail; 128 } 129 130 nxt_debug(&engine->task, "pollset_create(): %d", engine->u.pollset.ps); 131 132 return NXT_OK; 133 134 fail: 135 136 nxt_pollset_free(engine); 137 138 return NXT_ERROR; 139 } 140 141 142 static void 143 nxt_pollset_free(nxt_event_engine_t *engine) 144 { 145 pollset_t ps; 146 147 ps = engine->u.pollset.ps; 148 149 nxt_debug(&engine->task, "pollset %d free", ps); 150 151 if (ps != -1 && pollset_destroy(ps) != 0) { 152 nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_destroy(%d) failed %E", 153 ps, nxt_errno); 154 } 155 156 nxt_free(engine->u.pollset.events); 157 nxt_free(engine->u.pollset.write_changes); 158 nxt_free(engine->u.pollset.changes); 159 nxt_fd_event_hash_destroy(&engine->u.pollset.fd_hash); 160 161 nxt_memzero(&engine->u.pollset, sizeof(nxt_pollset_engine_t)); 162 } 163 164 165 static void 166 nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 167 { 168 ev->read = NXT_EVENT_ACTIVE; 169 ev->write = NXT_EVENT_ACTIVE; 170 171 nxt_pollset_change(engine, ev, NXT_POLLSET_ADD, POLLIN | POLLOUT); 172 } 173 174 175 static void 176 nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 177 { 178 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 179 180 ev->read = NXT_EVENT_INACTIVE; 181 ev->write = NXT_EVENT_INACTIVE; 182 183 nxt_pollset_change(engine, ev, NXT_POLLSET_DELETE, 0); 184 } 185 } 186 187 188 /* 189 * A closed descriptor must be deleted from a pollset, otherwise next 190 * pollset_poll() will return POLLNVAL on it. However, pollset_ctl() 191 * allows to delete the already closed file descriptor from the pollset 192 * using PS_DELETE, so the removal can be batched, pollset_ctl(2): 193 * 194 * After a file descriptor is added to a pollset, the file descriptor will 195 * not be removed until a pollset_ctl call with the cmd of PS_DELETE is 196 * executed. The file descriptor remains in the pollset even if the file 197 * descriptor is closed. A pollset_poll operation on a pollset containing 198 * a closed file descriptor returns a POLLNVAL event for that file 199 * descriptor. If the file descriptor is later allocated to a new object, 200 * the new object will be polled on future pollset_poll calls. 201 */ 202 203 static nxt_bool_t 204 nxt_pollset_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 205 { 206 nxt_pollset_disable(engine, ev); 207 208 return ev->changing; 209 } 210 211 212 static void 213 nxt_pollset_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 214 { 215 nxt_uint_t op, events; 216 217 if (ev->read != NXT_EVENT_BLOCKED) { 218 219 events = POLLIN; 220 221 if (ev->write == NXT_EVENT_INACTIVE) { 222 op = NXT_POLLSET_ADD; 223 224 } else if (ev->write == NXT_EVENT_BLOCKED) { 225 ev->write = NXT_EVENT_INACTIVE; 226 op = NXT_POLLSET_CHANGE; 227 228 } else { 229 op = NXT_POLLSET_UPDATE; 230 events = POLLIN | POLLOUT; 231 } 232 233 nxt_pollset_change(engine, ev, op, events); 234 } 235 236 ev->read = NXT_EVENT_ACTIVE; 237 } 238 239 240 static void 241 nxt_pollset_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 242 { 243 nxt_uint_t op, events; 244 245 if (ev->write != NXT_EVENT_BLOCKED) { 246 247 events = POLLOUT; 248 249 if (ev->read == NXT_EVENT_INACTIVE) { 250 op = NXT_POLLSET_ADD; 251 252 } else if (ev->read == NXT_EVENT_BLOCKED) { 253 ev->read = NXT_EVENT_INACTIVE; 254 op = NXT_POLLSET_CHANGE; 255 256 } else { 257 op = NXT_POLLSET_UPDATE; 258 events = POLLIN | POLLOUT; 259 } 260 261 nxt_pollset_change(engine, ev, op, events); 262 } 263 264 ev->write = NXT_EVENT_ACTIVE; 265 } 266 267 268 static void 269 nxt_pollset_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 270 { 271 nxt_uint_t op, events; 272 273 ev->read = NXT_EVENT_INACTIVE; 274 275 if (ev->write <= NXT_EVENT_BLOCKED) { 276 ev->write = NXT_EVENT_INACTIVE; 277 op = NXT_POLLSET_DELETE; 278 events = POLLREMOVE; 279 280 } else { 281 op = NXT_POLLSET_CHANGE; 282 events = POLLOUT; 283 } 284 285 nxt_pollset_change(engine, ev, op, events); 286 } 287 288 289 static void 290 nxt_pollset_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 291 { 292 nxt_uint_t op, events; 293 294 ev->write = NXT_EVENT_INACTIVE; 295 296 if (ev->read <= NXT_EVENT_BLOCKED) { 297 ev->read = NXT_EVENT_INACTIVE; 298 op = NXT_POLLSET_DELETE; 299 events = POLLREMOVE; 300 301 } else { 302 op = NXT_POLLSET_CHANGE; 303 events = POLLIN; 304 } 305 306 nxt_pollset_change(engine, ev, op, events); 307 } 308 309 310 static void 311 nxt_pollset_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 312 { 313 if (ev->read != NXT_EVENT_INACTIVE) { 314 ev->read = NXT_EVENT_BLOCKED; 315 } 316 } 317 318 319 static void 320 nxt_pollset_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 321 { 322 if (ev->write != NXT_EVENT_INACTIVE) { 323 ev->write = NXT_EVENT_BLOCKED; 324 } 325 } 326 327 328 static void 329 nxt_pollset_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 330 { 331 nxt_pollset_enable_read(engine, ev); 332 333 ev->read = NXT_EVENT_ONESHOT; 334 } 335 336 337 static void 338 nxt_pollset_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 339 { 340 nxt_pollset_enable_write(engine, ev); 341 342 ev->write = NXT_EVENT_ONESHOT; 343 } 344 345 346 /* 347 * PS_ADD adds only a new file descriptor to a pollset. 348 * PS_DELETE removes a file descriptor from a pollset. 349 * 350 * PS_MOD can add a new file descriptor or modify events for a file 351 * descriptor which is already in a pollset. However, modified events 352 * are always ORed, so to delete an event for a file descriptor, 353 * the file descriptor must be removed using PS_DELETE and then 354 * added again without the event. 355 */ 356 357 static void 358 nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 359 nxt_uint_t op, nxt_uint_t events) 360 { 361 nxt_pollset_change_t *change; 362 363 nxt_debug(ev->task, "pollset %d change fd:%d op:%ui ev:%04Xi", 364 engine->u.pollset.ps, ev->fd, op, events); 365 366 if (engine->u.pollset.nchanges >= engine->u.pollset.mchanges) { 367 (void) nxt_pollset_commit_changes(engine); 368 } 369 370 ev->changing = 1; 371 372 change = &engine->u.pollset.changes[engine->u.pollset.nchanges++]; 373 change->op = op; 374 change->cmd = (op == NXT_POLLSET_DELETE) ? PS_DELETE : PS_MOD; 375 change->events = events; 376 change->event = ev; 377 } 378 379 380 static nxt_int_t 381 nxt_pollset_commit_changes(nxt_event_engine_t *engine) 382 { 383 size_t n; 384 nxt_int_t ret, retval; 385 nxt_fd_event_t *ev; 386 struct poll_ctl *ctl, *write_changes; 387 nxt_pollset_change_t *change, *end; 388 389 nxt_debug(&engine->task, "pollset %d changes:%ui", 390 engine->u.pollset.ps, engine->u.pollset.nchanges); 391 392 retval = NXT_OK; 393 n = 0; 394 write_changes = engine->u.pollset.write_changes; 395 change = engine->u.pollset.changes; 396 end = change + engine->u.pollset.nchanges; 397 398 do { 399 ev = change->event; 400 ev->changing = 0; 401 402 nxt_debug(&engine->task, "pollset fd:%d op:%d ev:%04Xd", 403 ev->fd, change->op, change->events); 404 405 if (change->op == NXT_POLLSET_CHANGE) { 406 ctl = &write_changes[n++]; 407 ctl->cmd = PS_DELETE; 408 ctl->events = 0; 409 ctl->fd = ev->fd; 410 } 411 412 ctl = &write_changes[n++]; 413 ctl->cmd = change->cmd; 414 ctl->events = change->events; 415 ctl->fd = ev->fd; 416 417 change++; 418 419 } while (change < end); 420 421 change = engine->u.pollset.changes; 422 end = change + engine->u.pollset.nchanges; 423 424 ret = nxt_pollset_write(engine, write_changes, n); 425 426 if (nxt_slow_path(ret != NXT_OK)) { 427 428 do { 429 nxt_pollset_change_error(engine, change->event); 430 change++; 431 } while (change < end); 432 433 engine->u.pollset.nchanges = 0; 434 435 return NXT_ERROR; 436 } 437 438 do { 439 ev = change->event; 440 441 if (change->op == NXT_POLLSET_ADD) { 442 ret = nxt_fd_event_hash_add(&engine->u.pollset.fd_hash, ev->fd, ev); 443 444 if (nxt_slow_path(ret != NXT_OK)) { 445 nxt_pollset_change_error(engine, ev); 446 retval = NXT_ERROR; 447 } 448 449 } else if (change->op == NXT_POLLSET_DELETE) { 450 nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash, 451 ev->fd, 0); 452 } 453 454 /* Nothing to do for NXT_POLLSET_UPDATE and NXT_POLLSET_CHANGE. */ 455 456 change++; 457 458 } while (change < end); 459 460 engine->u.pollset.nchanges = 0; 461 462 return retval; 463 } 464 465 466 static void 467 nxt_pollset_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 468 { 469 ev->read = NXT_EVENT_INACTIVE; 470 ev->write = NXT_EVENT_INACTIVE; 471 472 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, 473 ev->task, ev, ev->data); 474 475 nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash, 476 ev->fd, 1); 477 478 nxt_pollset_remove(engine, ev->fd); 479 } 480 481 482 static void 483 nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd) 484 { 485 int n; 486 struct pollfd pfd; 487 struct poll_ctl ctl; 488 489 pfd.fd = fd; 490 pfd.events = 0; 491 pfd.revents = 0; 492 493 n = pollset_query(engine->u.pollset.ps, &pfd); 494 495 nxt_debug(&engine->task, "pollset_query(%d, %d): %d", 496 engine->u.pollset.ps, fd, n); 497 498 if (n == 0) { 499 /* The file descriptor is not in the pollset. */ 500 return; 501 } 502 503 if (n == -1) { 504 nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_query(%d, %d) failed %E", 505 engine->u.pollset.ps, fd, nxt_errno); 506 /* Fall through. */ 507 } 508 509 /* n == 1: The file descriptor is in the pollset. */ 510 511 nxt_debug(&engine->task, "pollset %d remove fd:%d", 512 engine->u.pollset.ps, fd); 513 514 ctl.cmd = PS_DELETE; 515 ctl.events = 0; 516 ctl.fd = fd; 517 518 nxt_pollset_write(engine, &ctl, 1); 519 } 520 521 522 static nxt_int_t 523 nxt_pollset_write(nxt_event_engine_t *engine, struct poll_ctl *ctl, int n) 524 { 525 pollset_t ps; 526 527 ps = engine->u.pollset.ps; 528 529 nxt_debug(&engine->task, "pollset_ctl(%d) changes:%d", ps, n); 530 531 nxt_set_errno(0); 532 533 n = pollset_ctl(ps, ctl, n); 534 535 if (nxt_fast_path(n == 0)) { 536 return NXT_OK; 537 } 538 539 nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_ctl(%d) failed: %d %E", 540 ps, n, nxt_errno); 541 542 return NXT_ERROR; 543 } 544 545 546 static void 547 nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 548 { 549 int nevents; 550 nxt_fd_t fd; 551 nxt_int_t i; 552 nxt_err_t err; 553 nxt_uint_t events, level; 554 struct pollfd *pfd; 555 nxt_fd_event_t *ev; 556 557 if (engine->u.pollset.nchanges != 0) { 558 if (nxt_pollset_commit_changes(engine) != NXT_OK) { 559 /* Error handlers have been enqueued on failure. */ 560 timeout = 0; 561 } 562 } 563 564 nxt_debug(&engine->task, "pollset_poll(%d) timeout:%M", 565 engine->u.pollset.ps, timeout); 566 567 nevents = pollset_poll(engine->u.pollset.ps, engine->u.pollset.events, 568 engine->u.pollset.mevents, timeout); 569 570 err = (nevents == -1) ? nxt_errno : 0; 571 572 nxt_thread_time_update(engine->task.thread); 573 574 nxt_debug(&engine->task, "pollset_poll(%d): %d", 575 engine->u.pollset.ps, nevents); 576 577 if (nevents == -1) { 578 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 579 580 nxt_log(&engine->task, level, "pollset_poll(%d) failed %E", 581 engine->u.pollset.ps, err); 582 583 return; 584 } 585 586 for (i = 0; i < nevents; i++) { 587 588 pfd = &engine->u.pollset.events[i]; 589 fd = pfd->fd; 590 events = pfd->revents; 591 592 ev = nxt_fd_event_hash_get(&engine->task, &engine->u.pollset.fd_hash, 593 fd); 594 595 if (nxt_slow_path(ev == NULL)) { 596 nxt_log(&engine->task, NXT_LOG_CRIT, 597 "pollset_poll(%d) returned invalid " 598 "fd:%d ev:%04Xd rev:%04uXi", 599 engine->u.pollset.ps, fd, pfd->events, events); 600 601 nxt_pollset_remove(engine, fd); 602 continue; 603 } 604 605 nxt_debug(ev->task, "pollset: fd:%d ev:%04uXi", fd, events); 606 607 if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) { 608 nxt_log(ev->task, NXT_LOG_CRIT, 609 "pollset_poll(%d) error fd:%d ev:%04Xd rev:%04uXi", 610 engine->u.pollset.ps, fd, pfd->events, events); 611 612 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, 613 ev->task, ev, ev->data); 614 continue; 615 } 616 617 if (events & POLLIN) { 618 ev->read_ready = 1; 619 620 if (ev->read != NXT_EVENT_BLOCKED) { 621 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 622 ev->task, ev, ev->data); 623 } 624 625 if (ev->read == NXT_EVENT_BLOCKED 626 || ev->read == NXT_EVENT_ONESHOT) 627 { 628 nxt_pollset_disable_read(engine, ev); 629 } 630 } 631 632 if (events & POLLOUT) { 633 ev->write_ready = 1; 634 635 if (ev->write != NXT_EVENT_BLOCKED) { 636 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 637 ev->task, ev, ev->data); 638 } 639 640 if (ev->write == NXT_EVENT_BLOCKED 641 || ev->write == NXT_EVENT_ONESHOT) 642 { 643 nxt_pollset_disable_write(engine, ev); 644 } 645 } 646 } 647 } 648