1
2 /*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9 #if (NXT_HAVE_MEMFD_CREATE)
10
11 #include <linux/memfd.h>
12 #include <unistd.h>
13 #include <sys/syscall.h>
14
15 #endif
16
17 #include <nxt_port_memory_int.h>
18
19
20 static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
21 void *data);
22
23
24 nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t * mmap_handler,int i)25 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
26 {
27 int c;
28
29 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
30
31 if (i < 0 && c == -i) {
32 if (mmap_handler->hdr != NULL) {
33 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
34 mmap_handler->hdr = NULL;
35 }
36
37 if (mmap_handler->fd != -1) {
38 nxt_fd_close(mmap_handler->fd);
39 }
40
41 nxt_free(mmap_handler);
42 }
43 }
44
45
46 static nxt_port_mmap_t *
nxt_port_mmap_at(nxt_port_mmaps_t * port_mmaps,uint32_t i)47 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
48 {
49 uint32_t cap;
50
51 cap = port_mmaps->cap;
52
53 if (cap == 0) {
54 cap = i + 1;
55 }
56
57 while (i + 1 > cap) {
58
59 if (cap < 16) {
60 cap = cap * 2;
61
62 } else {
63 cap = cap + cap / 2;
64 }
65 }
66
67 if (cap != port_mmaps->cap) {
68
69 port_mmaps->elts = nxt_realloc(port_mmaps->elts,
70 cap * sizeof(nxt_port_mmap_t));
71 if (nxt_slow_path(port_mmaps->elts == NULL)) {
72 return NULL;
73 }
74
75 nxt_memzero(port_mmaps->elts + port_mmaps->cap,
76 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
77
78 port_mmaps->cap = cap;
79 }
80
81 if (i + 1 > port_mmaps->size) {
82 port_mmaps->size = i + 1;
83 }
84
85 return port_mmaps->elts + i;
86 }
87
88
89 void
nxt_port_mmaps_destroy(nxt_port_mmaps_t * port_mmaps,nxt_bool_t free_elts)90 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
91 {
92 uint32_t i;
93 nxt_port_mmap_t *port_mmap;
94
95 if (port_mmaps == NULL) {
96 return;
97 }
98
99 port_mmap = port_mmaps->elts;
100
101 for (i = 0; i < port_mmaps->size; i++) {
102 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
103 }
104
105 port_mmaps->size = 0;
106
107 if (free_elts != 0) {
108 nxt_free(port_mmaps->elts);
109 }
110 }
111
112
/*
 * Fills "size" bytes starting at "p" with the 0xA5 junk pattern.  Used on
 * chunks that are about to be returned to the free map.  Both arguments
 * are parenthesized so any expression can be passed safely.
 */
#define nxt_port_mmap_free_junk(p, size)                                      \
    memset((p), 0xA5, (size))
115
116
117 static void
nxt_port_mmap_buf_completion(nxt_task_t * task,void * obj,void * data)118 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
119 {
120 u_char *p;
121 nxt_mp_t *mp;
122 nxt_buf_t *b, *next;
123 nxt_process_t *process;
124 nxt_chunk_id_t c;
125 nxt_port_mmap_header_t *hdr;
126 nxt_port_mmap_handler_t *mmap_handler;
127
128 if (nxt_buf_ts_handle(task, obj, data)) {
129 return;
130 }
131
132 b = obj;
133
134 nxt_assert(data == b->parent);
135
136 mmap_handler = data;
137
138 complete_buf:
139
140 hdr = mmap_handler->hdr;
141
142 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
143 nxt_debug(task, "mmap buf completion: mmap for other process pair "
144 "%PI->%PI", hdr->src_pid, hdr->dst_pid);
145
146 goto release_buf;
147 }
148
149 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
150 /*
151 * Chunks until b->mem.pos has been sent to other side,
152 * let's release rest (if any).
153 */
154 p = b->mem.pos - 1;
155 c = nxt_port_mmap_chunk_id(hdr, p) + 1;
156 p = nxt_port_mmap_chunk_start(hdr, c);
157
158 } else {
159 p = b->mem.start;
160 c = nxt_port_mmap_chunk_id(hdr, p);
161 }
162
163 nxt_port_mmap_free_junk(p, b->mem.end - p);
164
165 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), "
166 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
167 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
168
169 while (p < b->mem.end) {
170 nxt_port_mmap_set_chunk_free(hdr->free_map, c);
171
172 p += PORT_MMAP_CHUNK_SIZE;
173 c++;
174 }
175
176 if (hdr->dst_pid == nxt_pid
177 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
178 {
179 process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
180
181 nxt_process_broadcast_shm_ack(task, process);
182 }
183
184 release_buf:
185
186 nxt_port_mmap_handler_use(mmap_handler, -1);
187
188 next = b->next;
189 mp = b->data;
190
191 nxt_mp_free(mp, b);
192 nxt_mp_release(mp);
193
194 if (next != NULL) {
195 b = next;
196 mmap_handler = b->parent;
197
198 goto complete_buf;
199 }
200 }
201
202
203 nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t * task,nxt_process_t * process,nxt_fd_t fd)204 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
205 nxt_fd_t fd)
206 {
207 void *mem;
208 struct stat mmap_stat;
209 nxt_port_mmap_t *port_mmap;
210 nxt_port_mmap_header_t *hdr;
211 nxt_port_mmap_handler_t *mmap_handler;
212
213 nxt_debug(task, "got new mmap fd #%FD from process %PI",
214 fd, process->pid);
215
216 port_mmap = NULL;
217
218 if (fstat(fd, &mmap_stat) == -1) {
219 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
220
221 return NULL;
222 }
223
224 mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
225 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
226
227 if (nxt_slow_path(mem == MAP_FAILED)) {
228 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
229
230 return NULL;
231 }
232
233 hdr = mem;
234
235 if (nxt_slow_path(hdr->src_pid != process->pid
236 || hdr->dst_pid != nxt_pid))
237 {
238 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: "
239 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid,
240 hdr->dst_pid, nxt_pid);
241
242 nxt_mem_munmap(mem, PORT_MMAP_SIZE);
243
244 return NULL;
245 }
246
247 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
248 if (nxt_slow_path(mmap_handler == NULL)) {
249 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
250
251 nxt_mem_munmap(mem, PORT_MMAP_SIZE);
252
253 return NULL;
254 }
255
256 mmap_handler->hdr = hdr;
257 mmap_handler->fd = -1;
258
259 nxt_thread_mutex_lock(&process->incoming.mutex);
260
261 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
262 if (nxt_slow_path(port_mmap == NULL)) {
263 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
264
265 nxt_mem_munmap(mem, PORT_MMAP_SIZE);
266
267 nxt_free(mmap_handler);
268 mmap_handler = NULL;
269
270 goto fail;
271 }
272
273 port_mmap->mmap_handler = mmap_handler;
274 nxt_port_mmap_handler_use(mmap_handler, 1);
275
276 hdr->sent_over = 0xFFFFu;
277
278 fail:
279
280 nxt_thread_mutex_unlock(&process->incoming.mutex);
281
282 return mmap_handler;
283 }
284
285
286 static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t * task,nxt_port_mmaps_t * mmaps,nxt_bool_t tracking,nxt_int_t n)287 nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
288 nxt_bool_t tracking, nxt_int_t n)
289 {
290 void *mem;
291 nxt_fd_t fd;
292 nxt_int_t i;
293 nxt_free_map_t *free_map;
294 nxt_port_mmap_t *port_mmap;
295 nxt_port_mmap_header_t *hdr;
296 nxt_port_mmap_handler_t *mmap_handler;
297
298 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
299 if (nxt_slow_path(mmap_handler == NULL)) {
300 nxt_alert(task, "failed to allocate mmap_handler");
301
302 return NULL;
303 }
304
305 port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
306 if (nxt_slow_path(port_mmap == NULL)) {
307 nxt_alert(task, "failed to add port mmap to mmaps array");
308
309 nxt_free(mmap_handler);
310 return NULL;
311 }
312
313 fd = nxt_shm_open(task, PORT_MMAP_SIZE);
314 if (nxt_slow_path(fd == -1)) {
315 goto remove_fail;
316 }
317
318 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
319 MAP_SHARED, fd, 0);
320
321 if (nxt_slow_path(mem == MAP_FAILED)) {
322 nxt_fd_close(fd);
323 goto remove_fail;
324 }
325
326 mmap_handler->hdr = mem;
327 mmap_handler->fd = fd;
328 port_mmap->mmap_handler = mmap_handler;
329 nxt_port_mmap_handler_use(mmap_handler, 1);
330
331 /* Init segment header. */
332 hdr = mmap_handler->hdr;
333
334 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
335 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
336
337 hdr->id = mmaps->size - 1;
338 hdr->src_pid = nxt_pid;
339 hdr->sent_over = 0xFFFFu;
340
341 /* Mark first chunk as busy */
342 free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
343
344 for (i = 0; i < n; i++) {
345 nxt_port_mmap_set_chunk_busy(free_map, i);
346 }
347
348 /* Mark as busy chunk followed the last available chunk. */
349 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
350 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
351
352 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
353 hdr->id, nxt_pid);
354
355 return mmap_handler;
356
357 remove_fail:
358
359 nxt_free(mmap_handler);
360
361 mmaps->size--;
362
363 return NULL;
364 }
365
366
367 nxt_int_t
nxt_shm_open(nxt_task_t * task,size_t size)368 nxt_shm_open(nxt_task_t *task, size_t size)
369 {
370 nxt_fd_t fd;
371
372 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
373
374 u_char *p, name[64];
375
376 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
377 nxt_pid, nxt_random(&task->thread->random));
378 *p = '\0';
379
380 #endif
381
382 #if (NXT_HAVE_MEMFD_CREATE)
383
384 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
385
386 if (nxt_slow_path(fd == -1)) {
387 nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
388
389 return -1;
390 }
391
392 nxt_debug(task, "memfd_create(%s): %FD", name, fd);
393
394 #elif (NXT_HAVE_SHM_OPEN_ANON)
395
396 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
397
398 if (nxt_slow_path(fd == -1)) {
399 nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
400
401 return -1;
402 }
403
404 nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
405
406 #elif (NXT_HAVE_SHM_OPEN)
407
408 /* Just in case. */
409 shm_unlink((char *) name);
410
411 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
412
413 if (nxt_slow_path(fd == -1)) {
414 nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
415
416 return -1;
417 }
418
419 nxt_debug(task, "shm_open(%s): %FD", name, fd);
420
421 if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
422 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
423 nxt_errno);
424 }
425
426 #else
427
428 #error No working shared memory implementation.
429
430 #endif
431
432 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
433 nxt_alert(task, "ftruncate() failed %E", nxt_errno);
434
435 nxt_fd_close(fd);
436
437 return -1;
438 }
439
440 return fd;
441 }
442
443
/*
 * Finds "n" consecutive chunks in one of the segments of "mmaps", or
 * creates a new segment when no existing one can provide them.
 *
 * On success *c is the id of the first chunk of the run and the owning
 * handler is returned.  When a new segment is created *c is 0 and the
 * result of nxt_port_new_port_mmap() is returned, which may be NULL.
 *
 * "tracking" selects the tracking free map instead of the data free map.
 * The whole search runs under mmaps->mutex.
 */
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
    nxt_int_t n, nxt_bool_t tracking)
{
    nxt_int_t                i, res, nchunks;
    nxt_free_map_t           *free_map;
    nxt_port_mmap_t          *port_mmap;
    nxt_port_mmap_t          *end_port_mmap;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    nxt_thread_mutex_lock(&mmaps->mutex);

    end_port_mmap = mmaps->elts + mmaps->size;

    for (port_mmap = mmaps->elts;
         port_mmap < end_port_mmap;
         port_mmap++)
    {
        mmap_handler = port_mmap->mmap_handler;
        hdr = mmap_handler->hdr;

        /* Only segments whose sent_over is still 0xFFFF are searched. */
        if (hdr->sent_over != 0xFFFFu) {
            continue;
        }

        *c = 0;

        free_map = tracking ? hdr->free_tracking_map : hdr->free_map;

        /*
         * NOTE(review): nxt_port_mmap_get_free_chunk() presumably finds a
         * free chunk at or after *c, claims it and stores its id in *c;
         * confirm against nxt_port_memory_int.h.
         */
        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
            nchunks = 1;

            /* Try to extend the run with the chunks that follow *c. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);

                if (res == 0) {
                    /*
                     * The next chunk could not be claimed: give back the
                     * chunks taken so far and resume the search after the
                     * chunk that stopped the run.
                     */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock_return;
            }
        }

        /* No run of n chunks here: raise the out-of-shared-memory flag. */
        hdr->oosm = 1;
    }

    /* TODO introduce port_mmap limit and release wait. */

    *c = 0;
    mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);

unlock_return:

    nxt_thread_mutex_unlock(&mmaps->mutex);

    return mmap_handler;
}
512
513
514 static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t * task,nxt_pid_t spid,uint32_t id)515 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
516 {
517 nxt_process_t *process;
518 nxt_port_mmap_handler_t *mmap_handler;
519
520 process = nxt_runtime_process_find(task->thread->runtime, spid);
521 if (nxt_slow_path(process == NULL)) {
522 return NULL;
523 }
524
525 nxt_thread_mutex_lock(&process->incoming.mutex);
526
527 if (nxt_fast_path(process->incoming.size > id)) {
528 mmap_handler = process->incoming.elts[id].mmap_handler;
529
530 } else {
531 mmap_handler = NULL;
532
533 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid);
534 }
535
536 nxt_thread_mutex_unlock(&process->incoming.mutex);
537
538 return mmap_handler;
539 }
540
541
542 nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t * task,nxt_port_mmaps_t * mmaps,size_t size)543 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
544 {
545 nxt_mp_t *mp;
546 nxt_buf_t *b;
547 nxt_int_t nchunks;
548 nxt_chunk_id_t c;
549 nxt_port_mmap_header_t *hdr;
550 nxt_port_mmap_handler_t *mmap_handler;
551
552 nxt_debug(task, "request %z bytes shm buffer", size);
553
554 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
555
556 if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
557 nxt_alert(task, "requested buffer (%z) too big", size);
558
559 return NULL;
560 }
561
562 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
563 if (nxt_slow_path(b == NULL)) {
564 return NULL;
565 }
566
567 b->completion_handler = nxt_port_mmap_buf_completion;
568 nxt_buf_set_port_mmap(b);
569
570 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
571 if (nxt_slow_path(mmap_handler == NULL)) {
572 mp = task->thread->engine->mem_pool;
573 nxt_mp_free(mp, b);
574 nxt_mp_release(mp);
575 return NULL;
576 }
577
578 b->parent = mmap_handler;
579
580 nxt_port_mmap_handler_use(mmap_handler, 1);
581
582 hdr = mmap_handler->hdr;
583
584 b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
585 b->mem.pos = b->mem.start;
586 b->mem.free = b->mem.start;
587 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
588
589 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
590 b, b->mem.start, b->mem.end - b->mem.start,
591 hdr->src_pid, hdr->dst_pid, hdr->id, c);
592
593 return b;
594 }
595
596
597 nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t * task,nxt_buf_t * b,size_t size,size_t min_size)598 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
599 size_t min_size)
600 {
601 size_t nchunks, free_size;
602 nxt_chunk_id_t c, start;
603 nxt_port_mmap_header_t *hdr;
604 nxt_port_mmap_handler_t *mmap_handler;
605
606 nxt_debug(task, "request increase %z bytes shm buffer", size);
607
608 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
609 nxt_log(task, NXT_LOG_WARN,
610 "failed to increase, not a mmap buffer");
611 return NXT_ERROR;
612 }
613
614 free_size = nxt_buf_mem_free_size(&b->mem);
615
616 if (nxt_slow_path(size <= free_size)) {
617 return NXT_OK;
618 }
619
620 mmap_handler = b->parent;
621 hdr = mmap_handler->hdr;
622
623 start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
624
625 size -= free_size;
626
627 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
628
629 c = start;
630
631 /* Try to acquire as much chunks as required. */
632 while (nchunks > 0) {
633
634 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
635 break;
636 }
637
638 c++;
639 nchunks--;
640 }
641
642 if (nchunks != 0
643 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
644 {
645 c--;
646 while (c >= start) {
647 nxt_port_mmap_set_chunk_free(hdr->free_map, c);
648 c--;
649 }
650
651 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks);
652
653 return NXT_ERROR;
654
655 } else {
656 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
657
658 return NXT_OK;
659 }
660 }
661
662
663 static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t * task,nxt_port_t * port,nxt_pid_t spid,nxt_port_mmap_msg_t * mmap_msg)664 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
665 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
666 {
667 size_t nchunks;
668 nxt_buf_t *b;
669 nxt_port_mmap_header_t *hdr;
670 nxt_port_mmap_handler_t *mmap_handler;
671
672 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
673 mmap_msg->mmap_id);
674 if (nxt_slow_path(mmap_handler == NULL)) {
675 return NULL;
676 }
677
678 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
679 if (nxt_slow_path(b == NULL)) {
680 return NULL;
681 }
682
683 b->completion_handler = nxt_port_mmap_buf_completion;
684
685 nxt_buf_set_port_mmap(b);
686
687 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
688 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
689 nchunks++;
690 }
691
692 hdr = mmap_handler->hdr;
693
694 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
695 b->mem.pos = b->mem.start;
696 b->mem.free = b->mem.start + mmap_msg->size;
697 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
698
699 b->parent = mmap_handler;
700 nxt_port_mmap_handler_use(mmap_handler, 1);
701
702 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
703 b, b->mem.start, b->mem.end - b->mem.start,
704 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
705
706 return b;
707 }
708
709
710 void
nxt_port_mmap_write(nxt_task_t * task,nxt_port_t * port,nxt_port_send_msg_t * msg,nxt_sendbuf_coalesce_t * sb,void * mmsg_buf)711 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
712 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
713 {
714 size_t bsize;
715 nxt_buf_t *bmem;
716 nxt_uint_t i;
717 nxt_port_mmap_msg_t *mmap_msg;
718 nxt_port_mmap_header_t *hdr;
719 nxt_port_mmap_handler_t *mmap_handler;
720
721 nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
722 "via shared memory", sb->size, port->pid);
723
724 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
725 mmap_msg = mmsg_buf;
726
727 bmem = msg->buf;
728
729 for (i = 0; i < sb->niov; i++, mmap_msg++) {
730
731 /* Lookup buffer which starts current iov_base. */
732 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
733 bmem = bmem->next;
734 }
735
736 if (nxt_slow_path(bmem == NULL)) {
737 nxt_log_error(NXT_LOG_ERR, task->log,
738 "failed to find buf for iobuf[%d]", i);
739 return;
740 /* TODO clear b and exit */
741 }
742
743 mmap_handler = bmem->parent;
744 hdr = mmap_handler->hdr;
745
746 mmap_msg->mmap_id = hdr->id;
747 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
748 mmap_msg->size = sb->iobuf[i].iov_len;
749
750 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
751 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
752 port->pid);
753 }
754
755 sb->iobuf[0].iov_base = mmsg_buf;
756 sb->iobuf[0].iov_len = bsize;
757 sb->niov = 1;
758 sb->size = bsize;
759
760 msg->port_msg.mmap = 1;
761 }
762
763
764 void
nxt_port_mmap_read(nxt_task_t * task,nxt_port_recv_msg_t * msg)765 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
766 {
767 nxt_buf_t *b, **pb;
768 nxt_port_mmap_msg_t *end, *mmap_msg;
769
770 pb = &msg->buf;
771 msg->size = 0;
772
773 for (b = msg->buf; b != NULL; b = b->next) {
774
775 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
776 end = (nxt_port_mmap_msg_t *) b->mem.free;
777
778 while (mmap_msg < end) {
779 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
780 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
781 msg->port_msg.pid);
782
783 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port,
784 msg->port_msg.pid, mmap_msg);
785 if (nxt_slow_path(*pb == NULL)) {
786 nxt_log_error(NXT_LOG_ERR, task->log,
787 "failed to get mmap buffer");
788
789 break;
790 }
791
792 msg->size += mmap_msg->size;
793 pb = &(*pb)->next;
794 mmap_msg++;
795
796 /* Mark original buf as complete. */
797 b->mem.pos += sizeof(nxt_port_mmap_msg_t);
798 }
799 }
800 }
801
802
803 nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t * task,nxt_port_t * port,nxt_buf_t * b)804 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
805 {
806 nxt_port_method_t m;
807
808 m = NXT_PORT_METHOD_ANY;
809
810 for (/* void */; b != NULL; b = b->next) {
811 if (nxt_buf_used_size(b) == 0) {
812 /* empty buffers does not affect method */
813 continue;
814 }
815
816 if (nxt_buf_is_port_mmap(b)) {
817 if (m == NXT_PORT_METHOD_PLAIN) {
818 nxt_log_error(NXT_LOG_ERR, task->log,
819 "mixing plain and mmap buffers, "
820 "using plain mode");
821
822 break;
823 }
824
825 if (m == NXT_PORT_METHOD_ANY) {
826 nxt_debug(task, "using mmap mode");
827
828 m = NXT_PORT_METHOD_MMAP;
829 }
830 } else {
831 if (m == NXT_PORT_METHOD_MMAP) {
832 nxt_log_error(NXT_LOG_ERR, task->log,
833 "mixing mmap and plain buffers, "
834 "switching to plain mode");
835
836 m = NXT_PORT_METHOD_PLAIN;
837
838 break;
839 }
840
841 if (m == NXT_PORT_METHOD_ANY) {
842 nxt_debug(task, "using plain mode");
843
844 m = NXT_PORT_METHOD_PLAIN;
845 }
846 }
847 }
848
849 return m;
850 }
851
852
853 void
nxt_process_broadcast_shm_ack(nxt_task_t * task,nxt_process_t * process)854 nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
855 {
856 nxt_port_t *port;
857
858 if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
859 {
860 return;
861 }
862
863 port = nxt_process_port_first(process);
864
865 if (port->type == NXT_PROCESS_APP) {
866 nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
867 }
868 }
869
870
871 static void
nxt_port_broadcast_shm_ack(nxt_task_t * task,nxt_port_t * port,void * data)872 nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
873 {
874 nxt_process_t *process;
875
876 process = data;
877
878 nxt_queue_each(port, &process->ports, nxt_port_t, link) {
879 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
880 -1, 0, 0, NULL);
881 } nxt_queue_loop;
882 }
883