xref: /unit/src/nxt_port_memory.c (revision 1321:2c7f79bf0a1f)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 nxt_inline void
21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
22 {
23     int  c;
24 
25     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26 
27     if (i < 0 && c == -i) {
28         if (mmap_handler->hdr != NULL) {
29             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30             mmap_handler->hdr = NULL;
31         }
32 
33         nxt_free(mmap_handler);
34     }
35 }
36 
37 
38 static nxt_port_mmap_t *
39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40 {
41     uint32_t  cap;
42 
43     cap = port_mmaps->cap;
44 
45     if (cap == 0) {
46         cap = i + 1;
47     }
48 
49     while (i + 1 > cap) {
50 
51         if (cap < 16) {
52             cap = cap * 2;
53 
54         } else {
55             cap = cap + cap / 2;
56         }
57     }
58 
59     if (cap != port_mmaps->cap) {
60 
61         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
62                                        cap * sizeof(nxt_port_mmap_t));
63         if (nxt_slow_path(port_mmaps->elts == NULL)) {
64             return NULL;
65         }
66 
67         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
68                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
69 
70         port_mmaps->cap = cap;
71     }
72 
73     if (i + 1 > port_mmaps->size) {
74         port_mmaps->size = i + 1;
75     }
76 
77     return port_mmaps->elts + i;
78 }
79 
80 
81 void
82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
83 {
84     uint32_t         i;
85     nxt_port_mmap_t  *port_mmap;
86 
87     if (port_mmaps == NULL) {
88         return;
89     }
90 
91     port_mmap = port_mmaps->elts;
92 
93     for (i = 0; i < port_mmaps->size; i++) {
94         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
95     }
96 
97     port_mmaps->size = 0;
98 
99     if (free_elts != 0) {
100         nxt_free(port_mmaps->elts);
101     }
102 }
103 
104 
/*
 * Fill released shared memory chunks with a recognizable pattern
 * (0xA5) so that use-after-free through the segment is easier to
 * spot while debugging.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)
107 
108 
/*
 * Completion handler for a buffer backed by shared memory chunks.
 * Returns the buffer's unsent chunks to the segment's free map,
 * notifies the peer if it was waiting for shared memory (oosm), and
 * releases the buffer along with its reference on the mmap handler.
 */
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_port_t               *port;
    nxt_process_t            *process;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    /* Thread-safe buffer handling; may re-post to the owning thread. */
    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    mp = b->data;

    nxt_assert(data == b->parent);

    mmap_handler = data;
    hdr = mmap_handler->hdr;

    /*
     * The segment belongs to a src/dst process pair; only those two
     * processes may touch the free map.
     */
    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks until b->mem.pos has been sent to other side,
         * let's release rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        /* Nothing was sent: release the whole buffer span. */
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    /* Poison the released region to catch stale readers. */
    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    /* Mark every chunk from p up to the buffer end as free. */
    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    /*
     * If we are the receiving side and the sender flagged an
     * out-of-shared-memory condition, acknowledge the newly freed
     * chunks so the sender can retry its allocation.
     */
    if (hdr->dst_pid == nxt_pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);

        if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
            port = nxt_process_port_first(process);

            if (port->type == NXT_PROCESS_WORKER) {
                (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
                                             -1, 0, 0, NULL);
            }
        }
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);
}
190 
191 
192 nxt_port_mmap_handler_t *
193 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
194     nxt_fd_t fd)
195 {
196     void                     *mem;
197     struct stat              mmap_stat;
198     nxt_port_mmap_t          *port_mmap;
199     nxt_port_mmap_header_t   *hdr;
200     nxt_port_mmap_handler_t  *mmap_handler;
201 
202     nxt_debug(task, "got new mmap fd #%FD from process %PI",
203               fd, process->pid);
204 
205     port_mmap = NULL;
206 
207     if (fstat(fd, &mmap_stat) == -1) {
208         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
209 
210         return NULL;
211     }
212 
213     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
214                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
215 
216     if (nxt_slow_path(mem == MAP_FAILED)) {
217         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
218 
219         return NULL;
220     }
221 
222     hdr = mem;
223 
224     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
225     if (nxt_slow_path(mmap_handler == NULL)) {
226         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
227 
228         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
229 
230         return NULL;
231     }
232 
233     mmap_handler->hdr = hdr;
234 
235     if (nxt_slow_path(hdr->src_pid != process->pid
236                       || hdr->dst_pid != nxt_pid))
237     {
238         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
239                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
240                 hdr->dst_pid, nxt_pid);
241 
242         return NULL;
243     }
244 
245     nxt_thread_mutex_lock(&process->incoming.mutex);
246 
247     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
248     if (nxt_slow_path(port_mmap == NULL)) {
249         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
250 
251         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
252         hdr = NULL;
253 
254         nxt_free(mmap_handler);
255         mmap_handler = NULL;
256 
257         goto fail;
258     }
259 
260     port_mmap->mmap_handler = mmap_handler;
261     nxt_port_mmap_handler_use(mmap_handler, 1);
262 
263     hdr->sent_over = 0xFFFFu;
264 
265 fail:
266 
267     nxt_thread_mutex_unlock(&process->incoming.mutex);
268 
269     return mmap_handler;
270 }
271 
272 
273 static nxt_port_mmap_handler_t *
274 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
275     nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
276 {
277     void                     *mem;
278     u_char                   *p, name[64];
279     nxt_fd_t                 fd;
280     nxt_int_t                i;
281     nxt_free_map_t           *free_map;
282     nxt_port_mmap_t          *port_mmap;
283     nxt_port_mmap_header_t   *hdr;
284     nxt_port_mmap_handler_t  *mmap_handler;
285 
286     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
287     if (nxt_slow_path(mmap_handler == NULL)) {
288         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
289 
290         return NULL;
291     }
292 
293     port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
294     if (nxt_slow_path(port_mmap == NULL)) {
295         nxt_log(task, NXT_LOG_WARN,
296                 "failed to add port mmap to outgoing array");
297 
298         nxt_free(mmap_handler);
299         return NULL;
300     }
301 
302     p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
303                     nxt_pid, nxt_random(&task->thread->random));
304     *p = '\0';
305 
306 #if (NXT_HAVE_MEMFD_CREATE)
307 
308     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
309 
310     if (nxt_slow_path(fd == -1)) {
311         nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
312 
313         goto remove_fail;
314     }
315 
316     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
317 
318 #elif (NXT_HAVE_SHM_OPEN_ANON)
319 
320     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
321 
322     nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
323 
324     if (nxt_slow_path(fd == -1)) {
325         nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
326 
327         goto remove_fail;
328     }
329 
330 #elif (NXT_HAVE_SHM_OPEN)
331 
332     /* Just in case. */
333     shm_unlink((char *) name);
334 
335     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
336 
337     nxt_debug(task, "shm_open(%s): %FD", name, fd);
338 
339     if (nxt_slow_path(fd == -1)) {
340         nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
341 
342         goto remove_fail;
343     }
344 
345     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
346         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
347                 nxt_errno);
348     }
349 
350 #else
351 
352 #error No working shared memory implementation.
353 
354 #endif
355 
356     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
357         nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
358 
359         goto remove_fail;
360     }
361 
362     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
363                        MAP_SHARED, fd, 0);
364 
365     if (nxt_slow_path(mem == MAP_FAILED)) {
366         goto remove_fail;
367     }
368 
369     mmap_handler->hdr = mem;
370     port_mmap->mmap_handler = mmap_handler;
371     nxt_port_mmap_handler_use(mmap_handler, 1);
372 
373     /* Init segment header. */
374     hdr = mmap_handler->hdr;
375 
376     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
377     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
378 
379     hdr->id = process->outgoing.size - 1;
380     hdr->src_pid = nxt_pid;
381     hdr->dst_pid = process->pid;
382     hdr->sent_over = port->id;
383 
384     /* Mark first chunk as busy */
385     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
386 
387     for (i = 0; i < n; i++) {
388         nxt_port_mmap_set_chunk_busy(free_map, i);
389     }
390 
391     /* Mark as busy chunk followed the last available chunk. */
392     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
393     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
394 
395     nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
396 
397     /* TODO handle error */
398     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
399 
400     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
401             hdr->id, nxt_pid, process->pid);
402 
403     return mmap_handler;
404 
405 remove_fail:
406 
407     nxt_free(mmap_handler);
408 
409     process->outgoing.size--;
410 
411     return NULL;
412 }
413 
414 
/*
 * Finds "n" consecutive free chunks in one of the process's outgoing
 * segments usable by "port", marking them busy and storing the index
 * of the first chunk in *c.  When no existing segment has room, a new
 * segment is created.  Returns the segment's handler or NULL.
 *
 * The free maps are shared with the peer process and modified with
 * atomic test-and-set operations; the outgoing mutex only serializes
 * access to the array of segments within this process.
 */
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_process_t            *process;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    process = port->process;
    if (nxt_slow_path(process == NULL)) {
        return NULL;
    }

    nxt_thread_mutex_lock(&process->outgoing.mutex);

    end_port_mmap = process->outgoing.elts + process->outgoing.size;

    for (port_mmap = process->outgoing.elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        /* Skip segments dedicated to a different port. */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        /* get_free_chunk() atomically claims chunk *c when found. */
        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            /* Try to extend the run to n consecutive chunks. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    /*
                     * Run broken: roll back the chunks claimed so far
                     * and restart the search past the busy chunk.
                     */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        /* Segment exhausted: ask the peer to acknowledge freed chunks. */
        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&process->outgoing.mutex);

    return mmap_handler;
}
489 
490 
491 static nxt_port_mmap_handler_t *
492 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
493 {
494     nxt_process_t            *process;
495     nxt_port_mmap_handler_t  *mmap_handler;
496 
497     process = nxt_runtime_process_find(task->thread->runtime, spid);
498     if (nxt_slow_path(process == NULL)) {
499         return NULL;
500     }
501 
502     nxt_thread_mutex_lock(&process->incoming.mutex);
503 
504     if (nxt_fast_path(process->incoming.size > id)) {
505         mmap_handler = process->incoming.elts[id].mmap_handler;
506 
507     } else {
508         mmap_handler = NULL;
509 
510         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
511     }
512 
513     nxt_thread_mutex_unlock(&process->incoming.mutex);
514 
515     return mmap_handler;
516 }
517 
518 
519 nxt_int_t
520 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
521     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
522 {
523     nxt_chunk_id_t           c;
524     nxt_port_mmap_header_t   *hdr;
525     nxt_port_mmap_handler_t  *mmap_handler;
526 
527     nxt_debug(task, "request tracking for stream #%uD", stream);
528 
529     mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
530     if (nxt_slow_path(mmap_handler == NULL)) {
531         return NXT_ERROR;
532     }
533 
534     nxt_port_mmap_handler_use(mmap_handler, 1);
535 
536     hdr = mmap_handler->hdr;
537 
538     tracking->mmap_handler = mmap_handler;
539     tracking->tracking = hdr->tracking + c;
540 
541     *tracking->tracking = stream;
542 
543     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
544               hdr->src_pid, hdr->dst_pid, hdr->id, c);
545 
546     return NXT_OK;
547 }
548 
549 
550 nxt_bool_t
551 nxt_port_mmap_tracking_cancel(nxt_task_t *task,
552     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
553 {
554     nxt_bool_t               res;
555     nxt_chunk_id_t           c;
556     nxt_port_mmap_header_t   *hdr;
557     nxt_port_mmap_handler_t  *mmap_handler;
558 
559     mmap_handler = tracking->mmap_handler;
560 
561     if (nxt_slow_path(mmap_handler == NULL)) {
562         return 0;
563     }
564 
565     hdr = mmap_handler->hdr;
566 
567     res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
568 
569     nxt_debug(task, "%s tracking for stream #%uD",
570               (res ? "cancelled" : "failed to cancel"), stream);
571 
572     if (!res) {
573         c = tracking->tracking - hdr->tracking;
574         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
575     }
576 
577     nxt_port_mmap_handler_use(mmap_handler, -1);
578 
579     return res;
580 }
581 
582 
/*
 * Serializes a tracking reference into a two-word wire format:
 * buf[0] is the segment id, buf[1] is the tracking chunk index.
 * The peer decodes it in nxt_port_mmap_tracking_read().
 */
nxt_int_t
nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
{
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = t->mmap_handler;

#if (NXT_DEBUG)
    {
    nxt_atomic_t  *tracking;

    tracking = mmap_handler->hdr->tracking;

    /* The slot pointer must lie inside this segment's tracking array. */
    nxt_assert(t->tracking >= tracking);
    nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
    }
#endif

    buf[0] = mmap_handler->hdr->id;
    buf[1] = t->tracking - mmap_handler->hdr->tracking;

    return NXT_OK;
}
606 
607 nxt_bool_t
608 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
609 {
610     nxt_buf_t                     *b;
611     nxt_bool_t                    res;
612     nxt_chunk_id_t                c;
613     nxt_port_mmap_header_t        *hdr;
614     nxt_port_mmap_handler_t       *mmap_handler;
615     nxt_port_mmap_tracking_msg_t  *tracking_msg;
616 
617     b = msg->buf;
618 
619     if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
620         nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
621         return 0;
622     }
623 
624     tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
625 
626     b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
627     mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
628                                                    tracking_msg->mmap_id);
629 
630     if (nxt_slow_path(mmap_handler == NULL)) {
631         return 0;
632     }
633 
634     hdr = mmap_handler->hdr;
635 
636     c = tracking_msg->tracking_id;
637     res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
638 
639     nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
640               (res ? "received" : "already cancelled"));
641 
642     if (!res) {
643         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
644     }
645 
646     return res;
647 }
648 
649 
650 nxt_buf_t *
651 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
652 {
653     nxt_mp_t                 *mp;
654     nxt_buf_t                *b;
655     nxt_int_t                nchunks;
656     nxt_chunk_id_t           c;
657     nxt_port_mmap_header_t   *hdr;
658     nxt_port_mmap_handler_t  *mmap_handler;
659 
660     nxt_debug(task, "request %z bytes shm buffer", size);
661 
662     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
663 
664     if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
665         nxt_alert(task, "requested buffer (%z) too big", size);
666 
667         return NULL;
668     }
669 
670     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
671     if (nxt_slow_path(b == NULL)) {
672         return NULL;
673     }
674 
675     b->completion_handler = nxt_port_mmap_buf_completion;
676     nxt_buf_set_port_mmap(b);
677 
678     mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
679     if (nxt_slow_path(mmap_handler == NULL)) {
680         mp = task->thread->engine->mem_pool;
681         nxt_mp_free(mp, b);
682         nxt_mp_release(mp);
683         return NULL;
684     }
685 
686     b->parent = mmap_handler;
687 
688     nxt_port_mmap_handler_use(mmap_handler, 1);
689 
690     hdr = mmap_handler->hdr;
691 
692     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
693     b->mem.pos = b->mem.start;
694     b->mem.free = b->mem.start;
695     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
696 
697     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
698               b, b->mem.start, b->mem.end - b->mem.start,
699               hdr->src_pid, hdr->dst_pid, hdr->id, c);
700 
701     return b;
702 }
703 
704 
/*
 * Tries to grow an mmap-backed buffer in place by acquiring the
 * chunks immediately following b->mem.end.  If fewer chunks than
 * needed are free but at least "min_size" bytes can be provided, the
 * partial extension is kept; otherwise every newly acquired chunk is
 * rolled back and NXT_ERROR is returned.
 */
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    /* Already enough room: nothing to acquire. */
    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    /* First chunk past the current end of the buffer. */
    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    c = start;

    /* Try to acquire as much chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    /*
     * On a partial acquisition, keep the extension only if it still
     * satisfies min_size; otherwise free chunks [start, c) again.
     * NB: c and start are unsigned; this rollback relies on start >= 1,
     * which holds because b->mem.end is past the segment header.
     */
    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}
769 
770 
771 static nxt_buf_t *
772 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
773     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
774 {
775     size_t                   nchunks;
776     nxt_buf_t                *b;
777     nxt_port_mmap_header_t   *hdr;
778     nxt_port_mmap_handler_t  *mmap_handler;
779 
780     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
781                                                    mmap_msg->mmap_id);
782     if (nxt_slow_path(mmap_handler == NULL)) {
783         return NULL;
784     }
785 
786     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
787     if (nxt_slow_path(b == NULL)) {
788         return NULL;
789     }
790 
791     b->completion_handler = nxt_port_mmap_buf_completion;
792 
793     nxt_buf_set_port_mmap(b);
794 
795     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
796     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
797         nchunks++;
798     }
799 
800     hdr = mmap_handler->hdr;
801 
802     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
803     b->mem.pos = b->mem.start;
804     b->mem.free = b->mem.start + mmap_msg->size;
805     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
806 
807     b->parent = mmap_handler;
808     nxt_port_mmap_handler_use(mmap_handler, 1);
809 
810     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
811               b, b->mem.start, b->mem.end - b->mem.start,
812               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
813 
814     return b;
815 }
816 
817 
/*
 * Rewrites an outgoing message so that it carries chunk descriptors
 * instead of the data itself: each iovec of "sb" (already pointing
 * into shared memory buffers of msg->buf) is converted into an
 * nxt_port_mmap_msg_t written to "mmsg_buf", and the sendbuf is
 * collapsed to that single descriptor array.
 */
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Lookup buffer which starts current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        /* Describe the iovec by segment id, chunk id and byte count. */
        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    /* Send only the descriptor array over the socket. */
    sb->iobuf[0].iov_base = mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}
870 
871 
/*
 * Reverses nxt_port_mmap_write(): parses the received descriptor
 * array and replaces msg->buf with a chain of buffers mapped over the
 * referenced shared memory chunks, accumulating the total payload
 * size in msg->size.  Parsing stops at the first bad descriptor.
 */
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    /* pb is the link where the next shm-backed buffer is appended. */
    pb = &msg->buf;
    msg->size = 0;

    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark original buf as complete. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}
909 
910 
911 nxt_port_method_t
912 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
913 {
914     nxt_port_method_t        m;
915     nxt_port_mmap_header_t   *hdr;
916     nxt_port_mmap_handler_t  *mmap_handler;
917 
918     m = NXT_PORT_METHOD_ANY;
919 
920     for (/* void */; b != NULL; b = b->next) {
921         if (nxt_buf_used_size(b) == 0) {
922             /* empty buffers does not affect method */
923             continue;
924         }
925 
926         if (nxt_buf_is_port_mmap(b)) {
927             mmap_handler = b->parent;
928             hdr = mmap_handler->hdr;
929 
930             if (m == NXT_PORT_METHOD_PLAIN) {
931                 nxt_log_error(NXT_LOG_ERR, task->log,
932                               "mixing plain and mmap buffers, "
933                               "using plain mode");
934 
935                 break;
936             }
937 
938             if (port->pid != hdr->dst_pid) {
939                 nxt_log_error(NXT_LOG_ERR, task->log,
940                               "send mmap buffer for %PI to %PI, "
941                               "using plain mode", hdr->dst_pid, port->pid);
942 
943                 m = NXT_PORT_METHOD_PLAIN;
944 
945                 break;
946             }
947 
948             if (m == NXT_PORT_METHOD_ANY) {
949                 nxt_debug(task, "using mmap mode");
950 
951                 m = NXT_PORT_METHOD_MMAP;
952             }
953         } else {
954             if (m == NXT_PORT_METHOD_MMAP) {
955                 nxt_log_error(NXT_LOG_ERR, task->log,
956                               "mixing mmap and plain buffers, "
957                               "switching to plain mode");
958 
959                 m = NXT_PORT_METHOD_PLAIN;
960 
961                 break;
962             }
963 
964             if (m == NXT_PORT_METHOD_ANY) {
965                 nxt_debug(task, "using plain mode");
966 
967                 m = NXT_PORT_METHOD_PLAIN;
968             }
969         }
970     }
971 
972     return m;
973 }
974