nxt_event_engine.c (12:477899a6661b) nxt_event_engine.c (13:3a52b2c3d3f1)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 30 unchanged lines hidden (view full) ---

39
40 engine->task.thread = thr;
41 engine->task.log = thr->log;
42 engine->task.ident = nxt_task_next_ident();
43
44 thr->engine = engine;
45 thr->fiber = &engine->fibers->fiber;
46
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 30 unchanged lines hidden (view full) ---

39
40 engine->task.thread = thr;
41 engine->task.log = thr->log;
42 engine->task.ident = nxt_task_next_ident();
43
44 thr->engine = engine;
45 thr->fiber = &engine->fibers->fiber;
46
47 engine->batch = batch;
47 engine->batch0 = batch;
48
49 if (flags & NXT_ENGINE_FIBERS) {
50 engine->fibers = nxt_fiber_main_create(engine);
51 if (engine->fibers == NULL) {
52 goto fibers_fail;
53 }
54 }
55

--- 129 unchanged lines hidden (view full) ---

185
186 engine->pipe = pipe;
187
188 /*
189 * An event engine pipe is in blocking mode for writer
190 * and in non-blocking node for reader.
191 */
192
48
49 if (flags & NXT_ENGINE_FIBERS) {
50 engine->fibers = nxt_fiber_main_create(engine);
51 if (engine->fibers == NULL) {
52 goto fibers_fail;
53 }
54 }
55

--- 129 unchanged lines hidden (view full) ---

185
186 engine->pipe = pipe;
187
188 /*
189 * An event engine pipe is in blocking mode for writer
190 * and in non-blocking node for reader.
191 */
192
193 if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
193 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
194 nxt_free(pipe);
195 return NXT_ERROR;
196 }
197
198 pipe->event.fd = pipe->fds[0];
199 pipe->event.task = &engine->task;
200 pipe->event.read_work_queue = &engine->fast_work_queue;
201 pipe->event.read_handler = nxt_event_engine_signal_pipe;

--- 13 unchanged lines hidden (view full) ---

215 nxt_event_engine_pipe_t *pipe;
216
217 pipe = engine->pipe;
218
219 if (pipe != NULL) {
220
221 if (pipe->event.read_work_queue != NULL) {
222 nxt_fd_event_close(engine, &pipe->event);
194 nxt_free(pipe);
195 return NXT_ERROR;
196 }
197
198 pipe->event.fd = pipe->fds[0];
199 pipe->event.task = &engine->task;
200 pipe->event.read_work_queue = &engine->fast_work_queue;
201 pipe->event.read_handler = nxt_event_engine_signal_pipe;

--- 13 unchanged lines hidden (view full) ---

215 nxt_event_engine_pipe_t *pipe;
216
217 pipe = engine->pipe;
218
219 if (pipe != NULL) {
220
221 if (pipe->event.read_work_queue != NULL) {
222 nxt_fd_event_close(engine, &pipe->event);
223 nxt_pipe_close(pipe->fds);
223 nxt_pipe_close(pipe->event.task, pipe->fds);
224 }
225
226 nxt_free(pipe);
227 }
228}
229
230
231static void
232nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
233{
234 nxt_event_engine_pipe_t *pipe;
235
236 pipe = obj;
237
224 }
225
226 nxt_free(pipe);
227 }
228}
229
230
231static void
232nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
233{
234 nxt_event_engine_pipe_t *pipe;
235
236 pipe = obj;
237
238 nxt_pipe_close(pipe->fds);
238 nxt_pipe_close(pipe->event.task, pipe->fds);
239 nxt_free(pipe);
240}
241
242
243void
244nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
245{
246 nxt_debug(&engine->task, "event engine post");

--- 79 unchanged lines hidden (view full) ---

326 nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
327 &engine->fast_work_queue);
328}
329
330
331static void
332nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
333{
239 nxt_free(pipe);
240}
241
242
243void
244nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
245{
246 nxt_debug(&engine->task, "event engine post");

--- 79 unchanged lines hidden (view full) ---

326 nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
327 &engine->fast_work_queue);
328}
329
330
331static void
332nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
333{
334 nxt_event_engine_t *engine;
334 nxt_event_engine_t *engine;
335 nxt_event_engine_pipe_t *pipe;
335
336 engine = task->thread->engine;
336
337 engine = task->thread->engine;
338 pipe = engine->pipe;
337
338 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
339
340 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
339 engine->pipe->fds[0], engine->pipe->fds[1]);
341 pipe->fds[0], pipe->fds[1]);
340
342
341 nxt_fd_event_close(engine, &engine->pipe->event);
342 nxt_pipe_close(engine->pipe->fds);
343 nxt_fd_event_close(engine, &pipe->event);
344 nxt_pipe_close(pipe->event.task, pipe->fds);
343}
344
345
346static void
347nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
348{
349 uintptr_t signo;
350 const nxt_sig_event_t *sigev;

--- 17 unchanged lines hidden (view full) ---

368nxt_int_t
369nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
370 const nxt_event_interface_t *interface, nxt_uint_t batch)
371{
372 nxt_uint_t events;
373 nxt_event_engine_t *engine;
374
375 engine = thr->engine;
345}
346
347
348static void
349nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
350{
351 uintptr_t signo;
352 const nxt_sig_event_t *sigev;

--- 17 unchanged lines hidden (view full) ---

370nxt_int_t
371nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
372 const nxt_event_interface_t *interface, nxt_uint_t batch)
373{
374 nxt_uint_t events;
375 nxt_event_engine_t *engine;
376
377 engine = thr->engine;
376 engine->batch = batch;
378 engine->batch0 = batch;
377
378 if (!engine->event.signal_support && interface->signal_support) {
379 /*
380 * Block signal processing if the current event
381 * facility does not support signal processing.
382 */
383 nxt_event_engine_signals_stop(engine);
384

--- 157 unchanged lines hidden ---
379
380 if (!engine->event.signal_support && interface->signal_support) {
381 /*
382 * Block signal processing if the current event
383 * facility does not support signal processing.
384 */
385 nxt_event_engine_signals_stop(engine);
386

--- 157 unchanged lines hidden ---