Deleted
Added
nxt_port_memory.c (364:742e5c203c6d) | nxt_port_memory.c (365:28b2a468be43) |
---|---|
1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9#if (NXT_HAVE_MEMFD_CREATE) 10 11#include <linux/memfd.h> 12#include <unistd.h> 13#include <sys/syscall.h> 14 15#endif 16 17#include <nxt_port_memory_int.h> 18 | 1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9#if (NXT_HAVE_MEMFD_CREATE) 10 11#include <linux/memfd.h> 12#include <unistd.h> 13#include <sys/syscall.h> 14 15#endif 16 17#include <nxt_port_memory_int.h> 18 |
19 |
|
19nxt_inline void | 20nxt_inline void |
20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) | 21nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) |
21{ | 22{ |
22 if (port_mmap->hdr != NULL) { 23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); 24 port_mmap->hdr = NULL; | 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); |
25 } 26} 27 28 29static nxt_port_mmap_t * 30nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 31{ 32 uint32_t cap; --- 32 unchanged lines hidden (view full) --- 65 port_mmaps->size = i + 1; 66 } 67 68 return port_mmaps->elts + i; 69} 70 71 72void | 34 } 35} 36 37 38static nxt_port_mmap_t * 39nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40{ 41 uint32_t cap; --- 32 unchanged lines hidden (view full) --- 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78} 79 80 81void |
73nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) | 82nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) |
74{ 75 uint32_t i; 76 nxt_port_mmap_t *port_mmap; 77 78 if (port_mmaps == NULL) { 79 return; 80 } 81 82 port_mmap = port_mmaps->elts; 83 84 for (i = 0; i < port_mmaps->size; i++) { | 83{ 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { |
85 nxt_port_mmap_destroy(port_mmap + i); | 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); |
86 } 87 88 port_mmaps->size = 0; 89 | 95 } 96 97 port_mmaps->size = 0; 98 |
90 if (free != 0) { | 99 if (free_elts != 0) { |
91 nxt_free(port_mmaps->elts); 92 } 93} 94 95 96#define nxt_port_mmap_free_junk(p, size) \ 97 memset((p), 0xA5, size) 98 99 100static void 101nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 102{ | 100 nxt_free(port_mmaps->elts); 101 } 102} 103 104 105#define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109static void 110nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111{ |
103 u_char *p; 104 nxt_mp_t *mp; 105 nxt_buf_t *b; 106 nxt_chunk_id_t c; 107 nxt_port_mmap_header_t *hdr; | 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b; 115 nxt_chunk_id_t c; 116 nxt_port_mmap_header_t *hdr; 117 nxt_port_mmap_handler_t *mmap_handler; |
108 109 if (nxt_buf_ts_handle(task, obj, data)) { 110 return; 111 } 112 113 b = obj; 114 115 mp = b->data; 116 117#if (NXT_DEBUG) 118 if (nxt_slow_path(data != b->parent)) { 119 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 120 data, b->parent); 121 nxt_abort(); 122 } 123#endif 124 | 118 119 if (nxt_buf_ts_handle(task, obj, data)) { 120 return; 121 } 122 123 b = obj; 124 125 mp = b->data; 126 127#if (NXT_DEBUG) 128 if (nxt_slow_path(data != b->parent)) { 129 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 130 data, b->parent); 131 nxt_abort(); 132 } 133#endif 134 |
125 hdr = data; | 135 mmap_handler = data; 136 hdr = mmap_handler->hdr; |
126 127 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 128 nxt_debug(task, "mmap buf completion: mmap for other process pair " 129 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 130 131 goto release_buf; 132 } 133 --- 21 unchanged lines hidden (view full) --- 155 nxt_port_mmap_set_chunk_free(hdr, c); 156 157 p += PORT_MMAP_CHUNK_SIZE; 158 c++; 159 } 160 161release_buf: 162 | 137 138 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 139 nxt_debug(task, "mmap buf completion: mmap for other process pair " 140 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 141 142 goto release_buf; 143 } 144 --- 21 unchanged lines hidden (view full) --- 166 nxt_port_mmap_set_chunk_free(hdr, c); 167 168 p += PORT_MMAP_CHUNK_SIZE; 169 c++; 170 } 171 172release_buf: 173 |
174 nxt_port_mmap_handler_use(mmap_handler, -1); 175 |
|
163 nxt_mp_release(mp, b); 164} 165 166 | 176 nxt_mp_release(mp, b); 177} 178 179 |
167nxt_port_mmap_header_t * | 180nxt_port_mmap_handler_t * |
168nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 169 nxt_fd_t fd) 170{ | 181nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 182 nxt_fd_t fd) 183{ |
171 void *mem; 172 struct stat mmap_stat; 173 nxt_port_mmap_t *port_mmap; 174 nxt_port_mmap_header_t *hdr; | 184 void *mem; 185 struct stat mmap_stat; 186 nxt_port_mmap_t *port_mmap; 187 nxt_port_mmap_header_t *hdr; 188 nxt_port_mmap_handler_t *mmap_handler; |
175 176 nxt_debug(task, "got new mmap fd #%FD from process %PI", 177 fd, process->pid); 178 179 port_mmap = NULL; 180 hdr = NULL; 181 182 if (fstat(fd, &mmap_stat) == -1) { --- 8 unchanged lines hidden (view full) --- 191 if (nxt_slow_path(mem == MAP_FAILED)) { 192 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 193 194 return NULL; 195 } 196 197 hdr = mem; 198 | 189 190 nxt_debug(task, "got new mmap fd #%FD from process %PI", 191 fd, process->pid); 192 193 port_mmap = NULL; 194 hdr = NULL; 195 196 if (fstat(fd, &mmap_stat) == -1) { --- 8 unchanged lines hidden (view full) --- 205 if (nxt_slow_path(mem == MAP_FAILED)) { 206 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 207 208 return NULL; 209 } 210 211 hdr = mem; 212 |
213 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 214 if (nxt_slow_path(mmap_handler == NULL)) { 215 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 216 217 return NULL; 218 } 219 220 mmap_handler->hdr = hdr; 221 |
|
199 nxt_thread_mutex_lock(&process->incoming.mutex); 200 201 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 202 if (nxt_slow_path(port_mmap == NULL)) { 203 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 204 205 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 206 hdr = NULL; 207 | 222 nxt_thread_mutex_lock(&process->incoming.mutex); 223 224 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 225 if (nxt_slow_path(port_mmap == NULL)) { 226 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 227 228 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 229 hdr = NULL; 230 |
231 nxt_free(mmap_handler); 232 mmap_handler = NULL; 233 |
|
208 goto fail; 209 } 210 211 nxt_assert(hdr->src_pid == process->pid); 212 nxt_assert(hdr->dst_pid == nxt_pid); 213 | 234 goto fail; 235 } 236 237 nxt_assert(hdr->src_pid == process->pid); 238 nxt_assert(hdr->dst_pid == nxt_pid); 239 |
214 port_mmap->hdr = hdr; | 240 port_mmap->mmap_handler = mmap_handler; 241 nxt_port_mmap_handler_use(mmap_handler, 1); |
215 216 hdr->sent_over = 0xFFFFu; 217 218fail: 219 220 nxt_thread_mutex_unlock(&process->incoming.mutex); 221 | 242 243 hdr->sent_over = 0xFFFFu; 244 245fail: 246 247 nxt_thread_mutex_unlock(&process->incoming.mutex); 248 |
222 return hdr; | 249 return mmap_handler; |
223} 224 225 | 250} 251 252 |
226static nxt_port_mmap_header_t * | 253static nxt_port_mmap_handler_t * |
227nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 228 nxt_port_t *port) 229{ | 254nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 255 nxt_port_t *port) 256{ |
230 void *mem; 231 u_char *p, name[64]; 232 nxt_fd_t fd; 233 nxt_port_mmap_t *port_mmap; 234 nxt_port_mmap_header_t *hdr; | 257 void *mem; 258 u_char *p, name[64]; 259 nxt_fd_t fd; 260 nxt_port_mmap_t *port_mmap; 261 nxt_port_mmap_header_t *hdr; 262 nxt_port_mmap_handler_t *mmap_handler; |
235 | 263 |
264 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 265 if (nxt_slow_path(mmap_handler == NULL)) { 266 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 267 268 return NULL; 269 } 270 |
|
236 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 237 if (nxt_slow_path(port_mmap == NULL)) { 238 nxt_log(task, NXT_LOG_WARN, 239 "failed to add port mmap to outgoing array"); 240 | 271 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 272 if (nxt_slow_path(port_mmap == NULL)) { 273 nxt_log(task, NXT_LOG_WARN, 274 "failed to add port mmap to outgoing array"); 275 |
276 nxt_free(mmap_handler); |
|
241 return NULL; 242 } 243 244 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD", 245 nxt_pid, nxt_random(&task->thread->random)); 246 *p = '\0'; 247 248#if (NXT_HAVE_MEMFD_CREATE) --- 39 unchanged lines hidden (view full) --- 288 289 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 290 MAP_SHARED, fd, 0); 291 292 if (nxt_slow_path(mem == MAP_FAILED)) { 293 goto remove_fail; 294 } 295 | 277 return NULL; 278 } 279 280 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD", 281 nxt_pid, nxt_random(&task->thread->random)); 282 *p = '\0'; 283 284#if (NXT_HAVE_MEMFD_CREATE) --- 39 unchanged lines hidden (view full) --- 324 325 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 326 MAP_SHARED, fd, 0); 327 328 if (nxt_slow_path(mem == MAP_FAILED)) { 329 goto remove_fail; 330 } 331 |
296 port_mmap->hdr = mem; | 332 mmap_handler->hdr = mem; 333 port_mmap->mmap_handler = mmap_handler; 334 nxt_port_mmap_handler_use(mmap_handler, 1); |
297 298 /* Init segment header. */ | 335 336 /* Init segment header. */ |
299 hdr = port_mmap->hdr; | 337 hdr = mmap_handler->hdr; |
300 301 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 302 303 hdr->id = process->outgoing.size - 1; 304 hdr->src_pid = nxt_pid; 305 hdr->dst_pid = process->pid; 306 hdr->sent_over = port->id; 307 --- 6 unchanged lines hidden (view full) --- 314 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 315 316 /* TODO handle error */ 317 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 318 319 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 320 hdr->id, nxt_pid, process->pid); 321 | 338 339 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 340 341 hdr->id = process->outgoing.size - 1; 342 hdr->src_pid = nxt_pid; 343 hdr->dst_pid = process->pid; 344 hdr->sent_over = port->id; 345 --- 6 unchanged lines hidden (view full) --- 352 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 353 354 /* TODO handle error */ 355 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 356 357 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 358 hdr->id, nxt_pid, process->pid); 359 |
322 return hdr; | 360 return mmap_handler; |
323 324remove_fail: 325 | 361 362remove_fail: 363 |
364 nxt_free(mmap_handler); 365 |
|
326 process->outgoing.size--; 327 328 return NULL; 329} 330 331 | 366 process->outgoing.size--; 367 368 return NULL; 369} 370 371 |
332static nxt_port_mmap_header_t * | 372static nxt_port_mmap_handler_t * |
333nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 334 size_t size) 335{ | 373nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 374 size_t size) 375{ |
336 nxt_process_t *process; 337 nxt_port_mmap_t *port_mmap; 338 nxt_port_mmap_t *end_port_mmap; 339 nxt_port_mmap_header_t *hdr; | 376 nxt_process_t *process; 377 nxt_port_mmap_t *port_mmap; 378 nxt_port_mmap_t *end_port_mmap; 379 nxt_port_mmap_header_t *hdr; 380 nxt_port_mmap_handler_t *mmap_handler; |
340 341 process = port->process; 342 if (nxt_slow_path(process == NULL)) { 343 return NULL; 344 } 345 346 *c = 0; | 381 382 process = port->process; 383 if (nxt_slow_path(process == NULL)) { 384 return NULL; 385 } 386 387 *c = 0; |
347 port_mmap = NULL; 348 hdr = NULL; | |
349 350 nxt_thread_mutex_lock(&process->outgoing.mutex); 351 | 388 389 nxt_thread_mutex_lock(&process->outgoing.mutex); 390 |
352 port_mmap = process->outgoing.elts; 353 end_port_mmap = port_mmap + process->outgoing.size; | 391 end_port_mmap = process->outgoing.elts + process->outgoing.size; |
354 | 392 |
355 while (port_mmap < end_port_mmap) { | 393 for (port_mmap = process->outgoing.elts; 394 port_mmap < end_port_mmap; 395 port_mmap++) 396 { 397 mmap_handler = port_mmap->mmap_handler; 398 hdr = mmap_handler->hdr; |
356 | 399 |
357 if ( (port_mmap->hdr->sent_over == 0xFFFFu || 358 port_mmap->hdr->sent_over == port->id) && 359 nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { 360 hdr = port_mmap->hdr; | 400 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 401 continue; 402 } |
361 | 403 |
404 if (nxt_port_mmap_get_free_chunk(hdr, c)) { |
|
362 goto unlock_return; 363 } | 405 goto unlock_return; 406 } |
364 365 port_mmap++; | |
366 } 367 368 /* TODO introduce port_mmap limit and release wait. */ 369 | 407 } 408 409 /* TODO introduce port_mmap limit and release wait. */ 410 |
370 hdr = nxt_port_new_port_mmap(task, process, port); | 411 mmap_handler = nxt_port_new_port_mmap(task, process, port); |
371 372unlock_return: 373 374 nxt_thread_mutex_unlock(&process->outgoing.mutex); 375 | 412 413unlock_return: 414 415 nxt_thread_mutex_unlock(&process->outgoing.mutex); 416 |
376 return hdr; | 417 return mmap_handler; |
377} 378 379 | 418} 419 420 |
380static nxt_port_mmap_header_t * | 421static nxt_port_mmap_handler_t * |
381nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 382{ | 422nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 423{ |
383 nxt_process_t *process; 384 nxt_port_mmap_header_t *hdr; | 424 nxt_process_t *process; 425 nxt_port_mmap_handler_t *mmap_handler; |
385 386 process = nxt_runtime_process_find(task->thread->runtime, spid); 387 if (nxt_slow_path(process == NULL)) { 388 return NULL; 389 } 390 | 426 427 process = nxt_runtime_process_find(task->thread->runtime, spid); 428 if (nxt_slow_path(process == NULL)) { 429 return NULL; 430 } 431 |
391 hdr = NULL; | 432 mmap_handler = NULL; |
392 393 nxt_thread_mutex_lock(&process->incoming.mutex); 394 395 if (nxt_fast_path(process->incoming.size > id)) { | 433 434 nxt_thread_mutex_lock(&process->incoming.mutex); 435 436 if (nxt_fast_path(process->incoming.size > id)) { |
396 hdr = process->incoming.elts[id].hdr; 397 398 } else { 399 nxt_log(task, NXT_LOG_WARN, 400 "failed to get incoming mmap #%d for process %PI", id, spid); | 437 mmap_handler = process->incoming.elts[id].mmap_handler; |
401 } 402 403 nxt_thread_mutex_unlock(&process->incoming.mutex); 404 | 438 } 439 440 nxt_thread_mutex_unlock(&process->incoming.mutex); 441 |
405 return hdr; | 442 return mmap_handler; |
406} 407 408 409nxt_buf_t * 410nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 411{ | 443} 444 445 446nxt_buf_t * 447nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 448{ |
412 size_t nchunks; 413 nxt_buf_t *b; 414 nxt_chunk_id_t c; 415 nxt_port_mmap_header_t *hdr; | 449 size_t nchunks; 450 nxt_buf_t *b; 451 nxt_chunk_id_t c; 452 nxt_port_mmap_header_t *hdr; 453 nxt_port_mmap_handler_t *mmap_handler; |
416 417 nxt_debug(task, "request %z bytes shm buffer", size); 418 419 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 420 if (nxt_slow_path(b == NULL)) { 421 return NULL; 422 } 423 424 b->completion_handler = nxt_port_mmap_buf_completion; 425 nxt_buf_set_port_mmap(b); 426 | 454 455 nxt_debug(task, "request %z bytes shm buffer", size); 456 457 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 458 if (nxt_slow_path(b == NULL)) { 459 return NULL; 460 } 461 462 b->completion_handler = nxt_port_mmap_buf_completion; 463 nxt_buf_set_port_mmap(b); 464 |
427 hdr = nxt_port_mmap_get(task, port, &c, size); 428 if (nxt_slow_path(hdr == NULL)) { | 465 mmap_handler = nxt_port_mmap_get(task, port, &c, size); 466 if (nxt_slow_path(mmap_handler == NULL)) { |
429 nxt_mp_release(task->thread->engine->mem_pool, b); 430 return NULL; 431 } 432 | 467 nxt_mp_release(task->thread->engine->mem_pool, b); 468 return NULL; 469 } 470 |
433 b->parent = hdr; | 471 b->parent = mmap_handler; |
434 | 472 |
473 nxt_port_mmap_handler_use(mmap_handler, 1); 474 475 hdr = mmap_handler->hdr; 476 |
|
435 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 436 b->mem.pos = b->mem.start; 437 b->mem.free = b->mem.start; 438 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 439 440 nchunks = size / PORT_MMAP_CHUNK_SIZE; 441 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 442 nchunks++; --- 21 unchanged lines hidden (view full) --- 464 return b; 465} 466 467 468nxt_int_t 469nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 470 size_t min_size) 471{ | 477 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 478 b->mem.pos = b->mem.start; 479 b->mem.free = b->mem.start; 480 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 481 482 nchunks = size / PORT_MMAP_CHUNK_SIZE; 483 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 484 nchunks++; --- 21 unchanged lines hidden (view full) --- 506 return b; 507} 508 509 510nxt_int_t 511nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 512 size_t min_size) 513{ |
472 size_t nchunks, free_size; 473 nxt_chunk_id_t c, start; 474 nxt_port_mmap_header_t *hdr; | 514 size_t nchunks, free_size; 515 nxt_chunk_id_t c, start; 516 nxt_port_mmap_header_t *hdr; 517 nxt_port_mmap_handler_t *mmap_handler; |
475 476 nxt_debug(task, "request increase %z bytes shm buffer", size); 477 478 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 479 nxt_log(task, NXT_LOG_WARN, 480 "failed to increase, not a mmap buffer"); 481 return NXT_ERROR; 482 } 483 484 free_size = nxt_buf_mem_free_size(&b->mem); 485 486 if (nxt_slow_path(size <= free_size)) { 487 return NXT_OK; 488 } 489 | 518 519 nxt_debug(task, "request increase %z bytes shm buffer", size); 520 521 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 522 nxt_log(task, NXT_LOG_WARN, 523 "failed to increase, not a mmap buffer"); 524 return NXT_ERROR; 525 } 526 527 free_size = nxt_buf_mem_free_size(&b->mem); 528 529 if (nxt_slow_path(size <= free_size)) { 530 return NXT_OK; 531 } 532 |
490 hdr = b->parent; | 533 mmap_handler = b->parent; 534 hdr = mmap_handler->hdr; |
491 492 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 493 494 size -= free_size; 495 496 nchunks = size / PORT_MMAP_CHUNK_SIZE; 497 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 498 nchunks++; --- 32 unchanged lines hidden (view full) --- 531 } 532} 533 534 535static nxt_buf_t * 536nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 537 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 538{ | 535 536 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 537 538 size -= free_size; 539 540 nchunks = size / PORT_MMAP_CHUNK_SIZE; 541 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 542 nchunks++; --- 32 unchanged lines hidden (view full) --- 575 } 576} 577 578 579static nxt_buf_t * 580nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 581 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 582{ |
539 size_t nchunks; 540 nxt_buf_t *b; 541 nxt_port_mmap_header_t *hdr; | 583 size_t nchunks; 584 nxt_buf_t *b; 585 nxt_port_mmap_header_t *hdr; 586 nxt_port_mmap_handler_t *mmap_handler; |
542 | 587 |
543 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); 544 if (nxt_slow_path(hdr == NULL)) { | 588 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 589 mmap_msg->mmap_id); 590 if (nxt_slow_path(mmap_handler == NULL)) { |
545 return NULL; 546 } 547 548 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 549 if (nxt_slow_path(b == NULL)) { 550 return NULL; 551 } 552 553 b->completion_handler = nxt_port_mmap_buf_completion; 554 555 nxt_buf_set_port_mmap(b); 556 557 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 558 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 559 nchunks++; 560 } 561 | 591 return NULL; 592 } 593 594 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 595 if (nxt_slow_path(b == NULL)) { 596 return NULL; 597 } 598 599 b->completion_handler = nxt_port_mmap_buf_completion; 600 601 nxt_buf_set_port_mmap(b); 602 603 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 604 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 605 nchunks++; 606 } 607 |
608 hdr = mmap_handler->hdr; 609 |
|
562 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 563 b->mem.pos = b->mem.start; 564 b->mem.free = b->mem.start + mmap_msg->size; 565 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 566 | 610 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 611 b->mem.pos = b->mem.start; 612 b->mem.free = b->mem.start + mmap_msg->size; 613 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 614 |
567 b->parent = hdr; | 615 b->parent = mmap_handler; 616 nxt_port_mmap_handler_use(mmap_handler, 1); |
568 569 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", 570 b, b->mem.start, b->mem.end - b->mem.start, 571 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 572 573 return b; 574} 575 576 577void 578nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 579 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 580{ | 617 618 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", 619 b, b->mem.start, b->mem.end - b->mem.start, 620 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 621 622 return b; 623} 624 625 626void 627nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 628 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 629{ |
581 size_t bsize; 582 nxt_buf_t *bmem; 583 nxt_uint_t i; 584 nxt_port_mmap_msg_t *mmap_msg; 585 nxt_port_mmap_header_t *hdr; | 630 size_t bsize; 631 nxt_buf_t *bmem; 632 nxt_uint_t i; 633 nxt_port_mmap_msg_t *mmap_msg; 634 nxt_port_mmap_header_t *hdr; 635 nxt_port_mmap_handler_t *mmap_handler; |
586 587 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 588 "via shared memory", sb->size, port->pid); 589 590 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 591 mmap_msg = port->mmsg_buf; 592 593 bmem = msg->buf; --- 7 unchanged lines hidden (view full) --- 601 602 if (nxt_slow_path(bmem == NULL)) { 603 nxt_log_error(NXT_LOG_ERR, task->log, 604 "failed to find buf for iobuf[%d]", i); 605 return; 606 /* TODO clear b and exit */ 607 } 608 | 636 637 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 638 "via shared memory", sb->size, port->pid); 639 640 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 641 mmap_msg = port->mmsg_buf; 642 643 bmem = msg->buf; --- 7 unchanged lines hidden (view full) --- 651 652 if (nxt_slow_path(bmem == NULL)) { 653 nxt_log_error(NXT_LOG_ERR, task->log, 654 "failed to find buf for iobuf[%d]", i); 655 return; 656 /* TODO clear b and exit */ 657 } 658 |
609 hdr = bmem->parent; | 659 mmap_handler = bmem->parent; 660 hdr = mmap_handler->hdr; |
610 611 mmap_msg->mmap_id = hdr->id; 612 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 613 mmap_msg->size = sb->iobuf[i].iov_len; 614 615 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 616 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 617 port->pid); --- 44 unchanged lines hidden (view full) --- 662 /* Mark original buf as complete. */ 663 b->mem.pos += nxt_buf_used_size(b); 664} 665 666 667nxt_port_method_t 668nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 669{ | 661 662 mmap_msg->mmap_id = hdr->id; 663 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 664 mmap_msg->size = sb->iobuf[i].iov_len; 665 666 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 667 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 668 port->pid); --- 44 unchanged lines hidden (view full) --- 713 /* Mark original buf as complete. */ 714 b->mem.pos += nxt_buf_used_size(b); 715} 716 717 718nxt_port_method_t 719nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 720{ |
670 nxt_port_method_t m; 671 nxt_port_mmap_header_t *hdr; | 721 nxt_port_method_t m; 722 nxt_port_mmap_header_t *hdr; 723 nxt_port_mmap_handler_t *mmap_handler; |
672 673 m = NXT_PORT_METHOD_ANY; 674 675 for (; b != NULL; b = b->next) { 676 if (nxt_buf_used_size(b) == 0) { 677 /* empty buffers does not affect method */ 678 continue; 679 } 680 681 if (nxt_buf_is_port_mmap(b)) { | 724 725 m = NXT_PORT_METHOD_ANY; 726 727 for (; b != NULL; b = b->next) { 728 if (nxt_buf_used_size(b) == 0) { 729 /* empty buffers does not affect method */ 730 continue; 731 } 732 733 if (nxt_buf_is_port_mmap(b)) { |
682 hdr = b->parent; | 734 mmap_handler = b->parent; 735 hdr = mmap_handler->hdr; |
683 684 if (m == NXT_PORT_METHOD_PLAIN) { 685 nxt_log_error(NXT_LOG_ERR, task->log, 686 "mixing plain and mmap buffers, " 687 "using plain mode"); 688 689 break; 690 } --- 37 unchanged lines hidden --- | 736 737 if (m == NXT_PORT_METHOD_PLAIN) { 738 nxt_log_error(NXT_LOG_ERR, task->log, 739 "mixing plain and mmap buffers, " 740 "using plain mode"); 741 742 break; 743 } --- 37 unchanged lines hidden --- |