xref: /unit/src/nxt_buf.c (revision 352:47649fbbcb53)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * Completion handlers installed into nxt_buf_t.completion_handler by the
 * allocators in this file: nxt_buf_completion() for buffers obtained with
 * nxt_mp_alloc()/nxt_mp_zalloc(), nxt_buf_ts_completion() for buffers
 * created by nxt_buf_mem_ts_alloc().
 */
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data);
12 
13 
/*
 * Extra state kept for a buffer created by nxt_buf_mem_ts_alloc().  It is
 * stored inside the buffer allocation at byte offset NXT_BUF_MEM_SIZE from
 * the start of the nxt_buf_t.
 */
typedef struct {
    /* Work item posted to "engine" by nxt_buf_ts_handle(). */
    nxt_work_t          work;
    /* Engine of the thread that allocated the buffer. */
    nxt_event_engine_t  *engine;
} nxt_buf_ts_t;
18 
19 
20 void
21 nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
22 {
23     b->mem.start = start;
24     b->mem.pos = start;
25     b->mem.free = start;
26     b->mem.end = nxt_pointer_to(start, size);
27 }
28 
29 
30 nxt_buf_t *
31 nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
32 {
33     nxt_buf_t  *b;
34 
35     b = nxt_mp_alloc(mp, NXT_BUF_MEM_SIZE + size);
36     if (nxt_slow_path(b == NULL)) {
37         return NULL;
38     }
39 
40     nxt_memzero(b, NXT_BUF_MEM_SIZE);
41 
42     b->data = mp;
43     b->completion_handler = nxt_buf_completion;
44 
45     if (size != 0) {
46         b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
47         b->mem.pos = b->mem.start;
48         b->mem.free = b->mem.start;
49         b->mem.end = b->mem.start + size;
50     }
51 
52     return b;
53 }
54 
55 
56 nxt_buf_t *
57 nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, size_t size)
58 {
59     nxt_buf_t     *b;
60     nxt_buf_ts_t  *ts;
61 
62     b = nxt_mp_retain(mp, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t) + size);
63     if (nxt_slow_path(b == NULL)) {
64         return NULL;
65     }
66 
67     nxt_memzero(b, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t));
68 
69     b->data = mp;
70     b->completion_handler = nxt_buf_ts_completion;
71     b->is_ts = 1;
72 
73     if (size != 0) {
74         b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE +
75                                       sizeof(nxt_buf_ts_t));
76         b->mem.pos = b->mem.start;
77         b->mem.free = b->mem.start;
78         b->mem.end = b->mem.start + size;
79     }
80 
81     ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
82     ts->engine = task->thread->engine;
83 
84     ts->work.handler = nxt_buf_ts_completion;
85     ts->work.task = task;
86     ts->work.obj = b;
87     ts->work.data = b->parent;
88 
89     return b;
90 }
91 
92 
93 nxt_buf_t *
94 nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
95 {
96     nxt_buf_t  *b;
97 
98     b = nxt_mp_alloc(mp, NXT_BUF_FILE_SIZE + size);
99     if (nxt_slow_path(b == NULL)) {
100         return NULL;
101     }
102 
103     nxt_memzero(b, NXT_BUF_FILE_SIZE);
104 
105     b->data = mp;
106     b->completion_handler = nxt_buf_completion;
107     nxt_buf_set_file(b);
108 
109     if (size != 0) {
110         b->mem.start = nxt_pointer_to(b, NXT_BUF_FILE_SIZE);
111         b->mem.pos = b->mem.start;
112         b->mem.free = b->mem.start;
113         b->mem.end = b->mem.start + size;
114     }
115 
116     return b;
117 }
118 
119 
120 nxt_buf_t *
121 nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size)
122 {
123     nxt_buf_t  *b;
124 
125     b = nxt_mp_zalloc(mp, NXT_BUF_MMAP_SIZE);
126 
127     if (nxt_fast_path(b != NULL)) {
128         b->data = mp;
129         b->completion_handler = nxt_buf_completion;
130 
131         nxt_buf_set_file(b);
132         nxt_buf_set_mmap(b);
133         nxt_buf_mem_set_size(&b->mem, size);
134     }
135 
136     return b;
137 }
138 
139 
140 nxt_buf_t *
141 nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags)
142 {
143     nxt_buf_t  *b;
144 
145     b = nxt_mp_zalloc(mp, NXT_BUF_SYNC_SIZE);
146 
147     if (nxt_fast_path(b != NULL)) {
148         b->data = mp;
149         b->completion_handler = nxt_buf_completion;
150 
151         nxt_buf_set_sync(b);
152         b->is_nobuf = ((flags & NXT_BUF_SYNC_NOBUF) != 0);
153         b->is_flush = ((flags & NXT_BUF_SYNC_FLUSH) != 0);
154         b->is_last = ((flags & NXT_BUF_SYNC_LAST) != 0);
155     }
156 
157     return b;
158 }
159 
160 
161 void
162 nxt_buf_chain_add(nxt_buf_t **head, nxt_buf_t *in)
163 {
164     nxt_buf_t  *b, **prev;
165 
166     prev = head;
167 
168     for (b = *head; b != NULL; b = b->next) {
169         prev = &b->next;
170     }
171 
172     *prev = in;
173 }
174 
175 
176 size_t
177 nxt_buf_chain_length(nxt_buf_t *b)
178 {
179     size_t  length;
180 
181     length = 0;
182 
183     while (b != NULL) {
184         length += b->mem.free - b->mem.pos;
185         b = b->next;
186     }
187 
188     return length;
189 }
190 
191 
192 static void
193 nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
194 {
195     nxt_mp_t   *mp;
196     nxt_buf_t  *b, *parent;
197 
198     b = obj;
199     parent = data;
200 
201     nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
202 
203 #if (NXT_DEBUG)
204     if (nxt_slow_path(data != b->parent)) {
205         nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
206                       data, b->parent);
207         nxt_abort();
208     }
209 #endif
210 
211     mp = b->data;
212     nxt_mp_free(mp, b);
213 
214     if (parent != NULL) {
215         nxt_debug(task, "parent retain:%uD", parent->retain);
216 
217         parent->retain--;
218 
219         if (parent->retain == 0) {
220             parent->mem.pos = parent->mem.free;
221 
222             parent->completion_handler(task, parent, parent->parent);
223         }
224     }
225 }
226 
227 
228 nxt_int_t
229 nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data)
230 {
231     nxt_buf_t     *b;
232     nxt_buf_ts_t  *ts;
233 
234     b = obj;
235 
236 #if (NXT_DEBUG)
237     if (nxt_slow_path(b->is_ts == 0)) {
238         nxt_log_alert(task->log, "not a thread safe buf (%p) completed", b);
239         nxt_abort();
240     }
241 #endif
242 
243     ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
244 
245     if (ts->engine != task->thread->engine) {
246 
247         nxt_debug(task, "buf ts: %p current engine is %p, expected %p",
248                   b, task->thread->engine, ts->engine);
249 
250         ts->work.handler = b->completion_handler;
251         ts->work.obj = obj;
252         ts->work.data = data;
253 
254         nxt_event_engine_post(ts->engine, &ts->work);
255 
256         return 1;
257     }
258 
259     return 0;
260 }
261 
262 
263 static void
264 nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
265 {
266     nxt_mp_t   *mp;
267     nxt_buf_t  *b, *parent;
268 
269     b = obj;
270     parent = data;
271 
272     if (nxt_buf_ts_handle(task, obj, data)) {
273         return;
274     }
275 
276     nxt_debug(task, "buf ts completion: %p %p", b, b->mem.start);
277 
278 #if (NXT_DEBUG)
279     if (nxt_slow_path(data != b->parent)) {
280         nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
281                       data, b->parent);
282         nxt_abort();
283     }
284 #endif
285 
286     mp = b->data;
287     nxt_mp_release(mp, b);
288 
289     if (parent != NULL) {
290         nxt_debug(task, "parent retain:%uD", parent->retain);
291 
292         parent->retain--;
293 
294         if (parent->retain == 0) {
295             parent->mem.pos = parent->mem.free;
296 
297             parent->completion_handler(task, parent, parent->parent);
298         }
299     }
300 }
301 
302 
303 nxt_buf_t *
304 nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size)
305 {
306     nxt_buf_t  *b, *i;
307 
308     if (nxt_slow_path(size == 0)) {
309         for (i = src; i != NULL; i = i->next) {
310             size += nxt_buf_used_size(i);
311         }
312     }
313 
314     b = nxt_buf_mem_alloc(mp, size, 0);
315 
316     if (nxt_slow_path(b == NULL)) {
317         return NULL;
318     }
319 
320     for (i = src; i != NULL; i = i->next) {
321         if (nxt_slow_path(nxt_buf_mem_free_size(&b->mem) <
322                           nxt_buf_used_size(i))) {
323             break;
324         }
325 
326         b->mem.free = nxt_cpymem(b->mem.free, i->mem.pos, nxt_buf_used_size(i));
327     }
328 
329     return b;
330 }
331