1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
11 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
12 void *data);
13
14
15 void *
nxt_job_create(nxt_mp_t * mp,size_t size)16 nxt_job_create(nxt_mp_t *mp, size_t size)
17 {
18 size_t cache_size;
19 nxt_job_t *job;
20
21 if (mp == NULL) {
22 mp = nxt_mp_create(1024, 128, 256, 32);
23 if (nxt_slow_path(mp == NULL)) {
24 return NULL;
25 }
26
27 job = nxt_mp_zget(mp, size);
28 cache_size = 0;
29
30 } else {
31 job = nxt_mp_zalloc(mp, size);
32 cache_size = size;
33 }
34
35 if (nxt_slow_path(job == NULL)) {
36 return NULL;
37 }
38
39 job->cache_size = (uint16_t) cache_size;
40 job->mem_pool = mp;
41 nxt_job_set_name(job, "job");
42
43 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
44 nxt_queue_self(&job->link);
45
46 return job;
47 }
48
49
50 void
nxt_job_init(nxt_job_t * job,size_t size)51 nxt_job_init(nxt_job_t *job, size_t size)
52 {
53 nxt_memzero(job, size);
54
55 nxt_job_set_name(job, "job");
56
57 nxt_queue_self(&job->link);
58 }
59
60
61 void
nxt_job_destroy(nxt_task_t * task,void * data)62 nxt_job_destroy(nxt_task_t *task, void *data)
63 {
64 nxt_job_t *job;
65
66 job = data;
67
68 nxt_queue_remove(&job->link);
69
70 if (job->cache_size == 0) {
71
72 if (job->mem_pool != NULL) {
73 nxt_mp_destroy(job->mem_pool);
74 }
75
76 } else {
77 nxt_mp_free(job->mem_pool, job);
78 }
79 }
80
81
82 #if 0
83
84 nxt_int_t
85 nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
86 {
87 nxt_mem_pool_cleanup_t *mpcl;
88
89 mpcl = nxt_mem_pool_cleanup(mp, 0);
90
91 if (nxt_fast_path(mpcl != NULL)) {
92 mpcl->handler = nxt_job_destroy;
93 mpcl->data = job;
94 return NXT_OK;
95 }
96
97 return NXT_ERROR;
98 }
99
100 #endif
101
102
103 /*
104 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
105 * calls and to the "nxt_work_handler_t" are required by Sun C.
106 */
107
108 void
nxt_job_start(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)109 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
110 {
111 nxt_debug(task, "%s start", job->name);
112
113 if (job->thread_pool != NULL) {
114 nxt_int_t ret;
115
116 job->engine = task->thread->engine;
117
118 nxt_work_set(&job->work, nxt_job_thread_trampoline,
119 job->task, job, (void *) handler);
120
121 ret = nxt_thread_pool_post(job->thread_pool, &job->work);
122
123 if (ret == NXT_OK) {
124 return;
125 }
126
127 handler = job->abort_handler;
128 }
129
130 handler(job->task, job, job->data);
131 }
132
133
134 /* A trampoline function is called by a thread pool thread. */
135
136 static void
nxt_job_thread_trampoline(nxt_task_t * task,void * obj,void * data)137 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
138 {
139 nxt_job_t *job;
140 nxt_work_handler_t handler;
141
142 job = obj;
143 handler = (nxt_work_handler_t) data;
144
145 nxt_debug(task, "%s thread", job->name);
146
147 if (nxt_slow_path(job->cancel)) {
148 nxt_job_return(task, job, job->abort_handler);
149
150 } else {
151 handler(job->task, job, job->data);
152 }
153 }
154
155
156 void
nxt_job_return(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)157 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
158 {
159 nxt_debug(task, "%s return", job->name);
160
161 if (job->engine != NULL) {
162 /* A return function is called in thread pool thread context. */
163
164 nxt_work_set(&job->work, nxt_job_thread_return_handler,
165 job->task, job, (void *) handler);
166
167 nxt_event_engine_post(job->engine, &job->work);
168
169 return;
170 }
171
172 if (nxt_slow_path(job->cancel)) {
173 nxt_debug(task, "%s cancellation", job->name);
174 handler = job->abort_handler;
175 }
176
177 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
178 handler, job->task, job, job->data);
179 }
180
181
182 static void
nxt_job_thread_return_handler(nxt_task_t * task,void * obj,void * data)183 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
184 {
185 nxt_job_t *job;
186 nxt_work_handler_t handler;
187
188 job = obj;
189 handler = (nxt_work_handler_t) data;
190
191 job->task->thread = task->thread;
192
193 if (nxt_slow_path(job->cancel)) {
194 nxt_debug(task, "%s cancellation", job->name);
195 handler = job->abort_handler;
196 }
197
198 handler(job->task, job, job->data);
199 }
200