1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * The event ports have been introduced in Solaris 10.
12 * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have
13 * been added in OpenSolaris.
14 */
15
16
17 static nxt_int_t nxt_eventport_create(nxt_event_engine_t *engine,
18 nxt_uint_t mchanges, nxt_uint_t mevents);
19 static void nxt_eventport_free(nxt_event_engine_t *engine);
20 static void nxt_eventport_enable(nxt_event_engine_t *engine,
21 nxt_fd_event_t *ev);
22 static void nxt_eventport_disable(nxt_event_engine_t *engine,
23 nxt_fd_event_t *ev);
24 static nxt_bool_t nxt_eventport_close(nxt_event_engine_t *engine,
25 nxt_fd_event_t *ev);
26 static void nxt_eventport_enable_read(nxt_event_engine_t *engine,
27 nxt_fd_event_t *ev);
28 static void nxt_eventport_enable_write(nxt_event_engine_t *engine,
29 nxt_fd_event_t *ev);
30 static void nxt_eventport_enable_event(nxt_event_engine_t *engine,
31 nxt_fd_event_t *ev, nxt_uint_t events);
32 static void nxt_eventport_disable_read(nxt_event_engine_t *engine,
33 nxt_fd_event_t *ev);
34 static void nxt_eventport_disable_write(nxt_event_engine_t *engine,
35 nxt_fd_event_t *ev);
36 static void nxt_eventport_disable_event(nxt_event_engine_t *engine,
37 nxt_fd_event_t *ev);
38 static nxt_int_t nxt_eventport_commit_changes(nxt_event_engine_t *engine);
39 static void nxt_eventport_error_handler(nxt_task_t *task, void *obj,
40 void *data);
41 static void nxt_eventport_block_read(nxt_event_engine_t *engine,
42 nxt_fd_event_t *ev);
43 static void nxt_eventport_block_write(nxt_event_engine_t *engine,
44 nxt_fd_event_t *ev);
45 static void nxt_eventport_oneshot_read(nxt_event_engine_t *engine,
46 nxt_fd_event_t *ev);
47 static void nxt_eventport_oneshot_write(nxt_event_engine_t *engine,
48 nxt_fd_event_t *ev);
49 static void nxt_eventport_enable_accept(nxt_event_engine_t *engine,
50 nxt_fd_event_t *ev);
51 static nxt_int_t nxt_eventport_enable_post(nxt_event_engine_t *engine,
52 nxt_work_handler_t handler);
53 static void nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
54 static void nxt_eventport_poll(nxt_event_engine_t *engine,
55 nxt_msec_t timeout);
56
57
58 const nxt_event_interface_t nxt_eventport_engine = {
59 "eventport",
60 nxt_eventport_create,
61 nxt_eventport_free,
62 nxt_eventport_enable,
63 nxt_eventport_disable,
64 nxt_eventport_disable,
65 nxt_eventport_close,
66 nxt_eventport_enable_read,
67 nxt_eventport_enable_write,
68 nxt_eventport_disable_read,
69 nxt_eventport_disable_write,
70 nxt_eventport_block_read,
71 nxt_eventport_block_write,
72 nxt_eventport_oneshot_read,
73 nxt_eventport_oneshot_write,
74 nxt_eventport_enable_accept,
75 NULL,
76 NULL,
77 nxt_eventport_enable_post,
78 nxt_eventport_signal,
79 nxt_eventport_poll,
80
81 &nxt_unix_conn_io,
82
83 NXT_NO_FILE_EVENTS,
84 NXT_NO_SIGNAL_EVENTS,
85 };
86
87
88 static nxt_int_t
nxt_eventport_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)89 nxt_eventport_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
90 nxt_uint_t mevents)
91 {
92 nxt_eventport_change_t *changes;
93
94 engine->u.eventport.fd = -1;
95 engine->u.eventport.mchanges = mchanges;
96 engine->u.eventport.mevents = mevents;
97
98 changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges);
99 if (changes == NULL) {
100 goto fail;
101 }
102
103 engine->u.eventport.changes = changes;
104
105 engine->u.eventport.events = nxt_malloc(sizeof(port_event_t) * mevents);
106 if (engine->u.eventport.events == NULL) {
107 goto fail;
108 }
109
110 engine->u.eventport.fd = port_create();
111 if (engine->u.eventport.fd == -1) {
112 nxt_alert(&engine->task, "port_create() failed %E", nxt_errno);
113 goto fail;
114 }
115
116 nxt_debug(&engine->task, "port_create(): %d", engine->u.eventport.fd);
117
118 if (engine->signals != NULL) {
119 engine->u.eventport.signal_handler = engine->signals->handler;
120 }
121
122 return NXT_OK;
123
124 fail:
125
126 nxt_eventport_free(engine);
127
128 return NXT_ERROR;
129 }
130
131
132 static void
nxt_eventport_free(nxt_event_engine_t * engine)133 nxt_eventport_free(nxt_event_engine_t *engine)
134 {
135 int port;
136
137 port = engine->u.eventport.fd;
138
139 nxt_debug(&engine->task, "eventport %d free", port);
140
141 if (port != -1 && close(port) != 0) {
142 nxt_alert(&engine->task, "eventport close(%d) failed %E",
143 port, nxt_errno);
144 }
145
146 nxt_free(engine->u.eventport.events);
147
148 nxt_memzero(&engine->u.eventport, sizeof(nxt_eventport_engine_t));
149 }
150
151
152 static void
nxt_eventport_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)153 nxt_eventport_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
154 {
155 ev->read = NXT_EVENT_ACTIVE;
156 ev->write = NXT_EVENT_ACTIVE;
157
158 nxt_eventport_enable_event(engine, ev, POLLIN | POLLOUT);
159 }
160
161
162 static void
nxt_eventport_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)163 nxt_eventport_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
164 {
165 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
166
167 ev->read = NXT_EVENT_INACTIVE;
168 ev->write = NXT_EVENT_INACTIVE;
169
170 nxt_eventport_disable_event(engine, ev);
171 }
172 }
173
174
175 /*
176 * port_dissociate(3):
177 *
178 * The association is removed if the owner of the association closes the port.
179 */
180
181 static nxt_bool_t
nxt_eventport_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)182 nxt_eventport_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
183 {
184 ev->read = NXT_EVENT_INACTIVE;
185 ev->write = NXT_EVENT_INACTIVE;
186
187 return ev->changing;
188 }
189
190
191 static void
nxt_eventport_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)192 nxt_eventport_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
193 {
194 nxt_uint_t events;
195
196 if (ev->read != NXT_EVENT_BLOCKED) {
197 events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN
198 : (POLLIN | POLLOUT);
199 nxt_eventport_enable_event(engine, ev, events);
200 }
201
202 ev->read = NXT_EVENT_ACTIVE;
203 }
204
205
206 static void
nxt_eventport_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)207 nxt_eventport_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
208 {
209 nxt_uint_t events;
210
211 if (ev->write != NXT_EVENT_BLOCKED) {
212 events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT
213 : (POLLIN | POLLOUT);
214 nxt_eventport_enable_event(engine, ev, events);
215 }
216
217 ev->write = NXT_EVENT_ACTIVE;
218 }
219
220
221 /*
222 * eventport changes are batched to improve instruction and data
223 * cache locality of several port_associate() and port_dissociate()
224 * calls followed by port_getn() call.
225 */
226
227 static void
nxt_eventport_enable_event(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t events)228 nxt_eventport_enable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
229 nxt_uint_t events)
230 {
231 nxt_eventport_change_t *change;
232
233 nxt_debug(ev->task, "port %d set event: fd:%d ev:%04XD u:%p",
234 engine->u.eventport.fd, ev->fd, events, ev);
235
236 if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
237 (void) nxt_eventport_commit_changes(engine);
238 }
239
240 ev->changing = 1;
241
242 change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
243 change->events = events;
244 change->event = ev;
245 }
246
247
248 static void
nxt_eventport_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)249 nxt_eventport_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
250 {
251 ev->read = NXT_EVENT_INACTIVE;
252
253 if (ev->write == NXT_EVENT_INACTIVE) {
254 nxt_eventport_disable_event(engine, ev);
255
256 } else {
257 nxt_eventport_enable_event(engine, ev, POLLOUT);
258 }
259 }
260
261
262 static void
nxt_eventport_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)263 nxt_eventport_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
264 {
265 ev->write = NXT_EVENT_INACTIVE;
266
267 if (ev->read == NXT_EVENT_INACTIVE) {
268 nxt_eventport_disable_event(engine, ev);
269
270 } else {
271 nxt_eventport_enable_event(engine, ev, POLLIN);
272 }
273 }
274
275
276 static void
nxt_eventport_disable_event(nxt_event_engine_t * engine,nxt_fd_event_t * ev)277 nxt_eventport_disable_event(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
278 {
279 nxt_eventport_change_t *change;
280
281 nxt_debug(ev->task, "port %d disable event : fd:%d",
282 engine->u.eventport.fd, ev->fd);
283
284 if (engine->u.eventport.nchanges >= engine->u.eventport.mchanges) {
285 (void) nxt_eventport_commit_changes(engine);
286 }
287
288 ev->changing = 1;
289
290 change = &engine->u.eventport.changes[engine->u.eventport.nchanges++];
291 change->events = 0;
292 change->event = ev;
293 }
294
295
296 static nxt_int_t
nxt_eventport_commit_changes(nxt_event_engine_t * engine)297 nxt_eventport_commit_changes(nxt_event_engine_t *engine)
298 {
299 int ret, port;
300 nxt_int_t retval;
301 nxt_fd_event_t *ev;
302 nxt_eventport_change_t *change, *end;
303
304 port = engine->u.eventport.fd;
305
306 nxt_debug(&engine->task, "eventport %d changes:%ui",
307 port, engine->u.eventport.nchanges);
308
309 retval = NXT_OK;
310 change = engine->u.eventport.changes;
311 end = change + engine->u.eventport.nchanges;
312
313 do {
314 ev = change->event;
315 ev->changing = 0;
316
317 if (change->events != 0) {
318 nxt_debug(ev->task, "port_associate(%d): fd:%d ev:%04XD u:%p",
319 port, ev->fd, change->events, ev);
320
321 ret = port_associate(port, PORT_SOURCE_FD,
322 ev->fd, change->events, ev);
323
324 if (nxt_fast_path(ret == 0)) {
325 goto next;
326 }
327
328 nxt_alert(ev->task, "port_associate(%d, %d, %d, %04XD) failed %E",
329 port, PORT_SOURCE_FD, ev->fd, change->events, nxt_errno);
330
331 } else {
332 nxt_debug(ev->task, "port_dissociate(%d): fd:%d", port, ev->fd);
333
334 ret = port_dissociate(port, PORT_SOURCE_FD, ev->fd);
335
336 if (nxt_fast_path(ret == 0)) {
337 goto next;
338 }
339
340 nxt_alert(ev->task, "port_dissociate(%d, %d, %d) failed %E",
341 port, PORT_SOURCE_FD, ev->fd, nxt_errno);
342 }
343
344 nxt_work_queue_add(&engine->fast_work_queue,
345 nxt_eventport_error_handler,
346 ev->task, ev, ev->data);
347
348 retval = NXT_ERROR;
349
350 next:
351
352 change++;
353
354 } while (change < end);
355
356 engine->u.eventport.nchanges = 0;
357
358 return retval;
359 }
360
361
362 static void
nxt_eventport_error_handler(nxt_task_t * task,void * obj,void * data)363 nxt_eventport_error_handler(nxt_task_t *task, void *obj, void *data)
364 {
365 nxt_fd_event_t *ev;
366
367 ev = obj;
368
369 ev->read = NXT_EVENT_INACTIVE;
370 ev->write = NXT_EVENT_INACTIVE;
371
372 ev->error_handler(task, ev, data);
373 }
374
375
376 static void
nxt_eventport_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)377 nxt_eventport_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
378 {
379 if (ev->read != NXT_EVENT_INACTIVE) {
380 ev->read = NXT_EVENT_BLOCKED;
381 }
382 }
383
384
385 static void
nxt_eventport_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)386 nxt_eventport_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
387 {
388 if (ev->write != NXT_EVENT_INACTIVE) {
389 ev->write = NXT_EVENT_BLOCKED;
390 }
391 }
392
393
394 static void
nxt_eventport_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)395 nxt_eventport_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
396 {
397 if (ev->read == NXT_EVENT_INACTIVE) {
398 ev->read = NXT_EVENT_ACTIVE;
399
400 nxt_eventport_enable_event(engine, ev, POLLIN);
401 }
402 }
403
404
405 static void
nxt_eventport_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)406 nxt_eventport_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
407 {
408 if (ev->write == NXT_EVENT_INACTIVE) {
409 ev->write = NXT_EVENT_ACTIVE;
410
411 nxt_eventport_enable_event(engine, ev, POLLOUT);
412 }
413 }
414
415
416 static void
nxt_eventport_enable_accept(nxt_event_engine_t * engine,nxt_fd_event_t * ev)417 nxt_eventport_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
418 {
419 ev->read = NXT_EVENT_LEVEL;
420
421 nxt_eventport_enable_event(engine, ev, POLLIN);
422 }
423
424
425 static nxt_int_t
nxt_eventport_enable_post(nxt_event_engine_t * engine,nxt_work_handler_t handler)426 nxt_eventport_enable_post(nxt_event_engine_t *engine,
427 nxt_work_handler_t handler)
428 {
429 engine->u.eventport.post_handler = handler;
430
431 return NXT_OK;
432 }
433
434
435 static void
nxt_eventport_signal(nxt_event_engine_t * engine,nxt_uint_t signo)436 nxt_eventport_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
437 {
438 int port;
439
440 port = engine->u.eventport.fd;
441
442 nxt_debug(&engine->task, "port_send(%d, %ui)", port, signo);
443
444 if (port_send(port, signo, NULL) != 0) {
445 nxt_alert(&engine->task, "port_send(%d) failed %E", port, nxt_errno);
446 }
447 }
448
449
450 static void
nxt_eventport_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)451 nxt_eventport_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
452 {
453 int n, events, signo;
454 uint_t nevents;
455 nxt_err_t err;
456 nxt_uint_t i, level;
457 timespec_t ts, *tp;
458 port_event_t *event;
459 nxt_fd_event_t *ev;
460 nxt_work_handler_t handler;
461
462 if (engine->u.eventport.nchanges != 0) {
463 if (nxt_eventport_commit_changes(engine) != NXT_OK) {
464 /* Error handlers have been enqueued on failure. */
465 timeout = 0;
466 }
467 }
468
469 if (timeout == NXT_INFINITE_MSEC) {
470 tp = NULL;
471
472 } else {
473 ts.tv_sec = timeout / 1000;
474 ts.tv_nsec = (timeout % 1000) * 1000000;
475 tp = &ts;
476 }
477
478 nxt_debug(&engine->task, "port_getn(%d) timeout: %M",
479 engine->u.eventport.fd, timeout);
480
481 /*
482 * A trap for possible error when Solaris does not update nevents
483 * if ETIME or EINTR is returned. This issue will be logged as
484 * "unexpected port_getn() event".
485 *
486 * The details are in OpenSolaris mailing list thread "port_getn()
487 * and timeouts - is this a bug or an undocumented feature?"
488 */
489 event = &engine->u.eventport.events[0];
490 event->portev_events = -1; /* invalid port events */
491 event->portev_source = -1; /* invalid port source */
492 event->portev_object = -1;
493 event->portev_user = (void *) -1;
494
495 nevents = 1;
496 n = port_getn(engine->u.eventport.fd, engine->u.eventport.events,
497 engine->u.eventport.mevents, &nevents, tp);
498
499 /*
500 * 32-bit port_getn() on Solaris 10 x86 returns large negative
501 * values instead of 0 when returning immediately.
502 */
503 err = (n < 0) ? nxt_errno : 0;
504
505 nxt_thread_time_update(engine->task.thread);
506
507 if (n == -1) {
508 if (err == NXT_ETIME || err == NXT_EINTR) {
509 if (nevents != 0) {
510 nxt_alert(&engine->task, "port_getn(%d) failed %E, events:%ud",
511 engine->u.eventport.fd, err, nevents);
512 }
513 }
514
515 if (err != NXT_ETIME) {
516 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
517
518 nxt_log(&engine->task, level, "port_getn(%d) failed %E",
519 engine->u.eventport.fd, err);
520
521 if (err != NXT_EINTR) {
522 return;
523 }
524 }
525 }
526
527 nxt_debug(&engine->task, "port_getn(%d) events: %d",
528 engine->u.eventport.fd, nevents);
529
530 for (i = 0; i < nevents; i++) {
531 event = &engine->u.eventport.events[i];
532
533 switch (event->portev_source) {
534
535 case PORT_SOURCE_FD:
536 ev = event->portev_user;
537 events = event->portev_events;
538
539 nxt_debug(ev->task, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d",
540 event->portev_object, events, ev, ev->read, ev->write);
541
542 if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
543 nxt_alert(ev->task, "port_getn(%d) error fd:%d events:%04Xud",
544 engine->u.eventport.fd, ev->fd, events);
545
546 nxt_work_queue_add(&engine->fast_work_queue,
547 nxt_eventport_error_handler,
548 ev->task, ev, ev->data);
549 continue;
550 }
551
552 if (events & POLLIN) {
553 ev->read_ready = 1;
554
555 if (ev->read != NXT_EVENT_BLOCKED) {
556 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
557 ev->task, ev, ev->data);
558
559 }
560
561 if (ev->read != NXT_EVENT_LEVEL) {
562 ev->read = NXT_EVENT_INACTIVE;
563 }
564 }
565
566 if (events & POLLOUT) {
567 ev->write_ready = 1;
568
569 if (ev->write != NXT_EVENT_BLOCKED) {
570 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
571 ev->task, ev, ev->data);
572 }
573
574 ev->write = NXT_EVENT_INACTIVE;
575 }
576
577 /*
578 * Reactivate counterpart direction, because the
579 * eventport is oneshot notification facility.
580 */
581 events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN;
582 events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT;
583
584 if (events != 0) {
585 nxt_eventport_enable_event(engine, ev, events);
586 }
587
588 break;
589
590 case PORT_SOURCE_USER:
591 nxt_debug(&engine->task, "eventport: user ev:%d u:%p",
592 event->portev_events, event->portev_user);
593
594 signo = event->portev_events;
595
596 handler = (signo == 0) ? engine->u.eventport.post_handler
597 : engine->u.eventport.signal_handler;
598
599 nxt_work_queue_add(&engine->fast_work_queue, handler,
600 &engine->task, (void *) (uintptr_t) signo, NULL);
601
602 break;
603
604 default:
605 nxt_alert(&engine->task,
606 "unexpected port_getn(%d) event: "
607 "ev:%d src:%d obj:%p u:%p",
608 engine->u.eventport.fd, event->portev_events,
609 event->portev_source, event->portev_object,
610 event->portev_user);
611 }
612 }
613 }
614