xref: /unit/src/nxt_eventport_engine.c (revision 12:477899a6661b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * The event ports have been introduced in Solaris 10.
12  * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have
13  * been added in OpenSolaris.
14  */
15 
16 
17 static nxt_int_t nxt_eventport_create(nxt_event_engine_t *engine,
18     nxt_uint_t mchanges, nxt_uint_t mevents);
19 static void nxt_eventport_free(nxt_event_engine_t *engine);
20 static void nxt_eventport_enable(nxt_event_engine_t *engine,
21     nxt_fd_event_t *ev);
22 static void nxt_eventport_disable(nxt_event_engine_t *engine,
23     nxt_fd_event_t *ev);
24 static nxt_bool_t nxt_eventport_close(nxt_event_engine_t *engine,
25     nxt_fd_event_t *ev);
26 static void nxt_eventport_enable_read(nxt_event_engine_t *engine,
27     nxt_fd_event_t *ev);
28 static void nxt_eventport_enable_write(nxt_event_engine_t *engine,
29     nxt_fd_event_t *ev);
30 static void nxt_eventport_enable_event(nxt_event_engine_t *engine,
31     nxt_fd_event_t *ev, nxt_uint_t events);
32 static void nxt_eventport_disable_read(nxt_event_engine_t *engine,
33     nxt_fd_event_t *ev);
34 static void nxt_eventport_disable_write(nxt_event_engine_t *engine,
35     nxt_fd_event_t *ev);
36 static void nxt_eventport_disable_event(nxt_event_engine_t *engine,
37     nxt_fd_event_t *ev);
38 static nxt_int_t nxt_eventport_commit_changes(nxt_event_engine_t *engine);
39 static void nxt_eventport_error_handler(nxt_task_t *task, void *obj,
40     void *data);
41 static void nxt_eventport_block_read(nxt_event_engine_t *engine,
42     nxt_fd_event_t *ev);
43 static void nxt_eventport_block_write(nxt_event_engine_t *engine,
44     nxt_fd_event_t *ev);
45 static void nxt_eventport_oneshot_read(nxt_event_engine_t *engine,
46     nxt_fd_event_t *ev);
47 static void nxt_eventport_oneshot_write(nxt_event_engine_t *engine,
48     nxt_fd_event_t *ev);
49 static void nxt_eventport_enable_accept(nxt_event_engine_t *engine,
50     nxt_fd_event_t *ev);
51 static nxt_int_t nxt_eventport_enable_post(nxt_event_engine_t *engine,
52     nxt_work_handler_t handler);
53 static void nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
54 static void nxt_eventport_poll(nxt_event_engine_t *engine,
55     nxt_msec_t timeout);
56 
57 
58 const nxt_event_interface_t  nxt_eventport_engine = {
59     "eventport",
60     nxt_eventport_create,
61     nxt_eventport_free,
62     nxt_eventport_enable,
63     nxt_eventport_disable,
64     nxt_eventport_disable,
65     nxt_eventport_close,
66     nxt_eventport_enable_read,
67     nxt_eventport_enable_write,
68     nxt_eventport_disable_read,
69     nxt_eventport_disable_write,
70     nxt_eventport_block_read,
71     nxt_eventport_block_write,
72     nxt_eventport_oneshot_read,
73     nxt_eventport_oneshot_write,
74     nxt_eventport_enable_accept,
75     NULL,
76     NULL,
77     nxt_eventport_enable_post,
78     nxt_eventport_signal,
79     nxt_eventport_poll,
80 
81     &nxt_unix_event_conn_io,
82 
83     NXT_NO_FILE_EVENTS,
84     NXT_NO_SIGNAL_EVENTS,
85 };
86 
87 
88 static nxt_int_t
89 nxt_eventport_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
90     nxt_uint_t mevents)
91 {
92     nxt_eventport_change_t  *changes;
93 
94     engine->u.eventport.fd = -1;
95     engine->u.eventport.mchanges = mchanges;
96     engine->u.eventport.mevents = mevents;
97 
98     changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges);
99     if (changes == NULL) {
100         goto fail;
101     }
102 
103     engine->u.eventport.changes = changes;
104 
105     engine->u.eventport.events = nxt_malloc(sizeof(port_event_t) * mevents);
106     if (engine->u.eventport.events == NULL) {
107         goto fail;
108     }
109 
110     engine->u.eventport.fd = port_create();
111     if (engine->u.eventport.fd == -1) {
112         nxt_log(&engine->task, NXT_LOG_CRIT, "port_create() failed %E",
113                 nxt_errno);
114         goto fail;
115     }
116 
117     nxt_debug(&engine->task, "port_create(): %d", engine->u.eventport.fd);
118 
119     if (engine->signals != NULL) {
120         engine->u.eventport.signal_handler = engine->signals->handler;
121     }
122 
123     return NXT_OK;
124 
125 fail:
126 
127     nxt_eventport_free(engine);
128 
129     return NXT_ERROR;
130 }
131 
132 
133 static void
134 nxt_eventport_free(nxt_event_engine_t *engine)
135 {
136     int  port;
137 
138     port = engine->u.eventport.fd;
139 
140     nxt_debug(&engine->task, "eventport %d free", port);
141 
142     if (port != -1 && close(port) != 0) {
143         nxt_log(&engine->task, NXT_LOG_CRIT, "eventport close(%d) failed %E",
144                 port, nxt_errno);
145     }
146 
147     nxt_free(engine->u.eventport.events);
148 
149     nxt_memzero(&engine->u.eventport, sizeof(nxt_eventport_engine_t));
150 }
151 
152 
153 static void
154 nxt_eventport_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
155 {
156     ev->read = NXT_EVENT_ACTIVE;
157     ev->write = NXT_EVENT_ACTIVE;
158 
159     nxt_eventport_enable_event(engine, ev, POLLIN | POLLOUT);
160 }
161 
162 
163 static void
164 nxt_eventport_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
165 {
166     if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
167 
168         ev->read = NXT_EVENT_INACTIVE;
169         ev->write = NXT_EVENT_INACTIVE;
170 
171         nxt_eventport_disable_event(engine, ev);
172     }
173 }
174 
175 
176 /*
177  * port_dissociate(3):
178  *
179  *   The association is removed if the owner of the association closes the port.
180  */
181 
182 static nxt_bool_t
183 nxt_eventport_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
184 {
185     ev->read = NXT_EVENT_INACTIVE;
186     ev->write = NXT_EVENT_INACTIVE;
187 
188     return ev->changing;
189 }
190 
191 
192 static void
193 nxt_eventport_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
194 {
195     nxt_uint_t  events;
196 
197     if (ev->read != NXT_EVENT_BLOCKED) {
198         events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN
199                                                    : (POLLIN | POLLOUT);
200         nxt_eventport_enable_event(engine, ev, events);
201     }
202 
203     ev->read = NXT_EVENT_ACTIVE;
204 }
205 
206 
207 static void
208 nxt_eventport_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
209 {
210     nxt_uint_t  events;
211 
212     if (ev->write != NXT_EVENT_BLOCKED) {
213         events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT
214                                                   : (POLLIN | POLLOUT);
215         nxt_eventport_enable_event(engine, ev, events);
216     }
217 
218     ev->write = NXT_EVENT_ACTIVE;
219 }
220 
221 
222 /*
223  * eventport changes are batched to improve instruction and data
224  * cache locality of several port_associate() and port_dissociate()
225  * calls followed by port_getn() call.
226  */
227 
228 static void
229 nxt_eventport_enable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
230     nxt_uint_t events)
231 {
232     nxt_eventport_change_t  *change;
233 
234     nxt_debug(ev->task, "port %d set event: fd:%d ev:%04XD u:%p",
235               engine->u.eventport.fd, ev->fd, events, ev);
236 
237     if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
238         (void) nxt_eventport_commit_changes(engine);
239     }
240 
241     ev->changing = 1;
242 
243     change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
244     change->events = events;
245     change->event = ev;
246 }
247 
248 
249 static void
250 nxt_eventport_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
251 {
252     ev->read = NXT_EVENT_INACTIVE;
253 
254     if (ev->write == NXT_EVENT_INACTIVE) {
255         nxt_eventport_disable_event(engine, ev);
256 
257     } else {
258         nxt_eventport_enable_event(engine, ev, POLLOUT);
259     }
260 }
261 
262 
263 static void
264 nxt_eventport_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
265 {
266     ev->write = NXT_EVENT_INACTIVE;
267 
268     if (ev->read == NXT_EVENT_INACTIVE) {
269         nxt_eventport_disable_event(engine, ev);
270 
271     } else {
272         nxt_eventport_enable_event(engine, ev, POLLIN);
273     }
274 }
275 
276 
277 static void
278 nxt_eventport_disable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
279 {
280     nxt_eventport_change_t  *change;
281 
282     nxt_debug(ev->task, "port %d disable event : fd:%d",
283               engine->u.eventport.fd, ev->fd);
284 
285     if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
286         (void) nxt_eventport_commit_changes(engine);
287     }
288 
289     ev->changing = 1;
290 
291     change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
292     change->events = 0;
293     change->event = ev;
294 }
295 
296 
297 static nxt_int_t
298 nxt_eventport_commit_changes(nxt_event_engine_t *engine)
299 {
300     int                     ret, port;
301     nxt_int_t               retval;
302     nxt_fd_event_t          *ev;
303     nxt_eventport_change_t  *change, *end;
304 
305     port = engine->u.eventport.fd;
306 
307     nxt_debug(&engine->task, "eventport %d changes:%ui",
308               port, engine->u.eventport.nchanges);
309 
310     retval = NXT_OK;
311     change = engine->u.eventport.changes;
312     end = change + engine->u.eventport.nchanges;
313 
314     do {
315         ev = change->event;
316         ev->changing = 0;
317 
318         if (change->events != 0) {
319             nxt_debug(ev->task, "port_associate(%d): fd:%d ev:%04XD u:%p",
320                       port, ev->fd, change->events, ev);
321 
322             ret = port_associate(port, PORT_SOURCE_FD,
323                                  ev->fd, change->events, ev);
324 
325             if (nxt_fast_path(ret == 0)) {
326                 goto next;
327             }
328 
329             nxt_log(ev->task, NXT_LOG_CRIT,
330                     "port_associate(%d, %d, %d, %04XD) failed %E",
331                     port, PORT_SOURCE_FD, ev->fd, change->events, nxt_errno);
332 
333         } else {
334             nxt_debug(ev->task, "port_dissociate(%d): fd:%d", port, ev->fd);
335 
336             ret = port_dissociate(port, PORT_SOURCE_FD, ev->fd);
337 
338             if (nxt_fast_path(ret == 0)) {
339                 goto next;
340             }
341 
342             nxt_log(ev->task, NXT_LOG_CRIT,
343                     "port_dissociate(%d, %d, %d) failed %E",
344                     port, PORT_SOURCE_FD, ev->fd, nxt_errno);
345         }
346 
347         nxt_work_queue_add(&engine->fast_work_queue,
348                            nxt_eventport_error_handler,
349                            ev->task, ev, ev->data);
350 
351         retval = NXT_ERROR;
352 
353     next:
354 
355         change++;
356 
357     } while (change < end);
358 
359     engine->u.eventport.nchanges = 0;
360 
361     return retval;
362 }
363 
364 
365 static void
366 nxt_eventport_error_handler(nxt_task_t *task, void *obj, void *data)
367 {
368     nxt_fd_event_t  *ev;
369 
370     ev = obj;
371 
372     ev->read = NXT_EVENT_INACTIVE;
373     ev->write = NXT_EVENT_INACTIVE;
374 
375     ev->error_handler(task, ev, data);
376 }
377 
378 
379 static void
380 nxt_eventport_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
381 {
382     if (ev->read != NXT_EVENT_INACTIVE) {
383         ev->read = NXT_EVENT_BLOCKED;
384     }
385 }
386 
387 
388 static void
389 nxt_eventport_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
390 {
391     if (ev->write != NXT_EVENT_INACTIVE) {
392         ev->write = NXT_EVENT_BLOCKED;
393     }
394 }
395 
396 
397 static void
398 nxt_eventport_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
399 {
400     if (ev->read == NXT_EVENT_INACTIVE) {
401         ev->read = NXT_EVENT_ACTIVE;
402 
403         nxt_eventport_enable_event(engine, ev, POLLIN);
404     }
405 }
406 
407 
408 static void
409 nxt_eventport_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
410 {
411     if (ev->write == NXT_EVENT_INACTIVE) {
412         ev->write = NXT_EVENT_ACTIVE;
413 
414         nxt_eventport_enable_event(engine, ev, POLLOUT);
415     }
416 }
417 
418 
419 static void
420 nxt_eventport_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
421 {
422     ev->read = NXT_EVENT_LEVEL;
423 
424     nxt_eventport_enable_event(engine, ev, POLLIN);
425 }
426 
427 
428 static nxt_int_t
429 nxt_eventport_enable_post(nxt_event_engine_t *engine,
430     nxt_work_handler_t handler)
431 {
432     engine->u.eventport.post_handler = handler;
433 
434     return NXT_OK;
435 }
436 
437 
438 static void
439 nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
440 {
441     int  port;
442 
443     port = engine->u.eventport.fd;
444 
445     nxt_debug(&engine->task, "port_send(%d, %ui)", port, signo);
446 
447     if (port_send(port, signo, NULL) != 0) {
448         nxt_log(&engine->task, NXT_LOG_CRIT, "port_send(%d) failed %E",
449                 port, nxt_errno);
450     }
451 }
452 
453 
454 static void
455 nxt_eventport_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
456 {
457     int                 n, events, signo;
458     uint_t              nevents;
459     nxt_err_t           err;
460     nxt_uint_t          i, level;
461     timespec_t          ts, *tp;
462     port_event_t        *event;
463     nxt_fd_event_t      *ev;
464     nxt_work_handler_t  handler;
465 
466     if (engine->u.eventport.nchanges != 0) {
467         if (nxt_eventport_commit_changes(engine) != NXT_OK) {
468             /* Error handlers have been enqueued on failure. */
469             timeout = 0;
470         }
471     }
472 
473     if (timeout == NXT_INFINITE_MSEC) {
474         tp = NULL;
475 
476     } else {
477         ts.tv_sec = timeout / 1000;
478         ts.tv_nsec = (timeout % 1000) * 1000000;
479         tp = &ts;
480     }
481 
482     nxt_debug(&engine->task, "port_getn(%d) timeout: %M",
483               engine->u.eventport.fd, timeout);
484 
485     /*
486      * A trap for possible error when Solaris does not update nevents
487      * if ETIME or EINTR is returned.  This issue will be logged as
488      * "unexpected port_getn() event".
489      *
490      * The details are in OpenSolaris mailing list thread "port_getn()
491      * and timeouts - is this a bug or an undocumented feature?"
492      */
493     event = &engine->u.eventport.events[0];
494     event->portev_events = -1; /* invalid port events */
495     event->portev_source = -1; /* invalid port source */
496     event->portev_object = -1;
497     event->portev_user = (void *) -1;
498 
499     nevents = 1;
500     n = port_getn(engine->u.eventport.fd, engine->u.eventport.events,
501                   engine->u.eventport.mevents, &nevents, tp);
502 
503     /*
504      * 32-bit port_getn() on Solaris 10 x86 returns large negative
505      * values instead of 0 when returning immediately.
506      */
507     err = (n < 0) ? nxt_errno : 0;
508 
509     nxt_thread_time_update(engine->task.thread);
510 
511     if (n == -1) {
512         if (err == NXT_ETIME || err == NXT_EINTR) {
513             if (nevents != 0) {
514                 nxt_log(&engine->task, NXT_LOG_CRIT,
515                         "port_getn(%d) failed %E, events:%ud",
516                         engine->u.eventport.fd, err, nevents);
517             }
518         }
519 
520         if (err != NXT_ETIME) {
521             level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
522 
523             nxt_log(&engine->task, level, "port_getn(%d) failed %E",
524                     engine->u.eventport.fd, err);
525 
526             if (err != NXT_EINTR) {
527                 return;
528             }
529         }
530     }
531 
532     nxt_debug(&engine->task, "port_getn(%d) events: %d",
533               engine->u.eventport.fd, nevents);
534 
535     for (i = 0; i < nevents; i++) {
536         event = &engine->u.eventport.events[i];
537 
538         switch (event->portev_source) {
539 
540         case PORT_SOURCE_FD:
541             ev = event->portev_user;
542             events = event->portev_events;
543 
544             nxt_debug(ev->task, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d",
545                       event->portev_object, events, ev, ev->read, ev->write);
546 
547             if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
548                 nxt_log(ev->task, NXT_LOG_CRIT,
549                         "port_getn(%d) error fd:%d events:%04Xud",
550                         engine->u.eventport.fd, ev->fd, events);
551 
552                 nxt_work_queue_add(&engine->fast_work_queue,
553                                    nxt_eventport_error_handler,
554                                    ev->task, ev, ev->data);
555                 continue;
556             }
557 
558             if (events & POLLIN) {
559                 ev->read_ready = 1;
560 
561                 if (ev->read != NXT_EVENT_BLOCKED) {
562                     nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
563                                        ev->task, ev, ev->data);
564 
565                 }
566 
567                 if (ev->read != NXT_EVENT_LEVEL) {
568                     ev->read = NXT_EVENT_INACTIVE;
569                 }
570             }
571 
572             if (events & POLLOUT) {
573                 ev->write_ready = 1;
574 
575                 if (ev->write != NXT_EVENT_BLOCKED) {
576                     nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
577                                        ev->task, ev, ev->data);
578                 }
579 
580                 ev->write = NXT_EVENT_INACTIVE;
581             }
582 
583             /*
584              * Reactivate counterpart direction, because the
585              * eventport is oneshot notification facility.
586              */
587             events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN;
588             events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT;
589 
590             if (events != 0) {
591                 nxt_eventport_enable_event(engine, ev, events);
592             }
593 
594             break;
595 
596         case PORT_SOURCE_USER:
597             nxt_debug(&engine->task, "eventport: user ev:%d u:%p",
598                       event->portev_events, event->portev_user);
599 
600             signo = event->portev_events;
601 
602             handler = (signo == 0) ? engine->u.eventport.post_handler
603                                    : engine->u.eventport.signal_handler;
604 
605             nxt_work_queue_add(&engine->fast_work_queue, handler,
606                                &engine->task, (void *) (uintptr_t) signo, NULL);
607 
608             break;
609 
610         default:
611             nxt_log(&engine->task, NXT_LOG_CRIT,
612                     "unexpected port_getn(%d) event: ev:%d src:%d obj:%p u:%p",
613                     engine->u.eventport.fd, event->portev_events,
614                     event->portev_source, event->portev_object,
615                     event->portev_user);
616         }
617     }
618 }
619