xref: /unit/src/nxt_poll_engine.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/* Operation codes stored in the batched poll change list. */
enum {
    NXT_POLL_ADD = 0,      /* append a pollfd entry and hash the fd */
    NXT_POLL_CHANGE = 1,   /* rewrite the events mask of an entry */
    NXT_POLL_DELETE = 2,   /* drop the entry from the set and hash */
};
13 
14 
/*
 * Entry kept in the fd lvlhsh for every descriptor in the poll set.
 *
 * The descriptor number is duplicated here on purpose so that
 * nxt_poll_fd_hash_test() never has to dereference the nxt_fd_event_t
 * pointer: that structure may already have been freed once the
 * descriptor was closed.
 */
typedef struct {
    /* Copy of the registered event's fd. */
    nxt_socket_t         fd;

    /* Slot of this fd in engine->u.poll.set. */
    uint32_t             index;
    /* The nxt_fd_event_t registered for this fd. */
    void                 *event;
} nxt_poll_hash_entry_t;
27 
28 
/*
 * Forward declarations of the poll(2) engine's file-local functions.
 * They must match the definitions below.
 */
static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_poll_free(nxt_event_engine_t *engine);
static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_disable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev, int events);
static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
    nxt_fd_t fd, int events);
static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
    nxt_fd_t fd);
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
    nxt_lvlhsh_t *lh);
66 
67 
/*
 * Event engine interface implemented on top of poll(2).
 *
 * The initializer is positional, so the entries must follow the field
 * order of nxt_event_interface_t (declared elsewhere).  nxt_poll_disable
 * fills two consecutive slots and nxt_poll_enable_read fills a second,
 * later slot.  The NULL entries are presumably operations this engine
 * does not provide -- TODO confirm against nxt_event_interface_t.
 * The engine declares no file event or signal event support.
 */
const nxt_event_interface_t  nxt_poll_engine = {
    "poll",
    nxt_poll_create,
    nxt_poll_free,
    nxt_poll_enable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_close,
    nxt_poll_enable_read,
    nxt_poll_enable_write,
    nxt_poll_disable_read,
    nxt_poll_disable_write,
    nxt_poll_block_read,
    nxt_poll_block_write,
    nxt_poll_oneshot_read,
    nxt_poll_oneshot_write,
    nxt_poll_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
96 
97 
/*
 * lvlhsh prototype for the fd -> nxt_poll_hash_entry_t map.
 *
 * Callers hash the fd with nxt_murmur_hash2().  Candidate matches are
 * confirmed by nxt_poll_fd_hash_test().  Storage comes from the generic
 * lvlhsh allocators.
 */
static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
{
    NXT_LVLHSH_LARGE_MEMALIGN,
    nxt_poll_fd_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
105 
106 
107 static nxt_int_t
108 nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
109     nxt_uint_t mevents)
110 {
111     engine->u.poll.mchanges = mchanges;
112 
113     engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
114 
115     if (engine->u.poll.changes != NULL) {
116         return NXT_OK;
117     }
118 
119     return NXT_ERROR;
120 }
121 
122 
123 static void
124 nxt_poll_free(nxt_event_engine_t *engine)
125 {
126     nxt_debug(&engine->task, "poll free");
127 
128     nxt_free(engine->u.poll.set);
129     nxt_free(engine->u.poll.changes);
130     nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);
131 
132     nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
133 }
134 
135 
136 static void
137 nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
138 {
139     ev->read = NXT_EVENT_ACTIVE;
140     ev->write = NXT_EVENT_ACTIVE;
141 
142     nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
143 }
144 
145 
146 static void
147 nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
148 {
149     if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) {
150         ev->read = NXT_EVENT_INACTIVE;
151         ev->write = NXT_EVENT_INACTIVE;
152 
153         nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
154     }
155 }
156 
157 
158 static nxt_bool_t
159 nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
160 {
161     nxt_poll_disable(engine, ev);
162 
163     return ev->changing;
164 }
165 
166 
167 static void
168 nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
169 {
170     nxt_uint_t  op, events;
171 
172     ev->read = NXT_EVENT_ACTIVE;
173 
174     if (ev->write == NXT_EVENT_INACTIVE) {
175         op = NXT_POLL_ADD;
176         events = POLLIN;
177 
178     } else {
179         op = NXT_POLL_CHANGE;
180         events = POLLIN | POLLOUT;
181     }
182 
183     nxt_poll_change(engine, ev, op, events);
184 }
185 
186 
187 static void
188 nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
189 {
190     nxt_uint_t  op, events;
191 
192     ev->write = NXT_EVENT_ACTIVE;
193 
194     if (ev->read == NXT_EVENT_INACTIVE) {
195         op = NXT_POLL_ADD;
196         events = POLLOUT;
197 
198     } else {
199         op = NXT_POLL_CHANGE;
200         events = POLLIN | POLLOUT;
201     }
202 
203     nxt_poll_change(engine, ev, op, events);
204 }
205 
206 
207 static void
208 nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
209 {
210     nxt_uint_t  op, events;
211 
212     ev->read = NXT_EVENT_INACTIVE;
213 
214     if (ev->write == NXT_EVENT_INACTIVE) {
215         op = NXT_POLL_DELETE;
216         events = 0;
217 
218     } else {
219         op = NXT_POLL_CHANGE;
220         events = POLLOUT;
221     }
222 
223     nxt_poll_change(engine, ev, op, events);
224 }
225 
226 
227 static void
228 nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
229 {
230     nxt_uint_t  op, events;
231 
232     ev->write = NXT_EVENT_INACTIVE;
233 
234     if (ev->read == NXT_EVENT_INACTIVE) {
235         op = NXT_POLL_DELETE;
236         events = 0;
237 
238     } else {
239         op = NXT_POLL_CHANGE;
240         events = POLLIN;
241     }
242 
243     nxt_poll_change(engine, ev, op, events);
244 }
245 
246 
247 static void
248 nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
249 {
250     if (ev->read != NXT_EVENT_INACTIVE) {
251         nxt_poll_disable_read(engine, ev);
252     }
253 }
254 
255 
256 static void
257 nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
258 {
259     if (ev->write != NXT_EVENT_INACTIVE) {
260         nxt_poll_disable_write(engine, ev);
261     }
262 }
263 
264 
265 static void
266 nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
267 {
268     nxt_uint_t  op;
269 
270     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
271              NXT_POLL_ADD : NXT_POLL_CHANGE;
272 
273     ev->read = NXT_EVENT_ONESHOT;
274     ev->write = NXT_EVENT_INACTIVE;
275 
276     nxt_poll_change(engine, ev, op, POLLIN);
277 }
278 
279 
280 static void
281 nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
282 {
283     nxt_uint_t  op;
284 
285     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
286              NXT_POLL_ADD : NXT_POLL_CHANGE;
287 
288     ev->read = NXT_EVENT_INACTIVE;
289     ev->write = NXT_EVENT_ONESHOT;
290 
291     nxt_poll_change(engine, ev, op, POLLOUT);
292 }
293 
294 
295 /*
296  * poll changes are batched to improve instruction and data cache
297  * locality of several lvlhsh operations followed by poll() call.
298  */
299 
300 static void
301 nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
302     nxt_uint_t events)
303 {
304     nxt_poll_change_t  *change;
305 
306     nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);
307 
308     if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
309         (void) nxt_poll_commit_changes(engine);
310     }
311 
312     ev->changing = 1;
313 
314     change = &engine->u.poll.changes[engine->u.poll.nchanges++];
315     change->op = op;
316     change->events = events;
317     change->event = ev;
318 }
319 
320 
321 static nxt_int_t
322 nxt_poll_commit_changes(nxt_event_engine_t *engine)
323 {
324     nxt_int_t          ret, retval;
325     nxt_fd_event_t     *ev;
326     nxt_poll_change_t  *change, *end;
327 
328     nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);
329 
330     retval = NXT_OK;
331     change = engine->u.poll.changes;
332     end = change + engine->u.poll.nchanges;
333 
334     do {
335         ev = change->event;
336         ev->changing = 0;
337 
338         switch (change->op) {
339 
340         case NXT_POLL_ADD:
341             ret = nxt_poll_set_add(engine, ev, change->events);
342 
343             if (nxt_fast_path(ret == NXT_OK)) {
344                 goto next;
345             }
346 
347             break;
348 
349         case NXT_POLL_CHANGE:
350             ret = nxt_poll_set_change(engine, ev->fd, change->events);
351 
352             if (nxt_fast_path(ret == NXT_OK)) {
353                 goto next;
354             }
355 
356             break;
357 
358         case NXT_POLL_DELETE:
359             ret = nxt_poll_set_delete(engine, ev->fd);
360 
361             if (nxt_fast_path(ret == NXT_OK)) {
362                 goto next;
363             }
364 
365             break;
366         }
367 
368         nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
369                            ev->task, ev, ev->data);
370 
371         retval = NXT_ERROR;
372 
373       next:
374 
375         change++;
376 
377     } while (change < end);
378 
379     engine->u.poll.nchanges = 0;
380 
381     return retval;
382 }
383 
384 
385 static nxt_int_t
386 nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
387 {
388     nxt_int_t              ret;
389     nxt_uint_t             max_nfds;
390     struct pollfd          *pfd;
391     nxt_lvlhsh_query_t     lhq;
392     nxt_poll_hash_entry_t  *phe;
393 
394     nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);
395 
396     if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
397         max_nfds = engine->u.poll.max_nfds + 512; /* 4K */
398 
399         pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
400         if (nxt_slow_path(pfd == NULL)) {
401             return NXT_ERROR;
402         }
403 
404         engine->u.poll.set = pfd;
405         engine->u.poll.max_nfds = max_nfds;
406     }
407 
408     phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
409     if (nxt_slow_path(phe == NULL)) {
410         return NXT_ERROR;
411     }
412 
413     phe->fd = ev->fd;
414     phe->index = engine->u.poll.nfds;
415     phe->event = ev;
416 
417     pfd = &engine->u.poll.set[engine->u.poll.nfds++];
418     pfd->fd = ev->fd;
419     pfd->events = events;
420     pfd->revents = 0;
421 
422     lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
423     lhq.replace = 0;
424     lhq.value = phe;
425     lhq.proto = &nxt_poll_fd_hash_proto;
426     lhq.data = engine;
427 
428     ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);
429 
430     if (nxt_fast_path(ret == NXT_OK)) {
431         return NXT_OK;
432     }
433 
434     nxt_free(phe);
435 
436     return NXT_ERROR;
437 }
438 
439 
440 static nxt_int_t
441 nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
442 {
443     nxt_poll_hash_entry_t  *phe;
444 
445     nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
446               fd, events);
447 
448     phe = nxt_poll_fd_hash_get(engine, fd);
449 
450     if (nxt_fast_path(phe != NULL)) {
451         engine->u.poll.set[phe->index].events = events;
452         return NXT_OK;
453     }
454 
455     return NXT_ERROR;
456 }
457 
458 
459 static nxt_int_t
460 nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
461 {
462     nxt_int_t              ret;
463     nxt_uint_t             index, nfds;
464     nxt_lvlhsh_query_t     lhq;
465     nxt_poll_hash_entry_t  *phe;
466 
467     nxt_debug(&engine->task, "poll delete event: fd:%d", fd);
468 
469     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
470     lhq.proto = &nxt_poll_fd_hash_proto;
471     lhq.data = engine;
472 
473     ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq);
474 
475     if (nxt_slow_path(ret != NXT_OK)) {
476         return NXT_ERROR;
477     }
478 
479     phe = lhq.value;
480 
481     index = phe->index;
482     engine->u.poll.nfds--;
483     nfds = engine->u.poll.nfds;
484 
485     if (index != nfds) {
486         engine->u.poll.set[index] = engine->u.poll.set[nfds];
487 
488         phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd);
489 
490         phe->index = index;
491     }
492 
493     nxt_free(lhq.value);
494 
495     return NXT_OK;
496 }
497 
498 
499 static void
500 nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
501 {
502     int                    nevents;
503     nxt_fd_t               fd;
504     nxt_err_t              err;
505     nxt_bool_t             error;
506     nxt_uint_t             i, events, level;
507     struct pollfd          *pfd;
508     nxt_fd_event_t         *ev;
509     nxt_poll_hash_entry_t  *phe;
510 
511     if (engine->u.poll.nchanges != 0) {
512         if (nxt_poll_commit_changes(engine) != NXT_OK) {
513             /* Error handlers have been enqueued on failure. */
514             timeout = 0;
515         }
516     }
517 
518     nxt_debug(&engine->task, "poll() events:%ui timeout:%M",
519               engine->u.poll.nfds, timeout);
520 
521     nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout);
522 
523     err = (nevents == -1) ? nxt_errno : 0;
524 
525     nxt_thread_time_update(engine->task.thread);
526 
527     nxt_debug(&engine->task, "poll(): %d", nevents);
528 
529     if (nevents == -1) {
530         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
531         nxt_log(&engine->task, level, "poll() failed %E", err);
532         return;
533     }
534 
535     for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) {
536 
537         pfd = &engine->u.poll.set[i];
538         events = pfd->revents;
539 
540         if (events == 0) {
541             continue;
542         }
543 
544         fd = pfd->fd;
545 
546         phe = nxt_poll_fd_hash_get(engine, fd);
547 
548         if (nxt_slow_path(phe == NULL)) {
549             nxt_log(&engine->task, NXT_LOG_CRIT,
550                     "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
551                     fd, pfd->events, events);
552 
553             /* Mark the poll entry to ignore it by the kernel. */
554             pfd->fd = -1;
555             goto next;
556         }
557 
558         ev = phe->event;
559 
560         nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d",
561                   fd, events, ev->read, ev->write);
562 
563         if (nxt_slow_path((events & POLLNVAL) != 0)) {
564             nxt_log(ev->task, NXT_LOG_CRIT,
565                     "poll() error fd:%d ev:%04Xd rev:%04uXi",
566                     fd, pfd->events, events);
567 
568             /* Mark the poll entry to ignore it by the kernel. */
569             pfd->fd = -1;
570 
571             nxt_work_queue_add(&engine->fast_work_queue,
572                                ev->error_handler, ev->task, ev, ev->data);
573             goto next;
574         }
575 
576         /*
577          * On a socket's remote end close:
578          *
579          *   Linux, FreeBSD, and Solaris set POLLIN;
580          *   MacOSX sets POLLIN and POLLHUP;
581          *   NetBSD sets POLLIN, and poll(2) claims this explicitly:
582          *
583          *     If the remote end of a socket is closed, poll()
584          *     returns a POLLIN event, rather than a POLLHUP.
585          *
586          * On error:
587          *
588          *   Linux sets POLLHUP and POLLERR only;
589          *   FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
590          *   claims the opposite:
591          *
592          *     Note that POLLHUP and POLLOUT should never be
593          *     present in the revents bitmask at the same time.
594          *
595          *   Solaris and NetBSD do not add POLLHUP or POLLERR;
596          *   MacOSX sets POLLHUP only.
597          *
598          * If an implementation sets POLLERR or POLLHUP only without POLLIN
599          * or POLLOUT, the "error" variable enqueues only one active handler.
600          */
601 
602         error = (((events & (POLLERR | POLLHUP)) != 0)
603                  && ((events & (POLLIN | POLLOUT)) == 0));
604 
605         if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
606             error = 0;
607             ev->read_ready = 1;
608 
609             if (ev->read == NXT_EVENT_ONESHOT) {
610                 ev->read = NXT_EVENT_INACTIVE;
611                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
612             }
613 
614             nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
615                                ev->task, ev, ev->data);
616         }
617 
618         if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
619             ev->write_ready = 1;
620 
621             if (ev->write == NXT_EVENT_ONESHOT) {
622                 ev->write = NXT_EVENT_INACTIVE;
623                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
624             }
625 
626             nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
627                                ev->task, ev, ev->data);
628         }
629 
630     next:
631 
632         nevents--;
633     }
634 }
635 
636 
637 static nxt_poll_hash_entry_t *
638 nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
639 {
640     nxt_lvlhsh_query_t     lhq;
641     nxt_poll_hash_entry_t  *phe;
642 
643     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
644     lhq.proto = &nxt_poll_fd_hash_proto;
645     lhq.data = engine;
646 
647     if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
648         phe = lhq.value;
649         return phe;
650     }
651 
652     nxt_log(&engine->task, NXT_LOG_CRIT, "fd %d not found in hash", fd);
653 
654     return NULL;
655 }
656 
657 
658 static nxt_int_t
659 nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
660 {
661     nxt_event_engine_t     *engine;
662     nxt_poll_hash_entry_t  *phe;
663 
664     phe = data;
665 
666     /* nxt_murmur_hash2() is unique for 4 bytes. */
667 
668     engine = lhq->data;
669 
670     if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
671         return NXT_OK;
672     }
673 
674     nxt_log(&engine->task, NXT_LOG_CRIT,
675             "fd %d in hash mismatches fd %d in poll set",
676             phe->fd, engine->u.poll.set[phe->index].fd);
677 
678     return NXT_DECLINED;
679 }
680 
681 
682 static void
683 nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
684 {
685     nxt_lvlhsh_each_t      lhe;
686     nxt_lvlhsh_query_t     lhq;
687     nxt_poll_hash_entry_t  *phe;
688 
689     nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
690     lhe.proto = &nxt_poll_fd_hash_proto;
691     lhq.proto = &nxt_poll_fd_hash_proto;
692 
693     for ( ;; ) {
694         phe = nxt_lvlhsh_each(lh, &lhe);
695 
696         if (phe == NULL) {
697             return;
698         }
699 
700         lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));
701 
702         if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
703             nxt_log(&engine->task, NXT_LOG_CRIT,
704                     "event fd %d not found in hash", phe->fd);
705         }
706 
707         nxt_free(phe);
708     }
709 }
710