Deleted
Added
nxt_port_memory.c (1526:5c2a0b6f92e7) | nxt_port_memory.c (1546:06017e6e3a5f) |
---|---|
1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 268 unchanged lines hidden (view full) --- 277 278 nxt_thread_mutex_unlock(&process->incoming.mutex); 279 280 return mmap_handler; 281} 282 283 284static nxt_port_mmap_handler_t * | 1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 268 unchanged lines hidden (view full) --- 277 278 nxt_thread_mutex_unlock(&process->incoming.mutex); 279 280 return mmap_handler; 281} 282 283 284static nxt_port_mmap_handler_t * |
285nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 286 nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) | 285nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, 286 nxt_bool_t tracking, nxt_int_t n) |
287{ 288 void *mem; 289 nxt_fd_t fd; 290 nxt_int_t i; 291 nxt_free_map_t *free_map; 292 nxt_port_mmap_t *port_mmap; 293 nxt_port_mmap_header_t *hdr; 294 nxt_port_mmap_handler_t *mmap_handler; 295 296 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 297 if (nxt_slow_path(mmap_handler == NULL)) { | 287{ 288 void *mem; 289 nxt_fd_t fd; 290 nxt_int_t i; 291 nxt_free_map_t *free_map; 292 nxt_port_mmap_t *port_mmap; 293 nxt_port_mmap_header_t *hdr; 294 nxt_port_mmap_handler_t *mmap_handler; 295 296 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 297 if (nxt_slow_path(mmap_handler == NULL)) { |
298 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); | 298 nxt_alert(task, "failed to allocate mmap_handler"); |
299 300 return NULL; 301 } 302 | 299 300 return NULL; 301 } 302 |
303 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); | 303 port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); |
304 if (nxt_slow_path(port_mmap == NULL)) { | 304 if (nxt_slow_path(port_mmap == NULL)) { |
305 nxt_log(task, NXT_LOG_WARN, 306 "failed to add port mmap to outgoing array"); | 305 nxt_alert(task, "failed to add port mmap to mmaps array"); |
307 308 nxt_free(mmap_handler); 309 return NULL; 310 } 311 312 fd = nxt_shm_open(task, PORT_MMAP_SIZE); 313 if (nxt_slow_path(fd == -1)) { 314 goto remove_fail; 315 } 316 317 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 318 MAP_SHARED, fd, 0); 319 320 if (nxt_slow_path(mem == MAP_FAILED)) { 321 goto remove_fail; 322 } 323 324 mmap_handler->hdr = mem; | 306 307 nxt_free(mmap_handler); 308 return NULL; 309 } 310 311 fd = nxt_shm_open(task, PORT_MMAP_SIZE); 312 if (nxt_slow_path(fd == -1)) { 313 goto remove_fail; 314 } 315 316 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 317 MAP_SHARED, fd, 0); 318 319 if (nxt_slow_path(mem == MAP_FAILED)) { 320 goto remove_fail; 321 } 322 323 mmap_handler->hdr = mem; |
324 mmap_handler->fd = fd; |
|
325 port_mmap->mmap_handler = mmap_handler; 326 nxt_port_mmap_handler_use(mmap_handler, 1); 327 328 /* Init segment header. */ 329 hdr = mmap_handler->hdr; 330 331 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 332 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 333 | 325 port_mmap->mmap_handler = mmap_handler; 326 nxt_port_mmap_handler_use(mmap_handler, 1); 327 328 /* Init segment header. */ 329 hdr = mmap_handler->hdr; 330 331 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 332 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 333 |
334 hdr->id = process->outgoing.size - 1; | 334 hdr->id = mmaps->size - 1; |
335 hdr->src_pid = nxt_pid; | 335 hdr->src_pid = nxt_pid; |
336 hdr->dst_pid = process->pid; 337 hdr->sent_over = port->id; | 336 hdr->sent_over = 0xFFFFu; |
338 339 /* Mark first chunk as busy */ 340 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 341 342 for (i = 0; i < n; i++) { 343 nxt_port_mmap_set_chunk_busy(free_map, i); 344 } 345 346 /* Mark as busy chunk followed the last available chunk. */ 347 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 348 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 349 | 337 338 /* Mark first chunk as busy */ 339 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 340 341 for (i = 0; i < n; i++) { 342 nxt_port_mmap_set_chunk_busy(free_map, i); 343 } 344 345 /* Mark as busy chunk followed the last available chunk. */ 346 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 347 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 348 |
350 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); | 349 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", 350 hdr->id, nxt_pid); |
351 | 351 |
352 /* TODO handle error */ 353 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 354 355 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 356 hdr->id, nxt_pid, process->pid); 357 | |
358 return mmap_handler; 359 360remove_fail: 361 362 nxt_free(mmap_handler); 363 | 352 return mmap_handler; 353 354remove_fail: 355 356 nxt_free(mmap_handler); 357 |
364 process->outgoing.size--; | 358 mmaps->size--; |
365 366 return NULL; 367} 368 369 370nxt_int_t 371nxt_shm_open(nxt_task_t *task, size_t size) 372{ --- 67 unchanged lines hidden (view full) --- 440 return -1; 441 } 442 443 return fd; 444} 445 446 447static nxt_port_mmap_handler_t * | 359 360 return NULL; 361} 362 363 364nxt_int_t 365nxt_shm_open(nxt_task_t *task, size_t size) 366{ --- 67 unchanged lines hidden (view full) --- 434 return -1; 435 } 436 437 return fd; 438} 439 440 441static nxt_port_mmap_handler_t * |
448nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, | 442nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, |
449 nxt_int_t n, nxt_bool_t tracking) 450{ 451 nxt_int_t i, res, nchunks; | 443 nxt_int_t n, nxt_bool_t tracking) 444{ 445 nxt_int_t i, res, nchunks; |
452 nxt_process_t *process; | |
453 nxt_free_map_t *free_map; 454 nxt_port_mmap_t *port_mmap; 455 nxt_port_mmap_t *end_port_mmap; 456 nxt_port_mmap_header_t *hdr; 457 nxt_port_mmap_handler_t *mmap_handler; 458 | 446 nxt_free_map_t *free_map; 447 nxt_port_mmap_t *port_mmap; 448 nxt_port_mmap_t *end_port_mmap; 449 nxt_port_mmap_header_t *hdr; 450 nxt_port_mmap_handler_t *mmap_handler; 451 |
459 process = port->process; 460 if (nxt_slow_path(process == NULL)) { 461 return NULL; 462 } | 452 nxt_thread_mutex_lock(&mmaps->mutex); |
463 | 453 |
464 nxt_thread_mutex_lock(&process->outgoing.mutex); | 454 end_port_mmap = mmaps->elts + mmaps->size; |
465 | 455 |
466 end_port_mmap = process->outgoing.elts + process->outgoing.size; 467 468 for (port_mmap = process->outgoing.elts; | 456 for (port_mmap = mmaps->elts; |
469 port_mmap < end_port_mmap; 470 port_mmap++) 471 { 472 mmap_handler = port_mmap->mmap_handler; 473 hdr = mmap_handler->hdr; 474 | 457 port_mmap < end_port_mmap; 458 port_mmap++) 459 { 460 mmap_handler = port_mmap->mmap_handler; 461 hdr = mmap_handler->hdr; 462 |
475 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { | 463 if (hdr->sent_over != 0xFFFFu) { |
476 continue; 477 } 478 479 *c = 0; 480 481 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 482 483 while (nxt_port_mmap_get_free_chunk(free_map, c)) { --- 21 unchanged lines hidden (view full) --- 505 } 506 507 hdr->oosm = 1; 508 } 509 510 /* TODO introduce port_mmap limit and release wait. */ 511 512 *c = 0; | 464 continue; 465 } 466 467 *c = 0; 468 469 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 470 471 while (nxt_port_mmap_get_free_chunk(free_map, c)) { --- 21 unchanged lines hidden (view full) --- 493 } 494 495 hdr->oosm = 1; 496 } 497 498 /* TODO introduce port_mmap limit and release wait. */ 499 500 *c = 0; |
513 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); | 501 mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); |
514 515unlock_return: 516 | 502 503unlock_return: 504 |
517 nxt_thread_mutex_unlock(&process->outgoing.mutex); | 505 nxt_thread_mutex_unlock(&mmaps->mutex); |
518 519 return mmap_handler; 520} 521 522 523static nxt_port_mmap_handler_t * 524nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 525{ --- 18 unchanged lines hidden (view full) --- 544 545 nxt_thread_mutex_unlock(&process->incoming.mutex); 546 547 return mmap_handler; 548} 549 550 551nxt_int_t | 506 507 return mmap_handler; 508} 509 510 511static nxt_port_mmap_handler_t * 512nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 513{ --- 18 unchanged lines hidden (view full) --- 532 533 nxt_thread_mutex_unlock(&process->incoming.mutex); 534 535 return mmap_handler; 536} 537 538 539nxt_int_t |
552nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, | 540nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, |
553 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 554{ 555 nxt_chunk_id_t c; 556 nxt_port_mmap_header_t *hdr; 557 nxt_port_mmap_handler_t *mmap_handler; 558 559 nxt_debug(task, "request tracking for stream #%uD", stream); 560 | 541 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 542{ 543 nxt_chunk_id_t c; 544 nxt_port_mmap_header_t *hdr; 545 nxt_port_mmap_handler_t *mmap_handler; 546 547 nxt_debug(task, "request tracking for stream #%uD", stream); 548 |
561 mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); | 549 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); |
562 if (nxt_slow_path(mmap_handler == NULL)) { 563 return NXT_ERROR; 564 } 565 566 nxt_port_mmap_handler_use(mmap_handler, 1); 567 568 hdr = mmap_handler->hdr; 569 --- 105 unchanged lines hidden (view full) --- 675 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 676 } 677 678 return res; 679} 680 681 682nxt_buf_t * | 550 if (nxt_slow_path(mmap_handler == NULL)) { 551 return NXT_ERROR; 552 } 553 554 nxt_port_mmap_handler_use(mmap_handler, 1); 555 556 hdr = mmap_handler->hdr; 557 --- 105 unchanged lines hidden (view full) --- 663 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 664 } 665 666 return res; 667} 668 669 670nxt_buf_t * |
683nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) | 671nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) |
684{ 685 nxt_mp_t *mp; 686 nxt_buf_t *b; 687 nxt_int_t nchunks; 688 nxt_chunk_id_t c; 689 nxt_port_mmap_header_t *hdr; 690 nxt_port_mmap_handler_t *mmap_handler; 691 --- 10 unchanged lines hidden (view full) --- 702 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 703 if (nxt_slow_path(b == NULL)) { 704 return NULL; 705 } 706 707 b->completion_handler = nxt_port_mmap_buf_completion; 708 nxt_buf_set_port_mmap(b); 709 | 672{ 673 nxt_mp_t *mp; 674 nxt_buf_t *b; 675 nxt_int_t nchunks; 676 nxt_chunk_id_t c; 677 nxt_port_mmap_header_t *hdr; 678 nxt_port_mmap_handler_t *mmap_handler; 679 --- 10 unchanged lines hidden (view full) --- 690 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 691 if (nxt_slow_path(b == NULL)) { 692 return NULL; 693 } 694 695 b->completion_handler = nxt_port_mmap_buf_completion; 696 nxt_buf_set_port_mmap(b); 697 |
710 mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); | 698 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); |
711 if (nxt_slow_path(mmap_handler == NULL)) { 712 mp = task->thread->engine->mem_pool; 713 nxt_mp_free(mp, b); 714 nxt_mp_release(mp); 715 return NULL; 716 } 717 718 b->parent = mmap_handler; --- 219 unchanged lines hidden (view full) --- 938 } 939 } 940} 941 942 943nxt_port_method_t 944nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 945{ | 699 if (nxt_slow_path(mmap_handler == NULL)) { 700 mp = task->thread->engine->mem_pool; 701 nxt_mp_free(mp, b); 702 nxt_mp_release(mp); 703 return NULL; 704 } 705 706 b->parent = mmap_handler; --- 219 unchanged lines hidden (view full) --- 926 } 927 } 928} 929 930 931nxt_port_method_t 932nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 933{ |
946 nxt_port_method_t m; 947 nxt_port_mmap_header_t *hdr; 948 nxt_port_mmap_handler_t *mmap_handler; | 934 nxt_port_method_t m; |
949 950 m = NXT_PORT_METHOD_ANY; 951 952 for (/* void */; b != NULL; b = b->next) { 953 if (nxt_buf_used_size(b) == 0) { 954 /* empty buffers does not affect method */ 955 continue; 956 } 957 958 if (nxt_buf_is_port_mmap(b)) { | 935 936 m = NXT_PORT_METHOD_ANY; 937 938 for (/* void */; b != NULL; b = b->next) { 939 if (nxt_buf_used_size(b) == 0) { 940 /* empty buffers does not affect method */ 941 continue; 942 } 943 944 if (nxt_buf_is_port_mmap(b)) { |
959 mmap_handler = b->parent; 960 hdr = mmap_handler->hdr; 961 | |
962 if (m == NXT_PORT_METHOD_PLAIN) { 963 nxt_log_error(NXT_LOG_ERR, task->log, 964 "mixing plain and mmap buffers, " 965 "using plain mode"); 966 967 break; 968 } 969 | 945 if (m == NXT_PORT_METHOD_PLAIN) { 946 nxt_log_error(NXT_LOG_ERR, task->log, 947 "mixing plain and mmap buffers, " 948 "using plain mode"); 949 950 break; 951 } 952 |
970 if (port->pid != hdr->dst_pid) { 971 nxt_log_error(NXT_LOG_ERR, task->log, 972 "send mmap buffer for %PI to %PI, " 973 "using plain mode", hdr->dst_pid, port->pid); 974 975 m = NXT_PORT_METHOD_PLAIN; 976 977 break; 978 } 979 | |
980 if (m == NXT_PORT_METHOD_ANY) { 981 nxt_debug(task, "using mmap mode"); 982 983 m = NXT_PORT_METHOD_MMAP; 984 } 985 } else { 986 if (m == NXT_PORT_METHOD_MMAP) { 987 nxt_log_error(NXT_LOG_ERR, task->log, --- 18 unchanged lines hidden --- | 953 if (m == NXT_PORT_METHOD_ANY) { 954 nxt_debug(task, "using mmap mode"); 955 956 m = NXT_PORT_METHOD_MMAP; 957 } 958 } else { 959 if (m == NXT_PORT_METHOD_MMAP) { 960 nxt_log_error(NXT_LOG_ERR, task->log, --- 18 unchanged lines hidden --- |