xref: /unit/src/nxt_port_memory.c (revision 2126:8542c8141a13)

/*
 * Copyright (C) Max Romanov
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>

#if (NXT_HAVE_MEMFD_CREATE)

#include <linux/memfd.h>
#include <unistd.h>
#include <sys/syscall.h>

#endif

#include <nxt_port_memory_int.h>


static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
    void *data);


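/*
 * Adjusts the handler's reference count by "i"; when the last reference
 * is released, the mapping is unmapped, its descriptor is closed, and the
 * handler itself is freed.
 */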
nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
    int  c;

    c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);

    if (i < 0 && c == -i) {
        if (mmap_handler->hdr != NULL) {
            nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
            mmap_handler->hdr = NULL;
        }

        if (mmap_handler->fd != -1) {
            nxt_fd_close(mmap_handler->fd);
        }

        nxt_free(mmap_handler);
    }
}


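/*
 * Returns a pointer to the element with index "i" in the port mmaps array,
 * growing and zero-initializing the array as needed.
 */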
static nxt_port_mmap_t *
nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
{
    uint32_t  cap;

    cap = port_mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != port_mmaps->cap) {

        port_mmaps->elts = nxt_realloc(port_mmaps->elts,
                                       cap * sizeof(nxt_port_mmap_t));
        if (nxt_slow_path(port_mmaps->elts == NULL)) {
            return NULL;
        }

        nxt_memzero(port_mmaps->elts + port_mmaps->cap,
                    sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));

        port_mmaps->cap = cap;
    }

    if (i + 1 > port_mmaps->size) {
        port_mmaps->size = i + 1;
    }

    return port_mmaps->elts + i;
}


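/*
 * Releases every mmap handler referenced by the array and, if "free_elts"
 * is set, frees the array storage itself.
 */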
void
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
{
    uint32_t         i;
    nxt_port_mmap_t  *port_mmap;

    if (port_mmaps == NULL) {
        return;
    }

    port_mmap = port_mmaps->elts;

    for (i = 0; i < port_mmaps->size; i++) {
        nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
    }

    port_mmaps->size = 0;

    if (free_elts != 0) {
        nxt_free(port_mmaps->elts);
    }
}


#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, size)


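/*
 * Completion handler for shared memory buffers: returns the buffer's chunks
 * to the segment's free map, broadcasts a shm ack if the peer had run out
 * of shared memory, drops the mmap handler reference, and frees the buffer.
 * Chained buffers are completed in the same call.
 */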
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                   *p;
    nxt_mp_t                 *mp;
    nxt_buf_t                *b, *next;
    nxt_process_t            *process;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    nxt_assert(data == b->parent);

    mmap_handler = data;

complete_buf:

    hdr = mmap_handler->hdr;

    if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
        nxt_debug(task, "mmap buf completion: mmap for other process pair "
                  "%PI->%PI", hdr->src_pid, hdr->dst_pid);

        goto release_buf;
    }

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks up to b->mem.pos have been sent to the other side;
         * release the rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
              "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
              b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);

    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    if (hdr->dst_pid == nxt_pid
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);

        nxt_process_broadcast_shm_ack(task, process);
    }

release_buf:

    nxt_port_mmap_handler_use(mmap_handler, -1);

    next = b->next;
    mp = b->data;

    nxt_mp_free(mp, b);
    nxt_mp_release(mp);

    if (next != NULL) {
        b = next;
        mmap_handler = b->parent;

        goto complete_buf;
    }
}


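/*
 * Maps a shared memory segment received from another process over "fd",
 * validates its header, and registers it in the process's incoming mmaps
 * array.  Returns the new mmap handler or NULL on error.
 */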
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_fd_t fd)
{
    void                     *mem;
    struct stat              mmap_stat;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "got new mmap fd #%FD from process %PI",
              fd, process->pid);

    port_mmap = NULL;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);

        return NULL;
    }

    mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);

        return NULL;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != process->pid
                      || hdr->dst_pid != nxt_pid))
    {
        nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
                "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
                hdr->dst_pid, nxt_pid);

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        return NULL;
    }

    mmap_handler->hdr = hdr;
    mmap_handler->fd = -1;

    nxt_thread_mutex_lock(&process->incoming.mutex);

    port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");

        nxt_mem_munmap(mem, PORT_MMAP_SIZE);

        nxt_free(mmap_handler);
        mmap_handler = NULL;

        goto fail;
    }

    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr->sent_over = 0xFFFFu;

fail:

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


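/*
 * Creates a new outgoing shared memory segment: opens an anonymous shared
 * memory descriptor, maps it, initializes the segment header, and reserves
 * the first "n" chunks for the caller.
 */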
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
    nxt_bool_t tracking, nxt_int_t n)
{
    void                     *mem;
    nxt_fd_t                 fd;
    nxt_int_t                i;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
    if (nxt_slow_path(mmap_handler == NULL)) {
        nxt_alert(task, "failed to allocate mmap_handler");

        return NULL;
    }

    port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_alert(task, "failed to add port mmap to mmaps array");

        nxt_free(mmap_handler);
        return NULL;
    }

    fd = nxt_shm_open(task, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
                       MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_fd_close(fd);
        goto remove_fail;
    }

    mmap_handler->hdr = mem;
    mmap_handler->fd = fd;
    port_mmap->mmap_handler = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    /* Init segment header. */
    hdr = mmap_handler->hdr;

    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = mmaps->size - 1;
    hdr->src_pid = nxt_pid;
    hdr->sent_over = 0xFFFFu;

    /* Mark the first n chunks as busy. */
    free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(free_map, i);
    }

    /* Mark as busy the chunk following the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
            hdr->id, nxt_pid);

    return mmap_handler;

remove_fail:

    nxt_free(mmap_handler);

    mmaps->size--;

    return NULL;
}


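/*
 * Opens an anonymous shared memory object of the given size and returns its
 * descriptor, using memfd_create(), shm_open(SHM_ANON), or a named
 * shm_open() followed by an immediate shm_unlink(), depending on platform
 * support.
 */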
nxt_int_t
nxt_shm_open(nxt_task_t *task, size_t size)
{
    nxt_fd_t  fd;

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)

    u_char    *p, name[64];

    p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink((char *) name);

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);

    if (nxt_slow_path(fd == -1)) {
        nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);

        return -1;
    }

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, size) == -1)) {
        nxt_alert(task, "ftruncate() failed %E", nxt_errno);

        nxt_fd_close(fd);

        return -1;
    }

    return fd;
}


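/*
 * Finds "n" contiguous free chunks in one of the existing outgoing segments
 * and marks them busy; if no segment has enough room, its "oosm" flag is
 * raised and a new segment is allocated.  The id of the first reserved
 * chunk is returned through "c".
 */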
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_thread_mutex_lock(&mmaps->mutex);

    end_port_mmap = mmaps->elts + mmaps->size;

    for (port_mmap = mmaps->elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        if (hdr->sent_over != 0xFFFFu) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&mmaps->mutex);

    return mmap_handler;
}


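/*
 * Looks up the incoming mmap handler with the given id for the process
 * identified by "spid".  Returns NULL if the process or the mapping is
 * unknown.
 */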
static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
    nxt_process_t            *process;
    nxt_port_mmap_handler_t  *mmap_handler;

    process = nxt_runtime_process_find(task->thread->runtime, spid);
    if (nxt_slow_path(process == NULL)) {
        return NULL;
    }

    nxt_thread_mutex_lock(&process->incoming.mutex);

    if (nxt_fast_path(process->incoming.size > id)) {
        mmap_handler = process->incoming.elts[id].mmap_handler;

    } else {
        mmap_handler = NULL;

        nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
    }

    nxt_thread_mutex_unlock(&process->incoming.mutex);

    return mmap_handler;
}


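/*
 * Allocates a buffer of at least "size" bytes backed by shared memory
 * chunks from one of the outgoing segments.  The buffer's completion
 * handler returns the chunks to the segment when the buffer is released.
 */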
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
    nxt_mp_t                 *mp;
    nxt_buf_t                *b;
    nxt_int_t                nchunks;
    nxt_chunk_id_t           c;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request %z bytes shm buffer", size);

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
        nxt_alert(task, "requested buffer (%z) too big", size);

        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;
    nxt_buf_set_port_mmap(b);

    mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
    if (nxt_slow_path(mmap_handler == NULL)) {
        mp = task->thread->engine->mem_pool;
        nxt_mp_free(mp, b);
        nxt_mp_release(mp);
        return NULL;
    }

    b->parent = mmap_handler;

    nxt_port_mmap_handler_use(mmap_handler, 1);

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, c);

    return b;
}


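/*
 * Tries to grow a shared memory buffer in place by claiming the chunks that
 * immediately follow it.  Succeeds if enough adjacent chunks are claimed to
 * satisfy "size", or at least "min_size" bytes become available; otherwise
 * the claimed chunks are released and NXT_ERROR is returned.
 */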
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
    size_t min_size)
{
    size_t                   nchunks, free_size;
    nxt_chunk_id_t           c, start;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "request increase %z bytes shm buffer", size);

    if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to increase, not a mmap buffer");
        return NXT_ERROR;
    }

    free_size = nxt_buf_mem_free_size(&b->mem);

    if (nxt_slow_path(size <= free_size)) {
        return NXT_OK;
    }

    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;

    start = nxt_port_mmap_chunk_id(hdr, b->mem.end);

    size -= free_size;

    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;

    c = start;

    /* Try to acquire as many chunks as required. */
    while (nchunks > 0) {

        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
            break;
        }

        c++;
        nchunks--;
    }

    if (nchunks != 0
        && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
    {
        c--;
        while (c >= start) {
            nxt_port_mmap_set_chunk_free(hdr->free_map, c);
            c--;
        }

        nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);

        return NXT_ERROR;

    } else {
        b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);

        return NXT_OK;
    }
}


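/*
 * Builds a buffer that points into an already mapped incoming segment,
 * using the segment id, chunk id, and size carried by "mmap_msg".
 */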
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
    nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
    size_t                   nchunks;
    nxt_buf_t                *b;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
                                                   mmap_msg->mmap_id);
    if (nxt_slow_path(mmap_handler == NULL)) {
        return NULL;
    }

    b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
    if (nxt_slow_path(b == NULL)) {
        return NULL;
    }

    b->completion_handler = nxt_port_mmap_buf_completion;

    nxt_buf_set_port_mmap(b);

    nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
    if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
        nchunks++;
    }

    hdr = mmap_handler->hdr;

    b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start + mmap_msg->size;
    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;

    b->parent = mmap_handler;
    nxt_port_mmap_handler_use(mmap_handler, 1);

    nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
              b, b->mem.start, b->mem.end - b->mem.start,
              hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);

    return b;
}


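/*
 * Rewrites an outgoing message so that only an array of nxt_port_mmap_msg_t
 * descriptors (segment id, chunk id, size) is sent over the socket; the
 * payload itself stays in shared memory.
 */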
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
    size_t                   bsize;
    nxt_buf_t                *bmem;
    nxt_uint_t               i;
    nxt_port_mmap_msg_t      *mmap_msg;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
                    "via shared memory", sb->size, port->pid);

    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Look up the buffer that starts at the current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            nxt_log_error(NXT_LOG_ERR, task->log,
                          "failed to find buf for iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        mmap_handler = bmem->parent;
        hdr = mmap_handler->hdr;

        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    sb->iobuf[0].iov_base = mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}


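/*
 * Converts a received descriptor-only message back into a chain of buffers
 * that reference chunks of the sender's shared memory segments.
 */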
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    pb = &msg->buf;
    msg->size = 0;

    for (b = msg->buf; b != NULL; b = b->next) {

        mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
        end = (nxt_port_mmap_msg_t *) b->mem.free;

        while (mmap_msg < end) {
            nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                      mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                      msg->port_msg.pid);

            *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
                                                 msg->port_msg.pid, mmap_msg);
            if (nxt_slow_path(*pb == NULL)) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "failed to get mmap buffer");

                break;
            }

            msg->size += mmap_msg->size;
            pb = &(*pb)->next;
            mmap_msg++;

            /* Mark the original buf as complete. */
            b->mem.pos += sizeof(nxt_port_mmap_msg_t);
        }
    }
}


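/*
 * Inspects a buffer chain and decides whether the message should be sent
 * via shared memory or as plain data; mixed chains fall back to plain mode.
 */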
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
    nxt_port_method_t  m;

    m = NXT_PORT_METHOD_ANY;

    for (/* void */; b != NULL; b = b->next) {
        if (nxt_buf_used_size(b) == 0) {
            /* Empty buffers do not affect the method. */
            continue;
        }

        if (nxt_buf_is_port_mmap(b)) {
            if (m == NXT_PORT_METHOD_PLAIN) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing plain and mmap buffers, "
                              "using plain mode");

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using mmap mode");

                m = NXT_PORT_METHOD_MMAP;
            }
        } else {
            if (m == NXT_PORT_METHOD_MMAP) {
                nxt_log_error(NXT_LOG_ERR, task->log,
                              "mixing mmap and plain buffers, "
                              "switching to plain mode");

                m = NXT_PORT_METHOD_PLAIN;

                break;
            }

            if (m == NXT_PORT_METHOD_ANY) {
                nxt_debug(task, "using plain mode");

                m = NXT_PORT_METHOD_PLAIN;
            }
        }
    }

    return m;
}


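/*
 * If the process is an application process, posts nxt_port_broadcast_shm_ack()
 * to its first port so that a shared memory acknowledgement is sent to all
 * of its ports.
 */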
void
nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
{
    nxt_port_t  *port;

    if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
    {
        return;
    }

    port = nxt_process_port_first(process);

    if (port->type == NXT_PROCESS_APP) {
        nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
    }
}


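/*
 * Port post handler: sends NXT_PORT_MSG_SHM_ACK to every port of the
 * process, notifying peers that shared memory chunks have been released.
 */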
static void
nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_process_t  *process;

    process = data;

    nxt_queue_each(port, &process->ports, nxt_port_t, link) {
        (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
                                     -1, 0, 0, NULL);
    } nxt_queue_loop;
}