xref: /unit/src/nxt_port_memory.c (revision 423:449f2a9c5e62)

/*
 * Copyright (C) Max Romanov
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>

#if (NXT_HAVE_MEMFD_CREATE)

#include <linux/memfd.h>
#include <unistd.h>
#include <sys/syscall.h>

#endif

#include <nxt_port_memory_int.h>


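/*
 * Adjust the handler reference count by "i".  When a negative adjustment
 * brings the count back to zero, the shared memory segment is unmapped
 * and the handler itself is freed.
 */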
nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
    int c;

    c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);

    if (i < 0 && c == -i) {
        if (mmap_handler->hdr != NULL) {
            nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
            mmap_handler->hdr = NULL;
        }

        nxt_free(mmap_handler);
    }
}


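/*
 * Return the element with index "i" of the port mmaps array, growing the
 * array and zeroing the new elements as needed.  Returns NULL if
 * reallocation fails.
 */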
static nxt_port_mmap_t *
nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
{
    uint32_t  cap;

    cap = port_mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != port_mmaps->cap) {

        port_mmaps->elts = nxt_realloc(port_mmaps->elts,
                                       cap * sizeof(nxt_port_mmap_t));
        if (nxt_slow_path(port_mmaps->elts == NULL)) {
            return NULL;
        }

        nxt_memzero(port_mmaps->elts + port_mmaps->cap,
                    sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));

        port_mmaps->cap = cap;
    }

    if (i + 1 > port_mmaps->size) {
        port_mmaps->size = i + 1;
    }

    return port_mmaps->elts + i;
}


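/*
 * Drop one reference to every mmap handler in the array and reset the
 * recorded size.  The elements storage itself is freed only if "free_elts"
 * is set.
 */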
void
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
{
    uint32_t         i;
    nxt_port_mmap_t  *port_mmap;

    if (port_mmaps == NULL) {
        return;
    }

    port_mmap = port_mmaps->elts;

    for (i = 0; i < port_mmaps->size; i++) {
        nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
    }

    port_mmaps->size = 0;

    if (free_elts != 0) {
        nxt_free(port_mmaps->elts);
    }
}


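/*
 * Overwrite released chunk memory with a junk pattern; this makes accidental
 * use of already freed shared memory easier to notice.
 */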
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)


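/*
 * Completion handler for shared memory buffers: returns the chunks still
 * owned by this process to the segment free map, drops the buffer's
 * reference to the mmap handler and releases the buffer itself.
 */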
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    mp = b->data;

#if (NXT_DEBUG)
    if (nxt_slow_path(data != b->parent)) {
        nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
                      data, b->parent);
        nxt_abort();
    }
#endif

    mmap_handler = data;
    hdr = mmap_handler->hdr;

    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks up to b->mem.pos have been sent to the other side;
         * release the rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    nxt_mp_release(mp, b);
}


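/*
 * Map a shared memory segment received from another process over "fd" and
 * register it in the process incoming array under the id recorded in the
 * segment header.  Returns the new mmap handler or NULL on failure.
 */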
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_fd_t fd)
{
    void                     *mem;
    struct stat              mmap_stat;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "got new mmap fd #%FD from process %PI",
              fd, process->pid);

    port_mmap = NULL;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);

        return NULL;
    }

    mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);

        return NULL;
    }

    hdr = mem;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler->hdr = hdr;

    nxt_thread_mutex_lock(&process->incoming.mutex);

    port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

        nxt_free(mmap_handler);
        mmap_handler = NULL;

        goto fail;
    }

    nxt_assert(hdr->src_pid == process->pid);
    nxt_assert(hdr->dst_pid == nxt_pid);

    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr->sent_over = 0xFFFFu;

fail:

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


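/*
 * Create a new outgoing shared memory segment for "process": allocate an
 * anonymous shared memory object, map it, initialize the segment header and
 * the free maps, and pass the descriptor to the peer in a NXT_PORT_MSG_MMAP
 * message.
 */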
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_port_t *port, nxt_bool_t tracking)
{
    void                     *mem;
    u_char                   *p, name[64];
    nxt_fd_t                 fd;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");

        return NULL;
    }

    port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to add port mmap to outgoing array");

        nxt_free(mmap_handler);

        return NULL;
    }

    p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

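    /*
     * The backing object is created with memfd_create() where available;
     * otherwise shm_open() is used and the name is unlinked right away,
     * so only the descriptor keeps the object alive.
     */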
#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
                name, nxt_errno);

        goto remove_fail;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN)

    /* Unlink a possibly existing object with the same name, just in case. */
    shm_unlink((char *) name);

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    if (nxt_slow_path(fd == -1)) {
        nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);

        goto remove_fail;
    }

    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);

        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        goto remove_fail;
    }

    mmap_handler->hdr = mem;
    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* Init segment header. */
    hdr = mmap_handler->hdr;

    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = process->outgoing.size - 1;
    hdr->src_pid = nxt_pid;
    hdr->dst_pid = process->pid;
    hdr->sent_over = port->id;

    /* Mark the first chunk as busy. */
    free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

    nxt_port_mmap_set_chunk_busy(free_map, 0);

    /* Mark the chunk that follows the last available chunk as busy. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map,
                                 PORT_MMAP_CHUNK_COUNT);

    nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);

    /* TODO handle error */
    (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
            hdr->id, nxt_pid, process->pid);

    return mmap_handler;

remove_fail:

    nxt_free(mmap_handler);

    process->outgoing.size--;

    return NULL;
}


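/*
 * Find an outgoing segment usable for "port" that still has a free chunk
 * and return the chunk id in "c"; if no such segment exists, create a new
 * one.  The outgoing array mutex is held for the duration of the search.
 */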
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
    nxt_bool_t tracking)
{
    nxt_process_t            *process;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    process = port->process;
    if (nxt_slow_path(process == NULL)) {
        return NULL;
    }

    *c = 0;

    nxt_thread_mutex_lock(&process->outgoing.mutex);

    end_port_mmap = process->outgoing.elts + process->outgoing.size;

    for (port_mmap = process->outgoing.elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
            continue;
        }

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        if (nxt_port_mmap_get_free_chunk(free_map, c)) {
            goto unlock_return;
        }
    }

    /* TODO introduce port_mmap limit and release wait. */

    mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);

unlock_return:

    nxt_thread_mutex_unlock(&process->outgoing.mutex);

    return mmap_handler;
}


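/*
 * Look up the incoming mmap handler with index "id" registered for the
 * process with pid "spid".  Returns NULL if the process or mmap is unknown.
 */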
static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
    nxt_process_t            *process;
    nxt_port_mmap_handler_t  *mmap_handler;

    process = nxt_runtime_process_find(task->thread->runtime, spid);
    if (nxt_slow_path(process == NULL)) {
        return NULL;
    }

    nxt_thread_mutex_lock(&process->incoming.mutex);

    if (nxt_fast_path(process->incoming.size > id)) {
        mmap_handler = process->incoming.elts[id].mmap_handler;

    } else {
        mmap_handler = NULL;

        nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
    }

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


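/*
 * Allocate a tracking slot for "stream" in an outgoing segment and store its
 * location in "tracking".  The slot allows the sender to cancel the stream
 * before the receiver starts processing it.
 */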
nxt_int_t
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request tracking for stream #%uD", stream);

    mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NXT_ERROR;
    }

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    tracking->mmap_handler = mmap_handler;
    tracking->tracking = hdr->tracking + c;

    *tracking->tracking = stream;

    nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return NXT_OK;
}


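/*
 * Try to cancel a previously written tracking slot.  Returns 1 if the slot
 * still contained "stream" and was reset; 0 if the receiver has already
 * claimed it, in which case the tracking chunk is released here.
 */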
nxt_bool_t
nxt_port_mmap_tracking_cancel(nxt_task_t *task,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_bool_t               res;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = tracking->mmap_handler;

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);

    nxt_debug(task, "%s tracking for stream #%uD",
              (res ? "cancelled" : "failed to cancel"), stream);

    if (!res) {
        c = tracking->tracking - hdr->tracking;
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    nxt_port_mmap_handler_use(mmap_handler, -1);

    return res;
}


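/*
 * Serialize a tracking reference as a (mmap id, tracking slot index) pair
 * into "buf" for transmission to the peer.
 */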
nxt_int_t
nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
{
    nxt_atomic_t             *tracking;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = t->mmap_handler;
    tracking = mmap_handler->hdr->tracking;

    nxt_assert(t->tracking >= tracking);
    nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);

    buf[0] = mmap_handler->hdr->id;
    buf[1] = t->tracking - mmap_handler->hdr->tracking;

    return NXT_OK;
}


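/*
 * Read a tracking reference from the received message and try to claim the
 * stream.  Returns 1 on success; 0 if the message is malformed, the mmap is
 * unknown, or the sender has already cancelled the stream, in which case the
 * tracking chunk is released.
 */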
nxt_bool_t
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t                     *b;
    nxt_bool_t                    res;
    nxt_chunk_id_t                c;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_handler_t       *mmap_handler;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    b = msg->buf;

    if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_debug(task, "too small message %u", nxt_buf_used_size(b));
        return 0;
    }

    tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;

    b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
    mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
                                                   tracking_msg->mmap_id);

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    c = tracking_msg->tracking_id;
    res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);

    nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
              (res ? "received" : "already cancelled"));

    if (!res) {
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    return res;
}


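/*
 * Allocate a buffer backed by outgoing shared memory towards "port".  The
 * buffer always covers at least one chunk; additional contiguous chunks are
 * claimed while they are available, so the result may be smaller than the
 * requested "size".
 */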
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
{
    size_t                   nchunks;
    nxt_buf_t                *b;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request %z bytes shm buffer", size);

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;
    nxt_buf_set_port_mmap(b);

    mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_mp_release(task->thread->engine->mem_pool, b);
        return NULL;
    }

    b->parent = mmap_handler;

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;

    nchunks = size / PORT_MMAP_CHUNK_SIZE;
    if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
        nchunks++;
    }

    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    c++;
    nchunks--;

    /* Try to acquire as many chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        b->mem.end += PORT_MMAP_CHUNK_SIZE;
        c++;
        nchunks--;
    }

    return b;
}


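/*
 * Grow a shared memory buffer in place by claiming the chunks that follow
 * its current end.  Fails and releases the newly claimed chunks if the
 * buffer cannot reach at least "min_size" bytes of free space.
 */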
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    nchunks = size / PORT_MMAP_CHUNK_SIZE;
    if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
        nchunks++;
    }

    c = start;

    /* Try to acquire as many chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %d chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}


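/*
 * Build a buffer that points into an already mapped incoming segment, using
 * the (mmap id, chunk id, size) triplet from "mmap_msg".
 */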
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
    nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
    size_t                   nchunks;
    nxt_buf_t                *b;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
                                                   mmap_msg->mmap_id);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;

    nxt_buf_set_port_mmap(b);

    nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
    if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
        nchunks++;
    }

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start + mmap_msg->size;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    b->parent = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);

    return b;
}


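/*
 * Replace the message payload with an array of nxt_port_mmap_msg_t
 * descriptors, one per iovec, so that only chunk references travel over the
 * port socket while the data itself stays in shared memory.
 */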
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = port->mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Look up the buffer whose data starts at the current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    sb->iobuf[0].iov_base = port->mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}


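/*
 * Reverse of nxt_port_mmap_write(): translate the received descriptor array
 * back into a chain of buffers that point into the incoming shared memory
 * segments.
 */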
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    pb = &msg->buf;
    msg->size = 0;

    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_assert(mmap_msg + 1 <= end);

            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Consume the mmap_msg from the original buffer. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}


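/*
 * Decide how a buffer chain should be sent: NXT_PORT_METHOD_MMAP when all
 * non-empty buffers are shared memory buffers addressed to the destination
 * process, otherwise NXT_PORT_METHOD_PLAIN.
 */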
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
    nxt_port_method_t        m;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    m = NXT_PORT_METHOD_ANY;

    for (; b != NULL; b = b->next) {
        if (nxt_buf_used_size(b) == 0) {
            /* Empty buffers do not affect the method. */
            continue;
        }

        if (nxt_buf_is_port_mmap(b)) {
            mmap_handler = b->parent;
            hdr = mmap_handler->hdr;

            if (m == NXT_PORT_METHOD_PLAIN) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing plain and mmap buffers, "
                              "using plain mode");

                break;
            }

            if (port->pid != hdr->dst_pid) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "send mmap buffer for %PI to %PI, "
                              "using plain mode", hdr->dst_pid, port->pid);

                m = NXT_PORT_METHOD_PLAIN;

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using mmap mode");

                m = NXT_PORT_METHOD_MMAP;
            }
        } else {
            if (m == NXT_PORT_METHOD_MMAP) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing mmap and plain buffers, "
                              "switching to plain mode");

                m = NXT_PORT_METHOD_PLAIN;

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using plain mode");

                m = NXT_PORT_METHOD_PLAIN;
            }
        }
    }

    return m;
}
925