Back to home page

Nginx displayed by LXR

[ Source navigation ]
[ Diff markup ]
[ Identifier search ]
[ general search ]
 
 
Version: [ nginx-1.15.11 ] [ nginx-1.14.2 ]

0001 
0002 /*
0003  * Copyright (C) Nginx, Inc.
0004  * Copyright (C) Valentin V. Bartenev
0005  * Copyright (C) Ruslan Ermilov
0006  */
0007 
0008 
0009 #include <ngx_config.h>
0010 #include <ngx_core.h>
0011 #include <ngx_thread_pool.h>
0012 
0013 
0014 typedef struct {
0015     ngx_array_t               pools;
0016 } ngx_thread_pool_conf_t;
0017 
0018 
0019 typedef struct {
0020     ngx_thread_task_t        *first;
0021     ngx_thread_task_t       **last;
0022 } ngx_thread_pool_queue_t;
0023 
/*
 * Resets a task queue to the empty state: no head element, and the append
 * slot pointing back at the head pointer.
 *
 * The body is wrapped in do { } while (0) so that both assignments form a
 * single statement; without the wrapper only the first assignment would be
 * governed by an unbraced "if" or loop.  The argument is evaluated more than
 * once and therefore must be free of side effects.
 */
#define ngx_thread_pool_queue_init(q)                                         \
    do {                                                                      \
        (q)->first = NULL;                                                    \
        (q)->last = &(q)->first;                                              \
    } while (0)
0027 
0028 
0029 struct ngx_thread_pool_s {
0030     ngx_thread_mutex_t        mtx;
0031     ngx_thread_pool_queue_t   queue;
0032     ngx_int_t                 waiting;
0033     ngx_thread_cond_t         cond;
0034 
0035     ngx_log_t                *log;
0036 
0037     ngx_str_t                 name;
0038     ngx_uint_t                threads;
0039     ngx_int_t                 max_queue;
0040 
0041     u_char                   *file;
0042     ngx_uint_t                line;
0043 };
0044 
0045 
0046 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
0047     ngx_pool_t *pool);
0048 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
0049 static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log);
0050 
0051 static void *ngx_thread_pool_cycle(void *data);
0052 static void ngx_thread_pool_handler(ngx_event_t *ev);
0053 
0054 static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
0055 
0056 static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
0057 static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);
0058 
0059 static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
0060 static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);
0061 
0062 
0063 static ngx_command_t  ngx_thread_pool_commands[] = {
0064 
0065     { ngx_string("thread_pool"),
0066       NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
0067       ngx_thread_pool,
0068       0,
0069       0,
0070       NULL },
0071 
0072       ngx_null_command
0073 };
0074 
0075 
0076 static ngx_core_module_t  ngx_thread_pool_module_ctx = {
0077     ngx_string("thread_pool"),
0078     ngx_thread_pool_create_conf,
0079     ngx_thread_pool_init_conf
0080 };
0081 
0082 
0083 ngx_module_t  ngx_thread_pool_module = {
0084     NGX_MODULE_V1,
0085     &ngx_thread_pool_module_ctx,           /* module context */
0086     ngx_thread_pool_commands,              /* module directives */
0087     NGX_CORE_MODULE,                       /* module type */
0088     NULL,                                  /* init master */
0089     NULL,                                  /* init module */
0090     ngx_thread_pool_init_worker,           /* init process */
0091     NULL,                                  /* init thread */
0092     NULL,                                  /* exit thread */
0093     ngx_thread_pool_exit_worker,           /* exit process */
0094     NULL,                                  /* exit master */
0095     NGX_MODULE_V1_PADDING
0096 };
0097 
0098 
0099 static ngx_str_t  ngx_thread_pool_default = ngx_string("default");
0100 
0101 static ngx_uint_t               ngx_thread_pool_task_id;
0102 static ngx_atomic_t             ngx_thread_pool_done_lock;
0103 static ngx_thread_pool_queue_t  ngx_thread_pool_done;
0104 
0105 
0106 static ngx_int_t
0107 ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
0108 {
0109     int             err;
0110     pthread_t       tid;
0111     ngx_uint_t      n;
0112     pthread_attr_t  attr;
0113 
0114     if (ngx_notify == NULL) {
0115         ngx_log_error(NGX_LOG_ALERT, log, 0,
0116                "the configured event method cannot be used with thread pools");
0117         return NGX_ERROR;
0118     }
0119 
0120     ngx_thread_pool_queue_init(&tp->queue);
0121 
0122     if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
0123         return NGX_ERROR;
0124     }
0125 
0126     if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
0127         (void) ngx_thread_mutex_destroy(&tp->mtx, log);
0128         return NGX_ERROR;
0129     }
0130 
0131     tp->log = log;
0132 
0133     err = pthread_attr_init(&attr);
0134     if (err) {
0135         ngx_log_error(NGX_LOG_ALERT, log, err,
0136                       "pthread_attr_init() failed");
0137         return NGX_ERROR;
0138     }
0139 
0140     err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
0141     if (err) {
0142         ngx_log_error(NGX_LOG_ALERT, log, err,
0143                       "pthread_attr_setdetachstate() failed");
0144         return NGX_ERROR;
0145     }
0146 
0147 #if 0
0148     err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
0149     if (err) {
0150         ngx_log_error(NGX_LOG_ALERT, log, err,
0151                       "pthread_attr_setstacksize() failed");
0152         return NGX_ERROR;
0153     }
0154 #endif
0155 
0156     for (n = 0; n < tp->threads; n++) {
0157         err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
0158         if (err) {
0159             ngx_log_error(NGX_LOG_ALERT, log, err,
0160                           "pthread_create() failed");
0161             return NGX_ERROR;
0162         }
0163     }
0164 
0165     (void) pthread_attr_destroy(&attr);
0166 
0167     return NGX_OK;
0168 }
0169 
0170 
0171 static void
0172 ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
0173 {
0174     ngx_uint_t           n;
0175     ngx_thread_task_t    task;
0176     volatile ngx_uint_t  lock;
0177 
0178     ngx_memzero(&task, sizeof(ngx_thread_task_t));
0179 
0180     task.handler = ngx_thread_pool_exit_handler;
0181     task.ctx = (void *) &lock;
0182 
0183     for (n = 0; n < tp->threads; n++) {
0184         lock = 1;
0185 
0186         if (ngx_thread_task_post(tp, &task) != NGX_OK) {
0187             return;
0188         }
0189 
0190         while (lock) {
0191             ngx_sched_yield();
0192         }
0193 
0194         task.event.active = 0;
0195     }
0196 
0197     (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
0198 
0199     (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
0200 }
0201 
0202 
0203 static void
0204 ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
0205 {
0206     ngx_uint_t *lock = data;
0207 
0208     *lock = 0;
0209 
0210     pthread_exit(0);
0211 }
0212 
0213 
0214 ngx_thread_task_t *
0215 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
0216 {
0217     ngx_thread_task_t  *task;
0218 
0219     task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
0220     if (task == NULL) {
0221         return NULL;
0222     }
0223 
0224     task->ctx = task + 1;
0225 
0226     return task;
0227 }
0228 
0229 
0230 ngx_int_t
0231 ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
0232 {
0233     if (task->event.active) {
0234         ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
0235                       "task #%ui already active", task->id);
0236         return NGX_ERROR;
0237     }
0238 
0239     if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
0240         return NGX_ERROR;
0241     }
0242 
0243     if (tp->waiting >= tp->max_queue) {
0244         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
0245 
0246         ngx_log_error(NGX_LOG_ERR, tp->log, 0,
0247                       "thread pool \"%V\" queue overflow: %i tasks waiting",
0248                       &tp->name, tp->waiting);
0249         return NGX_ERROR;
0250     }
0251 
0252     task->event.active = 1;
0253 
0254     task->id = ngx_thread_pool_task_id++;
0255     task->next = NULL;
0256 
0257     if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
0258         (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
0259         return NGX_ERROR;
0260     }
0261 
0262     *tp->queue.last = task;
0263     tp->queue.last = &task->next;
0264 
0265     tp->waiting++;
0266 
0267     (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
0268 
0269     ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
0270                    "task #%ui added to thread pool \"%V\"",
0271                    task->id, &tp->name);
0272 
0273     return NGX_OK;
0274 }
0275 
0276 
0277 static void *
0278 ngx_thread_pool_cycle(void *data)
0279 {
0280     ngx_thread_pool_t *tp = data;
0281 
0282     int                 err;
0283     sigset_t            set;
0284     ngx_thread_task_t  *task;
0285 
0286 #if 0
0287     ngx_time_update();
0288 #endif
0289 
0290     ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
0291                    "thread in pool \"%V\" started", &tp->name);
0292 
0293     sigfillset(&set);
0294 
0295     sigdelset(&set, SIGILL);
0296     sigdelset(&set, SIGFPE);
0297     sigdelset(&set, SIGSEGV);
0298     sigdelset(&set, SIGBUS);
0299 
0300     err = pthread_sigmask(SIG_BLOCK, &set, NULL);
0301     if (err) {
0302         ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
0303         return NULL;
0304     }
0305 
0306     for ( ;; ) {
0307         if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
0308             return NULL;
0309         }
0310 
0311         /* the number may become negative */
0312         tp->waiting--;
0313 
0314         while (tp->queue.first == NULL) {
0315             if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
0316                 != NGX_OK)
0317             {
0318                 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
0319                 return NULL;
0320             }
0321         }
0322 
0323         task = tp->queue.first;
0324         tp->queue.first = task->next;
0325 
0326         if (tp->queue.first == NULL) {
0327             tp->queue.last = &tp->queue.first;
0328         }
0329 
0330         if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
0331             return NULL;
0332         }
0333 
0334 #if 0
0335         ngx_time_update();
0336 #endif
0337 
0338         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
0339                        "run task #%ui in thread pool \"%V\"",
0340                        task->id, &tp->name);
0341 
0342         task->handler(task->ctx, tp->log);
0343 
0344         ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
0345                        "complete task #%ui in thread pool \"%V\"",
0346                        task->id, &tp->name);
0347 
0348         task->next = NULL;
0349 
0350         ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
0351 
0352         *ngx_thread_pool_done.last = task;
0353         ngx_thread_pool_done.last = &task->next;
0354 
0355         ngx_memory_barrier();
0356 
0357         ngx_unlock(&ngx_thread_pool_done_lock);
0358 
0359         (void) ngx_notify(ngx_thread_pool_handler);
0360     }
0361 }
0362 
0363 
0364 static void
0365 ngx_thread_pool_handler(ngx_event_t *ev)
0366 {
0367     ngx_event_t        *event;
0368     ngx_thread_task_t  *task;
0369 
0370     ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
0371 
0372     ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
0373 
0374     task = ngx_thread_pool_done.first;
0375     ngx_thread_pool_done.first = NULL;
0376     ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
0377 
0378     ngx_memory_barrier();
0379 
0380     ngx_unlock(&ngx_thread_pool_done_lock);
0381 
0382     while (task) {
0383         ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
0384                        "run completion handler for task #%ui", task->id);
0385 
0386         event = &task->event;
0387         task = task->next;
0388 
0389         event->complete = 1;
0390         event->active = 0;
0391 
0392         event->handler(event);
0393     }
0394 }
0395 
0396 
0397 static void *
0398 ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
0399 {
0400     ngx_thread_pool_conf_t  *tcf;
0401 
0402     tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
0403     if (tcf == NULL) {
0404         return NULL;
0405     }
0406 
0407     if (ngx_array_init(&tcf->pools, cycle->pool, 4,
0408                        sizeof(ngx_thread_pool_t *))
0409         != NGX_OK)
0410     {
0411         return NULL;
0412     }
0413 
0414     return tcf;
0415 }
0416 
0417 
0418 static char *
0419 ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
0420 {
0421     ngx_thread_pool_conf_t *tcf = conf;
0422 
0423     ngx_uint_t           i;
0424     ngx_thread_pool_t  **tpp;
0425 
0426     tpp = tcf->pools.elts;
0427 
0428     for (i = 0; i < tcf->pools.nelts; i++) {
0429 
0430         if (tpp[i]->threads) {
0431             continue;
0432         }
0433 
0434         if (tpp[i]->name.len == ngx_thread_pool_default.len
0435             && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
0436                            ngx_thread_pool_default.len)
0437                == 0)
0438         {
0439             tpp[i]->threads = 32;
0440             tpp[i]->max_queue = 65536;
0441             continue;
0442         }
0443 
0444         ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
0445                       "unknown thread pool \"%V\" in %s:%ui",
0446                       &tpp[i]->name, tpp[i]->file, tpp[i]->line);
0447 
0448         return NGX_CONF_ERROR;
0449     }
0450 
0451     return NGX_CONF_OK;
0452 }
0453 
0454 
0455 static char *
0456 ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
0457 {
0458     ngx_str_t          *value;
0459     ngx_uint_t          i;
0460     ngx_thread_pool_t  *tp;
0461 
0462     value = cf->args->elts;
0463 
0464     tp = ngx_thread_pool_add(cf, &value[1]);
0465 
0466     if (tp == NULL) {
0467         return NGX_CONF_ERROR;
0468     }
0469 
0470     if (tp->threads) {
0471         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
0472                            "duplicate thread pool \"%V\"", &tp->name);
0473         return NGX_CONF_ERROR;
0474     }
0475 
0476     tp->max_queue = 65536;
0477 
0478     for (i = 2; i < cf->args->nelts; i++) {
0479 
0480         if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
0481 
0482             tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);
0483 
0484             if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
0485                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
0486                                    "invalid threads value \"%V\"", &value[i]);
0487                 return NGX_CONF_ERROR;
0488             }
0489 
0490             continue;
0491         }
0492 
0493         if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
0494 
0495             tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
0496 
0497             if (tp->max_queue == NGX_ERROR) {
0498                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
0499                                    "invalid max_queue value \"%V\"", &value[i]);
0500                 return NGX_CONF_ERROR;
0501             }
0502 
0503             continue;
0504         }
0505     }
0506 
0507     if (tp->threads == 0) {
0508         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
0509                            "\"%V\" must have \"threads\" parameter",
0510                            &cmd->name);
0511         return NGX_CONF_ERROR;
0512     }
0513 
0514     return NGX_CONF_OK;
0515 }
0516 
0517 
0518 ngx_thread_pool_t *
0519 ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
0520 {
0521     ngx_thread_pool_t       *tp, **tpp;
0522     ngx_thread_pool_conf_t  *tcf;
0523 
0524     if (name == NULL) {
0525         name = &ngx_thread_pool_default;
0526     }
0527 
0528     tp = ngx_thread_pool_get(cf->cycle, name);
0529 
0530     if (tp) {
0531         return tp;
0532     }
0533 
0534     tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
0535     if (tp == NULL) {
0536         return NULL;
0537     }
0538 
0539     tp->name = *name;
0540     tp->file = cf->conf_file->file.name.data;
0541     tp->line = cf->conf_file->line;
0542 
0543     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
0544                                                   ngx_thread_pool_module);
0545 
0546     tpp = ngx_array_push(&tcf->pools);
0547     if (tpp == NULL) {
0548         return NULL;
0549     }
0550 
0551     *tpp = tp;
0552 
0553     return tp;
0554 }
0555 
0556 
0557 ngx_thread_pool_t *
0558 ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
0559 {
0560     ngx_uint_t                i;
0561     ngx_thread_pool_t       **tpp;
0562     ngx_thread_pool_conf_t   *tcf;
0563 
0564     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
0565                                                   ngx_thread_pool_module);
0566 
0567     tpp = tcf->pools.elts;
0568 
0569     for (i = 0; i < tcf->pools.nelts; i++) {
0570 
0571         if (tpp[i]->name.len == name->len
0572             && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
0573         {
0574             return tpp[i];
0575         }
0576     }
0577 
0578     return NULL;
0579 }
0580 
0581 
0582 static ngx_int_t
0583 ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
0584 {
0585     ngx_uint_t                i;
0586     ngx_thread_pool_t       **tpp;
0587     ngx_thread_pool_conf_t   *tcf;
0588 
0589     if (ngx_process != NGX_PROCESS_WORKER
0590         && ngx_process != NGX_PROCESS_SINGLE)
0591     {
0592         return NGX_OK;
0593     }
0594 
0595     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
0596                                                   ngx_thread_pool_module);
0597 
0598     if (tcf == NULL) {
0599         return NGX_OK;
0600     }
0601 
0602     ngx_thread_pool_queue_init(&ngx_thread_pool_done);
0603 
0604     tpp = tcf->pools.elts;
0605 
0606     for (i = 0; i < tcf->pools.nelts; i++) {
0607         if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
0608             return NGX_ERROR;
0609         }
0610     }
0611 
0612     return NGX_OK;
0613 }
0614 
0615 
0616 static void
0617 ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
0618 {
0619     ngx_uint_t                i;
0620     ngx_thread_pool_t       **tpp;
0621     ngx_thread_pool_conf_t   *tcf;
0622 
0623     if (ngx_process != NGX_PROCESS_WORKER
0624         && ngx_process != NGX_PROCESS_SINGLE)
0625     {
0626         return;
0627     }
0628 
0629     tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
0630                                                   ngx_thread_pool_module);
0631 
0632     if (tcf == NULL) {
0633         return;
0634     }
0635 
0636     tpp = tcf->pools.elts;
0637 
0638     for (i = 0; i < tcf->pools.nelts; i++) {
0639         ngx_thread_pool_destroy(tpp[i]);
0640     }
0641 }