nxt_port_memory.c (364:742e5c203c6d) nxt_port_memory.c (365:28b2a468be43)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9#if (NXT_HAVE_MEMFD_CREATE)
10
11#include <linux/memfd.h>
12#include <unistd.h>
13#include <sys/syscall.h>
14
15#endif
16
17#include <nxt_port_memory_int.h>
18
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9#if (NXT_HAVE_MEMFD_CREATE)
10
11#include <linux/memfd.h>
12#include <unistd.h>
13#include <sys/syscall.h>
14
15#endif
16
17#include <nxt_port_memory_int.h>
18
19
19nxt_inline void
20nxt_inline void
20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
21nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
21{
22{
22 if (port_mmap->hdr != NULL) {
23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
24 port_mmap->hdr = NULL;
23 int c;
24
25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26
27 if (i < 0 && c == -i) {
28 if (mmap_handler->hdr != NULL) {
29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30 mmap_handler->hdr = NULL;
31 }
32
33 nxt_free(mmap_handler);
25 }
26}
27
28
29static nxt_port_mmap_t *
30nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
31{
32 uint32_t cap;

--- 32 unchanged lines hidden (view full) ---

65 port_mmaps->size = i + 1;
66 }
67
68 return port_mmaps->elts + i;
69}
70
71
72void
34 }
35}
36
37
38static nxt_port_mmap_t *
39nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40{
41 uint32_t cap;

--- 32 unchanged lines hidden (view full) ---

74 port_mmaps->size = i + 1;
75 }
76
77 return port_mmaps->elts + i;
78}
79
80
81void
73nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
82nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
74{
75 uint32_t i;
76 nxt_port_mmap_t *port_mmap;
77
78 if (port_mmaps == NULL) {
79 return;
80 }
81
82 port_mmap = port_mmaps->elts;
83
84 for (i = 0; i < port_mmaps->size; i++) {
83{
84 uint32_t i;
85 nxt_port_mmap_t *port_mmap;
86
87 if (port_mmaps == NULL) {
88 return;
89 }
90
91 port_mmap = port_mmaps->elts;
92
93 for (i = 0; i < port_mmaps->size; i++) {
85 nxt_port_mmap_destroy(port_mmap + i);
94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
86 }
87
88 port_mmaps->size = 0;
89
95 }
96
97 port_mmaps->size = 0;
98
90 if (free != 0) {
99 if (free_elts != 0) {
91 nxt_free(port_mmaps->elts);
92 }
93}
94
95
96#define nxt_port_mmap_free_junk(p, size) \
97 memset((p), 0xA5, size)
98
99
100static void
101nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
102{
100 nxt_free(port_mmaps->elts);
101 }
102}
103
104
105#define nxt_port_mmap_free_junk(p, size) \
106 memset((p), 0xA5, size)
107
108
109static void
110nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
111{
103 u_char *p;
104 nxt_mp_t *mp;
105 nxt_buf_t *b;
106 nxt_chunk_id_t c;
107 nxt_port_mmap_header_t *hdr;
112 u_char *p;
113 nxt_mp_t *mp;
114 nxt_buf_t *b;
115 nxt_chunk_id_t c;
116 nxt_port_mmap_header_t *hdr;
117 nxt_port_mmap_handler_t *mmap_handler;
108
109 if (nxt_buf_ts_handle(task, obj, data)) {
110 return;
111 }
112
113 b = obj;
114
115 mp = b->data;
116
117#if (NXT_DEBUG)
118 if (nxt_slow_path(data != b->parent)) {
119 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
120 data, b->parent);
121 nxt_abort();
122 }
123#endif
124
118
119 if (nxt_buf_ts_handle(task, obj, data)) {
120 return;
121 }
122
123 b = obj;
124
125 mp = b->data;
126
127#if (NXT_DEBUG)
128 if (nxt_slow_path(data != b->parent)) {
129 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
130 data, b->parent);
131 nxt_abort();
132 }
133#endif
134
125 hdr = data;
135 mmap_handler = data;
136 hdr = mmap_handler->hdr;
126
127 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
128 nxt_debug(task, "mmap buf completion: mmap for other process pair "
129 "%PI->%PI", hdr->src_pid, hdr->dst_pid);
130
131 goto release_buf;
132 }
133

--- 21 unchanged lines hidden (view full) ---

155 nxt_port_mmap_set_chunk_free(hdr, c);
156
157 p += PORT_MMAP_CHUNK_SIZE;
158 c++;
159 }
160
161release_buf:
162
137
138 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
139 nxt_debug(task, "mmap buf completion: mmap for other process pair "
140 "%PI->%PI", hdr->src_pid, hdr->dst_pid);
141
142 goto release_buf;
143 }
144

--- 21 unchanged lines hidden (view full) ---

166 nxt_port_mmap_set_chunk_free(hdr, c);
167
168 p += PORT_MMAP_CHUNK_SIZE;
169 c++;
170 }
171
172release_buf:
173
174 nxt_port_mmap_handler_use(mmap_handler, -1);
175
163 nxt_mp_release(mp, b);
164}
165
166
176 nxt_mp_release(mp, b);
177}
178
179
167nxt_port_mmap_header_t *
180nxt_port_mmap_handler_t *
168nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
169 nxt_fd_t fd)
170{
181nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
182 nxt_fd_t fd)
183{
171 void *mem;
172 struct stat mmap_stat;
173 nxt_port_mmap_t *port_mmap;
174 nxt_port_mmap_header_t *hdr;
184 void *mem;
185 struct stat mmap_stat;
186 nxt_port_mmap_t *port_mmap;
187 nxt_port_mmap_header_t *hdr;
188 nxt_port_mmap_handler_t *mmap_handler;
175
176 nxt_debug(task, "got new mmap fd #%FD from process %PI",
177 fd, process->pid);
178
179 port_mmap = NULL;
180 hdr = NULL;
181
182 if (fstat(fd, &mmap_stat) == -1) {

--- 8 unchanged lines hidden (view full) ---

191 if (nxt_slow_path(mem == MAP_FAILED)) {
192 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
193
194 return NULL;
195 }
196
197 hdr = mem;
198
189
190 nxt_debug(task, "got new mmap fd #%FD from process %PI",
191 fd, process->pid);
192
193 port_mmap = NULL;
194 hdr = NULL;
195
196 if (fstat(fd, &mmap_stat) == -1) {

--- 8 unchanged lines hidden (view full) ---

205 if (nxt_slow_path(mem == MAP_FAILED)) {
206 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
207
208 return NULL;
209 }
210
211 hdr = mem;
212
213 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
214 if (nxt_slow_path(mmap_handler == NULL)) {
215 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
216
217 return NULL;
218 }
219
220 mmap_handler->hdr = hdr;
221
199 nxt_thread_mutex_lock(&process->incoming.mutex);
200
201 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
202 if (nxt_slow_path(port_mmap == NULL)) {
203 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
204
205 nxt_mem_munmap(mem, PORT_MMAP_SIZE);
206 hdr = NULL;
207
222 nxt_thread_mutex_lock(&process->incoming.mutex);
223
224 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
225 if (nxt_slow_path(port_mmap == NULL)) {
226 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
227
228 nxt_mem_munmap(mem, PORT_MMAP_SIZE);
229 hdr = NULL;
230
231 nxt_free(mmap_handler);
232 mmap_handler = NULL;
233
208 goto fail;
209 }
210
211 nxt_assert(hdr->src_pid == process->pid);
212 nxt_assert(hdr->dst_pid == nxt_pid);
213
234 goto fail;
235 }
236
237 nxt_assert(hdr->src_pid == process->pid);
238 nxt_assert(hdr->dst_pid == nxt_pid);
239
214 port_mmap->hdr = hdr;
240 port_mmap->mmap_handler = mmap_handler;
241 nxt_port_mmap_handler_use(mmap_handler, 1);
215
216 hdr->sent_over = 0xFFFFu;
217
218fail:
219
220 nxt_thread_mutex_unlock(&process->incoming.mutex);
221
242
243 hdr->sent_over = 0xFFFFu;
244
245fail:
246
247 nxt_thread_mutex_unlock(&process->incoming.mutex);
248
222 return hdr;
249 return mmap_handler;
223}
224
225
250}
251
252
226static nxt_port_mmap_header_t *
253static nxt_port_mmap_handler_t *
227nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
228 nxt_port_t *port)
229{
254nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
255 nxt_port_t *port)
256{
230 void *mem;
231 u_char *p, name[64];
232 nxt_fd_t fd;
233 nxt_port_mmap_t *port_mmap;
234 nxt_port_mmap_header_t *hdr;
257 void *mem;
258 u_char *p, name[64];
259 nxt_fd_t fd;
260 nxt_port_mmap_t *port_mmap;
261 nxt_port_mmap_header_t *hdr;
262 nxt_port_mmap_handler_t *mmap_handler;
235
263
264 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
265 if (nxt_slow_path(mmap_handler == NULL)) {
266 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
267
268 return NULL;
269 }
270
236 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
237 if (nxt_slow_path(port_mmap == NULL)) {
238 nxt_log(task, NXT_LOG_WARN,
239 "failed to add port mmap to outgoing array");
240
271 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
272 if (nxt_slow_path(port_mmap == NULL)) {
273 nxt_log(task, NXT_LOG_WARN,
274 "failed to add port mmap to outgoing array");
275
276 nxt_free(mmap_handler);
241 return NULL;
242 }
243
244 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD",
245 nxt_pid, nxt_random(&task->thread->random));
246 *p = '\0';
247
248#if (NXT_HAVE_MEMFD_CREATE)

--- 39 unchanged lines hidden (view full) ---

288
289 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
290 MAP_SHARED, fd, 0);
291
292 if (nxt_slow_path(mem == MAP_FAILED)) {
293 goto remove_fail;
294 }
295
277 return NULL;
278 }
279
280 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD",
281 nxt_pid, nxt_random(&task->thread->random));
282 *p = '\0';
283
284#if (NXT_HAVE_MEMFD_CREATE)

--- 39 unchanged lines hidden (view full) ---

324
325 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
326 MAP_SHARED, fd, 0);
327
328 if (nxt_slow_path(mem == MAP_FAILED)) {
329 goto remove_fail;
330 }
331
296 port_mmap->hdr = mem;
332 mmap_handler->hdr = mem;
333 port_mmap->mmap_handler = mmap_handler;
334 nxt_port_mmap_handler_use(mmap_handler, 1);
297
298 /* Init segment header. */
335
336 /* Init segment header. */
299 hdr = port_mmap->hdr;
337 hdr = mmap_handler->hdr;
300
301 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
302
303 hdr->id = process->outgoing.size - 1;
304 hdr->src_pid = nxt_pid;
305 hdr->dst_pid = process->pid;
306 hdr->sent_over = port->id;
307

--- 6 unchanged lines hidden (view full) ---

314 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
315
316 /* TODO handle error */
317 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
318
319 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
320 hdr->id, nxt_pid, process->pid);
321
338
339 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
340
341 hdr->id = process->outgoing.size - 1;
342 hdr->src_pid = nxt_pid;
343 hdr->dst_pid = process->pid;
344 hdr->sent_over = port->id;
345

--- 6 unchanged lines hidden (view full) ---

352 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
353
354 /* TODO handle error */
355 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
356
357 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
358 hdr->id, nxt_pid, process->pid);
359
322 return hdr;
360 return mmap_handler;
323
324remove_fail:
325
361
362remove_fail:
363
364 nxt_free(mmap_handler);
365
326 process->outgoing.size--;
327
328 return NULL;
329}
330
331
366 process->outgoing.size--;
367
368 return NULL;
369}
370
371
332static nxt_port_mmap_header_t *
372static nxt_port_mmap_handler_t *
333nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
334 size_t size)
335{
373nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
374 size_t size)
375{
336 nxt_process_t *process;
337 nxt_port_mmap_t *port_mmap;
338 nxt_port_mmap_t *end_port_mmap;
339 nxt_port_mmap_header_t *hdr;
376 nxt_process_t *process;
377 nxt_port_mmap_t *port_mmap;
378 nxt_port_mmap_t *end_port_mmap;
379 nxt_port_mmap_header_t *hdr;
380 nxt_port_mmap_handler_t *mmap_handler;
340
341 process = port->process;
342 if (nxt_slow_path(process == NULL)) {
343 return NULL;
344 }
345
346 *c = 0;
381
382 process = port->process;
383 if (nxt_slow_path(process == NULL)) {
384 return NULL;
385 }
386
387 *c = 0;
347 port_mmap = NULL;
348 hdr = NULL;
349
350 nxt_thread_mutex_lock(&process->outgoing.mutex);
351
388
389 nxt_thread_mutex_lock(&process->outgoing.mutex);
390
352 port_mmap = process->outgoing.elts;
353 end_port_mmap = port_mmap + process->outgoing.size;
391 end_port_mmap = process->outgoing.elts + process->outgoing.size;
354
392
355 while (port_mmap < end_port_mmap) {
393 for (port_mmap = process->outgoing.elts;
394 port_mmap < end_port_mmap;
395 port_mmap++)
396 {
397 mmap_handler = port_mmap->mmap_handler;
398 hdr = mmap_handler->hdr;
356
399
357 if ( (port_mmap->hdr->sent_over == 0xFFFFu ||
358 port_mmap->hdr->sent_over == port->id) &&
359 nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
360 hdr = port_mmap->hdr;
400 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
401 continue;
402 }
361
403
404 if (nxt_port_mmap_get_free_chunk(hdr, c)) {
362 goto unlock_return;
363 }
405 goto unlock_return;
406 }
364
365 port_mmap++;
366 }
367
368 /* TODO introduce port_mmap limit and release wait. */
369
407 }
408
409 /* TODO introduce port_mmap limit and release wait. */
410
370 hdr = nxt_port_new_port_mmap(task, process, port);
411 mmap_handler = nxt_port_new_port_mmap(task, process, port);
371
372unlock_return:
373
374 nxt_thread_mutex_unlock(&process->outgoing.mutex);
375
412
413unlock_return:
414
415 nxt_thread_mutex_unlock(&process->outgoing.mutex);
416
376 return hdr;
417 return mmap_handler;
377}
378
379
418}
419
420
380static nxt_port_mmap_header_t *
421static nxt_port_mmap_handler_t *
381nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
382{
422nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
423{
383 nxt_process_t *process;
384 nxt_port_mmap_header_t *hdr;
424 nxt_process_t *process;
425 nxt_port_mmap_handler_t *mmap_handler;
385
386 process = nxt_runtime_process_find(task->thread->runtime, spid);
387 if (nxt_slow_path(process == NULL)) {
388 return NULL;
389 }
390
426
427 process = nxt_runtime_process_find(task->thread->runtime, spid);
428 if (nxt_slow_path(process == NULL)) {
429 return NULL;
430 }
431
391 hdr = NULL;
432 mmap_handler = NULL;
392
393 nxt_thread_mutex_lock(&process->incoming.mutex);
394
395 if (nxt_fast_path(process->incoming.size > id)) {
433
434 nxt_thread_mutex_lock(&process->incoming.mutex);
435
436 if (nxt_fast_path(process->incoming.size > id)) {
396 hdr = process->incoming.elts[id].hdr;
397
398 } else {
399 nxt_log(task, NXT_LOG_WARN,
400 "failed to get incoming mmap #%d for process %PI", id, spid);
437 mmap_handler = process->incoming.elts[id].mmap_handler;
401 }
402
403 nxt_thread_mutex_unlock(&process->incoming.mutex);
404
438 }
439
440 nxt_thread_mutex_unlock(&process->incoming.mutex);
441
405 return hdr;
442 return mmap_handler;
406}
407
408
409nxt_buf_t *
410nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
411{
443}
444
445
446nxt_buf_t *
447nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
448{
412 size_t nchunks;
413 nxt_buf_t *b;
414 nxt_chunk_id_t c;
415 nxt_port_mmap_header_t *hdr;
449 size_t nchunks;
450 nxt_buf_t *b;
451 nxt_chunk_id_t c;
452 nxt_port_mmap_header_t *hdr;
453 nxt_port_mmap_handler_t *mmap_handler;
416
417 nxt_debug(task, "request %z bytes shm buffer", size);
418
419 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
420 if (nxt_slow_path(b == NULL)) {
421 return NULL;
422 }
423
424 b->completion_handler = nxt_port_mmap_buf_completion;
425 nxt_buf_set_port_mmap(b);
426
454
455 nxt_debug(task, "request %z bytes shm buffer", size);
456
457 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
458 if (nxt_slow_path(b == NULL)) {
459 return NULL;
460 }
461
462 b->completion_handler = nxt_port_mmap_buf_completion;
463 nxt_buf_set_port_mmap(b);
464
427 hdr = nxt_port_mmap_get(task, port, &c, size);
428 if (nxt_slow_path(hdr == NULL)) {
465 mmap_handler = nxt_port_mmap_get(task, port, &c, size);
466 if (nxt_slow_path(mmap_handler == NULL)) {
429 nxt_mp_release(task->thread->engine->mem_pool, b);
430 return NULL;
431 }
432
467 nxt_mp_release(task->thread->engine->mem_pool, b);
468 return NULL;
469 }
470
433 b->parent = hdr;
471 b->parent = mmap_handler;
434
472
473 nxt_port_mmap_handler_use(mmap_handler, 1);
474
475 hdr = mmap_handler->hdr;
476
435 b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
436 b->mem.pos = b->mem.start;
437 b->mem.free = b->mem.start;
438 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
439
440 nchunks = size / PORT_MMAP_CHUNK_SIZE;
441 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
442 nchunks++;

--- 21 unchanged lines hidden (view full) ---

464 return b;
465}
466
467
468nxt_int_t
469nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
470 size_t min_size)
471{
477 b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
478 b->mem.pos = b->mem.start;
479 b->mem.free = b->mem.start;
480 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
481
482 nchunks = size / PORT_MMAP_CHUNK_SIZE;
483 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
484 nchunks++;

--- 21 unchanged lines hidden (view full) ---

506 return b;
507}
508
509
510nxt_int_t
511nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
512 size_t min_size)
513{
472 size_t nchunks, free_size;
473 nxt_chunk_id_t c, start;
474 nxt_port_mmap_header_t *hdr;
514 size_t nchunks, free_size;
515 nxt_chunk_id_t c, start;
516 nxt_port_mmap_header_t *hdr;
517 nxt_port_mmap_handler_t *mmap_handler;
475
476 nxt_debug(task, "request increase %z bytes shm buffer", size);
477
478 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
479 nxt_log(task, NXT_LOG_WARN,
480 "failed to increase, not a mmap buffer");
481 return NXT_ERROR;
482 }
483
484 free_size = nxt_buf_mem_free_size(&b->mem);
485
486 if (nxt_slow_path(size <= free_size)) {
487 return NXT_OK;
488 }
489
518
519 nxt_debug(task, "request increase %z bytes shm buffer", size);
520
521 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
522 nxt_log(task, NXT_LOG_WARN,
523 "failed to increase, not a mmap buffer");
524 return NXT_ERROR;
525 }
526
527 free_size = nxt_buf_mem_free_size(&b->mem);
528
529 if (nxt_slow_path(size <= free_size)) {
530 return NXT_OK;
531 }
532
490 hdr = b->parent;
533 mmap_handler = b->parent;
534 hdr = mmap_handler->hdr;
491
492 start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
493
494 size -= free_size;
495
496 nchunks = size / PORT_MMAP_CHUNK_SIZE;
497 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
498 nchunks++;

--- 32 unchanged lines hidden (view full) ---

531 }
532}
533
534
535static nxt_buf_t *
536nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
537 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
538{
535
536 start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
537
538 size -= free_size;
539
540 nchunks = size / PORT_MMAP_CHUNK_SIZE;
541 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
542 nchunks++;

--- 32 unchanged lines hidden (view full) ---

575 }
576}
577
578
579static nxt_buf_t *
580nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
581 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
582{
539 size_t nchunks;
540 nxt_buf_t *b;
541 nxt_port_mmap_header_t *hdr;
583 size_t nchunks;
584 nxt_buf_t *b;
585 nxt_port_mmap_header_t *hdr;
586 nxt_port_mmap_handler_t *mmap_handler;
542
587
543 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
544 if (nxt_slow_path(hdr == NULL)) {
588 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
589 mmap_msg->mmap_id);
590 if (nxt_slow_path(mmap_handler == NULL)) {
545 return NULL;
546 }
547
548 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
549 if (nxt_slow_path(b == NULL)) {
550 return NULL;
551 }
552
553 b->completion_handler = nxt_port_mmap_buf_completion;
554
555 nxt_buf_set_port_mmap(b);
556
557 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
558 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
559 nchunks++;
560 }
561
591 return NULL;
592 }
593
594 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
595 if (nxt_slow_path(b == NULL)) {
596 return NULL;
597 }
598
599 b->completion_handler = nxt_port_mmap_buf_completion;
600
601 nxt_buf_set_port_mmap(b);
602
603 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
604 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
605 nchunks++;
606 }
607
608 hdr = mmap_handler->hdr;
609
562 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
563 b->mem.pos = b->mem.start;
564 b->mem.free = b->mem.start + mmap_msg->size;
565 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
566
610 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
611 b->mem.pos = b->mem.start;
612 b->mem.free = b->mem.start + mmap_msg->size;
613 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
614
567 b->parent = hdr;
615 b->parent = mmap_handler;
616 nxt_port_mmap_handler_use(mmap_handler, 1);
568
569 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
570 b, b->mem.start, b->mem.end - b->mem.start,
571 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
572
573 return b;
574}
575
576
577void
578nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
579 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
580{
617
618 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
619 b, b->mem.start, b->mem.end - b->mem.start,
620 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
621
622 return b;
623}
624
625
626void
627nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
628 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
629{
581 size_t bsize;
582 nxt_buf_t *bmem;
583 nxt_uint_t i;
584 nxt_port_mmap_msg_t *mmap_msg;
585 nxt_port_mmap_header_t *hdr;
630 size_t bsize;
631 nxt_buf_t *bmem;
632 nxt_uint_t i;
633 nxt_port_mmap_msg_t *mmap_msg;
634 nxt_port_mmap_header_t *hdr;
635 nxt_port_mmap_handler_t *mmap_handler;
586
587 nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
588 "via shared memory", sb->size, port->pid);
589
590 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
591 mmap_msg = port->mmsg_buf;
592
593 bmem = msg->buf;

--- 7 unchanged lines hidden (view full) ---

601
602 if (nxt_slow_path(bmem == NULL)) {
603 nxt_log_error(NXT_LOG_ERR, task->log,
604 "failed to find buf for iobuf[%d]", i);
605 return;
606 /* TODO clear b and exit */
607 }
608
636
637 nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
638 "via shared memory", sb->size, port->pid);
639
640 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
641 mmap_msg = port->mmsg_buf;
642
643 bmem = msg->buf;

--- 7 unchanged lines hidden (view full) ---

651
652 if (nxt_slow_path(bmem == NULL)) {
653 nxt_log_error(NXT_LOG_ERR, task->log,
654 "failed to find buf for iobuf[%d]", i);
655 return;
656 /* TODO clear b and exit */
657 }
658
609 hdr = bmem->parent;
659 mmap_handler = bmem->parent;
660 hdr = mmap_handler->hdr;
610
611 mmap_msg->mmap_id = hdr->id;
612 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
613 mmap_msg->size = sb->iobuf[i].iov_len;
614
615 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
616 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
617 port->pid);

--- 44 unchanged lines hidden (view full) ---

662 /* Mark original buf as complete. */
663 b->mem.pos += nxt_buf_used_size(b);
664}
665
666
667nxt_port_method_t
668nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
669{
661
662 mmap_msg->mmap_id = hdr->id;
663 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
664 mmap_msg->size = sb->iobuf[i].iov_len;
665
666 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
667 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
668 port->pid);

--- 44 unchanged lines hidden (view full) ---

713 /* Mark original buf as complete. */
714 b->mem.pos += nxt_buf_used_size(b);
715}
716
717
718nxt_port_method_t
719nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
720{
670 nxt_port_method_t m;
671 nxt_port_mmap_header_t *hdr;
721 nxt_port_method_t m;
722 nxt_port_mmap_header_t *hdr;
723 nxt_port_mmap_handler_t *mmap_handler;
672
673 m = NXT_PORT_METHOD_ANY;
674
675 for (; b != NULL; b = b->next) {
676 if (nxt_buf_used_size(b) == 0) {
677 /* empty buffers does not affect method */
678 continue;
679 }
680
681 if (nxt_buf_is_port_mmap(b)) {
724
725 m = NXT_PORT_METHOD_ANY;
726
727 for (; b != NULL; b = b->next) {
728 if (nxt_buf_used_size(b) == 0) {
729 /* empty buffers does not affect method */
730 continue;
731 }
732
733 if (nxt_buf_is_port_mmap(b)) {
682 hdr = b->parent;
734 mmap_handler = b->parent;
735 hdr = mmap_handler->hdr;
683
684 if (m == NXT_PORT_METHOD_PLAIN) {
685 nxt_log_error(NXT_LOG_ERR, task->log,
686 "mixing plain and mmap buffers, "
687 "using plain mode");
688
689 break;
690 }

--- 37 unchanged lines hidden ---
736
737 if (m == NXT_PORT_METHOD_PLAIN) {
738 nxt_log_error(NXT_LOG_ERR, task->log,
739 "mixing plain and mmap buffers, "
740 "using plain mode");
741
742 break;
743 }

--- 37 unchanged lines hidden ---