xref: /unit/src/nxt_port_memory.c (revision 1662:1472957bc7bd)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
21     void *data);
22 
23 
24 nxt_inline void
25 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
26 {
27     int  c;
28 
29     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
30 
31     if (i < 0 && c == -i) {
32         if (mmap_handler->hdr != NULL) {
33             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
34             mmap_handler->hdr = NULL;
35         }
36 
37         nxt_free(mmap_handler);
38     }
39 }
40 
41 
42 static nxt_port_mmap_t *
43 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
44 {
45     uint32_t  cap;
46 
47     cap = port_mmaps->cap;
48 
49     if (cap == 0) {
50         cap = i + 1;
51     }
52 
53     while (i + 1 > cap) {
54 
55         if (cap < 16) {
56             cap = cap * 2;
57 
58         } else {
59             cap = cap + cap / 2;
60         }
61     }
62 
63     if (cap != port_mmaps->cap) {
64 
65         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
66                                        cap * sizeof(nxt_port_mmap_t));
67         if (nxt_slow_path(port_mmaps->elts == NULL)) {
68             return NULL;
69         }
70 
71         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
72                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
73 
74         port_mmaps->cap = cap;
75     }
76 
77     if (i + 1 > port_mmaps->size) {
78         port_mmaps->size = i + 1;
79     }
80 
81     return port_mmaps->elts + i;
82 }
83 
84 
85 void
86 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
87 {
88     uint32_t         i;
89     nxt_port_mmap_t  *port_mmap;
90 
91     if (port_mmaps == NULL) {
92         return;
93     }
94 
95     port_mmap = port_mmaps->elts;
96 
97     for (i = 0; i < port_mmaps->size; i++) {
98         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
99     }
100 
101     port_mmaps->size = 0;
102 
103     if (free_elts != 0) {
104         nxt_free(port_mmaps->elts);
105     }
106 }
107 
108 
/*
 * Poison released chunk memory with 0xA5 so stale reads by the peer are
 * detectable.  Fix: parenthesize "size" so expression arguments
 * (e.g. end - pos) expand safely.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, (size))
111 
112 
113 static void
114 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
115 {
116     u_char                   *p;
117     nxt_mp_t                 *mp;
118     nxt_buf_t                *b, *next;
119     nxt_process_t            *process;
120     nxt_chunk_id_t           c;
121     nxt_port_mmap_header_t   *hdr;
122     nxt_port_mmap_handler_t  *mmap_handler;
123 
124     if (nxt_buf_ts_handle(task, obj, data)) {
125         return;
126     }
127 
128     b = obj;
129 
130     nxt_assert(data == b->parent);
131 
132     mmap_handler = data;
133 
134 complete_buf:
135 
136     hdr = mmap_handler->hdr;
137 
138     if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
139         nxt_debug(task, "mmap buf completion: mmap for other process pair "
140                   "%PI->%PI", hdr->src_pid, hdr->dst_pid);
141 
142         goto release_buf;
143     }
144 
145     if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
146         /*
147          * Chunks until b->mem.pos has been sent to other side,
148          * let's release rest (if any).
149          */
150         p = b->mem.pos - 1;
151         c = nxt_port_mmap_chunk_id(hdr, p) + 1;
152         p = nxt_port_mmap_chunk_start(hdr, c);
153 
154     } else {
155         p = b->mem.start;
156         c = nxt_port_mmap_chunk_id(hdr, p);
157     }
158 
159     nxt_port_mmap_free_junk(p, b->mem.end - p);
160 
161     nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
162               "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
163               b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
164 
165     while (p < b->mem.end) {
166         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
167 
168         p += PORT_MMAP_CHUNK_SIZE;
169         c++;
170     }
171 
172     if (hdr->dst_pid == nxt_pid
173         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
174     {
175         process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
176 
177         nxt_process_broadcast_shm_ack(task, process);
178     }
179 
180 release_buf:
181 
182     nxt_port_mmap_handler_use(mmap_handler, -1);
183 
184     next = b->next;
185     mp = b->data;
186 
187     nxt_mp_free(mp, b);
188     nxt_mp_release(mp);
189 
190     if (next != NULL) {
191         b = next;
192         mmap_handler = b->parent;
193 
194         goto complete_buf;
195     }
196 }
197 
198 
199 nxt_port_mmap_handler_t *
200 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
201     nxt_fd_t fd)
202 {
203     void                     *mem;
204     struct stat              mmap_stat;
205     nxt_port_mmap_t          *port_mmap;
206     nxt_port_mmap_header_t   *hdr;
207     nxt_port_mmap_handler_t  *mmap_handler;
208 
209     nxt_debug(task, "got new mmap fd #%FD from process %PI",
210               fd, process->pid);
211 
212     port_mmap = NULL;
213 
214     if (fstat(fd, &mmap_stat) == -1) {
215         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
216 
217         return NULL;
218     }
219 
220     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
221                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
222 
223     if (nxt_slow_path(mem == MAP_FAILED)) {
224         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
225 
226         return NULL;
227     }
228 
229     hdr = mem;
230 
231     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
232     if (nxt_slow_path(mmap_handler == NULL)) {
233         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
234 
235         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
236 
237         return NULL;
238     }
239 
240     mmap_handler->hdr = hdr;
241 
242     if (nxt_slow_path(hdr->src_pid != process->pid
243                       || hdr->dst_pid != nxt_pid))
244     {
245         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
246                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
247                 hdr->dst_pid, nxt_pid);
248 
249         return NULL;
250     }
251 
252     nxt_thread_mutex_lock(&process->incoming.mutex);
253 
254     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
255     if (nxt_slow_path(port_mmap == NULL)) {
256         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
257 
258         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
259         hdr = NULL;
260 
261         nxt_free(mmap_handler);
262         mmap_handler = NULL;
263 
264         goto fail;
265     }
266 
267     port_mmap->mmap_handler = mmap_handler;
268     nxt_port_mmap_handler_use(mmap_handler, 1);
269 
270     hdr->sent_over = 0xFFFFu;
271 
272 fail:
273 
274     nxt_thread_mutex_unlock(&process->incoming.mutex);
275 
276     return mmap_handler;
277 }
278 
279 
280 static nxt_port_mmap_handler_t *
281 nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
282     nxt_bool_t tracking, nxt_int_t n)
283 {
284     void                     *mem;
285     nxt_fd_t                 fd;
286     nxt_int_t                i;
287     nxt_free_map_t           *free_map;
288     nxt_port_mmap_t          *port_mmap;
289     nxt_port_mmap_header_t   *hdr;
290     nxt_port_mmap_handler_t  *mmap_handler;
291 
292     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
293     if (nxt_slow_path(mmap_handler == NULL)) {
294         nxt_alert(task, "failed to allocate mmap_handler");
295 
296         return NULL;
297     }
298 
299     port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
300     if (nxt_slow_path(port_mmap == NULL)) {
301         nxt_alert(task, "failed to add port mmap to mmaps array");
302 
303         nxt_free(mmap_handler);
304         return NULL;
305     }
306 
307     fd = nxt_shm_open(task, PORT_MMAP_SIZE);
308     if (nxt_slow_path(fd == -1)) {
309         goto remove_fail;
310     }
311 
312     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
313                        MAP_SHARED, fd, 0);
314 
315     if (nxt_slow_path(mem == MAP_FAILED)) {
316         goto remove_fail;
317     }
318 
319     mmap_handler->hdr = mem;
320     mmap_handler->fd = fd;
321     port_mmap->mmap_handler = mmap_handler;
322     nxt_port_mmap_handler_use(mmap_handler, 1);
323 
324     /* Init segment header. */
325     hdr = mmap_handler->hdr;
326 
327     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
328     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
329 
330     hdr->id = mmaps->size - 1;
331     hdr->src_pid = nxt_pid;
332     hdr->sent_over = 0xFFFFu;
333 
334     /* Mark first chunk as busy */
335     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
336 
337     for (i = 0; i < n; i++) {
338         nxt_port_mmap_set_chunk_busy(free_map, i);
339     }
340 
341     /* Mark as busy chunk followed the last available chunk. */
342     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
343     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
344 
345     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
346             hdr->id, nxt_pid);
347 
348     return mmap_handler;
349 
350 remove_fail:
351 
352     nxt_free(mmap_handler);
353 
354     mmaps->size--;
355 
356     return NULL;
357 }
358 
359 
360 nxt_int_t
361 nxt_shm_open(nxt_task_t *task, size_t size)
362 {
363     nxt_fd_t  fd;
364 
365 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
366 
367     u_char    *p, name[64];
368 
369     p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
370                     nxt_pid, nxt_random(&task->thread->random));
371     *p = '\0';
372 
373 #endif
374 
375 #if (NXT_HAVE_MEMFD_CREATE)
376 
377     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
378 
379     if (nxt_slow_path(fd == -1)) {
380         nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
381 
382         return -1;
383     }
384 
385     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
386 
387 #elif (NXT_HAVE_SHM_OPEN_ANON)
388 
389     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
390 
391     if (nxt_slow_path(fd == -1)) {
392         nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
393 
394         return -1;
395     }
396 
397     nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
398 
399 #elif (NXT_HAVE_SHM_OPEN)
400 
401     /* Just in case. */
402     shm_unlink((char *) name);
403 
404     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
405 
406     if (nxt_slow_path(fd == -1)) {
407         nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
408 
409         return -1;
410     }
411 
412     nxt_debug(task, "shm_open(%s): %FD", name, fd);
413 
414     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
415         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
416                 nxt_errno);
417     }
418 
419 #else
420 
421 #error No working shared memory implementation.
422 
423 #endif
424 
425     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
426         nxt_alert(task, "ftruncate() failed %E", nxt_errno);
427 
428         nxt_fd_close(fd);
429 
430         return -1;
431     }
432 
433     return fd;
434 }
435 
436 
437 static nxt_port_mmap_handler_t *
438 nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
439     nxt_int_t n, nxt_bool_t tracking)
440 {
441     nxt_int_t                i, res, nchunks;
442     nxt_free_map_t           *free_map;
443     nxt_port_mmap_t          *port_mmap;
444     nxt_port_mmap_t          *end_port_mmap;
445     nxt_port_mmap_header_t   *hdr;
446     nxt_port_mmap_handler_t  *mmap_handler;
447 
448     nxt_thread_mutex_lock(&mmaps->mutex);
449 
450     end_port_mmap = mmaps->elts + mmaps->size;
451 
452     for (port_mmap = mmaps->elts;
453          port_mmap < end_port_mmap;
454          port_mmap++)
455     {
456         mmap_handler = port_mmap->mmap_handler;
457         hdr = mmap_handler->hdr;
458 
459         if (hdr->sent_over != 0xFFFFu) {
460             continue;
461         }
462 
463         *c = 0;
464 
465         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
466 
467         while (nxt_port_mmap_get_free_chunk(free_map, c)) {
468             nchunks = 1;
469 
470             while (nchunks < n) {
471                 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);
472 
473                 if (res == 0) {
474                     for (i = 0; i < nchunks; i++) {
475                         nxt_port_mmap_set_chunk_free(free_map, *c + i);
476                     }
477 
478                     *c += nchunks + 1;
479                     nchunks = 0;
480                     break;
481                 }
482 
483                 nchunks++;
484             }
485 
486             if (nchunks == n) {
487                 goto unlock_return;
488             }
489         }
490 
491         hdr->oosm = 1;
492     }
493 
494     /* TODO introduce port_mmap limit and release wait. */
495 
496     *c = 0;
497     mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
498 
499 unlock_return:
500 
501     nxt_thread_mutex_unlock(&mmaps->mutex);
502 
503     return mmap_handler;
504 }
505 
506 
507 static nxt_port_mmap_handler_t *
508 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
509 {
510     nxt_process_t            *process;
511     nxt_port_mmap_handler_t  *mmap_handler;
512 
513     process = nxt_runtime_process_find(task->thread->runtime, spid);
514     if (nxt_slow_path(process == NULL)) {
515         return NULL;
516     }
517 
518     nxt_thread_mutex_lock(&process->incoming.mutex);
519 
520     if (nxt_fast_path(process->incoming.size > id)) {
521         mmap_handler = process->incoming.elts[id].mmap_handler;
522 
523     } else {
524         mmap_handler = NULL;
525 
526         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
527     }
528 
529     nxt_thread_mutex_unlock(&process->incoming.mutex);
530 
531     return mmap_handler;
532 }
533 
534 
535 nxt_int_t
536 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
537     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
538 {
539     nxt_chunk_id_t           c;
540     nxt_port_mmap_header_t   *hdr;
541     nxt_port_mmap_handler_t  *mmap_handler;
542 
543     nxt_debug(task, "request tracking for stream #%uD", stream);
544 
545     mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
546     if (nxt_slow_path(mmap_handler == NULL)) {
547         return NXT_ERROR;
548     }
549 
550     nxt_port_mmap_handler_use(mmap_handler, 1);
551 
552     hdr = mmap_handler->hdr;
553 
554     tracking->mmap_handler = mmap_handler;
555     tracking->tracking = hdr->tracking + c;
556 
557     *tracking->tracking = stream;
558 
559     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
560               hdr->src_pid, hdr->dst_pid, hdr->id, c);
561 
562     return NXT_OK;
563 }
564 
565 
566 nxt_bool_t
567 nxt_port_mmap_tracking_cancel(nxt_task_t *task,
568     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
569 {
570     nxt_bool_t               res;
571     nxt_chunk_id_t           c;
572     nxt_port_mmap_header_t   *hdr;
573     nxt_port_mmap_handler_t  *mmap_handler;
574 
575     mmap_handler = tracking->mmap_handler;
576 
577     if (nxt_slow_path(mmap_handler == NULL)) {
578         return 0;
579     }
580 
581     hdr = mmap_handler->hdr;
582 
583     res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
584 
585     nxt_debug(task, "%s tracking for stream #%uD",
586               (res ? "cancelled" : "failed to cancel"), stream);
587 
588     if (!res) {
589         c = tracking->tracking - hdr->tracking;
590         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
591     }
592 
593     nxt_port_mmap_handler_use(mmap_handler, -1);
594 
595     return res;
596 }
597 
598 
599 nxt_int_t
600 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
601 {
602     nxt_port_mmap_handler_t  *mmap_handler;
603 
604     mmap_handler = t->mmap_handler;
605 
606 #if (NXT_DEBUG)
607     {
608     nxt_atomic_t  *tracking;
609 
610     tracking = mmap_handler->hdr->tracking;
611 
612     nxt_assert(t->tracking >= tracking);
613     nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
614     }
615 #endif
616 
617     buf[0] = mmap_handler->hdr->id;
618     buf[1] = t->tracking - mmap_handler->hdr->tracking;
619 
620     return NXT_OK;
621 }
622 
623 nxt_bool_t
624 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
625 {
626     nxt_buf_t                     *b;
627     nxt_bool_t                    res;
628     nxt_chunk_id_t                c;
629     nxt_port_mmap_header_t        *hdr;
630     nxt_port_mmap_handler_t       *mmap_handler;
631     nxt_port_mmap_tracking_msg_t  *tracking_msg;
632 
633     b = msg->buf;
634 
635     if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
636         nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
637         return 0;
638     }
639 
640     tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
641 
642     b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
643     mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
644                                                    tracking_msg->mmap_id);
645 
646     if (nxt_slow_path(mmap_handler == NULL)) {
647         return 0;
648     }
649 
650     hdr = mmap_handler->hdr;
651 
652     c = tracking_msg->tracking_id;
653     res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
654 
655     nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
656               (res ? "received" : "already cancelled"));
657 
658     if (!res) {
659         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
660     }
661 
662     return res;
663 }
664 
665 
666 nxt_buf_t *
667 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
668 {
669     nxt_mp_t                 *mp;
670     nxt_buf_t                *b;
671     nxt_int_t                nchunks;
672     nxt_chunk_id_t           c;
673     nxt_port_mmap_header_t   *hdr;
674     nxt_port_mmap_handler_t  *mmap_handler;
675 
676     nxt_debug(task, "request %z bytes shm buffer", size);
677 
678     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
679 
680     if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
681         nxt_alert(task, "requested buffer (%z) too big", size);
682 
683         return NULL;
684     }
685 
686     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
687     if (nxt_slow_path(b == NULL)) {
688         return NULL;
689     }
690 
691     b->completion_handler = nxt_port_mmap_buf_completion;
692     nxt_buf_set_port_mmap(b);
693 
694     mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
695     if (nxt_slow_path(mmap_handler == NULL)) {
696         mp = task->thread->engine->mem_pool;
697         nxt_mp_free(mp, b);
698         nxt_mp_release(mp);
699         return NULL;
700     }
701 
702     b->parent = mmap_handler;
703 
704     nxt_port_mmap_handler_use(mmap_handler, 1);
705 
706     hdr = mmap_handler->hdr;
707 
708     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
709     b->mem.pos = b->mem.start;
710     b->mem.free = b->mem.start;
711     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
712 
713     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
714               b, b->mem.start, b->mem.end - b->mem.start,
715               hdr->src_pid, hdr->dst_pid, hdr->id, c);
716 
717     return b;
718 }
719 
720 
721 nxt_int_t
722 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
723     size_t min_size)
724 {
725     size_t                   nchunks, free_size;
726     nxt_chunk_id_t           c, start;
727     nxt_port_mmap_header_t   *hdr;
728     nxt_port_mmap_handler_t  *mmap_handler;
729 
730     nxt_debug(task, "request increase %z bytes shm buffer", size);
731 
732     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
733         nxt_log(task, NXT_LOG_WARN,
734                 "failed to increase, not a mmap buffer");
735         return NXT_ERROR;
736     }
737 
738     free_size = nxt_buf_mem_free_size(&b->mem);
739 
740     if (nxt_slow_path(size <= free_size)) {
741         return NXT_OK;
742     }
743 
744     mmap_handler = b->parent;
745     hdr = mmap_handler->hdr;
746 
747     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
748 
749     size -= free_size;
750 
751     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
752 
753     c = start;
754 
755     /* Try to acquire as much chunks as required. */
756     while (nchunks > 0) {
757 
758         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
759             break;
760         }
761 
762         c++;
763         nchunks--;
764     }
765 
766     if (nchunks != 0
767         && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
768     {
769         c--;
770         while (c >= start) {
771             nxt_port_mmap_set_chunk_free(hdr->free_map, c);
772             c--;
773         }
774 
775         nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);
776 
777         return NXT_ERROR;
778 
779     } else {
780         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
781 
782         return NXT_OK;
783     }
784 }
785 
786 
787 static nxt_buf_t *
788 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
789     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
790 {
791     size_t                   nchunks;
792     nxt_buf_t                *b;
793     nxt_port_mmap_header_t   *hdr;
794     nxt_port_mmap_handler_t  *mmap_handler;
795 
796     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
797                                                    mmap_msg->mmap_id);
798     if (nxt_slow_path(mmap_handler == NULL)) {
799         return NULL;
800     }
801 
802     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
803     if (nxt_slow_path(b == NULL)) {
804         return NULL;
805     }
806 
807     b->completion_handler = nxt_port_mmap_buf_completion;
808 
809     nxt_buf_set_port_mmap(b);
810 
811     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
812     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
813         nchunks++;
814     }
815 
816     hdr = mmap_handler->hdr;
817 
818     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
819     b->mem.pos = b->mem.start;
820     b->mem.free = b->mem.start + mmap_msg->size;
821     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
822 
823     b->parent = mmap_handler;
824     nxt_port_mmap_handler_use(mmap_handler, 1);
825 
826     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
827               b, b->mem.start, b->mem.end - b->mem.start,
828               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
829 
830     return b;
831 }
832 
833 
834 void
835 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
836     nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
837 {
838     size_t                   bsize;
839     nxt_buf_t                *bmem;
840     nxt_uint_t               i;
841     nxt_port_mmap_msg_t      *mmap_msg;
842     nxt_port_mmap_header_t   *hdr;
843     nxt_port_mmap_handler_t  *mmap_handler;
844 
845     nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
846                     "via shared memory", sb->size, port->pid);
847 
848     bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
849     mmap_msg = mmsg_buf;
850 
851     bmem = msg->buf;
852 
853     for (i = 0; i < sb->niov; i++, mmap_msg++) {
854 
855         /* Lookup buffer which starts current iov_base. */
856         while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
857             bmem = bmem->next;
858         }
859 
860         if (nxt_slow_path(bmem == NULL)) {
861             nxt_log_error(NXT_LOG_ERR, task->log,
862                           "failed to find buf for iobuf[%d]", i);
863             return;
864             /* TODO clear b and exit */
865         }
866 
867         mmap_handler = bmem->parent;
868         hdr = mmap_handler->hdr;
869 
870         mmap_msg->mmap_id = hdr->id;
871         mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
872         mmap_msg->size = sb->iobuf[i].iov_len;
873 
874         nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
875                   mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
876                   port->pid);
877     }
878 
879     sb->iobuf[0].iov_base = mmsg_buf;
880     sb->iobuf[0].iov_len = bsize;
881     sb->niov = 1;
882     sb->size = bsize;
883 
884     msg->port_msg.mmap = 1;
885 }
886 
887 
888 void
889 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
890 {
891     nxt_buf_t            *b, **pb;
892     nxt_port_mmap_msg_t  *end, *mmap_msg;
893 
894     pb = &msg->buf;
895     msg->size = 0;
896 
897     for (b = msg->buf; b != NULL; b = b->next) {
898 
899         mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
900         end = (nxt_port_mmap_msg_t *) b->mem.free;
901 
902         while (mmap_msg < end) {
903             nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
904                       mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
905                       msg->port_msg.pid);
906 
907             *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
908                                                  msg->port_msg.pid, mmap_msg);
909             if (nxt_slow_path(*pb == NULL)) {
910                 nxt_log_error(NXT_LOG_ERR, task->log,
911                               "failed to get mmap buffer");
912 
913                 break;
914             }
915 
916             msg->size += mmap_msg->size;
917             pb = &(*pb)->next;
918             mmap_msg++;
919 
920             /* Mark original buf as complete. */
921             b->mem.pos += sizeof(nxt_port_mmap_msg_t);
922         }
923     }
924 }
925 
926 
927 nxt_port_method_t
928 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
929 {
930     nxt_port_method_t  m;
931 
932     m = NXT_PORT_METHOD_ANY;
933 
934     for (/* void */; b != NULL; b = b->next) {
935         if (nxt_buf_used_size(b) == 0) {
936             /* empty buffers does not affect method */
937             continue;
938         }
939 
940         if (nxt_buf_is_port_mmap(b)) {
941             if (m == NXT_PORT_METHOD_PLAIN) {
942                 nxt_log_error(NXT_LOG_ERR, task->log,
943                               "mixing plain and mmap buffers, "
944                               "using plain mode");
945 
946                 break;
947             }
948 
949             if (m == NXT_PORT_METHOD_ANY) {
950                 nxt_debug(task, "using mmap mode");
951 
952                 m = NXT_PORT_METHOD_MMAP;
953             }
954         } else {
955             if (m == NXT_PORT_METHOD_MMAP) {
956                 nxt_log_error(NXT_LOG_ERR, task->log,
957                               "mixing mmap and plain buffers, "
958                               "switching to plain mode");
959 
960                 m = NXT_PORT_METHOD_PLAIN;
961 
962                 break;
963             }
964 
965             if (m == NXT_PORT_METHOD_ANY) {
966                 nxt_debug(task, "using plain mode");
967 
968                 m = NXT_PORT_METHOD_PLAIN;
969             }
970         }
971     }
972 
973     return m;
974 }
975 
976 
977 void
978 nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
979 {
980     nxt_port_t  *port;
981 
982     if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
983     {
984         return;
985     }
986 
987     port = nxt_process_port_first(process);
988 
989     if (port->type == NXT_PROCESS_APP) {
990         nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
991     }
992 }
993 
994 
995 static void
996 nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
997 {
998     nxt_process_t  *process;
999 
1000     process = data;
1001 
1002     nxt_queue_each(port, &process->ports, nxt_port_t, link) {
1003         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
1004                                      -1, 0, 0, NULL);
1005     } nxt_queue_loop;
1006 }
1007