xref: /unit/src/nxt_port_memory.c (revision 259:9cf0e151e752)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 #if (NXT_HAVE_MEMFD_CREATE)
10 
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14 
15 #endif
16 
17 #include <nxt_port_memory_int.h>
18 
19 void
20 nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
21 {
22     if (port_mmap->hdr != NULL) {
23         nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
24         port_mmap->hdr = NULL;
25     }
26 }
27 
28 
29 static nxt_array_t *
30 nxt_port_mmaps_create()
31 {
32     nxt_mp_t  *mp;
33 
34     mp = nxt_mp_create(1024, 128, 256, 32);
35 
36     if (nxt_slow_path(mp == NULL)) {
37         return NULL;
38     }
39 
40     return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
41 }
42 
43 
44 static nxt_port_mmap_t *
45 nxt_port_mmap_add(nxt_array_t *port_mmaps)
46 {
47     nxt_mp_thread_adopt(port_mmaps->mem_pool);
48 
49     return nxt_array_zero_add(port_mmaps);
50 }
51 
52 
53 void
54 nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
55 {
56     uint32_t         i;
57     nxt_port_mmap_t  *port_mmap;
58 
59     if (port_mmaps == NULL) {
60         return;
61     }
62 
63     nxt_mp_thread_adopt(port_mmaps->mem_pool);
64 
65     port_mmap = port_mmaps->elts;
66 
67     for (i = 0; i < port_mmaps->nelts; i++) {
68         nxt_port_mmap_destroy(port_mmap);
69     }
70 
71     port_mmaps->nelts = 0;
72 
73     if (destroy_pool != 0) {
74         nxt_mp_destroy(port_mmaps->mem_pool);
75     }
76 }
77 
78 
/*
 * Fill released shared memory with a recognizable junk pattern (0xA5)
 * so that use of freed chunks is easier to spot.  Both arguments are
 * parenthesized in the expansion to stay safe with expression args.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, (size))
81 
82 
/*
 * Completion handler for shared memory buffers: junk-fills the released
 * region, returns the buffer's chunks to the segment's free map, and
 * releases the nxt_buf_t back to its memory pool.  "data" is the
 * segment header (same as b->parent, asserted below in debug builds).
 */
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
    u_char                  *p;
    nxt_mp_t                *mp;
    nxt_buf_t               *b;
    nxt_chunk_id_t          c;
    nxt_port_mmap_header_t  *hdr;

    /*
     * NOTE(review): presumably nxt_buf_ts_handle() re-posts the
     * completion to the owning thread and returns non-zero when it did;
     * confirm against its definition.
     */
    if (nxt_buf_ts_handle(task, obj, data)) {
        return;
    }

    b = obj;

    mp = b->data;  /* pool the buffer was allocated from */

#if (NXT_DEBUG)
    if (nxt_slow_path(data != b->parent)) {
        nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
                      data, b->parent);
        nxt_abort();
    }
#endif

    hdr = data;

    if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
        /*
         * Chunks until b->mem.pos has been sent to other side,
         * let's release rest (if any).
         */
        p = b->mem.pos - 1;
        c = nxt_port_mmap_chunk_id(hdr, p) + 1;  /* first unsent chunk */
        p = nxt_port_mmap_chunk_start(hdr, c);

    } else {
        /* Nothing was sent: release the buffer's whole chunk range. */
        p = b->mem.start;
        c = nxt_port_mmap_chunk_id(hdr, p);
    }

    /* Poison the released region (0xA5) before marking it free. */
    nxt_port_mmap_free_junk(p, b->mem.end - p);

    nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b,
              b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent,
              hdr->pid, hdr->id, c);

    /* Mark every chunk from p up to b->mem.end as free in the segment. */
    while (p < b->mem.end) {
        nxt_port_mmap_set_chunk_free(hdr, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
    }

    nxt_mp_release(mp, b);
}
139 
140 
141 nxt_port_mmap_header_t *
142 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
143     nxt_fd_t fd)
144 {
145     void                    *mem;
146     struct stat             mmap_stat;
147     nxt_port_mmap_t         *port_mmap;
148     nxt_port_mmap_header_t  *hdr;
149 
150     nxt_debug(task, "got new mmap fd #%FD from process %PI",
151               fd, process->pid);
152 
153     port_mmap = NULL;
154     hdr = NULL;
155 
156     if (fstat(fd, &mmap_stat) == -1) {
157         nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
158 
159         return NULL;
160     }
161 
162     nxt_thread_mutex_lock(&process->incoming_mutex);
163 
164     if (process->incoming == NULL) {
165         process->incoming = nxt_port_mmaps_create();
166     }
167 
168     if (nxt_slow_path(process->incoming == NULL)) {
169         nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
170 
171         goto fail;
172     }
173 
174     port_mmap = nxt_port_mmap_add(process->incoming);
175     if (nxt_slow_path(port_mmap == NULL)) {
176         nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
177 
178         goto fail;
179     }
180 
181     mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
182                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
183 
184     if (nxt_slow_path(mem == MAP_FAILED)) {
185         nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
186 
187         port_mmap = NULL;
188 
189         goto fail;
190     }
191 
192     port_mmap->hdr = mem;
193     hdr = port_mmap->hdr;
194 
195     if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
196         nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
197                 port_mmap->hdr->id, process->incoming->nelts - 1);
198         nxt_abort();
199     }
200 
201 fail:
202 
203     nxt_thread_mutex_unlock(&process->incoming_mutex);
204 
205     return hdr;
206 }
207 
208 
/*
 * Create a new shared memory segment, register it in process->outgoing,
 * initialize its header, and send its file descriptor to the peer via
 * "port".  Returns the mapped segment header, or NULL on failure.
 * The only caller, nxt_port_mmap_get(), holds process->outgoing_mutex.
 */
static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
    nxt_port_t *port)
{
    void                    *mem;
    u_char                  *p, name[64];
    nxt_fd_t                fd;
    nxt_port_mmap_t         *port_mmap;
    nxt_port_mmap_header_t  *hdr;

    /*
     * NOTE(review): if neither NXT_HAVE_MEMFD_CREATE nor NXT_HAVE_SHM_OPEN
     * is defined, "fd" is used below uninitialized -- presumably configure
     * guarantees one of the two; confirm.
     */

    port_mmap = NULL;

    if (process->outgoing == NULL) {
        process->outgoing = nxt_port_mmaps_create();
    }

    if (nxt_slow_path(process->outgoing == NULL)) {
        nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");

        return NULL;
    }

    port_mmap = nxt_port_mmap_add(process->outgoing);
    if (nxt_slow_path(port_mmap == NULL)) {
        nxt_log(task, NXT_LOG_WARN,
                "failed to add port mmap to outgoing array");

        return NULL;
    }

    /* Randomized, per-process unique name for the shm object. */
    p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD",
                    nxt_pid, nxt_random(&task->thread->random));
    *p = '\0';

#if (NXT_HAVE_MEMFD_CREATE)
    /* Anonymous memory fd: no filesystem object is created. */
    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);

    if (nxt_slow_path(fd == -1)) {
        nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
                name, nxt_errno);

        goto remove_fail;
    }

    nxt_debug(task, "memfd_create(%s): %FD", name, fd);

#elif (NXT_HAVE_SHM_OPEN)
    shm_unlink((char *) name); /* Remove any stale object with this name. */

    fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);

    nxt_debug(task, "shm_open(%s): %FD", name, fd);

    if (nxt_slow_path(fd == -1)) {
        nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);

        goto remove_fail;
    }

    /* Unlink right away: the object stays alive through the open fd. */
    if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
                nxt_errno);
    }
#endif

    /* Grow the object to the fixed segment size. */
    if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
        nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);

        /* NOTE(review): "fd" is not closed on this failure path nor on
         * the mmap failure below -- possible descriptor leak; verify. */
        goto remove_fail;
    }

    mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        goto remove_fail;
    }

    port_mmap->hdr = mem;

    /* Init segment header. */
    hdr = port_mmap->hdr;

    /* All chunks start out free (every free_map bit set). */
    nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));

    /* Segment id equals its index in the outgoing array; the receiver
     * checks this invariant in nxt_port_incoming_port_mmap(). */
    hdr->id = process->outgoing->nelts - 1;
    hdr->pid = process->pid;

    /* Mark first chunk as busy: it is handed to the caller (*c == 0). */
    nxt_port_mmap_set_chunk_busy(hdr, 0);

    /* Mark the sentinel chunk past the last available one as busy. */
    nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);

    nxt_debug(task, "send mmap fd %FD to process %PI", fd,
              port->pid);

    /* TODO handle error */
    (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);

    nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
            hdr->id, nxt_pid, process->pid);

    return hdr;

remove_fail:

    /* Undo the array registration made at the top. */
    nxt_array_remove(process->outgoing, port_mmap);

    return NULL;
}
320 
321 
322 static nxt_port_mmap_header_t *
323 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
324     size_t size)
325 {
326     nxt_array_t             *outgoing;
327     nxt_process_t           *process;
328     nxt_port_mmap_t         *port_mmap;
329     nxt_port_mmap_t         *end_port_mmap;
330     nxt_port_mmap_header_t  *hdr;
331 
332     process = port->process;
333     if (nxt_slow_path(process == NULL)) {
334         return NULL;
335     }
336 
337     *c = 0;
338     port_mmap = NULL;
339     hdr = NULL;
340 
341     nxt_thread_mutex_lock(&process->outgoing_mutex);
342 
343     if (process->outgoing == NULL) {
344         hdr = nxt_port_new_port_mmap(task, process, port);
345 
346         goto unlock_return;
347     }
348 
349     outgoing = process->outgoing;
350     port_mmap = outgoing->elts;
351     end_port_mmap = port_mmap + outgoing->nelts;
352 
353     while (port_mmap < end_port_mmap) {
354 
355         if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
356             hdr = port_mmap->hdr;
357 
358             goto unlock_return;
359         }
360 
361         port_mmap++;
362     }
363 
364     /* TODO introduce port_mmap limit and release wait. */
365 
366     hdr = nxt_port_new_port_mmap(task, process, port);
367 
368 unlock_return:
369 
370     nxt_thread_mutex_unlock(&process->outgoing_mutex);
371 
372     return hdr;
373 }
374 
375 
376 static nxt_port_mmap_header_t *
377 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
378 {
379     nxt_array_t             *incoming;
380     nxt_process_t           *process;
381     nxt_port_mmap_t         *port_mmap;
382     nxt_port_mmap_header_t  *hdr;
383 
384     process = nxt_runtime_process_find(task->thread->runtime, spid);
385     if (nxt_slow_path(process == NULL)) {
386         return NULL;
387     }
388 
389     hdr = NULL;
390 
391     nxt_thread_mutex_lock(&process->incoming_mutex);
392 
393     incoming = process->incoming;
394 
395     if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
396         port_mmap = incoming->elts;
397         hdr = port_mmap[id].hdr;
398     } else {
399         nxt_log(task, NXT_LOG_WARN,
400                 "failed to get incoming mmap #%d for process %PI", id, spid);
401     }
402 
403     nxt_thread_mutex_unlock(&process->incoming_mutex);
404 
405     return hdr;
406 }
407 
408 
409 nxt_buf_t *
410 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
411 {
412     size_t                  nchunks;
413     nxt_buf_t               *b;
414     nxt_chunk_id_t          c;
415     nxt_port_mmap_header_t  *hdr;
416 
417     nxt_debug(task, "request %z bytes shm buffer", size);
418 
419     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
420     if (nxt_slow_path(b == NULL)) {
421         return NULL;
422     }
423 
424     b->completion_handler = nxt_port_mmap_buf_completion;
425     nxt_buf_set_port_mmap(b);
426 
427     hdr = nxt_port_mmap_get(task, port, &c, size);
428     if (nxt_slow_path(hdr == NULL)) {
429         nxt_mp_release(port->mem_pool, b);
430         return NULL;
431     }
432 
433     b->parent = hdr;
434 
435     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
436     b->mem.pos = b->mem.start;
437     b->mem.free = b->mem.start;
438     b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
439 
440     nchunks = size / PORT_MMAP_CHUNK_SIZE;
441     if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
442         nchunks++;
443     }
444 
445     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
446               b->mem.start, b->mem.end - b->mem.start,
447               hdr->pid, hdr->id, c);
448 
449     c++;
450     nchunks--;
451 
452     /* Try to acquire as much chunks as required. */
453     while (nchunks > 0) {
454 
455         if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
456             break;
457         }
458 
459         b->mem.end += PORT_MMAP_CHUNK_SIZE;
460         c++;
461         nchunks--;
462     }
463 
464     return b;
465 }
466 
467 
468 nxt_int_t
469 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
470     size_t min_size)
471 {
472     size_t                  nchunks, free_size;
473     nxt_chunk_id_t          c, start;
474     nxt_port_mmap_header_t  *hdr;
475 
476     nxt_debug(task, "request increase %z bytes shm buffer", size);
477 
478     if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
479         nxt_log(task, NXT_LOG_WARN,
480                 "failed to increase, not a mmap buffer");
481         return NXT_ERROR;
482     }
483 
484     free_size = nxt_buf_mem_free_size(&b->mem);
485 
486     if (nxt_slow_path(size <= free_size)) {
487         return NXT_OK;
488     }
489 
490     hdr = b->parent;
491 
492     start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
493 
494     size -= free_size;
495 
496     nchunks = size / PORT_MMAP_CHUNK_SIZE;
497     if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
498         nchunks++;
499     }
500 
501     c = start;
502 
503     /* Try to acquire as much chunks as required. */
504     while (nchunks > 0) {
505 
506         if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
507             break;
508         }
509 
510         c++;
511         nchunks--;
512     }
513 
514     if (nchunks != 0 &&
515         min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
516 
517         c--;
518         while (c >= start) {
519             nxt_port_mmap_set_chunk_free(hdr, c);
520             c--;
521         }
522 
523         nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
524 
525         return NXT_ERROR;
526     } else {
527         b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
528 
529         return NXT_OK;
530     }
531 }
532 
533 
534 static nxt_buf_t *
535 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
536     nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
537 {
538     size_t                  nchunks;
539     nxt_buf_t               *b;
540     nxt_port_mmap_header_t  *hdr;
541 
542     hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
543     if (nxt_slow_path(hdr == NULL)) {
544         return NULL;
545     }
546 
547     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
548     if (nxt_slow_path(b == NULL)) {
549         return NULL;
550     }
551 
552     b->completion_handler = nxt_port_mmap_buf_completion;
553 
554     nxt_buf_set_port_mmap(b);
555 
556     nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
557     if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
558         nchunks++;
559     }
560 
561     b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
562     b->mem.pos = b->mem.start;
563     b->mem.free = b->mem.start + mmap_msg->size;
564     b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
565 
566     b->parent = hdr;
567 
568     nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
569               b->mem.start, b->mem.end - b->mem.start,
570               hdr->pid, hdr->id, mmap_msg->chunk_id);
571 
572     return b;
573 }
574 
575 
/*
 * Convert an outgoing message held in shared memory buffers into a
 * plain message carrying nxt_port_mmap_msg_t descriptors (mmap id,
 * chunk id, size) -- one per iovec entry -- written into
 * port->mmsg_buf.  The peer rebuilds the buffers in
 * nxt_port_mmap_read().
 */
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
    size_t                  bsize;
    nxt_buf_t               *bmem;
    nxt_uint_t              i;
    nxt_port_mmap_msg_t     *mmap_msg;
    nxt_port_mmap_header_t  *hdr;

    nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
              "via shared memory", sb->size, port->pid);

    /* One descriptor per iovec entry. */
    bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
    mmap_msg = port->mmsg_buf;

    bmem = msg->buf;

    for (i = 0; i < sb->niov; i++, mmap_msg++) {

        /* Lookup buffer which starts current iov_base. */
        while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
            bmem = bmem->next;
        }

        if (nxt_slow_path(bmem == NULL)) {
            /* NOTE(review): returning here leaves msg/sb half-converted;
             * the TODO below indicates cleanup is known to be missing. */
            nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for "
                          "iobuf[%d]", i);
            return;
            /* TODO clear b and exit */
        }

        hdr = bmem->parent;

        /* Describe the chunk range this iovec covers in its segment. */
        mmap_msg->mmap_id = hdr->id;
        mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
        mmap_msg->size = sb->iobuf[i].iov_len;

        nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  port->pid);
    }

    /* Replace the original iovecs with the single descriptor array. */
    sb->iobuf[0].iov_base = port->mmsg_buf;
    sb->iobuf[0].iov_len = bsize;
    sb->niov = 1;
    sb->size = bsize;

    msg->port_msg.mmap = 1;
}
626 
627 
/*
 * Inverse of nxt_port_mmap_write(): replace the received descriptor
 * array with a chain of buffers pointing into the sender's shared
 * memory segments.  msg->size accumulates the described data size.
 */
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, **pb;
    nxt_port_mmap_msg_t  *end, *mmap_msg;

    b = msg->buf;

    /* The received payload is an array of nxt_port_mmap_msg_t. */
    mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
    end = (nxt_port_mmap_msg_t *) b->mem.free;

    pb = &msg->buf;
    msg->size = 0;

    while (mmap_msg < end) {
        nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
                  mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
                  msg->port_msg.pid);

        *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
                                             mmap_msg);
        if (nxt_slow_path(*pb == NULL)) {
            /* NOTE(review): remaining descriptors are dropped here, so
             * their chunks stay busy on the sender side -- confirm this
             * is the intended degradation. */
            nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");

            break;
        }

        msg->size += mmap_msg->size;
        pb = &(*pb)->next;
        mmap_msg++;
    }

    /* Mark original buf as complete. */
    b->mem.pos += nxt_buf_used_size(b);
}
664 
665 
666 nxt_port_method_t
667 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
668 {
669     nxt_port_method_t       m;
670     nxt_port_mmap_header_t  *hdr;
671 
672     m = NXT_PORT_METHOD_ANY;
673 
674     for (; b != NULL; b = b->next) {
675         if (nxt_buf_used_size(b) == 0) {
676             /* empty buffers does not affect method */
677             continue;
678         }
679 
680         if (nxt_buf_is_port_mmap(b)) {
681             hdr = b->parent;
682 
683             if (m == NXT_PORT_METHOD_PLAIN) {
684                 nxt_log_error(NXT_LOG_ERR, task->log,
685                               "mixing plain and mmap buffers, "
686                               "using plain mode");
687 
688                 break;
689             }
690 
691             if (port->pid != hdr->pid) {
692                 nxt_log_error(NXT_LOG_ERR, task->log,
693                               "send mmap buffer for %PI to %PI, "
694                               "using plain mode", hdr->pid, port->pid);
695 
696                 m = NXT_PORT_METHOD_PLAIN;
697 
698                 break;
699             }
700 
701             if (m == NXT_PORT_METHOD_ANY) {
702                 nxt_debug(task, "using mmap mode");
703 
704                 m = NXT_PORT_METHOD_MMAP;
705             }
706         } else {
707             if (m == NXT_PORT_METHOD_MMAP) {
708                 nxt_log_error(NXT_LOG_ERR, task->log,
709                               "mixing mmap and plain buffers, "
710                               "switching to plain mode");
711 
712                 m = NXT_PORT_METHOD_PLAIN;
713 
714                 break;
715             }
716 
717             if (m == NXT_PORT_METHOD_ANY) {
718                 nxt_debug(task, "using plain mode");
719 
720                 m = NXT_PORT_METHOD_PLAIN;
721             }
722         }
723     }
724 
725     return m;
726 }
727