xref: /unit/src/nxt_pollset_engine.c (revision 12:477899a6661b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * pollset has been introduced in AIX 5L 5.3.
12  *
13  * pollset_create() returns a pollset_t descriptor which is not
14  * a file descriptor, so it cannot be added to another pollset.
15  * The first pollset_create() call returns 0.
16  */
17 
18 
/*
 * Operation codes stored in nxt_pollset_change_t.op.  They record the
 * intent of a queued change; nxt_pollset_commit_changes() turns them
 * into PS_MOD/PS_DELETE commands for pollset_ctl().
 */

/* Register a descriptor (PS_MOD); on commit the event is put in fd_hash. */
#define NXT_POLLSET_ADD     0
/* Add events for an already registered descriptor (PS_MOD ORs them in). */
#define NXT_POLLSET_UPDATE  1
/* Replace the event mask: committed as PS_DELETE followed by PS_MOD. */
#define NXT_POLLSET_CHANGE  2
/* Remove a descriptor (PS_DELETE); on commit it is dropped from fd_hash. */
#define NXT_POLLSET_DELETE  3
23 
24 
/*
 * Forward declarations of the file-local engine operations.  They are
 * needed because the nxt_pollset_engine table below references the
 * handlers before their definitions.
 */
static nxt_int_t nxt_pollset_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_pollset_free(nxt_event_engine_t *engine);
static void nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_pollset_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Change queue: queueing, batched commit and error recovery. */
static void nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_pollset_commit_changes(nxt_event_engine_t *engine);
static void nxt_pollset_change_error(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd);
static nxt_int_t nxt_pollset_write(nxt_event_engine_t *engine,
    struct poll_ctl *ctl, int n);
static void nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
57 
58 
/*
 * Event interface table for the AIX pollset facility.
 *
 * NOTE(review): the initializer is positional, so the meaning of every slot
 * is fixed by nxt_event_interface_t, which is declared outside this file.
 * The remarks below only point out what is visible here; confirm the slot
 * names against that declaration before reordering anything.
 */
const nxt_event_interface_t  nxt_pollset_engine = {
    "pollset",
    nxt_pollset_create,
    nxt_pollset_free,
    nxt_pollset_enable,
    nxt_pollset_disable,
    /* The same handler fills two consecutive slots. */
    nxt_pollset_disable,
    nxt_pollset_close,
    nxt_pollset_enable_read,
    nxt_pollset_enable_write,
    nxt_pollset_disable_read,
    nxt_pollset_disable_write,
    nxt_pollset_block_read,
    nxt_pollset_block_write,
    nxt_pollset_oneshot_read,
    nxt_pollset_oneshot_write,
    /* nxt_pollset_enable_read is reused for one more slot here. */
    nxt_pollset_enable_read,
    /*
     * Four slots are set to NULL; presumably these are optional
     * operations this engine does not provide -- TODO confirm.
     */
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_pollset_poll,

    &nxt_unix_event_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
87 
88 
89 static nxt_int_t
90 nxt_pollset_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
91     nxt_uint_t mevents)
92 {
93     void  *changes;
94 
95     engine->u.pollset.ps = -1;
96     engine->u.pollset.mchanges = mchanges;
97     engine->u.pollset.mevents = mevents;
98 
99     changes = nxt_malloc(sizeof(nxt_pollset_change_t) * mchanges);
100     if (changes == NULL) {
101         goto fail;
102     }
103 
104     engine->u.pollset.changes = changes;
105 
106     /*
107      * NXT_POLLSET_CHANGE requires two struct poll_ctl's
108      * for PS_DELETE and subsequent PS_ADD.
109      */
110     changes = nxt_malloc(2 * sizeof(struct poll_ctl) * mchanges);
111     if (changes == NULL) {
112         goto fail;
113     }
114 
115     engine->u.pollset.write_changes = changes;
116 
117     engine->u.pollset.events = nxt_malloc(sizeof(struct pollfd) * mevents);
118     if (engine->u.pollset.events == NULL) {
119         goto fail;
120     }
121 
122     engine->u.pollset.ps = pollset_create(-1);
123 
124     if (engine->u.pollset.ps == -1) {
125         nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_create() failed %E",
126                 nxt_errno);
127         goto fail;
128     }
129 
130     nxt_debug(&engine->task, "pollset_create(): %d", engine->u.pollset.ps);
131 
132     return NXT_OK;
133 
134 fail:
135 
136     nxt_pollset_free(engine);
137 
138     return NXT_ERROR;
139 }
140 
141 
142 static void
143 nxt_pollset_free(nxt_event_engine_t *engine)
144 {
145     pollset_t  ps;
146 
147     ps = engine->u.pollset.ps;
148 
149     nxt_debug(&engine->task, "pollset %d free", ps);
150 
151     if (ps != -1 && pollset_destroy(ps) != 0) {
152         nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_destroy(%d) failed %E",
153                 ps, nxt_errno);
154     }
155 
156     nxt_free(engine->u.pollset.events);
157     nxt_free(engine->u.pollset.write_changes);
158     nxt_free(engine->u.pollset.changes);
159     nxt_fd_event_hash_destroy(&engine->u.pollset.fd_hash);
160 
161     nxt_memzero(&engine->u.pollset, sizeof(nxt_pollset_engine_t));
162 }
163 
164 
165 static void
166 nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
167 {
168     ev->read = NXT_EVENT_ACTIVE;
169     ev->write = NXT_EVENT_ACTIVE;
170 
171     nxt_pollset_change(engine, ev, NXT_POLLSET_ADD, POLLIN | POLLOUT);
172 }
173 
174 
175 static void
176 nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
177 {
178     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
179 
180         ev->read = NXT_EVENT_INACTIVE;
181         ev->write = NXT_EVENT_INACTIVE;
182 
183         nxt_pollset_change(engine, ev, NXT_POLLSET_DELETE, 0);
184     }
185 }
186 
187 
188 /*
 * A closed descriptor must be deleted from a pollset, otherwise the next
 * pollset_poll() will return POLLNVAL for it.  However, pollset_ctl()
 * allows an already closed file descriptor to be deleted from the pollset
 * using PS_DELETE, so the removal can be batched; see pollset_ctl(2):
193  *
194  *   After a file descriptor is added to a pollset, the file descriptor will
195  *   not be removed until a pollset_ctl call with the cmd of PS_DELETE is
196  *   executed.  The file descriptor remains in the pollset even if the file
197  *   descriptor is closed.  A pollset_poll operation on a pollset containing
198  *   a closed file descriptor returns a POLLNVAL event for that file
199  *   descriptor. If the file descriptor is later allocated to a new object,
200  *   the new object will be polled on future pollset_poll calls.
201  */
202 
203 static nxt_bool_t
204 nxt_pollset_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
205 {
206     nxt_pollset_disable(engine, ev);
207 
208     return ev->changing;
209 }
210 
211 
212 static void
213 nxt_pollset_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
214 {
215     nxt_uint_t  op, events;
216 
217     if (ev->read != NXT_EVENT_BLOCKED) {
218 
219         events = POLLIN;
220 
221         if (ev->write == NXT_EVENT_INACTIVE) {
222             op = NXT_POLLSET_ADD;
223 
224         } else if (ev->write == NXT_EVENT_BLOCKED) {
225             ev->write = NXT_EVENT_INACTIVE;
226             op = NXT_POLLSET_CHANGE;
227 
228         } else {
229             op = NXT_POLLSET_UPDATE;
230             events = POLLIN | POLLOUT;
231         }
232 
233         nxt_pollset_change(engine, ev, op, events);
234     }
235 
236     ev->read = NXT_EVENT_ACTIVE;
237 }
238 
239 
240 static void
241 nxt_pollset_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
242 {
243     nxt_uint_t  op, events;
244 
245     if (ev->write != NXT_EVENT_BLOCKED) {
246 
247         events = POLLOUT;
248 
249         if (ev->read == NXT_EVENT_INACTIVE) {
250             op = NXT_POLLSET_ADD;
251 
252         } else if (ev->read == NXT_EVENT_BLOCKED) {
253             ev->read = NXT_EVENT_INACTIVE;
254             op = NXT_POLLSET_CHANGE;
255 
256         } else {
257             op = NXT_POLLSET_UPDATE;
258             events = POLLIN | POLLOUT;
259         }
260 
261         nxt_pollset_change(engine, ev, op, events);
262     }
263 
264     ev->write = NXT_EVENT_ACTIVE;
265 }
266 
267 
268 static void
269 nxt_pollset_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
270 {
271     nxt_uint_t  op, events;
272 
273     ev->read = NXT_EVENT_INACTIVE;
274 
275     if (ev->write <= NXT_EVENT_BLOCKED) {
276         ev->write = NXT_EVENT_INACTIVE;
277         op = NXT_POLLSET_DELETE;
278         events = POLLREMOVE;
279 
280     } else {
281         op = NXT_POLLSET_CHANGE;
282         events = POLLOUT;
283     }
284 
285     nxt_pollset_change(engine, ev, op, events);
286 }
287 
288 
289 static void
290 nxt_pollset_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
291 {
292     nxt_uint_t  op, events;
293 
294     ev->write = NXT_EVENT_INACTIVE;
295 
296     if (ev->read <= NXT_EVENT_BLOCKED) {
297         ev->read = NXT_EVENT_INACTIVE;
298         op = NXT_POLLSET_DELETE;
299         events = POLLREMOVE;
300 
301     } else {
302         op = NXT_POLLSET_CHANGE;
303         events = POLLIN;
304     }
305 
306     nxt_pollset_change(engine, ev, op, events);
307 }
308 
309 
310 static void
311 nxt_pollset_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
312 {
313     if (ev->read != NXT_EVENT_INACTIVE) {
314         ev->read = NXT_EVENT_BLOCKED;
315     }
316 }
317 
318 
319 static void
320 nxt_pollset_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
321 {
322     if (ev->write != NXT_EVENT_INACTIVE) {
323         ev->write = NXT_EVENT_BLOCKED;
324     }
325 }
326 
327 
328 static void
329 nxt_pollset_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
330 {
331     nxt_pollset_enable_read(engine, ev);
332 
333     ev->read = NXT_EVENT_ONESHOT;
334 }
335 
336 
337 static void
338 nxt_pollset_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
339 {
340     nxt_pollset_enable_write(engine, ev);
341 
342     ev->write = NXT_EVENT_ONESHOT;
343 }
344 
345 
346 /*
347  * PS_ADD adds only a new file descriptor to a pollset.
348  * PS_DELETE removes a file descriptor from a pollset.
349  *
350  * PS_MOD can add a new file descriptor or modify events for a file
351  * descriptor which is already in a pollset.  However, modified events
352  * are always ORed, so to delete an event for a file descriptor,
353  * the file descriptor must be removed using PS_DELETE and then
354  * added again without the event.
355  */
356 
357 static void
358 nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
359     nxt_uint_t op, nxt_uint_t events)
360 {
361     nxt_pollset_change_t  *change;
362 
363     nxt_debug(ev->task, "pollset %d change fd:%d op:%ui ev:%04Xi",
364               engine->u.pollset.ps, ev->fd, op, events);
365 
366     if (engine->u.pollset.nchanges >= engine->u.pollset.mchanges) {
367         (void) nxt_pollset_commit_changes(engine);
368     }
369 
370     ev->changing = 1;
371 
372     change = &engine->u.pollset.changes[engine->u.pollset.nchanges++];
373     change->op = op;
374     change->cmd = (op == NXT_POLLSET_DELETE) ? PS_DELETE : PS_MOD;
375     change->events = events;
376     change->event = ev;
377 }
378 
379 
380 static nxt_int_t
381 nxt_pollset_commit_changes(nxt_event_engine_t *engine)
382 {
383     size_t                n;
384     nxt_int_t             ret, retval;
385     nxt_fd_event_t        *ev;
386     struct poll_ctl       *ctl, *write_changes;
387     nxt_pollset_change_t  *change, *end;
388 
389     nxt_debug(&engine->task, "pollset %d changes:%ui",
390               engine->u.pollset.ps, engine->u.pollset.nchanges);
391 
392     retval = NXT_OK;
393     n = 0;
394     write_changes = engine->u.pollset.write_changes;
395     change = engine->u.pollset.changes;
396     end = change + engine->u.pollset.nchanges;
397 
398     do {
399         ev = change->event;
400         ev->changing = 0;
401 
402         nxt_debug(&engine->task, "pollset fd:%d op:%d ev:%04Xd",
403                   ev->fd, change->op, change->events);
404 
405         if (change->op == NXT_POLLSET_CHANGE) {
406             ctl = &write_changes[n++];
407             ctl->cmd = PS_DELETE;
408             ctl->events = 0;
409             ctl->fd = ev->fd;
410         }
411 
412         ctl = &write_changes[n++];
413         ctl->cmd = change->cmd;
414         ctl->events = change->events;
415         ctl->fd = ev->fd;
416 
417         change++;
418 
419     } while (change < end);
420 
421     change = engine->u.pollset.changes;
422     end = change + engine->u.pollset.nchanges;
423 
424     ret = nxt_pollset_write(engine, write_changes, n);
425 
426     if (nxt_slow_path(ret != NXT_OK)) {
427 
428         do {
429             nxt_pollset_change_error(engine, change->event);
430             change++;
431         } while (change < end);
432 
433         engine->u.pollset.nchanges = 0;
434 
435         return NXT_ERROR;
436     }
437 
438     do {
439         ev = change->event;
440 
441         if (change->op == NXT_POLLSET_ADD) {
442             ret = nxt_fd_event_hash_add(&engine->u.pollset.fd_hash, ev->fd, ev);
443 
444             if (nxt_slow_path(ret != NXT_OK)) {
445                 nxt_pollset_change_error(engine, ev);
446                 retval = NXT_ERROR;
447             }
448 
449         } else if (change->op == NXT_POLLSET_DELETE) {
450             nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
451                                      ev->fd, 0);
452         }
453 
454         /* Nothing to do for NXT_POLLSET_UPDATE and NXT_POLLSET_CHANGE. */
455 
456         change++;
457 
458     } while (change < end);
459 
460     engine->u.pollset.nchanges = 0;
461 
462     return retval;
463 }
464 
465 
466 static void
467 nxt_pollset_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
468 {
469     ev->read = NXT_EVENT_INACTIVE;
470     ev->write = NXT_EVENT_INACTIVE;
471 
472     nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
473                        ev->task, ev, ev->data);
474 
475     nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
476                              ev->fd, 1);
477 
478     nxt_pollset_remove(engine, ev->fd);
479 }
480 
481 
482 static void
483 nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd)
484 {
485     int              n;
486     struct pollfd    pfd;
487     struct poll_ctl  ctl;
488 
489     pfd.fd = fd;
490     pfd.events = 0;
491     pfd.revents = 0;
492 
493     n = pollset_query(engine->u.pollset.ps, &pfd);
494 
495     nxt_debug(&engine->task, "pollset_query(%d, %d): %d",
496               engine->u.pollset.ps, fd, n);
497 
498     if (n == 0) {
499         /* The file descriptor is not in the pollset. */
500         return;
501     }
502 
503     if (n == -1) {
504         nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_query(%d, %d) failed %E",
505                 engine->u.pollset.ps, fd, nxt_errno);
506         /* Fall through. */
507     }
508 
509     /* n == 1: The file descriptor is in the pollset. */
510 
511     nxt_debug(&engine->task, "pollset %d remove fd:%d",
512               engine->u.pollset.ps, fd);
513 
514     ctl.cmd = PS_DELETE;
515     ctl.events = 0;
516     ctl.fd = fd;
517 
518     nxt_pollset_write(engine, &ctl, 1);
519 }
520 
521 
522 static nxt_int_t
523 nxt_pollset_write(nxt_event_engine_t *engine, struct poll_ctl *ctl, int n)
524 {
525     pollset_t  ps;
526 
527     ps = engine->u.pollset.ps;
528 
529     nxt_debug(&engine->task, "pollset_ctl(%d) changes:%d", ps, n);
530 
531     nxt_set_errno(0);
532 
533     n = pollset_ctl(ps, ctl, n);
534 
535     if (nxt_fast_path(n == 0)) {
536         return NXT_OK;
537     }
538 
539     nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_ctl(%d) failed: %d %E",
540             ps, n, nxt_errno);
541 
542     return NXT_ERROR;
543 }
544 
545 
546 static void
547 nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
548 {
549     int             nevents;
550     nxt_fd_t        fd;
551     nxt_int_t       i;
552     nxt_err_t       err;
553     nxt_uint_t      events, level;
554     struct pollfd   *pfd;
555     nxt_fd_event_t  *ev;
556 
557     if (engine->u.pollset.nchanges != 0) {
558         if (nxt_pollset_commit_changes(engine) != NXT_OK) {
559             /* Error handlers have been enqueued on failure. */
560             timeout = 0;
561         }
562     }
563 
564     nxt_debug(&engine->task, "pollset_poll(%d) timeout:%M",
565               engine->u.pollset.ps, timeout);
566 
567     nevents = pollset_poll(engine->u.pollset.ps, engine->u.pollset.events,
568                            engine->u.pollset.mevents, timeout);
569 
570     err = (nevents == -1) ? nxt_errno : 0;
571 
572     nxt_thread_time_update(engine->task.thread);
573 
574     nxt_debug(&engine->task, "pollset_poll(%d): %d",
575               engine->u.pollset.ps, nevents);
576 
577     if (nevents == -1) {
578         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
579 
580         nxt_log(&engine->task, level, "pollset_poll(%d) failed %E",
581                 engine->u.pollset.ps, err);
582 
583         return;
584     }
585 
586     for (i = 0; i < nevents; i++) {
587 
588         pfd = &engine->u.pollset.events[i];
589         fd = pfd->fd;
590         events = pfd->revents;
591 
592         ev = nxt_fd_event_hash_get(&engine->task, &engine->u.pollset.fd_hash,
593                                    fd);
594 
595         if (nxt_slow_path(ev == NULL)) {
596             nxt_log(&engine->task, NXT_LOG_CRIT,
597                     "pollset_poll(%d) returned invalid "
598                     "fd:%d ev:%04Xd rev:%04uXi",
599                     engine->u.pollset.ps, fd, pfd->events, events);
600 
601             nxt_pollset_remove(engine, fd);
602             continue;
603         }
604 
605         nxt_debug(ev->task, "pollset: fd:%d ev:%04uXi", fd, events);
606 
607         if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
608             nxt_log(ev->task, NXT_LOG_CRIT,
609                     "pollset_poll(%d) error fd:%d ev:%04Xd rev:%04uXi",
610                     engine->u.pollset.ps, fd, pfd->events, events);
611 
612             nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
613                                ev->task, ev, ev->data);
614             continue;
615         }
616 
617         if (events & POLLIN) {
618             ev->read_ready = 1;
619 
620             if (ev->read != NXT_EVENT_BLOCKED) {
621                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
622                                    ev->task, ev, ev->data);
623             }
624 
625             if (ev->read == NXT_EVENT_BLOCKED
626                 || ev->read == NXT_EVENT_ONESHOT)
627             {
628                 nxt_pollset_disable_read(engine, ev);
629             }
630         }
631 
632         if (events & POLLOUT) {
633             ev->write_ready = 1;
634 
635             if (ev->write != NXT_EVENT_BLOCKED) {
636                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
637                                    ev->task, ev, ev->data);
638             }
639 
640             if (ev->write == NXT_EVENT_BLOCKED
641                 || ev->write == NXT_EVENT_ONESHOT)
642             {
643                 nxt_pollset_disable_write(engine, ev);
644             }
645         }
646     }
647 }
648