xref: /unit/src/nxt_port_memory.c (revision 430:3a24c399394f)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 
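/*
 * Adjust the handler's reference count by "i".  When a negative
 * adjustment drops the count to zero, the shared memory segment is
 * unmapped and the handler itself is freed.
 */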
20 nxt_inline void
21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
22 {
23     int c;
24 
25     c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
26 
27     if (i < 0 && c == -i) {
28         if (mmap_handler->hdr != NULL) {
29             nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
30             mmap_handler->hdr = NULL;
31         }
32 
33         nxt_free(mmap_handler);
34     }
35 }
36 
37 
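/*
 * Return the element with index "i" from the port mmaps array, growing
 * the elts array as needed.  The capacity doubles while it is below 16
 * and then grows by half; new slots are zeroed and the array size is
 * updated.  Returns NULL if reallocation fails.
 */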
38 static nxt_port_mmap_t *
39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
40 {
41     uint32_t  cap;
42 
43     cap = port_mmaps->cap;
44 
45     if (cap == 0) {
46         cap = i + 1;
47     }
48 
49     while (i + 1 > cap) {
50 
51         if (cap < 16) {
52             cap = cap * 2;
53 
54         } else {
55             cap = cap + cap / 2;
56         }
57     }
58 
59     if (cap != port_mmaps->cap) {
60 
61         port_mmaps->elts = nxt_realloc(port_mmaps->elts,
62                                        cap * sizeof(nxt_port_mmap_t));
63         if (nxt_slow_path(port_mmaps->elts == NULL)) {
64             return NULL;
65         }
66 
67         nxt_memzero(port_mmaps->elts + port_mmaps->cap,
68                     sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
69 
70         port_mmaps->cap = cap;
71     }
72 
73     if (i + 1 > port_mmaps->size) {
74         port_mmaps->size = i + 1;
75     }
76 
77     return port_mmaps->elts + i;
78 }
79 
80 
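/*
 * Drop one reference on every mmap handler in the array and reset its
 * size; the elts array itself is freed only when "free_elts" is set.
 */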
81 void
82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
83 {
84     uint32_t         i;
85     nxt_port_mmap_t  *port_mmap;
86 
87     if (port_mmaps == NULL) {
88         return;
89     }
90 
91     port_mmap = port_mmaps->elts;
92 
93     for (i = 0; i < port_mmaps->size; i++) {
94         nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
95     }
96 
97     port_mmaps->size = 0;
98 
99     if (free_elts != 0) {
100         nxt_free(port_mmaps->elts);
101     }
102 }
103 
104 
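/* Fill released chunks with a 0xA5 pattern so stale reads are easy to spot. */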
105 #define nxt_port_mmap_free_junk(p, size)                                      \
106     memset((p), 0xA5, size)
107 
108 
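/*
 * Completion handler for shared memory buffers: chunks that were not
 * sent to the peer are filled with junk and returned to the segment's
 * free map, then the mmap handler reference and the buffer itself are
 * released.
 */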
109 static void
110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
111 {
112     u_char                   *p;
113     nxt_mp_t                 *mp;
114     nxt_buf_t                *b;
115     nxt_chunk_id_t           c;
116     nxt_port_mmap_header_t   *hdr;
117     nxt_port_mmap_handler_t  *mmap_handler;
118 
119     if (nxt_buf_ts_handle(task, obj, data)) {
120         return;
121     }
122 
123     b = obj;
124 
125     mp = b->data;
126 
127 #if (NXT_DEBUG)
128     if (nxt_slow_path(data != b->parent)) {
129         nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
130                       data, b->parent);
131         nxt_abort();
132     }
133 #endif
134 
135     mmap_handler = data;
136     hdr = mmap_handler->hdr;
137 
138     if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
139         nxt_debug(task, "mmap buf completion: mmap for other process pair "
140                   "%PI->%PI", hdr->src_pid, hdr->dst_pid);
141 
142         goto release_buf;
143     }
144 
145     if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
146         /*
147          * Chunks up to b->mem.pos have been sent to the other side;
148          * release the rest (if any).
149          */
150         p = b->mem.pos - 1;
151         c = nxt_port_mmap_chunk_id(hdr, p) + 1;
152         p = nxt_port_mmap_chunk_start(hdr, c);
153 
154     } else {
155         p = b->mem.start;
156         c = nxt_port_mmap_chunk_id(hdr, p);
157     }
158 
159     nxt_port_mmap_free_junk(p, b->mem.end - p);
160 
161     nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), "
162               "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
163               b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
164 
165     while (p < b->mem.end) {
166         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
167 
168         p += PORT_MMAP_CHUNK_SIZE;
169         c++;
170     }
171 
172 release_buf:
173 
174     nxt_port_mmap_handler_use(mmap_handler, -1);
175 
176     nxt_mp_free(mp, b);
177     nxt_mp_release(mp);
178 }
179 
180 
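/*
 * Map a shared memory segment received from a peer process as file
 * descriptor "fd", wrap it in a handler and store that handler in the
 * process's incoming array at the slot given by the segment header id.
 */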
181 nxt_port_mmap_handler_t *
182 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
183     nxt_fd_t fd)
184 {
185     void                     *mem;
186     struct stat              mmap_stat;
187     nxt_port_mmap_t          *port_mmap;
188     nxt_port_mmap_header_t   *hdr;
189     nxt_port_mmap_handler_t  *mmap_handler;
190 
191     nxt_debug(task, "got new mmap fd #%FD from process %PI",
192               fd, process->pid);
193 
194     port_mmap = NULL;
195 
196     if (fstat(fd, &mmap_stat) == -1) {
197         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
198 
199         return NULL;
200     }
201 
202     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
203                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
204 
205     if (nxt_slow_path(mem == MAP_FAILED)) {
206         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
207 
208         return NULL;
209     }
210 
211     hdr = mem;
212 
213     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
214     if (nxt_slow_path(mmap_handler == NULL)) {
215         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
216 
217         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
218 
219         return NULL;
220     }
221 
222     mmap_handler->hdr = hdr;
223 
224     nxt_thread_mutex_lock(&process->incoming.mutex);
225 
226     port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
227     if (nxt_slow_path(port_mmap == NULL)) {
228         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
229 
230         nxt_mem_munmap(mem, PORT_MMAP_SIZE);
231         hdr = NULL;
232 
233         nxt_free(mmap_handler);
234         mmap_handler = NULL;
235 
236         goto fail;
237     }
238 
239     nxt_assert(hdr->src_pid == process->pid);
240     nxt_assert(hdr->dst_pid == nxt_pid);
241 
242     port_mmap->mmap_handler = mmap_handler;
243     nxt_port_mmap_handler_use(mmap_handler, 1);
244 
245     hdr->sent_over = 0xFFFFu;
246 
247 fail:
248 
249     nxt_thread_mutex_unlock(&process->incoming.mutex);
250 
251     return mmap_handler;
252 }
253 
254 
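/*
 * Create a new outgoing shared memory segment: allocate an anonymous
 * shared file (memfd_create() or shm_open()), size it to PORT_MMAP_SIZE,
 * map it, initialize the header and free maps, register the handler in
 * the outgoing array and send the descriptor to the peer process in an
 * NXT_PORT_MSG_MMAP message.  The first chunk of the requested map is
 * left reserved for the caller.
 */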
255 static nxt_port_mmap_handler_t *
256 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
257     nxt_port_t *port, nxt_bool_t tracking)
258 {
259     void                     *mem;
260     u_char                   *p, name[64];
261     nxt_fd_t                 fd;
262     nxt_free_map_t           *free_map;
263     nxt_port_mmap_t          *port_mmap;
264     nxt_port_mmap_header_t   *hdr;
265     nxt_port_mmap_handler_t  *mmap_handler;
266 
267     mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
268     if (nxt_slow_path(mmap_handler == NULL)) {
269         nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
270 
271         return NULL;
272     }
273 
274     port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
275     if (nxt_slow_path(port_mmap == NULL)) {
276         nxt_log(task, NXT_LOG_WARN,
277                 "failed to add port mmap to outgoing array");
278 
279         nxt_free(mmap_handler);
280         return NULL;
281     }
282 
283     p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD",
284                     nxt_pid, nxt_random(&task->thread->random));
285     *p = '\0';
286 
287 #if (NXT_HAVE_MEMFD_CREATE)
288 
289     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
290 
291     if (nxt_slow_path(fd == -1)) {
292         nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
293                 name, nxt_errno);
294 
295         goto remove_fail;
296     }
297 
298     nxt_debug(task, "memfd_create(%s): %FD", name, fd);
299 
300 #elif (NXT_HAVE_SHM_OPEN)
301 
302     /* Unlink a stale object with the same name, just in case. */
303     shm_unlink((char *) name);
304 
305     fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
306 
307     nxt_debug(task, "shm_open(%s): %FD", name, fd);
308 
309     if (nxt_slow_path(fd == -1)) {
310         nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
311 
312         goto remove_fail;
313     }
314 
315     if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
316         nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
317                 nxt_errno);
318     }
319 
320 #else
321 
322 #error No working shared memory implementation.
323 
324 #endif
325 
326     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
327         nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
328 
329         goto remove_fail;
330     }
331 
332     mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
333                        MAP_SHARED, fd, 0);
334 
335     if (nxt_slow_path(mem == MAP_FAILED)) {
336         goto remove_fail;
337     }
338 
339     mmap_handler->hdr = mem;
340     port_mmap->mmap_handler = mmap_handler;
341     nxt_port_mmap_handler_use(mmap_handler, 1);
342 
343     /* Init segment header. */
344     hdr = mmap_handler->hdr;
345 
346     nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
347     nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
348 
349     hdr->id = process->outgoing.size - 1;
350     hdr->src_pid = nxt_pid;
351     hdr->dst_pid = process->pid;
352     hdr->sent_over = port->id;
353 
354     /* Mark the first chunk as busy. */
355     free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
356 
357     nxt_port_mmap_set_chunk_busy(free_map, 0);
358 
359     /* Mark the chunk following the last available chunk as busy. */
360     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
361     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
362 
363     nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
364 
365     /* TODO handle error */
366     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
367 
368     nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
369             hdr->id, nxt_pid, process->pid);
370 
371     return mmap_handler;
372 
373 remove_fail:
374 
375     nxt_free(mmap_handler);
376 
377     process->outgoing.size--;
378 
379     return NULL;
380 }
381 
382 
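/*
 * Reserve a chunk in an outgoing segment usable over "port" and return
 * its handler, storing the chunk id in "*c".  The tracking map is
 * searched instead of the data map when "tracking" is set; a new segment
 * is created when no existing one has a free chunk.
 */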
383 static nxt_port_mmap_handler_t *
384 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
385     nxt_bool_t tracking)
386 {
387     nxt_process_t            *process;
388     nxt_free_map_t           *free_map;
389     nxt_port_mmap_t          *port_mmap;
390     nxt_port_mmap_t          *end_port_mmap;
391     nxt_port_mmap_header_t   *hdr;
392     nxt_port_mmap_handler_t  *mmap_handler;
393 
394     process = port->process;
395     if (nxt_slow_path(process == NULL)) {
396         return NULL;
397     }
398 
399     *c = 0;
400 
401     nxt_thread_mutex_lock(&process->outgoing.mutex);
402 
403     end_port_mmap = process->outgoing.elts + process->outgoing.size;
404 
405     for (port_mmap = process->outgoing.elts;
406          port_mmap < end_port_mmap;
407          port_mmap++)
408     {
409         mmap_handler = port_mmap->mmap_handler;
410         hdr = mmap_handler->hdr;
411 
412         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
413             continue;
414         }
415 
416         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
417 
418         if (nxt_port_mmap_get_free_chunk(free_map, c)) {
419             goto unlock_return;
420         }
421     }
422 
423     /* TODO introduce port_mmap limit and release wait. */
424 
425     mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);
426 
427 unlock_return:
428 
429     nxt_thread_mutex_unlock(&process->outgoing.mutex);
430 
431     return mmap_handler;
432 }
433 
434 
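/* Look up an incoming mmap handler by source pid and segment id. */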
435 static nxt_port_mmap_handler_t *
436 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
437 {
438     nxt_process_t            *process;
439     nxt_port_mmap_handler_t  *mmap_handler;
440 
441     process = nxt_runtime_process_find(task->thread->runtime, spid);
442     if (nxt_slow_path(process == NULL)) {
443         return NULL;
444     }
445 
446     nxt_thread_mutex_lock(&process->incoming.mutex);
447 
448     if (nxt_fast_path(process->incoming.size > id)) {
449         mmap_handler = process->incoming.elts[id].mmap_handler;
450 
451     } else {
452         mmap_handler = NULL;
453 
454         nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
455     }
456 
457     nxt_thread_mutex_unlock(&process->incoming.mutex);
458 
459     return mmap_handler;
460 }
461 
462 
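/*
 * Allocate a tracking slot for "stream": reserve a chunk in an outgoing
 * segment's tracking map, store the stream number in the corresponding
 * tracking cell and keep a reference to the handler in "tracking".
 */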
463 nxt_int_t
464 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
465     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
466 {
467     nxt_chunk_id_t           c;
468     nxt_port_mmap_header_t   *hdr;
469     nxt_port_mmap_handler_t  *mmap_handler;
470 
471     nxt_debug(task, "request tracking for stream #%uD", stream);
472 
473     mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
474     if (nxt_slow_path(mmap_handler == NULL)) {
475         return NXT_ERROR;
476     }
477 
478     nxt_port_mmap_handler_use(mmap_handler, 1);
479 
480     hdr = mmap_handler->hdr;
481 
482     tracking->mmap_handler = mmap_handler;
483     tracking->tracking = hdr->tracking + c;
484 
485     *tracking->tracking = stream;
486 
487     nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d",
488               hdr->src_pid, hdr->dst_pid, hdr->id, c);
489 
490     return NXT_OK;
491 }
492 
493 
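/*
 * Try to cancel a tracking slot by atomically resetting it from "stream"
 * back to zero.  If the slot no longer holds the stream number, the
 * tracking chunk is returned to the free map instead.  Returns whether
 * the cancellation succeeded.
 */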
494 nxt_bool_t
495 nxt_port_mmap_tracking_cancel(nxt_task_t *task,
496     nxt_port_mmap_tracking_t *tracking, uint32_t stream)
497 {
498     nxt_bool_t               res;
499     nxt_chunk_id_t           c;
500     nxt_port_mmap_header_t   *hdr;
501     nxt_port_mmap_handler_t  *mmap_handler;
502 
503     mmap_handler = tracking->mmap_handler;
504 
505     if (nxt_slow_path(mmap_handler == NULL)) {
506         return 0;
507     }
508 
509     hdr = mmap_handler->hdr;
510 
511     res = nxt_atomic_cmp_set(tracking->tracking, stream, 0);
512 
513     nxt_debug(task, "%s tracking for stream #%uD",
514               (res ? "cancelled" : "failed to cancel"), stream);
515 
516     if (!res) {
517         c = tracking->tracking - hdr->tracking;
518         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
519     }
520 
521     nxt_port_mmap_handler_use(mmap_handler, -1);
522 
523     return res;
524 }
525 
526 
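/* Serialize a tracking reference as a (segment id, chunk id) pair. */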
527 nxt_int_t
528 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t)
529 {
530     nxt_atomic_t             *tracking;
531     nxt_port_mmap_handler_t  *mmap_handler;
532 
533     mmap_handler = t->mmap_handler;
534     tracking = mmap_handler->hdr->tracking;
535 
536     nxt_assert(t->tracking >= tracking);
537     nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT);
538 
539     buf[0] = mmap_handler->hdr->id;
540     buf[1] = t->tracking - mmap_handler->hdr->tracking;
541 
542     return NXT_OK;
543 }
544 
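/*
 * Read a tracking reference from a received message and claim it by
 * atomically resetting the tracking cell from the message stream number
 * to zero.  Returns 0 when the message is too small, the segment is
 * unknown or the sender has already cancelled the stream; in the last
 * case the tracking chunk is released.
 */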
545 nxt_bool_t
546 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
547 {
548     nxt_buf_t                     *b;
549     nxt_bool_t                    res;
550     nxt_chunk_id_t                c;
551     nxt_port_mmap_header_t        *hdr;
552     nxt_port_mmap_handler_t       *mmap_handler;
553     nxt_port_mmap_tracking_msg_t  *tracking_msg;
554 
555     b = msg->buf;
556 
557     if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
558         nxt_debug(task, "too small message %u", nxt_buf_used_size(b));
559         return 0;
560     }
561 
562     tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos;
563 
564     b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t);
565     mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid,
566                                                    tracking_msg->mmap_id);
567 
568     if (nxt_slow_path(mmap_handler == NULL)) {
569         return 0;
570     }
571 
572     hdr = mmap_handler->hdr;
573 
574     c = tracking_msg->tracking_id;
575     res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0);
576 
577     nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream,
578               (res ? "received" : "already cancelled"));
579 
580     if (!res) {
581         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
582     }
583 
584     return res;
585 }
586 
587 
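/*
 * Allocate a buffer backed by outgoing shared memory.  One chunk is
 * always reserved; adjacent free chunks are then claimed until the
 * requested size is covered, so the returned buffer may provide less
 * room than "size".
 *
 * Illustrative usage sketch (not from this file; error and short-buffer
 * handling are omitted, and the NXT_PORT_MSG_DATA type and the
 * nxt_port_socket_write() signature are assumed from the call in
 * nxt_port_new_port_mmap() above):
 *
 *     b = nxt_port_mmap_get_buf(task, port, size);
 *     b->mem.free = nxt_cpymem(b->mem.free, data, size);
 *     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
 *                                  -1, stream, reply_port, b);
 */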
588 nxt_buf_t *
589 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
590 {
591     size_t                   nchunks;
592     nxt_mp_t                 *mp;
593     nxt_buf_t                *b;
594     nxt_chunk_id_t           c;
595     nxt_port_mmap_header_t   *hdr;
596     nxt_port_mmap_handler_t  *mmap_handler;
597 
598     nxt_debug(task, "request %z bytes shm buffer", size);
599 
600     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
601     if (nxt_slow_path(b == NULL)) {
602         return NULL;
603     }
604 
605     b->completion_handler = nxt_port_mmap_buf_completion;
606     nxt_buf_set_port_mmap(b);
607 
608     mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
609     if (nxt_slow_path(mmap_handler == NULL)) {
610         mp = task->thread->engine->mem_pool;
611         nxt_mp_free(mp, b);
612         nxt_mp_release(mp);
613         return NULL;
614     }
615 
616     b->parent = mmap_handler;
617 
618     nxt_port_mmap_handler_use(mmap_handler, 1);
619 
620     hdr = mmap_handler->hdr;
621 
622     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
623     b->mem.pos = b->mem.start;
624     b->mem.free = b->mem.start;
625     b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
626 
627     nchunks = size / PORT_MMAP_CHUNK_SIZE;
628     if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
629         nchunks++;
630     }
631 
632     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
633               b, b->mem.start, b->mem.end - b->mem.start,
634               hdr->src_pid, hdr->dst_pid, hdr->id, c);
635 
636     c++;
637     nchunks--;
638 
639     /* Try to acquire as many chunks as required. */
640     while (nchunks > 0) {
641 
642         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
643             break;
644         }
645 
646         b->mem.end += PORT_MMAP_CHUNK_SIZE;
647         c++;
648         nchunks--;
649     }
650 
651     return b;
652 }
653 
654 
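/*
 * Try to grow a shared memory buffer in place by claiming the chunks
 * that immediately follow it.  If fewer chunks than requested are free
 * and the result would still not provide "min_size" free bytes, the
 * chunks just claimed are released and NXT_ERROR is returned.
 */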
655 nxt_int_t
656 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
657     size_t min_size)
658 {
659     size_t                   nchunks, free_size;
660     nxt_chunk_id_t           c, start;
661     nxt_port_mmap_header_t   *hdr;
662     nxt_port_mmap_handler_t  *mmap_handler;
663 
664     nxt_debug(task, "request increase %z bytes shm buffer", size);
665 
666     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
667         nxt_log(task, NXT_LOG_WARN,
668                 "failed to increase, not a mmap buffer");
669         return NXT_ERROR;
670     }
671 
672     free_size = nxt_buf_mem_free_size(&b->mem);
673 
674     if (nxt_slow_path(size <= free_size)) {
675         return NXT_OK;
676     }
677 
678     mmap_handler = b->parent;
679     hdr = mmap_handler->hdr;
680 
681     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
682 
683     size -= free_size;
684 
685     nchunks = size / PORT_MMAP_CHUNK_SIZE;
686     if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
687         nchunks++;
688     }
689 
690     c = start;
691 
692     /* Try to acquire as many chunks as required. */
693     while (nchunks > 0) {
694 
695         if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
696             break;
697         }
698 
699         c++;
700         nchunks--;
701     }
702 
703     if (nchunks != 0
704         && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
705     {
706         c--;
707         while (c >= start) {
708             nxt_port_mmap_set_chunk_free(hdr->free_map, c);
709             c--;
710         }
711 
712         nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
713 
714         return NXT_ERROR;
715 
716     } else {
717         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
718 
719         return NXT_OK;
720     }
721 }
722 
723 
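/*
 * Build a buffer that points into an incoming shared memory segment as
 * described by a received nxt_port_mmap_msg_t (segment id, chunk id and
 * data size).
 */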
724 static nxt_buf_t *
725 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
726     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
727 {
728     size_t                   nchunks;
729     nxt_buf_t                *b;
730     nxt_port_mmap_header_t   *hdr;
731     nxt_port_mmap_handler_t  *mmap_handler;
732 
733     mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
734                                                    mmap_msg->mmap_id);
735     if (nxt_slow_path(mmap_handler == NULL)) {
736         return NULL;
737     }
738 
739     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
740     if (nxt_slow_path(b == NULL)) {
741         return NULL;
742     }
743 
744     b->completion_handler = nxt_port_mmap_buf_completion;
745 
746     nxt_buf_set_port_mmap(b);
747 
748     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
749     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
750         nchunks++;
751     }
752 
753     hdr = mmap_handler->hdr;
754 
755     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
756     b->mem.pos = b->mem.start;
757     b->mem.free = b->mem.start + mmap_msg->size;
758     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
759 
760     b->parent = mmap_handler;
761     nxt_port_mmap_handler_use(mmap_handler, 1);
762 
763     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
764               b, b->mem.start, b->mem.end - b->mem.start,
765               hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
766 
767     return b;
768 }
769 
770 
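/*
 * Prepare an outgoing message for transfer via shared memory: the iovecs
 * that point into shared memory buffers are replaced by an array of
 * nxt_port_mmap_msg_t descriptors (segment id, chunk id, size), and only
 * this descriptor array is written to the port socket.
 */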
771 void
772 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
773     nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
774 {
775     size_t                   bsize;
776     nxt_buf_t                *bmem;
777     nxt_uint_t               i;
778     nxt_port_mmap_msg_t      *mmap_msg;
779     nxt_port_mmap_header_t   *hdr;
780     nxt_port_mmap_handler_t  *mmap_handler;
781 
782     nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
783                     "via shared memory", sb->size, port->pid);
784 
785     bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
786     mmap_msg = port->mmsg_buf;
787 
788     bmem = msg->buf;
789 
790     for (i = 0; i < sb->niov; i++, mmap_msg++) {
791 
792         /* Look up the buffer whose data starts at the current iov_base. */
793         while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
794             bmem = bmem->next;
795         }
796 
797         if (nxt_slow_path(bmem == NULL)) {
798             nxt_log_error(NXT_LOG_ERR, task->log,
799                           "failed to find buf for iobuf[%d]", i);
800             return;
801             /* TODO clear b and exit */
802         }
803 
804         mmap_handler = bmem->parent;
805         hdr = mmap_handler->hdr;
806 
807         mmap_msg->mmap_id = hdr->id;
808         mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
809         mmap_msg->size = sb->iobuf[i].iov_len;
810 
811         nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
812                   mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
813                   port->pid);
814     }
815 
816     sb->iobuf[0].iov_base = port->mmsg_buf;
817     sb->iobuf[0].iov_len = bsize;
818     sb->niov = 1;
819     sb->size = bsize;
820 
821     msg->port_msg.mmap = 1;
822 }
823 
824 
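/*
 * The receiving counterpart of nxt_port_mmap_write(): translate received
 * descriptors back into buffers that reference the corresponding
 * incoming shared memory segments.
 */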
825 void
826 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
827 {
828     nxt_buf_t            *b, **pb;
829     nxt_port_mmap_msg_t  *end, *mmap_msg;
830 
831     pb = &msg->buf;
832     msg->size = 0;
833 
834     for (b = msg->buf; b != NULL; b = b->next) {
835 
836         mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
837         end = (nxt_port_mmap_msg_t *) b->mem.free;
838 
839         while (mmap_msg < end) {
840             nxt_assert(mmap_msg + 1 <= end);
841 
842             nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
843                       mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
844                       msg->port_msg.pid);
845 
846             *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
847                                                  msg->port_msg.pid, mmap_msg);
848             if (nxt_slow_path(*pb == NULL)) {
849                 nxt_log_error(NXT_LOG_ERR, task->log,
850                               "failed to get mmap buffer");
851 
852                 break;
853             }
854 
855             msg->size += mmap_msg->size;
856             pb = &(*pb)->next;
857             mmap_msg++;
858 
859             /* Consume the descriptor so the original buf can complete. */
860             b->mem.pos += sizeof(nxt_port_mmap_msg_t);
861         }
862     }
863 }
864 
865 
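/*
 * Decide how a buffer chain should be sent: NXT_PORT_METHOD_MMAP when it
 * consists of shared memory buffers destined for this port's process,
 * otherwise NXT_PORT_METHOD_PLAIN.  Mixed chains or buffers whose
 * segment belongs to another destination pid force plain mode.
 */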
866 nxt_port_method_t
867 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
868 {
869     nxt_port_method_t        m;
870     nxt_port_mmap_header_t   *hdr;
871     nxt_port_mmap_handler_t  *mmap_handler;
872 
873     m = NXT_PORT_METHOD_ANY;
874 
875     for (; b != NULL; b = b->next) {
876         if (nxt_buf_used_size(b) == 0) {
877             /* Empty buffers do not affect the method. */
878             continue;
879         }
880 
881         if (nxt_buf_is_port_mmap(b)) {
882             mmap_handler = b->parent;
883             hdr = mmap_handler->hdr;
884 
885             if (m == NXT_PORT_METHOD_PLAIN) {
886                 nxt_log_error(NXT_LOG_ERR, task->log,
887                               "mixing plain and mmap buffers, "
888                               "using plain mode");
889 
890                 break;
891             }
892 
893             if (port->pid != hdr->dst_pid) {
894                 nxt_log_error(NXT_LOG_ERR, task->log,
895                               "send mmap buffer for %PI to %PI, "
896                               "using plain mode", hdr->dst_pid, port->pid);
897 
898                 m = NXT_PORT_METHOD_PLAIN;
899 
900                 break;
901             }
902 
903             if (m == NXT_PORT_METHOD_ANY) {
904                 nxt_debug(task, "using mmap mode");
905 
906                 m = NXT_PORT_METHOD_MMAP;
907             }
908         } else {
909             if (m == NXT_PORT_METHOD_MMAP) {
910                 nxt_log_error(NXT_LOG_ERR, task->log,
911                               "mixing mmap and plain buffers, "
912                               "switching to plain mode");
913 
914                 m = NXT_PORT_METHOD_PLAIN;
915 
916                 break;
917             }
918 
919             if (m == NXT_PORT_METHOD_ANY) {
920                 nxt_debug(task, "using plain mode");
921 
922                 m = NXT_PORT_METHOD_PLAIN;
923             }
924         }
925     }
926 
927     return m;
928 }
929