nxt_port_memory.c (197:ae8f843e1fd4) nxt_port_memory.c (206:86a529b2ea9b)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
#include <nxt_main.h>

#include <unistd.h>

#if (NXT_HAVE_MEMFD_CREATE)

#include <linux/memfd.h>
#include <sys/syscall.h>

#endif

#include <nxt_port_memory_int.h>
18
19void
20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
21{
22 if (port_mmap->hdr != NULL) {
23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
24 port_mmap->hdr = NULL;
25 }
26}
27
28
29static nxt_array_t *
30nxt_port_mmaps_create()
31{
32 nxt_mp_t *mp;
33
34 mp = nxt_mp_create(1024, 128, 256, 32);
35
36 if (nxt_slow_path(mp == NULL)) {
37 return NULL;
38 }
39
40 return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
41}
42
43
44static nxt_port_mmap_t *
45nxt_port_mmap_add(nxt_array_t *port_mmaps)
46{
47 nxt_mp_thread_adopt(port_mmaps->mem_pool);
48
49 return nxt_array_zero_add(port_mmaps);
50}
51
52
53void
54nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
55{
56 uint32_t i;
57 nxt_port_mmap_t *port_mmap;
58
59 if (port_mmaps == NULL) {
60 return;
61 }
62
63 nxt_mp_thread_adopt(port_mmaps->mem_pool);
64
65 port_mmap = port_mmaps->elts;
66
67 for (i = 0; i < port_mmaps->nelts; i++) {
68 nxt_port_mmap_destroy(port_mmap);
69 }
70
71 port_mmaps->nelts = 0;
72
73 if (destroy_pool != 0) {
74 nxt_mp_destroy(port_mmaps->mem_pool);
75 }
76}
77
78
79#define nxt_port_mmap_free_junk(p, size) \
80 memset((p), 0xA5, size)
81
82
83static void
84nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
85{
86 u_char *p;
87 nxt_mp_t *mp;
88 nxt_buf_t *b;
89 nxt_chunk_id_t c;
90 nxt_port_mmap_header_t *hdr;
91
92 if (nxt_buf_ts_handle(task, obj, data)) {
93 return;
94 }
95
96 b = obj;
97
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9#if (NXT_HAVE_MEMFD_CREATE)
10
11#include <linux/memfd.h>
12#include <unistd.h>
13#include <sys/syscall.h>
14
15#endif
16
17#include <nxt_port_memory_int.h>
18
19void
20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
21{
22 if (port_mmap->hdr != NULL) {
23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
24 port_mmap->hdr = NULL;
25 }
26}
27
28
29static nxt_array_t *
30nxt_port_mmaps_create()
31{
32 nxt_mp_t *mp;
33
34 mp = nxt_mp_create(1024, 128, 256, 32);
35
36 if (nxt_slow_path(mp == NULL)) {
37 return NULL;
38 }
39
40 return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
41}
42
43
44static nxt_port_mmap_t *
45nxt_port_mmap_add(nxt_array_t *port_mmaps)
46{
47 nxt_mp_thread_adopt(port_mmaps->mem_pool);
48
49 return nxt_array_zero_add(port_mmaps);
50}
51
52
53void
54nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
55{
56 uint32_t i;
57 nxt_port_mmap_t *port_mmap;
58
59 if (port_mmaps == NULL) {
60 return;
61 }
62
63 nxt_mp_thread_adopt(port_mmaps->mem_pool);
64
65 port_mmap = port_mmaps->elts;
66
67 for (i = 0; i < port_mmaps->nelts; i++) {
68 nxt_port_mmap_destroy(port_mmap);
69 }
70
71 port_mmaps->nelts = 0;
72
73 if (destroy_pool != 0) {
74 nxt_mp_destroy(port_mmaps->mem_pool);
75 }
76}
77
78
79#define nxt_port_mmap_free_junk(p, size) \
80 memset((p), 0xA5, size)
81
82
83static void
84nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
85{
86 u_char *p;
87 nxt_mp_t *mp;
88 nxt_buf_t *b;
89 nxt_chunk_id_t c;
90 nxt_port_mmap_header_t *hdr;
91
92 if (nxt_buf_ts_handle(task, obj, data)) {
93 return;
94 }
95
96 b = obj;
97
98 nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start);
99
100 mp = b->data;
101
102#if (NXT_DEBUG)
103 if (nxt_slow_path(data != b->parent)) {
104 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
105 data, b->parent);
106 nxt_abort();
107 }
108#endif
109
110 hdr = data;
111
112 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
113 /*
114 * Chunks until b->mem.pos has been sent to other side,
115 * let's release rest (if any).
116 */
117 p = b->mem.pos - 1;
118 c = nxt_port_mmap_chunk_id(hdr, p) + 1;
119 p = nxt_port_mmap_chunk_start(hdr, c);
120
121 } else {
122 p = b->mem.start;
123 c = nxt_port_mmap_chunk_id(hdr, p);
124 }
125
126 nxt_port_mmap_free_junk(p, b->mem.end - p);
127
98 mp = b->data;
99
100#if (NXT_DEBUG)
101 if (nxt_slow_path(data != b->parent)) {
102 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
103 data, b->parent);
104 nxt_abort();
105 }
106#endif
107
108 hdr = data;
109
110 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
111 /*
112 * Chunks until b->mem.pos has been sent to other side,
113 * let's release rest (if any).
114 */
115 p = b->mem.pos - 1;
116 c = nxt_port_mmap_chunk_id(hdr, p) + 1;
117 p = nxt_port_mmap_chunk_start(hdr, c);
118
119 } else {
120 p = b->mem.start;
121 c = nxt_port_mmap_chunk_id(hdr, p);
122 }
123
124 nxt_port_mmap_free_junk(p, b->mem.end - p);
125
126 nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b,
127 b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent,
128 hdr->pid, hdr->id, c);
129
128 while (p < b->mem.end) {
129 nxt_port_mmap_set_chunk_free(hdr, c);
130
131 p += PORT_MMAP_CHUNK_SIZE;
132 c++;
133 }
134
135 nxt_mp_release(mp, b);
136}
137
138
139nxt_port_mmap_header_t *
140nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
141 nxt_fd_t fd)
142{
143 void *mem;
144 struct stat mmap_stat;
145 nxt_port_mmap_t *port_mmap;
146 nxt_port_mmap_header_t *hdr;
147
148 nxt_debug(task, "got new mmap fd #%FD from process %PI",
149 fd, process->pid);
150
151 port_mmap = NULL;
152 hdr = NULL;
153
154 if (fstat(fd, &mmap_stat) == -1) {
155 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
156
157 return NULL;
158 }
159
160 nxt_thread_mutex_lock(&process->incoming_mutex);
161
162 if (process->incoming == NULL) {
163 process->incoming = nxt_port_mmaps_create();
164 }
165
166 if (nxt_slow_path(process->incoming == NULL)) {
167 nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
168
169 goto fail;
170 }
171
172 port_mmap = nxt_port_mmap_add(process->incoming);
173 if (nxt_slow_path(port_mmap == NULL)) {
174 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
175
176 goto fail;
177 }
178
179 mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
180 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
181
182 if (nxt_slow_path(mem == MAP_FAILED)) {
183 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
184
185 port_mmap = NULL;
186
187 goto fail;
188 }
189
190 port_mmap->hdr = mem;
191 hdr = port_mmap->hdr;
192
193 if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
194 nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
195 port_mmap->hdr->id, process->incoming->nelts - 1);
196 nxt_abort();
197 }
198
199fail:
200
201 nxt_thread_mutex_unlock(&process->incoming_mutex);
202
203 return hdr;
204}
205
206
207static nxt_port_mmap_header_t *
208nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
209 nxt_port_t *port)
210{
211 void *mem;
212 u_char *p, name[64];
213 nxt_fd_t fd;
214 nxt_port_mmap_t *port_mmap;
215 nxt_port_mmap_header_t *hdr;
216
217 port_mmap = NULL;
218
219 if (process->outgoing == NULL) {
220 process->outgoing = nxt_port_mmaps_create();
221 }
222
223 if (nxt_slow_path(process->outgoing == NULL)) {
224 nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
225
226 return NULL;
227 }
228
229 port_mmap = nxt_port_mmap_add(process->outgoing);
230 if (nxt_slow_path(port_mmap == NULL)) {
231 nxt_log(task, NXT_LOG_WARN,
232 "failed to add port mmap to outgoing array");
233
234 return NULL;
235 }
236
237 p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD",
238 nxt_pid, nxt_random(&task->thread->random));
239 *p = '\0';
240
241#if (NXT_HAVE_MEMFD_CREATE)
242 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
243
244 if (nxt_slow_path(fd == -1)) {
245 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
246 name, nxt_errno);
247
248 goto remove_fail;
249 }
250
251 nxt_debug(task, "memfd_create(%s): %FD", name, fd);
252
253#elif (NXT_HAVE_SHM_OPEN)
254 shm_unlink((char *) name); // just in case
255
256 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
257
258 nxt_debug(task, "shm_open(%s): %FD", name, fd);
259
260 if (nxt_slow_path(fd == -1)) {
261 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
262
263 goto remove_fail;
264 }
265
266 if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
267 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
268 nxt_errno);
269 }
270#endif
271
272 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
273 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
274
275 goto remove_fail;
276 }
277
278 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
279 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
280
281 if (nxt_slow_path(mem == MAP_FAILED)) {
282 goto remove_fail;
283 }
284
285 port_mmap->hdr = mem;
286
287 /* Init segment header. */
288 hdr = port_mmap->hdr;
289
290 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
291
292 hdr->id = process->outgoing->nelts - 1;
293 hdr->pid = process->pid;
294
295 /* Mark first chunk as busy */
296 nxt_port_mmap_set_chunk_busy(hdr, 0);
297
298 /* Mark as busy chunk followed the last available chunk. */
299 nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
300
301 nxt_debug(task, "send mmap fd %FD to process %PI", fd,
302 port->pid);
303
304 /* TODO handle error */
305 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
306
307 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
308 hdr->id, nxt_pid, process->pid);
309
310 return hdr;
311
312remove_fail:
313
314 nxt_array_remove(process->outgoing, port_mmap);
315
316 return NULL;
317}
318
319
320static nxt_port_mmap_header_t *
321nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
322 size_t size)
323{
324 nxt_array_t *outgoing;
325 nxt_process_t *process;
326 nxt_port_mmap_t *port_mmap;
327 nxt_port_mmap_t *end_port_mmap;
328 nxt_port_mmap_header_t *hdr;
329
330 process = port->process;
331 if (nxt_slow_path(process == NULL)) {
332 return NULL;
333 }
334
335 *c = 0;
336 port_mmap = NULL;
337 hdr = NULL;
338
339 nxt_thread_mutex_lock(&process->outgoing_mutex);
340
341 if (process->outgoing == NULL) {
342 hdr = nxt_port_new_port_mmap(task, process, port);
343
344 goto unlock_return;
345 }
346
347 outgoing = process->outgoing;
348 port_mmap = outgoing->elts;
349 end_port_mmap = port_mmap + outgoing->nelts;
350
351 while (port_mmap < end_port_mmap) {
352
353 if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
354 hdr = port_mmap->hdr;
355
356 goto unlock_return;
357 }
358
359 port_mmap++;
360 }
361
362 /* TODO introduce port_mmap limit and release wait. */
363
364 hdr = nxt_port_new_port_mmap(task, process, port);
365
366unlock_return:
367
368 nxt_thread_mutex_unlock(&process->outgoing_mutex);
369
370 return hdr;
371}
372
373
374static nxt_port_mmap_header_t *
375nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
376{
377 nxt_array_t *incoming;
378 nxt_process_t *process;
379 nxt_port_mmap_t *port_mmap;
380 nxt_port_mmap_header_t *hdr;
381
382 process = nxt_runtime_process_find(task->thread->runtime, spid);
383 if (nxt_slow_path(process == NULL)) {
384 return NULL;
385 }
386
387 hdr = NULL;
388
389 nxt_thread_mutex_lock(&process->incoming_mutex);
390
391 incoming = process->incoming;
392
393 if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
394 port_mmap = incoming->elts;
395 hdr = port_mmap[id].hdr;
396 } else {
397 nxt_log(task, NXT_LOG_WARN,
398 "failed to get incoming mmap #%d for process %PI", id, spid);
399 }
400
401 nxt_thread_mutex_unlock(&process->incoming_mutex);
402
403 return hdr;
404}
405
406
407nxt_buf_t *
408nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
409{
410 size_t nchunks;
411 nxt_buf_t *b;
412 nxt_chunk_id_t c;
413 nxt_port_mmap_header_t *hdr;
414
415 nxt_debug(task, "request %z bytes shm buffer", size);
416
130 while (p < b->mem.end) {
131 nxt_port_mmap_set_chunk_free(hdr, c);
132
133 p += PORT_MMAP_CHUNK_SIZE;
134 c++;
135 }
136
137 nxt_mp_release(mp, b);
138}
139
140
141nxt_port_mmap_header_t *
142nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
143 nxt_fd_t fd)
144{
145 void *mem;
146 struct stat mmap_stat;
147 nxt_port_mmap_t *port_mmap;
148 nxt_port_mmap_header_t *hdr;
149
150 nxt_debug(task, "got new mmap fd #%FD from process %PI",
151 fd, process->pid);
152
153 port_mmap = NULL;
154 hdr = NULL;
155
156 if (fstat(fd, &mmap_stat) == -1) {
157 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
158
159 return NULL;
160 }
161
162 nxt_thread_mutex_lock(&process->incoming_mutex);
163
164 if (process->incoming == NULL) {
165 process->incoming = nxt_port_mmaps_create();
166 }
167
168 if (nxt_slow_path(process->incoming == NULL)) {
169 nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
170
171 goto fail;
172 }
173
174 port_mmap = nxt_port_mmap_add(process->incoming);
175 if (nxt_slow_path(port_mmap == NULL)) {
176 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
177
178 goto fail;
179 }
180
181 mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
182 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
183
184 if (nxt_slow_path(mem == MAP_FAILED)) {
185 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
186
187 port_mmap = NULL;
188
189 goto fail;
190 }
191
192 port_mmap->hdr = mem;
193 hdr = port_mmap->hdr;
194
195 if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
196 nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
197 port_mmap->hdr->id, process->incoming->nelts - 1);
198 nxt_abort();
199 }
200
201fail:
202
203 nxt_thread_mutex_unlock(&process->incoming_mutex);
204
205 return hdr;
206}
207
208
209static nxt_port_mmap_header_t *
210nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
211 nxt_port_t *port)
212{
213 void *mem;
214 u_char *p, name[64];
215 nxt_fd_t fd;
216 nxt_port_mmap_t *port_mmap;
217 nxt_port_mmap_header_t *hdr;
218
219 port_mmap = NULL;
220
221 if (process->outgoing == NULL) {
222 process->outgoing = nxt_port_mmaps_create();
223 }
224
225 if (nxt_slow_path(process->outgoing == NULL)) {
226 nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
227
228 return NULL;
229 }
230
231 port_mmap = nxt_port_mmap_add(process->outgoing);
232 if (nxt_slow_path(port_mmap == NULL)) {
233 nxt_log(task, NXT_LOG_WARN,
234 "failed to add port mmap to outgoing array");
235
236 return NULL;
237 }
238
239 p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD",
240 nxt_pid, nxt_random(&task->thread->random));
241 *p = '\0';
242
243#if (NXT_HAVE_MEMFD_CREATE)
244 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
245
246 if (nxt_slow_path(fd == -1)) {
247 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
248 name, nxt_errno);
249
250 goto remove_fail;
251 }
252
253 nxt_debug(task, "memfd_create(%s): %FD", name, fd);
254
255#elif (NXT_HAVE_SHM_OPEN)
256 shm_unlink((char *) name); // just in case
257
258 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
259
260 nxt_debug(task, "shm_open(%s): %FD", name, fd);
261
262 if (nxt_slow_path(fd == -1)) {
263 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
264
265 goto remove_fail;
266 }
267
268 if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
269 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
270 nxt_errno);
271 }
272#endif
273
274 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
275 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
276
277 goto remove_fail;
278 }
279
280 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
281 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
282
283 if (nxt_slow_path(mem == MAP_FAILED)) {
284 goto remove_fail;
285 }
286
287 port_mmap->hdr = mem;
288
289 /* Init segment header. */
290 hdr = port_mmap->hdr;
291
292 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
293
294 hdr->id = process->outgoing->nelts - 1;
295 hdr->pid = process->pid;
296
297 /* Mark first chunk as busy */
298 nxt_port_mmap_set_chunk_busy(hdr, 0);
299
300 /* Mark as busy chunk followed the last available chunk. */
301 nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
302
303 nxt_debug(task, "send mmap fd %FD to process %PI", fd,
304 port->pid);
305
306 /* TODO handle error */
307 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
308
309 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
310 hdr->id, nxt_pid, process->pid);
311
312 return hdr;
313
314remove_fail:
315
316 nxt_array_remove(process->outgoing, port_mmap);
317
318 return NULL;
319}
320
321
322static nxt_port_mmap_header_t *
323nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
324 size_t size)
325{
326 nxt_array_t *outgoing;
327 nxt_process_t *process;
328 nxt_port_mmap_t *port_mmap;
329 nxt_port_mmap_t *end_port_mmap;
330 nxt_port_mmap_header_t *hdr;
331
332 process = port->process;
333 if (nxt_slow_path(process == NULL)) {
334 return NULL;
335 }
336
337 *c = 0;
338 port_mmap = NULL;
339 hdr = NULL;
340
341 nxt_thread_mutex_lock(&process->outgoing_mutex);
342
343 if (process->outgoing == NULL) {
344 hdr = nxt_port_new_port_mmap(task, process, port);
345
346 goto unlock_return;
347 }
348
349 outgoing = process->outgoing;
350 port_mmap = outgoing->elts;
351 end_port_mmap = port_mmap + outgoing->nelts;
352
353 while (port_mmap < end_port_mmap) {
354
355 if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
356 hdr = port_mmap->hdr;
357
358 goto unlock_return;
359 }
360
361 port_mmap++;
362 }
363
364 /* TODO introduce port_mmap limit and release wait. */
365
366 hdr = nxt_port_new_port_mmap(task, process, port);
367
368unlock_return:
369
370 nxt_thread_mutex_unlock(&process->outgoing_mutex);
371
372 return hdr;
373}
374
375
376static nxt_port_mmap_header_t *
377nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
378{
379 nxt_array_t *incoming;
380 nxt_process_t *process;
381 nxt_port_mmap_t *port_mmap;
382 nxt_port_mmap_header_t *hdr;
383
384 process = nxt_runtime_process_find(task->thread->runtime, spid);
385 if (nxt_slow_path(process == NULL)) {
386 return NULL;
387 }
388
389 hdr = NULL;
390
391 nxt_thread_mutex_lock(&process->incoming_mutex);
392
393 incoming = process->incoming;
394
395 if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
396 port_mmap = incoming->elts;
397 hdr = port_mmap[id].hdr;
398 } else {
399 nxt_log(task, NXT_LOG_WARN,
400 "failed to get incoming mmap #%d for process %PI", id, spid);
401 }
402
403 nxt_thread_mutex_unlock(&process->incoming_mutex);
404
405 return hdr;
406}
407
408
409nxt_buf_t *
410nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
411{
412 size_t nchunks;
413 nxt_buf_t *b;
414 nxt_chunk_id_t c;
415 nxt_port_mmap_header_t *hdr;
416
417 nxt_debug(task, "request %z bytes shm buffer", size);
418
417 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
418 nxt_debug(task, "requested size (%z bytes) too big", size);
419 return NULL;
420 }
421
422 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
423 if (nxt_slow_path(b == NULL)) {
424 return NULL;
425 }
426
427 b->completion_handler = nxt_port_mmap_buf_completion;
428 nxt_buf_set_port_mmap(b);
429
430 hdr = nxt_port_mmap_get(task, port, &c, size);
431 if (nxt_slow_path(hdr == NULL)) {
432 nxt_mp_release(port->mem_pool, b);
433 return NULL;
434 }
435
436 b->parent = hdr;
437
438 b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
439 b->mem.pos = b->mem.start;
440 b->mem.free = b->mem.start;
441 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
442
443 nchunks = size / PORT_MMAP_CHUNK_SIZE;
444 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
445 nchunks++;
446 }
447
419 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
420 if (nxt_slow_path(b == NULL)) {
421 return NULL;
422 }
423
424 b->completion_handler = nxt_port_mmap_buf_completion;
425 nxt_buf_set_port_mmap(b);
426
427 hdr = nxt_port_mmap_get(task, port, &c, size);
428 if (nxt_slow_path(hdr == NULL)) {
429 nxt_mp_release(port->mem_pool, b);
430 return NULL;
431 }
432
433 b->parent = hdr;
434
435 b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
436 b->mem.pos = b->mem.start;
437 b->mem.free = b->mem.start;
438 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
439
440 nchunks = size / PORT_MMAP_CHUNK_SIZE;
441 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
442 nchunks++;
443 }
444
445 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
446 b->mem.start, b->mem.end - b->mem.start,
447 hdr->pid, hdr->id, c);
448
448 c++;
449 nchunks--;
450
451 /* Try to acquire as much chunks as required. */
452 while (nchunks > 0) {
453
454 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
455 break;
456 }
457
458 b->mem.end += PORT_MMAP_CHUNK_SIZE;
459 c++;
460 nchunks--;
461 }
462
463 return b;
464}
465
466
467nxt_int_t
449 c++;
450 nchunks--;
451
452 /* Try to acquire as much chunks as required. */
453 while (nchunks > 0) {
454
455 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
456 break;
457 }
458
459 b->mem.end += PORT_MMAP_CHUNK_SIZE;
460 c++;
461 nchunks--;
462 }
463
464 return b;
465}
466
467
468nxt_int_t
468nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
469nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
470 size_t min_size)
469{
471{
470 size_t nchunks;
472 size_t nchunks, free_size;
471 nxt_chunk_id_t c, start;
472 nxt_port_mmap_header_t *hdr;
473
474 nxt_debug(task, "request increase %z bytes shm buffer", size);
475
476 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
477 nxt_log(task, NXT_LOG_WARN,
478 "failed to increase, not a mmap buffer");
479 return NXT_ERROR;
480 }
481
473 nxt_chunk_id_t c, start;
474 nxt_port_mmap_header_t *hdr;
475
476 nxt_debug(task, "request increase %z bytes shm buffer", size);
477
478 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
479 nxt_log(task, NXT_LOG_WARN,
480 "failed to increase, not a mmap buffer");
481 return NXT_ERROR;
482 }
483
482 if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
484 free_size = nxt_buf_mem_free_size(&b->mem);
485
486 if (nxt_slow_path(size <= free_size)) {
483 return NXT_OK;
484 }
485
486 hdr = b->parent;
487
488 start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
489
487 return NXT_OK;
488 }
489
490 hdr = b->parent;
491
492 start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
493
490 size -= nxt_buf_mem_free_size(&b->mem);
494 size -= free_size;
491
492 nchunks = size / PORT_MMAP_CHUNK_SIZE;
493 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
494 nchunks++;
495 }
496
497 c = start;
498
499 /* Try to acquire as much chunks as required. */
500 while (nchunks > 0) {
501
502 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
503 break;
504 }
505
506 c++;
507 nchunks--;
508 }
509
495
496 nchunks = size / PORT_MMAP_CHUNK_SIZE;
497 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
498 nchunks++;
499 }
500
501 c = start;
502
503 /* Try to acquire as much chunks as required. */
504 while (nchunks > 0) {
505
506 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
507 break;
508 }
509
510 c++;
511 nchunks--;
512 }
513
510 if (nchunks != 0) {
514 if (nchunks != 0 &&
515 min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
516
511 c--;
512 while (c >= start) {
513 nxt_port_mmap_set_chunk_free(hdr, c);
514 c--;
515 }
516
517 nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
518
519 return NXT_ERROR;
520 } else {
521 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
522
523 return NXT_OK;
524 }
525}
526
527
528static nxt_buf_t *
529nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
530 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
531{
532 size_t nchunks;
533 nxt_buf_t *b;
534 nxt_port_mmap_header_t *hdr;
535
536 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
537 if (nxt_slow_path(hdr == NULL)) {
538 return NULL;
539 }
540
541 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
542 if (nxt_slow_path(b == NULL)) {
543 return NULL;
544 }
545
546 b->completion_handler = nxt_port_mmap_buf_completion;
547
548 nxt_buf_set_port_mmap(b);
549
550 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
551 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
552 nchunks++;
553 }
554
555 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
556 b->mem.pos = b->mem.start;
557 b->mem.free = b->mem.start + mmap_msg->size;
558 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
559
560 b->parent = hdr;
561
517 c--;
518 while (c >= start) {
519 nxt_port_mmap_set_chunk_free(hdr, c);
520 c--;
521 }
522
523 nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
524
525 return NXT_ERROR;
526 } else {
527 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
528
529 return NXT_OK;
530 }
531}
532
533
534static nxt_buf_t *
535nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
536 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
537{
538 size_t nchunks;
539 nxt_buf_t *b;
540 nxt_port_mmap_header_t *hdr;
541
542 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
543 if (nxt_slow_path(hdr == NULL)) {
544 return NULL;
545 }
546
547 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
548 if (nxt_slow_path(b == NULL)) {
549 return NULL;
550 }
551
552 b->completion_handler = nxt_port_mmap_buf_completion;
553
554 nxt_buf_set_port_mmap(b);
555
556 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
557 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
558 nchunks++;
559 }
560
561 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
562 b->mem.pos = b->mem.start;
563 b->mem.free = b->mem.start + mmap_msg->size;
564 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
565
566 b->parent = hdr;
567
568 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
569 b->mem.start, b->mem.end - b->mem.start,
570 hdr->pid, hdr->id, mmap_msg->chunk_id);
571
562 return b;
563}
564
565
566void
567nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
568 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
569{
570 size_t bsize;
571 nxt_buf_t *bmem;
572 nxt_uint_t i;
573 nxt_port_mmap_msg_t *mmap_msg;
574 nxt_port_mmap_header_t *hdr;
575
576 nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
577 "via shared memory", sb->size, port->pid);
578
579 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
580 mmap_msg = port->mmsg_buf;
581
582 bmem = msg->buf;
583
584 for (i = 0; i < sb->niov; i++, mmap_msg++) {
585
586 /* Lookup buffer which starts current iov_base. */
587 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
588 bmem = bmem->next;
589 }
590
591 if (nxt_slow_path(bmem == NULL)) {
592 nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for "
593 "iobuf[%d]", i);
594 return;
595 /* TODO clear b and exit */
596 }
597
598 hdr = bmem->parent;
599
600 mmap_msg->mmap_id = hdr->id;
601 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
602 mmap_msg->size = sb->iobuf[i].iov_len;
603
604 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
605 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
606 port->pid);
607 }
608
609 sb->iobuf[0].iov_base = port->mmsg_buf;
610 sb->iobuf[0].iov_len = bsize;
611 sb->niov = 1;
612 sb->size = bsize;
613
614 msg->port_msg.mmap = 1;
615}
616
617
618void
619nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
620 nxt_port_recv_msg_t *msg)
621{
622 nxt_buf_t *b, **pb;
623 nxt_port_mmap_msg_t *end, *mmap_msg;
624
625 b = msg->buf;
626
627 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
628 end = (nxt_port_mmap_msg_t *) b->mem.free;
629
630 pb = &msg->buf;
631 msg->size = 0;
632
633 while (mmap_msg < end) {
634 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
635 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
636 msg->port_msg.pid);
637
638 *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
639 mmap_msg);
640 if (nxt_slow_path(*pb == NULL)) {
641 nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");
642
643 break;
644 }
645
646 msg->size += mmap_msg->size;
647 pb = &(*pb)->next;
648 mmap_msg++;
649 }
650
651 /* Mark original buf as complete. */
652 b->mem.pos += nxt_buf_used_size(b);
653}
654
655
656nxt_port_method_t
657nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
658{
659 nxt_port_method_t m;
660 nxt_port_mmap_header_t *hdr;
661
662 m = NXT_PORT_METHOD_ANY;
663
664 for (; b != NULL; b = b->next) {
665 if (nxt_buf_used_size(b) == 0) {
666 /* empty buffers does not affect method */
667 continue;
668 }
669
670 if (nxt_buf_is_port_mmap(b)) {
671 hdr = b->parent;
672
673 if (m == NXT_PORT_METHOD_PLAIN) {
674 nxt_log_error(NXT_LOG_ERR, task->log,
675 "mixing plain and mmap buffers, "
676 "using plain mode");
677
678 break;
679 }
680
681 if (port->pid != hdr->pid) {
682 nxt_log_error(NXT_LOG_ERR, task->log,
683 "send mmap buffer for %PI to %PI, "
684 "using plain mode", hdr->pid, port->pid);
685
686 m = NXT_PORT_METHOD_PLAIN;
687
688 break;
689 }
690
691 if (m == NXT_PORT_METHOD_ANY) {
692 nxt_debug(task, "using mmap mode");
693
694 m = NXT_PORT_METHOD_MMAP;
695 }
696 } else {
697 if (m == NXT_PORT_METHOD_MMAP) {
698 nxt_log_error(NXT_LOG_ERR, task->log,
699 "mixing mmap and plain buffers, "
700 "switching to plain mode");
701
702 m = NXT_PORT_METHOD_PLAIN;
703
704 break;
705 }
706
707 if (m == NXT_PORT_METHOD_ANY) {
708 nxt_debug(task, "using plain mode");
709
710 m = NXT_PORT_METHOD_PLAIN;
711 }
712 }
713 }
714
715 return m;
716}
572 return b;
573}
574
575
576void
577nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
578 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
579{
580 size_t bsize;
581 nxt_buf_t *bmem;
582 nxt_uint_t i;
583 nxt_port_mmap_msg_t *mmap_msg;
584 nxt_port_mmap_header_t *hdr;
585
586 nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
587 "via shared memory", sb->size, port->pid);
588
589 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
590 mmap_msg = port->mmsg_buf;
591
592 bmem = msg->buf;
593
594 for (i = 0; i < sb->niov; i++, mmap_msg++) {
595
596 /* Lookup buffer which starts current iov_base. */
597 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
598 bmem = bmem->next;
599 }
600
601 if (nxt_slow_path(bmem == NULL)) {
602 nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for "
603 "iobuf[%d]", i);
604 return;
605 /* TODO clear b and exit */
606 }
607
608 hdr = bmem->parent;
609
610 mmap_msg->mmap_id = hdr->id;
611 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
612 mmap_msg->size = sb->iobuf[i].iov_len;
613
614 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
615 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
616 port->pid);
617 }
618
619 sb->iobuf[0].iov_base = port->mmsg_buf;
620 sb->iobuf[0].iov_len = bsize;
621 sb->niov = 1;
622 sb->size = bsize;
623
624 msg->port_msg.mmap = 1;
625}
626
627
628void
629nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
630 nxt_port_recv_msg_t *msg)
631{
632 nxt_buf_t *b, **pb;
633 nxt_port_mmap_msg_t *end, *mmap_msg;
634
635 b = msg->buf;
636
637 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
638 end = (nxt_port_mmap_msg_t *) b->mem.free;
639
640 pb = &msg->buf;
641 msg->size = 0;
642
643 while (mmap_msg < end) {
644 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
645 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
646 msg->port_msg.pid);
647
648 *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
649 mmap_msg);
650 if (nxt_slow_path(*pb == NULL)) {
651 nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");
652
653 break;
654 }
655
656 msg->size += mmap_msg->size;
657 pb = &(*pb)->next;
658 mmap_msg++;
659 }
660
661 /* Mark original buf as complete. */
662 b->mem.pos += nxt_buf_used_size(b);
663}
664
665
666nxt_port_method_t
667nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
668{
669 nxt_port_method_t m;
670 nxt_port_mmap_header_t *hdr;
671
672 m = NXT_PORT_METHOD_ANY;
673
674 for (; b != NULL; b = b->next) {
675 if (nxt_buf_used_size(b) == 0) {
676 /* empty buffers does not affect method */
677 continue;
678 }
679
680 if (nxt_buf_is_port_mmap(b)) {
681 hdr = b->parent;
682
683 if (m == NXT_PORT_METHOD_PLAIN) {
684 nxt_log_error(NXT_LOG_ERR, task->log,
685 "mixing plain and mmap buffers, "
686 "using plain mode");
687
688 break;
689 }
690
691 if (port->pid != hdr->pid) {
692 nxt_log_error(NXT_LOG_ERR, task->log,
693 "send mmap buffer for %PI to %PI, "
694 "using plain mode", hdr->pid, port->pid);
695
696 m = NXT_PORT_METHOD_PLAIN;
697
698 break;
699 }
700
701 if (m == NXT_PORT_METHOD_ANY) {
702 nxt_debug(task, "using mmap mode");
703
704 m = NXT_PORT_METHOD_MMAP;
705 }
706 } else {
707 if (m == NXT_PORT_METHOD_MMAP) {
708 nxt_log_error(NXT_LOG_ERR, task->log,
709 "mixing mmap and plain buffers, "
710 "switching to plain mode");
711
712 m = NXT_PORT_METHOD_PLAIN;
713
714 break;
715 }
716
717 if (m == NXT_PORT_METHOD_ANY) {
718 nxt_debug(task, "using plain mode");
719
720 m = NXT_PORT_METHOD_PLAIN;
721 }
722 }
723 }
724
725 return m;
726}