xref: /unit/src/nxt_port_memory.c (revision 1526:5c2a0b6f92e7)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 nxt_inline void
21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
22 {
23     int  c;
24 
25     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26 
27     if (i < 0 && c == -i) {
28         if (mmap_handler->hdr != NULL) {
29             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30             mmap_handler->hdr = NULL;
31         }
32 
33         nxt_free(mmap_handler);
34     }
35 }
36 
37 
38 static nxt_port_mmap_t *
39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40 {
41     uint32_t  cap;
42 
43     cap = port_mmaps->cap;
44 
45     if (cap == 0) {
46         cap = i + 1;
47     }
48 
49     while (i + 1 > cap) {
50 
51         if (cap < 16) {
52             cap = cap * 2;
53 
54         } else {
55             cap = cap + cap / 2;
56         }
57     }
58 
59     if (cap != port_mmaps->cap) {
60 
61         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
62                                        cap * sizeof(nxt_port_mmap_t));
63         if (nxt_slow_path(port_mmaps->elts == NULL)) {
64             return NULL;
65         }
66 
67         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
68                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
69 
70         port_mmaps->cap = cap;
71     }
72 
73     if (i + 1 > port_mmaps->size) {
74         port_mmaps->size = i + 1;
75     }
76 
77     return port_mmaps->elts + i;
78 }
79 
80 
81 void
82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
83 {
84     uint32_t         i;
85     nxt_port_mmap_t  *port_mmap;
86 
87     if (port_mmaps == NULL) {
88         return;
89     }
90 
91     port_mmap = port_mmaps->elts;
92 
93     for (i = 0; i < port_mmaps->size; i++) {
94         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
95     }
96 
97     port_mmaps->size = 0;
98 
99     if (free_elts != 0) {
100         nxt_free(port_mmaps->elts);
101     }
102 }
103 
104 
105 #define nxt_port_mmap_free_junk(p, size)                                      \
106     memset((p), 0xA5, size)
107 
108 
109 static void
110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
111 {
112     u_char                   *p;
113     nxt_mp_t                 *mp;
114     nxt_buf_t                *b, *next;
115     nxt_port_t               *port;
116     nxt_process_t            *process;
117     nxt_chunk_id_t           c;
118     nxt_port_mmap_header_t   *hdr;
119     nxt_port_mmap_handler_t  *mmap_handler;
120 
121     if (nxt_buf_ts_handle(task, obj, data)) {
122         return;
123     }
124 
125     b = obj;
126 
127     nxt_assert(data == b->parent);
128 
129     mmap_handler = data;
130 
131 complete_buf:
132 
133     hdr = mmap_handler->hdr;
134 
135     if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
136         nxt_debug(task, "mmap buf completion: mmap for other process pair "
137                   "%PI->%PI", hdr->src_pid, hdr->dst_pid);
138 
139         goto release_buf;
140     }
141 
142     if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
143         /*
144          * Chunks until b->mem.pos has been sent to other side,
145          * let's release rest (if any).
146          */
147         p = b->mem.pos - 1;
148         c = nxt_port_mmap_chunk_id(hdr, p) + 1;
149         p = nxt_port_mmap_chunk_start(hdr, c);
150 
151     } else {
152         p = b->mem.start;
153         c = nxt_port_mmap_chunk_id(hdr, p);
154     }
155 
156     nxt_port_mmap_free_junk(p, b->mem.end - p);
157 
158     nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
159               "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
160               b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
161 
162     while (p < b->mem.end) {
163         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
164 
165         p += PORT_MMAP_CHUNK_SIZE;
166         c++;
167     }
168 
169     if (hdr->dst_pid == nxt_pid
170         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
171     {
172         process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
173 
174         if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
175             port = nxt_process_port_first(process);
176 
177             if (port->type == NXT_PROCESS_APP) {
178                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
179                                              -1, 0, 0, NULL);
180             }
181         }
182     }
183 
184 release_buf:
185 
186     nxt_port_mmap_handler_use(mmap_handler, -1);
187 
188     next = b->next;
189     mp = b->data;
190 
191     nxt_mp_free(mp, b);
192     nxt_mp_release(mp);
193 
194     if (next != NULL) {
195         b = next;
196         mmap_handler = b->parent;
197 
198         goto complete_buf;
199     }
200 }
201 
202 
203 nxt_port_mmap_handler_t *
204 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
205     nxt_fd_t fd)
206 {
207     void                     *mem;
208     struct stat              mmap_stat;
209     nxt_port_mmap_t          *port_mmap;
210     nxt_port_mmap_header_t   *hdr;
211     nxt_port_mmap_handler_t  *mmap_handler;
212 
213     nxt_debug(task, "got new mmap fd #%FD from process %PI",
214               fd, process->pid);
215 
216     port_mmap = NULL;
217 
218     if (fstat(fd, &mmap_stat) == -1) {
219         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
220 
221         return NULL;
222     }
223 
224     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
225                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
226 
227     if (nxt_slow_path(mem == MAP_FAILED)) {
228         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
229 
230         return NULL;
231     }
232 
233     hdr = mem;
234 
235     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
236     if (nxt_slow_path(mmap_handler == NULL)) {
237         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
238 
239         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
240 
241         return NULL;
242     }
243 
244     mmap_handler->hdr = hdr;
245 
246     if (nxt_slow_path(hdr->src_pid != process->pid
247                       || hdr->dst_pid != nxt_pid))
248     {
249         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
250                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
251                 hdr->dst_pid, nxt_pid);
252 
253         return NULL;
254     }
255 
256     nxt_thread_mutex_lock(&process->incoming.mutex);
257 
258     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
259     if (nxt_slow_path(port_mmap == NULL)) {
260         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
261 
262         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
263         hdr = NULL;
264 
265         nxt_free(mmap_handler);
266         mmap_handler = NULL;
267 
268         goto fail;
269     }
270 
271     port_mmap->mmap_handler = mmap_handler;
272     nxt_port_mmap_handler_use(mmap_handler, 1);
273 
274     hdr->sent_over = 0xFFFFu;
275 
276 fail:
277 
278     nxt_thread_mutex_unlock(&process->incoming.mutex);
279 
280     return mmap_handler;
281 }
282 
283 
284 static nxt_port_mmap_handler_t *
285 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
286     nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
287 {
288     void                     *mem;
289     nxt_fd_t                 fd;
290     nxt_int_t                i;
291     nxt_free_map_t           *free_map;
292     nxt_port_mmap_t          *port_mmap;
293     nxt_port_mmap_header_t   *hdr;
294     nxt_port_mmap_handler_t  *mmap_handler;
295 
296     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
297     if (nxt_slow_path(mmap_handler == NULL)) {
298         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
299 
300         return NULL;
301     }
302 
303     port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
304     if (nxt_slow_path(port_mmap == NULL)) {
305         nxt_log(task, NXT_LOG_WARN,
306                 "failed to add port mmap to outgoing array");
307 
308         nxt_free(mmap_handler);
309         return NULL;
310     }
311 
312     fd = nxt_shm_open(task, PORT_MMAP_SIZE);
313     if (nxt_slow_path(fd == -1)) {
314         goto remove_fail;
315     }
316 
317     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
318                        MAP_SHARED, fd, 0);
319 
320     if (nxt_slow_path(mem == MAP_FAILED)) {
321         goto remove_fail;
322     }
323 
324     mmap_handler->hdr = mem;
325     port_mmap->mmap_handler = mmap_handler;
326     nxt_port_mmap_handler_use(mmap_handler, 1);
327 
328     /* Init segment header. */
329     hdr = mmap_handler->hdr;
330 
331     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
332     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
333 
334     hdr->id = process->outgoing.size - 1;
335     hdr->src_pid = nxt_pid;
336     hdr->dst_pid = process->pid;
337     hdr->sent_over = port->id;
338 
339     /* Mark first chunk as busy */
340     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
341 
342     for (i = 0; i < n; i++) {
343         nxt_port_mmap_set_chunk_busy(free_map, i);
344     }
345 
346     /* Mark as busy chunk followed the last available chunk. */
347     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
348     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
349 
350     nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
351 
352     /* TODO handle error */
353     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
354 
355     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
356             hdr->id, nxt_pid, process->pid);
357 
358     return mmap_handler;
359 
360 remove_fail:
361 
362     nxt_free(mmap_handler);
363 
364     process->outgoing.size--;
365 
366     return NULL;
367 }
368 
369 
370 nxt_int_t
371 nxt_shm_open(nxt_task_t *task, size_t size)
372 {
373     nxt_fd_t  fd;
374 
375 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
376 
377     u_char    *p, name[64];
378 
379     p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
380                     nxt_pid, nxt_random(&task->thread->random));
381     *p = '\0';
382 
383 #endif
384 
385 #if (NXT_HAVE_MEMFD_CREATE)
386 
387     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
388 
389     if (nxt_slow_path(fd == -1)) {
390         nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
391 
392         return -1;
393     }
394 
395     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
396 
397 #elif (NXT_HAVE_SHM_OPEN_ANON)
398 
399     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
400 
401     if (nxt_slow_path(fd == -1)) {
402         nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
403 
404         return -1;
405     }
406 
407     nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
408 
409 #elif (NXT_HAVE_SHM_OPEN)
410 
411     /* Just in case. */
412     shm_unlink((char *) name);
413 
414     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
415 
416     if (nxt_slow_path(fd == -1)) {
417         nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
418 
419         return -1;
420     }
421 
422     nxt_debug(task, "shm_open(%s): %FD", name, fd);
423 
424     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
425         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
426                 nxt_errno);
427     }
428 
429 #else
430 
431 #error No working shared memory implementation.
432 
433 #endif
434 
435     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
436         nxt_alert(task, "ftruncate() failed %E", nxt_errno);
437 
438         nxt_fd_close(fd);
439 
440         return -1;
441     }
442 
443     return fd;
444 }
445 
446 
447 static nxt_port_mmap_handler_t *
448 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
449     nxt_int_t n, nxt_bool_t tracking)
450 {
451     nxt_int_t                i, res, nchunks;
452     nxt_process_t            *process;
453     nxt_free_map_t           *free_map;
454     nxt_port_mmap_t          *port_mmap;
455     nxt_port_mmap_t          *end_port_mmap;
456     nxt_port_mmap_header_t   *hdr;
457     nxt_port_mmap_handler_t  *mmap_handler;
458 
459     process = port->process;
460     if (nxt_slow_path(process == NULL)) {
461         return NULL;
462     }
463 
464     nxt_thread_mutex_lock(&process->outgoing.mutex);
465 
466     end_port_mmap = process->outgoing.elts + process->outgoing.size;
467 
468     for (port_mmap = process->outgoing.elts;
469          port_mmap < end_port_mmap;
470          port_mmap++)
471     {
472         mmap_handler = port_mmap->mmap_handler;
473         hdr = mmap_handler->hdr;
474 
475         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
476             continue;
477         }
478 
479         *c = 0;
480 
481         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
482 
483         while (nxt_port_mmap_get_free_chunk(free_map, c)) {
484             nchunks = 1;
485 
486             while (nchunks < n) {
487                 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);
488 
489                 if (res == 0) {
490                     for (i = 0; i < nchunks; i++) {
491                         nxt_port_mmap_set_chunk_free(free_map, *c + i);
492                     }
493 
494                     *c += nchunks + 1;
495                     nchunks = 0;
496                     break;
497                 }
498 
499                 nchunks++;
500             }
501 
502             if (nchunks == n) {
503                 goto unlock_return;
504             }
505         }
506 
507         hdr->oosm = 1;
508     }
509 
510     /* TODO introduce port_mmap limit and release wait. */
511 
512     *c = 0;
513     mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
514 
515 unlock_return:
516 
517     nxt_thread_mutex_unlock(&process->outgoing.mutex);
518 
519     return mmap_handler;
520 }
521 
522 
523 static nxt_port_mmap_handler_t *
524 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
525 {
526     nxt_process_t            *process;
527     nxt_port_mmap_handler_t  *mmap_handler;
528 
529     process = nxt_runtime_process_find(task->thread->runtime, spid);
530     if (nxt_slow_path(process == NULL)) {
531         return NULL;
532     }
533 
534     nxt_thread_mutex_lock(&process->incoming.mutex);
535 
536     if (nxt_fast_path(process->incoming.size > id)) {
537         mmap_handler = process->incoming.elts[id].mmap_handler;
538 
539     } else {
540         mmap_handler = NULL;
541 
542         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
543     }
544 
545     nxt_thread_mutex_unlock(&process->incoming.mutex);
546 
547     return mmap_handler;
548 }
549 
550 
551 nxt_int_t
552 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
553     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
554 {
555     nxt_chunk_id_t           c;
556     nxt_port_mmap_header_t   *hdr;
557     nxt_port_mmap_handler_t  *mmap_handler;
558 
559     nxt_debug(task, "request tracking for stream #%uD", stream);
560 
561     mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
562     if (nxt_slow_path(mmap_handler == NULL)) {
563         return NXT_ERROR;
564     }
565 
566     nxt_port_mmap_handler_use(mmap_handler, 1);
567 
568     hdr = mmap_handler->hdr;
569 
570     tracking->mmap_handler = mmap_handler;
571     tracking->tracking = hdr->tracking + c;
572 
573     *tracking->tracking = stream;
574 
575     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
576               hdr->src_pid, hdr->dst_pid, hdr->id, c);
577 
578     return NXT_OK;
579 }
580 
581 
582 nxt_bool_t
583 nxt_port_mmap_tracking_cancel(nxt_task_t *task,
584     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
585 {
586     nxt_bool_t               res;
587     nxt_chunk_id_t           c;
588     nxt_port_mmap_header_t   *hdr;
589     nxt_port_mmap_handler_t  *mmap_handler;
590 
591     mmap_handler = tracking->mmap_handler;
592 
593     if (nxt_slow_path(mmap_handler == NULL)) {
594         return 0;
595     }
596 
597     hdr = mmap_handler->hdr;
598 
599     res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
600 
601     nxt_debug(task, "%s tracking for stream #%uD",
602               (res ? "cancelled" : "failed to cancel"), stream);
603 
604     if (!res) {
605         c = tracking->tracking - hdr->tracking;
606         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
607     }
608 
609     nxt_port_mmap_handler_use(mmap_handler, -1);
610 
611     return res;
612 }
613 
614 
615 nxt_int_t
616 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
617 {
618     nxt_port_mmap_handler_t  *mmap_handler;
619 
620     mmap_handler = t->mmap_handler;
621 
622 #if (NXT_DEBUG)
623     {
624     nxt_atomic_t  *tracking;
625 
626     tracking = mmap_handler->hdr->tracking;
627 
628     nxt_assert(t->tracking >= tracking);
629     nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
630     }
631 #endif
632 
633     buf[0] = mmap_handler->hdr->id;
634     buf[1] = t->tracking - mmap_handler->hdr->tracking;
635 
636     return NXT_OK;
637 }
638 
639 nxt_bool_t
640 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
641 {
642     nxt_buf_t                     *b;
643     nxt_bool_t                    res;
644     nxt_chunk_id_t                c;
645     nxt_port_mmap_header_t        *hdr;
646     nxt_port_mmap_handler_t       *mmap_handler;
647     nxt_port_mmap_tracking_msg_t  *tracking_msg;
648 
649     b = msg->buf;
650 
651     if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
652         nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
653         return 0;
654     }
655 
656     tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
657 
658     b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
659     mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
660                                                    tracking_msg->mmap_id);
661 
662     if (nxt_slow_path(mmap_handler == NULL)) {
663         return 0;
664     }
665 
666     hdr = mmap_handler->hdr;
667 
668     c = tracking_msg->tracking_id;
669     res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
670 
671     nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
672               (res ? "received" : "already cancelled"));
673 
674     if (!res) {
675         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
676     }
677 
678     return res;
679 }
680 
681 
682 nxt_buf_t *
683 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
684 {
685     nxt_mp_t                 *mp;
686     nxt_buf_t                *b;
687     nxt_int_t                nchunks;
688     nxt_chunk_id_t           c;
689     nxt_port_mmap_header_t   *hdr;
690     nxt_port_mmap_handler_t  *mmap_handler;
691 
692     nxt_debug(task, "request %z bytes shm buffer", size);
693 
694     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
695 
696     if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
697         nxt_alert(task, "requested buffer (%z) too big", size);
698 
699         return NULL;
700     }
701 
702     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
703     if (nxt_slow_path(b == NULL)) {
704         return NULL;
705     }
706 
707     b->completion_handler = nxt_port_mmap_buf_completion;
708     nxt_buf_set_port_mmap(b);
709 
710     mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
711     if (nxt_slow_path(mmap_handler == NULL)) {
712         mp = task->thread->engine->mem_pool;
713         nxt_mp_free(mp, b);
714         nxt_mp_release(mp);
715         return NULL;
716     }
717 
718     b->parent = mmap_handler;
719 
720     nxt_port_mmap_handler_use(mmap_handler, 1);
721 
722     hdr = mmap_handler->hdr;
723 
724     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
725     b->mem.pos = b->mem.start;
726     b->mem.free = b->mem.start;
727     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
728 
729     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
730               b, b->mem.start, b->mem.end - b->mem.start,
731               hdr->src_pid, hdr->dst_pid, hdr->id, c);
732 
733     return b;
734 }
735 
736 
737 nxt_int_t
738 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
739     size_t min_size)
740 {
741     size_t                   nchunks, free_size;
742     nxt_chunk_id_t           c, start;
743     nxt_port_mmap_header_t   *hdr;
744     nxt_port_mmap_handler_t  *mmap_handler;
745 
746     nxt_debug(task, "request increase %z bytes shm buffer", size);
747 
748     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
749         nxt_log(task, NXT_LOG_WARN,
750                 "failed to increase, not a mmap buffer");
751         return NXT_ERROR;
752     }
753 
754     free_size = nxt_buf_mem_free_size(&b->mem);
755 
756     if (nxt_slow_path(size <= free_size)) {
757         return NXT_OK;
758     }
759 
760     mmap_handler = b->parent;
761     hdr = mmap_handler->hdr;
762 
763     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
764 
765     size -= free_size;
766 
767     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
768 
769     c = start;
770 
771     /* Try to acquire as much chunks as required. */
772     while (nchunks > 0) {
773 
774         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
775             break;
776         }
777 
778         c++;
779         nchunks--;
780     }
781 
782     if (nchunks != 0
783         && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
784     {
785         c--;
786         while (c >= start) {
787             nxt_port_mmap_set_chunk_free(hdr->free_map, c);
788             c--;
789         }
790 
791         nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);
792 
793         return NXT_ERROR;
794 
795     } else {
796         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
797 
798         return NXT_OK;
799     }
800 }
801 
802 
803 static nxt_buf_t *
804 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
805     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
806 {
807     size_t                   nchunks;
808     nxt_buf_t                *b;
809     nxt_port_mmap_header_t   *hdr;
810     nxt_port_mmap_handler_t  *mmap_handler;
811 
812     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
813                                                    mmap_msg->mmap_id);
814     if (nxt_slow_path(mmap_handler == NULL)) {
815         return NULL;
816     }
817 
818     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
819     if (nxt_slow_path(b == NULL)) {
820         return NULL;
821     }
822 
823     b->completion_handler = nxt_port_mmap_buf_completion;
824 
825     nxt_buf_set_port_mmap(b);
826 
827     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
828     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
829         nchunks++;
830     }
831 
832     hdr = mmap_handler->hdr;
833 
834     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
835     b->mem.pos = b->mem.start;
836     b->mem.free = b->mem.start + mmap_msg->size;
837     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
838 
839     b->parent = mmap_handler;
840     nxt_port_mmap_handler_use(mmap_handler, 1);
841 
842     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
843               b, b->mem.start, b->mem.end - b->mem.start,
844               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
845 
846     return b;
847 }
848 
849 
850 void
851 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
852     nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
853 {
854     size_t                   bsize;
855     nxt_buf_t                *bmem;
856     nxt_uint_t               i;
857     nxt_port_mmap_msg_t      *mmap_msg;
858     nxt_port_mmap_header_t   *hdr;
859     nxt_port_mmap_handler_t  *mmap_handler;
860 
861     nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
862                     "via shared memory", sb->size, port->pid);
863 
864     bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
865     mmap_msg = mmsg_buf;
866 
867     bmem = msg->buf;
868 
869     for (i = 0; i < sb->niov; i++, mmap_msg++) {
870 
871         /* Lookup buffer which starts current iov_base. */
872         while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
873             bmem = bmem->next;
874         }
875 
876         if (nxt_slow_path(bmem == NULL)) {
877             nxt_log_error(NXT_LOG_ERR, task->log,
878                           "failed to find buf for iobuf[%d]", i);
879             return;
880             /* TODO clear b and exit */
881         }
882 
883         mmap_handler = bmem->parent;
884         hdr = mmap_handler->hdr;
885 
886         mmap_msg->mmap_id = hdr->id;
887         mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
888         mmap_msg->size = sb->iobuf[i].iov_len;
889 
890         nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
891                   mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
892                   port->pid);
893     }
894 
895     sb->iobuf[0].iov_base = mmsg_buf;
896     sb->iobuf[0].iov_len = bsize;
897     sb->niov = 1;
898     sb->size = bsize;
899 
900     msg->port_msg.mmap = 1;
901 }
902 
903 
904 void
905 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
906 {
907     nxt_buf_t            *b, **pb;
908     nxt_port_mmap_msg_t  *end, *mmap_msg;
909 
910     pb = &msg->buf;
911     msg->size = 0;
912 
913     for (b = msg->buf; b != NULL; b = b->next) {
914 
915         mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
916         end = (nxt_port_mmap_msg_t *) b->mem.free;
917 
918         while (mmap_msg < end) {
919             nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
920                       mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
921                       msg->port_msg.pid);
922 
923             *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
924                                                  msg->port_msg.pid, mmap_msg);
925             if (nxt_slow_path(*pb == NULL)) {
926                 nxt_log_error(NXT_LOG_ERR, task->log,
927                               "failed to get mmap buffer");
928 
929                 break;
930             }
931 
932             msg->size += mmap_msg->size;
933             pb = &(*pb)->next;
934             mmap_msg++;
935 
936             /* Mark original buf as complete. */
937             b->mem.pos += sizeof(nxt_port_mmap_msg_t);
938         }
939     }
940 }
941 
942 
943 nxt_port_method_t
944 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
945 {
946     nxt_port_method_t        m;
947     nxt_port_mmap_header_t   *hdr;
948     nxt_port_mmap_handler_t  *mmap_handler;
949 
950     m = NXT_PORT_METHOD_ANY;
951 
952     for (/* void */; b != NULL; b = b->next) {
953         if (nxt_buf_used_size(b) == 0) {
954             /* empty buffers does not affect method */
955             continue;
956         }
957 
958         if (nxt_buf_is_port_mmap(b)) {
959             mmap_handler = b->parent;
960             hdr = mmap_handler->hdr;
961 
962             if (m == NXT_PORT_METHOD_PLAIN) {
963                 nxt_log_error(NXT_LOG_ERR, task->log,
964                               "mixing plain and mmap buffers, "
965                               "using plain mode");
966 
967                 break;
968             }
969 
970             if (port->pid != hdr->dst_pid) {
971                 nxt_log_error(NXT_LOG_ERR, task->log,
972                               "send mmap buffer for %PI to %PI, "
973                               "using plain mode", hdr->dst_pid, port->pid);
974 
975                 m = NXT_PORT_METHOD_PLAIN;
976 
977                 break;
978             }
979 
980             if (m == NXT_PORT_METHOD_ANY) {
981                 nxt_debug(task, "using mmap mode");
982 
983                 m = NXT_PORT_METHOD_MMAP;
984             }
985         } else {
986             if (m == NXT_PORT_METHOD_MMAP) {
987                 nxt_log_error(NXT_LOG_ERR, task->log,
988                               "mixing mmap and plain buffers, "
989                               "switching to plain mode");
990 
991                 m = NXT_PORT_METHOD_PLAIN;
992 
993                 break;
994             }
995 
996             if (m == NXT_PORT_METHOD_ANY) {
997                 nxt_debug(task, "using plain mode");
998 
999                 m = NXT_PORT_METHOD_PLAIN;
1000             }
1001         }
1002     }
1003 
1004     return m;
1005 }
1006