xref: /unit/src/nxt_eventport_engine.c (revision 564:762f8c976ead)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The event ports have been introduced in Solaris 10.
12  * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have
13  * been added in OpenSolaris.
14  */
15 
16 
/*
 * Forward declarations of the file-local event port handlers.
 */

/* Engine set-up and tear-down. */
static nxt_int_t nxt_eventport_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_eventport_free(nxt_event_engine_t *engine);

/* Whole-descriptor operations: both the read and write directions. */
static void nxt_eventport_enable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_disable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static nxt_bool_t nxt_eventport_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Per-direction enabling. */
static void nxt_eventport_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Internal: appends an association to the batch of pending changes. */
static void nxt_eventport_enable_event(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev, nxt_uint_t events);

/* Per-direction disabling. */
static void nxt_eventport_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Internal: appends a dissociation to the batch of pending changes. */
static void nxt_eventport_disable_event(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* Internal: applies the batched changes and reports failures. */
static nxt_int_t nxt_eventport_commit_changes(nxt_event_engine_t *engine);
static void nxt_eventport_error_handler(nxt_task_t *task, void *obj,
    void *data);

/* State-only and oneshot variants. */
static void nxt_eventport_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_eventport_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);

/* User events sent through the port, and the polling loop body. */
static nxt_int_t nxt_eventport_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
static void nxt_eventport_poll(nxt_event_engine_t *engine,
    nxt_msec_t timeout);
56 
57 
/*
 * The event engine interface implemented on top of Solaris event ports.
 *
 * The initializer is positional: the order of the entries has to match
 * the member order of nxt_event_interface_t, which is declared outside
 * of this file.
 */
const nxt_event_interface_t  nxt_eventport_engine = {
    "eventport",
    nxt_eventport_create,
    nxt_eventport_free,
    nxt_eventport_enable,
    /*
     * nxt_eventport_disable fills two consecutive slots.
     * NOTE(review): presumably the "disable" and "delete" operations,
     * confirm against the nxt_event_interface_t declaration.
     */
    nxt_eventport_disable,
    nxt_eventport_disable,
    nxt_eventport_close,
    nxt_eventport_enable_read,
    nxt_eventport_enable_write,
    nxt_eventport_disable_read,
    nxt_eventport_disable_write,
    nxt_eventport_block_read,
    nxt_eventport_block_write,
    nxt_eventport_oneshot_read,
    nxt_eventport_oneshot_write,
    nxt_eventport_enable_accept,
    /*
     * Two operations are not provided.
     * NOTE(review): presumably the file event hooks, which would agree
     * with NXT_NO_FILE_EVENTS below, confirm.
     */
    NULL,
    NULL,
    nxt_eventport_enable_post,
    nxt_eventport_signal,
    nxt_eventport_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
86 
87 
88 static nxt_int_t
nxt_eventport_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)89 nxt_eventport_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
90     nxt_uint_t mevents)
91 {
92     nxt_eventport_change_t  *changes;
93 
94     engine->u.eventport.fd = -1;
95     engine->u.eventport.mchanges = mchanges;
96     engine->u.eventport.mevents = mevents;
97 
98     changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges);
99     if (changes == NULL) {
100         goto fail;
101     }
102 
103     engine->u.eventport.changes = changes;
104 
105     engine->u.eventport.events = nxt_malloc(sizeof(port_event_t) * mevents);
106     if (engine->u.eventport.events == NULL) {
107         goto fail;
108     }
109 
110     engine->u.eventport.fd = port_create();
111     if (engine->u.eventport.fd == -1) {
112         nxt_alert(&engine->task, "port_create() failed %E", nxt_errno);
113         goto fail;
114     }
115 
116     nxt_debug(&engine->task, "port_create(): %d", engine->u.eventport.fd);
117 
118     if (engine->signals != NULL) {
119         engine->u.eventport.signal_handler = engine->signals->handler;
120     }
121 
122     return NXT_OK;
123 
124 fail:
125 
126     nxt_eventport_free(engine);
127 
128     return NXT_ERROR;
129 }
130 
131 
132 static void
nxt_eventport_free(nxt_event_engine_t * engine)133 nxt_eventport_free(nxt_event_engine_t *engine)
134 {
135     int  port;
136 
137     port = engine->u.eventport.fd;
138 
139     nxt_debug(&engine->task, "eventport %d free", port);
140 
141     if (port != -1 && close(port) != 0) {
142         nxt_alert(&engine->task, "eventport close(%d) failed %E",
143                   port, nxt_errno);
144     }
145 
146     nxt_free(engine->u.eventport.events);
147 
148     nxt_memzero(&engine->u.eventport, sizeof(nxt_eventport_engine_t));
149 }
150 
151 
152 static void
nxt_eventport_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)153 nxt_eventport_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
154 {
155     ev->read = NXT_EVENT_ACTIVE;
156     ev->write = NXT_EVENT_ACTIVE;
157 
158     nxt_eventport_enable_event(engine, ev, POLLIN | POLLOUT);
159 }
160 
161 
162 static void
nxt_eventport_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)163 nxt_eventport_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
164 {
165     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
166 
167         ev->read = NXT_EVENT_INACTIVE;
168         ev->write = NXT_EVENT_INACTIVE;
169 
170         nxt_eventport_disable_event(engine, ev);
171     }
172 }
173 
174 
175 /*
176  * port_dissociate(3):
177  *
178  *   The association is removed if the owner of the association closes the port.
179  */
180 
181 static nxt_bool_t
nxt_eventport_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)182 nxt_eventport_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
183 {
184     ev->read = NXT_EVENT_INACTIVE;
185     ev->write = NXT_EVENT_INACTIVE;
186 
187     return ev->changing;
188 }
189 
190 
191 static void
nxt_eventport_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)192 nxt_eventport_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
193 {
194     nxt_uint_t  events;
195 
196     if (ev->read != NXT_EVENT_BLOCKED) {
197         events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN
198                                                    : (POLLIN | POLLOUT);
199         nxt_eventport_enable_event(engine, ev, events);
200     }
201 
202     ev->read = NXT_EVENT_ACTIVE;
203 }
204 
205 
206 static void
nxt_eventport_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)207 nxt_eventport_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
208 {
209     nxt_uint_t  events;
210 
211     if (ev->write != NXT_EVENT_BLOCKED) {
212         events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT
213                                                   : (POLLIN | POLLOUT);
214         nxt_eventport_enable_event(engine, ev, events);
215     }
216 
217     ev->write = NXT_EVENT_ACTIVE;
218 }
219 
220 
221 /*
222  * eventport changes are batched to improve instruction and data
223  * cache locality of several port_associate() and port_dissociate()
224  * calls followed by port_getn() call.
225  */
226 
227 static void
nxt_eventport_enable_event(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t events)228 nxt_eventport_enable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
229     nxt_uint_t events)
230 {
231     nxt_eventport_change_t  *change;
232 
233     nxt_debug(ev->task, "port %d set event: fd:%d ev:%04XD u:%p",
234               engine->u.eventport.fd, ev->fd, events, ev);
235 
236     if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
237         (void) nxt_eventport_commit_changes(engine);
238     }
239 
240     ev->changing = 1;
241 
242     change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
243     change->events = events;
244     change->event = ev;
245 }
246 
247 
248 static void
nxt_eventport_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)249 nxt_eventport_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
250 {
251     ev->read = NXT_EVENT_INACTIVE;
252 
253     if (ev->write == NXT_EVENT_INACTIVE) {
254         nxt_eventport_disable_event(engine, ev);
255 
256     } else {
257         nxt_eventport_enable_event(engine, ev, POLLOUT);
258     }
259 }
260 
261 
262 static void
nxt_eventport_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)263 nxt_eventport_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
264 {
265     ev->write = NXT_EVENT_INACTIVE;
266 
267     if (ev->read == NXT_EVENT_INACTIVE) {
268         nxt_eventport_disable_event(engine, ev);
269 
270     } else {
271         nxt_eventport_enable_event(engine, ev, POLLIN);
272     }
273 }
274 
275 
276 static void
nxt_eventport_disable_event(nxt_event_engine_t * engine,nxt_fd_event_t * ev)277 nxt_eventport_disable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
278 {
279     nxt_eventport_change_t  *change;
280 
281     nxt_debug(ev->task, "port %d disable event : fd:%d",
282               engine->u.eventport.fd, ev->fd);
283 
284     if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
285         (void) nxt_eventport_commit_changes(engine);
286     }
287 
288     ev->changing = 1;
289 
290     change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
291     change->events = 0;
292     change->event = ev;
293 }
294 
295 
296 static nxt_int_t
nxt_eventport_commit_changes(nxt_event_engine_t * engine)297 nxt_eventport_commit_changes(nxt_event_engine_t *engine)
298 {
299     int                     ret, port;
300     nxt_int_t               retval;
301     nxt_fd_event_t          *ev;
302     nxt_eventport_change_t  *change, *end;
303 
304     port = engine->u.eventport.fd;
305 
306     nxt_debug(&engine->task, "eventport %d changes:%ui",
307               port, engine->u.eventport.nchanges);
308 
309     retval = NXT_OK;
310     change = engine->u.eventport.changes;
311     end = change + engine->u.eventport.nchanges;
312 
313     do {
314         ev = change->event;
315         ev->changing = 0;
316 
317         if (change->events != 0) {
318             nxt_debug(ev->task, "port_associate(%d): fd:%d ev:%04XD u:%p",
319                       port, ev->fd, change->events, ev);
320 
321             ret = port_associate(port, PORT_SOURCE_FD,
322                                  ev->fd, change->events, ev);
323 
324             if (nxt_fast_path(ret == 0)) {
325                 goto next;
326             }
327 
328             nxt_alert(ev->task, "port_associate(%d, %d, %d, %04XD) failed %E",
329                       port, PORT_SOURCE_FD, ev->fd, change->events, nxt_errno);
330 
331         } else {
332             nxt_debug(ev->task, "port_dissociate(%d): fd:%d", port, ev->fd);
333 
334             ret = port_dissociate(port, PORT_SOURCE_FD, ev->fd);
335 
336             if (nxt_fast_path(ret == 0)) {
337                 goto next;
338             }
339 
340             nxt_alert(ev->task, "port_dissociate(%d, %d, %d) failed %E",
341                       port, PORT_SOURCE_FD, ev->fd, nxt_errno);
342         }
343 
344         nxt_work_queue_add(&engine->fast_work_queue,
345                            nxt_eventport_error_handler,
346                            ev->task, ev, ev->data);
347 
348         retval = NXT_ERROR;
349 
350     next:
351 
352         change++;
353 
354     } while (change < end);
355 
356     engine->u.eventport.nchanges = 0;
357 
358     return retval;
359 }
360 
361 
362 static void
nxt_eventport_error_handler(nxt_task_t * task,void * obj,void * data)363 nxt_eventport_error_handler(nxt_task_t *task, void *obj, void *data)
364 {
365     nxt_fd_event_t  *ev;
366 
367     ev = obj;
368 
369     ev->read = NXT_EVENT_INACTIVE;
370     ev->write = NXT_EVENT_INACTIVE;
371 
372     ev->error_handler(task, ev, data);
373 }
374 
375 
376 static void
nxt_eventport_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)377 nxt_eventport_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
378 {
379     if (ev->read != NXT_EVENT_INACTIVE) {
380         ev->read = NXT_EVENT_BLOCKED;
381     }
382 }
383 
384 
385 static void
nxt_eventport_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)386 nxt_eventport_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
387 {
388     if (ev->write != NXT_EVENT_INACTIVE) {
389         ev->write = NXT_EVENT_BLOCKED;
390     }
391 }
392 
393 
394 static void
nxt_eventport_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)395 nxt_eventport_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
396 {
397     if (ev->read == NXT_EVENT_INACTIVE) {
398         ev->read = NXT_EVENT_ACTIVE;
399 
400         nxt_eventport_enable_event(engine, ev, POLLIN);
401     }
402 }
403 
404 
405 static void
nxt_eventport_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)406 nxt_eventport_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
407 {
408     if (ev->write == NXT_EVENT_INACTIVE) {
409         ev->write = NXT_EVENT_ACTIVE;
410 
411         nxt_eventport_enable_event(engine, ev, POLLOUT);
412     }
413 }
414 
415 
416 static void
nxt_eventport_enable_accept(nxt_event_engine_t * engine,nxt_fd_event_t * ev)417 nxt_eventport_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
418 {
419     ev->read = NXT_EVENT_LEVEL;
420 
421     nxt_eventport_enable_event(engine, ev, POLLIN);
422 }
423 
424 
425 static nxt_int_t
nxt_eventport_enable_post(nxt_event_engine_t * engine,nxt_work_handler_t handler)426 nxt_eventport_enable_post(nxt_event_engine_t *engine,
427     nxt_work_handler_t handler)
428 {
429     engine->u.eventport.post_handler = handler;
430 
431     return NXT_OK;
432 }
433 
434 
435 static void
nxt_eventport_signal(nxt_event_engine_t * engine,nxt_uint_t signo)436 nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
437 {
438     int  port;
439 
440     port = engine->u.eventport.fd;
441 
442     nxt_debug(&engine->task, "port_send(%d, %ui)", port, signo);
443 
444     if (port_send(port, signo, NULL) != 0) {
445         nxt_alert(&engine->task, "port_send(%d) failed %E", port, nxt_errno);
446     }
447 }
448 
449 
450 static void
nxt_eventport_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)451 nxt_eventport_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
452 {
453     int                 n, events, signo;
454     uint_t              nevents;
455     nxt_err_t           err;
456     nxt_uint_t          i, level;
457     timespec_t          ts, *tp;
458     port_event_t        *event;
459     nxt_fd_event_t      *ev;
460     nxt_work_handler_t  handler;
461 
462     if (engine->u.eventport.nchanges != 0) {
463         if (nxt_eventport_commit_changes(engine) != NXT_OK) {
464             /* Error handlers have been enqueued on failure. */
465             timeout = 0;
466         }
467     }
468 
469     if (timeout == NXT_INFINITE_MSEC) {
470         tp = NULL;
471 
472     } else {
473         ts.tv_sec = timeout / 1000;
474         ts.tv_nsec = (timeout % 1000) * 1000000;
475         tp = &ts;
476     }
477 
478     nxt_debug(&engine->task, "port_getn(%d) timeout: %M",
479               engine->u.eventport.fd, timeout);
480 
481     /*
482      * A trap for possible error when Solaris does not update nevents
483      * if ETIME or EINTR is returned.  This issue will be logged as
484      * "unexpected port_getn() event".
485      *
486      * The details are in OpenSolaris mailing list thread "port_getn()
487      * and timeouts - is this a bug or an undocumented feature?"
488      */
489     event = &engine->u.eventport.events[0];
490     event->portev_events = -1; /* invalid port events */
491     event->portev_source = -1; /* invalid port source */
492     event->portev_object = -1;
493     event->portev_user = (void *) -1;
494 
495     nevents = 1;
496     n = port_getn(engine->u.eventport.fd, engine->u.eventport.events,
497                   engine->u.eventport.mevents, &nevents, tp);
498 
499     /*
500      * 32-bit port_getn() on Solaris 10 x86 returns large negative
501      * values instead of 0 when returning immediately.
502      */
503     err = (n < 0) ? nxt_errno : 0;
504 
505     nxt_thread_time_update(engine->task.thread);
506 
507     if (n == -1) {
508         if (err == NXT_ETIME || err == NXT_EINTR) {
509             if (nevents != 0) {
510                 nxt_alert(&engine->task, "port_getn(%d) failed %E, events:%ud",
511                           engine->u.eventport.fd, err, nevents);
512             }
513         }
514 
515         if (err != NXT_ETIME) {
516             level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
517 
518             nxt_log(&engine->task, level, "port_getn(%d) failed %E",
519                     engine->u.eventport.fd, err);
520 
521             if (err != NXT_EINTR) {
522                 return;
523             }
524         }
525     }
526 
527     nxt_debug(&engine->task, "port_getn(%d) events: %d",
528               engine->u.eventport.fd, nevents);
529 
530     for (i = 0; i < nevents; i++) {
531         event = &engine->u.eventport.events[i];
532 
533         switch (event->portev_source) {
534 
535         case PORT_SOURCE_FD:
536             ev = event->portev_user;
537             events = event->portev_events;
538 
539             nxt_debug(ev->task, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d",
540                       event->portev_object, events, ev, ev->read, ev->write);
541 
542             if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
543                 nxt_alert(ev->task, "port_getn(%d) error fd:%d events:%04Xud",
544                           engine->u.eventport.fd, ev->fd, events);
545 
546                 nxt_work_queue_add(&engine->fast_work_queue,
547                                    nxt_eventport_error_handler,
548                                    ev->task, ev, ev->data);
549                 continue;
550             }
551 
552             if (events & POLLIN) {
553                 ev->read_ready = 1;
554 
555                 if (ev->read != NXT_EVENT_BLOCKED) {
556                     nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
557                                        ev->task, ev, ev->data);
558 
559                 }
560 
561                 if (ev->read != NXT_EVENT_LEVEL) {
562                     ev->read = NXT_EVENT_INACTIVE;
563                 }
564             }
565 
566             if (events & POLLOUT) {
567                 ev->write_ready = 1;
568 
569                 if (ev->write != NXT_EVENT_BLOCKED) {
570                     nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
571                                        ev->task, ev, ev->data);
572                 }
573 
574                 ev->write = NXT_EVENT_INACTIVE;
575             }
576 
577             /*
578              * Reactivate counterpart direction, because the
579              * eventport is oneshot notification facility.
580              */
581             events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN;
582             events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT;
583 
584             if (events != 0) {
585                 nxt_eventport_enable_event(engine, ev, events);
586             }
587 
588             break;
589 
590         case PORT_SOURCE_USER:
591             nxt_debug(&engine->task, "eventport: user ev:%d u:%p",
592                       event->portev_events, event->portev_user);
593 
594             signo = event->portev_events;
595 
596             handler = (signo == 0) ? engine->u.eventport.post_handler
597                                    : engine->u.eventport.signal_handler;
598 
599             nxt_work_queue_add(&engine->fast_work_queue, handler,
600                                &engine->task, (void *) (uintptr_t) signo, NULL);
601 
602             break;
603 
604         default:
605             nxt_alert(&engine->task,
606                       "unexpected port_getn(%d) event: "
607                       "ev:%d src:%d obj:%p u:%p",
608                       engine->u.eventport.fd, event->portev_events,
609                       event->portev_source, event->portev_object,
610                       event->portev_user);
611         }
612     }
613 }
614