nxt_event_engine.c (358:40bbd4c2349d) nxt_event_engine.c (494:7c83ddcc1c42)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10typedef struct nxt_mem_cache_block_s nxt_mem_cache_block_t;
11
12struct nxt_mem_cache_block_s {
13 nxt_mem_cache_block_t *next;
14};
15
16
17typedef struct {
18 nxt_mem_cache_block_t *free;
19 uint32_t size;
20 uint32_t count;
21} nxt_mem_cache_t;
22
23
24static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
25static nxt_int_t nxt_event_engine_signal_pipe_create(
26 nxt_event_engine_t *engine);
27static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
28 void *data);
29static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
32 void *data);
33static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
34 void *data);
35static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
36 void *data);
37static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
38 nxt_task_t **task, void **obj, void **data);
39
40
41nxt_event_engine_t *
42nxt_event_engine_create(nxt_task_t *task,
43 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
44 nxt_uint_t flags, nxt_uint_t batch)
45{
46 nxt_uint_t events;
47 nxt_thread_t *thread;
48 nxt_event_engine_t *engine;
49
50 engine = nxt_zalloc(sizeof(nxt_event_engine_t));
51 if (engine == NULL) {
52 return NULL;
53 }
54
55 nxt_debug(task, "create engine %p", engine);
56
57 thread = task->thread;
58
59 engine->task.thread = thread;
60 engine->task.log = thread->log;
61 engine->task.ident = nxt_task_next_ident();
62
63 engine->batch = batch;
64
65#if 0
66 if (flags & NXT_ENGINE_FIBERS) {
67 engine->fibers = nxt_fiber_main_create(engine);
68 if (engine->fibers == NULL) {
69 goto fibers_fail;
70 }
71 }
72#endif
73
74 engine->current_work_queue = &engine->fast_work_queue;
75
76 nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
77
78 engine->fast_work_queue.cache = &engine->work_queue_cache;
79 engine->accept_work_queue.cache = &engine->work_queue_cache;
80 engine->read_work_queue.cache = &engine->work_queue_cache;
81 engine->socket_work_queue.cache = &engine->work_queue_cache;
82 engine->connect_work_queue.cache = &engine->work_queue_cache;
83 engine->write_work_queue.cache = &engine->work_queue_cache;
84 engine->shutdown_work_queue.cache = &engine->work_queue_cache;
85 engine->close_work_queue.cache = &engine->work_queue_cache;
86
87 nxt_work_queue_name(&engine->fast_work_queue, "fast");
88 nxt_work_queue_name(&engine->accept_work_queue, "accept");
89 nxt_work_queue_name(&engine->read_work_queue, "read");
90 nxt_work_queue_name(&engine->socket_work_queue, "socket");
91 nxt_work_queue_name(&engine->connect_work_queue, "connect");
92 nxt_work_queue_name(&engine->write_work_queue, "write");
93 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
94 nxt_work_queue_name(&engine->close_work_queue, "close");
95
96 if (signals != NULL) {
97 engine->signals = nxt_event_engine_signals(signals);
98 if (engine->signals == NULL) {
99 goto signals_fail;
100 }
101
102 engine->signals->handler = nxt_event_engine_signal_handler;
103
104 if (!interface->signal_support) {
105 if (nxt_event_engine_signals_start(engine) != NXT_OK) {
106 goto signals_fail;
107 }
108 }
109 }
110
111 /*
112 * Number of event set and timers changes should be at least twice
113 * more than number of events to avoid premature flushes of the changes.
114 * Fourfold is for sure.
115 */
116 events = (batch != 0) ? batch : 32;
117
118 if (interface->create(engine, 4 * events, events) != NXT_OK) {
119 goto event_set_fail;
120 }
121
122 engine->event = *interface;
123
124 if (nxt_event_engine_post_init(engine) != NXT_OK) {
125 goto post_fail;
126 }
127
128 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
129 goto timers_fail;
130 }
131
132 thread = task->thread;
133
134 nxt_thread_time_update(thread);
135 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
136
137 engine->max_connections = 0xffffffff;
138
139 nxt_queue_init(&engine->joints);
140 nxt_queue_init(&engine->listen_connections);
141 nxt_queue_init(&engine->idle_connections);
142
143 return engine;
144
145timers_fail:
146post_fail:
147
148 interface->free(engine);
149
150event_set_fail:
151signals_fail:
152
153 nxt_free(engine->signals);
154 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
155 nxt_free(engine->fibers);
156
157#if 0
158fibers_fail:
159
160 nxt_free(engine);
161#endif
162
163 return NULL;
164}
165
166
167static nxt_int_t
168nxt_event_engine_post_init(nxt_event_engine_t *engine)
169{
170 if (engine->event.enable_post != NULL) {
171 return engine->event.enable_post(engine, nxt_event_engine_post_handler);
172 }
173
174 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
175 return NXT_ERROR;
176 }
177
178 return NXT_OK;
179}
180
181
182static nxt_int_t
183nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
184{
185 nxt_event_engine_pipe_t *pipe;
186
187 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
188 if (pipe == NULL) {
189 return NXT_ERROR;
190 }
191
192 engine->pipe = pipe;
193
194 /*
195 * An event engine pipe is in blocking mode for writer
196 * and in non-blocking node for reader.
197 */
198
199 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
200 nxt_free(pipe);
201 return NXT_ERROR;
202 }
203
204 pipe->event.fd = pipe->fds[0];
205 pipe->event.task = &engine->task;
206 pipe->event.read_work_queue = &engine->fast_work_queue;
207 pipe->event.read_handler = nxt_event_engine_signal_pipe;
208 pipe->event.write_work_queue = &engine->fast_work_queue;
209 pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
210 pipe->event.log = engine->task.log;
211
212 nxt_fd_event_enable_read(engine, &pipe->event);
213
214 return NXT_OK;
215}
216
217
218static void
219nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
220{
221 nxt_event_engine_pipe_t *pipe;
222
223 pipe = engine->pipe;
224
225 if (pipe != NULL) {
226
227 if (pipe->event.read_work_queue != NULL) {
228 nxt_fd_event_close(engine, &pipe->event);
229 nxt_pipe_close(pipe->event.task, pipe->fds);
230 }
231
232 nxt_free(pipe);
233 }
234}
235
236
237static void
238nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
239{
240 nxt_event_engine_pipe_t *pipe;
241
242 pipe = obj;
243
244 nxt_pipe_close(pipe->event.task, pipe->fds);
245 nxt_free(pipe);
246}
247
248
249void
250nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
251{
252 nxt_debug(&engine->task, "event engine post");
253
254#if (NXT_DEBUG)
255 if (nxt_slow_path(work->next != NULL)) {
256 nxt_debug(&engine->task, "event engine post multiple works");
257 }
258#endif
259
260 nxt_locked_work_queue_add(&engine->locked_work_queue, work);
261
262 nxt_event_engine_signal(engine, 0);
263}
264
265
266void
267nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
268{
269 u_char buf;
270
271 nxt_debug(&engine->task, "event engine signal:%ui", signo);
272
273 /*
274 * A signal number may be sent in a signal context, so the signal
275 * information cannot be passed via a locked work queue.
276 */
277
278 if (engine->event.signal != NULL) {
279 engine->event.signal(engine, signo);
280 return;
281 }
282
283 buf = (u_char) signo;
284 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
285}
286
287
288static void
289nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
290{
291 int i, n;
292 u_char signo;
293 nxt_bool_t post;
294 nxt_fd_event_t *ev;
295 u_char buf[128];
296
297 ev = obj;
298
299 nxt_debug(task, "engine signal pipe");
300
301 post = 0;
302
303 do {
304 n = nxt_fd_read(ev->fd, buf, sizeof(buf));
305
306 for (i = 0; i < n; i++) {
307 signo = buf[i];
308
309 nxt_debug(task, "engine pipe signo:%d", signo);
310
311 if (signo == 0) {
312 /* A post should be processed only once. */
313 post = 1;
314
315 } else {
316 nxt_event_engine_signal_handler(task,
317 (void *) (uintptr_t) signo, NULL);
318 }
319 }
320
321 } while (n == sizeof(buf));
322
323 if (post) {
324 nxt_event_engine_post_handler(task, NULL, NULL);
325 }
326}
327
328
329static void
330nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
331{
332 nxt_thread_t *thread;
333 nxt_event_engine_t *engine;
334
335 thread = task->thread;
336 engine = thread->engine;
337
338 nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
339 &engine->fast_work_queue);
340}
341
342
343static void
344nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
345{
346 nxt_event_engine_t *engine;
347 nxt_event_engine_pipe_t *pipe;
348
349 engine = task->thread->engine;
350 pipe = engine->pipe;
351
352 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
353 pipe->fds[0], pipe->fds[1]);
354
355 nxt_fd_event_close(engine, &pipe->event);
356 nxt_pipe_close(pipe->event.task, pipe->fds);
357}
358
359
360static void
361nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
362{
363 uintptr_t signo;
364 const nxt_sig_event_t *sigev;
365
366 signo = (uintptr_t) obj;
367
368 for (sigev = task->thread->engine->signals->sigev;
369 sigev->signo != 0;
370 sigev++)
371 {
372 if (signo == (nxt_uint_t) sigev->signo) {
373 sigev->handler(task, (void *) signo, (void *) sigev->name);
374 return;
375 }
376 }
377
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10typedef struct nxt_mem_cache_block_s nxt_mem_cache_block_t;
11
12struct nxt_mem_cache_block_s {
13 nxt_mem_cache_block_t *next;
14};
15
16
17typedef struct {
18 nxt_mem_cache_block_t *free;
19 uint32_t size;
20 uint32_t count;
21} nxt_mem_cache_t;
22
23
24static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
25static nxt_int_t nxt_event_engine_signal_pipe_create(
26 nxt_event_engine_t *engine);
27static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
28 void *data);
29static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
30 void *data);
31static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
32 void *data);
33static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
34 void *data);
35static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
36 void *data);
37static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
38 nxt_task_t **task, void **obj, void **data);
39
40
41nxt_event_engine_t *
42nxt_event_engine_create(nxt_task_t *task,
43 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
44 nxt_uint_t flags, nxt_uint_t batch)
45{
46 nxt_uint_t events;
47 nxt_thread_t *thread;
48 nxt_event_engine_t *engine;
49
50 engine = nxt_zalloc(sizeof(nxt_event_engine_t));
51 if (engine == NULL) {
52 return NULL;
53 }
54
55 nxt_debug(task, "create engine %p", engine);
56
57 thread = task->thread;
58
59 engine->task.thread = thread;
60 engine->task.log = thread->log;
61 engine->task.ident = nxt_task_next_ident();
62
63 engine->batch = batch;
64
65#if 0
66 if (flags & NXT_ENGINE_FIBERS) {
67 engine->fibers = nxt_fiber_main_create(engine);
68 if (engine->fibers == NULL) {
69 goto fibers_fail;
70 }
71 }
72#endif
73
74 engine->current_work_queue = &engine->fast_work_queue;
75
76 nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
77
78 engine->fast_work_queue.cache = &engine->work_queue_cache;
79 engine->accept_work_queue.cache = &engine->work_queue_cache;
80 engine->read_work_queue.cache = &engine->work_queue_cache;
81 engine->socket_work_queue.cache = &engine->work_queue_cache;
82 engine->connect_work_queue.cache = &engine->work_queue_cache;
83 engine->write_work_queue.cache = &engine->work_queue_cache;
84 engine->shutdown_work_queue.cache = &engine->work_queue_cache;
85 engine->close_work_queue.cache = &engine->work_queue_cache;
86
87 nxt_work_queue_name(&engine->fast_work_queue, "fast");
88 nxt_work_queue_name(&engine->accept_work_queue, "accept");
89 nxt_work_queue_name(&engine->read_work_queue, "read");
90 nxt_work_queue_name(&engine->socket_work_queue, "socket");
91 nxt_work_queue_name(&engine->connect_work_queue, "connect");
92 nxt_work_queue_name(&engine->write_work_queue, "write");
93 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
94 nxt_work_queue_name(&engine->close_work_queue, "close");
95
96 if (signals != NULL) {
97 engine->signals = nxt_event_engine_signals(signals);
98 if (engine->signals == NULL) {
99 goto signals_fail;
100 }
101
102 engine->signals->handler = nxt_event_engine_signal_handler;
103
104 if (!interface->signal_support) {
105 if (nxt_event_engine_signals_start(engine) != NXT_OK) {
106 goto signals_fail;
107 }
108 }
109 }
110
111 /*
112 * Number of event set and timers changes should be at least twice
113 * more than number of events to avoid premature flushes of the changes.
114 * Fourfold is for sure.
115 */
116 events = (batch != 0) ? batch : 32;
117
118 if (interface->create(engine, 4 * events, events) != NXT_OK) {
119 goto event_set_fail;
120 }
121
122 engine->event = *interface;
123
124 if (nxt_event_engine_post_init(engine) != NXT_OK) {
125 goto post_fail;
126 }
127
128 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
129 goto timers_fail;
130 }
131
132 thread = task->thread;
133
134 nxt_thread_time_update(thread);
135 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
136
137 engine->max_connections = 0xffffffff;
138
139 nxt_queue_init(&engine->joints);
140 nxt_queue_init(&engine->listen_connections);
141 nxt_queue_init(&engine->idle_connections);
142
143 return engine;
144
145timers_fail:
146post_fail:
147
148 interface->free(engine);
149
150event_set_fail:
151signals_fail:
152
153 nxt_free(engine->signals);
154 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
155 nxt_free(engine->fibers);
156
157#if 0
158fibers_fail:
159
160 nxt_free(engine);
161#endif
162
163 return NULL;
164}
165
166
167static nxt_int_t
168nxt_event_engine_post_init(nxt_event_engine_t *engine)
169{
170 if (engine->event.enable_post != NULL) {
171 return engine->event.enable_post(engine, nxt_event_engine_post_handler);
172 }
173
174 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
175 return NXT_ERROR;
176 }
177
178 return NXT_OK;
179}
180
181
182static nxt_int_t
183nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
184{
185 nxt_event_engine_pipe_t *pipe;
186
187 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
188 if (pipe == NULL) {
189 return NXT_ERROR;
190 }
191
192 engine->pipe = pipe;
193
194 /*
195 * An event engine pipe is in blocking mode for writer
196 * and in non-blocking node for reader.
197 */
198
199 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
200 nxt_free(pipe);
201 return NXT_ERROR;
202 }
203
204 pipe->event.fd = pipe->fds[0];
205 pipe->event.task = &engine->task;
206 pipe->event.read_work_queue = &engine->fast_work_queue;
207 pipe->event.read_handler = nxt_event_engine_signal_pipe;
208 pipe->event.write_work_queue = &engine->fast_work_queue;
209 pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
210 pipe->event.log = engine->task.log;
211
212 nxt_fd_event_enable_read(engine, &pipe->event);
213
214 return NXT_OK;
215}
216
217
218static void
219nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
220{
221 nxt_event_engine_pipe_t *pipe;
222
223 pipe = engine->pipe;
224
225 if (pipe != NULL) {
226
227 if (pipe->event.read_work_queue != NULL) {
228 nxt_fd_event_close(engine, &pipe->event);
229 nxt_pipe_close(pipe->event.task, pipe->fds);
230 }
231
232 nxt_free(pipe);
233 }
234}
235
236
237static void
238nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
239{
240 nxt_event_engine_pipe_t *pipe;
241
242 pipe = obj;
243
244 nxt_pipe_close(pipe->event.task, pipe->fds);
245 nxt_free(pipe);
246}
247
248
249void
250nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
251{
252 nxt_debug(&engine->task, "event engine post");
253
254#if (NXT_DEBUG)
255 if (nxt_slow_path(work->next != NULL)) {
256 nxt_debug(&engine->task, "event engine post multiple works");
257 }
258#endif
259
260 nxt_locked_work_queue_add(&engine->locked_work_queue, work);
261
262 nxt_event_engine_signal(engine, 0);
263}
264
265
266void
267nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
268{
269 u_char buf;
270
271 nxt_debug(&engine->task, "event engine signal:%ui", signo);
272
273 /*
274 * A signal number may be sent in a signal context, so the signal
275 * information cannot be passed via a locked work queue.
276 */
277
278 if (engine->event.signal != NULL) {
279 engine->event.signal(engine, signo);
280 return;
281 }
282
283 buf = (u_char) signo;
284 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
285}
286
287
288static void
289nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
290{
291 int i, n;
292 u_char signo;
293 nxt_bool_t post;
294 nxt_fd_event_t *ev;
295 u_char buf[128];
296
297 ev = obj;
298
299 nxt_debug(task, "engine signal pipe");
300
301 post = 0;
302
303 do {
304 n = nxt_fd_read(ev->fd, buf, sizeof(buf));
305
306 for (i = 0; i < n; i++) {
307 signo = buf[i];
308
309 nxt_debug(task, "engine pipe signo:%d", signo);
310
311 if (signo == 0) {
312 /* A post should be processed only once. */
313 post = 1;
314
315 } else {
316 nxt_event_engine_signal_handler(task,
317 (void *) (uintptr_t) signo, NULL);
318 }
319 }
320
321 } while (n == sizeof(buf));
322
323 if (post) {
324 nxt_event_engine_post_handler(task, NULL, NULL);
325 }
326}
327
328
329static void
330nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
331{
332 nxt_thread_t *thread;
333 nxt_event_engine_t *engine;
334
335 thread = task->thread;
336 engine = thread->engine;
337
338 nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
339 &engine->fast_work_queue);
340}
341
342
343static void
344nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
345{
346 nxt_event_engine_t *engine;
347 nxt_event_engine_pipe_t *pipe;
348
349 engine = task->thread->engine;
350 pipe = engine->pipe;
351
352 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
353 pipe->fds[0], pipe->fds[1]);
354
355 nxt_fd_event_close(engine, &pipe->event);
356 nxt_pipe_close(pipe->event.task, pipe->fds);
357}
358
359
360static void
361nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
362{
363 uintptr_t signo;
364 const nxt_sig_event_t *sigev;
365
366 signo = (uintptr_t) obj;
367
368 for (sigev = task->thread->engine->signals->sigev;
369 sigev->signo != 0;
370 sigev++)
371 {
372 if (signo == (nxt_uint_t) sigev->signo) {
373 sigev->handler(task, (void *) signo, (void *) sigev->name);
374 return;
375 }
376 }
377
378 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
378 nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found",
379 (nxt_uint_t) signo);
379}
380
381
382nxt_int_t
383nxt_event_engine_change(nxt_event_engine_t *engine,
384 const nxt_event_interface_t *interface, nxt_uint_t batch)
385{
386 nxt_uint_t events;
387
388 engine->batch = batch;
389
390 if (!engine->event.signal_support && interface->signal_support) {
391 /*
392 * Block signal processing if the current event
393 * facility does not support signal processing.
394 */
395 nxt_event_engine_signals_stop(engine);
396
397 /*
398 * Add to engine fast work queue the signal events possibly
399 * received before the blocking signal processing.
400 */
401 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
402 }
403
404 if (engine->pipe != NULL && interface->enable_post != NULL) {
405 /*
406 * An engine pipe must be closed after all signal events
407 * added above to engine fast work queue will be processed.
408 */
409 nxt_work_queue_add(&engine->fast_work_queue,
410 nxt_event_engine_signal_pipe_close,
411 &engine->task, engine->pipe, NULL);
412
413 engine->pipe = NULL;
414 }
415
416 engine->event.free(engine);
417
418 events = (batch != 0) ? batch : 32;
419
420 if (interface->create(engine, 4 * events, events) != NXT_OK) {
421 return NXT_ERROR;
422 }
423
424 engine->event = *interface;
425
426 if (nxt_event_engine_post_init(engine) != NXT_OK) {
427 return NXT_ERROR;
428 }
429
430 if (engine->signals != NULL) {
431
432 if (!engine->event.signal_support) {
433 return nxt_event_engine_signals_start(engine);
434 }
435
436 /*
437 * Reset the PID flag to start the signal thread if
438 * some future event facility will not support signals.
439 */
440 engine->signals->process = 0;
441 }
442
443 return NXT_OK;
444}
445
446
447void
448nxt_event_engine_free(nxt_event_engine_t *engine)
449{
450 nxt_thread_log_debug("free engine %p", engine);
451
452 nxt_event_engine_signal_pipe_free(engine);
453 nxt_free(engine->signals);
454
455 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
456
457 engine->event.free(engine);
458
459 /* TODO: free timers */
460
461 nxt_free(engine);
462}
463
464
465static nxt_work_handler_t
466nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
467 void **obj, void **data)
468{
469 nxt_work_queue_t *wq, *last;
470
471 wq = engine->current_work_queue;
472 last = wq;
473
474 if (wq->head == NULL) {
475 wq = &engine->fast_work_queue;
476
477 if (wq->head == NULL) {
478
479 do {
480 engine->current_work_queue++;
481 wq = engine->current_work_queue;
482
483 if (wq > &engine->close_work_queue) {
484 wq = &engine->fast_work_queue;
485 engine->current_work_queue = wq;
486 }
487
488 if (wq->head != NULL) {
489 goto found;
490 }
491
492 } while (wq != last);
493
494 engine->current_work_queue = &engine->fast_work_queue;
495
496 return NULL;
497 }
498 }
499
500found:
501
502 nxt_debug(&engine->task, "work queue: %s", wq->name);
503
504 return nxt_work_queue_pop(wq, task, obj, data);
505}
506
507
508void
509nxt_event_engine_start(nxt_event_engine_t *engine)
510{
511 void *obj, *data;
512 nxt_task_t *task;
513 nxt_msec_t timeout, now;
514 nxt_thread_t *thr;
515 nxt_work_handler_t handler;
516
517 thr = nxt_thread();
518
519 if (engine->fibers) {
520 /*
521 * _setjmp() cannot be wrapped in a function since return from
522 * the function clobbers stack used by future _setjmp() returns.
523 */
524 _setjmp(engine->fibers->fiber.jmp);
525
526 /* A return point from fibers. */
527 }
528
529 thr->log = engine->task.log;
530
531 for ( ;; ) {
532
533 for ( ;; ) {
534 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
535
536 if (handler == NULL) {
537 break;
538 }
539
540 thr->task = task;
541
542 handler(task, obj, data);
543 }
544
545 /* Attach some event engine work queues in preferred order. */
546
547 timeout = nxt_timer_find(engine);
548
549 engine->event.poll(engine, timeout);
550
551 now = nxt_thread_monotonic_time(thr) / 1000000;
552
553 nxt_timer_expire(engine, now);
554 }
555}
556
557
558void *
559nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
560 size_t size)
561{
562 uint8_t n;
563 nxt_uint_t items;
564 nxt_array_t *mem_cache;
565 nxt_mem_cache_t *cache;
566 nxt_mem_cache_block_t *block;
567
568 mem_cache = engine->mem_cache;
569 n = *slot;
570
571 if (n == (uint8_t) -1) {
572
573 if (mem_cache == NULL) {
574 /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
575 items = 3;
576#if (NXT_INET6)
577 items++;
578#endif
579#if (NXT_HAVE_UNIX_DOMAIN)
580 items++;
581#endif
582
583 mem_cache = nxt_array_create(engine->mem_pool, items,
584 sizeof(nxt_mem_cache_t));
585 if (nxt_slow_path(mem_cache == NULL)) {
586 return mem_cache;
587 }
588
589 engine->mem_cache = mem_cache;
590 }
591
592 cache = mem_cache->elts;
593 for (n = 0; n < mem_cache->nelts; n++) {
594 if (cache[n].size == size) {
595 goto found;
596 }
597 }
598
599 cache = nxt_array_add(mem_cache);
600 if (nxt_slow_path(cache == NULL)) {
601 return cache;
602 }
603
604 cache->free = NULL;
605 cache->size = size;
606 cache->count = 0;
607
608 found:
609
610 *slot = n;
611 }
612
613 cache = mem_cache->elts;
614 cache = cache + n;
615
616 block = cache->free;
617
618 if (block != NULL) {
619 cache->free = block->next;
620 cache->count--;
621 return block;
622 }
623
624 return nxt_mp_alloc(engine->mem_pool, size);
625}
626
627
628void
629nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
630{
631 nxt_mem_cache_t *cache;
632 nxt_mem_cache_block_t *block;
633
634 block = p;
635
636 cache = engine->mem_cache->elts;
637 cache = cache + *slot;
638
639 if (cache->count < 16) {
640 cache->count++;
641 block->next = cache->free;
642 cache->free = block;
643
644 return;
645 }
646
647 nxt_mp_free(engine->mem_pool, p);
648}
649
650
651#if (NXT_DEBUG)
652
653void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
654{
655 nxt_work_queue_thread_adopt(&engine->fast_work_queue);
656 nxt_work_queue_thread_adopt(&engine->accept_work_queue);
657 nxt_work_queue_thread_adopt(&engine->read_work_queue);
658 nxt_work_queue_thread_adopt(&engine->socket_work_queue);
659 nxt_work_queue_thread_adopt(&engine->connect_work_queue);
660 nxt_work_queue_thread_adopt(&engine->write_work_queue);
661 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
662 nxt_work_queue_thread_adopt(&engine->close_work_queue);
663}
664
665#endif
380}
381
382
383nxt_int_t
384nxt_event_engine_change(nxt_event_engine_t *engine,
385 const nxt_event_interface_t *interface, nxt_uint_t batch)
386{
387 nxt_uint_t events;
388
389 engine->batch = batch;
390
391 if (!engine->event.signal_support && interface->signal_support) {
392 /*
393 * Block signal processing if the current event
394 * facility does not support signal processing.
395 */
396 nxt_event_engine_signals_stop(engine);
397
398 /*
399 * Add to engine fast work queue the signal events possibly
400 * received before the blocking signal processing.
401 */
402 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
403 }
404
405 if (engine->pipe != NULL && interface->enable_post != NULL) {
406 /*
407 * An engine pipe must be closed after all signal events
408 * added above to engine fast work queue will be processed.
409 */
410 nxt_work_queue_add(&engine->fast_work_queue,
411 nxt_event_engine_signal_pipe_close,
412 &engine->task, engine->pipe, NULL);
413
414 engine->pipe = NULL;
415 }
416
417 engine->event.free(engine);
418
419 events = (batch != 0) ? batch : 32;
420
421 if (interface->create(engine, 4 * events, events) != NXT_OK) {
422 return NXT_ERROR;
423 }
424
425 engine->event = *interface;
426
427 if (nxt_event_engine_post_init(engine) != NXT_OK) {
428 return NXT_ERROR;
429 }
430
431 if (engine->signals != NULL) {
432
433 if (!engine->event.signal_support) {
434 return nxt_event_engine_signals_start(engine);
435 }
436
437 /*
438 * Reset the PID flag to start the signal thread if
439 * some future event facility will not support signals.
440 */
441 engine->signals->process = 0;
442 }
443
444 return NXT_OK;
445}
446
447
448void
449nxt_event_engine_free(nxt_event_engine_t *engine)
450{
451 nxt_thread_log_debug("free engine %p", engine);
452
453 nxt_event_engine_signal_pipe_free(engine);
454 nxt_free(engine->signals);
455
456 nxt_work_queue_cache_destroy(&engine->work_queue_cache);
457
458 engine->event.free(engine);
459
460 /* TODO: free timers */
461
462 nxt_free(engine);
463}
464
465
466static nxt_work_handler_t
467nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
468 void **obj, void **data)
469{
470 nxt_work_queue_t *wq, *last;
471
472 wq = engine->current_work_queue;
473 last = wq;
474
475 if (wq->head == NULL) {
476 wq = &engine->fast_work_queue;
477
478 if (wq->head == NULL) {
479
480 do {
481 engine->current_work_queue++;
482 wq = engine->current_work_queue;
483
484 if (wq > &engine->close_work_queue) {
485 wq = &engine->fast_work_queue;
486 engine->current_work_queue = wq;
487 }
488
489 if (wq->head != NULL) {
490 goto found;
491 }
492
493 } while (wq != last);
494
495 engine->current_work_queue = &engine->fast_work_queue;
496
497 return NULL;
498 }
499 }
500
501found:
502
503 nxt_debug(&engine->task, "work queue: %s", wq->name);
504
505 return nxt_work_queue_pop(wq, task, obj, data);
506}
507
508
509void
510nxt_event_engine_start(nxt_event_engine_t *engine)
511{
512 void *obj, *data;
513 nxt_task_t *task;
514 nxt_msec_t timeout, now;
515 nxt_thread_t *thr;
516 nxt_work_handler_t handler;
517
518 thr = nxt_thread();
519
520 if (engine->fibers) {
521 /*
522 * _setjmp() cannot be wrapped in a function since return from
523 * the function clobbers stack used by future _setjmp() returns.
524 */
525 _setjmp(engine->fibers->fiber.jmp);
526
527 /* A return point from fibers. */
528 }
529
530 thr->log = engine->task.log;
531
532 for ( ;; ) {
533
534 for ( ;; ) {
535 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
536
537 if (handler == NULL) {
538 break;
539 }
540
541 thr->task = task;
542
543 handler(task, obj, data);
544 }
545
546 /* Attach some event engine work queues in preferred order. */
547
548 timeout = nxt_timer_find(engine);
549
550 engine->event.poll(engine, timeout);
551
552 now = nxt_thread_monotonic_time(thr) / 1000000;
553
554 nxt_timer_expire(engine, now);
555 }
556}
557
558
559void *
560nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
561 size_t size)
562{
563 uint8_t n;
564 nxt_uint_t items;
565 nxt_array_t *mem_cache;
566 nxt_mem_cache_t *cache;
567 nxt_mem_cache_block_t *block;
568
569 mem_cache = engine->mem_cache;
570 n = *slot;
571
572 if (n == (uint8_t) -1) {
573
574 if (mem_cache == NULL) {
575 /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
576 items = 3;
577#if (NXT_INET6)
578 items++;
579#endif
580#if (NXT_HAVE_UNIX_DOMAIN)
581 items++;
582#endif
583
584 mem_cache = nxt_array_create(engine->mem_pool, items,
585 sizeof(nxt_mem_cache_t));
586 if (nxt_slow_path(mem_cache == NULL)) {
587 return mem_cache;
588 }
589
590 engine->mem_cache = mem_cache;
591 }
592
593 cache = mem_cache->elts;
594 for (n = 0; n < mem_cache->nelts; n++) {
595 if (cache[n].size == size) {
596 goto found;
597 }
598 }
599
600 cache = nxt_array_add(mem_cache);
601 if (nxt_slow_path(cache == NULL)) {
602 return cache;
603 }
604
605 cache->free = NULL;
606 cache->size = size;
607 cache->count = 0;
608
609 found:
610
611 *slot = n;
612 }
613
614 cache = mem_cache->elts;
615 cache = cache + n;
616
617 block = cache->free;
618
619 if (block != NULL) {
620 cache->free = block->next;
621 cache->count--;
622 return block;
623 }
624
625 return nxt_mp_alloc(engine->mem_pool, size);
626}
627
628
629void
630nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
631{
632 nxt_mem_cache_t *cache;
633 nxt_mem_cache_block_t *block;
634
635 block = p;
636
637 cache = engine->mem_cache->elts;
638 cache = cache + *slot;
639
640 if (cache->count < 16) {
641 cache->count++;
642 block->next = cache->free;
643 cache->free = block;
644
645 return;
646 }
647
648 nxt_mp_free(engine->mem_pool, p);
649}
650
651
652#if (NXT_DEBUG)
653
654void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
655{
656 nxt_work_queue_thread_adopt(&engine->fast_work_queue);
657 nxt_work_queue_thread_adopt(&engine->accept_work_queue);
658 nxt_work_queue_thread_adopt(&engine->read_work_queue);
659 nxt_work_queue_thread_adopt(&engine->socket_work_queue);
660 nxt_work_queue_thread_adopt(&engine->connect_work_queue);
661 nxt_work_queue_thread_adopt(&engine->write_work_queue);
662 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue);
663 nxt_work_queue_thread_adopt(&engine->close_work_queue);
664}
665
666#endif