Deleted Added
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10#if (NXT_THREADS)
11static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
12static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
13 void *data);
14#endif
15
16
17void *
18nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19{
20 size_t cache_size;

--- 18 unchanged lines hidden (view full) ---

39 job->cache_size = (uint16_t) cache_size;
40 job->mem_pool = mp;
41 nxt_job_set_name(job, "job");
42 }
43
44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45 nxt_queue_self(&job->link);
46
47 job->task.ident = nxt_task_next_ident();
48
49 return job;
50}
51
52
53void
54nxt_job_init(nxt_job_t *job, size_t size)
55{
56 nxt_memzero(job, size);
57
58 nxt_job_set_name(job, "job");
59
60 nxt_queue_self(&job->link);
61
62 job->task.ident = nxt_task_next_ident();
63}
64
65
66void
67nxt_job_destroy(void *data)
68{
69 nxt_job_t *job;
70

--- 31 unchanged lines hidden (view full) ---

102
103
104/*
105 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
106 * calls and to the "nxt_work_handler_t" are required by Sun C.
107 */
108
109void
110nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
111{
112 nxt_debug(task, "%s start", job->name);
113
114#if (NXT_THREADS)
115
116 if (job->thread_pool != NULL) {
117 nxt_int_t ret;
118
119 job->engine = task->thread->engine;
120
121 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
122 &job->task, job, (void *) handler);
123 if (ret == NXT_OK) {
124 return;
125 }
126
127 handler = job->abort_handler;
128 }
129
130#endif
131
132 handler(&job->task, job, job->data);
133}
134
135
136#if (NXT_THREADS)
137
138/* A trampoline function is called by a thread pool thread. */
139
140static void
141nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
142{
143 nxt_job_t *job;
144 nxt_work_handler_t handler;
145
146 job = obj;
147 handler = (nxt_work_handler_t) data;
148
149 job->task.log = job->log;
150
151 nxt_debug(task, "%s thread", job->name);
152
153 if (nxt_slow_path(job->cancel)) {
154 nxt_job_return(task, job, job->abort_handler);
155
156 } else {
157 handler(&job->task, job, job->data);
158 }
159}
160
161#endif
162
163
164void
165nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
166{
167 nxt_debug(task, "%s return", job->name);
168
169#if (NXT_THREADS)
170
171 if (job->engine != NULL) {
172 /* A return function is called in thread pool thread context. */
173 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
174 &job->task, job, (void *) handler, job->log);
175 return;
176 }
177
178#endif
179
180 if (nxt_slow_path(job->cancel)) {
181 nxt_debug(task, "%s cancellation", job->name);
182 handler = job->abort_handler;
183 }
184
185 nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
186 handler, &job->task, job, job->data);
187}
188
189
190#if (NXT_THREADS)
191
192static void
193nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
194{
195 nxt_job_t *job;
196 nxt_work_handler_t handler;
197
198 job = obj;
199 handler = (nxt_work_handler_t) data;
200
201 job->task.thread = task->thread;
202
203 if (nxt_slow_path(job->cancel)) {
204 nxt_debug(task, "%s cancellation", job->name);
205 handler = job->abort_handler;
206 }
207
208 handler(&job->task, job, job->data);
209}
210
211#endif