xref: /unit/src/nxt_port_memory.c (revision 551:220b0834790b)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 nxt_inline void
21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
22 {
23     int c;
24 
25     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26 
27     if (i < 0 && c == -i) {
28         if (mmap_handler->hdr != NULL) {
29             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30             mmap_handler->hdr = NULL;
31         }
32 
33         nxt_free(mmap_handler);
34     }
35 }
36 
37 
38 static nxt_port_mmap_t *
39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40 {
41     uint32_t  cap;
42 
43     cap = port_mmaps->cap;
44 
45     if (cap == 0) {
46         cap = i + 1;
47     }
48 
49     while (i + 1 > cap) {
50 
51         if (cap < 16) {
52           cap = cap * 2;
53 
54         } else {
55           cap = cap + cap / 2;
56         }
57     }
58 
59     if (cap != port_mmaps->cap) {
60 
61         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
62                                        cap * sizeof(nxt_port_mmap_t));
63         if (nxt_slow_path(port_mmaps->elts == NULL)) {
64             return NULL;
65         }
66 
67         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
68                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
69 
70         port_mmaps->cap = cap;
71     }
72 
73     if (i + 1 > port_mmaps->size) {
74         port_mmaps->size = i + 1;
75     }
76 
77     return port_mmaps->elts + i;
78 }
79 
80 
81 void
82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
83 {
84     uint32_t         i;
85     nxt_port_mmap_t  *port_mmap;
86 
87     if (port_mmaps == NULL) {
88         return;
89     }
90 
91     port_mmap = port_mmaps->elts;
92 
93     for (i = 0; i < port_mmaps->size; i++) {
94         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
95     }
96 
97     port_mmaps->size = 0;
98 
99     if (free_elts != 0) {
100         nxt_free(port_mmaps->elts);
101     }
102 }
103 
104 
/*
 * Fill released shared memory with a junk pattern (0xA5) so that
 * use-after-release bugs show up as recognizable garbage.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)
107 
108 
/*
 * Completion handler for shared memory buffers (outgoing and incoming):
 * returns the buffer's chunks to the segment's free map, drops the mmap
 * handler reference taken at allocation time and frees the buffer.
 */
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    /* Nonzero means completion was handled/rescheduled elsewhere. */
    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    mp = b->data;

#if (NXT_DEBUG)
    if (nxt_slow_path(data != b->parent)) {
        nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
                      data, b->parent);
        nxt_abort();
    }
#endif

    mmap_handler = data;
    hdr = mmap_handler->hdr;

    /*
     * Segment belongs to a different process pair; this process must
     * not touch its free map, only release its local references.
     */
    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks until b->mem.pos has been sent to other side,
         * let's release rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        /* Nothing was sent: release the whole [start, end) range. */
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    /* Poison the released range to catch use-after-release. */
    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    /* Mark every chunk covered by [p, end) as free again. */
    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

release_buf:

    /* Drop the reference taken when the buffer was created. */
    nxt_port_mmap_handler_use(mmap_handler, -1);

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);
}
179 
180 
181 nxt_port_mmap_handler_t *
182 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
183     nxt_fd_t fd)
184 {
185     void                     *mem;
186     struct stat              mmap_stat;
187     nxt_port_mmap_t          *port_mmap;
188     nxt_port_mmap_header_t   *hdr;
189     nxt_port_mmap_handler_t  *mmap_handler;
190 
191     nxt_debug(task, "got new mmap fd #%FD from process %PI",
192               fd, process->pid);
193 
194     port_mmap = NULL;
195 
196     if (fstat(fd, &mmap_stat) == -1) {
197         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
198 
199         return NULL;
200     }
201 
202     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
203                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
204 
205     if (nxt_slow_path(mem == MAP_FAILED)) {
206         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
207 
208         return NULL;
209     }
210 
211     hdr = mem;
212 
213     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
214     if (nxt_slow_path(mmap_handler == NULL)) {
215         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
216 
217         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
218 
219         return NULL;
220     }
221 
222     mmap_handler->hdr = hdr;
223 
224     if (nxt_slow_path(hdr->src_pid != process->pid
225                       || hdr->dst_pid != nxt_pid))
226     {
227         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
228                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
229                 hdr->dst_pid, nxt_pid);
230 
231         return NULL;
232     }
233 
234     nxt_thread_mutex_lock(&process->incoming.mutex);
235 
236     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
237     if (nxt_slow_path(port_mmap == NULL)) {
238         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
239 
240         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
241         hdr = NULL;
242 
243         nxt_free(mmap_handler);
244         mmap_handler = NULL;
245 
246         goto fail;
247     }
248 
249     port_mmap->mmap_handler = mmap_handler;
250     nxt_port_mmap_handler_use(mmap_handler, 1);
251 
252     hdr->sent_over = 0xFFFFu;
253 
254 fail:
255 
256     nxt_thread_mutex_unlock(&process->incoming.mutex);
257 
258     return mmap_handler;
259 }
260 
261 
262 static nxt_port_mmap_handler_t *
263 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
264     nxt_port_t *port, nxt_bool_t tracking)
265 {
266     void                     *mem;
267     u_char                   *p, name[64];
268     nxt_fd_t                 fd;
269     nxt_free_map_t           *free_map;
270     nxt_port_mmap_t          *port_mmap;
271     nxt_port_mmap_header_t   *hdr;
272     nxt_port_mmap_handler_t  *mmap_handler;
273 
274     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
275     if (nxt_slow_path(mmap_handler == NULL)) {
276         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
277 
278         return NULL;
279     }
280 
281     port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
282     if (nxt_slow_path(port_mmap == NULL)) {
283         nxt_log(task, NXT_LOG_WARN,
284                 "failed to add port mmap to outgoing array");
285 
286         nxt_free(mmap_handler);
287         return NULL;
288     }
289 
290     p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
291                     nxt_pid, nxt_random(&task->thread->random));
292     *p = '\0';
293 
294 #if (NXT_HAVE_MEMFD_CREATE)
295 
296     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
297 
298     if (nxt_slow_path(fd == -1)) {
299         nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
300                 name, nxt_errno);
301 
302         goto remove_fail;
303     }
304 
305     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
306 
307 #elif (NXT_HAVE_SHM_OPEN)
308 
309     /* Just in case. */
310     shm_unlink((char *) name);
311 
312     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
313 
314     nxt_debug(task, "shm_open(%s): %FD", name, fd);
315 
316     if (nxt_slow_path(fd == -1)) {
317         nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
318 
319         goto remove_fail;
320     }
321 
322     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
323         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
324                 nxt_errno);
325     }
326 
327 #else
328 
329 #error No working shared memory implementation.
330 
331 #endif
332 
333     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
334         nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
335 
336         goto remove_fail;
337     }
338 
339     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
340                        MAP_SHARED, fd, 0);
341 
342     if (nxt_slow_path(mem == MAP_FAILED)) {
343         goto remove_fail;
344     }
345 
346     mmap_handler->hdr = mem;
347     port_mmap->mmap_handler = mmap_handler;
348     nxt_port_mmap_handler_use(mmap_handler, 1);
349 
350     /* Init segment header. */
351     hdr = mmap_handler->hdr;
352 
353     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
354     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
355 
356     hdr->id = process->outgoing.size - 1;
357     hdr->src_pid = nxt_pid;
358     hdr->dst_pid = process->pid;
359     hdr->sent_over = port->id;
360 
361     /* Mark first chunk as busy */
362     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
363 
364     nxt_port_mmap_set_chunk_busy(free_map, 0);
365 
366     /* Mark as busy chunk followed the last available chunk. */
367     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
368     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
369 
370     nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
371 
372     /* TODO handle error */
373     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
374 
375     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
376             hdr->id, nxt_pid, process->pid);
377 
378     return mmap_handler;
379 
380 remove_fail:
381 
382     nxt_free(mmap_handler);
383 
384     process->outgoing.size--;
385 
386     return NULL;
387 }
388 
389 
390 static nxt_port_mmap_handler_t *
391 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
392     nxt_bool_t tracking)
393 {
394     nxt_process_t            *process;
395     nxt_free_map_t           *free_map;
396     nxt_port_mmap_t          *port_mmap;
397     nxt_port_mmap_t          *end_port_mmap;
398     nxt_port_mmap_header_t   *hdr;
399     nxt_port_mmap_handler_t  *mmap_handler;
400 
401     process = port->process;
402     if (nxt_slow_path(process == NULL)) {
403         return NULL;
404     }
405 
406     *c = 0;
407 
408     nxt_thread_mutex_lock(&process->outgoing.mutex);
409 
410     end_port_mmap = process->outgoing.elts + process->outgoing.size;
411 
412     for (port_mmap = process->outgoing.elts;
413          port_mmap < end_port_mmap;
414          port_mmap++)
415     {
416         mmap_handler = port_mmap->mmap_handler;
417         hdr = mmap_handler->hdr;
418 
419         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
420             continue;
421         }
422 
423         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
424 
425         if (nxt_port_mmap_get_free_chunk(free_map, c)) {
426             goto unlock_return;
427         }
428     }
429 
430     /* TODO introduce port_mmap limit and release wait. */
431 
432     mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);
433 
434 unlock_return:
435 
436     nxt_thread_mutex_unlock(&process->outgoing.mutex);
437 
438     return mmap_handler;
439 }
440 
441 
442 static nxt_port_mmap_handler_t *
443 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
444 {
445     nxt_process_t            *process;
446     nxt_port_mmap_handler_t  *mmap_handler;
447 
448     process = nxt_runtime_process_find(task->thread->runtime, spid);
449     if (nxt_slow_path(process == NULL)) {
450         return NULL;
451     }
452 
453     nxt_thread_mutex_lock(&process->incoming.mutex);
454 
455     if (nxt_fast_path(process->incoming.size > id)) {
456         mmap_handler = process->incoming.elts[id].mmap_handler;
457 
458     } else {
459         mmap_handler = NULL;
460 
461         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
462     }
463 
464     nxt_thread_mutex_unlock(&process->incoming.mutex);
465 
466     return mmap_handler;
467 }
468 
469 
470 nxt_int_t
471 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
472     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
473 {
474     nxt_chunk_id_t           c;
475     nxt_port_mmap_header_t   *hdr;
476     nxt_port_mmap_handler_t  *mmap_handler;
477 
478     nxt_debug(task, "request tracking for stream #%uD", stream);
479 
480     mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
481     if (nxt_slow_path(mmap_handler == NULL)) {
482         return NXT_ERROR;
483     }
484 
485     nxt_port_mmap_handler_use(mmap_handler, 1);
486 
487     hdr = mmap_handler->hdr;
488 
489     tracking->mmap_handler = mmap_handler;
490     tracking->tracking = hdr->tracking + c;
491 
492     *tracking->tracking = stream;
493 
494     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
495               hdr->src_pid, hdr->dst_pid, hdr->id, c);
496 
497     return NXT_OK;
498 }
499 
500 
/*
 * Tries to cancel a stream's tracking slot before the peer consumes it.
 * Returns 1 when the slot still contained 'stream' and was cleared here
 * (cancel won the race); returns 0 when the slot no longer matched —
 * presumably the peer claimed it first — in which case the tracking
 * chunk is returned to the free map on this side.  Also drops the
 * handler reference taken by nxt_port_mmap_get_tracking().
 */
nxt_bool_t
nxt_port_mmap_tracking_cancel(nxt_task_t *task,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_bool_t               res;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = tracking->mmap_handler;

    /* Nothing was allocated for this tracking. */
    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    /* Atomically clear the slot only if it still holds our stream id. */
    res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);

    nxt_debug(task, "%s tracking for stream #%uD",
              (res ? "cancelled" : "failed to cancel"), stream);

    if (!res) {
        /* The slot was already cleared; release its chunk here. */
        c = tracking->tracking - hdr->tracking;
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    nxt_port_mmap_handler_use(mmap_handler, -1);

    return res;
}
532 
533 
/*
 * Serializes a tracking reference into a two-word buffer: buf[0] is the
 * shared segment id, buf[1] is the tracking slot index within that
 * segment.  Always returns NXT_OK.
 */
nxt_int_t
nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
{
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = t->mmap_handler;

#if (NXT_DEBUG)
    {
    nxt_atomic_t  *tracking;

    tracking = mmap_handler->hdr->tracking;

    /* The slot must lie inside this segment's tracking array. */
    nxt_assert(t->tracking >= tracking);
    nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
    }
#endif

    buf[0] = mmap_handler->hdr->id;
    buf[1] = t->tracking - mmap_handler->hdr->tracking;

    return NXT_OK;
}
557 
/*
 * Parses a tracking message at the head of msg->buf and claims the
 * stream's tracking slot on the receiving side.  Returns 1 when the
 * slot still held the stream id (the message should be processed) and
 * 0 when it was malformed, refers to an unknown segment, or was already
 * cancelled by the sender; in the cancelled case the tracking chunk is
 * returned to the free map here.
 */
nxt_bool_t
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t                     *b;
    nxt_bool_t                    res;
    nxt_chunk_id_t                c;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_handler_t       *mmap_handler;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    b = msg->buf;

    /* The buffer must contain at least one whole tracking message. */
    if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
        return 0;
    }

    tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;

    /* Consume the tracking header from the buffer. */
    b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
    mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
                                                   tracking_msg->mmap_id);

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    c = tracking_msg->tracking_id;
    /* Claim the slot: succeeds only if it still holds our stream id. */
    res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);

    nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
              (res ? "received" : "already cancelled"));

    if (!res) {
        /* Sender cancelled first; release the tracking chunk. */
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    return res;
}
599 
600 
/*
 * Allocates a shared memory buffer for sending to 'port'.  The first
 * chunk is reserved by nxt_port_mmap_get(); additional adjacent chunks
 * are acquired opportunistically, so the returned buffer may be smaller
 * than 'size' (capacity is b->mem.end - b->mem.start).  Returns NULL on
 * allocation failure.
 */
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
{
    size_t                   nchunks;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request %z bytes shm buffer", size);

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;
    nxt_buf_set_port_mmap(b);

    /* Find (or create) a segment with a free chunk; 'c' is its index. */
    mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
    if (nxt_slow_path(mmap_handler == NULL)) {
        mp = task->thread->engine->mem_pool;
        nxt_mp_free(mp, b);
        nxt_mp_release(mp);
        return NULL;
    }

    b->parent = mmap_handler;

    /* Reference dropped in nxt_port_mmap_buf_completion(). */
    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;

    /* Chunks needed to cover 'size' (at least one). */
    nchunks = size / PORT_MMAP_CHUNK_SIZE;
    if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
        nchunks++;
    }

    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    c++;
    nchunks--;

    /* Try to acquire as much chunks as required. */
    while (nchunks > 0) {

        /* Stop at the first adjacent chunk that is already busy. */
        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        b->mem.end += PORT_MMAP_CHUNK_SIZE;
        c++;
        nchunks--;
    }

    return b;
}
666 
667 
/*
 * Tries to grow a shm buffer in place by claiming chunks directly
 * following b->mem.end.  Aims for 'size' additional free bytes; if even
 * 'min_size' bytes cannot be provided, every chunk claimed here is
 * rolled back.  Returns NXT_OK or NXT_ERROR.
 */
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    /* Enough room already. */
    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    /* First chunk just past the current end of the buffer. */
    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    nchunks = size / PORT_MMAP_CHUNK_SIZE;
    if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
        nchunks++;
    }

    c = start;

    /* Try to acquire as much chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        /*
         * min_size cannot be satisfied: roll back all chunks claimed
         * above.  NOTE(review): the 'c >= start' countdown assumes
         * start > 0 (b->mem.end lies beyond the buffer's first chunk);
         * if start could ever be 0, an unsigned nxt_chunk_id_t would
         * wrap and loop forever — confirm.
         */
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        /* Extend the buffer over the newly claimed chunks. */
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}
735 
736 
737 static nxt_buf_t *
738 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
739     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
740 {
741     size_t                   nchunks;
742     nxt_buf_t                *b;
743     nxt_port_mmap_header_t   *hdr;
744     nxt_port_mmap_handler_t  *mmap_handler;
745 
746     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
747                                                    mmap_msg->mmap_id);
748     if (nxt_slow_path(mmap_handler == NULL)) {
749         return NULL;
750     }
751 
752     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
753     if (nxt_slow_path(b == NULL)) {
754         return NULL;
755     }
756 
757     b->completion_handler = nxt_port_mmap_buf_completion;
758 
759     nxt_buf_set_port_mmap(b);
760 
761     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
762     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
763         nchunks++;
764     }
765 
766     hdr = mmap_handler->hdr;
767 
768     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
769     b->mem.pos = b->mem.start;
770     b->mem.free = b->mem.start + mmap_msg->size;
771     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
772 
773     b->parent = mmap_handler;
774     nxt_port_mmap_handler_use(mmap_handler, 1);
775 
776     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
777               b, b->mem.start, b->mem.end - b->mem.start,
778               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
779 
780     return b;
781 }
782 
783 
/*
 * Rewrites an outgoing message so its payload travels via shared
 * memory: each iovec pointing into a shm buffer is replaced by a small
 * nxt_port_mmap_msg_t descriptor (segment id, chunk id, size), and the
 * iovec array is collapsed to the single descriptor vector.
 */
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = port->mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Lookup buffer which starts current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        /* Describe the iovec by its position within the shm segment. */
        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    /* Send only the descriptor array; the payload stays in shm. */
    sb->iobuf[0].iov_base = port->mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    /* Flag the message so the receiver parses mmap descriptors. */
    msg->port_msg.mmap = 1;
}
836 
837 
/*
 * Converts a received descriptor message back into shared memory
 * buffers: for every nxt_port_mmap_msg_t in msg->buf a new buffer
 * mapped onto the sender's segment chunks is linked into the rebuilt
 * msg->buf chain.  On lookup/allocation failure the rebuilt chain is
 * truncated at the failing descriptor.
 */
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    /* 'pb' is where the next rebuilt buffer gets linked in. */
    pb = &msg->buf;
    msg->size = 0;

    /* Walk the original descriptor buffers via 'b'. */
    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            /* Replace the descriptor with a buf over the shm chunks. */
            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark original buf as complete. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}
875 
876 
877 nxt_port_method_t
878 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
879 {
880     nxt_port_method_t        m;
881     nxt_port_mmap_header_t   *hdr;
882     nxt_port_mmap_handler_t  *mmap_handler;
883 
884     m = NXT_PORT_METHOD_ANY;
885 
886     for (; b != NULL; b = b->next) {
887         if (nxt_buf_used_size(b) == 0) {
888             /* empty buffers does not affect method */
889             continue;
890         }
891 
892         if (nxt_buf_is_port_mmap(b)) {
893             mmap_handler = b->parent;
894             hdr = mmap_handler->hdr;
895 
896             if (m == NXT_PORT_METHOD_PLAIN) {
897                 nxt_log_error(NXT_LOG_ERR, task->log,
898                               "mixing plain and mmap buffers, "
899                               "using plain mode");
900 
901                 break;
902             }
903 
904             if (port->pid != hdr->dst_pid) {
905                 nxt_log_error(NXT_LOG_ERR, task->log,
906                               "send mmap buffer for %PI to %PI, "
907                               "using plain mode", hdr->dst_pid, port->pid);
908 
909                 m = NXT_PORT_METHOD_PLAIN;
910 
911                 break;
912             }
913 
914             if (m == NXT_PORT_METHOD_ANY) {
915                 nxt_debug(task, "using mmap mode");
916 
917                 m = NXT_PORT_METHOD_MMAP;
918             }
919         } else {
920             if (m == NXT_PORT_METHOD_MMAP) {
921                 nxt_log_error(NXT_LOG_ERR, task->log,
922                               "mixing mmap and plain buffers, "
923                               "switching to plain mode");
924 
925                 m = NXT_PORT_METHOD_PLAIN;
926 
927                 break;
928             }
929 
930             if (m == NXT_PORT_METHOD_ANY) {
931                 nxt_debug(task, "using plain mode");
932 
933                 m = NXT_PORT_METHOD_PLAIN;
934             }
935         }
936     }
937 
938     return m;
939 }
940