xref: /unit/src/nxt_job.c (revision 20:4dc92b438f58)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #if (NXT_THREADS)
11 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
12 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
13     void *data);
14 #endif
15 
16 
17 void *
18 nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19 {
20     size_t     cache_size;
21     nxt_job_t  *job;
22 
23     if (mp == NULL) {
24         mp = nxt_mem_pool_create(256);
25 
26         if (nxt_slow_path(mp == NULL)) {
27             return NULL;
28         }
29 
30         job = nxt_mem_zalloc(mp, size);
31         cache_size = 0;
32 
33     } else {
34         job = nxt_mem_cache_zalloc0(mp, size);
35         cache_size = size;
36     }
37 
38     if (nxt_fast_path(job != NULL)) {
39         job->cache_size = (uint16_t) cache_size;
40         job->mem_pool = mp;
41         nxt_job_set_name(job, "job");
42     }
43 
44     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45     nxt_queue_self(&job->link);
46 
47     return job;
48 }
49 
50 
51 void
52 nxt_job_init(nxt_job_t *job, size_t size)
53 {
54     nxt_memzero(job, size);
55 
56     nxt_job_set_name(job, "job");
57 
58     nxt_queue_self(&job->link);
59 }
60 
61 
62 void
63 nxt_job_destroy(nxt_task_t *task, void *data)
64 {
65     nxt_job_t  *job;
66 
67     job = data;
68 
69     nxt_queue_remove(&job->link);
70 
71     if (job->cache_size == 0) {
72 
73         if (job->mem_pool != NULL) {
74             nxt_mem_pool_destroy(job->mem_pool);
75         }
76 
77     } else {
78         nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
79     }
80 }
81 
82 
83 nxt_int_t
84 nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
85 {
86     nxt_mem_pool_cleanup_t  *mpcl;
87 
88     mpcl = nxt_mem_pool_cleanup(mp, 0);
89 
90     if (nxt_fast_path(mpcl != NULL)) {
91         mpcl->handler = nxt_job_destroy;
92         mpcl->data = job;
93         return NXT_OK;
94     }
95 
96     return NXT_ERROR;
97 }
98 
99 
100 /*
101  * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
102  * calls and to the "nxt_work_handler_t" are required by Sun C.
103  */
104 
105 void
106 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
107 {
108     nxt_debug(task, "%s start", job->name);
109 
110 #if (NXT_THREADS)
111 
112     if (job->thread_pool != NULL) {
113         nxt_int_t  ret;
114 
115         job->engine = task->thread->engine;
116 
117         nxt_work_set(&job->work, nxt_job_thread_trampoline,
118                      job->task, job, (void *) handler);
119 
120         ret = nxt_thread_pool_post(job->thread_pool, &job->work);
121 
122         if (ret == NXT_OK) {
123             return;
124         }
125 
126         handler = job->abort_handler;
127     }
128 
129 #endif
130 
131     handler(job->task, job, job->data);
132 }
133 
134 
135 #if (NXT_THREADS)
136 
137 /* A trampoline function is called by a thread pool thread. */
138 
139 static void
140 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
141 {
142     nxt_job_t           *job;
143     nxt_work_handler_t  handler;
144 
145     job = obj;
146     handler = (nxt_work_handler_t) data;
147 
148     nxt_debug(task, "%s thread", job->name);
149 
150     if (nxt_slow_path(job->cancel)) {
151         nxt_job_return(task, job, job->abort_handler);
152 
153     } else {
154         handler(job->task, job, job->data);
155     }
156 }
157 
158 #endif
159 
160 
161 void
162 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
163 {
164     nxt_debug(task, "%s return", job->name);
165 
166 #if (NXT_THREADS)
167 
168     if (job->engine != NULL) {
169         /* A return function is called in thread pool thread context. */
170 
171         nxt_work_set(&job->work, nxt_job_thread_return_handler,
172                      job->task, job, (void *) handler);
173 
174         nxt_event_engine_post(job->engine, &job->work);
175 
176         return;
177     }
178 
179 #endif
180 
181     if (nxt_slow_path(job->cancel)) {
182         nxt_debug(task, "%s cancellation", job->name);
183         handler = job->abort_handler;
184     }
185 
186     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
187                        handler, job->task, job, job->data);
188 }
189 
190 
191 #if (NXT_THREADS)
192 
193 static void
194 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
195 {
196     nxt_job_t           *job;
197     nxt_work_handler_t  handler;
198 
199     job = obj;
200     handler = (nxt_work_handler_t) data;
201 
202     job->task->thread = task->thread;
203 
204     if (nxt_slow_path(job->cancel)) {
205         nxt_debug(task, "%s cancellation", job->name);
206         handler = job->abort_handler;
207     }
208 
209     handler(job->task, job, job->data);
210 }
211 
212 #endif
213