xref: /unit/src/nxt_poll_engine.c (revision 12:477899a6661b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * Operation codes stored in a queued nxt_poll_change_t entry and
 * dispatched by nxt_poll_commit_changes().
 */
#define NXT_POLL_ADD     0   /* insert the descriptor into the poll set */
#define NXT_POLL_CHANGE  1   /* replace the event mask of a polled descriptor */
#define NXT_POLL_DELETE  2   /* remove the descriptor from the poll set */
13 
14 
/* One entry of the fd hash: maps a descriptor to its slot in the poll set. */
typedef struct {
    /*
     * A file descriptor is stored in hash entry to allow
     * nxt_poll_fd_hash_test() to not dereference a pointer to
     * nxt_fd_event_t which may be invalid if the file descriptor has
     * been already closed and the nxt_fd_event_t's memory has been freed.
     */
    nxt_socket_t         fd;

    /* Position of the descriptor's struct pollfd in engine->u.poll.set. */
    uint32_t             index;
    /* The nxt_fd_event_t registered for the descriptor. */
    void                 *event;
} nxt_poll_hash_entry_t;
27 
28 
/*
 * Forward declarations of the file-local poll engine routines: engine
 * life cycle, per-event state changes, the batched change queue, the
 * poll set maintenance, the poll() loop itself, and the fd hash helpers.
 */
static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_poll_free(nxt_event_engine_t *engine);
static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_disable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev, int events);
static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
    nxt_fd_t fd, int events);
static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
    nxt_fd_t fd);
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
    nxt_lvlhsh_t *lh);
66 
67 
/*
 * The poll() event engine interface.  The initializer is positional, so
 * the meaning of each slot is fixed by the declaration of
 * nxt_event_interface_t, which is not part of this file -- consult it
 * before reordering anything here.
 *
 * NOTE(review): nxt_poll_disable and nxt_poll_enable_read each fill two
 * slots, and four slots are left NULL; presumably these are operations
 * poll() shares with, or cannot provide for, other engines -- confirm
 * against the interface declaration.
 */
const nxt_event_interface_t  nxt_poll_engine = {
    "poll",
    nxt_poll_create,
    nxt_poll_free,
    nxt_poll_enable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_close,
    nxt_poll_enable_read,
    nxt_poll_enable_write,
    nxt_poll_disable_read,
    nxt_poll_disable_write,
    nxt_poll_block_read,
    nxt_poll_block_write,
    nxt_poll_oneshot_read,
    nxt_poll_oneshot_write,
    nxt_poll_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_poll,

    &nxt_unix_event_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
96 
97 
/*
 * The lvlhsh prototype shared by every operation on the fd hash.
 * nxt_poll_fd_hash_test() is the entry test callback; the remaining
 * positional fields follow the nxt_lvlhsh_proto_t declaration, which
 * is not visible in this file.
 */
static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
{
    NXT_LVLHSH_LARGE_MEMALIGN,
    0,
    nxt_poll_fd_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
106 
107 
108 static nxt_int_t
109 nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
110     nxt_uint_t mevents)
111 {
112     engine->u.poll.mchanges = mchanges;
113 
114     engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
115 
116     if (engine->u.poll.changes != NULL) {
117         return NXT_OK;
118     }
119 
120     return NXT_ERROR;
121 }
122 
123 
124 static void
125 nxt_poll_free(nxt_event_engine_t *engine)
126 {
127     nxt_debug(&engine->task, "poll free");
128 
129     nxt_free(engine->u.poll.set);
130     nxt_free(engine->u.poll.changes);
131     nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);
132 
133     nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
134 }
135 
136 
137 static void
138 nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
139 {
140     ev->read = NXT_EVENT_ACTIVE;
141     ev->write = NXT_EVENT_ACTIVE;
142 
143     nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
144 }
145 
146 
147 static void
148 nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
149 {
150     if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) {
151         ev->read = NXT_EVENT_INACTIVE;
152         ev->write = NXT_EVENT_INACTIVE;
153 
154         nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
155     }
156 }
157 
158 
159 static nxt_bool_t
160 nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
161 {
162     nxt_poll_disable(engine, ev);
163 
164     return ev->changing;
165 }
166 
167 
168 static void
169 nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
170 {
171     nxt_uint_t  op, events;
172 
173     ev->read = NXT_EVENT_ACTIVE;
174 
175     if (ev->write == NXT_EVENT_INACTIVE) {
176         op = NXT_POLL_ADD;
177         events = POLLIN;
178 
179     } else {
180         op = NXT_POLL_CHANGE;
181         events = POLLIN | POLLOUT;
182     }
183 
184     nxt_poll_change(engine, ev, op, events);
185 }
186 
187 
188 static void
189 nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
190 {
191     nxt_uint_t  op, events;
192 
193     ev->write = NXT_EVENT_ACTIVE;
194 
195     if (ev->read == NXT_EVENT_INACTIVE) {
196         op = NXT_POLL_ADD;
197         events = POLLOUT;
198 
199     } else {
200         op = NXT_POLL_CHANGE;
201         events = POLLIN | POLLOUT;
202     }
203 
204     nxt_poll_change(engine, ev, op, events);
205 }
206 
207 
208 static void
209 nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
210 {
211     nxt_uint_t  op, events;
212 
213     ev->read = NXT_EVENT_INACTIVE;
214 
215     if (ev->write == NXT_EVENT_INACTIVE) {
216         op = NXT_POLL_DELETE;
217         events = 0;
218 
219     } else {
220         op = NXT_POLL_CHANGE;
221         events = POLLOUT;
222     }
223 
224     nxt_poll_change(engine, ev, op, events);
225 }
226 
227 
228 static void
229 nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
230 {
231     nxt_uint_t  op, events;
232 
233     ev->write = NXT_EVENT_INACTIVE;
234 
235     if (ev->read == NXT_EVENT_INACTIVE) {
236         op = NXT_POLL_DELETE;
237         events = 0;
238 
239     } else {
240         op = NXT_POLL_CHANGE;
241         events = POLLIN;
242     }
243 
244     nxt_poll_change(engine, ev, op, events);
245 }
246 
247 
248 static void
249 nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
250 {
251     if (ev->read != NXT_EVENT_INACTIVE) {
252         nxt_poll_disable_read(engine, ev);
253     }
254 }
255 
256 
257 static void
258 nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
259 {
260     if (ev->write != NXT_EVENT_INACTIVE) {
261         nxt_poll_disable_write(engine, ev);
262     }
263 }
264 
265 
266 static void
267 nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
268 {
269     nxt_uint_t  op;
270 
271     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
272              NXT_POLL_ADD : NXT_POLL_CHANGE;
273 
274     ev->read = NXT_EVENT_ONESHOT;
275     ev->write = NXT_EVENT_INACTIVE;
276 
277     nxt_poll_change(engine, ev, op, POLLIN);
278 }
279 
280 
281 static void
282 nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
283 {
284     nxt_uint_t  op;
285 
286     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
287              NXT_POLL_ADD : NXT_POLL_CHANGE;
288 
289     ev->read = NXT_EVENT_INACTIVE;
290     ev->write = NXT_EVENT_ONESHOT;
291 
292     nxt_poll_change(engine, ev, op, POLLOUT);
293 }
294 
295 
296 /*
297  * poll changes are batched to improve instruction and data cache
298  * locality of several lvlhsh operations followed by poll() call.
299  */
300 
301 static void
302 nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
303     nxt_uint_t events)
304 {
305     nxt_poll_change_t  *change;
306 
307     nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);
308 
309     if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
310         (void) nxt_poll_commit_changes(engine);
311     }
312 
313     ev->changing = 1;
314 
315     change = &engine->u.poll.changes[engine->u.poll.nchanges++];
316     change->op = op;
317     change->events = events;
318     change->event = ev;
319 }
320 
321 
322 static nxt_int_t
323 nxt_poll_commit_changes(nxt_event_engine_t *engine)
324 {
325     nxt_int_t          ret, retval;
326     nxt_fd_event_t     *ev;
327     nxt_poll_change_t  *change, *end;
328 
329     nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);
330 
331     retval = NXT_OK;
332     change = engine->u.poll.changes;
333     end = change + engine->u.poll.nchanges;
334 
335     do {
336         ev = change->event;
337         ev->changing = 0;
338 
339         switch (change->op) {
340 
341         case NXT_POLL_ADD:
342             ret = nxt_poll_set_add(engine, ev, change->events);
343 
344             if (nxt_fast_path(ret == NXT_OK)) {
345                 goto next;
346             }
347 
348             break;
349 
350         case NXT_POLL_CHANGE:
351             ret = nxt_poll_set_change(engine, ev->fd, change->events);
352 
353             if (nxt_fast_path(ret == NXT_OK)) {
354                 goto next;
355             }
356 
357             break;
358 
359         case NXT_POLL_DELETE:
360             ret = nxt_poll_set_delete(engine, ev->fd);
361 
362             if (nxt_fast_path(ret == NXT_OK)) {
363                 goto next;
364             }
365 
366             break;
367         }
368 
369         nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
370                            ev->task, ev, ev->data);
371 
372         retval = NXT_ERROR;
373 
374       next:
375 
376         change++;
377 
378     } while (change < end);
379 
380     engine->u.poll.nchanges = 0;
381 
382     return retval;
383 }
384 
385 
386 static nxt_int_t
387 nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
388 {
389     nxt_int_t              ret;
390     nxt_uint_t             max_nfds;
391     struct pollfd          *pfd;
392     nxt_lvlhsh_query_t     lhq;
393     nxt_poll_hash_entry_t  *phe;
394 
395     nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);
396 
397     if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
398         max_nfds = engine->u.poll.max_nfds + 512; /* 4K */
399 
400         pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
401         if (nxt_slow_path(pfd == NULL)) {
402             return NXT_ERROR;
403         }
404 
405         engine->u.poll.set = pfd;
406         engine->u.poll.max_nfds = max_nfds;
407     }
408 
409     phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
410     if (nxt_slow_path(phe == NULL)) {
411         return NXT_ERROR;
412     }
413 
414     phe->fd = ev->fd;
415     phe->index = engine->u.poll.nfds;
416     phe->event = ev;
417 
418     pfd = &engine->u.poll.set[engine->u.poll.nfds++];
419     pfd->fd = ev->fd;
420     pfd->events = events;
421     pfd->revents = 0;
422 
423     lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
424     lhq.replace = 0;
425     lhq.value = phe;
426     lhq.proto = &nxt_poll_fd_hash_proto;
427     lhq.data = engine;
428 
429     ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);
430 
431     if (nxt_fast_path(ret == NXT_OK)) {
432         return NXT_OK;
433     }
434 
435     nxt_free(phe);
436 
437     return NXT_ERROR;
438 }
439 
440 
441 static nxt_int_t
442 nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
443 {
444     nxt_poll_hash_entry_t  *phe;
445 
446     nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
447               fd, events);
448 
449     phe = nxt_poll_fd_hash_get(engine, fd);
450 
451     if (nxt_fast_path(phe != NULL)) {
452         engine->u.poll.set[phe->index].events = events;
453         return NXT_OK;
454     }
455 
456     return NXT_ERROR;
457 }
458 
459 
460 static nxt_int_t
461 nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
462 {
463     nxt_int_t              ret;
464     nxt_uint_t             index, nfds;
465     nxt_lvlhsh_query_t     lhq;
466     nxt_poll_hash_entry_t  *phe;
467 
468     nxt_debug(&engine->task, "poll delete event: fd:%d", fd);
469 
470     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
471     lhq.proto = &nxt_poll_fd_hash_proto;
472     lhq.data = engine;
473 
474     ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq);
475 
476     if (nxt_slow_path(ret != NXT_OK)) {
477         return NXT_ERROR;
478     }
479 
480     phe = lhq.value;
481 
482     index = phe->index;
483     engine->u.poll.nfds--;
484     nfds = engine->u.poll.nfds;
485 
486     if (index != nfds) {
487         engine->u.poll.set[index] = engine->u.poll.set[nfds];
488 
489         phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd);
490 
491         phe->index = index;
492     }
493 
494     nxt_free(lhq.value);
495 
496     return NXT_OK;
497 }
498 
499 
500 static void
501 nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
502 {
503     int                    nevents;
504     nxt_fd_t               fd;
505     nxt_err_t              err;
506     nxt_bool_t             error;
507     nxt_uint_t             i, events, level;
508     struct pollfd          *pfd;
509     nxt_fd_event_t         *ev;
510     nxt_poll_hash_entry_t  *phe;
511 
512     if (engine->u.poll.nchanges != 0) {
513         if (nxt_poll_commit_changes(engine) != NXT_OK) {
514             /* Error handlers have been enqueued on failure. */
515             timeout = 0;
516         }
517     }
518 
519     nxt_debug(&engine->task, "poll() events:%ui timeout:%M",
520               engine->u.poll.nfds, timeout);
521 
522     nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout);
523 
524     err = (nevents == -1) ? nxt_errno : 0;
525 
526     nxt_thread_time_update(engine->task.thread);
527 
528     nxt_debug(&engine->task, "poll(): %d", nevents);
529 
530     if (nevents == -1) {
531         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
532         nxt_log(&engine->task, level, "poll() failed %E", err);
533         return;
534     }
535 
536     for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) {
537 
538         pfd = &engine->u.poll.set[i];
539         events = pfd->revents;
540 
541         if (events == 0) {
542             continue;
543         }
544 
545         fd = pfd->fd;
546 
547         phe = nxt_poll_fd_hash_get(engine, fd);
548 
549         if (nxt_slow_path(phe == NULL)) {
550             nxt_log(&engine->task, NXT_LOG_CRIT,
551                     "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
552                     fd, pfd->events, events);
553 
554             /* Mark the poll entry to ignore it by the kernel. */
555             pfd->fd = -1;
556             goto next;
557         }
558 
559         ev = phe->event;
560 
561         nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d",
562                   fd, events, ev->read, ev->write);
563 
564         if (nxt_slow_path((events & POLLNVAL) != 0)) {
565             nxt_log(ev->task, NXT_LOG_CRIT,
566                     "poll() error fd:%d ev:%04Xd rev:%04uXi",
567                     fd, pfd->events, events);
568 
569             /* Mark the poll entry to ignore it by the kernel. */
570             pfd->fd = -1;
571 
572             nxt_work_queue_add(&engine->fast_work_queue,
573                                ev->error_handler, ev->task, ev, ev->data);
574             goto next;
575         }
576 
577         /*
578          * On a socket's remote end close:
579          *
580          *   Linux, FreeBSD, and Solaris set POLLIN;
581          *   MacOSX sets POLLIN and POLLHUP;
582          *   NetBSD sets POLLIN, and poll(2) claims this explicitly:
583          *
584          *     If the remote end of a socket is closed, poll()
585          *     returns a POLLIN event, rather than a POLLHUP.
586          *
587          * On error:
588          *
589          *   Linux sets POLLHUP and POLLERR only;
590          *   FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
591          *   claims the opposite:
592          *
593          *     Note that POLLHUP and POLLOUT should never be
594          *     present in the revents bitmask at the same time.
595          *
596          *   Solaris and NetBSD do not add POLLHUP or POLLERR;
597          *   MacOSX sets POLLHUP only.
598          *
599          * If an implementation sets POLLERR or POLLHUP only without POLLIN
600          * or POLLOUT, the "error" variable enqueues only one active handler.
601          */
602 
603         error = (((events & (POLLERR | POLLHUP)) != 0)
604                  && ((events & (POLLIN | POLLOUT)) == 0));
605 
606         if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
607             error = 0;
608             ev->read_ready = 1;
609 
610             if (ev->read == NXT_EVENT_ONESHOT) {
611                 ev->read = NXT_EVENT_INACTIVE;
612                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
613             }
614 
615             nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
616                                ev->task, ev, ev->data);
617         }
618 
619         if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
620             ev->write_ready = 1;
621 
622             if (ev->write == NXT_EVENT_ONESHOT) {
623                 ev->write = NXT_EVENT_INACTIVE;
624                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
625             }
626 
627             nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
628                                ev->task, ev, ev->data);
629         }
630 
631     next:
632 
633         nevents--;
634     }
635 }
636 
637 
638 static nxt_poll_hash_entry_t *
639 nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
640 {
641     nxt_lvlhsh_query_t     lhq;
642     nxt_poll_hash_entry_t  *phe;
643 
644     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
645     lhq.proto = &nxt_poll_fd_hash_proto;
646     lhq.data = engine;
647 
648     if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
649         phe = lhq.value;
650         return phe;
651     }
652 
653     nxt_log(&engine->task, NXT_LOG_CRIT, "fd %d not found in hash", fd);
654 
655     return NULL;
656 }
657 
658 
659 static nxt_int_t
660 nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
661 {
662     nxt_event_engine_t     *engine;
663     nxt_poll_hash_entry_t  *phe;
664 
665     phe = data;
666 
667     /* nxt_murmur_hash2() is unique for 4 bytes. */
668 
669     engine = lhq->data;
670 
671     if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
672         return NXT_OK;
673     }
674 
675     nxt_log(&engine->task, NXT_LOG_CRIT,
676             "fd %d in hash mismatches fd %d in poll set",
677             phe->fd, engine->u.poll.set[phe->index].fd);
678 
679     return NXT_DECLINED;
680 }
681 
682 
683 static void
684 nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
685 {
686     nxt_lvlhsh_each_t      lhe;
687     nxt_lvlhsh_query_t     lhq;
688     nxt_poll_hash_entry_t  *phe;
689 
690     nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
691     lhe.proto = &nxt_poll_fd_hash_proto;
692     lhq.proto = &nxt_poll_fd_hash_proto;
693 
694     for ( ;; ) {
695         phe = nxt_lvlhsh_each(lh, &lhe);
696 
697         if (phe == NULL) {
698             return;
699         }
700 
701         lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));
702 
703         if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
704             nxt_log(&engine->task, NXT_LOG_CRIT,
705                     "event fd %d not found in hash", phe->fd);
706         }
707 
708         nxt_free(phe);
709     }
710 }
711