xref: /unit/src/nxt_port_memory.c (revision 1976:3997c4645142)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
20 static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
21     void *data);
22 
23 
/*
 * Adjust the reference count of a shared memory segment handler by "i"
 * (+1 to retain, -1 to release).  When the last reference is dropped,
 * the segment is unmapped, its descriptor closed, and the handler freed.
 */
nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
    int  c;

    /* "c" is the counter value before the addition. */
    c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);

    /* Last reference released: previous value plus "i" is zero. */
    if (i < 0 && c == -i) {
        if (mmap_handler->hdr != NULL) {
            nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
            mmap_handler->hdr = NULL;
        }

        /* fd is -1 for incoming segments whose descriptor was closed. */
        if (mmap_handler->fd != -1) {
            nxt_fd_close(mmap_handler->fd);
        }

        nxt_free(mmap_handler);
    }
}
44 
45 
46 static nxt_port_mmap_t *
47 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
48 {
49     uint32_t  cap;
50 
51     cap = port_mmaps->cap;
52 
53     if (cap == 0) {
54         cap = i + 1;
55     }
56 
57     while (i + 1 > cap) {
58 
59         if (cap < 16) {
60             cap = cap * 2;
61 
62         } else {
63             cap = cap + cap / 2;
64         }
65     }
66 
67     if (cap != port_mmaps->cap) {
68 
69         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
70                                        cap * sizeof(nxt_port_mmap_t));
71         if (nxt_slow_path(port_mmaps->elts == NULL)) {
72             return NULL;
73         }
74 
75         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
76                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
77 
78         port_mmaps->cap = cap;
79     }
80 
81     if (i + 1 > port_mmaps->size) {
82         port_mmaps->size = i + 1;
83     }
84 
85     return port_mmaps->elts + i;
86 }
87 
88 
89 void
90 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
91 {
92     uint32_t         i;
93     nxt_port_mmap_t  *port_mmap;
94 
95     if (port_mmaps == NULL) {
96         return;
97     }
98 
99     port_mmap = port_mmaps->elts;
100 
101     for (i = 0; i < port_mmaps->size; i++) {
102         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
103     }
104 
105     port_mmaps->size = 0;
106 
107     if (free_elts != 0) {
108         nxt_free(port_mmaps->elts);
109     }
110 }
111 
112 
/* Fill released chunks with a distinctive pattern to expose stale readers. */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)
115 
116 
/*
 * Completion handler for shared memory buffers: returns the buffer's
 * unsent chunks to the segment's free map, poisons their contents, and
 * drops the buffer's reference on the segment handler.  Walks the whole
 * b->next chain in one call via the complete_buf loop.
 */
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b, *next;
    nxt_process_t            *process;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    nxt_assert(data == b->parent);

    mmap_handler = data;

complete_buf:

    hdr = mmap_handler->hdr;

    /* Segment must belong to this process pair; otherwise just free b. */
    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks until b->mem.pos has been sent to other side,
         * let's release rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        /* Nothing was sent; release the entire buffer range. */
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    /* Junk-fill the range being released to catch use-after-free. */
    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    /* Mark every chunk covering [p, b->mem.end) as free again. */
    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    /*
     * If the sender had flagged out-of-shared-memory (oosm), clear the
     * flag and notify it that chunks are available again.
     */
    if (hdr->dst_pid == nxt_pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);

        nxt_process_broadcast_shm_ack(task, process);
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    next = b->next;
    mp = b->data;

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);

    /* Continue with the next buffer in the chain, if any. */
    if (next != NULL) {
        b = next;
        mmap_handler = b->parent;

        goto complete_buf;
    }
}
201 
202 
203 nxt_port_mmap_handler_t *
204 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
205     nxt_fd_t fd)
206 {
207     void                     *mem;
208     struct stat              mmap_stat;
209     nxt_port_mmap_t          *port_mmap;
210     nxt_port_mmap_header_t   *hdr;
211     nxt_port_mmap_handler_t  *mmap_handler;
212 
213     nxt_debug(task, "got new mmap fd #%FD from process %PI",
214               fd, process->pid);
215 
216     port_mmap = NULL;
217 
218     if (fstat(fd, &mmap_stat) == -1) {
219         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
220 
221         return NULL;
222     }
223 
224     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
225                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
226 
227     if (nxt_slow_path(mem == MAP_FAILED)) {
228         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
229 
230         return NULL;
231     }
232 
233     hdr = mem;
234 
235     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
236     if (nxt_slow_path(mmap_handler == NULL)) {
237         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
238 
239         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
240 
241         return NULL;
242     }
243 
244     mmap_handler->hdr = hdr;
245     mmap_handler->fd = -1;
246 
247     if (nxt_slow_path(hdr->src_pid != process->pid
248                       || hdr->dst_pid != nxt_pid))
249     {
250         nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
251                 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
252                 hdr->dst_pid, nxt_pid);
253 
254         return NULL;
255     }
256 
257     nxt_thread_mutex_lock(&process->incoming.mutex);
258 
259     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
260     if (nxt_slow_path(port_mmap == NULL)) {
261         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
262 
263         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
264         hdr = NULL;
265 
266         nxt_free(mmap_handler);
267         mmap_handler = NULL;
268 
269         goto fail;
270     }
271 
272     port_mmap->mmap_handler = mmap_handler;
273     nxt_port_mmap_handler_use(mmap_handler, 1);
274 
275     hdr->sent_over = 0xFFFFu;
276 
277 fail:
278 
279     nxt_thread_mutex_unlock(&process->incoming.mutex);
280 
281     return mmap_handler;
282 }
283 
284 
/*
 * Create a new outgoing shared memory segment of PORT_MMAP_SIZE bytes,
 * register it at the end of "mmaps", and initialize its header.  The
 * first "n" chunks are pre-marked busy for the caller, in the tracking
 * map when "tracking" is set.  Called from nxt_port_mmap_get() with
 * mmaps->mutex held.  Returns NULL on failure.
 */
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_bool_t tracking, nxt_int_t n)
{
    void                     *mem;
    nxt_fd_t                 fd;
    nxt_int_t                i;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_alert(task, "failed to allocate mmap_handler");

        return NULL;
    }

    /* Reserve the next slot; nxt_port_mmap_at() bumps mmaps->size. */
    port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_alert(task, "failed to add port mmap to mmaps array");

        nxt_free(mmap_handler);
        return NULL;
    }

    fd = nxt_shm_open(task, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_fd_close(fd);
        goto remove_fail;
    }

    mmap_handler->hdr = mem;
    mmap_handler->fd = fd;
    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* Init segment header. */
    hdr = mmap_handler->hdr;

    /* All chunks start out free (all bits set). */
    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = mmaps->size - 1;
    hdr->src_pid = nxt_pid;
    hdr->sent_over = 0xFFFFu;

    /* Mark first chunk as busy */
    free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
            hdr->id, nxt_pid);

    return mmap_handler;

remove_fail:

    nxt_free(mmap_handler);

    /* Undo the slot reservation made by nxt_port_mmap_at(). */
    mmaps->size--;

    return NULL;
}
364 
365 
/*
 * Create an anonymous shared memory object of "size" bytes and return
 * its file descriptor, or -1 on error.  The backing mechanism is picked
 * at build time: memfd_create(), shm_open(SHM_ANON), or a named
 * shm_open() that is unlinked immediately after creation.
 */
nxt_int_t
nxt_shm_open(nxt_task_t *task, size_t size)
{
    nxt_fd_t  fd;

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)

    u_char    *p, name[64];

    /* Build a per-process, randomized object name. */
    p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink((char *) name);

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    /* Unlink right away so the object vanishes when all fds close. */
    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }

#else

#error No working shared memory implementation.

#endif

    /* Size the object; a fresh shm object has zero length. */
    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_alert(task, "ftruncate() failed %E", nxt_errno);

        nxt_fd_close(fd);

        return -1;
    }

    return fd;
}
441 
442 
/*
 * Find "n" contiguous free chunks in one of the existing outgoing
 * segments of "mmaps", or create a new segment when none has room.
 * On success the chunks are marked busy and *c receives the id of the
 * first chunk.  Returns the segment handler, or NULL on failure.
 */
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_thread_mutex_lock(&mmaps->mutex);

    end_port_mmap = mmaps->elts + mmaps->size;

    for (port_mmap = mmaps->elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        /* Skip segments bound to a specific port (sent_over set). */
        if (hdr->sent_over != 0xFFFFu) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            /* Try to extend the run to n chunks following *c. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    /*
                     * Hit a busy chunk: roll back the chunks taken so
                     * far and restart the scan just past it.
                     */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        /* Segment exhausted; flag it so the peer acks freed chunks. */
        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&mmaps->mutex);

    return mmap_handler;
}
511 
512 
513 static nxt_port_mmap_handler_t *
514 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
515 {
516     nxt_process_t            *process;
517     nxt_port_mmap_handler_t  *mmap_handler;
518 
519     process = nxt_runtime_process_find(task->thread->runtime, spid);
520     if (nxt_slow_path(process == NULL)) {
521         return NULL;
522     }
523 
524     nxt_thread_mutex_lock(&process->incoming.mutex);
525 
526     if (nxt_fast_path(process->incoming.size > id)) {
527         mmap_handler = process->incoming.elts[id].mmap_handler;
528 
529     } else {
530         mmap_handler = NULL;
531 
532         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
533     }
534 
535     nxt_thread_mutex_unlock(&process->incoming.mutex);
536 
537     return mmap_handler;
538 }
539 
540 
/*
 * Allocate one tracking chunk for "stream": take a chunk from the
 * tracking map, retain the segment handler, and record the stream id
 * in the shared tracking slot so the peer can later claim or this side
 * can cancel it.  Returns NXT_OK or NXT_ERROR.
 */
nxt_int_t
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request tracking for stream #%uD", stream);

    /* One chunk from the tracking map (tracking flag set). */
    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NXT_ERROR;
    }

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    tracking->mmap_handler = mmap_handler;
    tracking->tracking = hdr->tracking + c;

    /* Publish the stream id in the shared slot. */
    *tracking->tracking = stream;

    nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return NXT_OK;
}
570 
571 
/*
 * Try to cancel the tracking slot for "stream".  Returns 1 when the
 * slot still held the stream id and was atomically reset (cancelled);
 * returns 0 when the slot no longer held it, in which case the
 * tracking chunk is freed here.  Always drops the tracking reference
 * on the segment handler taken by nxt_port_mmap_get_tracking().
 */
nxt_bool_t
nxt_port_mmap_tracking_cancel(nxt_task_t *task,
    nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
    nxt_bool_t               res;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = tracking->mmap_handler;

    /* No tracking was allocated; nothing to cancel. */
    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    /* Race with the reader: only one side wins the swap to 0. */
    res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);

    nxt_debug(task, "%s tracking for stream #%uD",
              (res ? "cancelled" : "failed to cancel"), stream);

    if (!res) {
        /* The peer consumed the slot; release its chunk on this side. */
        c = tracking->tracking - hdr->tracking;
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    nxt_port_mmap_handler_use(mmap_handler, -1);

    return res;
}
603 
604 
605 nxt_int_t
606 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
607 {
608     nxt_port_mmap_handler_t  *mmap_handler;
609 
610     mmap_handler = t->mmap_handler;
611 
612 #if (NXT_DEBUG)
613     {
614     nxt_atomic_t  *tracking;
615 
616     tracking = mmap_handler->hdr->tracking;
617 
618     nxt_assert(t->tracking >= tracking);
619     nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
620     }
621 #endif
622 
623     buf[0] = mmap_handler->hdr->id;
624     buf[1] = t->tracking - mmap_handler->hdr->tracking;
625 
626     return NXT_OK;
627 }
628 
/*
 * Read a tracking header from the start of "msg" and claim the stream.
 * Returns 1 when the shared tracking slot still held the stream id
 * (the message should be processed), 0 when the message is too small,
 * the segment is unknown, or the sender already cancelled the stream;
 * on cancellation the tracking chunk is freed here.  Advances
 * b->mem.pos past the consumed tracking header.
 */
nxt_bool_t
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t                     *b;
    nxt_bool_t                    res;
    nxt_chunk_id_t                c;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_handler_t       *mmap_handler;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    b = msg->buf;

    /* The message must contain at least a complete tracking header. */
    if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_debug(task, "too small message %O", nxt_buf_used_size(b));
        return 0;
    }

    tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;

    b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
    mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
                                                   tracking_msg->mmap_id);

    if (nxt_slow_path(mmap_handler == NULL)) {
        return 0;
    }

    hdr = mmap_handler->hdr;

    /* Race with the sender's cancel: only one side wins the swap to 0. */
    c = tracking_msg->tracking_id;
    res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);

    nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
              (res ? "received" : "already cancelled"));

    if (!res) {
        /* Cancelled by the sender; release the tracking chunk here. */
        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    return res;
}
670 
671 
/*
 * Allocate a shared memory buffer of at least "size" bytes from the
 * outgoing segments in "mmaps".  The buffer's mem region points into
 * the shared segment; its completion handler returns the chunks to the
 * segment's free map.  Returns NULL when size exceeds one segment or
 * on allocation failure.
 */
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_int_t                nchunks;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request %z bytes shm buffer", size);

    /* Round the request up to whole chunks. */
    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    /* A single buffer cannot span more than one segment. */
    if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
        nxt_alert(task, "requested buffer (%z) too big", size);

        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;
    nxt_buf_set_port_mmap(b);

    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
    if (nxt_slow_path(mmap_handler == NULL)) {
        /* Undo the buffer allocation on chunk allocation failure. */
        mp = task->thread->engine->mem_pool;
        nxt_mp_free(mp, b);
        nxt_mp_release(mp);
        return NULL;
    }

    b->parent = mmap_handler;

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    /* Point the buffer's memory window at the acquired chunk range. */
    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return b;
}
725 
726 
727 nxt_int_t
728 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
729     size_t min_size)
730 {
731     size_t                   nchunks, free_size;
732     nxt_chunk_id_t           c, start;
733     nxt_port_mmap_header_t   *hdr;
734     nxt_port_mmap_handler_t  *mmap_handler;
735 
736     nxt_debug(task, "request increase %z bytes shm buffer", size);
737 
738     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
739         nxt_log(task, NXT_LOG_WARN,
740                 "failed to increase, not a mmap buffer");
741         return NXT_ERROR;
742     }
743 
744     free_size = nxt_buf_mem_free_size(&b->mem);
745 
746     if (nxt_slow_path(size <= free_size)) {
747         return NXT_OK;
748     }
749 
750     mmap_handler = b->parent;
751     hdr = mmap_handler->hdr;
752 
753     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
754 
755     size -= free_size;
756 
757     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
758 
759     c = start;
760 
761     /* Try to acquire as much chunks as required. */
762     while (nchunks > 0) {
763 
764         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
765             break;
766         }
767 
768         c++;
769         nchunks--;
770     }
771 
772     if (nchunks != 0
773         && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
774     {
775         c--;
776         while (c >= start) {
777             nxt_port_mmap_set_chunk_free(hdr->free_map, c);
778             c--;
779         }
780 
781         nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);
782 
783         return NXT_ERROR;
784 
785     } else {
786         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
787 
788         return NXT_OK;
789     }
790 }
791 
792 
793 static nxt_buf_t *
794 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
795     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
796 {
797     size_t                   nchunks;
798     nxt_buf_t                *b;
799     nxt_port_mmap_header_t   *hdr;
800     nxt_port_mmap_handler_t  *mmap_handler;
801 
802     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
803                                                    mmap_msg->mmap_id);
804     if (nxt_slow_path(mmap_handler == NULL)) {
805         return NULL;
806     }
807 
808     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
809     if (nxt_slow_path(b == NULL)) {
810         return NULL;
811     }
812 
813     b->completion_handler = nxt_port_mmap_buf_completion;
814 
815     nxt_buf_set_port_mmap(b);
816 
817     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
818     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
819         nchunks++;
820     }
821 
822     hdr = mmap_handler->hdr;
823 
824     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
825     b->mem.pos = b->mem.start;
826     b->mem.free = b->mem.start + mmap_msg->size;
827     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
828 
829     b->parent = mmap_handler;
830     nxt_port_mmap_handler_use(mmap_handler, 1);
831 
832     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
833               b, b->mem.start, b->mem.end - b->mem.start,
834               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
835 
836     return b;
837 }
838 
839 
/*
 * Convert an outgoing message held in shared memory buffers into an
 * array of nxt_port_mmap_msg_t descriptors (segment id, chunk id, size)
 * written to "mmsg_buf", then repoint "sb" at that array so only the
 * descriptors travel over the socket.  Sets the mmap flag on the
 * message header.
 */
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = mmsg_buf;

    bmem = msg->buf;

    /* One descriptor per iovec entry. */
    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Lookup buffer which starts current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        /* Describe the data by its location inside the shared segment. */
        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    /* Send only the descriptor array; payload stays in shared memory. */
    sb->iobuf[0].iov_base = mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}
892 
893 
/*
 * Rebuild incoming shared memory buffers from a descriptor message:
 * for each nxt_port_mmap_msg_t in msg->buf, create a buffer pointing
 * into the corresponding incoming segment and chain it onto msg->buf.
 * msg->size accumulates the total payload size.
 */
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    /* pb tracks where the next shared memory buffer gets linked. */
    pb = &msg->buf;
    msg->size = 0;

    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark original buf as complete. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}
931 
932 
933 nxt_port_method_t
934 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
935 {
936     nxt_port_method_t  m;
937 
938     m = NXT_PORT_METHOD_ANY;
939 
940     for (/* void */; b != NULL; b = b->next) {
941         if (nxt_buf_used_size(b) == 0) {
942             /* empty buffers does not affect method */
943             continue;
944         }
945 
946         if (nxt_buf_is_port_mmap(b)) {
947             if (m == NXT_PORT_METHOD_PLAIN) {
948                 nxt_log_error(NXT_LOG_ERR, task->log,
949                               "mixing plain and mmap buffers, "
950                               "using plain mode");
951 
952                 break;
953             }
954 
955             if (m == NXT_PORT_METHOD_ANY) {
956                 nxt_debug(task, "using mmap mode");
957 
958                 m = NXT_PORT_METHOD_MMAP;
959             }
960         } else {
961             if (m == NXT_PORT_METHOD_MMAP) {
962                 nxt_log_error(NXT_LOG_ERR, task->log,
963                               "mixing mmap and plain buffers, "
964                               "switching to plain mode");
965 
966                 m = NXT_PORT_METHOD_PLAIN;
967 
968                 break;
969             }
970 
971             if (m == NXT_PORT_METHOD_ANY) {
972                 nxt_debug(task, "using plain mode");
973 
974                 m = NXT_PORT_METHOD_PLAIN;
975             }
976         }
977     }
978 
979     return m;
980 }
981 
982 
983 void
984 nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
985 {
986     nxt_port_t  *port;
987 
988     if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
989     {
990         return;
991     }
992 
993     port = nxt_process_port_first(process);
994 
995     if (port->type == NXT_PROCESS_APP) {
996         nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
997     }
998 }
999 
1000 
/*
 * Posted handler: send a SHM_ACK message to every port of the process
 * passed in "data", notifying it that shared memory chunks were freed.
 * Write errors are deliberately ignored (best-effort notification).
 */
static void
nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_process_t  *process;

    process = data;

    nxt_queue_each(port, &process->ports, nxt_port_t, link) {
        (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
                                     -1, 0, 0, NULL);
    } nxt_queue_loop;
}
1013