1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static nxt_int_t nxt_select_create(nxt_event_engine_t *engine,
11 nxt_uint_t mchanges, nxt_uint_t mevents);
12 static void nxt_select_free(nxt_event_engine_t *engine);
13 static void nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
14 static void nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
15 static nxt_bool_t nxt_select_close(nxt_event_engine_t *engine,
16 nxt_fd_event_t *ev);
17 static void nxt_select_enable_read(nxt_event_engine_t *engine,
18 nxt_fd_event_t *ev);
19 static void nxt_select_enable_write(nxt_event_engine_t *engine,
20 nxt_fd_event_t *ev);
21 static void nxt_select_error_handler(nxt_task_t *task, void *obj, void *data);
22 static void nxt_select_disable_read(nxt_event_engine_t *engine,
23 nxt_fd_event_t *ev);
24 static void nxt_select_disable_write(nxt_event_engine_t *engine,
25 nxt_fd_event_t *ev);
26 static void nxt_select_block_read(nxt_event_engine_t *engine,
27 nxt_fd_event_t *ev);
28 static void nxt_select_block_write(nxt_event_engine_t *engine,
29 nxt_fd_event_t *ev);
30 static void nxt_select_oneshot_read(nxt_event_engine_t *engine,
31 nxt_fd_event_t *ev);
32 static void nxt_select_oneshot_write(nxt_event_engine_t *engine,
33 nxt_fd_event_t *ev);
34 static void nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
35
36
37 const nxt_event_interface_t nxt_select_engine = {
38 "select",
39 nxt_select_create,
40 nxt_select_free,
41 nxt_select_enable,
42 nxt_select_disable,
43 nxt_select_disable,
44 nxt_select_close,
45 nxt_select_enable_read,
46 nxt_select_enable_write,
47 nxt_select_disable_read,
48 nxt_select_disable_write,
49 nxt_select_block_read,
50 nxt_select_block_write,
51 nxt_select_oneshot_read,
52 nxt_select_oneshot_write,
53 nxt_select_enable_read,
54 NULL,
55 NULL,
56 NULL,
57 NULL,
58 nxt_select_poll,
59
60 &nxt_unix_conn_io,
61
62 NXT_NO_FILE_EVENTS,
63 NXT_NO_SIGNAL_EVENTS,
64 };
65
66
67 static nxt_int_t
nxt_select_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)68 nxt_select_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
69 nxt_uint_t mevents)
70 {
71 engine->u.select.nfds = -1;
72 engine->u.select.update_nfds = 0;
73
74 engine->u.select.events = nxt_zalloc(FD_SETSIZE * sizeof(nxt_fd_event_t *));
75
76 if (engine->u.select.events != NULL) {
77 return NXT_OK;
78 }
79
80 nxt_select_free(engine);
81
82 return NXT_ERROR;
83 }
84
85
86 static void
nxt_select_free(nxt_event_engine_t * engine)87 nxt_select_free(nxt_event_engine_t *engine)
88 {
89 nxt_debug(&engine->task, "select free");
90
91 nxt_free(engine->u.select.events);
92
93 nxt_memzero(&engine->u.select, sizeof(nxt_select_engine_t));
94 }
95
96
97 static void
nxt_select_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)98 nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
99 {
100 nxt_select_enable_read(engine, ev);
101 nxt_select_enable_write(engine, ev);
102 }
103
104
105 static void
nxt_select_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)106 nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
107 {
108 if (ev->read != NXT_EVENT_INACTIVE) {
109 nxt_select_disable_read(engine, ev);
110 }
111
112 if (ev->write != NXT_EVENT_INACTIVE) {
113 nxt_select_disable_write(engine, ev);
114 }
115 }
116
117
118 static nxt_bool_t
nxt_select_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)119 nxt_select_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
120 {
121 nxt_select_disable(engine, ev);
122
123 return 0;
124 }
125
126
127 static void
nxt_select_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)128 nxt_select_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
129 {
130 nxt_fd_t fd;
131
132 fd = ev->fd;
133
134 nxt_debug(ev->task, "select enable read: fd:%d", fd);
135
136 if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
137 nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler,
138 ev->task, ev, ev->data);
139 return;
140 }
141
142 ev->read = NXT_EVENT_ACTIVE;
143
144 FD_SET(fd, &engine->u.select.main_read_fd_set);
145 engine->u.select.events[fd] = ev;
146
147 if (engine->u.select.nfds < fd) {
148 engine->u.select.nfds = fd;
149 engine->u.select.update_nfds = 0;
150 }
151 }
152
153
154 static void
nxt_select_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)155 nxt_select_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
156 {
157 nxt_fd_t fd;
158
159 fd = ev->fd;
160
161 nxt_debug(ev->task, "select enable write: fd:%d", fd);
162
163 if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
164 nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler,
165 ev->task, ev, ev->data);
166 return;
167 }
168
169 ev->write = NXT_EVENT_ACTIVE;
170
171 FD_SET(fd, &engine->u.select.main_write_fd_set);
172 engine->u.select.events[fd] = ev;
173
174 if (engine->u.select.nfds < fd) {
175 engine->u.select.nfds = fd;
176 engine->u.select.update_nfds = 0;
177 }
178 }
179
180
181 static void
nxt_select_error_handler(nxt_task_t * task,void * obj,void * data)182 nxt_select_error_handler(nxt_task_t *task, void *obj, void *data)
183 {
184 nxt_fd_event_t *ev;
185
186 ev = obj;
187
188 ev->read = NXT_EVENT_INACTIVE;
189 ev->write = NXT_EVENT_INACTIVE;
190
191 ev->error_handler(task, ev, data);
192 }
193
194
195 static void
nxt_select_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)196 nxt_select_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
197 {
198 nxt_fd_t fd;
199
200 fd = ev->fd;
201
202 nxt_debug(ev->task, "select disable read: fd:%d", fd);
203
204 if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
205 return;
206 }
207
208 FD_CLR(fd, &engine->u.select.main_read_fd_set);
209
210 ev->read = NXT_EVENT_INACTIVE;
211
212 if (ev->write == NXT_EVENT_INACTIVE) {
213 engine->u.select.events[fd] = NULL;
214 engine->u.select.update_nfds = 1;
215 }
216 }
217
218
219 static void
nxt_select_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)220 nxt_select_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
221 {
222 nxt_fd_t fd;
223
224 fd = ev->fd;
225
226 nxt_debug(ev->task, "select disable write: fd:%d", fd);
227
228 if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
229 return;
230 }
231
232 FD_CLR(fd, &engine->u.select.main_write_fd_set);
233
234 ev->write = NXT_EVENT_INACTIVE;
235
236 if (ev->read == NXT_EVENT_INACTIVE) {
237 engine->u.select.events[fd] = NULL;
238 engine->u.select.update_nfds = 1;
239 }
240 }
241
242
243 static void
nxt_select_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)244 nxt_select_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
245 {
246 if (ev->read != NXT_EVENT_INACTIVE) {
247 nxt_select_disable_read(engine, ev);
248 }
249 }
250
251
252 static void
nxt_select_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)253 nxt_select_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
254 {
255 if (ev->write != NXT_EVENT_INACTIVE) {
256 nxt_select_disable_write(engine, ev);
257 }
258 }
259
260
261 static void
nxt_select_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)262 nxt_select_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
263 {
264 nxt_select_enable_read(engine, ev);
265
266 ev->read = NXT_EVENT_ONESHOT;
267 }
268
269
270 static void
nxt_select_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)271 nxt_select_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
272 {
273 nxt_select_enable_write(engine, ev);
274
275 ev->write = NXT_EVENT_ONESHOT;
276 }
277
278
279 static void
nxt_select_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)280 nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
281 {
282 int nevents, nfds, found;
283 nxt_err_t err;
284 nxt_int_t i;
285 nxt_uint_t fd, level;
286 nxt_fd_event_t *ev;
287 struct timeval tv, *tp;
288
289 if (timeout == NXT_INFINITE_MSEC) {
290 tp = NULL;
291
292 } else {
293 tv.tv_sec = (long) (timeout / 1000);
294 tv.tv_usec = (long) ((timeout % 1000) * 1000);
295 tp = &tv;
296 }
297
298 if (engine->u.select.update_nfds) {
299 for (i = engine->u.select.nfds; i >= 0; i--) {
300 if (engine->u.select.events[i] != NULL) {
301 engine->u.select.nfds = i;
302 engine->u.select.update_nfds = 0;
303 break;
304 }
305 }
306 }
307
308 engine->u.select.work_read_fd_set = engine->u.select.main_read_fd_set;
309 engine->u.select.work_write_fd_set = engine->u.select.main_write_fd_set;
310
311 nfds = engine->u.select.nfds + 1;
312
313 nxt_debug(&engine->task, "select() nfds:%d timeout:%M", nfds, timeout);
314
315 nevents = select(nfds, &engine->u.select.work_read_fd_set,
316 &engine->u.select.work_write_fd_set, NULL, tp);
317
318 err = (nevents == -1) ? nxt_errno : 0;
319
320 nxt_thread_time_update(engine->task.thread);
321
322 nxt_debug(&engine->task, "select(): %d", nevents);
323
324 if (nevents == -1) {
325 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
326 nxt_log(&engine->task, level, "select() failed %E", err);
327 return;
328 }
329
330 for (fd = 0; fd < (nxt_uint_t) nfds && nevents != 0; fd++) {
331
332 found = 0;
333
334 if (FD_ISSET(fd, &engine->u.select.work_read_fd_set)) {
335 ev = engine->u.select.events[fd];
336
337 nxt_debug(ev->task, "select() fd:%ui read rd:%d wr:%d",
338 fd, ev->read, ev->write);
339
340 ev->read_ready = 1;
341
342 if (ev->read == NXT_EVENT_ONESHOT) {
343 nxt_select_disable_read(engine, ev);
344 }
345
346 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
347 ev->task, ev, ev->data);
348 found = 1;
349 }
350
351 if (FD_ISSET(fd, &engine->u.select.work_write_fd_set)) {
352 ev = engine->u.select.events[fd];
353
354 nxt_debug(ev->task, "select() fd:%ui write rd:%d wr:%d",
355 fd, ev->read, ev->write);
356
357 ev->write_ready = 1;
358
359 if (ev->write == NXT_EVENT_ONESHOT) {
360 nxt_select_disable_write(engine, ev);
361 }
362
363 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
364 ev->task, ev, ev->data);
365 found = 1;
366 }
367
368 nevents -= found;
369 }
370 }
371