1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * pollset has been introduced in AIX 5L 5.3.
12 *
13 * pollset_create() returns a pollset_t descriptor which is not
14 * a file descriptor, so it cannot be added to another pollset.
15 * The first pollset_create() call returns 0.
16 */
17
18
/*
 * Operations queued by nxt_pollset_change() and applied later by
 * nxt_pollset_commit_changes().
 */

/* Sent as PS_MOD; after a successful commit the event is added to fd_hash. */
#define NXT_POLLSET_ADD 0
/* Sent as PS_MOD, which ORs the events in; fd_hash is left untouched. */
#define NXT_POLLSET_UPDATE 1
/* Sent as PS_DELETE followed by PS_MOD; fd_hash is left untouched. */
#define NXT_POLLSET_CHANGE 2
/* Sent as PS_DELETE; after a successful commit the event leaves fd_hash. */
#define NXT_POLLSET_DELETE 3
23
24
/*
 * Forward declarations of the file-local handlers.  They are needed
 * because the nxt_pollset_engine table is defined before the handlers.
 */

/* Engine life cycle. */
static nxt_int_t nxt_pollset_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_pollset_free(nxt_event_engine_t *engine);

/* Per-descriptor state transitions; all of them only queue changes. */
static void nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_pollset_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Change batching and the low-level pollset_ctl()/pollset_query() helpers. */
static void nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_pollset_commit_changes(nxt_event_engine_t *engine);
static void nxt_pollset_change_error(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd);
static nxt_int_t nxt_pollset_write(nxt_event_engine_t *engine,
    struct poll_ctl *ctl, int n);

/* The event loop step. */
static void nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
57
58
/*
 * Event interface table of the AIX pollset engine.
 *
 * The initializer is positional, so every entry has to stay in the order
 * of the members of nxt_event_interface_t, which is declared outside this
 * file.
 * NOTE(review): the meaning of each slot is not visible here; the remarks
 * below only describe which handlers are reused or left unset - confirm
 * the slot names against the structure declaration.
 */
const nxt_event_interface_t nxt_pollset_engine = {
    "pollset",
    nxt_pollset_create,
    nxt_pollset_free,
    nxt_pollset_enable,
    /* The same handler serves two consecutive slots. */
    nxt_pollset_disable,
    nxt_pollset_disable,
    nxt_pollset_close,
    nxt_pollset_enable_read,
    nxt_pollset_enable_write,
    nxt_pollset_disable_read,
    nxt_pollset_disable_write,
    nxt_pollset_block_read,
    nxt_pollset_block_write,
    nxt_pollset_oneshot_read,
    nxt_pollset_oneshot_write,
    /* The plain read enabler is reused for this slot. */
    nxt_pollset_enable_read,
    /* Four handlers this engine does not provide. */
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_pollset_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
87
88
89 static nxt_int_t
nxt_pollset_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)90 nxt_pollset_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
91 nxt_uint_t mevents)
92 {
93 void *changes;
94
95 engine->u.pollset.ps = -1;
96 engine->u.pollset.mchanges = mchanges;
97 engine->u.pollset.mevents = mevents;
98
99 changes = nxt_malloc(sizeof(nxt_pollset_change_t) * mchanges);
100 if (changes == NULL) {
101 goto fail;
102 }
103
104 engine->u.pollset.changes = changes;
105
106 /*
107 * NXT_POLLSET_CHANGE requires two struct poll_ctl's
108 * for PS_DELETE and subsequent PS_ADD.
109 */
110 changes = nxt_malloc(2 * sizeof(struct poll_ctl) * mchanges);
111 if (changes == NULL) {
112 goto fail;
113 }
114
115 engine->u.pollset.write_changes = changes;
116
117 engine->u.pollset.events = nxt_malloc(sizeof(struct pollfd) * mevents);
118 if (engine->u.pollset.events == NULL) {
119 goto fail;
120 }
121
122 engine->u.pollset.ps = pollset_create(-1);
123
124 if (engine->u.pollset.ps == -1) {
125 nxt_alert(&engine->task, "pollset_create() failed %E", nxt_errno);
126 goto fail;
127 }
128
129 nxt_debug(&engine->task, "pollset_create(): %d", engine->u.pollset.ps);
130
131 return NXT_OK;
132
133 fail:
134
135 nxt_pollset_free(engine);
136
137 return NXT_ERROR;
138 }
139
140
141 static void
nxt_pollset_free(nxt_event_engine_t * engine)142 nxt_pollset_free(nxt_event_engine_t *engine)
143 {
144 pollset_t ps;
145
146 ps = engine->u.pollset.ps;
147
148 nxt_debug(&engine->task, "pollset %d free", ps);
149
150 if (ps != -1 && pollset_destroy(ps) != 0) {
151 nxt_alert(&engine->task, "pollset_destroy(%d) failed %E",
152 ps, nxt_errno);
153 }
154
155 nxt_free(engine->u.pollset.events);
156 nxt_free(engine->u.pollset.write_changes);
157 nxt_free(engine->u.pollset.changes);
158 nxt_fd_event_hash_destroy(&engine->u.pollset.fd_hash);
159
160 nxt_memzero(&engine->u.pollset, sizeof(nxt_pollset_engine_t));
161 }
162
163
164 static void
nxt_pollset_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)165 nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
166 {
167 ev->read = NXT_EVENT_ACTIVE;
168 ev->write = NXT_EVENT_ACTIVE;
169
170 nxt_pollset_change(engine, ev, NXT_POLLSET_ADD, POLLIN | POLLOUT);
171 }
172
173
174 static void
nxt_pollset_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)175 nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
176 {
177 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
178
179 ev->read = NXT_EVENT_INACTIVE;
180 ev->write = NXT_EVENT_INACTIVE;
181
182 nxt_pollset_change(engine, ev, NXT_POLLSET_DELETE, 0);
183 }
184 }
185
186
187 /*
188 * A closed descriptor must be deleted from a pollset, otherwise next
189 * pollset_poll() will return POLLNVAL on it. However, pollset_ctl()
190 * allows to delete the already closed file descriptor from the pollset
191 * using PS_DELETE, so the removal can be batched, pollset_ctl(2):
192 *
193 * After a file descriptor is added to a pollset, the file descriptor will
194 * not be removed until a pollset_ctl call with the cmd of PS_DELETE is
195 * executed. The file descriptor remains in the pollset even if the file
196 * descriptor is closed. A pollset_poll operation on a pollset containing
197 * a closed file descriptor returns a POLLNVAL event for that file
198 * descriptor. If the file descriptor is later allocated to a new object,
199 * the new object will be polled on future pollset_poll calls.
200 */
201
202 static nxt_bool_t
nxt_pollset_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)203 nxt_pollset_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
204 {
205 nxt_pollset_disable(engine, ev);
206
207 return ev->changing;
208 }
209
210
211 static void
nxt_pollset_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)212 nxt_pollset_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
213 {
214 nxt_uint_t op, events;
215
216 if (ev->read != NXT_EVENT_BLOCKED) {
217
218 events = POLLIN;
219
220 if (ev->write == NXT_EVENT_INACTIVE) {
221 op = NXT_POLLSET_ADD;
222
223 } else if (ev->write == NXT_EVENT_BLOCKED) {
224 ev->write = NXT_EVENT_INACTIVE;
225 op = NXT_POLLSET_CHANGE;
226
227 } else {
228 op = NXT_POLLSET_UPDATE;
229 events = POLLIN | POLLOUT;
230 }
231
232 nxt_pollset_change(engine, ev, op, events);
233 }
234
235 ev->read = NXT_EVENT_ACTIVE;
236 }
237
238
239 static void
nxt_pollset_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)240 nxt_pollset_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
241 {
242 nxt_uint_t op, events;
243
244 if (ev->write != NXT_EVENT_BLOCKED) {
245
246 events = POLLOUT;
247
248 if (ev->read == NXT_EVENT_INACTIVE) {
249 op = NXT_POLLSET_ADD;
250
251 } else if (ev->read == NXT_EVENT_BLOCKED) {
252 ev->read = NXT_EVENT_INACTIVE;
253 op = NXT_POLLSET_CHANGE;
254
255 } else {
256 op = NXT_POLLSET_UPDATE;
257 events = POLLIN | POLLOUT;
258 }
259
260 nxt_pollset_change(engine, ev, op, events);
261 }
262
263 ev->write = NXT_EVENT_ACTIVE;
264 }
265
266
267 static void
nxt_pollset_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)268 nxt_pollset_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
269 {
270 nxt_uint_t op, events;
271
272 ev->read = NXT_EVENT_INACTIVE;
273
274 if (ev->write <= NXT_EVENT_BLOCKED) {
275 ev->write = NXT_EVENT_INACTIVE;
276 op = NXT_POLLSET_DELETE;
277 events = POLLREMOVE;
278
279 } else {
280 op = NXT_POLLSET_CHANGE;
281 events = POLLOUT;
282 }
283
284 nxt_pollset_change(engine, ev, op, events);
285 }
286
287
288 static void
nxt_pollset_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)289 nxt_pollset_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
290 {
291 nxt_uint_t op, events;
292
293 ev->write = NXT_EVENT_INACTIVE;
294
295 if (ev->read <= NXT_EVENT_BLOCKED) {
296 ev->read = NXT_EVENT_INACTIVE;
297 op = NXT_POLLSET_DELETE;
298 events = POLLREMOVE;
299
300 } else {
301 op = NXT_POLLSET_CHANGE;
302 events = POLLIN;
303 }
304
305 nxt_pollset_change(engine, ev, op, events);
306 }
307
308
309 static void
nxt_pollset_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)310 nxt_pollset_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
311 {
312 if (ev->read != NXT_EVENT_INACTIVE) {
313 ev->read = NXT_EVENT_BLOCKED;
314 }
315 }
316
317
318 static void
nxt_pollset_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)319 nxt_pollset_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
320 {
321 if (ev->write != NXT_EVENT_INACTIVE) {
322 ev->write = NXT_EVENT_BLOCKED;
323 }
324 }
325
326
327 static void
nxt_pollset_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)328 nxt_pollset_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
329 {
330 nxt_pollset_enable_read(engine, ev);
331
332 ev->read = NXT_EVENT_ONESHOT;
333 }
334
335
336 static void
nxt_pollset_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)337 nxt_pollset_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
338 {
339 nxt_pollset_enable_write(engine, ev);
340
341 ev->write = NXT_EVENT_ONESHOT;
342 }
343
344
345 /*
346 * PS_ADD adds only a new file descriptor to a pollset.
347 * PS_DELETE removes a file descriptor from a pollset.
348 *
349 * PS_MOD can add a new file descriptor or modify events for a file
350 * descriptor which is already in a pollset. However, modified events
351 * are always ORed, so to delete an event for a file descriptor,
352 * the file descriptor must be removed using PS_DELETE and then
353 * added again without the event.
354 */
355
356 static void
nxt_pollset_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t op,nxt_uint_t events)357 nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
358 nxt_uint_t op, nxt_uint_t events)
359 {
360 nxt_pollset_change_t *change;
361
362 nxt_debug(ev->task, "pollset %d change fd:%d op:%ui ev:%04Xi",
363 engine->u.pollset.ps, ev->fd, op, events);
364
365 if (engine->u.pollset.nchanges >= engine->u.pollset.mchanges) {
366 (void) nxt_pollset_commit_changes(engine);
367 }
368
369 ev->changing = 1;
370
371 change = &engine->u.pollset.changes[engine->u.pollset.nchanges++];
372 change->op = op;
373 change->cmd = (op == NXT_POLLSET_DELETE) ? PS_DELETE : PS_MOD;
374 change->events = events;
375 change->event = ev;
376 }
377
378
379 static nxt_int_t
nxt_pollset_commit_changes(nxt_event_engine_t * engine)380 nxt_pollset_commit_changes(nxt_event_engine_t *engine)
381 {
382 size_t n;
383 nxt_int_t ret, retval;
384 nxt_fd_event_t *ev;
385 struct poll_ctl *ctl, *write_changes;
386 nxt_pollset_change_t *change, *end;
387
388 nxt_debug(&engine->task, "pollset %d changes:%ui",
389 engine->u.pollset.ps, engine->u.pollset.nchanges);
390
391 retval = NXT_OK;
392 n = 0;
393 write_changes = engine->u.pollset.write_changes;
394 change = engine->u.pollset.changes;
395 end = change + engine->u.pollset.nchanges;
396
397 do {
398 ev = change->event;
399 ev->changing = 0;
400
401 nxt_debug(&engine->task, "pollset fd:%d op:%d ev:%04Xd",
402 ev->fd, change->op, change->events);
403
404 if (change->op == NXT_POLLSET_CHANGE) {
405 ctl = &write_changes[n++];
406 ctl->cmd = PS_DELETE;
407 ctl->events = 0;
408 ctl->fd = ev->fd;
409 }
410
411 ctl = &write_changes[n++];
412 ctl->cmd = change->cmd;
413 ctl->events = change->events;
414 ctl->fd = ev->fd;
415
416 change++;
417
418 } while (change < end);
419
420 change = engine->u.pollset.changes;
421 end = change + engine->u.pollset.nchanges;
422
423 ret = nxt_pollset_write(engine, write_changes, n);
424
425 if (nxt_slow_path(ret != NXT_OK)) {
426
427 do {
428 nxt_pollset_change_error(engine, change->event);
429 change++;
430 } while (change < end);
431
432 engine->u.pollset.nchanges = 0;
433
434 return NXT_ERROR;
435 }
436
437 do {
438 ev = change->event;
439
440 if (change->op == NXT_POLLSET_ADD) {
441 ret = nxt_fd_event_hash_add(&engine->u.pollset.fd_hash, ev->fd, ev);
442
443 if (nxt_slow_path(ret != NXT_OK)) {
444 nxt_pollset_change_error(engine, ev);
445 retval = NXT_ERROR;
446 }
447
448 } else if (change->op == NXT_POLLSET_DELETE) {
449 nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
450 ev->fd, 0);
451 }
452
453 /* Nothing to do for NXT_POLLSET_UPDATE and NXT_POLLSET_CHANGE. */
454
455 change++;
456
457 } while (change < end);
458
459 engine->u.pollset.nchanges = 0;
460
461 return retval;
462 }
463
464
465 static void
nxt_pollset_change_error(nxt_event_engine_t * engine,nxt_fd_event_t * ev)466 nxt_pollset_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
467 {
468 ev->read = NXT_EVENT_INACTIVE;
469 ev->write = NXT_EVENT_INACTIVE;
470
471 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
472 ev->task, ev, ev->data);
473
474 nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
475 ev->fd, 1);
476
477 nxt_pollset_remove(engine, ev->fd);
478 }
479
480
481 static void
nxt_pollset_remove(nxt_event_engine_t * engine,nxt_fd_t fd)482 nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd)
483 {
484 int n;
485 struct pollfd pfd;
486 struct poll_ctl ctl;
487
488 pfd.fd = fd;
489 pfd.events = 0;
490 pfd.revents = 0;
491
492 n = pollset_query(engine->u.pollset.ps, &pfd);
493
494 nxt_debug(&engine->task, "pollset_query(%d, %d): %d",
495 engine->u.pollset.ps, fd, n);
496
497 if (n == 0) {
498 /* The file descriptor is not in the pollset. */
499 return;
500 }
501
502 if (n == -1) {
503 nxt_alert(&engine->task, "pollset_query(%d, %d) failed %E",
504 engine->u.pollset.ps, fd, nxt_errno);
505 /* Fall through. */
506 }
507
508 /* n == 1: The file descriptor is in the pollset. */
509
510 nxt_debug(&engine->task, "pollset %d remove fd:%d",
511 engine->u.pollset.ps, fd);
512
513 ctl.cmd = PS_DELETE;
514 ctl.events = 0;
515 ctl.fd = fd;
516
517 nxt_pollset_write(engine, &ctl, 1);
518 }
519
520
521 static nxt_int_t
nxt_pollset_write(nxt_event_engine_t * engine,struct poll_ctl * ctl,int n)522 nxt_pollset_write(nxt_event_engine_t *engine, struct poll_ctl *ctl, int n)
523 {
524 pollset_t ps;
525
526 ps = engine->u.pollset.ps;
527
528 nxt_debug(&engine->task, "pollset_ctl(%d) changes:%d", ps, n);
529
530 nxt_set_errno(0);
531
532 n = pollset_ctl(ps, ctl, n);
533
534 if (nxt_fast_path(n == 0)) {
535 return NXT_OK;
536 }
537
538 nxt_alert(&engine->task, "pollset_ctl(%d) failed: %d %E", ps, n, nxt_errno);
539
540 return NXT_ERROR;
541 }
542
543
544 static void
nxt_pollset_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)545 nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
546 {
547 int nevents;
548 nxt_fd_t fd;
549 nxt_int_t i;
550 nxt_err_t err;
551 nxt_uint_t events, level;
552 struct pollfd *pfd;
553 nxt_fd_event_t *ev;
554
555 if (engine->u.pollset.nchanges != 0) {
556 if (nxt_pollset_commit_changes(engine) != NXT_OK) {
557 /* Error handlers have been enqueued on failure. */
558 timeout = 0;
559 }
560 }
561
562 nxt_debug(&engine->task, "pollset_poll(%d) timeout:%M",
563 engine->u.pollset.ps, timeout);
564
565 nevents = pollset_poll(engine->u.pollset.ps, engine->u.pollset.events,
566 engine->u.pollset.mevents, timeout);
567
568 err = (nevents == -1) ? nxt_errno : 0;
569
570 nxt_thread_time_update(engine->task.thread);
571
572 nxt_debug(&engine->task, "pollset_poll(%d): %d",
573 engine->u.pollset.ps, nevents);
574
575 if (nevents == -1) {
576 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
577
578 nxt_log(&engine->task, level, "pollset_poll(%d) failed %E",
579 engine->u.pollset.ps, err);
580
581 return;
582 }
583
584 for (i = 0; i < nevents; i++) {
585
586 pfd = &engine->u.pollset.events[i];
587 fd = pfd->fd;
588 events = pfd->revents;
589
590 ev = nxt_fd_event_hash_get(&engine->task, &engine->u.pollset.fd_hash,
591 fd);
592
593 if (nxt_slow_path(ev == NULL)) {
594 nxt_alert(&engine->task,
595 "pollset_poll(%d) returned invalid "
596 "fd:%d ev:%04Xd rev:%04uXi",
597 engine->u.pollset.ps, fd, pfd->events, events);
598
599 nxt_pollset_remove(engine, fd);
600 continue;
601 }
602
603 nxt_debug(ev->task, "pollset: fd:%d ev:%04uXi", fd, events);
604
605 if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
606 nxt_alert(ev->task,
607 "pollset_poll(%d) error fd:%d ev:%04Xd rev:%04uXi",
608 engine->u.pollset.ps, fd, pfd->events, events);
609
610 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
611 ev->task, ev, ev->data);
612 continue;
613 }
614
615 if (events & POLLIN) {
616 ev->read_ready = 1;
617
618 if (ev->read != NXT_EVENT_BLOCKED) {
619 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
620 ev->task, ev, ev->data);
621 }
622
623 if (ev->read == NXT_EVENT_BLOCKED
624 || ev->read == NXT_EVENT_ONESHOT)
625 {
626 nxt_pollset_disable_read(engine, ev);
627 }
628 }
629
630 if (events & POLLOUT) {
631 ev->write_ready = 1;
632
633 if (ev->write != NXT_EVENT_BLOCKED) {
634 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
635 ev->task, ev, ev->data);
636 }
637
638 if (ev->write == NXT_EVENT_BLOCKED
639 || ev->write == NXT_EVENT_ONESHOT)
640 {
641 nxt_pollset_disable_write(engine, ev);
642 }
643 }
644 }
645 }
646