nxt_job.c (0:a63ceefd6ab0) nxt_job.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10#if (NXT_THREADS)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10#if (NXT_THREADS)
11static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data);
12static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj,
11static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
12static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
13 void *data);
14#endif
15
16
17void *
18nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19{
20 size_t cache_size;
21 nxt_job_t *job;
22
23 if (mp == NULL) {
24 mp = nxt_mem_pool_create(256);
25
26 if (nxt_slow_path(mp == NULL)) {
27 return NULL;
28 }
29
30 job = nxt_mem_zalloc(mp, size);
31 cache_size = 0;
32
33 } else {
34 job = nxt_mem_cache_zalloc0(mp, size);
35 cache_size = size;
36 }
37
38 if (nxt_fast_path(job != NULL)) {
39 job->cache_size = (uint16_t) cache_size;
40 job->mem_pool = mp;
41 nxt_job_set_name(job, "job");
42 }
43
44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45 nxt_queue_self(&job->link);
46
13 void *data);
14#endif
15
16
17void *
18nxt_job_create(nxt_mem_pool_t *mp, size_t size)
19{
20 size_t cache_size;
21 nxt_job_t *job;
22
23 if (mp == NULL) {
24 mp = nxt_mem_pool_create(256);
25
26 if (nxt_slow_path(mp == NULL)) {
27 return NULL;
28 }
29
30 job = nxt_mem_zalloc(mp, size);
31 cache_size = 0;
32
33 } else {
34 job = nxt_mem_cache_zalloc0(mp, size);
35 cache_size = size;
36 }
37
38 if (nxt_fast_path(job != NULL)) {
39 job->cache_size = (uint16_t) cache_size;
40 job->mem_pool = mp;
41 nxt_job_set_name(job, "job");
42 }
43
44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
45 nxt_queue_self(&job->link);
46
47 job->task.ident = nxt_task_next_ident();
48
47 return job;
48}
49
50
51void
52nxt_job_init(nxt_job_t *job, size_t size)
53{
54 nxt_memzero(job, size);
55
56 nxt_job_set_name(job, "job");
57
58 nxt_queue_self(&job->link);
49 return job;
50}
51
52
53void
54nxt_job_init(nxt_job_t *job, size_t size)
55{
56 nxt_memzero(job, size);
57
58 nxt_job_set_name(job, "job");
59
60 nxt_queue_self(&job->link);
61
62 job->task.ident = nxt_task_next_ident();
59}
60
61
62void
63nxt_job_destroy(void *data)
64{
65 nxt_job_t *job;
66
67 job = data;
68
69 nxt_queue_remove(&job->link);
70
71 if (job->cache_size == 0) {
72
73 if (job->mem_pool != NULL) {
74 nxt_mem_pool_destroy(job->mem_pool);
75 }
76
77 } else {
78 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
79 }
80}
81
82
83nxt_int_t
84nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
85{
86 nxt_mem_pool_cleanup_t *mpcl;
87
88 mpcl = nxt_mem_pool_cleanup(mp, 0);
89
90 if (nxt_fast_path(mpcl != NULL)) {
91 mpcl->handler = nxt_job_destroy;
92 mpcl->data = job;
93 return NXT_OK;
94 }
95
96 return NXT_ERROR;
97}
98
99
100/*
101 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
102 * calls and to the "nxt_work_handler_t" are required by Sun C.
103 */
104
105void
63}
64
65
66void
67nxt_job_destroy(void *data)
68{
69 nxt_job_t *job;
70
71 job = data;
72
73 nxt_queue_remove(&job->link);
74
75 if (job->cache_size == 0) {
76
77 if (job->mem_pool != NULL) {
78 nxt_mem_pool_destroy(job->mem_pool);
79 }
80
81 } else {
82 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
83 }
84}
85
86
87nxt_int_t
88nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
89{
90 nxt_mem_pool_cleanup_t *mpcl;
91
92 mpcl = nxt_mem_pool_cleanup(mp, 0);
93
94 if (nxt_fast_path(mpcl != NULL)) {
95 mpcl->handler = nxt_job_destroy;
96 mpcl->data = job;
97 return NXT_OK;
98 }
99
100 return NXT_ERROR;
101}
102
103
104/*
105 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
106 * calls and to the "nxt_work_handler_t" are required by Sun C.
107 */
108
109void
106nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
110nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
107{
111{
108 nxt_log_debug(thr->log, "%s start", job->name);
112 nxt_debug(task, "%s start", job->name);
109
110#if (NXT_THREADS)
111
112 if (job->thread_pool != NULL) {
113 nxt_int_t ret;
114
113
114#if (NXT_THREADS)
115
116 if (job->thread_pool != NULL) {
117 nxt_int_t ret;
118
115 job->engine = thr->engine;
119 job->engine = task->thread->engine;
120
116 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
121 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
117 job, (void *) handler, job->log);
122 &job->task, job, (void *) handler);
118 if (ret == NXT_OK) {
119 return;
120 }
121
122 handler = job->abort_handler;
123 }
124
125#endif
126
123 if (ret == NXT_OK) {
124 return;
125 }
126
127 handler = job->abort_handler;
128 }
129
130#endif
131
127 handler(thr, job, job->data);
132 handler(&job->task, job, job->data);
128}
129
130
131#if (NXT_THREADS)
132
133/* A trampoline function is called by a thread pool thread. */
134
135static void
133}
134
135
136#if (NXT_THREADS)
137
138/* A trampoline function is called by a thread pool thread. */
139
140static void
136nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
141nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
137{
138 nxt_job_t *job;
139 nxt_work_handler_t handler;
140
141 job = obj;
142 handler = (nxt_work_handler_t) data;
143
142{
143 nxt_job_t *job;
144 nxt_work_handler_t handler;
145
146 job = obj;
147 handler = (nxt_work_handler_t) data;
148
144 nxt_log_debug(thr->log, "%s thread", job->name);
149 job->task.log = job->log;
145
150
151 nxt_debug(task, "%s thread", job->name);
152
146 if (nxt_slow_path(job->cancel)) {
153 if (nxt_slow_path(job->cancel)) {
147 nxt_job_return(thr, job, job->abort_handler);
154 nxt_job_return(task, job, job->abort_handler);
148
149 } else {
155
156 } else {
150 handler(thr, job, job->data);
157 handler(&job->task, job, job->data);
151 }
152}
153
154#endif
155
156
157void
158 }
159}
160
161#endif
162
163
164void
158nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
165nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
159{
166{
160 nxt_log_debug(thr->log, "%s return", job->name);
167 nxt_debug(task, "%s return", job->name);
161
162#if (NXT_THREADS)
163
164 if (job->engine != NULL) {
165 /* A return function is called in thread pool thread context. */
166 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
168
169#if (NXT_THREADS)
170
171 if (job->engine != NULL) {
172 /* A return function is called in thread pool thread context. */
173 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
167 job, (void *) handler, job->log);
174 &job->task, job, (void *) handler, job->log);
168 return;
169 }
170
171#endif
172
173 if (nxt_slow_path(job->cancel)) {
175 return;
176 }
177
178#endif
179
180 if (nxt_slow_path(job->cancel)) {
174 nxt_log_debug(thr->log, "%s cancellation", job->name);
181 nxt_debug(task, "%s cancellation", job->name);
175 handler = job->abort_handler;
176 }
177
182 handler = job->abort_handler;
183 }
184
178 nxt_thread_work_queue_push(thr, &thr->work_queue.main,
179 handler, job, job->data, thr->log);
185 nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
186 handler, &job->task, job, job->data);
180}
181
182
183#if (NXT_THREADS)
184
185static void
187}
188
189
190#if (NXT_THREADS)
191
192static void
186nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
193nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
187{
188 nxt_job_t *job;
189 nxt_work_handler_t handler;
190
191 job = obj;
192 handler = (nxt_work_handler_t) data;
193
194{
195 nxt_job_t *job;
196 nxt_work_handler_t handler;
197
198 job = obj;
199 handler = (nxt_work_handler_t) data;
200
201 job->task.thread = task->thread;
202
194 if (nxt_slow_path(job->cancel)) {
203 if (nxt_slow_path(job->cancel)) {
195 nxt_log_debug(thr->log, "%s cancellation", job->name);
204 nxt_debug(task, "%s cancellation", job->name);
196 handler = job->abort_handler;
197 }
198
205 handler = job->abort_handler;
206 }
207
199 handler(thr, job, job->data);
208 handler(&job->task, job, job->data);
200}
201
202#endif
209}
210
211#endif