xref: /unit/src/nxt_port_memory.c (revision 1008:84f2370bd642)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 nxt_inline void
21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
22 {
23     int  c;
24 
25     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26 
27     if (i < 0 && c == -i) {
28         if (mmap_handler->hdr != NULL) {
29             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30             mmap_handler->hdr = NULL;
31         }
32 
33         nxt_free(mmap_handler);
34     }
35 }
36 
37 
38 static nxt_port_mmap_t *
39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40 {
41     uint32_t  cap;
42 
43     cap = port_mmaps->cap;
44 
45     if (cap == 0) {
46         cap = i + 1;
47     }
48 
49     while (i + 1 > cap) {
50 
51         if (cap < 16) {
52             cap = cap * 2;
53 
54         } else {
55             cap = cap + cap / 2;
56         }
57     }
58 
59     if (cap != port_mmaps->cap) {
60 
61         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
62                                        cap * sizeof(nxt_port_mmap_t));
63         if (nxt_slow_path(port_mmaps->elts == NULL)) {
64             return NULL;
65         }
66 
67         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
68                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
69 
70         port_mmaps->cap = cap;
71     }
72 
73     if (i + 1 > port_mmaps->size) {
74         port_mmaps->size = i + 1;
75     }
76 
77     return port_mmaps->elts + i;
78 }
79 
80 
81 void
82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
83 {
84     uint32_t         i;
85     nxt_port_mmap_t  *port_mmap;
86 
87     if (port_mmaps == NULL) {
88         return;
89     }
90 
91     port_mmap = port_mmaps->elts;
92 
93     for (i = 0; i < port_mmaps->size; i++) {
94         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
95     }
96 
97     port_mmaps->size = 0;
98 
99     if (free_elts != 0) {
100         nxt_free(port_mmaps->elts);
101     }
102 }
103 
104 
/* Overwrites "size" bytes starting at "p" with the 0xA5 junk pattern. */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, (size))
107 
108 
109 static void
110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
111 {
112     u_char                   *p;
113     nxt_mp_t                 *mp;
114     nxt_buf_t                *b;
115     nxt_chunk_id_t           c;
116     nxt_port_mmap_header_t   *hdr;
117     nxt_port_mmap_handler_t  *mmap_handler;
118 
119     if (nxt_buf_ts_handle(task, obj, data)) {
120         return;
121     }
122 
123     b = obj;
124 
125     mp = b->data;
126 
127     nxt_assert(data == b->parent);
128 
129     mmap_handler = data;
130     hdr = mmap_handler->hdr;
131 
132     if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
133         nxt_debug(task, "mmap buf completion: mmap for other process pair "
134                   "%PI->%PI", hdr->src_pid, hdr->dst_pid);
135 
136         goto release_buf;
137     }
138 
139     if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
140         /*
141          * Chunks until b->mem.pos has been sent to other side,
142          * let's release rest (if any).
143          */
144         p = b->mem.pos - 1;
145         c = nxt_port_mmap_chunk_id(hdr, p) + 1;
146         p = nxt_port_mmap_chunk_start(hdr, c);
147 
148     } else {
149         p = b->mem.start;
150         c = nxt_port_mmap_chunk_id(hdr, p);
151     }
152 
153     nxt_port_mmap_free_junk(p, b->mem.end - p);
154 
155     nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
156               "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
157               b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
158 
159     while (p < b->mem.end) {
160         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
161 
162         p += PORT_MMAP_CHUNK_SIZE;
163         c++;
164     }
165 
166 release_buf:
167 
168     nxt_port_mmap_handler_use(mmap_handler, -1);
169 
170     nxt_mp_free(mp, b);
171     nxt_mp_release(mp);
172 }
173 
174 
175 nxt_port_mmap_handler_t *
176 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
177     nxt_fd_t fd)
178 {
179     void                     *mem;
180     struct stat              mmap_stat;
181     nxt_port_mmap_t          *port_mmap;
182     nxt_port_mmap_header_t   *hdr;
183     nxt_port_mmap_handler_t  *mmap_handler;
184 
185     nxt_debug(task, "got new mmap fd #%FD from process %PI",
186               fd, process->pid);
187 
188     port_mmap = NULL;
189 
190     if (fstat(fd, &mmap_stat) == -1) {
191         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
192 
193         return NULL;
194     }
195 
196     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
197                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
198 
199     if (nxt_slow_path(mem == MAP_FAILED)) {
200         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
201 
202         return NULL;
203     }
204 
205     hdr = mem;
206 
207     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
208     if (nxt_slow_path(mmap_handler == NULL)) {
209         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
210 
211         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
212 
213         return NULL;
214     }
215 
216     mmap_handler->hdr = hdr;
217 
218     if (nxt_slow_path(hdr->src_pid != process->pid
219                       || hdr->dst_pid != nxt_pid))
220     {
221         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
222                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
223                 hdr->dst_pid, nxt_pid);
224 
225         return NULL;
226     }
227 
228     nxt_thread_mutex_lock(&process->incoming.mutex);
229 
230     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
231     if (nxt_slow_path(port_mmap == NULL)) {
232         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
233 
234         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
235         hdr = NULL;
236 
237         nxt_free(mmap_handler);
238         mmap_handler = NULL;
239 
240         goto fail;
241     }
242 
243     port_mmap->mmap_handler = mmap_handler;
244     nxt_port_mmap_handler_use(mmap_handler, 1);
245 
246     hdr->sent_over = 0xFFFFu;
247 
248 fail:
249 
250     nxt_thread_mutex_unlock(&process->incoming.mutex);
251 
252     return mmap_handler;
253 }
254 
255 
256 static nxt_port_mmap_handler_t *
257 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
258     nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
259 {
260     void                     *mem;
261     u_char                   *p, name[64];
262     nxt_fd_t                 fd;
263     nxt_int_t                i;
264     nxt_free_map_t           *free_map;
265     nxt_port_mmap_t          *port_mmap;
266     nxt_port_mmap_header_t   *hdr;
267     nxt_port_mmap_handler_t  *mmap_handler;
268 
269     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
270     if (nxt_slow_path(mmap_handler == NULL)) {
271         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
272 
273         return NULL;
274     }
275 
276     port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
277     if (nxt_slow_path(port_mmap == NULL)) {
278         nxt_log(task, NXT_LOG_WARN,
279                 "failed to add port mmap to outgoing array");
280 
281         nxt_free(mmap_handler);
282         return NULL;
283     }
284 
285     p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
286                     nxt_pid, nxt_random(&task->thread->random));
287     *p = '\0';
288 
289 #if (NXT_HAVE_MEMFD_CREATE)
290 
291     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
292 
293     if (nxt_slow_path(fd == -1)) {
294         nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
295 
296         goto remove_fail;
297     }
298 
299     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
300 
301 #elif (NXT_HAVE_SHM_OPEN_ANON)
302 
303     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
304 
305     nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
306 
307     if (nxt_slow_path(fd == -1)) {
308         nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
309 
310         goto remove_fail;
311     }
312 
313 #elif (NXT_HAVE_SHM_OPEN)
314 
315     /* Just in case. */
316     shm_unlink((char *) name);
317 
318     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
319 
320     nxt_debug(task, "shm_open(%s): %FD", name, fd);
321 
322     if (nxt_slow_path(fd == -1)) {
323         nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
324 
325         goto remove_fail;
326     }
327 
328     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
329         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
330                 nxt_errno);
331     }
332 
333 #else
334 
335 #error No working shared memory implementation.
336 
337 #endif
338 
339     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
340         nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
341 
342         goto remove_fail;
343     }
344 
345     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
346                        MAP_SHARED, fd, 0);
347 
348     if (nxt_slow_path(mem == MAP_FAILED)) {
349         goto remove_fail;
350     }
351 
352     mmap_handler->hdr = mem;
353     port_mmap->mmap_handler = mmap_handler;
354     nxt_port_mmap_handler_use(mmap_handler, 1);
355 
356     /* Init segment header. */
357     hdr = mmap_handler->hdr;
358 
359     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
360     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
361 
362     hdr->id = process->outgoing.size - 1;
363     hdr->src_pid = nxt_pid;
364     hdr->dst_pid = process->pid;
365     hdr->sent_over = port->id;
366 
367     /* Mark first chunk as busy */
368     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
369 
370     for (i = 0; i < n; i++) {
371         nxt_port_mmap_set_chunk_busy(free_map, i);
372     }
373 
374     /* Mark as busy chunk followed the last available chunk. */
375     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
376     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
377 
378     nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
379 
380     /* TODO handle error */
381     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
382 
383     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
384             hdr->id, nxt_pid, process->pid);
385 
386     return mmap_handler;
387 
388 remove_fail:
389 
390     nxt_free(mmap_handler);
391 
392     process->outgoing.size--;
393 
394     return NULL;
395 }
396 
397 
398 static nxt_port_mmap_handler_t *
399 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
400     nxt_int_t n, nxt_bool_t tracking)
401 {
402     nxt_int_t                i, res, nchunks;
403     nxt_process_t            *process;
404     nxt_free_map_t           *free_map;
405     nxt_port_mmap_t          *port_mmap;
406     nxt_port_mmap_t          *end_port_mmap;
407     nxt_port_mmap_header_t   *hdr;
408     nxt_port_mmap_handler_t  *mmap_handler;
409 
410     process = port->process;
411     if (nxt_slow_path(process == NULL)) {
412         return NULL;
413     }
414 
415     nxt_thread_mutex_lock(&process->outgoing.mutex);
416 
417     end_port_mmap = process->outgoing.elts + process->outgoing.size;
418 
419     for (port_mmap = process->outgoing.elts;
420          port_mmap < end_port_mmap;
421          port_mmap++)
422     {
423         mmap_handler = port_mmap->mmap_handler;
424         hdr = mmap_handler->hdr;
425 
426         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
427             continue;
428         }
429 
430         *c = 0;
431 
432         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
433 
434         while (nxt_port_mmap_get_free_chunk(free_map, c)) {
435             nchunks = 1;
436 
437             while (nchunks < n) {
438                 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);
439 
440                 if (res == 0) {
441                     for (i = 0; i < nchunks; i++) {
442                         nxt_port_mmap_set_chunk_free(free_map, *c + i);
443                     }
444 
445                     *c += nchunks + 1;
446                     nchunks = 0;
447                     break;
448                 }
449 
450                 nchunks++;
451             }
452 
453             if (nchunks == n) {
454                 goto unlock_return;
455             }
456         }
457     }
458 
459     /* TODO introduce port_mmap limit and release wait. */
460 
461     *c = 0;
462     mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
463 
464 unlock_return:
465 
466     nxt_thread_mutex_unlock(&process->outgoing.mutex);
467 
468     return mmap_handler;
469 }
470 
471 
472 static nxt_port_mmap_handler_t *
473 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
474 {
475     nxt_process_t            *process;
476     nxt_port_mmap_handler_t  *mmap_handler;
477 
478     process = nxt_runtime_process_find(task->thread->runtime, spid);
479     if (nxt_slow_path(process == NULL)) {
480         return NULL;
481     }
482 
483     nxt_thread_mutex_lock(&process->incoming.mutex);
484 
485     if (nxt_fast_path(process->incoming.size > id)) {
486         mmap_handler = process->incoming.elts[id].mmap_handler;
487 
488     } else {
489         mmap_handler = NULL;
490 
491         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
492     }
493 
494     nxt_thread_mutex_unlock(&process->incoming.mutex);
495 
496     return mmap_handler;
497 }
498 
499 
500 nxt_int_t
501 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
502     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
503 {
504     nxt_chunk_id_t           c;
505     nxt_port_mmap_header_t   *hdr;
506     nxt_port_mmap_handler_t  *mmap_handler;
507 
508     nxt_debug(task, "request tracking for stream #%uD", stream);
509 
510     mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
511     if (nxt_slow_path(mmap_handler == NULL)) {
512         return NXT_ERROR;
513     }
514 
515     nxt_port_mmap_handler_use(mmap_handler, 1);
516 
517     hdr = mmap_handler->hdr;
518 
519     tracking->mmap_handler = mmap_handler;
520     tracking->tracking = hdr->tracking + c;
521 
522     *tracking->tracking = stream;
523 
524     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
525               hdr->src_pid, hdr->dst_pid, hdr->id, c);
526 
527     return NXT_OK;
528 }
529 
530 
531 nxt_bool_t
532 nxt_port_mmap_tracking_cancel(nxt_task_t *task,
533     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
534 {
535     nxt_bool_t               res;
536     nxt_chunk_id_t           c;
537     nxt_port_mmap_header_t   *hdr;
538     nxt_port_mmap_handler_t  *mmap_handler;
539 
540     mmap_handler = tracking->mmap_handler;
541 
542     if (nxt_slow_path(mmap_handler == NULL)) {
543         return 0;
544     }
545 
546     hdr = mmap_handler->hdr;
547 
548     res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
549 
550     nxt_debug(task, "%s tracking for stream #%uD",
551               (res ? "cancelled" : "failed to cancel"), stream);
552 
553     if (!res) {
554         c = tracking->tracking - hdr->tracking;
555         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
556     }
557 
558     nxt_port_mmap_handler_use(mmap_handler, -1);
559 
560     return res;
561 }
562 
563 
564 nxt_int_t
565 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
566 {
567     nxt_port_mmap_handler_t  *mmap_handler;
568 
569     mmap_handler = t->mmap_handler;
570 
571 #if (NXT_DEBUG)
572     {
573     nxt_atomic_t  *tracking;
574 
575     tracking = mmap_handler->hdr->tracking;
576 
577     nxt_assert(t->tracking >= tracking);
578     nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
579     }
580 #endif
581 
582     buf[0] = mmap_handler->hdr->id;
583     buf[1] = t->tracking - mmap_handler->hdr->tracking;
584 
585     return NXT_OK;
586 }
587 
588 nxt_bool_t
589 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
590 {
591     nxt_buf_t                     *b;
592     nxt_bool_t                    res;
593     nxt_chunk_id_t                c;
594     nxt_port_mmap_header_t        *hdr;
595     nxt_port_mmap_handler_t       *mmap_handler;
596     nxt_port_mmap_tracking_msg_t  *tracking_msg;
597 
598     b = msg->buf;
599 
600     if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
601         nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
602         return 0;
603     }
604 
605     tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
606 
607     b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
608     mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
609                                                    tracking_msg->mmap_id);
610 
611     if (nxt_slow_path(mmap_handler == NULL)) {
612         return 0;
613     }
614 
615     hdr = mmap_handler->hdr;
616 
617     c = tracking_msg->tracking_id;
618     res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
619 
620     nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
621               (res ? "received" : "already cancelled"));
622 
623     if (!res) {
624         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
625     }
626 
627     return res;
628 }
629 
630 
631 nxt_buf_t *
632 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
633 {
634     nxt_mp_t                 *mp;
635     nxt_buf_t                *b;
636     nxt_int_t                nchunks;
637     nxt_chunk_id_t           c;
638     nxt_port_mmap_header_t   *hdr;
639     nxt_port_mmap_handler_t  *mmap_handler;
640 
641     nxt_debug(task, "request %z bytes shm buffer", size);
642 
643     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
644 
645     if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
646         nxt_alert(task, "requested buffer (%z) too big", size);
647 
648         return NULL;
649     }
650 
651     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
652     if (nxt_slow_path(b == NULL)) {
653         return NULL;
654     }
655 
656     b->completion_handler = nxt_port_mmap_buf_completion;
657     nxt_buf_set_port_mmap(b);
658 
659     mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
660     if (nxt_slow_path(mmap_handler == NULL)) {
661         mp = task->thread->engine->mem_pool;
662         nxt_mp_free(mp, b);
663         nxt_mp_release(mp);
664         return NULL;
665     }
666 
667     b->parent = mmap_handler;
668 
669     nxt_port_mmap_handler_use(mmap_handler, 1);
670 
671     hdr = mmap_handler->hdr;
672 
673     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
674     b->mem.pos = b->mem.start;
675     b->mem.free = b->mem.start;
676     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
677 
678     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
679               b, b->mem.start, b->mem.end - b->mem.start,
680               hdr->src_pid, hdr->dst_pid, hdr->id, c);
681 
682     return b;
683 }
684 
685 
686 nxt_int_t
687 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
688     size_t min_size)
689 {
690     size_t                   nchunks, free_size;
691     nxt_chunk_id_t           c, start;
692     nxt_port_mmap_header_t   *hdr;
693     nxt_port_mmap_handler_t  *mmap_handler;
694 
695     nxt_debug(task, "request increase %z bytes shm buffer", size);
696 
697     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
698         nxt_log(task, NXT_LOG_WARN,
699                 "failed to increase, not a mmap buffer");
700         return NXT_ERROR;
701     }
702 
703     free_size = nxt_buf_mem_free_size(&b->mem);
704 
705     if (nxt_slow_path(size <= free_size)) {
706         return NXT_OK;
707     }
708 
709     mmap_handler = b->parent;
710     hdr = mmap_handler->hdr;
711 
712     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
713 
714     size -= free_size;
715 
716     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
717 
718     c = start;
719 
720     /* Try to acquire as much chunks as required. */
721     while (nchunks > 0) {
722 
723         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
724             break;
725         }
726 
727         c++;
728         nchunks--;
729     }
730 
731     if (nchunks != 0
732         && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
733     {
734         c--;
735         while (c >= start) {
736             nxt_port_mmap_set_chunk_free(hdr->free_map, c);
737             c--;
738         }
739 
740         nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);
741 
742         return NXT_ERROR;
743 
744     } else {
745         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
746 
747         return NXT_OK;
748     }
749 }
750 
751 
752 static nxt_buf_t *
753 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
754     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
755 {
756     size_t                   nchunks;
757     nxt_buf_t                *b;
758     nxt_port_mmap_header_t   *hdr;
759     nxt_port_mmap_handler_t  *mmap_handler;
760 
761     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
762                                                    mmap_msg->mmap_id);
763     if (nxt_slow_path(mmap_handler == NULL)) {
764         return NULL;
765     }
766 
767     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
768     if (nxt_slow_path(b == NULL)) {
769         return NULL;
770     }
771 
772     b->completion_handler = nxt_port_mmap_buf_completion;
773 
774     nxt_buf_set_port_mmap(b);
775 
776     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
777     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
778         nchunks++;
779     }
780 
781     hdr = mmap_handler->hdr;
782 
783     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
784     b->mem.pos = b->mem.start;
785     b->mem.free = b->mem.start + mmap_msg->size;
786     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
787 
788     b->parent = mmap_handler;
789     nxt_port_mmap_handler_use(mmap_handler, 1);
790 
791     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
792               b, b->mem.start, b->mem.end - b->mem.start,
793               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
794 
795     return b;
796 }
797 
798 
799 void
800 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
801     nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
802 {
803     size_t                   bsize;
804     nxt_buf_t                *bmem;
805     nxt_uint_t               i;
806     nxt_port_mmap_msg_t      *mmap_msg;
807     nxt_port_mmap_header_t   *hdr;
808     nxt_port_mmap_handler_t  *mmap_handler;
809 
810     nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
811                     "via shared memory", sb->size, port->pid);
812 
813     bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
814     mmap_msg = port->mmsg_buf;
815 
816     bmem = msg->buf;
817 
818     for (i = 0; i < sb->niov; i++, mmap_msg++) {
819 
820         /* Lookup buffer which starts current iov_base. */
821         while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
822             bmem = bmem->next;
823         }
824 
825         if (nxt_slow_path(bmem == NULL)) {
826             nxt_log_error(NXT_LOG_ERR, task->log,
827                           "failed to find buf for iobuf[%d]", i);
828             return;
829             /* TODO clear b and exit */
830         }
831 
832         mmap_handler = bmem->parent;
833         hdr = mmap_handler->hdr;
834 
835         mmap_msg->mmap_id = hdr->id;
836         mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
837         mmap_msg->size = sb->iobuf[i].iov_len;
838 
839         nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
840                   mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
841                   port->pid);
842     }
843 
844     sb->iobuf[0].iov_base = port->mmsg_buf;
845     sb->iobuf[0].iov_len = bsize;
846     sb->niov = 1;
847     sb->size = bsize;
848 
849     msg->port_msg.mmap = 1;
850 }
851 
852 
853 void
854 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
855 {
856     nxt_buf_t            *b, **pb;
857     nxt_port_mmap_msg_t  *end, *mmap_msg;
858 
859     pb = &msg->buf;
860     msg->size = 0;
861 
862     for (b = msg->buf; b != NULL; b = b->next) {
863 
864         mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
865         end = (nxt_port_mmap_msg_t *) b->mem.free;
866 
867         while (mmap_msg < end) {
868             nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
869                       mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
870                       msg->port_msg.pid);
871 
872             *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
873                                                  msg->port_msg.pid, mmap_msg);
874             if (nxt_slow_path(*pb == NULL)) {
875                 nxt_log_error(NXT_LOG_ERR, task->log,
876                               "failed to get mmap buffer");
877 
878                 break;
879             }
880 
881             msg->size += mmap_msg->size;
882             pb = &(*pb)->next;
883             mmap_msg++;
884 
885             /* Mark original buf as complete. */
886             b->mem.pos += sizeof(nxt_port_mmap_msg_t);
887         }
888     }
889 }
890 
891 
892 nxt_port_method_t
893 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
894 {
895     nxt_port_method_t        m;
896     nxt_port_mmap_header_t   *hdr;
897     nxt_port_mmap_handler_t  *mmap_handler;
898 
899     m = NXT_PORT_METHOD_ANY;
900 
901     for (/* void */; b != NULL; b = b->next) {
902         if (nxt_buf_used_size(b) == 0) {
903             /* empty buffers does not affect method */
904             continue;
905         }
906 
907         if (nxt_buf_is_port_mmap(b)) {
908             mmap_handler = b->parent;
909             hdr = mmap_handler->hdr;
910 
911             if (m == NXT_PORT_METHOD_PLAIN) {
912                 nxt_log_error(NXT_LOG_ERR, task->log,
913                               "mixing plain and mmap buffers, "
914                               "using plain mode");
915 
916                 break;
917             }
918 
919             if (port->pid != hdr->dst_pid) {
920                 nxt_log_error(NXT_LOG_ERR, task->log,
921                               "send mmap buffer for %PI to %PI, "
922                               "using plain mode", hdr->dst_pid, port->pid);
923 
924                 m = NXT_PORT_METHOD_PLAIN;
925 
926                 break;
927             }
928 
929             if (m == NXT_PORT_METHOD_ANY) {
930                 nxt_debug(task, "using mmap mode");
931 
932                 m = NXT_PORT_METHOD_MMAP;
933             }
934         } else {
935             if (m == NXT_PORT_METHOD_MMAP) {
936                 nxt_log_error(NXT_LOG_ERR, task->log,
937                               "mixing mmap and plain buffers, "
938                               "switching to plain mode");
939 
940                 m = NXT_PORT_METHOD_PLAIN;
941 
942                 break;
943             }
944 
945             if (m == NXT_PORT_METHOD_ANY) {
946                 nxt_debug(task, "using plain mode");
947 
948                 m = NXT_PORT_METHOD_PLAIN;
949             }
950         }
951     }
952 
953     return m;
954 }
955