xref: /unit/src/nxt_job.c (revision 65:10688b89aa16)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
#if (NXT_THREADS)
/*
 * Work handlers that exist only when thread support is compiled in.
 * nxt_job_start() installs the trampoline in job->work before posting the
 * work to the job's thread pool; nxt_job_return() installs the return
 * handler in job->work before posting the work to the job's event engine.
 */
static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
    void *data);
#endif
15 
16 
17 void *
18 nxt_job_create(nxt_mp_t *mp, size_t size)
19 {
20     size_t     cache_size;
21     nxt_job_t  *job;
22 
23     if (mp == NULL) {
24         mp = nxt_mp_create(1024, 128, 256, 32);
25         if (nxt_slow_path(mp == NULL)) {
26             return NULL;
27         }
28 
29         job = nxt_mp_zget(mp, size);
30         cache_size = 0;
31 
32     } else {
33         job = nxt_mp_zalloc(mp, size);
34         cache_size = size;
35     }
36 
37     if (nxt_fast_path(job != NULL)) {
38         job->cache_size = (uint16_t) cache_size;
39         job->mem_pool = mp;
40         nxt_job_set_name(job, "job");
41     }
42 
43     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
44     nxt_queue_self(&job->link);
45 
46     return job;
47 }
48 
49 
50 void
51 nxt_job_init(nxt_job_t *job, size_t size)
52 {
53     nxt_memzero(job, size);
54 
55     nxt_job_set_name(job, "job");
56 
57     nxt_queue_self(&job->link);
58 }
59 
60 
61 void
62 nxt_job_destroy(nxt_task_t *task, void *data)
63 {
64     nxt_job_t  *job;
65 
66     job = data;
67 
68     nxt_queue_remove(&job->link);
69 
70     if (job->cache_size == 0) {
71 
72         if (job->mem_pool != NULL) {
73             nxt_mp_destroy(job->mem_pool);
74         }
75 
76     } else {
77         nxt_mp_free(job->mem_pool, job);
78     }
79 }
80 
81 
#if 0

/*
 * Compiled out.  Registers nxt_job_destroy() with "job" as its data as a
 * cleanup handler of the memory pool "mp".  Returns NXT_OK on success or
 * NXT_ERROR if the cleanup entry cannot be obtained.
 */
nxt_int_t
nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
{
    nxt_mem_pool_cleanup_t  *cleanup;

    cleanup = nxt_mem_pool_cleanup(mp, 0);

    if (nxt_slow_path(cleanup == NULL)) {
        return NXT_ERROR;
    }

    cleanup->handler = nxt_job_destroy;
    cleanup->data = job;

    return NXT_OK;
}

#endif
101 
102 
/*
 * The (void *) casts of the handlers passed to nxt_work_set() and the casts
 * back to "nxt_work_handler_t" in the work handlers are required by Sun C.
 */
107 
108 void
109 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
110 {
111     nxt_debug(task, "%s start", job->name);
112 
113 #if (NXT_THREADS)
114 
115     if (job->thread_pool != NULL) {
116         nxt_int_t  ret;
117 
118         job->engine = task->thread->engine;
119 
120         nxt_work_set(&job->work, nxt_job_thread_trampoline,
121                      job->task, job, (void *) handler);
122 
123         ret = nxt_thread_pool_post(job->thread_pool, &job->work);
124 
125         if (ret == NXT_OK) {
126             return;
127         }
128 
129         handler = job->abort_handler;
130     }
131 
132 #endif
133 
134     handler(job->task, job, job->data);
135 }
136 
137 
#if (NXT_THREADS)

/*
 * A trampoline function is called by a thread pool thread.
 *
 * "obj" is the job and "data" carries the real handler given to
 * nxt_job_start().  A job that is not cancelled runs that handler; a
 * cancelled job is sent back through nxt_job_return() with its abort
 * handler.
 */

static void
nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
{
    nxt_job_t           *job;
    nxt_work_handler_t  handler;

    job = obj;
    handler = (nxt_work_handler_t) data;

    nxt_debug(task, "%s thread", job->name);

    if (nxt_fast_path(!job->cancel)) {
        handler(job->task, job, job->data);
        return;
    }

    nxt_job_return(task, job, job->abort_handler);
}

#endif
162 
163 
164 void
165 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
166 {
167     nxt_debug(task, "%s return", job->name);
168 
169 #if (NXT_THREADS)
170 
171     if (job->engine != NULL) {
172         /* A return function is called in thread pool thread context. */
173 
174         nxt_work_set(&job->work, nxt_job_thread_return_handler,
175                      job->task, job, (void *) handler);
176 
177         nxt_event_engine_post(job->engine, &job->work);
178 
179         return;
180     }
181 
182 #endif
183 
184     if (nxt_slow_path(job->cancel)) {
185         nxt_debug(task, "%s cancellation", job->name);
186         handler = job->abort_handler;
187     }
188 
189     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
190                        handler, job->task, job, job->data);
191 }
192 
193 
#if (NXT_THREADS)

/*
 * Work handler installed by nxt_job_return() before posting the job's work
 * to job->engine.  "obj" is the job and "data" carries the handler to run.
 *
 * The job's task is bound to the thread that runs this handler.  Then the
 * given handler is called, or the job's abort handler if the job has been
 * cancelled in the meantime.
 */

static void
nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_job_t           *job;
    nxt_work_handler_t  handler;

    job = obj;
    handler = (nxt_work_handler_t) data;

    job->task->thread = task->thread;

    if (nxt_fast_path(!job->cancel)) {
        handler(job->task, job, job->data);
        return;
    }

    nxt_debug(task, "%s cancellation", job->name);

    job->abort_handler(job->task, job, job->data);
}

#endif
216