1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10typedef struct nxt_mem_cache_block_s nxt_mem_cache_block_t; 11 12struct nxt_mem_cache_block_s { 13 nxt_mem_cache_block_t *next; 14}; 15 16 17typedef struct { 18 nxt_mem_cache_block_t *free; 19 uint32_t size; 20 uint32_t count; 21} nxt_mem_cache_t; 22 23 24static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 25static nxt_int_t nxt_event_engine_signal_pipe_create( 26 nxt_event_engine_t *engine); 27static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 28 void *data); 29static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 30 void *data); 31static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 32 void *data); 33static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 34 void *data); 35static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 36 void *data); 37static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 38 nxt_task_t **task, void **obj, void **data); 39 40 41nxt_event_engine_t * 42nxt_event_engine_create(nxt_task_t *task, 43 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 44 nxt_uint_t flags, nxt_uint_t batch) 45{ 46 nxt_uint_t events; 47 nxt_thread_t *thread; 48 nxt_event_engine_t *engine; 49 50 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 51 if (engine == NULL) { 52 return NULL; 53 } 54 55 nxt_debug(task, "create engine %p", engine); 56 57 thread = task->thread; 58 59 engine->task.thread = thread; 60 engine->task.log = thread->log; 61 engine->task.ident = nxt_task_next_ident(); 62 63 engine->batch = batch; 64 65#if 0 66 if (flags & NXT_ENGINE_FIBERS) { 67 engine->fibers = nxt_fiber_main_create(engine); 68 if (engine->fibers == NULL) { 69 goto fibers_fail; 70 } 71 } 72#endif 73 74 engine->current_work_queue = &engine->fast_work_queue; 75 76 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 77 78 engine->fast_work_queue.cache = &engine->work_queue_cache; 79 engine->accept_work_queue.cache = &engine->work_queue_cache; 80 engine->read_work_queue.cache = &engine->work_queue_cache; 81 engine->socket_work_queue.cache = &engine->work_queue_cache; 82 engine->connect_work_queue.cache = &engine->work_queue_cache; 83 engine->write_work_queue.cache = &engine->work_queue_cache; 84 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 85 engine->close_work_queue.cache = &engine->work_queue_cache; 86 87 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 88 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 89 nxt_work_queue_name(&engine->read_work_queue, "read"); 90 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 91 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 92 nxt_work_queue_name(&engine->write_work_queue, "write"); 93 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 94 nxt_work_queue_name(&engine->close_work_queue, "close"); 95 96 if (signals != NULL) { 97 engine->signals = nxt_event_engine_signals(signals); 98 if (engine->signals == NULL) { 99 goto signals_fail; 100 } 101 102 engine->signals->handler = nxt_event_engine_signal_handler; 103 104 if (!interface->signal_support) { 105 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 106 goto signals_fail; 107 } 108 } 109 } 110 111 /* 112 * Number of event set and timers changes should be at least twice 113 * more than number of events to avoid premature flushes of the changes. 114 * Fourfold is for sure. 115 */ 116 events = (batch != 0) ? batch : 32; 117 118 if (interface->create(engine, 4 * events, events) != NXT_OK) { 119 goto event_set_fail; 120 } 121 122 engine->event = *interface; 123 124 if (nxt_event_engine_post_init(engine) != NXT_OK) { 125 goto post_fail; 126 } 127 128 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 129 goto timers_fail; 130 } 131 132 thread = task->thread; 133 134 nxt_thread_time_update(thread); 135 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; 136 137 engine->max_connections = 0xffffffff; 138 139 nxt_queue_init(&engine->joints); 140 nxt_queue_init(&engine->listen_connections); 141 nxt_queue_init(&engine->idle_connections); 142 143 return engine; 144 145timers_fail: 146post_fail: 147 148 interface->free(engine); 149 150event_set_fail: 151signals_fail: 152 153 nxt_free(engine->signals); 154 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 155 nxt_free(engine->fibers); 156 157#if 0 158fibers_fail: 159 160 nxt_free(engine); 161#endif 162 163 return NULL; 164} 165 166 167static nxt_int_t 168nxt_event_engine_post_init(nxt_event_engine_t *engine) 169{ 170 if (engine->event.enable_post != NULL) { 171 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 172 } 173 174 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 175 return NXT_ERROR; 176 } 177 178 return NXT_OK; 179} 180 181 182static nxt_int_t 183nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 184{ 185 nxt_event_engine_pipe_t *pipe; 186 187 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 188 if (pipe == NULL) { 189 return NXT_ERROR; 190 } 191 192 engine->pipe = pipe; 193 194 /* 195 * An event engine pipe is in blocking mode for writer 196 * and in non-blocking node for reader. 197 */ 198 199 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 200 nxt_free(pipe); 201 return NXT_ERROR; 202 } 203 204 pipe->event.fd = pipe->fds[0]; 205 pipe->event.task = &engine->task; 206 pipe->event.read_work_queue = &engine->fast_work_queue; 207 pipe->event.read_handler = nxt_event_engine_signal_pipe; 208 pipe->event.write_work_queue = &engine->fast_work_queue; 209 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 210 pipe->event.log = engine->task.log; 211 212 nxt_fd_event_enable_read(engine, &pipe->event); 213 214 return NXT_OK; 215} 216 217 218static void 219nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 220{ 221 nxt_event_engine_pipe_t *pipe; 222 223 pipe = engine->pipe; 224 225 if (pipe != NULL) { 226 227 if (pipe->event.read_work_queue != NULL) { 228 nxt_fd_event_close(engine, &pipe->event); 229 nxt_pipe_close(pipe->event.task, pipe->fds); 230 } 231 232 nxt_free(pipe); 233 } 234} 235 236 237static void 238nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 239{ 240 nxt_event_engine_pipe_t *pipe; 241 242 pipe = obj; 243 244 nxt_pipe_close(pipe->event.task, pipe->fds); 245 nxt_free(pipe); 246} 247 248 249void 250nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 251{ 252 nxt_debug(&engine->task, "event engine post"); 253 254#if (NXT_DEBUG) 255 if (nxt_slow_path(work->next != NULL)) { 256 nxt_debug(&engine->task, "event engine post multiple works"); 257 } 258#endif 259 260 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 261 262 nxt_event_engine_signal(engine, 0); 263} 264 265 266void 267nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 268{ 269 u_char buf; 270 271 nxt_debug(&engine->task, "event engine signal:%ui", signo); 272 273 /* 274 * A signal number may be sent in a signal context, so the signal 275 * information cannot be passed via a locked work queue. 276 */ 277 278 if (engine->event.signal != NULL) { 279 engine->event.signal(engine, signo); 280 return; 281 } 282 283 buf = (u_char) signo; 284 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 285} 286 287 288static void 289nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 290{ 291 int i, n; 292 u_char signo; 293 nxt_bool_t post; 294 nxt_fd_event_t *ev; 295 u_char buf[128]; 296 297 ev = obj; 298 299 nxt_debug(task, "engine signal pipe"); 300 301 post = 0; 302 303 do { 304 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 305 306 for (i = 0; i < n; i++) { 307 signo = buf[i]; 308 309 nxt_debug(task, "engine pipe signo:%d", signo); 310 311 if (signo == 0) { 312 /* A post should be processed only once. */ 313 post = 1; 314 315 } else { 316 nxt_event_engine_signal_handler(task, 317 (void *) (uintptr_t) signo, NULL); 318 } 319 } 320 321 } while (n == sizeof(buf)); 322 323 if (post) { 324 nxt_event_engine_post_handler(task, NULL, NULL); 325 } 326} 327 328 329static void 330nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 331{ 332 nxt_thread_t *thread; 333 nxt_event_engine_t *engine; 334 335 thread = task->thread; 336 engine = thread->engine; 337 338 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 339 &engine->fast_work_queue); 340} 341 342 343static void 344nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 345{ 346 nxt_event_engine_t *engine; 347 nxt_event_engine_pipe_t *pipe; 348 349 engine = task->thread->engine; 350 pipe = engine->pipe; 351 352 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 353 pipe->fds[0], pipe->fds[1]); 354 355 nxt_fd_event_close(engine, &pipe->event); 356 nxt_pipe_close(pipe->event.task, pipe->fds); 357} 358 359 360static void 361nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 362{ 363 uintptr_t signo; 364 const nxt_sig_event_t *sigev; 365 366 signo = (uintptr_t) obj; 367 368 for (sigev = task->thread->engine->signals->sigev; 369 sigev->signo != 0; 370 sigev++) 371 { 372 if (signo == (nxt_uint_t) sigev->signo) { 373 sigev->handler(task, (void *) signo, (void *) sigev->name); 374 return; 375 } 376 } 377
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10typedef struct nxt_mem_cache_block_s nxt_mem_cache_block_t; 11 12struct nxt_mem_cache_block_s { 13 nxt_mem_cache_block_t *next; 14}; 15 16 17typedef struct { 18 nxt_mem_cache_block_t *free; 19 uint32_t size; 20 uint32_t count; 21} nxt_mem_cache_t; 22 23 24static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); 25static nxt_int_t nxt_event_engine_signal_pipe_create( 26 nxt_event_engine_t *engine); 27static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, 28 void *data); 29static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, 30 void *data); 31static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, 32 void *data); 33static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, 34 void *data); 35static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, 36 void *data); 37static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, 38 nxt_task_t **task, void **obj, void **data); 39 40 41nxt_event_engine_t * 42nxt_event_engine_create(nxt_task_t *task, 43 const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, 44 nxt_uint_t flags, nxt_uint_t batch) 45{ 46 nxt_uint_t events; 47 nxt_thread_t *thread; 48 nxt_event_engine_t *engine; 49 50 engine = nxt_zalloc(sizeof(nxt_event_engine_t)); 51 if (engine == NULL) { 52 return NULL; 53 } 54 55 nxt_debug(task, "create engine %p", engine); 56 57 thread = task->thread; 58 59 engine->task.thread = thread; 60 engine->task.log = thread->log; 61 engine->task.ident = nxt_task_next_ident(); 62 63 engine->batch = batch; 64 65#if 0 66 if (flags & NXT_ENGINE_FIBERS) { 67 engine->fibers = nxt_fiber_main_create(engine); 68 if (engine->fibers == NULL) { 69 goto fibers_fail; 70 } 71 } 72#endif 73 74 engine->current_work_queue = &engine->fast_work_queue; 75 76 nxt_work_queue_cache_create(&engine->work_queue_cache, 0); 77 78 engine->fast_work_queue.cache = &engine->work_queue_cache; 79 engine->accept_work_queue.cache = &engine->work_queue_cache; 80 engine->read_work_queue.cache = &engine->work_queue_cache; 81 engine->socket_work_queue.cache = &engine->work_queue_cache; 82 engine->connect_work_queue.cache = &engine->work_queue_cache; 83 engine->write_work_queue.cache = &engine->work_queue_cache; 84 engine->shutdown_work_queue.cache = &engine->work_queue_cache; 85 engine->close_work_queue.cache = &engine->work_queue_cache; 86 87 nxt_work_queue_name(&engine->fast_work_queue, "fast"); 88 nxt_work_queue_name(&engine->accept_work_queue, "accept"); 89 nxt_work_queue_name(&engine->read_work_queue, "read"); 90 nxt_work_queue_name(&engine->socket_work_queue, "socket"); 91 nxt_work_queue_name(&engine->connect_work_queue, "connect"); 92 nxt_work_queue_name(&engine->write_work_queue, "write"); 93 nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); 94 nxt_work_queue_name(&engine->close_work_queue, "close"); 95 96 if (signals != NULL) { 97 engine->signals = nxt_event_engine_signals(signals); 98 if (engine->signals == NULL) { 99 goto signals_fail; 100 } 101 102 engine->signals->handler = nxt_event_engine_signal_handler; 103 104 if (!interface->signal_support) { 105 if (nxt_event_engine_signals_start(engine) != NXT_OK) { 106 goto signals_fail; 107 } 108 } 109 } 110 111 /* 112 * Number of event set and timers changes should be at least twice 113 * more than number of events to avoid premature flushes of the changes. 114 * Fourfold is for sure. 115 */ 116 events = (batch != 0) ? batch : 32; 117 118 if (interface->create(engine, 4 * events, events) != NXT_OK) { 119 goto event_set_fail; 120 } 121 122 engine->event = *interface; 123 124 if (nxt_event_engine_post_init(engine) != NXT_OK) { 125 goto post_fail; 126 } 127 128 if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) { 129 goto timers_fail; 130 } 131 132 thread = task->thread; 133 134 nxt_thread_time_update(thread); 135 engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000; 136 137 engine->max_connections = 0xffffffff; 138 139 nxt_queue_init(&engine->joints); 140 nxt_queue_init(&engine->listen_connections); 141 nxt_queue_init(&engine->idle_connections); 142 143 return engine; 144 145timers_fail: 146post_fail: 147 148 interface->free(engine); 149 150event_set_fail: 151signals_fail: 152 153 nxt_free(engine->signals); 154 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 155 nxt_free(engine->fibers); 156 157#if 0 158fibers_fail: 159 160 nxt_free(engine); 161#endif 162 163 return NULL; 164} 165 166 167static nxt_int_t 168nxt_event_engine_post_init(nxt_event_engine_t *engine) 169{ 170 if (engine->event.enable_post != NULL) { 171 return engine->event.enable_post(engine, nxt_event_engine_post_handler); 172 } 173 174 if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { 175 return NXT_ERROR; 176 } 177 178 return NXT_OK; 179} 180 181 182static nxt_int_t 183nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) 184{ 185 nxt_event_engine_pipe_t *pipe; 186 187 pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t)); 188 if (pipe == NULL) { 189 return NXT_ERROR; 190 } 191 192 engine->pipe = pipe; 193 194 /* 195 * An event engine pipe is in blocking mode for writer 196 * and in non-blocking node for reader. 197 */ 198 199 if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { 200 nxt_free(pipe); 201 return NXT_ERROR; 202 } 203 204 pipe->event.fd = pipe->fds[0]; 205 pipe->event.task = &engine->task; 206 pipe->event.read_work_queue = &engine->fast_work_queue; 207 pipe->event.read_handler = nxt_event_engine_signal_pipe; 208 pipe->event.write_work_queue = &engine->fast_work_queue; 209 pipe->event.error_handler = nxt_event_engine_signal_pipe_error; 210 pipe->event.log = engine->task.log; 211 212 nxt_fd_event_enable_read(engine, &pipe->event); 213 214 return NXT_OK; 215} 216 217 218static void 219nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) 220{ 221 nxt_event_engine_pipe_t *pipe; 222 223 pipe = engine->pipe; 224 225 if (pipe != NULL) { 226 227 if (pipe->event.read_work_queue != NULL) { 228 nxt_fd_event_close(engine, &pipe->event); 229 nxt_pipe_close(pipe->event.task, pipe->fds); 230 } 231 232 nxt_free(pipe); 233 } 234} 235 236 237static void 238nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) 239{ 240 nxt_event_engine_pipe_t *pipe; 241 242 pipe = obj; 243 244 nxt_pipe_close(pipe->event.task, pipe->fds); 245 nxt_free(pipe); 246} 247 248 249void 250nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) 251{ 252 nxt_debug(&engine->task, "event engine post"); 253 254#if (NXT_DEBUG) 255 if (nxt_slow_path(work->next != NULL)) { 256 nxt_debug(&engine->task, "event engine post multiple works"); 257 } 258#endif 259 260 nxt_locked_work_queue_add(&engine->locked_work_queue, work); 261 262 nxt_event_engine_signal(engine, 0); 263} 264 265 266void 267nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) 268{ 269 u_char buf; 270 271 nxt_debug(&engine->task, "event engine signal:%ui", signo); 272 273 /* 274 * A signal number may be sent in a signal context, so the signal 275 * information cannot be passed via a locked work queue. 276 */ 277 278 if (engine->event.signal != NULL) { 279 engine->event.signal(engine, signo); 280 return; 281 } 282 283 buf = (u_char) signo; 284 (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1); 285} 286 287 288static void 289nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) 290{ 291 int i, n; 292 u_char signo; 293 nxt_bool_t post; 294 nxt_fd_event_t *ev; 295 u_char buf[128]; 296 297 ev = obj; 298 299 nxt_debug(task, "engine signal pipe"); 300 301 post = 0; 302 303 do { 304 n = nxt_fd_read(ev->fd, buf, sizeof(buf)); 305 306 for (i = 0; i < n; i++) { 307 signo = buf[i]; 308 309 nxt_debug(task, "engine pipe signo:%d", signo); 310 311 if (signo == 0) { 312 /* A post should be processed only once. */ 313 post = 1; 314 315 } else { 316 nxt_event_engine_signal_handler(task, 317 (void *) (uintptr_t) signo, NULL); 318 } 319 } 320 321 } while (n == sizeof(buf)); 322 323 if (post) { 324 nxt_event_engine_post_handler(task, NULL, NULL); 325 } 326} 327 328 329static void 330nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) 331{ 332 nxt_thread_t *thread; 333 nxt_event_engine_t *engine; 334 335 thread = task->thread; 336 engine = thread->engine; 337 338 nxt_locked_work_queue_move(thread, &engine->locked_work_queue, 339 &engine->fast_work_queue); 340} 341 342 343static void 344nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) 345{ 346 nxt_event_engine_t *engine; 347 nxt_event_engine_pipe_t *pipe; 348 349 engine = task->thread->engine; 350 pipe = engine->pipe; 351 352 nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", 353 pipe->fds[0], pipe->fds[1]); 354 355 nxt_fd_event_close(engine, &pipe->event); 356 nxt_pipe_close(pipe->event.task, pipe->fds); 357} 358 359 360static void 361nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) 362{ 363 uintptr_t signo; 364 const nxt_sig_event_t *sigev; 365 366 signo = (uintptr_t) obj; 367 368 for (sigev = task->thread->engine->signals->sigev; 369 sigev->signo != 0; 370 sigev++) 371 { 372 if (signo == (nxt_uint_t) sigev->signo) { 373 sigev->handler(task, (void *) signo, (void *) sigev->name); 374 return; 375 } 376 } 377
|
379} 380 381 382nxt_int_t 383nxt_event_engine_change(nxt_event_engine_t *engine, 384 const nxt_event_interface_t *interface, nxt_uint_t batch) 385{ 386 nxt_uint_t events; 387 388 engine->batch = batch; 389 390 if (!engine->event.signal_support && interface->signal_support) { 391 /* 392 * Block signal processing if the current event 393 * facility does not support signal processing. 394 */ 395 nxt_event_engine_signals_stop(engine); 396 397 /* 398 * Add to engine fast work queue the signal events possibly 399 * received before the blocking signal processing. 400 */ 401 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); 402 } 403 404 if (engine->pipe != NULL && interface->enable_post != NULL) { 405 /* 406 * An engine pipe must be closed after all signal events 407 * added above to engine fast work queue will be processed. 408 */ 409 nxt_work_queue_add(&engine->fast_work_queue, 410 nxt_event_engine_signal_pipe_close, 411 &engine->task, engine->pipe, NULL); 412 413 engine->pipe = NULL; 414 } 415 416 engine->event.free(engine); 417 418 events = (batch != 0) ? batch : 32; 419 420 if (interface->create(engine, 4 * events, events) != NXT_OK) { 421 return NXT_ERROR; 422 } 423 424 engine->event = *interface; 425 426 if (nxt_event_engine_post_init(engine) != NXT_OK) { 427 return NXT_ERROR; 428 } 429 430 if (engine->signals != NULL) { 431 432 if (!engine->event.signal_support) { 433 return nxt_event_engine_signals_start(engine); 434 } 435 436 /* 437 * Reset the PID flag to start the signal thread if 438 * some future event facility will not support signals. 439 */ 440 engine->signals->process = 0; 441 } 442 443 return NXT_OK; 444} 445 446 447void 448nxt_event_engine_free(nxt_event_engine_t *engine) 449{ 450 nxt_thread_log_debug("free engine %p", engine); 451 452 nxt_event_engine_signal_pipe_free(engine); 453 nxt_free(engine->signals); 454 455 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 456 457 engine->event.free(engine); 458 459 /* TODO: free timers */ 460 461 nxt_free(engine); 462} 463 464 465static nxt_work_handler_t 466nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 467 void **obj, void **data) 468{ 469 nxt_work_queue_t *wq, *last; 470 471 wq = engine->current_work_queue; 472 last = wq; 473 474 if (wq->head == NULL) { 475 wq = &engine->fast_work_queue; 476 477 if (wq->head == NULL) { 478 479 do { 480 engine->current_work_queue++; 481 wq = engine->current_work_queue; 482 483 if (wq > &engine->close_work_queue) { 484 wq = &engine->fast_work_queue; 485 engine->current_work_queue = wq; 486 } 487 488 if (wq->head != NULL) { 489 goto found; 490 } 491 492 } while (wq != last); 493 494 engine->current_work_queue = &engine->fast_work_queue; 495 496 return NULL; 497 } 498 } 499 500found: 501 502 nxt_debug(&engine->task, "work queue: %s", wq->name); 503 504 return nxt_work_queue_pop(wq, task, obj, data); 505} 506 507 508void 509nxt_event_engine_start(nxt_event_engine_t *engine) 510{ 511 void *obj, *data; 512 nxt_task_t *task; 513 nxt_msec_t timeout, now; 514 nxt_thread_t *thr; 515 nxt_work_handler_t handler; 516 517 thr = nxt_thread(); 518 519 if (engine->fibers) { 520 /* 521 * _setjmp() cannot be wrapped in a function since return from 522 * the function clobbers stack used by future _setjmp() returns. 523 */ 524 _setjmp(engine->fibers->fiber.jmp); 525 526 /* A return point from fibers. */ 527 } 528 529 thr->log = engine->task.log; 530 531 for ( ;; ) { 532 533 for ( ;; ) { 534 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 535 536 if (handler == NULL) { 537 break; 538 } 539 540 thr->task = task; 541 542 handler(task, obj, data); 543 } 544 545 /* Attach some event engine work queues in preferred order. */ 546 547 timeout = nxt_timer_find(engine); 548 549 engine->event.poll(engine, timeout); 550 551 now = nxt_thread_monotonic_time(thr) / 1000000; 552 553 nxt_timer_expire(engine, now); 554 } 555} 556 557 558void * 559nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, 560 size_t size) 561{ 562 uint8_t n; 563 nxt_uint_t items; 564 nxt_array_t *mem_cache; 565 nxt_mem_cache_t *cache; 566 nxt_mem_cache_block_t *block; 567 568 mem_cache = engine->mem_cache; 569 n = *slot; 570 571 if (n == (uint8_t) -1) { 572 573 if (mem_cache == NULL) { 574 /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */ 575 items = 3; 576#if (NXT_INET6) 577 items++; 578#endif 579#if (NXT_HAVE_UNIX_DOMAIN) 580 items++; 581#endif 582 583 mem_cache = nxt_array_create(engine->mem_pool, items, 584 sizeof(nxt_mem_cache_t)); 585 if (nxt_slow_path(mem_cache == NULL)) { 586 return mem_cache; 587 } 588 589 engine->mem_cache = mem_cache; 590 } 591 592 cache = mem_cache->elts; 593 for (n = 0; n < mem_cache->nelts; n++) { 594 if (cache[n].size == size) { 595 goto found; 596 } 597 } 598 599 cache = nxt_array_add(mem_cache); 600 if (nxt_slow_path(cache == NULL)) { 601 return cache; 602 } 603 604 cache->free = NULL; 605 cache->size = size; 606 cache->count = 0; 607 608 found: 609 610 *slot = n; 611 } 612 613 cache = mem_cache->elts; 614 cache = cache + n; 615 616 block = cache->free; 617 618 if (block != NULL) { 619 cache->free = block->next; 620 cache->count--; 621 return block; 622 } 623 624 return nxt_mp_alloc(engine->mem_pool, size); 625} 626 627 628void 629nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) 630{ 631 nxt_mem_cache_t *cache; 632 nxt_mem_cache_block_t *block; 633 634 block = p; 635 636 cache = engine->mem_cache->elts; 637 cache = cache + *slot; 638 639 if (cache->count < 16) { 640 cache->count++; 641 block->next = cache->free; 642 cache->free = block; 643 644 return; 645 } 646 647 nxt_mp_free(engine->mem_pool, p); 648} 649 650 651#if (NXT_DEBUG) 652 653void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) 654{ 655 nxt_work_queue_thread_adopt(&engine->fast_work_queue); 656 nxt_work_queue_thread_adopt(&engine->accept_work_queue); 657 nxt_work_queue_thread_adopt(&engine->read_work_queue); 658 nxt_work_queue_thread_adopt(&engine->socket_work_queue); 659 nxt_work_queue_thread_adopt(&engine->connect_work_queue); 660 nxt_work_queue_thread_adopt(&engine->write_work_queue); 661 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue); 662 nxt_work_queue_thread_adopt(&engine->close_work_queue); 663} 664 665#endif
| 380} 381 382 383nxt_int_t 384nxt_event_engine_change(nxt_event_engine_t *engine, 385 const nxt_event_interface_t *interface, nxt_uint_t batch) 386{ 387 nxt_uint_t events; 388 389 engine->batch = batch; 390 391 if (!engine->event.signal_support && interface->signal_support) { 392 /* 393 * Block signal processing if the current event 394 * facility does not support signal processing. 395 */ 396 nxt_event_engine_signals_stop(engine); 397 398 /* 399 * Add to engine fast work queue the signal events possibly 400 * received before the blocking signal processing. 401 */ 402 nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL); 403 } 404 405 if (engine->pipe != NULL && interface->enable_post != NULL) { 406 /* 407 * An engine pipe must be closed after all signal events 408 * added above to engine fast work queue will be processed. 409 */ 410 nxt_work_queue_add(&engine->fast_work_queue, 411 nxt_event_engine_signal_pipe_close, 412 &engine->task, engine->pipe, NULL); 413 414 engine->pipe = NULL; 415 } 416 417 engine->event.free(engine); 418 419 events = (batch != 0) ? batch : 32; 420 421 if (interface->create(engine, 4 * events, events) != NXT_OK) { 422 return NXT_ERROR; 423 } 424 425 engine->event = *interface; 426 427 if (nxt_event_engine_post_init(engine) != NXT_OK) { 428 return NXT_ERROR; 429 } 430 431 if (engine->signals != NULL) { 432 433 if (!engine->event.signal_support) { 434 return nxt_event_engine_signals_start(engine); 435 } 436 437 /* 438 * Reset the PID flag to start the signal thread if 439 * some future event facility will not support signals. 440 */ 441 engine->signals->process = 0; 442 } 443 444 return NXT_OK; 445} 446 447 448void 449nxt_event_engine_free(nxt_event_engine_t *engine) 450{ 451 nxt_thread_log_debug("free engine %p", engine); 452 453 nxt_event_engine_signal_pipe_free(engine); 454 nxt_free(engine->signals); 455 456 nxt_work_queue_cache_destroy(&engine->work_queue_cache); 457 458 engine->event.free(engine); 459 460 /* TODO: free timers */ 461 462 nxt_free(engine); 463} 464 465 466static nxt_work_handler_t 467nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, 468 void **obj, void **data) 469{ 470 nxt_work_queue_t *wq, *last; 471 472 wq = engine->current_work_queue; 473 last = wq; 474 475 if (wq->head == NULL) { 476 wq = &engine->fast_work_queue; 477 478 if (wq->head == NULL) { 479 480 do { 481 engine->current_work_queue++; 482 wq = engine->current_work_queue; 483 484 if (wq > &engine->close_work_queue) { 485 wq = &engine->fast_work_queue; 486 engine->current_work_queue = wq; 487 } 488 489 if (wq->head != NULL) { 490 goto found; 491 } 492 493 } while (wq != last); 494 495 engine->current_work_queue = &engine->fast_work_queue; 496 497 return NULL; 498 } 499 } 500 501found: 502 503 nxt_debug(&engine->task, "work queue: %s", wq->name); 504 505 return nxt_work_queue_pop(wq, task, obj, data); 506} 507 508 509void 510nxt_event_engine_start(nxt_event_engine_t *engine) 511{ 512 void *obj, *data; 513 nxt_task_t *task; 514 nxt_msec_t timeout, now; 515 nxt_thread_t *thr; 516 nxt_work_handler_t handler; 517 518 thr = nxt_thread(); 519 520 if (engine->fibers) { 521 /* 522 * _setjmp() cannot be wrapped in a function since return from 523 * the function clobbers stack used by future _setjmp() returns. 524 */ 525 _setjmp(engine->fibers->fiber.jmp); 526 527 /* A return point from fibers. */ 528 } 529 530 thr->log = engine->task.log; 531 532 for ( ;; ) { 533 534 for ( ;; ) { 535 handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); 536 537 if (handler == NULL) { 538 break; 539 } 540 541 thr->task = task; 542 543 handler(task, obj, data); 544 } 545 546 /* Attach some event engine work queues in preferred order. */ 547 548 timeout = nxt_timer_find(engine); 549 550 engine->event.poll(engine, timeout); 551 552 now = nxt_thread_monotonic_time(thr) / 1000000; 553 554 nxt_timer_expire(engine, now); 555 } 556} 557 558 559void * 560nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, 561 size_t size) 562{ 563 uint8_t n; 564 nxt_uint_t items; 565 nxt_array_t *mem_cache; 566 nxt_mem_cache_t *cache; 567 nxt_mem_cache_block_t *block; 568 569 mem_cache = engine->mem_cache; 570 n = *slot; 571 572 if (n == (uint8_t) -1) { 573 574 if (mem_cache == NULL) { 575 /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */ 576 items = 3; 577#if (NXT_INET6) 578 items++; 579#endif 580#if (NXT_HAVE_UNIX_DOMAIN) 581 items++; 582#endif 583 584 mem_cache = nxt_array_create(engine->mem_pool, items, 585 sizeof(nxt_mem_cache_t)); 586 if (nxt_slow_path(mem_cache == NULL)) { 587 return mem_cache; 588 } 589 590 engine->mem_cache = mem_cache; 591 } 592 593 cache = mem_cache->elts; 594 for (n = 0; n < mem_cache->nelts; n++) { 595 if (cache[n].size == size) { 596 goto found; 597 } 598 } 599 600 cache = nxt_array_add(mem_cache); 601 if (nxt_slow_path(cache == NULL)) { 602 return cache; 603 } 604 605 cache->free = NULL; 606 cache->size = size; 607 cache->count = 0; 608 609 found: 610 611 *slot = n; 612 } 613 614 cache = mem_cache->elts; 615 cache = cache + n; 616 617 block = cache->free; 618 619 if (block != NULL) { 620 cache->free = block->next; 621 cache->count--; 622 return block; 623 } 624 625 return nxt_mp_alloc(engine->mem_pool, size); 626} 627 628 629void 630nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) 631{ 632 nxt_mem_cache_t *cache; 633 nxt_mem_cache_block_t *block; 634 635 block = p; 636 637 cache = engine->mem_cache->elts; 638 cache = cache + *slot; 639 640 if (cache->count < 16) { 641 cache->count++; 642 block->next = cache->free; 643 cache->free = block; 644 645 return; 646 } 647 648 nxt_mp_free(engine->mem_pool, p); 649} 650 651 652#if (NXT_DEBUG) 653 654void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) 655{ 656 nxt_work_queue_thread_adopt(&engine->fast_work_queue); 657 nxt_work_queue_thread_adopt(&engine->accept_work_queue); 658 nxt_work_queue_thread_adopt(&engine->read_work_queue); 659 nxt_work_queue_thread_adopt(&engine->socket_work_queue); 660 nxt_work_queue_thread_adopt(&engine->connect_work_queue); 661 nxt_work_queue_thread_adopt(&engine->write_work_queue); 662 nxt_work_queue_thread_adopt(&engine->shutdown_work_queue); 663 nxt_work_queue_thread_adopt(&engine->close_work_queue); 664} 665 666#endif
|