nxt_port_memory.c (1526:5c2a0b6f92e7) nxt_port_memory.c (1546:06017e6e3a5f)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 268 unchanged lines hidden (view full) ---

277
278 nxt_thread_mutex_unlock(&process->incoming.mutex);
279
280 return mmap_handler;
281}
282
283
284static nxt_port_mmap_handler_t *
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 268 unchanged lines hidden (view full) ---

277
278 nxt_thread_mutex_unlock(&process->incoming.mutex);
279
280 return mmap_handler;
281}
282
283
284static nxt_port_mmap_handler_t *
285nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
286 nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
285nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
286 nxt_bool_t tracking, nxt_int_t n)
287{
288 void *mem;
289 nxt_fd_t fd;
290 nxt_int_t i;
291 nxt_free_map_t *free_map;
292 nxt_port_mmap_t *port_mmap;
293 nxt_port_mmap_header_t *hdr;
294 nxt_port_mmap_handler_t *mmap_handler;
295
296 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
297 if (nxt_slow_path(mmap_handler == NULL)) {
287{
288 void *mem;
289 nxt_fd_t fd;
290 nxt_int_t i;
291 nxt_free_map_t *free_map;
292 nxt_port_mmap_t *port_mmap;
293 nxt_port_mmap_header_t *hdr;
294 nxt_port_mmap_handler_t *mmap_handler;
295
296 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
297 if (nxt_slow_path(mmap_handler == NULL)) {
298 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
298 nxt_alert(task, "failed to allocate mmap_handler");
299
300 return NULL;
301 }
302
299
300 return NULL;
301 }
302
303 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
303 port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
304 if (nxt_slow_path(port_mmap == NULL)) {
304 if (nxt_slow_path(port_mmap == NULL)) {
305 nxt_log(task, NXT_LOG_WARN,
306 "failed to add port mmap to outgoing array");
305 nxt_alert(task, "failed to add port mmap to mmaps array");
307
308 nxt_free(mmap_handler);
309 return NULL;
310 }
311
312 fd = nxt_shm_open(task, PORT_MMAP_SIZE);
313 if (nxt_slow_path(fd == -1)) {
314 goto remove_fail;
315 }
316
317 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
318 MAP_SHARED, fd, 0);
319
320 if (nxt_slow_path(mem == MAP_FAILED)) {
321 goto remove_fail;
322 }
323
324 mmap_handler->hdr = mem;
306
307 nxt_free(mmap_handler);
308 return NULL;
309 }
310
311 fd = nxt_shm_open(task, PORT_MMAP_SIZE);
312 if (nxt_slow_path(fd == -1)) {
313 goto remove_fail;
314 }
315
316 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE,
317 MAP_SHARED, fd, 0);
318
319 if (nxt_slow_path(mem == MAP_FAILED)) {
320 goto remove_fail;
321 }
322
323 mmap_handler->hdr = mem;
324 mmap_handler->fd = fd;
325 port_mmap->mmap_handler = mmap_handler;
326 nxt_port_mmap_handler_use(mmap_handler, 1);
327
328 /* Init segment header. */
329 hdr = mmap_handler->hdr;
330
331 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
332 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
333
325 port_mmap->mmap_handler = mmap_handler;
326 nxt_port_mmap_handler_use(mmap_handler, 1);
327
328 /* Init segment header. */
329 hdr = mmap_handler->hdr;
330
331 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
332 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
333
334 hdr->id = process->outgoing.size - 1;
334 hdr->id = mmaps->size - 1;
335 hdr->src_pid = nxt_pid;
335 hdr->src_pid = nxt_pid;
336 hdr->dst_pid = process->pid;
337 hdr->sent_over = port->id;
336 hdr->sent_over = 0xFFFFu;
338
339 /* Mark first chunk as busy */
340 free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
341
342 for (i = 0; i < n; i++) {
343 nxt_port_mmap_set_chunk_busy(free_map, i);
344 }
345
346 /* Mark as busy chunk followed the last available chunk. */
347 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
348 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
349
337
338 /* Mark first chunk as busy */
339 free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
340
341 for (i = 0; i < n; i++) {
342 nxt_port_mmap_set_chunk_busy(free_map, i);
343 }
344
345 /* Mark as busy chunk followed the last available chunk. */
346 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
347 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
348
350 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
349 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
350 hdr->id, nxt_pid);
351
351
352 /* TODO handle error */
353 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
354
355 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
356 hdr->id, nxt_pid, process->pid);
357
358 return mmap_handler;
359
360remove_fail:
361
362 nxt_free(mmap_handler);
363
352 return mmap_handler;
353
354remove_fail:
355
356 nxt_free(mmap_handler);
357
364 process->outgoing.size--;
358 mmaps->size--;
365
366 return NULL;
367}
368
369
370nxt_int_t
371nxt_shm_open(nxt_task_t *task, size_t size)
372{

--- 67 unchanged lines hidden (view full) ---

440 return -1;
441 }
442
443 return fd;
444}
445
446
447static nxt_port_mmap_handler_t *
359
360 return NULL;
361}
362
363
364nxt_int_t
365nxt_shm_open(nxt_task_t *task, size_t size)
366{

--- 67 unchanged lines hidden (view full) ---

434 return -1;
435 }
436
437 return fd;
438}
439
440
441static nxt_port_mmap_handler_t *
448nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
442nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
449 nxt_int_t n, nxt_bool_t tracking)
450{
451 nxt_int_t i, res, nchunks;
443 nxt_int_t n, nxt_bool_t tracking)
444{
445 nxt_int_t i, res, nchunks;
452 nxt_process_t *process;
453 nxt_free_map_t *free_map;
454 nxt_port_mmap_t *port_mmap;
455 nxt_port_mmap_t *end_port_mmap;
456 nxt_port_mmap_header_t *hdr;
457 nxt_port_mmap_handler_t *mmap_handler;
458
446 nxt_free_map_t *free_map;
447 nxt_port_mmap_t *port_mmap;
448 nxt_port_mmap_t *end_port_mmap;
449 nxt_port_mmap_header_t *hdr;
450 nxt_port_mmap_handler_t *mmap_handler;
451
459 process = port->process;
460 if (nxt_slow_path(process == NULL)) {
461 return NULL;
462 }
452 nxt_thread_mutex_lock(&mmaps->mutex);
463
453
464 nxt_thread_mutex_lock(&process->outgoing.mutex);
454 end_port_mmap = mmaps->elts + mmaps->size;
465
455
466 end_port_mmap = process->outgoing.elts + process->outgoing.size;
467
468 for (port_mmap = process->outgoing.elts;
456 for (port_mmap = mmaps->elts;
469 port_mmap < end_port_mmap;
470 port_mmap++)
471 {
472 mmap_handler = port_mmap->mmap_handler;
473 hdr = mmap_handler->hdr;
474
457 port_mmap < end_port_mmap;
458 port_mmap++)
459 {
460 mmap_handler = port_mmap->mmap_handler;
461 hdr = mmap_handler->hdr;
462
475 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
463 if (hdr->sent_over != 0xFFFFu) {
476 continue;
477 }
478
479 *c = 0;
480
481 free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
482
483 while (nxt_port_mmap_get_free_chunk(free_map, c)) {

--- 21 unchanged lines hidden (view full) ---

505 }
506
507 hdr->oosm = 1;
508 }
509
510 /* TODO introduce port_mmap limit and release wait. */
511
512 *c = 0;
464 continue;
465 }
466
467 *c = 0;
468
469 free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
470
471 while (nxt_port_mmap_get_free_chunk(free_map, c)) {

--- 21 unchanged lines hidden (view full) ---

493 }
494
495 hdr->oosm = 1;
496 }
497
498 /* TODO introduce port_mmap limit and release wait. */
499
500 *c = 0;
513 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
501 mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
514
515unlock_return:
516
502
503unlock_return:
504
517 nxt_thread_mutex_unlock(&process->outgoing.mutex);
505 nxt_thread_mutex_unlock(&mmaps->mutex);
518
519 return mmap_handler;
520}
521
522
523static nxt_port_mmap_handler_t *
524nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
525{

--- 18 unchanged lines hidden (view full) ---

544
545 nxt_thread_mutex_unlock(&process->incoming.mutex);
546
547 return mmap_handler;
548}
549
550
551nxt_int_t
506
507 return mmap_handler;
508}
509
510
511static nxt_port_mmap_handler_t *
512nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
513{

--- 18 unchanged lines hidden (view full) ---

532
533 nxt_thread_mutex_unlock(&process->incoming.mutex);
534
535 return mmap_handler;
536}
537
538
539nxt_int_t
552nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
540nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
553 nxt_port_mmap_tracking_t *tracking, uint32_t stream)
554{
555 nxt_chunk_id_t c;
556 nxt_port_mmap_header_t *hdr;
557 nxt_port_mmap_handler_t *mmap_handler;
558
559 nxt_debug(task, "request tracking for stream #%uD", stream);
560
541 nxt_port_mmap_tracking_t *tracking, uint32_t stream)
542{
543 nxt_chunk_id_t c;
544 nxt_port_mmap_header_t *hdr;
545 nxt_port_mmap_handler_t *mmap_handler;
546
547 nxt_debug(task, "request tracking for stream #%uD", stream);
548
561 mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
549 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
562 if (nxt_slow_path(mmap_handler == NULL)) {
563 return NXT_ERROR;
564 }
565
566 nxt_port_mmap_handler_use(mmap_handler, 1);
567
568 hdr = mmap_handler->hdr;
569

--- 105 unchanged lines hidden (view full) ---

675 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
676 }
677
678 return res;
679}
680
681
682nxt_buf_t *
550 if (nxt_slow_path(mmap_handler == NULL)) {
551 return NXT_ERROR;
552 }
553
554 nxt_port_mmap_handler_use(mmap_handler, 1);
555
556 hdr = mmap_handler->hdr;
557

--- 105 unchanged lines hidden (view full) ---

663 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
664 }
665
666 return res;
667}
668
669
670nxt_buf_t *
683nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
671nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
684{
685 nxt_mp_t *mp;
686 nxt_buf_t *b;
687 nxt_int_t nchunks;
688 nxt_chunk_id_t c;
689 nxt_port_mmap_header_t *hdr;
690 nxt_port_mmap_handler_t *mmap_handler;
691

--- 10 unchanged lines hidden (view full) ---

702 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
703 if (nxt_slow_path(b == NULL)) {
704 return NULL;
705 }
706
707 b->completion_handler = nxt_port_mmap_buf_completion;
708 nxt_buf_set_port_mmap(b);
709
672{
673 nxt_mp_t *mp;
674 nxt_buf_t *b;
675 nxt_int_t nchunks;
676 nxt_chunk_id_t c;
677 nxt_port_mmap_header_t *hdr;
678 nxt_port_mmap_handler_t *mmap_handler;
679

--- 10 unchanged lines hidden (view full) ---

690 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
691 if (nxt_slow_path(b == NULL)) {
692 return NULL;
693 }
694
695 b->completion_handler = nxt_port_mmap_buf_completion;
696 nxt_buf_set_port_mmap(b);
697
710 mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
698 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
711 if (nxt_slow_path(mmap_handler == NULL)) {
712 mp = task->thread->engine->mem_pool;
713 nxt_mp_free(mp, b);
714 nxt_mp_release(mp);
715 return NULL;
716 }
717
718 b->parent = mmap_handler;

--- 219 unchanged lines hidden (view full) ---

938 }
939 }
940}
941
942
943nxt_port_method_t
944nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
945{
699 if (nxt_slow_path(mmap_handler == NULL)) {
700 mp = task->thread->engine->mem_pool;
701 nxt_mp_free(mp, b);
702 nxt_mp_release(mp);
703 return NULL;
704 }
705
706 b->parent = mmap_handler;

--- 219 unchanged lines hidden (view full) ---

926 }
927 }
928}
929
930
931nxt_port_method_t
932nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
933{
946 nxt_port_method_t m;
947 nxt_port_mmap_header_t *hdr;
948 nxt_port_mmap_handler_t *mmap_handler;
934 nxt_port_method_t m;
949
950 m = NXT_PORT_METHOD_ANY;
951
952 for (/* void */; b != NULL; b = b->next) {
953 if (nxt_buf_used_size(b) == 0) {
954 /* empty buffers does not affect method */
955 continue;
956 }
957
958 if (nxt_buf_is_port_mmap(b)) {
935
936 m = NXT_PORT_METHOD_ANY;
937
938 for (/* void */; b != NULL; b = b->next) {
939 if (nxt_buf_used_size(b) == 0) {
940 /* empty buffers does not affect method */
941 continue;
942 }
943
944 if (nxt_buf_is_port_mmap(b)) {
959 mmap_handler = b->parent;
960 hdr = mmap_handler->hdr;
961
962 if (m == NXT_PORT_METHOD_PLAIN) {
963 nxt_log_error(NXT_LOG_ERR, task->log,
964 "mixing plain and mmap buffers, "
965 "using plain mode");
966
967 break;
968 }
969
945 if (m == NXT_PORT_METHOD_PLAIN) {
946 nxt_log_error(NXT_LOG_ERR, task->log,
947 "mixing plain and mmap buffers, "
948 "using plain mode");
949
950 break;
951 }
952
970 if (port->pid != hdr->dst_pid) {
971 nxt_log_error(NXT_LOG_ERR, task->log,
972 "send mmap buffer for %PI to %PI, "
973 "using plain mode", hdr->dst_pid, port->pid);
974
975 m = NXT_PORT_METHOD_PLAIN;
976
977 break;
978 }
979
980 if (m == NXT_PORT_METHOD_ANY) {
981 nxt_debug(task, "using mmap mode");
982
983 m = NXT_PORT_METHOD_MMAP;
984 }
985 } else {
986 if (m == NXT_PORT_METHOD_MMAP) {
987 nxt_log_error(NXT_LOG_ERR, task->log,

--- 18 unchanged lines hidden ---
953 if (m == NXT_PORT_METHOD_ANY) {
954 nxt_debug(task, "using mmap mode");
955
956 m = NXT_PORT_METHOD_MMAP;
957 }
958 } else {
959 if (m == NXT_PORT_METHOD_MMAP) {
960 nxt_log_error(NXT_LOG_ERR, task->log,

--- 18 unchanged lines hidden ---