xref: /unit/src/nxt_devpoll_engine.c (revision 564:762f8c976ead)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * "/dev/poll" has been introduced in Solaris 7 (11/99), HP-UX 11.22 (named
12  * "eventport pseudo driver" internally, not to be confused with Solaris 10
13  * event ports), IRIX 6.5.15, and Tru64 UNIX 5.1A.
14  *
15  * Although a "/dev/poll" descriptor is a file descriptor, it cannot
16  * be added to another poll set, Solaris poll(7d):
17  *
18  *   The /dev/poll driver does not yet support polling.  Polling on a
19  *   /dev/poll file descriptor will result in POLLERR being returned
20  *   in the revents field of pollfd structure.
21  */
22 
23 
/*
 * Operation codes queued by nxt_devpoll_change() and interpreted by
 * nxt_devpoll_commit_changes().
 */

/* Write the requested events, then add the event to the engine fd_hash. */
#define NXT_DEVPOLL_ADD     0
/* Write the requested events only; fd_hash is left untouched. */
#define NXT_DEVPOLL_UPDATE  1
/* Write POLLREMOVE followed by the requested events; fd_hash untouched. */
#define NXT_DEVPOLL_CHANGE  2
/* Write the requested events, then delete the descriptor from fd_hash. */
#define NXT_DEVPOLL_DELETE  3
28 
29 
/*
 * Forward declarations of the file-local handlers.  They are needed because
 * the nxt_devpoll_engine interface table below refers to the handlers before
 * their definitions appear.
 */
static nxt_int_t nxt_devpoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_devpoll_free(nxt_event_engine_t *engine);
static void nxt_devpoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_devpoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_devpoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_devpoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
/* Change queue: enqueue one change, flush the queue, handle a failure. */
static void nxt_devpoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_devpoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_devpoll_change_error(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
/* Low-level helpers operating on the "/dev/poll" descriptor itself. */
static void nxt_devpoll_remove(nxt_event_engine_t *engine, nxt_fd_t fd);
static nxt_int_t nxt_devpoll_write(nxt_event_engine_t *engine,
    struct pollfd *pfd, size_t n);
static void nxt_devpoll_poll(nxt_event_engine_t *engine,
    nxt_msec_t timeout);
63 
64 
/*
 * Method table of the "/dev/poll" event engine.
 *
 * The initializer is positional, so every entry has to stay in the order in
 * which nxt_event_interface_t declares its members.  That declaration is not
 * part of this file; NOTE(review): confirm the meaning of each slot against
 * the header before reordering or filling in the NULL entries.
 */
const nxt_event_interface_t  nxt_devpoll_engine = {
    "devpoll",
    nxt_devpoll_create,
    nxt_devpoll_free,
    nxt_devpoll_enable,
    /* The same disable handler fills two consecutive slots. */
    nxt_devpoll_disable,
    nxt_devpoll_disable,
    nxt_devpoll_close,
    nxt_devpoll_enable_read,
    nxt_devpoll_enable_write,
    nxt_devpoll_disable_read,
    nxt_devpoll_disable_write,
    nxt_devpoll_block_read,
    nxt_devpoll_block_write,
    nxt_devpoll_oneshot_read,
    nxt_devpoll_oneshot_write,
    /* The read-enable handler is reused for this slot. */
    nxt_devpoll_enable_read,
    /* Four slots for which this engine provides no handler. */
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_devpoll_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
93 
94 
95 static nxt_int_t
nxt_devpoll_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)96 nxt_devpoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
97     nxt_uint_t mevents)
98 {
99     void  *changes;
100 
101     engine->u.devpoll.fd = -1;
102     engine->u.devpoll.mchanges = mchanges;
103     engine->u.devpoll.mevents = mevents;
104 
105     changes = nxt_malloc(sizeof(nxt_devpoll_change_t) * mchanges);
106     if (changes == NULL) {
107         goto fail;
108     }
109 
110     engine->u.devpoll.changes = changes;
111 
112     /*
113      * NXT_DEVPOLL_CHANGE requires two struct pollfd's:
114      * for POLLREMOVE and subsequent POLLIN or POLLOUT.
115      */
116     changes = nxt_malloc(2 * sizeof(struct pollfd) * mchanges);
117     if (changes == NULL) {
118         goto fail;
119     }
120 
121     engine->u.devpoll.write_changes = changes;
122 
123     engine->u.devpoll.events = nxt_malloc(sizeof(struct pollfd) * mevents);
124     if (engine->u.devpoll.events == NULL) {
125         goto fail;
126     }
127 
128     engine->u.devpoll.fd = open("/dev/poll", O_RDWR);
129 
130     if (engine->u.devpoll.fd == -1) {
131         nxt_alert(&engine->task, "open(\"/dev/poll\") failed %E", nxt_errno);
132         goto fail;
133     }
134 
135     nxt_debug(&engine->task, "open(\"/dev/poll\"): %d", engine->u.devpoll.fd);
136 
137     return NXT_OK;
138 
139 fail:
140 
141     nxt_devpoll_free(engine);
142 
143     return NXT_ERROR;
144 }
145 
146 
147 static void
nxt_devpoll_free(nxt_event_engine_t * engine)148 nxt_devpoll_free(nxt_event_engine_t *engine)
149 {
150     nxt_fd_t  fd;
151 
152     fd = engine->u.devpoll.fd;
153 
154     nxt_debug(&engine->task, "devpoll %d free", fd);
155 
156     if (fd != -1 && close(fd) != 0) {
157         nxt_alert(&engine->task, "devpoll close(%d) failed %E", fd, nxt_errno);
158     }
159 
160     nxt_free(engine->u.devpoll.events);
161     nxt_free(engine->u.devpoll.write_changes);
162     nxt_free(engine->u.devpoll.changes);
163     nxt_fd_event_hash_destroy(&engine->u.devpoll.fd_hash);
164 
165     nxt_memzero(&engine->u.devpoll, sizeof(nxt_devpoll_engine_t));
166 }
167 
168 
169 static void
nxt_devpoll_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)170 nxt_devpoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
171 {
172     ev->read = NXT_EVENT_ACTIVE;
173     ev->write = NXT_EVENT_ACTIVE;
174 
175     nxt_devpoll_change(engine, ev, NXT_DEVPOLL_ADD, POLLIN | POLLOUT);
176 }
177 
178 
179 static void
nxt_devpoll_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)180 nxt_devpoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
181 {
182     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
183 
184         ev->read = NXT_EVENT_INACTIVE;
185         ev->write = NXT_EVENT_INACTIVE;
186 
187         nxt_devpoll_change(engine, ev, NXT_DEVPOLL_DELETE, POLLREMOVE);
188     }
189 }
190 
191 
192 /*
193  * Solaris does not automatically remove a closed file descriptor from
194  * a "/dev/poll" set: ioctl(DP_ISPOLLED) for the descriptor returns 1,
195  * indicating an active descriptor.  POLLREMOVE can remove an already
196  * closed file descriptor, so the removal can be batched, Solaris poll(7d):
197  *
198  *   When using the "/dev/poll" driver, you should remove a closed file
199  *   descriptor from a monitored poll set.  Failure to do so may result
200  *   in a POLLNVAL revents being returned for the closed file descriptor.
201  *   When a file descriptor is closed but not removed from the monitored
202  *   set, and is reused in subsequent open of a different device, you
203  *   will be polling the device associated with the reused file descriptor.
204  *   In a multithreaded application, careful coordination among threads
205  *   doing close and DP_POLL ioctl is recommended for consistent results.
206  *
207  * Besides, Solaris and HP-UX allow adding invalid descriptors to a
208  * "/dev/poll" set, although the descriptors are not marked as polled,
209  * that is, ioctl(DP_ISPOLLED) returns 0.
210  *
211  * HP-UX poll(7):
212  *
213  *   When a polled file descriptor is closed, it is automatically
214  *   deregistered.
215  */
216 
217 static nxt_bool_t
nxt_devpoll_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)218 nxt_devpoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
219 {
220     nxt_devpoll_disable(engine, ev);
221 
222     return ev->changing;
223 }
224 
225 
226 /*
227  * Solaris poll(7d):
228  *
229  *   The fd field specifies the file descriptor being polled.  The events
230  *   field indicates the interested poll events on the file descriptor.
231  *   If a pollfd array contains multiple pollfd entries with the same fd field,
232  *   the "events" field in each pollfd entry is OR'ed.  A special POLLREMOVE
233  *   event in the events field of the pollfd structure removes the fd from
234  *   the monitored set. The revents field is not used.
235  */
236 
237 static void
nxt_devpoll_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)238 nxt_devpoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
239 {
240     nxt_uint_t  op, events;
241 
242     if (ev->read != NXT_EVENT_BLOCKED) {
243 
244         events = POLLIN;
245 
246         if (ev->write == NXT_EVENT_INACTIVE) {
247             op = NXT_DEVPOLL_ADD;
248 
249         } else if (ev->write == NXT_EVENT_BLOCKED) {
250             ev->write = NXT_EVENT_INACTIVE;
251             op = NXT_DEVPOLL_CHANGE;
252 
253         } else {
254             op = NXT_DEVPOLL_UPDATE;
255             events = POLLIN | POLLOUT;
256         }
257 
258         nxt_devpoll_change(engine, ev, op, events);
259     }
260 
261     ev->read = NXT_EVENT_ACTIVE;
262 }
263 
264 
265 static void
nxt_devpoll_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)266 nxt_devpoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
267 {
268     nxt_uint_t  op, events;
269 
270     if (ev->write != NXT_EVENT_BLOCKED) {
271 
272         events = POLLOUT;
273 
274         if (ev->read == NXT_EVENT_INACTIVE) {
275             op = NXT_DEVPOLL_ADD;
276 
277         } else if (ev->read == NXT_EVENT_BLOCKED) {
278             ev->read = NXT_EVENT_INACTIVE;
279             op = NXT_DEVPOLL_CHANGE;
280 
281         } else {
282             op = NXT_DEVPOLL_UPDATE;
283             events = POLLIN | POLLOUT;
284         }
285 
286         nxt_devpoll_change(engine, ev, op, events);
287     }
288 
289     ev->write = NXT_EVENT_ACTIVE;
290 }
291 
292 
293 static void
nxt_devpoll_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)294 nxt_devpoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
295 {
296     nxt_uint_t  op, events;
297 
298     ev->read = NXT_EVENT_INACTIVE;
299 
300     if (ev->write <= NXT_EVENT_BLOCKED) {
301         ev->write = NXT_EVENT_INACTIVE;
302         op = NXT_DEVPOLL_DELETE;
303         events = POLLREMOVE;
304 
305     } else {
306         op = NXT_DEVPOLL_CHANGE;
307         events = POLLOUT;
308     }
309 
310     nxt_devpoll_change(engine, ev, op, events);
311 }
312 
313 
314 static void
nxt_devpoll_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)315 nxt_devpoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
316 {
317     nxt_uint_t  op, events;
318 
319     ev->write = NXT_EVENT_INACTIVE;
320 
321     if (ev->read <= NXT_EVENT_BLOCKED) {
322         ev->read = NXT_EVENT_INACTIVE;
323         op = NXT_DEVPOLL_DELETE;
324         events = POLLREMOVE;
325 
326     } else {
327         op = NXT_DEVPOLL_CHANGE;
328         events = POLLIN;
329     }
330 
331     nxt_devpoll_change(engine, ev, op, events);
332 }
333 
334 
335 static void
nxt_devpoll_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)336 nxt_devpoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
337 {
338     if (ev->read != NXT_EVENT_INACTIVE) {
339         ev->read = NXT_EVENT_BLOCKED;
340     }
341 }
342 
343 
344 static void
nxt_devpoll_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)345 nxt_devpoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
346 {
347     if (ev->write != NXT_EVENT_INACTIVE) {
348         ev->write = NXT_EVENT_BLOCKED;
349     }
350 }
351 
352 
353 static void
nxt_devpoll_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)354 nxt_devpoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
355 {
356     nxt_devpoll_enable_read(engine, ev);
357 
358     ev->read = NXT_EVENT_ONESHOT;
359 }
360 
361 
362 static void
nxt_devpoll_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)363 nxt_devpoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
364 {
365     nxt_devpoll_enable_write(engine, ev);
366 
367     ev->write = NXT_EVENT_ONESHOT;
368 }
369 
370 
371 static void
nxt_devpoll_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t op,nxt_uint_t events)372 nxt_devpoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
373     nxt_uint_t op, nxt_uint_t events)
374 {
375     nxt_devpoll_change_t  *change;
376 
377     nxt_debug(ev->task, "devpoll %d change fd:%d op:%ui ev:%04Xi",
378               engine->u.devpoll.fd, ev->fd, op, events);
379 
380     if (engine->u.devpoll.nchanges >= engine->u.devpoll.mchanges) {
381         (void) nxt_devpoll_commit_changes(engine);
382     }
383 
384     ev->changing = 1;
385 
386     change = &engine->u.devpoll.changes[engine->u.devpoll.nchanges++];
387     change->op = op;
388     change->events = events;
389     change->event = ev;
390 }
391 
392 
393 static nxt_int_t
nxt_devpoll_commit_changes(nxt_event_engine_t * engine)394 nxt_devpoll_commit_changes(nxt_event_engine_t *engine)
395 {
396     size_t                n;
397     nxt_int_t             ret, retval;
398     struct pollfd         *pfd, *write_changes;
399     nxt_fd_event_t        *ev;
400     nxt_devpoll_change_t  *change, *end;
401 
402     nxt_debug(&engine->task, "devpoll %d changes:%ui",
403               engine->u.devpoll.fd, engine->u.devpoll.nchanges);
404 
405     retval = NXT_OK;
406     n = 0;
407     write_changes = engine->u.devpoll.write_changes;
408     change = engine->u.devpoll.changes;
409     end = change + engine->u.devpoll.nchanges;
410 
411     do {
412         ev = change->event;
413 
414         nxt_debug(&engine->task, "devpoll fd:%d op:%d ev:%04Xd",
415                   ev->fd, change->op, change->events);
416 
417         if (change->op == NXT_DEVPOLL_CHANGE) {
418             pfd = &write_changes[n++];
419             pfd->fd = ev->fd;
420             pfd->events = POLLREMOVE;
421             pfd->revents = 0;
422         }
423 
424         pfd = &write_changes[n++];
425         pfd->fd = ev->fd;
426         pfd->events = change->events;
427         pfd->revents = 0;
428 
429         ev->changing = 0;
430 
431         change++;
432 
433     } while (change < end);
434 
435     change = engine->u.devpoll.changes;
436     end = change + engine->u.devpoll.nchanges;
437 
438     ret = nxt_devpoll_write(engine, write_changes, n);
439 
440     if (nxt_slow_path(ret != NXT_OK)) {
441 
442         do {
443             nxt_devpoll_change_error(engine, change->event);
444             change++;
445         } while (change < end);
446 
447         engine->u.devpoll.nchanges = 0;
448 
449         return NXT_ERROR;
450     }
451 
452     do {
453         ev = change->event;
454 
455         if (change->op == NXT_DEVPOLL_ADD) {
456             ret = nxt_fd_event_hash_add(&engine->u.devpoll.fd_hash, ev->fd, ev);
457 
458             if (nxt_slow_path(ret != NXT_OK)) {
459                 nxt_devpoll_change_error(engine, ev);
460                 retval = NXT_ERROR;
461             }
462 
463         } else if (change->op == NXT_DEVPOLL_DELETE) {
464             nxt_fd_event_hash_delete(&engine->task, &engine->u.devpoll.fd_hash,
465                                      ev->fd, 0);
466         }
467 
468         /* Nothing tp do for NXT_DEVPOLL_UPDATE and NXT_DEVPOLL_CHANGE. */
469 
470         change++;
471 
472     } while (change < end);
473 
474     engine->u.devpoll.nchanges = 0;
475 
476     return retval;
477 }
478 
479 
480 static void
nxt_devpoll_change_error(nxt_event_engine_t * engine,nxt_fd_event_t * ev)481 nxt_devpoll_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
482 {
483     ev->read = NXT_EVENT_INACTIVE;
484     ev->write = NXT_EVENT_INACTIVE;
485 
486     nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
487                        ev->task, ev, ev->data);
488 
489     nxt_fd_event_hash_delete(ev->task, &engine->u.devpoll.fd_hash, ev->fd, 1);
490 
491     nxt_devpoll_remove(engine, ev->fd);
492 }
493 
494 
495 static void
nxt_devpoll_remove(nxt_event_engine_t * engine,nxt_fd_t fd)496 nxt_devpoll_remove(nxt_event_engine_t *engine, nxt_fd_t fd)
497 {
498     int            n;
499     struct pollfd  pfd;
500 
501     pfd.fd = fd;
502     pfd.events = 0;
503     pfd.revents = 0;
504 
505     n = ioctl(engine->u.devpoll.fd, DP_ISPOLLED, &pfd);
506 
507     nxt_debug(&engine->task, "ioctl(%d, DP_ISPOLLED, %d): %d",
508               engine->u.devpoll.fd, fd, n);
509 
510     if (n == 0) {
511         /* The file descriptor is not in the set. */
512         return;
513     }
514 
515     if (n == -1) {
516         nxt_alert(&engine->task, "ioctl(%d, DP_ISPOLLED, %d) failed %E",
517                   engine->u.devpoll.fd, fd, nxt_errno);
518         /* Fall through. */
519     }
520 
521     /* n == 1: the file descriptor is in the set. */
522 
523     nxt_debug(&engine->task, "devpoll %d remove fd:%d",
524               engine->u.devpoll.fd, fd);
525 
526     pfd.fd = fd;
527     pfd.events = POLLREMOVE;
528     pfd.revents = 0;
529 
530     nxt_devpoll_write(engine, &pfd, 1);
531 }
532 
533 
534 static nxt_int_t
nxt_devpoll_write(nxt_event_engine_t * engine,struct pollfd * pfd,size_t n)535 nxt_devpoll_write(nxt_event_engine_t *engine, struct pollfd *pfd, size_t n)
536 {
537     int  fd;
538 
539     fd = engine->u.devpoll.fd;
540 
541     nxt_debug(&engine->task, "devpoll write(%d) changes:%uz", fd, n);
542 
543     n *= sizeof(struct pollfd);
544 
545     if (nxt_slow_path(write(fd, pfd, n) == (ssize_t) n)) {
546         return NXT_OK;
547     }
548 
549     nxt_alert(&engine->task, "devpoll write(%d) failed %E", fd, nxt_errno);
550 
551     return NXT_ERROR;
552 }
553 
554 
555 static void
nxt_devpoll_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)556 nxt_devpoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
557 {
558     int             nevents;
559     nxt_fd_t        fd;
560     nxt_int_t       i;
561     nxt_err_t       err;
562     nxt_uint_t      events, level;
563     struct dvpoll   dvp;
564     struct pollfd   *pfd;
565     nxt_fd_event_t  *ev;
566 
567     if (engine->u.devpoll.nchanges != 0) {
568         if (nxt_devpoll_commit_changes(engine) != NXT_OK) {
569             /* Error handlers have been enqueued on failure. */
570             timeout = 0;
571         }
572     }
573 
574     nxt_debug(&engine->task, "ioctl(%d, DP_POLL) timeout:%M",
575               engine->u.devpoll.fd, timeout);
576 
577     dvp.dp_fds = engine->u.devpoll.events;
578     dvp.dp_nfds = engine->u.devpoll.mevents;
579     dvp.dp_timeout = timeout;
580 
581     nevents = ioctl(engine->u.devpoll.fd, DP_POLL, &dvp);
582 
583     err = (nevents == -1) ? nxt_errno : 0;
584 
585     nxt_thread_time_update(engine->task.thread);
586 
587     nxt_debug(&engine->task, "ioctl(%d, DP_POLL): %d",
588               engine->u.devpoll.fd, nevents);
589 
590     if (nevents == -1) {
591         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
592 
593         nxt_log(&engine->task, level, "ioctl(%d, DP_POLL) failed %E",
594                 engine->u.devpoll.fd, err);
595 
596         return;
597     }
598 
599     for (i = 0; i < nevents; i++) {
600 
601         pfd = &engine->u.devpoll.events[i];
602         fd = pfd->fd;
603         events = pfd->revents;
604 
605         ev = nxt_fd_event_hash_get(&engine->task, &engine->u.devpoll.fd_hash,
606                                    fd);
607 
608         if (nxt_slow_path(ev == NULL)) {
609             nxt_alert(&engine->task,
610                       "ioctl(%d, DP_POLL) returned invalid "
611                       "fd:%d ev:%04Xd rev:%04uXi",
612                       engine->u.devpoll.fd, fd, pfd->events, events);
613 
614             nxt_devpoll_remove(engine, fd);
615             continue;
616         }
617 
618         nxt_debug(ev->task, "devpoll: fd:%d ev:%04uXi rd:%d wr:%d",
619                   fd, events, ev->read, ev->write);
620 
621         if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
622             nxt_alert(ev->task,
623                       "ioctl(%d, DP_POLL) error fd:%d ev:%04Xd rev:%04uXi",
624                       engine->u.devpoll.fd, fd, pfd->events, events);
625 
626             nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
627                                ev->task, ev, ev->data);
628             continue;
629         }
630 
631         if (events & POLLIN) {
632             ev->read_ready = 1;
633 
634             if (ev->read != NXT_EVENT_BLOCKED) {
635                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
636                                    ev->task, ev, ev->data);
637             }
638 
639             if (ev->read == NXT_EVENT_BLOCKED
640                 || ev->read == NXT_EVENT_ONESHOT)
641             {
642                 nxt_devpoll_disable_read(engine, ev);
643             }
644         }
645 
646         if (events & POLLOUT) {
647             ev->write_ready = 1;
648 
649             if (ev->write != NXT_EVENT_BLOCKED) {
650                 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
651                                    ev->task, ev, ev->data);
652             }
653 
654             if (ev->write == NXT_EVENT_BLOCKED
655                 || ev->write == NXT_EVENT_ONESHOT)
656             {
657                 nxt_devpoll_disable_write(engine, ev);
658             }
659         }
660     }
661 }
662