xref: /unit/src/nxt_pollset_engine.c (revision 564:762f8c976ead)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * pollset has been introduced in AIX 5L 5.3.
12  *
13  * pollset_create() returns a pollset_t descriptor which is not
14  * a file descriptor, so it cannot be added to another pollset.
15  * The first pollset_create() call returns 0.
16  */
17 
18 
/*
 * Operation codes stored in a queued nxt_pollset_change_t and interpreted
 * by nxt_pollset_commit_changes().
 */

/* Sent as PS_MOD; after a successful commit the event is put in fd_hash. */
#define NXT_POLLSET_ADD     0
/* Sent as PS_MOD; the fd_hash is left untouched. */
#define NXT_POLLSET_UPDATE  1
/* Sent as PS_DELETE followed by PS_MOD; the fd_hash is left untouched. */
#define NXT_POLLSET_CHANGE  2
/* Sent as PS_DELETE; after a successful commit the event leaves fd_hash. */
#define NXT_POLLSET_DELETE  3
23 
24 
/*
 * Forward declarations of the file-local pollset engine routines.  Most of
 * them are referenced by the nxt_pollset_engine interface table; the rest
 * (change, commit_changes, change_error, remove, write) are internal helpers.
 */
static nxt_int_t nxt_pollset_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_pollset_free(nxt_event_engine_t *engine);
static void nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_pollset_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_pollset_commit_changes(nxt_event_engine_t *engine);
static void nxt_pollset_change_error(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd);
static nxt_int_t nxt_pollset_write(nxt_event_engine_t *engine,
    struct poll_ctl *ctl, int n);
static void nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
57 
58 
/*
 * Event interface table of the "pollset" engine.
 *
 * The initializers are positional, so their order has to follow the field
 * order of nxt_event_interface_t (declared outside this file).  Note that
 * nxt_pollset_disable fills two consecutive slots, nxt_pollset_enable_read
 * is reused for the slot that follows the oneshot handlers, and the four
 * slots before nxt_pollset_poll are left NULL.
 */
const nxt_event_interface_t  nxt_pollset_engine = {
    "pollset",
    nxt_pollset_create,
    nxt_pollset_free,
    nxt_pollset_enable,
    nxt_pollset_disable,
    nxt_pollset_disable,
    nxt_pollset_close,
    nxt_pollset_enable_read,
    nxt_pollset_enable_write,
    nxt_pollset_disable_read,
    nxt_pollset_disable_write,
    nxt_pollset_block_read,
    nxt_pollset_block_write,
    nxt_pollset_oneshot_read,
    nxt_pollset_oneshot_write,
    nxt_pollset_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_pollset_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
87 
88 
89 static nxt_int_t
nxt_pollset_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)90 nxt_pollset_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
91     nxt_uint_t mevents)
92 {
93     void  *changes;
94 
95     engine->u.pollset.ps = -1;
96     engine->u.pollset.mchanges = mchanges;
97     engine->u.pollset.mevents = mevents;
98 
99     changes = nxt_malloc(sizeof(nxt_pollset_change_t) * mchanges);
100     if (changes == NULL) {
101         goto fail;
102     }
103 
104     engine->u.pollset.changes = changes;
105 
106     /*
107      * NXT_POLLSET_CHANGE requires two struct poll_ctl's
108      * for PS_DELETE and subsequent PS_ADD.
109      */
110     changes = nxt_malloc(2 * sizeof(struct poll_ctl) * mchanges);
111     if (changes == NULL) {
112         goto fail;
113     }
114 
115     engine->u.pollset.write_changes = changes;
116 
117     engine->u.pollset.events = nxt_malloc(sizeof(struct pollfd) * mevents);
118     if (engine->u.pollset.events == NULL) {
119         goto fail;
120     }
121 
122     engine->u.pollset.ps = pollset_create(-1);
123 
124     if (engine->u.pollset.ps == -1) {
125         nxt_alert(&engine->task, "pollset_create() failed %E", nxt_errno);
126         goto fail;
127     }
128 
129     nxt_debug(&engine->task, "pollset_create(): %d", engine->u.pollset.ps);
130 
131     return NXT_OK;
132 
133 fail:
134 
135     nxt_pollset_free(engine);
136 
137     return NXT_ERROR;
138 }
139 
140 
141 static void
nxt_pollset_free(nxt_event_engine_t * engine)142 nxt_pollset_free(nxt_event_engine_t *engine)
143 {
144     pollset_t  ps;
145 
146     ps = engine->u.pollset.ps;
147 
148     nxt_debug(&engine->task, "pollset %d free", ps);
149 
150     if (ps != -1 && pollset_destroy(ps) != 0) {
151         nxt_alert(&engine->task, "pollset_destroy(%d) failed %E",
152                   ps, nxt_errno);
153     }
154 
155     nxt_free(engine->u.pollset.events);
156     nxt_free(engine->u.pollset.write_changes);
157     nxt_free(engine->u.pollset.changes);
158     nxt_fd_event_hash_destroy(&engine->u.pollset.fd_hash);
159 
160     nxt_memzero(&engine->u.pollset, sizeof(nxt_pollset_engine_t));
161 }
162 
163 
164 static void
nxt_pollset_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)165 nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
166 {
167     ev->read = NXT_EVENT_ACTIVE;
168     ev->write = NXT_EVENT_ACTIVE;
169 
170     nxt_pollset_change(engine, ev, NXT_POLLSET_ADD, POLLIN | POLLOUT);
171 }
172 
173 
174 static void
nxt_pollset_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)175 nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
176 {
177     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
178 
179         ev->read = NXT_EVENT_INACTIVE;
180         ev->write = NXT_EVENT_INACTIVE;
181 
182         nxt_pollset_change(engine, ev, NXT_POLLSET_DELETE, 0);
183     }
184 }
185 
186 
187 /*
 * A closed descriptor must be deleted from a pollset, otherwise the next
 * pollset_poll() will return POLLNVAL on it.  However, pollset_ctl()
 * allows an already closed file descriptor to be deleted from the pollset
 * using PS_DELETE, so the removal can be batched; see pollset_ctl(2):
192  *
193  *   After a file descriptor is added to a pollset, the file descriptor will
194  *   not be removed until a pollset_ctl call with the cmd of PS_DELETE is
195  *   executed.  The file descriptor remains in the pollset even if the file
196  *   descriptor is closed.  A pollset_poll operation on a pollset containing
197  *   a closed file descriptor returns a POLLNVAL event for that file
198  *   descriptor. If the file descriptor is later allocated to a new object,
199  *   the new object will be polled on future pollset_poll calls.
200  */
201 
202 static nxt_bool_t
nxt_pollset_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)203 nxt_pollset_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
204 {
205     nxt_pollset_disable(engine, ev);
206 
207     return ev->changing;
208 }
209 
210 
211 static void
nxt_pollset_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)212 nxt_pollset_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
213 {
214     nxt_uint_t  op, events;
215 
216     if (ev->read != NXT_EVENT_BLOCKED) {
217 
218         events = POLLIN;
219 
220         if (ev->write == NXT_EVENT_INACTIVE) {
221             op = NXT_POLLSET_ADD;
222 
223         } else if (ev->write == NXT_EVENT_BLOCKED) {
224             ev->write = NXT_EVENT_INACTIVE;
225             op = NXT_POLLSET_CHANGE;
226 
227         } else {
228             op = NXT_POLLSET_UPDATE;
229             events = POLLIN | POLLOUT;
230         }
231 
232         nxt_pollset_change(engine, ev, op, events);
233     }
234 
235     ev->read = NXT_EVENT_ACTIVE;
236 }
237 
238 
239 static void
nxt_pollset_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)240 nxt_pollset_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
241 {
242     nxt_uint_t  op, events;
243 
244     if (ev->write != NXT_EVENT_BLOCKED) {
245 
246         events = POLLOUT;
247 
248         if (ev->read == NXT_EVENT_INACTIVE) {
249             op = NXT_POLLSET_ADD;
250 
251         } else if (ev->read == NXT_EVENT_BLOCKED) {
252             ev->read = NXT_EVENT_INACTIVE;
253             op = NXT_POLLSET_CHANGE;
254 
255         } else {
256             op = NXT_POLLSET_UPDATE;
257             events = POLLIN | POLLOUT;
258         }
259 
260         nxt_pollset_change(engine, ev, op, events);
261     }
262 
263     ev->write = NXT_EVENT_ACTIVE;
264 }
265 
266 
267 static void
nxt_pollset_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)268 nxt_pollset_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
269 {
270     nxt_uint_t  op, events;
271 
272     ev->read = NXT_EVENT_INACTIVE;
273 
274     if (ev->write <= NXT_EVENT_BLOCKED) {
275         ev->write = NXT_EVENT_INACTIVE;
276         op = NXT_POLLSET_DELETE;
277         events = POLLREMOVE;
278 
279     } else {
280         op = NXT_POLLSET_CHANGE;
281         events = POLLOUT;
282     }
283 
284     nxt_pollset_change(engine, ev, op, events);
285 }
286 
287 
288 static void
nxt_pollset_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)289 nxt_pollset_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
290 {
291     nxt_uint_t  op, events;
292 
293     ev->write = NXT_EVENT_INACTIVE;
294 
295     if (ev->read <= NXT_EVENT_BLOCKED) {
296         ev->read = NXT_EVENT_INACTIVE;
297         op = NXT_POLLSET_DELETE;
298         events = POLLREMOVE;
299 
300     } else {
301         op = NXT_POLLSET_CHANGE;
302         events = POLLIN;
303     }
304 
305     nxt_pollset_change(engine, ev, op, events);
306 }
307 
308 
309 static void
nxt_pollset_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)310 nxt_pollset_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
311 {
312     if (ev->read != NXT_EVENT_INACTIVE) {
313         ev->read = NXT_EVENT_BLOCKED;
314     }
315 }
316 
317 
318 static void
nxt_pollset_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)319 nxt_pollset_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
320 {
321     if (ev->write != NXT_EVENT_INACTIVE) {
322         ev->write = NXT_EVENT_BLOCKED;
323     }
324 }
325 
326 
327 static void
nxt_pollset_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)328 nxt_pollset_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
329 {
330     nxt_pollset_enable_read(engine, ev);
331 
332     ev->read = NXT_EVENT_ONESHOT;
333 }
334 
335 
336 static void
nxt_pollset_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)337 nxt_pollset_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
338 {
339     nxt_pollset_enable_write(engine, ev);
340 
341     ev->write = NXT_EVENT_ONESHOT;
342 }
343 
344 
345 /*
346  * PS_ADD adds only a new file descriptor to a pollset.
347  * PS_DELETE removes a file descriptor from a pollset.
348  *
349  * PS_MOD can add a new file descriptor or modify events for a file
350  * descriptor which is already in a pollset.  However, modified events
351  * are always ORed, so to delete an event for a file descriptor,
352  * the file descriptor must be removed using PS_DELETE and then
353  * added again without the event.
354  */
355 
356 static void
nxt_pollset_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t op,nxt_uint_t events)357 nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
358     nxt_uint_t op, nxt_uint_t events)
359 {
360     nxt_pollset_change_t  *change;
361 
362     nxt_debug(ev->task, "pollset %d change fd:%d op:%ui ev:%04Xi",
363               engine->u.pollset.ps, ev->fd, op, events);
364 
365     if (engine->u.pollset.nchanges >= engine->u.pollset.mchanges) {
366         (void) nxt_pollset_commit_changes(engine);
367     }
368 
369     ev->changing = 1;
370 
371     change = &engine->u.pollset.changes[engine->u.pollset.nchanges++];
372     change->op = op;
373     change->cmd = (op == NXT_POLLSET_DELETE) ? PS_DELETE : PS_MOD;
374     change->events = events;
375     change->event = ev;
376 }
377 
378 
379 static nxt_int_t
nxt_pollset_commit_changes(nxt_event_engine_t * engine)380 nxt_pollset_commit_changes(nxt_event_engine_t *engine)
381 {
382     size_t                n;
383     nxt_int_t             ret, retval;
384     nxt_fd_event_t        *ev;
385     struct poll_ctl       *ctl, *write_changes;
386     nxt_pollset_change_t  *change, *end;
387 
388     nxt_debug(&engine->task, "pollset %d changes:%ui",
389               engine->u.pollset.ps, engine->u.pollset.nchanges);
390 
391     retval = NXT_OK;
392     n = 0;
393     write_changes = engine->u.pollset.write_changes;
394     change = engine->u.pollset.changes;
395     end = change + engine->u.pollset.nchanges;
396 
397     do {
398         ev = change->event;
399         ev->changing = 0;
400 
401         nxt_debug(&engine->task, "pollset fd:%d op:%d ev:%04Xd",
402                   ev->fd, change->op, change->events);
403 
404         if (change->op == NXT_POLLSET_CHANGE) {
405             ctl = &write_changes[n++];
406             ctl->cmd = PS_DELETE;
407             ctl->events = 0;
408             ctl->fd = ev->fd;
409         }
410 
411         ctl = &write_changes[n++];
412         ctl->cmd = change->cmd;
413         ctl->events = change->events;
414         ctl->fd = ev->fd;
415 
416         change++;
417 
418     } while (change < end);
419 
420     change = engine->u.pollset.changes;
421     end = change + engine->u.pollset.nchanges;
422 
423     ret = nxt_pollset_write(engine, write_changes, n);
424 
425     if (nxt_slow_path(ret != NXT_OK)) {
426 
427         do {
428             nxt_pollset_change_error(engine, change->event);
429             change++;
430         } while (change < end);
431 
432         engine->u.pollset.nchanges = 0;
433 
434         return NXT_ERROR;
435     }
436 
437     do {
438         ev = change->event;
439 
440         if (change->op == NXT_POLLSET_ADD) {
441             ret = nxt_fd_event_hash_add(&engine->u.pollset.fd_hash, ev->fd, ev);
442 
443             if (nxt_slow_path(ret != NXT_OK)) {
444                 nxt_pollset_change_error(engine, ev);
445                 retval = NXT_ERROR;
446             }
447 
448         } else if (change->op == NXT_POLLSET_DELETE) {
449             nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
450                                      ev->fd, 0);
451         }
452 
453         /* Nothing to do for NXT_POLLSET_UPDATE and NXT_POLLSET_CHANGE. */
454 
455         change++;
456 
457     } while (change < end);
458 
459     engine->u.pollset.nchanges = 0;
460 
461     return retval;
462 }
463 
464 
465 static void
nxt_pollset_change_error(nxt_event_engine_t * engine,nxt_fd_event_t * ev)466 nxt_pollset_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
467 {
468     ev->read = NXT_EVENT_INACTIVE;
469     ev->write = NXT_EVENT_INACTIVE;
470 
471     nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
472                        ev->task, ev, ev->data);
473 
474     nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash,
475                              ev->fd, 1);
476 
477     nxt_pollset_remove(engine, ev->fd);
478 }
479 
480 
481 static void
nxt_pollset_remove(nxt_event_engine_t * engine,nxt_fd_t fd)482 nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd)
483 {
484     int              n;
485     struct pollfd    pfd;
486     struct poll_ctl  ctl;
487 
488     pfd.fd = fd;
489     pfd.events = 0;
490     pfd.revents = 0;
491 
492     n = pollset_query(engine->u.pollset.ps, &pfd);
493 
494     nxt_debug(&engine->task, "pollset_query(%d, %d): %d",
495               engine->u.pollset.ps, fd, n);
496 
497     if (n == 0) {
498         /* The file descriptor is not in the pollset. */
499         return;
500     }
501 
502     if (n == -1) {
503         nxt_alert(&engine->task, "pollset_query(%d, %d) failed %E",
504                   engine->u.pollset.ps, fd, nxt_errno);
505         /* Fall through. */
506     }
507 
508     /* n == 1: The file descriptor is in the pollset. */
509 
510     nxt_debug(&engine->task, "pollset %d remove fd:%d",
511               engine->u.pollset.ps, fd);
512 
513     ctl.cmd = PS_DELETE;
514     ctl.events = 0;
515     ctl.fd = fd;
516 
517     nxt_pollset_write(engine, &ctl, 1);
518 }
519 
520 
521 static nxt_int_t
nxt_pollset_write(nxt_event_engine_t * engine,struct poll_ctl * ctl,int n)522 nxt_pollset_write(nxt_event_engine_t *engine, struct poll_ctl *ctl, int n)
523 {
524     pollset_t  ps;
525 
526     ps = engine->u.pollset.ps;
527 
528     nxt_debug(&engine->task, "pollset_ctl(%d) changes:%d", ps, n);
529 
530     nxt_set_errno(0);
531 
532     n = pollset_ctl(ps, ctl, n);
533 
534     if (nxt_fast_path(n == 0)) {
535         return NXT_OK;
536     }
537 
538     nxt_alert(&engine->task, "pollset_ctl(%d) failed: %d %E", ps, n, nxt_errno);
539 
540     return NXT_ERROR;
541 }
542 
543 
544 static void
nxt_pollset_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)545 nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
546 {
547     int             nevents;
548     nxt_fd_t        fd;
549     nxt_int_t       i;
550     nxt_err_t       err;
551     nxt_uint_t      events, level;
552     struct pollfd   *pfd;
553     nxt_fd_event_t  *ev;
554 
555     if (engine->u.pollset.nchanges != 0) {
556         if (nxt_pollset_commit_changes(engine) != NXT_OK) {
557             /* Error handlers have been enqueued on failure. */
558             timeout = 0;
559         }
560     }
561 
562     nxt_debug(&engine->task, "pollset_poll(%d) timeout:%M",
563               engine->u.pollset.ps, timeout);
564 
565     nevents = pollset_poll(engine->u.pollset.ps, engine->u.pollset.events,
566                            engine->u.pollset.mevents, timeout);
567 
568     err = (nevents == -1) ? nxt_errno : 0;
569 
570     nxt_thread_time_update(engine->task.thread);
571 
572     nxt_debug(&engine->task, "pollset_poll(%d): %d",
573               engine->u.pollset.ps, nevents);
574 
575     if (nevents == -1) {
576         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
577 
578         nxt_log(&engine->task, level, "pollset_poll(%d) failed %E",
579                 engine->u.pollset.ps, err);
580 
581         return;
582     }
583 
584     for (i = 0; i < nevents; i++) {
585 
586         pfd = &engine->u.pollset.events[i];
587         fd = pfd->fd;
588         events = pfd->revents;
589 
590         ev = nxt_fd_event_hash_get(&engine->task, &engine->u.pollset.fd_hash,
591                                    fd);
592 
593         if (nxt_slow_path(ev == NULL)) {
594             nxt_alert(&engine->task,
595                       "pollset_poll(%d) returned invalid "
596                       "fd:%d ev:%04Xd rev:%04uXi",
597                       engine->u.pollset.ps, fd, pfd->events, events);
598 
599             nxt_pollset_remove(engine, fd);
600             continue;
601         }
602 
603         nxt_debug(ev->task, "pollset: fd:%d ev:%04uXi", fd, events);
604 
605         if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
606             nxt_alert(ev->task,
607                       "pollset_poll(%d) error fd:%d ev:%04Xd rev:%04uXi",
608                       engine->u.pollset.ps, fd, pfd->events, events);
609 
610             nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
611                                ev->task, ev, ev->data);
612             continue;
613         }
614 
615         if (events & POLLIN) {
616             ev->read_ready = 1;
617 
618             if (ev->read != NXT_EVENT_BLOCKED) {
619                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
620                                    ev->task, ev, ev->data);
621             }
622 
623             if (ev->read == NXT_EVENT_BLOCKED
624                 || ev->read == NXT_EVENT_ONESHOT)
625             {
626                 nxt_pollset_disable_read(engine, ev);
627             }
628         }
629 
630         if (events & POLLOUT) {
631             ev->write_ready = 1;
632 
633             if (ev->write != NXT_EVENT_BLOCKED) {
634                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
635                                    ev->task, ev, ev->data);
636             }
637 
638             if (ev->write == NXT_EVENT_BLOCKED
639                 || ev->write == NXT_EVENT_ONESHOT)
640             {
641                 nxt_pollset_disable_write(engine, ev);
642             }
643         }
644     }
645 }
646