xref: /unit/src/nxt_job.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #if (NXT_THREADS)
11 static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data);
12 static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj,
13     void *data);
14 #endif
15 
16 
17 void *
18 nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19 {
20     size_t     cache_size;
21     nxt_job_t  *job;
22 
23     if (mp == NULL) {
24         mp = nxt_mem_pool_create(256);
25 
26         if (nxt_slow_path(mp == NULL)) {
27             return NULL;
28         }
29 
30         job = nxt_mem_zalloc(mp, size);
31         cache_size = 0;
32 
33     } else {
34         job = nxt_mem_cache_zalloc0(mp, size);
35         cache_size = size;
36     }
37 
38     if (nxt_fast_path(job != NULL)) {
39         job->cache_size = (uint16_t) cache_size;
40         job->mem_pool = mp;
41         nxt_job_set_name(job, "job");
42     }
43 
44     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45     nxt_queue_self(&job->link);
46 
47     return job;
48 }
49 
50 
51 void
52 nxt_job_init(nxt_job_t *job, size_t size)
53 {
54     nxt_memzero(job, size);
55 
56     nxt_job_set_name(job, "job");
57 
58     nxt_queue_self(&job->link);
59 }
60 
61 
62 void
63 nxt_job_destroy(void *data)
64 {
65     nxt_job_t  *job;
66 
67     job = data;
68 
69     nxt_queue_remove(&job->link);
70 
71     if (job->cache_size == 0) {
72 
73         if (job->mem_pool != NULL) {
74             nxt_mem_pool_destroy(job->mem_pool);
75         }
76 
77     } else {
78         nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
79     }
80 }
81 
82 
83 nxt_int_t
84 nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
85 {
86     nxt_mem_pool_cleanup_t  *mpcl;
87 
88     mpcl = nxt_mem_pool_cleanup(mp, 0);
89 
90     if (nxt_fast_path(mpcl != NULL)) {
91         mpcl->handler = nxt_job_destroy;
92         mpcl->data = job;
93         return NXT_OK;
94     }
95 
96     return NXT_ERROR;
97 }
98 
99 
100 /*
101  * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
102  * calls and to the "nxt_work_handler_t" are required by Sun C.
103  */
104 
105 void
106 nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
107 {
108     nxt_log_debug(thr->log, "%s start", job->name);
109 
110 #if (NXT_THREADS)
111 
112     if (job->thread_pool != NULL) {
113         nxt_int_t  ret;
114 
115         job->engine = thr->engine;
116         ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
117                                    job, (void *) handler, job->log);
118         if (ret == NXT_OK) {
119             return;
120         }
121 
122         handler = job->abort_handler;
123     }
124 
125 #endif
126 
127     handler(thr, job, job->data);
128 }
129 
130 
131 #if (NXT_THREADS)
132 
133 /* A trampoline function is called by a thread pool thread. */
134 
135 static void
136 nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
137 {
138     nxt_job_t           *job;
139     nxt_work_handler_t  handler;
140 
141     job = obj;
142     handler = (nxt_work_handler_t) data;
143 
144     nxt_log_debug(thr->log, "%s thread", job->name);
145 
146     if (nxt_slow_path(job->cancel)) {
147         nxt_job_return(thr, job, job->abort_handler);
148 
149     } else {
150         handler(thr, job, job->data);
151     }
152 }
153 
154 #endif
155 
156 
157 void
158 nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
159 {
160     nxt_log_debug(thr->log, "%s return", job->name);
161 
162 #if (NXT_THREADS)
163 
164     if (job->engine != NULL) {
165         /* A return function is called in thread pool thread context. */
166         nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
167                               job, (void *) handler, job->log);
168         return;
169     }
170 
171 #endif
172 
173     if (nxt_slow_path(job->cancel)) {
174         nxt_log_debug(thr->log, "%s cancellation", job->name);
175         handler = job->abort_handler;
176     }
177 
178     nxt_thread_work_queue_push(thr, &thr->work_queue.main,
179                                handler, job, job->data, thr->log);
180 }
181 
182 
183 #if (NXT_THREADS)
184 
185 static void
186 nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
187 {
188     nxt_job_t           *job;
189     nxt_work_handler_t  handler;
190 
191     job = obj;
192     handler = (nxt_work_handler_t) data;
193 
194     if (nxt_slow_path(job->cancel)) {
195         nxt_log_debug(thr->log, "%s cancellation", job->name);
196         handler = job->abort_handler;
197     }
198 
199     handler(thr, job, job->data);
200 }
201 
202 #endif
203