1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * The event ports have been introduced in Solaris 10. 12 * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have 13 * been added in OpenSolaris. 14 */ 15 16 17 static nxt_int_t nxt_eventport_create(nxt_event_engine_t *engine, 18 nxt_uint_t mchanges, nxt_uint_t mevents); 19 static void nxt_eventport_free(nxt_event_engine_t *engine); 20 static void nxt_eventport_enable(nxt_event_engine_t *engine, 21 nxt_fd_event_t *ev); 22 static void nxt_eventport_disable(nxt_event_engine_t *engine, 23 nxt_fd_event_t *ev); 24 static nxt_bool_t nxt_eventport_close(nxt_event_engine_t *engine, 25 nxt_fd_event_t *ev); 26 static void nxt_eventport_enable_read(nxt_event_engine_t *engine, 27 nxt_fd_event_t *ev); 28 static void nxt_eventport_enable_write(nxt_event_engine_t *engine, 29 nxt_fd_event_t *ev); 30 static void nxt_eventport_enable_event(nxt_event_engine_t *engine, 31 nxt_fd_event_t *ev, nxt_uint_t events); 32 static void nxt_eventport_disable_read(nxt_event_engine_t *engine, 33 nxt_fd_event_t *ev); 34 static void nxt_eventport_disable_write(nxt_event_engine_t *engine, 35 nxt_fd_event_t *ev); 36 static void nxt_eventport_disable_event(nxt_event_engine_t *engine, 37 nxt_fd_event_t *ev); 38 static nxt_int_t nxt_eventport_commit_changes(nxt_event_engine_t *engine); 39 static void nxt_eventport_error_handler(nxt_task_t *task, void *obj, 40 void *data); 41 static void nxt_eventport_block_read(nxt_event_engine_t *engine, 42 nxt_fd_event_t *ev); 43 static void nxt_eventport_block_write(nxt_event_engine_t *engine, 44 nxt_fd_event_t *ev); 45 static void nxt_eventport_oneshot_read(nxt_event_engine_t *engine, 46 nxt_fd_event_t *ev); 47 static void nxt_eventport_oneshot_write(nxt_event_engine_t *engine, 48 nxt_fd_event_t *ev); 49 static void nxt_eventport_enable_accept(nxt_event_engine_t *engine, 50 nxt_fd_event_t *ev); 51 static nxt_int_t nxt_eventport_enable_post(nxt_event_engine_t *engine, 52 nxt_work_handler_t handler); 53 static void nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo); 54 static void nxt_eventport_poll(nxt_event_engine_t *engine, 55 nxt_msec_t timeout); 56 57 58 const nxt_event_interface_t nxt_eventport_engine = { 59 "eventport", 60 nxt_eventport_create, 61 nxt_eventport_free, 62 nxt_eventport_enable, 63 nxt_eventport_disable, 64 nxt_eventport_disable, 65 nxt_eventport_close, 66 nxt_eventport_enable_read, 67 nxt_eventport_enable_write, 68 nxt_eventport_disable_read, 69 nxt_eventport_disable_write, 70 nxt_eventport_block_read, 71 nxt_eventport_block_write, 72 nxt_eventport_oneshot_read, 73 nxt_eventport_oneshot_write, 74 nxt_eventport_enable_accept, 75 NULL, 76 NULL, 77 nxt_eventport_enable_post, 78 nxt_eventport_signal, 79 nxt_eventport_poll, 80 81 &nxt_unix_event_conn_io, 82 83 NXT_NO_FILE_EVENTS, 84 NXT_NO_SIGNAL_EVENTS, 85 }; 86 87 88 static nxt_int_t 89 nxt_eventport_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, 90 nxt_uint_t mevents) 91 { 92 nxt_eventport_change_t *changes; 93 94 engine->u.eventport.fd = -1; 95 engine->u.eventport.mchanges = mchanges; 96 engine->u.eventport.mevents = mevents; 97 98 changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges); 99 if (changes == NULL) { 100 goto fail; 101 } 102 103 engine->u.eventport.changes = changes; 104 105 engine->u.eventport.events = nxt_malloc(sizeof(port_event_t) * mevents); 106 if (engine->u.eventport.events == NULL) { 107 goto fail; 108 } 109 110 engine->u.eventport.fd = port_create(); 111 if (engine->u.eventport.fd == -1) { 112 nxt_log(&engine->task, NXT_LOG_CRIT, "port_create() failed %E", 113 nxt_errno); 114 goto fail; 115 } 116 117 nxt_debug(&engine->task, "port_create(): %d", engine->u.eventport.fd); 118 119 if (engine->signals != NULL) { 120 engine->u.eventport.signal_handler = engine->signals->handler; 121 } 122 123 return NXT_OK; 124 125 fail: 126 127 nxt_eventport_free(engine); 128 129 return NXT_ERROR; 130 } 131 132 133 static void 134 nxt_eventport_free(nxt_event_engine_t *engine) 135 { 136 int port; 137 138 port = engine->u.eventport.fd; 139 140 nxt_debug(&engine->task, "eventport %d free", port); 141 142 if (port != -1 && close(port) != 0) { 143 nxt_log(&engine->task, NXT_LOG_CRIT, "eventport close(%d) failed %E", 144 port, nxt_errno); 145 } 146 147 nxt_free(engine->u.eventport.events); 148 149 nxt_memzero(&engine->u.eventport, sizeof(nxt_eventport_engine_t)); 150 } 151 152 153 static void 154 nxt_eventport_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 155 { 156 ev->read = NXT_EVENT_ACTIVE; 157 ev->write = NXT_EVENT_ACTIVE; 158 159 nxt_eventport_enable_event(engine, ev, POLLIN | POLLOUT); 160 } 161 162 163 static void 164 nxt_eventport_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 165 { 166 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { 167 168 ev->read = NXT_EVENT_INACTIVE; 169 ev->write = NXT_EVENT_INACTIVE; 170 171 nxt_eventport_disable_event(engine, ev); 172 } 173 } 174 175 176 /* 177 * port_dissociate(3): 178 * 179 * The association is removed if the owner of the association closes the port. 180 */ 181 182 static nxt_bool_t 183 nxt_eventport_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 184 { 185 ev->read = NXT_EVENT_INACTIVE; 186 ev->write = NXT_EVENT_INACTIVE; 187 188 return ev->changing; 189 } 190 191 192 static void 193 nxt_eventport_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 194 { 195 nxt_uint_t events; 196 197 if (ev->read != NXT_EVENT_BLOCKED) { 198 events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN 199 : (POLLIN | POLLOUT); 200 nxt_eventport_enable_event(engine, ev, events); 201 } 202 203 ev->read = NXT_EVENT_ACTIVE; 204 } 205 206 207 static void 208 nxt_eventport_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 209 { 210 nxt_uint_t events; 211 212 if (ev->write != NXT_EVENT_BLOCKED) { 213 events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT 214 : (POLLIN | POLLOUT); 215 nxt_eventport_enable_event(engine, ev, events); 216 } 217 218 ev->write = NXT_EVENT_ACTIVE; 219 } 220 221 222 /* 223 * eventport changes are batched to improve instruction and data 224 * cache locality of several port_associate() and port_dissociate() 225 * calls followed by port_getn() call. 226 */ 227 228 static void 229 nxt_eventport_enable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev, 230 nxt_uint_t events) 231 { 232 nxt_eventport_change_t *change; 233 234 nxt_debug(ev->task, "port %d set event: fd:%d ev:%04XD u:%p", 235 engine->u.eventport.fd, ev->fd, events, ev); 236 237 if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) { 238 (void) nxt_eventport_commit_changes(engine); 239 } 240 241 ev->changing = 1; 242 243 change = &engine->u.eventport.changes[engine->u.eventport.nchanges++]; 244 change->events = events; 245 change->event = ev; 246 } 247 248 249 static void 250 nxt_eventport_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 251 { 252 ev->read = NXT_EVENT_INACTIVE; 253 254 if (ev->write == NXT_EVENT_INACTIVE) { 255 nxt_eventport_disable_event(engine, ev); 256 257 } else { 258 nxt_eventport_enable_event(engine, ev, POLLOUT); 259 } 260 } 261 262 263 static void 264 nxt_eventport_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 265 { 266 ev->write = NXT_EVENT_INACTIVE; 267 268 if (ev->read == NXT_EVENT_INACTIVE) { 269 nxt_eventport_disable_event(engine, ev); 270 271 } else { 272 nxt_eventport_enable_event(engine, ev, POLLIN); 273 } 274 } 275 276 277 static void 278 nxt_eventport_disable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 279 { 280 nxt_eventport_change_t *change; 281 282 nxt_debug(ev->task, "port %d disable event : fd:%d", 283 engine->u.eventport.fd, ev->fd); 284 285 if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) { 286 (void) nxt_eventport_commit_changes(engine); 287 } 288 289 ev->changing = 1; 290 291 change = &engine->u.eventport.changes[engine->u.eventport.nchanges++]; 292 change->events = 0; 293 change->event = ev; 294 } 295 296 297 static nxt_int_t 298 nxt_eventport_commit_changes(nxt_event_engine_t *engine) 299 { 300 int ret, port; 301 nxt_int_t retval; 302 nxt_fd_event_t *ev; 303 nxt_eventport_change_t *change, *end; 304 305 port = engine->u.eventport.fd; 306 307 nxt_debug(&engine->task, "eventport %d changes:%ui", 308 port, engine->u.eventport.nchanges); 309 310 retval = NXT_OK; 311 change = engine->u.eventport.changes; 312 end = change + engine->u.eventport.nchanges; 313 314 do { 315 ev = change->event; 316 ev->changing = 0; 317 318 if (change->events != 0) { 319 nxt_debug(ev->task, "port_associate(%d): fd:%d ev:%04XD u:%p", 320 port, ev->fd, change->events, ev); 321 322 ret = port_associate(port, PORT_SOURCE_FD, 323 ev->fd, change->events, ev); 324 325 if (nxt_fast_path(ret == 0)) { 326 goto next; 327 } 328 329 nxt_log(ev->task, NXT_LOG_CRIT, 330 "port_associate(%d, %d, %d, %04XD) failed %E", 331 port, PORT_SOURCE_FD, ev->fd, change->events, nxt_errno); 332 333 } else { 334 nxt_debug(ev->task, "port_dissociate(%d): fd:%d", port, ev->fd); 335 336 ret = port_dissociate(port, PORT_SOURCE_FD, ev->fd); 337 338 if (nxt_fast_path(ret == 0)) { 339 goto next; 340 } 341 342 nxt_log(ev->task, NXT_LOG_CRIT, 343 "port_dissociate(%d, %d, %d) failed %E", 344 port, PORT_SOURCE_FD, ev->fd, nxt_errno); 345 } 346 347 nxt_work_queue_add(&engine->fast_work_queue, 348 nxt_eventport_error_handler, 349 ev->task, ev, ev->data); 350 351 retval = NXT_ERROR; 352 353 next: 354 355 change++; 356 357 } while (change < end); 358 359 engine->u.eventport.nchanges = 0; 360 361 return retval; 362 } 363 364 365 static void 366 nxt_eventport_error_handler(nxt_task_t *task, void *obj, void *data) 367 { 368 nxt_fd_event_t *ev; 369 370 ev = obj; 371 372 ev->read = NXT_EVENT_INACTIVE; 373 ev->write = NXT_EVENT_INACTIVE; 374 375 ev->error_handler(task, ev, data); 376 } 377 378 379 static void 380 nxt_eventport_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 381 { 382 if (ev->read != NXT_EVENT_INACTIVE) { 383 ev->read = NXT_EVENT_BLOCKED; 384 } 385 } 386 387 388 static void 389 nxt_eventport_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 390 { 391 if (ev->write != NXT_EVENT_INACTIVE) { 392 ev->write = NXT_EVENT_BLOCKED; 393 } 394 } 395 396 397 static void 398 nxt_eventport_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 399 { 400 if (ev->read == NXT_EVENT_INACTIVE) { 401 ev->read = NXT_EVENT_ACTIVE; 402 403 nxt_eventport_enable_event(engine, ev, POLLIN); 404 } 405 } 406 407 408 static void 409 nxt_eventport_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 410 { 411 if (ev->write == NXT_EVENT_INACTIVE) { 412 ev->write = NXT_EVENT_ACTIVE; 413 414 nxt_eventport_enable_event(engine, ev, POLLOUT); 415 } 416 } 417 418 419 static void 420 nxt_eventport_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) 421 { 422 ev->read = NXT_EVENT_LEVEL; 423 424 nxt_eventport_enable_event(engine, ev, POLLIN); 425 } 426 427 428 static nxt_int_t 429 nxt_eventport_enable_post(nxt_event_engine_t *engine, 430 nxt_work_handler_t handler) 431 { 432 engine->u.eventport.post_handler = handler; 433 434 return NXT_OK; 435 } 436 437 438 static void 439 nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 440 { 441 int port; 442 443 port = engine->u.eventport.fd; 444 445 nxt_debug(&engine->task, "port_send(%d, %ui)", port, signo); 446 447 if (port_send(port, signo, NULL) != 0) { 448 nxt_log(&engine->task, NXT_LOG_CRIT, "port_send(%d) failed %E", 449 port, nxt_errno); 450 } 451 } 452 453 454 static void 455 nxt_eventport_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) 456 { 457 int n, events, signo; 458 uint_t nevents; 459 nxt_err_t err; 460 nxt_uint_t i, level; 461 timespec_t ts, *tp; 462 port_event_t *event; 463 nxt_fd_event_t *ev; 464 nxt_work_handler_t handler; 465 466 if (engine->u.eventport.nchanges != 0) { 467 if (nxt_eventport_commit_changes(engine) != NXT_OK) { 468 /* Error handlers have been enqueued on failure. */ 469 timeout = 0; 470 } 471 } 472 473 if (timeout == NXT_INFINITE_MSEC) { 474 tp = NULL; 475 476 } else { 477 ts.tv_sec = timeout / 1000; 478 ts.tv_nsec = (timeout % 1000) * 1000000; 479 tp = &ts; 480 } 481 482 nxt_debug(&engine->task, "port_getn(%d) timeout: %M", 483 engine->u.eventport.fd, timeout); 484 485 /* 486 * A trap for possible error when Solaris does not update nevents 487 * if ETIME or EINTR is returned. This issue will be logged as 488 * "unexpected port_getn() event". 489 * 490 * The details are in OpenSolaris mailing list thread "port_getn() 491 * and timeouts - is this a bug or an undocumented feature?" 492 */ 493 event = &engine->u.eventport.events[0]; 494 event->portev_events = -1; /* invalid port events */ 495 event->portev_source = -1; /* invalid port source */ 496 event->portev_object = -1; 497 event->portev_user = (void *) -1; 498 499 nevents = 1; 500 n = port_getn(engine->u.eventport.fd, engine->u.eventport.events, 501 engine->u.eventport.mevents, &nevents, tp); 502 503 /* 504 * 32-bit port_getn() on Solaris 10 x86 returns large negative 505 * values instead of 0 when returning immediately. 506 */ 507 err = (n < 0) ? nxt_errno : 0; 508 509 nxt_thread_time_update(engine->task.thread); 510 511 if (n == -1) { 512 if (err == NXT_ETIME || err == NXT_EINTR) { 513 if (nevents != 0) { 514 nxt_log(&engine->task, NXT_LOG_CRIT, 515 "port_getn(%d) failed %E, events:%ud", 516 engine->u.eventport.fd, err, nevents); 517 } 518 } 519 520 if (err != NXT_ETIME) { 521 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; 522 523 nxt_log(&engine->task, level, "port_getn(%d) failed %E", 524 engine->u.eventport.fd, err); 525 526 if (err != NXT_EINTR) { 527 return; 528 } 529 } 530 } 531 532 nxt_debug(&engine->task, "port_getn(%d) events: %d", 533 engine->u.eventport.fd, nevents); 534 535 for (i = 0; i < nevents; i++) { 536 event = &engine->u.eventport.events[i]; 537 538 switch (event->portev_source) { 539 540 case PORT_SOURCE_FD: 541 ev = event->portev_user; 542 events = event->portev_events; 543 544 nxt_debug(ev->task, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d", 545 event->portev_object, events, ev, ev->read, ev->write); 546 547 if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) { 548 nxt_log(ev->task, NXT_LOG_CRIT, 549 "port_getn(%d) error fd:%d events:%04Xud", 550 engine->u.eventport.fd, ev->fd, events); 551 552 nxt_work_queue_add(&engine->fast_work_queue, 553 nxt_eventport_error_handler, 554 ev->task, ev, ev->data); 555 continue; 556 } 557 558 if (events & POLLIN) { 559 ev->read_ready = 1; 560 561 if (ev->read != NXT_EVENT_BLOCKED) { 562 nxt_work_queue_add(ev->read_work_queue, ev->read_handler, 563 ev->task, ev, ev->data); 564 565 } 566 567 if (ev->read != NXT_EVENT_LEVEL) { 568 ev->read = NXT_EVENT_INACTIVE; 569 } 570 } 571 572 if (events & POLLOUT) { 573 ev->write_ready = 1; 574 575 if (ev->write != NXT_EVENT_BLOCKED) { 576 nxt_work_queue_add(ev->write_work_queue, ev->write_handler, 577 ev->task, ev, ev->data); 578 } 579 580 ev->write = NXT_EVENT_INACTIVE; 581 } 582 583 /* 584 * Reactivate counterpart direction, because the 585 * eventport is oneshot notification facility. 586 */ 587 events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN; 588 events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT; 589 590 if (events != 0) { 591 nxt_eventport_enable_event(engine, ev, events); 592 } 593 594 break; 595 596 case PORT_SOURCE_USER: 597 nxt_debug(&engine->task, "eventport: user ev:%d u:%p", 598 event->portev_events, event->portev_user); 599 600 signo = event->portev_events; 601 602 handler = (signo == 0) ? engine->u.eventport.post_handler 603 : engine->u.eventport.signal_handler; 604 605 nxt_work_queue_add(&engine->fast_work_queue, handler, 606 &engine->task, (void *) (uintptr_t) signo, NULL); 607 608 break; 609 610 default: 611 nxt_log(&engine->task, NXT_LOG_CRIT, 612 "unexpected port_getn(%d) event: ev:%d src:%d obj:%p u:%p", 613 engine->u.eventport.fd, event->portev_events, 614 event->portev_source, event->portev_object, 615 event->portev_user); 616 } 617 } 618 } 619