xref: /unit/src/nxt_job.c (revision 1:fdc027c56872)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #if (NXT_THREADS)
11 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
12 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
13     void *data);
14 #endif
15 
16 
17 void *
18 nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19 {
20     size_t     cache_size;
21     nxt_job_t  *job;
22 
23     if (mp == NULL) {
24         mp = nxt_mem_pool_create(256);
25 
26         if (nxt_slow_path(mp == NULL)) {
27             return NULL;
28         }
29 
30         job = nxt_mem_zalloc(mp, size);
31         cache_size = 0;
32 
33     } else {
34         job = nxt_mem_cache_zalloc0(mp, size);
35         cache_size = size;
36     }
37 
38     if (nxt_fast_path(job != NULL)) {
39         job->cache_size = (uint16_t) cache_size;
40         job->mem_pool = mp;
41         nxt_job_set_name(job, "job");
42     }
43 
44     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45     nxt_queue_self(&job->link);
46 
47     job->task.ident = nxt_task_next_ident();
48 
49     return job;
50 }
51 
52 
53 void
54 nxt_job_init(nxt_job_t *job, size_t size)
55 {
56     nxt_memzero(job, size);
57 
58     nxt_job_set_name(job, "job");
59 
60     nxt_queue_self(&job->link);
61 
62     job->task.ident = nxt_task_next_ident();
63 }
64 
65 
66 void
67 nxt_job_destroy(void *data)
68 {
69     nxt_job_t  *job;
70 
71     job = data;
72 
73     nxt_queue_remove(&job->link);
74 
75     if (job->cache_size == 0) {
76 
77         if (job->mem_pool != NULL) {
78             nxt_mem_pool_destroy(job->mem_pool);
79         }
80 
81     } else {
82         nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
83     }
84 }
85 
86 
87 nxt_int_t
88 nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
89 {
90     nxt_mem_pool_cleanup_t  *mpcl;
91 
92     mpcl = nxt_mem_pool_cleanup(mp, 0);
93 
94     if (nxt_fast_path(mpcl != NULL)) {
95         mpcl->handler = nxt_job_destroy;
96         mpcl->data = job;
97         return NXT_OK;
98     }
99 
100     return NXT_ERROR;
101 }
102 
103 
/*
 * The (void *) casts of the handlers passed to nxt_thread_pool_post() and
 * nxt_event_engine_post(), and the casts back to "nxt_work_handler_t",
 * are required by Sun C.
 */
108 
109 void
110 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
111 {
112     nxt_debug(task, "%s start", job->name);
113 
114 #if (NXT_THREADS)
115 
116     if (job->thread_pool != NULL) {
117         nxt_int_t  ret;
118 
119         job->engine = task->thread->engine;
120 
121         ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
122                                    &job->task, job, (void *) handler);
123         if (ret == NXT_OK) {
124             return;
125         }
126 
127         handler = job->abort_handler;
128     }
129 
130 #endif
131 
132     handler(&job->task, job, job->data);
133 }
134 
135 
136 #if (NXT_THREADS)
137 
138 /* A trampoline function is called by a thread pool thread. */
139 
140 static void
141 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
142 {
143     nxt_job_t           *job;
144     nxt_work_handler_t  handler;
145 
146     job = obj;
147     handler = (nxt_work_handler_t) data;
148 
149     job->task.log = job->log;
150 
151     nxt_debug(task, "%s thread", job->name);
152 
153     if (nxt_slow_path(job->cancel)) {
154         nxt_job_return(task, job, job->abort_handler);
155 
156     } else {
157         handler(&job->task, job, job->data);
158     }
159 }
160 
161 #endif
162 
163 
164 void
165 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
166 {
167     nxt_debug(task, "%s return", job->name);
168 
169 #if (NXT_THREADS)
170 
171     if (job->engine != NULL) {
172         /* A return function is called in thread pool thread context. */
173         nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
174                               &job->task, job, (void *) handler, job->log);
175         return;
176     }
177 
178 #endif
179 
180     if (nxt_slow_path(job->cancel)) {
181         nxt_debug(task, "%s cancellation", job->name);
182         handler = job->abort_handler;
183     }
184 
185     nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
186                                handler, &job->task, job, job->data);
187 }
188 
189 
190 #if (NXT_THREADS)
191 
192 static void
193 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
194 {
195     nxt_job_t           *job;
196     nxt_work_handler_t  handler;
197 
198     job = obj;
199     handler = (nxt_work_handler_t) data;
200 
201     job->task.thread = task->thread;
202 
203     if (nxt_slow_path(job->cancel)) {
204         nxt_debug(task, "%s cancellation", job->name);
205         handler = job->abort_handler;
206     }
207 
208     handler(&job->task, job, job->data);
209 }
210 
211 #endif
212