xref: /unit/src/nxt_job.c (revision 223:bf98efe2c55c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
11 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
12     void *data);
13 
14 
15 void *
nxt_job_create(nxt_mp_t * mp,size_t size)16 nxt_job_create(nxt_mp_t *mp, size_t size)
17 {
18     size_t     cache_size;
19     nxt_job_t  *job;
20 
21     if (mp == NULL) {
22         mp = nxt_mp_create(1024, 128, 256, 32);
23         if (nxt_slow_path(mp == NULL)) {
24             return NULL;
25         }
26 
27         job = nxt_mp_zget(mp, size);
28         cache_size = 0;
29 
30     } else {
31         job = nxt_mp_zalloc(mp, size);
32         cache_size = size;
33     }
34 
35     if (nxt_fast_path(job != NULL)) {
36         job->cache_size = (uint16_t) cache_size;
37         job->mem_pool = mp;
38         nxt_job_set_name(job, "job");
39     }
40 
41     /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
42     nxt_queue_self(&job->link);
43 
44     return job;
45 }
46 
47 
48 void
nxt_job_init(nxt_job_t * job,size_t size)49 nxt_job_init(nxt_job_t *job, size_t size)
50 {
51     nxt_memzero(job, size);
52 
53     nxt_job_set_name(job, "job");
54 
55     nxt_queue_self(&job->link);
56 }
57 
58 
59 void
nxt_job_destroy(nxt_task_t * task,void * data)60 nxt_job_destroy(nxt_task_t *task, void *data)
61 {
62     nxt_job_t  *job;
63 
64     job = data;
65 
66     nxt_queue_remove(&job->link);
67 
68     if (job->cache_size == 0) {
69 
70         if (job->mem_pool != NULL) {
71             nxt_mp_destroy(job->mem_pool);
72         }
73 
74     } else {
75         nxt_mp_free(job->mem_pool, job);
76     }
77 }
78 
79 
#if 0

/*
 * Disabled code: requests a cleanup entry from "mp" and sets it up to call
 * nxt_job_destroy() on "job".  Returns NXT_OK on success or NXT_ERROR if
 * the cleanup entry cannot be obtained.
 */
nxt_int_t
nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
{
    nxt_mem_pool_cleanup_t  *cleanup;

    cleanup = nxt_mem_pool_cleanup(mp, 0);

    if (nxt_slow_path(cleanup == NULL)) {
        return NXT_ERROR;
    }

    cleanup->handler = nxt_job_destroy;
    cleanup->data = job;

    return NXT_OK;
}

#endif
99 
100 
/*
 * The (void *) casts of the handlers in the nxt_work_set() calls that
 * precede nxt_thread_pool_post() and nxt_event_engine_post(), and the
 * casts back to "nxt_work_handler_t", are required by Sun C.
 */
105 
106 void
nxt_job_start(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)107 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
108 {
109     nxt_debug(task, "%s start", job->name);
110 
111     if (job->thread_pool != NULL) {
112         nxt_int_t  ret;
113 
114         job->engine = task->thread->engine;
115 
116         nxt_work_set(&job->work, nxt_job_thread_trampoline,
117                      job->task, job, (void *) handler);
118 
119         ret = nxt_thread_pool_post(job->thread_pool, &job->work);
120 
121         if (ret == NXT_OK) {
122             return;
123         }
124 
125         handler = job->abort_handler;
126     }
127 
128     handler(job->task, job, job->data);
129 }
130 
131 
132 /* A trampoline function is called by a thread pool thread. */
133 
134 static void
nxt_job_thread_trampoline(nxt_task_t * task,void * obj,void * data)135 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
136 {
137     nxt_job_t           *job;
138     nxt_work_handler_t  handler;
139 
140     job = obj;
141     handler = (nxt_work_handler_t) data;
142 
143     nxt_debug(task, "%s thread", job->name);
144 
145     if (nxt_slow_path(job->cancel)) {
146         nxt_job_return(task, job, job->abort_handler);
147 
148     } else {
149         handler(job->task, job, job->data);
150     }
151 }
152 
153 
154 void
nxt_job_return(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)155 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
156 {
157     nxt_debug(task, "%s return", job->name);
158 
159     if (job->engine != NULL) {
160         /* A return function is called in thread pool thread context. */
161 
162         nxt_work_set(&job->work, nxt_job_thread_return_handler,
163                      job->task, job, (void *) handler);
164 
165         nxt_event_engine_post(job->engine, &job->work);
166 
167         return;
168     }
169 
170     if (nxt_slow_path(job->cancel)) {
171         nxt_debug(task, "%s cancellation", job->name);
172         handler = job->abort_handler;
173     }
174 
175     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
176                        handler, job->task, job, job->data);
177 }
178 
179 
180 static void
nxt_job_thread_return_handler(nxt_task_t * task,void * obj,void * data)181 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
182 {
183     nxt_job_t           *job;
184     nxt_work_handler_t  handler;
185 
186     job = obj;
187     handler = (nxt_work_handler_t) data;
188 
189     job->task->thread = task->thread;
190 
191     if (nxt_slow_path(job->cancel)) {
192         nxt_debug(task, "%s cancellation", job->name);
193         handler = job->abort_handler;
194     }
195 
196     handler(job->task, job, job->data);
197 }
198