xref: /unit/src/nxt_select_engine.c (revision 62:5e1efcc7b740)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_select_create(nxt_event_engine_t *engine,
11     nxt_uint_t mchanges, nxt_uint_t mevents);
12 static void nxt_select_free(nxt_event_engine_t *engine);
13 static void nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
14 static void nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
15 static nxt_bool_t nxt_select_close(nxt_event_engine_t *engine,
16     nxt_fd_event_t *ev);
17 static void nxt_select_enable_read(nxt_event_engine_t *engine,
18     nxt_fd_event_t *ev);
19 static void nxt_select_enable_write(nxt_event_engine_t *engine,
20     nxt_fd_event_t *ev);
21 static void nxt_select_error_handler(nxt_task_t *task, void *obj, void *data);
22 static void nxt_select_disable_read(nxt_event_engine_t *engine,
23     nxt_fd_event_t *ev);
24 static void nxt_select_disable_write(nxt_event_engine_t *engine,
25     nxt_fd_event_t *ev);
26 static void nxt_select_block_read(nxt_event_engine_t *engine,
27     nxt_fd_event_t *ev);
28 static void nxt_select_block_write(nxt_event_engine_t *engine,
29     nxt_fd_event_t *ev);
30 static void nxt_select_oneshot_read(nxt_event_engine_t *engine,
31     nxt_fd_event_t *ev);
32 static void nxt_select_oneshot_write(nxt_event_engine_t *engine,
33     nxt_fd_event_t *ev);
34 static void nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
35 
36 
37 const nxt_event_interface_t  nxt_select_engine = {
38     "select",
39     nxt_select_create,
40     nxt_select_free,
41     nxt_select_enable,
42     nxt_select_disable,
43     nxt_select_disable,
44     nxt_select_close,
45     nxt_select_enable_read,
46     nxt_select_enable_write,
47     nxt_select_disable_read,
48     nxt_select_disable_write,
49     nxt_select_block_read,
50     nxt_select_block_write,
51     nxt_select_oneshot_read,
52     nxt_select_oneshot_write,
53     nxt_select_enable_read,
54     NULL,
55     NULL,
56     NULL,
57     NULL,
58     nxt_select_poll,
59 
60     &nxt_unix_conn_io,
61 
62     NXT_NO_FILE_EVENTS,
63     NXT_NO_SIGNAL_EVENTS,
64 };
65 
66 
67 static nxt_int_t
nxt_select_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)68 nxt_select_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
69     nxt_uint_t mevents)
70 {
71     engine->u.select.nfds = -1;
72     engine->u.select.update_nfds = 0;
73 
74     engine->u.select.events = nxt_zalloc(FD_SETSIZE * sizeof(nxt_fd_event_t *));
75 
76     if (engine->u.select.events != NULL) {
77         return NXT_OK;
78     }
79 
80     nxt_select_free(engine);
81 
82     return NXT_ERROR;
83 }
84 
85 
86 static void
nxt_select_free(nxt_event_engine_t * engine)87 nxt_select_free(nxt_event_engine_t *engine)
88 {
89     nxt_debug(&engine->task, "select free");
90 
91     nxt_free(engine->u.select.events);
92 
93     nxt_memzero(&engine->u.select, sizeof(nxt_select_engine_t));
94 }
95 
96 
97 static void
nxt_select_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)98 nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
99 {
100     nxt_select_enable_read(engine, ev);
101     nxt_select_enable_write(engine, ev);
102 }
103 
104 
105 static void
nxt_select_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)106 nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
107 {
108     if (ev->read != NXT_EVENT_INACTIVE) {
109         nxt_select_disable_read(engine, ev);
110     }
111 
112     if (ev->write != NXT_EVENT_INACTIVE) {
113         nxt_select_disable_write(engine, ev);
114     }
115 }
116 
117 
118 static nxt_bool_t
nxt_select_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)119 nxt_select_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
120 {
121     nxt_select_disable(engine, ev);
122 
123     return 0;
124 }
125 
126 
127 static void
nxt_select_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)128 nxt_select_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
129 {
130     nxt_fd_t  fd;
131 
132     fd = ev->fd;
133 
134     nxt_debug(ev->task, "select enable read: fd:%d", fd);
135 
136     if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
137         nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler,
138                            ev->task, ev, ev->data);
139         return;
140     }
141 
142     ev->read = NXT_EVENT_ACTIVE;
143 
144     FD_SET(fd, &engine->u.select.main_read_fd_set);
145     engine->u.select.events[fd] = ev;
146 
147     if (engine->u.select.nfds < fd) {
148         engine->u.select.nfds = fd;
149         engine->u.select.update_nfds = 0;
150     }
151 }
152 
153 
154 static void
nxt_select_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)155 nxt_select_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
156 {
157     nxt_fd_t  fd;
158 
159     fd = ev->fd;
160 
161     nxt_debug(ev->task, "select enable write: fd:%d", fd);
162 
163     if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
164         nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler,
165                            ev->task, ev, ev->data);
166         return;
167     }
168 
169     ev->write = NXT_EVENT_ACTIVE;
170 
171     FD_SET(fd, &engine->u.select.main_write_fd_set);
172     engine->u.select.events[fd] = ev;
173 
174     if (engine->u.select.nfds < fd) {
175         engine->u.select.nfds = fd;
176         engine->u.select.update_nfds = 0;
177     }
178 }
179 
180 
181 static void
nxt_select_error_handler(nxt_task_t * task,void * obj,void * data)182 nxt_select_error_handler(nxt_task_t *task, void *obj, void *data)
183 {
184     nxt_fd_event_t  *ev;
185 
186     ev = obj;
187 
188     ev->read = NXT_EVENT_INACTIVE;
189     ev->write = NXT_EVENT_INACTIVE;
190 
191     ev->error_handler(task, ev, data);
192 }
193 
194 
195 static void
nxt_select_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)196 nxt_select_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
197 {
198     nxt_fd_t  fd;
199 
200     fd = ev->fd;
201 
202     nxt_debug(ev->task, "select disable read: fd:%d", fd);
203 
204     if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
205         return;
206     }
207 
208     FD_CLR(fd, &engine->u.select.main_read_fd_set);
209 
210     ev->read = NXT_EVENT_INACTIVE;
211 
212     if (ev->write == NXT_EVENT_INACTIVE) {
213         engine->u.select.events[fd] = NULL;
214         engine->u.select.update_nfds = 1;
215     }
216 }
217 
218 
219 static void
nxt_select_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)220 nxt_select_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
221 {
222     nxt_fd_t  fd;
223 
224     fd = ev->fd;
225 
226     nxt_debug(ev->task, "select disable write: fd:%d", fd);
227 
228     if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
229         return;
230     }
231 
232     FD_CLR(fd, &engine->u.select.main_write_fd_set);
233 
234     ev->write = NXT_EVENT_INACTIVE;
235 
236     if (ev->read == NXT_EVENT_INACTIVE) {
237         engine->u.select.events[fd] = NULL;
238         engine->u.select.update_nfds = 1;
239     }
240 }
241 
242 
243 static void
nxt_select_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)244 nxt_select_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
245 {
246     if (ev->read != NXT_EVENT_INACTIVE) {
247         nxt_select_disable_read(engine, ev);
248     }
249 }
250 
251 
252 static void
nxt_select_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)253 nxt_select_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
254 {
255     if (ev->write != NXT_EVENT_INACTIVE) {
256         nxt_select_disable_write(engine, ev);
257     }
258 }
259 
260 
261 static void
nxt_select_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)262 nxt_select_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
263 {
264     nxt_select_enable_read(engine, ev);
265 
266     ev->read = NXT_EVENT_ONESHOT;
267 }
268 
269 
270 static void
nxt_select_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)271 nxt_select_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
272 {
273     nxt_select_enable_write(engine, ev);
274 
275     ev->write = NXT_EVENT_ONESHOT;
276 }
277 
278 
279 static void
nxt_select_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)280 nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
281 {
282     int             nevents, nfds, found;
283     nxt_err_t       err;
284     nxt_int_t       i;
285     nxt_uint_t      fd, level;
286     nxt_fd_event_t  *ev;
287     struct timeval  tv, *tp;
288 
289     if (timeout == NXT_INFINITE_MSEC) {
290         tp = NULL;
291 
292     } else {
293         tv.tv_sec = (long) (timeout / 1000);
294         tv.tv_usec = (long) ((timeout % 1000) * 1000);
295         tp = &tv;
296     }
297 
298     if (engine->u.select.update_nfds) {
299         for (i = engine->u.select.nfds; i >= 0; i--) {
300             if (engine->u.select.events[i] != NULL) {
301                 engine->u.select.nfds = i;
302                 engine->u.select.update_nfds = 0;
303                 break;
304             }
305         }
306     }
307 
308     engine->u.select.work_read_fd_set = engine->u.select.main_read_fd_set;
309     engine->u.select.work_write_fd_set = engine->u.select.main_write_fd_set;
310 
311     nfds = engine->u.select.nfds + 1;
312 
313     nxt_debug(&engine->task, "select() nfds:%d timeout:%M", nfds, timeout);
314 
315     nevents = select(nfds, &engine->u.select.work_read_fd_set,
316                      &engine->u.select.work_write_fd_set, NULL, tp);
317 
318     err = (nevents == -1) ? nxt_errno : 0;
319 
320     nxt_thread_time_update(engine->task.thread);
321 
322     nxt_debug(&engine->task, "select(): %d", nevents);
323 
324     if (nevents == -1) {
325         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
326         nxt_log(&engine->task, level, "select() failed %E", err);
327         return;
328     }
329 
330     for (fd = 0; fd < (nxt_uint_t) nfds && nevents != 0; fd++) {
331 
332         found = 0;
333 
334         if (FD_ISSET(fd, &engine->u.select.work_read_fd_set)) {
335             ev = engine->u.select.events[fd];
336 
337             nxt_debug(ev->task, "select() fd:%ui read rd:%d wr:%d",
338                       fd, ev->read, ev->write);
339 
340             ev->read_ready = 1;
341 
342             if (ev->read == NXT_EVENT_ONESHOT) {
343                 nxt_select_disable_read(engine, ev);
344             }
345 
346             nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
347                                ev->task, ev, ev->data);
348             found = 1;
349         }
350 
351         if (FD_ISSET(fd, &engine->u.select.work_write_fd_set)) {
352             ev = engine->u.select.events[fd];
353 
354             nxt_debug(ev->task, "select() fd:%ui write rd:%d wr:%d",
355                       fd, ev->read, ev->write);
356 
357             ev->write_ready = 1;
358 
359             if (ev->write == NXT_EVENT_ONESHOT) {
360                 nxt_select_disable_write(engine, ev);
361             }
362 
363             nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
364                                ev->task, ev, ev->data);
365             found = 1;
366         }
367 
368         nevents -= found;
369     }
370 }
371