xref: /unit/src/nxt_job.c (revision 2403:81b1e3a34636)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
11 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
12     void *data);
13 
14 
15 void *
nxt_job_create(nxt_mp_t * mp,size_t size)16 nxt_job_create(nxt_mp_t *mp, size_t size)
17 {
18     size_t     cache_size;
19     nxt_job_t  *job;
20 
21     if (mp == NULL) {
22         mp = nxt_mp_create(1024, 128, 256, 32);
23         if (nxt_slow_path(mp == NULL)) {
24             return NULL;
25         }
26 
27         job = nxt_mp_zget(mp, size);
28         cache_size = 0;
29 
30     } else {
31         job = nxt_mp_zalloc(mp, size);
32         cache_size = size;
33     }
34 
35     if (nxt_slow_path(job == NULL)) {
36         return NULL;
37     }
38 
39     job->cache_size = (uint16_t) cache_size;
40     job->mem_pool = mp;
41     nxt_job_set_name(job, "job");
42 
43     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
44     nxt_queue_self(&job->link);
45 
46     return job;
47 }
48 
49 
50 void
nxt_job_init(nxt_job_t * job,size_t size)51 nxt_job_init(nxt_job_t *job, size_t size)
52 {
53     nxt_memzero(job, size);
54 
55     nxt_job_set_name(job, "job");
56 
57     nxt_queue_self(&job->link);
58 }
59 
60 
61 void
nxt_job_destroy(nxt_task_t * task,void * data)62 nxt_job_destroy(nxt_task_t *task, void *data)
63 {
64     nxt_job_t  *job;
65 
66     job = data;
67 
68     nxt_queue_remove(&job->link);
69 
70     if (job->cache_size == 0) {
71 
72         if (job->mem_pool != NULL) {
73             nxt_mp_destroy(job->mem_pool);
74         }
75 
76     } else {
77         nxt_mp_free(job->mem_pool, job);
78     }
79 }
80 
81 
#if 0

/*
 * Disabled code: registers nxt_job_destroy() with "job" as its data as a
 * cleanup handler obtained from nxt_mem_pool_cleanup(mp, 0).
 * Returns NXT_OK on success, NXT_ERROR if no cleanup entry is available.
 */
nxt_int_t
nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
{
    nxt_mem_pool_cleanup_t  *cleanup;

    cleanup = nxt_mem_pool_cleanup(mp, 0);

    if (nxt_slow_path(cleanup == NULL)) {
        return NXT_ERROR;
    }

    cleanup->handler = nxt_job_destroy;
    cleanup->data = job;

    return NXT_OK;
}

#endif
101 
102 
103 /*
104  * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
105  * calls and to the "nxt_work_handler_t" are required by Sun C.
106  */
107 
108 void
nxt_job_start(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)109 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
110 {
111     nxt_debug(task, "%s start", job->name);
112 
113     if (job->thread_pool != NULL) {
114         nxt_int_t  ret;
115 
116         job->engine = task->thread->engine;
117 
118         nxt_work_set(&job->work, nxt_job_thread_trampoline,
119                      job->task, job, (void *) handler);
120 
121         ret = nxt_thread_pool_post(job->thread_pool, &job->work);
122 
123         if (ret == NXT_OK) {
124             return;
125         }
126 
127         handler = job->abort_handler;
128     }
129 
130     handler(job->task, job, job->data);
131 }
132 
133 
134 /* A trampoline function is called by a thread pool thread. */
135 
136 static void
nxt_job_thread_trampoline(nxt_task_t * task,void * obj,void * data)137 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
138 {
139     nxt_job_t           *job;
140     nxt_work_handler_t  handler;
141 
142     job = obj;
143     handler = (nxt_work_handler_t) data;
144 
145     nxt_debug(task, "%s thread", job->name);
146 
147     if (nxt_slow_path(job->cancel)) {
148         nxt_job_return(task, job, job->abort_handler);
149 
150     } else {
151         handler(job->task, job, job->data);
152     }
153 }
154 
155 
156 void
nxt_job_return(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)157 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
158 {
159     nxt_debug(task, "%s return", job->name);
160 
161     if (job->engine != NULL) {
162         /* A return function is called in thread pool thread context. */
163 
164         nxt_work_set(&job->work, nxt_job_thread_return_handler,
165                      job->task, job, (void *) handler);
166 
167         nxt_event_engine_post(job->engine, &job->work);
168 
169         return;
170     }
171 
172     if (nxt_slow_path(job->cancel)) {
173         nxt_debug(task, "%s cancellation", job->name);
174         handler = job->abort_handler;
175     }
176 
177     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
178                        handler, job->task, job, job->data);
179 }
180 
181 
182 static void
nxt_job_thread_return_handler(nxt_task_t * task,void * obj,void * data)183 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
184 {
185     nxt_job_t           *job;
186     nxt_work_handler_t  handler;
187 
188     job = obj;
189     handler = (nxt_work_handler_t) data;
190 
191     job->task->thread = task->thread;
192 
193     if (nxt_slow_path(job->cancel)) {
194         nxt_debug(task, "%s cancellation", job->name);
195         handler = job->abort_handler;
196     }
197 
198     handler(job->task, job, job->data);
199 }
200