1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9#if (NXT_HAVE_MEMFD_CREATE) 10 11#include <linux/memfd.h> 12#include <unistd.h> 13#include <sys/syscall.h> 14 15#endif 16 17#include <nxt_port_memory_int.h> 18 19void 20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) 21{ 22 if (port_mmap->hdr != NULL) { 23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); 24 port_mmap->hdr = NULL; 25 } 26} 27 28 29static nxt_array_t * 30nxt_port_mmaps_create() 31{ 32 nxt_mp_t *mp; 33 34 mp = nxt_mp_create(1024, 128, 256, 32); 35 36 if (nxt_slow_path(mp == NULL)) { 37 return NULL; 38 } 39 40 return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); 41} 42 43 44static nxt_port_mmap_t * 45nxt_port_mmap_add(nxt_array_t *port_mmaps) 46{ 47 nxt_mp_thread_adopt(port_mmaps->mem_pool); 48 49 return nxt_array_zero_add(port_mmaps); 50} 51 52 53void 54nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) 55{ 56 uint32_t i; 57 nxt_port_mmap_t *port_mmap; 58 59 if (port_mmaps == NULL) { 60 return; 61 } 62 63 nxt_mp_thread_adopt(port_mmaps->mem_pool); 64 65 port_mmap = port_mmaps->elts; 66 67 for (i = 0; i < port_mmaps->nelts; i++) { 68 nxt_port_mmap_destroy(port_mmap); 69 } 70 71 port_mmaps->nelts = 0; 72 73 if (destroy_pool != 0) { 74 nxt_mp_destroy(port_mmaps->mem_pool); 75 } 76} 77 78 79#define nxt_port_mmap_free_junk(p, size) \ 80 memset((p), 0xA5, size) 81 82 83static void 84nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 85{ 86 u_char *p; 87 nxt_mp_t *mp; 88 nxt_buf_t *b; 89 nxt_chunk_id_t c; 90 nxt_port_mmap_header_t *hdr; 91 92 if (nxt_buf_ts_handle(task, obj, data)) { 93 return; 94 } 95 96 b = obj; 97
| 1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9#if (NXT_HAVE_MEMFD_CREATE) 10 11#include <linux/memfd.h> 12#include <unistd.h> 13#include <sys/syscall.h> 14 15#endif 16 17#include <nxt_port_memory_int.h> 18 19void 20nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) 21{ 22 if (port_mmap->hdr != NULL) { 23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); 24 port_mmap->hdr = NULL; 25 } 26} 27 28 29static nxt_array_t * 30nxt_port_mmaps_create() 31{ 32 nxt_mp_t *mp; 33 34 mp = nxt_mp_create(1024, 128, 256, 32); 35 36 if (nxt_slow_path(mp == NULL)) { 37 return NULL; 38 } 39 40 return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); 41} 42 43 44static nxt_port_mmap_t * 45nxt_port_mmap_add(nxt_array_t *port_mmaps) 46{ 47 nxt_mp_thread_adopt(port_mmaps->mem_pool); 48 49 return nxt_array_zero_add(port_mmaps); 50} 51 52 53void 54nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) 55{ 56 uint32_t i; 57 nxt_port_mmap_t *port_mmap; 58 59 if (port_mmaps == NULL) { 60 return; 61 } 62 63 nxt_mp_thread_adopt(port_mmaps->mem_pool); 64 65 port_mmap = port_mmaps->elts; 66 67 for (i = 0; i < port_mmaps->nelts; i++) { 68 nxt_port_mmap_destroy(port_mmap); 69 } 70 71 port_mmaps->nelts = 0; 72 73 if (destroy_pool != 0) { 74 nxt_mp_destroy(port_mmaps->mem_pool); 75 } 76} 77 78 79#define nxt_port_mmap_free_junk(p, size) \ 80 memset((p), 0xA5, size) 81 82 83static void 84nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 85{ 86 u_char *p; 87 nxt_mp_t *mp; 88 nxt_buf_t *b; 89 nxt_chunk_id_t c; 90 nxt_port_mmap_header_t *hdr; 91 92 if (nxt_buf_ts_handle(task, obj, data)) { 93 return; 94 } 95 96 b = obj; 97
|
98 nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start); 99
| |
100 mp = b->data; 101 102#if (NXT_DEBUG) 103 if (nxt_slow_path(data != b->parent)) { 104 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 105 data, b->parent); 106 nxt_abort(); 107 } 108#endif 109 110 hdr = data; 111 112 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 113 /* 114 * Chunks until b->mem.pos has been sent to other side, 115 * let's release rest (if any). 116 */ 117 p = b->mem.pos - 1; 118 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 119 p = nxt_port_mmap_chunk_start(hdr, c); 120 121 } else { 122 p = b->mem.start; 123 c = nxt_port_mmap_chunk_id(hdr, p); 124 } 125 126 nxt_port_mmap_free_junk(p, b->mem.end - p); 127
| 98 mp = b->data; 99 100#if (NXT_DEBUG) 101 if (nxt_slow_path(data != b->parent)) { 102 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 103 data, b->parent); 104 nxt_abort(); 105 } 106#endif 107 108 hdr = data; 109 110 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 111 /* 112 * Chunks until b->mem.pos has been sent to other side, 113 * let's release rest (if any). 114 */ 115 p = b->mem.pos - 1; 116 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 117 p = nxt_port_mmap_chunk_start(hdr, c); 118 119 } else { 120 p = b->mem.start; 121 c = nxt_port_mmap_chunk_id(hdr, p); 122 } 123 124 nxt_port_mmap_free_junk(p, b->mem.end - p); 125
|
| 126 nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b, 127 b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent, 128 hdr->pid, hdr->id, c); 129
|
128 while (p < b->mem.end) { 129 nxt_port_mmap_set_chunk_free(hdr, c); 130 131 p += PORT_MMAP_CHUNK_SIZE; 132 c++; 133 } 134 135 nxt_mp_release(mp, b); 136} 137 138 139nxt_port_mmap_header_t * 140nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 141 nxt_fd_t fd) 142{ 143 void *mem; 144 struct stat mmap_stat; 145 nxt_port_mmap_t *port_mmap; 146 nxt_port_mmap_header_t *hdr; 147 148 nxt_debug(task, "got new mmap fd #%FD from process %PI", 149 fd, process->pid); 150 151 port_mmap = NULL; 152 hdr = NULL; 153 154 if (fstat(fd, &mmap_stat) == -1) { 155 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 156 157 return NULL; 158 } 159 160 nxt_thread_mutex_lock(&process->incoming_mutex); 161 162 if (process->incoming == NULL) { 163 process->incoming = nxt_port_mmaps_create(); 164 } 165 166 if (nxt_slow_path(process->incoming == NULL)) { 167 nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); 168 169 goto fail; 170 } 171 172 port_mmap = nxt_port_mmap_add(process->incoming); 173 if (nxt_slow_path(port_mmap == NULL)) { 174 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 175 176 goto fail; 177 } 178 179 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 180 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 181 182 if (nxt_slow_path(mem == MAP_FAILED)) { 183 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 184 185 port_mmap = NULL; 186 187 goto fail; 188 } 189 190 port_mmap->hdr = mem; 191 hdr = port_mmap->hdr; 192 193 if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { 194 nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", 195 port_mmap->hdr->id, process->incoming->nelts - 1); 196 nxt_abort(); 197 } 198 199fail: 200 201 nxt_thread_mutex_unlock(&process->incoming_mutex); 202 203 return hdr; 204} 205 206 207static nxt_port_mmap_header_t * 208nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 209 nxt_port_t *port) 210{ 211 void *mem; 212 u_char *p, name[64]; 213 nxt_fd_t fd; 214 nxt_port_mmap_t *port_mmap; 215 nxt_port_mmap_header_t *hdr; 216 217 port_mmap = NULL; 218 219 if (process->outgoing == NULL) { 220 process->outgoing = nxt_port_mmaps_create(); 221 } 222 223 if (nxt_slow_path(process->outgoing == NULL)) { 224 nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); 225 226 return NULL; 227 } 228 229 port_mmap = nxt_port_mmap_add(process->outgoing); 230 if (nxt_slow_path(port_mmap == NULL)) { 231 nxt_log(task, NXT_LOG_WARN, 232 "failed to add port mmap to outgoing array"); 233 234 return NULL; 235 } 236 237 p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD", 238 nxt_pid, nxt_random(&task->thread->random)); 239 *p = '\0'; 240 241#if (NXT_HAVE_MEMFD_CREATE) 242 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 243 244 if (nxt_slow_path(fd == -1)) { 245 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", 246 name, nxt_errno); 247 248 goto remove_fail; 249 } 250 251 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 252 253#elif (NXT_HAVE_SHM_OPEN) 254 shm_unlink((char *) name); // just in case 255 256 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 257 258 nxt_debug(task, "shm_open(%s): %FD", name, fd); 259 260 if (nxt_slow_path(fd == -1)) { 261 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); 262 263 goto remove_fail; 264 } 265 266 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 267 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 268 nxt_errno); 269 } 270#endif 271 272 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 273 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 274 275 goto remove_fail; 276 } 277 278 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, 279 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 280 281 if (nxt_slow_path(mem == MAP_FAILED)) { 282 goto remove_fail; 283 } 284 285 port_mmap->hdr = mem; 286 287 /* Init segment header. */ 288 hdr = port_mmap->hdr; 289 290 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 291 292 hdr->id = process->outgoing->nelts - 1; 293 hdr->pid = process->pid; 294 295 /* Mark first chunk as busy */ 296 nxt_port_mmap_set_chunk_busy(hdr, 0); 297 298 /* Mark as busy chunk followed the last available chunk. */ 299 nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); 300 301 nxt_debug(task, "send mmap fd %FD to process %PI", fd, 302 port->pid); 303 304 /* TODO handle error */ 305 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 306 307 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 308 hdr->id, nxt_pid, process->pid); 309 310 return hdr; 311 312remove_fail: 313 314 nxt_array_remove(process->outgoing, port_mmap); 315 316 return NULL; 317} 318 319 320static nxt_port_mmap_header_t * 321nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 322 size_t size) 323{ 324 nxt_array_t *outgoing; 325 nxt_process_t *process; 326 nxt_port_mmap_t *port_mmap; 327 nxt_port_mmap_t *end_port_mmap; 328 nxt_port_mmap_header_t *hdr; 329 330 process = port->process; 331 if (nxt_slow_path(process == NULL)) { 332 return NULL; 333 } 334 335 *c = 0; 336 port_mmap = NULL; 337 hdr = NULL; 338 339 nxt_thread_mutex_lock(&process->outgoing_mutex); 340 341 if (process->outgoing == NULL) { 342 hdr = nxt_port_new_port_mmap(task, process, port); 343 344 goto unlock_return; 345 } 346 347 outgoing = process->outgoing; 348 port_mmap = outgoing->elts; 349 end_port_mmap = port_mmap + outgoing->nelts; 350 351 while (port_mmap < end_port_mmap) { 352 353 if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { 354 hdr = port_mmap->hdr; 355 356 goto unlock_return; 357 } 358 359 port_mmap++; 360 } 361 362 /* TODO introduce port_mmap limit and release wait. */ 363 364 hdr = nxt_port_new_port_mmap(task, process, port); 365 366unlock_return: 367 368 nxt_thread_mutex_unlock(&process->outgoing_mutex); 369 370 return hdr; 371} 372 373 374static nxt_port_mmap_header_t * 375nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 376{ 377 nxt_array_t *incoming; 378 nxt_process_t *process; 379 nxt_port_mmap_t *port_mmap; 380 nxt_port_mmap_header_t *hdr; 381 382 process = nxt_runtime_process_find(task->thread->runtime, spid); 383 if (nxt_slow_path(process == NULL)) { 384 return NULL; 385 } 386 387 hdr = NULL; 388 389 nxt_thread_mutex_lock(&process->incoming_mutex); 390 391 incoming = process->incoming; 392 393 if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { 394 port_mmap = incoming->elts; 395 hdr = port_mmap[id].hdr; 396 } else { 397 nxt_log(task, NXT_LOG_WARN, 398 "failed to get incoming mmap #%d for process %PI", id, spid); 399 } 400 401 nxt_thread_mutex_unlock(&process->incoming_mutex); 402 403 return hdr; 404} 405 406 407nxt_buf_t * 408nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 409{ 410 size_t nchunks; 411 nxt_buf_t *b; 412 nxt_chunk_id_t c; 413 nxt_port_mmap_header_t *hdr; 414 415 nxt_debug(task, "request %z bytes shm buffer", size); 416
| 130 while (p < b->mem.end) { 131 nxt_port_mmap_set_chunk_free(hdr, c); 132 133 p += PORT_MMAP_CHUNK_SIZE; 134 c++; 135 } 136 137 nxt_mp_release(mp, b); 138} 139 140 141nxt_port_mmap_header_t * 142nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 143 nxt_fd_t fd) 144{ 145 void *mem; 146 struct stat mmap_stat; 147 nxt_port_mmap_t *port_mmap; 148 nxt_port_mmap_header_t *hdr; 149 150 nxt_debug(task, "got new mmap fd #%FD from process %PI", 151 fd, process->pid); 152 153 port_mmap = NULL; 154 hdr = NULL; 155 156 if (fstat(fd, &mmap_stat) == -1) { 157 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 158 159 return NULL; 160 } 161 162 nxt_thread_mutex_lock(&process->incoming_mutex); 163 164 if (process->incoming == NULL) { 165 process->incoming = nxt_port_mmaps_create(); 166 } 167 168 if (nxt_slow_path(process->incoming == NULL)) { 169 nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); 170 171 goto fail; 172 } 173 174 port_mmap = nxt_port_mmap_add(process->incoming); 175 if (nxt_slow_path(port_mmap == NULL)) { 176 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 177 178 goto fail; 179 } 180 181 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 182 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 183 184 if (nxt_slow_path(mem == MAP_FAILED)) { 185 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 186 187 port_mmap = NULL; 188 189 goto fail; 190 } 191 192 port_mmap->hdr = mem; 193 hdr = port_mmap->hdr; 194 195 if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { 196 nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", 197 port_mmap->hdr->id, process->incoming->nelts - 1); 198 nxt_abort(); 199 } 200 201fail: 202 203 nxt_thread_mutex_unlock(&process->incoming_mutex); 204 205 return hdr; 206} 207 208 209static nxt_port_mmap_header_t * 210nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 211 nxt_port_t *port) 212{ 213 void *mem; 214 u_char *p, name[64]; 215 nxt_fd_t fd; 216 nxt_port_mmap_t *port_mmap; 217 nxt_port_mmap_header_t *hdr; 218 219 port_mmap = NULL; 220 221 if (process->outgoing == NULL) { 222 process->outgoing = nxt_port_mmaps_create(); 223 } 224 225 if (nxt_slow_path(process->outgoing == NULL)) { 226 nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); 227 228 return NULL; 229 } 230 231 port_mmap = nxt_port_mmap_add(process->outgoing); 232 if (nxt_slow_path(port_mmap == NULL)) { 233 nxt_log(task, NXT_LOG_WARN, 234 "failed to add port mmap to outgoing array"); 235 236 return NULL; 237 } 238 239 p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD", 240 nxt_pid, nxt_random(&task->thread->random)); 241 *p = '\0'; 242 243#if (NXT_HAVE_MEMFD_CREATE) 244 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 245 246 if (nxt_slow_path(fd == -1)) { 247 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", 248 name, nxt_errno); 249 250 goto remove_fail; 251 } 252 253 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 254 255#elif (NXT_HAVE_SHM_OPEN) 256 shm_unlink((char *) name); // just in case 257 258 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 259 260 nxt_debug(task, "shm_open(%s): %FD", name, fd); 261 262 if (nxt_slow_path(fd == -1)) { 263 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); 264 265 goto remove_fail; 266 } 267 268 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 269 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 270 nxt_errno); 271 } 272#endif 273 274 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 275 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 276 277 goto remove_fail; 278 } 279 280 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, 281 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 282 283 if (nxt_slow_path(mem == MAP_FAILED)) { 284 goto remove_fail; 285 } 286 287 port_mmap->hdr = mem; 288 289 /* Init segment header. */ 290 hdr = port_mmap->hdr; 291 292 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 293 294 hdr->id = process->outgoing->nelts - 1; 295 hdr->pid = process->pid; 296 297 /* Mark first chunk as busy */ 298 nxt_port_mmap_set_chunk_busy(hdr, 0); 299 300 /* Mark as busy chunk followed the last available chunk. */ 301 nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); 302 303 nxt_debug(task, "send mmap fd %FD to process %PI", fd, 304 port->pid); 305 306 /* TODO handle error */ 307 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 308 309 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 310 hdr->id, nxt_pid, process->pid); 311 312 return hdr; 313 314remove_fail: 315 316 nxt_array_remove(process->outgoing, port_mmap); 317 318 return NULL; 319} 320 321 322static nxt_port_mmap_header_t * 323nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 324 size_t size) 325{ 326 nxt_array_t *outgoing; 327 nxt_process_t *process; 328 nxt_port_mmap_t *port_mmap; 329 nxt_port_mmap_t *end_port_mmap; 330 nxt_port_mmap_header_t *hdr; 331 332 process = port->process; 333 if (nxt_slow_path(process == NULL)) { 334 return NULL; 335 } 336 337 *c = 0; 338 port_mmap = NULL; 339 hdr = NULL; 340 341 nxt_thread_mutex_lock(&process->outgoing_mutex); 342 343 if (process->outgoing == NULL) { 344 hdr = nxt_port_new_port_mmap(task, process, port); 345 346 goto unlock_return; 347 } 348 349 outgoing = process->outgoing; 350 port_mmap = outgoing->elts; 351 end_port_mmap = port_mmap + outgoing->nelts; 352 353 while (port_mmap < end_port_mmap) { 354 355 if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { 356 hdr = port_mmap->hdr; 357 358 goto unlock_return; 359 } 360 361 port_mmap++; 362 } 363 364 /* TODO introduce port_mmap limit and release wait. */ 365 366 hdr = nxt_port_new_port_mmap(task, process, port); 367 368unlock_return: 369 370 nxt_thread_mutex_unlock(&process->outgoing_mutex); 371 372 return hdr; 373} 374 375 376static nxt_port_mmap_header_t * 377nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 378{ 379 nxt_array_t *incoming; 380 nxt_process_t *process; 381 nxt_port_mmap_t *port_mmap; 382 nxt_port_mmap_header_t *hdr; 383 384 process = nxt_runtime_process_find(task->thread->runtime, spid); 385 if (nxt_slow_path(process == NULL)) { 386 return NULL; 387 } 388 389 hdr = NULL; 390 391 nxt_thread_mutex_lock(&process->incoming_mutex); 392 393 incoming = process->incoming; 394 395 if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { 396 port_mmap = incoming->elts; 397 hdr = port_mmap[id].hdr; 398 } else { 399 nxt_log(task, NXT_LOG_WARN, 400 "failed to get incoming mmap #%d for process %PI", id, spid); 401 } 402 403 nxt_thread_mutex_unlock(&process->incoming_mutex); 404 405 return hdr; 406} 407 408 409nxt_buf_t * 410nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 411{ 412 size_t nchunks; 413 nxt_buf_t *b; 414 nxt_chunk_id_t c; 415 nxt_port_mmap_header_t *hdr; 416 417 nxt_debug(task, "request %z bytes shm buffer", size); 418
|
417 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 418 nxt_debug(task, "requested size (%z bytes) too big", size); 419 return NULL; 420 } 421
| |
422 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 423 if (nxt_slow_path(b == NULL)) { 424 return NULL; 425 } 426 427 b->completion_handler = nxt_port_mmap_buf_completion; 428 nxt_buf_set_port_mmap(b); 429 430 hdr = nxt_port_mmap_get(task, port, &c, size); 431 if (nxt_slow_path(hdr == NULL)) { 432 nxt_mp_release(port->mem_pool, b); 433 return NULL; 434 } 435 436 b->parent = hdr; 437 438 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 439 b->mem.pos = b->mem.start; 440 b->mem.free = b->mem.start; 441 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 442 443 nchunks = size / PORT_MMAP_CHUNK_SIZE; 444 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 445 nchunks++; 446 } 447
| 419 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 420 if (nxt_slow_path(b == NULL)) { 421 return NULL; 422 } 423 424 b->completion_handler = nxt_port_mmap_buf_completion; 425 nxt_buf_set_port_mmap(b); 426 427 hdr = nxt_port_mmap_get(task, port, &c, size); 428 if (nxt_slow_path(hdr == NULL)) { 429 nxt_mp_release(port->mem_pool, b); 430 return NULL; 431 } 432 433 b->parent = hdr; 434 435 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 436 b->mem.pos = b->mem.start; 437 b->mem.free = b->mem.start; 438 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 439 440 nchunks = size / PORT_MMAP_CHUNK_SIZE; 441 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 442 nchunks++; 443 } 444
|
| 445 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, 446 b->mem.start, b->mem.end - b->mem.start, 447 hdr->pid, hdr->id, c); 448
|
448 c++; 449 nchunks--; 450 451 /* Try to acquire as much chunks as required. */ 452 while (nchunks > 0) { 453 454 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 455 break; 456 } 457 458 b->mem.end += PORT_MMAP_CHUNK_SIZE; 459 c++; 460 nchunks--; 461 } 462 463 return b; 464} 465 466 467nxt_int_t
| 449 c++; 450 nchunks--; 451 452 /* Try to acquire as much chunks as required. */ 453 while (nchunks > 0) { 454 455 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 456 break; 457 } 458 459 b->mem.end += PORT_MMAP_CHUNK_SIZE; 460 c++; 461 nchunks--; 462 } 463 464 return b; 465} 466 467 468nxt_int_t
|
468nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
| 469nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 470 size_t min_size)
|
469{
| 471{
|
470 size_t nchunks;
| 472 size_t nchunks, free_size;
|
471 nxt_chunk_id_t c, start; 472 nxt_port_mmap_header_t *hdr; 473 474 nxt_debug(task, "request increase %z bytes shm buffer", size); 475 476 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 477 nxt_log(task, NXT_LOG_WARN, 478 "failed to increase, not a mmap buffer"); 479 return NXT_ERROR; 480 } 481
| 473 nxt_chunk_id_t c, start; 474 nxt_port_mmap_header_t *hdr; 475 476 nxt_debug(task, "request increase %z bytes shm buffer", size); 477 478 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 479 nxt_log(task, NXT_LOG_WARN, 480 "failed to increase, not a mmap buffer"); 481 return NXT_ERROR; 482 } 483
|
482 if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
| 484 free_size = nxt_buf_mem_free_size(&b->mem); 485 486 if (nxt_slow_path(size <= free_size)) {
|
483 return NXT_OK; 484 } 485 486 hdr = b->parent; 487 488 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 489
| 487 return NXT_OK; 488 } 489 490 hdr = b->parent; 491 492 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 493
|
490 size -= nxt_buf_mem_free_size(&b->mem);
| 494 size -= free_size;
|
491 492 nchunks = size / PORT_MMAP_CHUNK_SIZE; 493 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 494 nchunks++; 495 } 496 497 c = start; 498 499 /* Try to acquire as much chunks as required. */ 500 while (nchunks > 0) { 501 502 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 503 break; 504 } 505 506 c++; 507 nchunks--; 508 } 509
| 495 496 nchunks = size / PORT_MMAP_CHUNK_SIZE; 497 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 498 nchunks++; 499 } 500 501 c = start; 502 503 /* Try to acquire as much chunks as required. */ 504 while (nchunks > 0) { 505 506 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 507 break; 508 } 509 510 c++; 511 nchunks--; 512 } 513
|
510 if (nchunks != 0) {
| 514 if (nchunks != 0 && 515 min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { 516
|
511 c--; 512 while (c >= start) { 513 nxt_port_mmap_set_chunk_free(hdr, c); 514 c--; 515 } 516 517 nxt_debug(task, "failed to increase, %d chunks busy", nchunks); 518 519 return NXT_ERROR; 520 } else { 521 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 522 523 return NXT_OK; 524 } 525} 526 527 528static nxt_buf_t * 529nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 530 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 531{ 532 size_t nchunks; 533 nxt_buf_t *b; 534 nxt_port_mmap_header_t *hdr; 535 536 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); 537 if (nxt_slow_path(hdr == NULL)) { 538 return NULL; 539 } 540 541 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 542 if (nxt_slow_path(b == NULL)) { 543 return NULL; 544 } 545 546 b->completion_handler = nxt_port_mmap_buf_completion; 547 548 nxt_buf_set_port_mmap(b); 549 550 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 551 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 552 nchunks++; 553 } 554 555 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 556 b->mem.pos = b->mem.start; 557 b->mem.free = b->mem.start + mmap_msg->size; 558 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 559 560 b->parent = hdr; 561
| 517 c--; 518 while (c >= start) { 519 nxt_port_mmap_set_chunk_free(hdr, c); 520 c--; 521 } 522 523 nxt_debug(task, "failed to increase, %d chunks busy", nchunks); 524 525 return NXT_ERROR; 526 } else { 527 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 528 529 return NXT_OK; 530 } 531} 532 533 534static nxt_buf_t * 535nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 536 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 537{ 538 size_t nchunks; 539 nxt_buf_t *b; 540 nxt_port_mmap_header_t *hdr; 541 542 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); 543 if (nxt_slow_path(hdr == NULL)) { 544 return NULL; 545 } 546 547 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 548 if (nxt_slow_path(b == NULL)) { 549 return NULL; 550 } 551 552 b->completion_handler = nxt_port_mmap_buf_completion; 553 554 nxt_buf_set_port_mmap(b); 555 556 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 557 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 558 nchunks++; 559 } 560 561 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 562 b->mem.pos = b->mem.start; 563 b->mem.free = b->mem.start + mmap_msg->size; 564 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 565 566 b->parent = hdr; 567
|
| 568 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, 569 b->mem.start, b->mem.end - b->mem.start, 570 hdr->pid, hdr->id, mmap_msg->chunk_id); 571
|
562 return b; 563} 564 565 566void 567nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 568 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 569{ 570 size_t bsize; 571 nxt_buf_t *bmem; 572 nxt_uint_t i; 573 nxt_port_mmap_msg_t *mmap_msg; 574 nxt_port_mmap_header_t *hdr; 575 576 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 577 "via shared memory", sb->size, port->pid); 578 579 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 580 mmap_msg = port->mmsg_buf; 581 582 bmem = msg->buf; 583 584 for (i = 0; i < sb->niov; i++, mmap_msg++) { 585 586 /* Lookup buffer which starts current iov_base. */ 587 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 588 bmem = bmem->next; 589 } 590 591 if (nxt_slow_path(bmem == NULL)) { 592 nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for " 593 "iobuf[%d]", i); 594 return; 595 /* TODO clear b and exit */ 596 } 597 598 hdr = bmem->parent; 599 600 mmap_msg->mmap_id = hdr->id; 601 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 602 mmap_msg->size = sb->iobuf[i].iov_len; 603 604 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 605 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 606 port->pid); 607 } 608 609 sb->iobuf[0].iov_base = port->mmsg_buf; 610 sb->iobuf[0].iov_len = bsize; 611 sb->niov = 1; 612 sb->size = bsize; 613 614 msg->port_msg.mmap = 1; 615} 616 617 618void 619nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, 620 nxt_port_recv_msg_t *msg) 621{ 622 nxt_buf_t *b, **pb; 623 nxt_port_mmap_msg_t *end, *mmap_msg; 624 625 b = msg->buf; 626 627 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 628 end = (nxt_port_mmap_msg_t *) b->mem.free; 629 630 pb = &msg->buf; 631 msg->size = 0; 632 633 while (mmap_msg < end) { 634 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 635 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 636 msg->port_msg.pid); 637 638 *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, 639 mmap_msg); 640 if (nxt_slow_path(*pb == NULL)) { 641 nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); 642 643 break; 644 } 645 646 msg->size += mmap_msg->size; 647 pb = &(*pb)->next; 648 mmap_msg++; 649 } 650 651 /* Mark original buf as complete. */ 652 b->mem.pos += nxt_buf_used_size(b); 653} 654 655 656nxt_port_method_t 657nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 658{ 659 nxt_port_method_t m; 660 nxt_port_mmap_header_t *hdr; 661 662 m = NXT_PORT_METHOD_ANY; 663 664 for (; b != NULL; b = b->next) { 665 if (nxt_buf_used_size(b) == 0) { 666 /* empty buffers does not affect method */ 667 continue; 668 } 669 670 if (nxt_buf_is_port_mmap(b)) { 671 hdr = b->parent; 672 673 if (m == NXT_PORT_METHOD_PLAIN) { 674 nxt_log_error(NXT_LOG_ERR, task->log, 675 "mixing plain and mmap buffers, " 676 "using plain mode"); 677 678 break; 679 } 680 681 if (port->pid != hdr->pid) { 682 nxt_log_error(NXT_LOG_ERR, task->log, 683 "send mmap buffer for %PI to %PI, " 684 "using plain mode", hdr->pid, port->pid); 685 686 m = NXT_PORT_METHOD_PLAIN; 687 688 break; 689 } 690 691 if (m == NXT_PORT_METHOD_ANY) { 692 nxt_debug(task, "using mmap mode"); 693 694 m = NXT_PORT_METHOD_MMAP; 695 } 696 } else { 697 if (m == NXT_PORT_METHOD_MMAP) { 698 nxt_log_error(NXT_LOG_ERR, task->log, 699 "mixing mmap and plain buffers, " 700 "switching to plain mode"); 701 702 m = NXT_PORT_METHOD_PLAIN; 703 704 break; 705 } 706 707 if (m == NXT_PORT_METHOD_ANY) { 708 nxt_debug(task, "using plain mode"); 709 710 m = NXT_PORT_METHOD_PLAIN; 711 } 712 } 713 } 714 715 return m; 716}
| 572 return b; 573} 574 575 576void 577nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 578 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 579{ 580 size_t bsize; 581 nxt_buf_t *bmem; 582 nxt_uint_t i; 583 nxt_port_mmap_msg_t *mmap_msg; 584 nxt_port_mmap_header_t *hdr; 585 586 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 587 "via shared memory", sb->size, port->pid); 588 589 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 590 mmap_msg = port->mmsg_buf; 591 592 bmem = msg->buf; 593 594 for (i = 0; i < sb->niov; i++, mmap_msg++) { 595 596 /* Lookup buffer which starts current iov_base. */ 597 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 598 bmem = bmem->next; 599 } 600 601 if (nxt_slow_path(bmem == NULL)) { 602 nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for " 603 "iobuf[%d]", i); 604 return; 605 /* TODO clear b and exit */ 606 } 607 608 hdr = bmem->parent; 609 610 mmap_msg->mmap_id = hdr->id; 611 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 612 mmap_msg->size = sb->iobuf[i].iov_len; 613 614 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 615 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 616 port->pid); 617 } 618 619 sb->iobuf[0].iov_base = port->mmsg_buf; 620 sb->iobuf[0].iov_len = bsize; 621 sb->niov = 1; 622 sb->size = bsize; 623 624 msg->port_msg.mmap = 1; 625} 626 627 628void 629nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, 630 nxt_port_recv_msg_t *msg) 631{ 632 nxt_buf_t *b, **pb; 633 nxt_port_mmap_msg_t *end, *mmap_msg; 634 635 b = msg->buf; 636 637 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 638 end = (nxt_port_mmap_msg_t *) b->mem.free; 639 640 pb = &msg->buf; 641 msg->size = 0; 642 643 while (mmap_msg < end) { 644 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 645 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 646 msg->port_msg.pid); 647 648 *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, 649 mmap_msg); 650 if (nxt_slow_path(*pb == NULL)) { 651 nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); 652 653 break; 654 } 655 656 msg->size += mmap_msg->size; 657 pb = &(*pb)->next; 658 mmap_msg++; 659 } 660 661 /* Mark original buf as complete. */ 662 b->mem.pos += nxt_buf_used_size(b); 663} 664 665 666nxt_port_method_t 667nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 668{ 669 nxt_port_method_t m; 670 nxt_port_mmap_header_t *hdr; 671 672 m = NXT_PORT_METHOD_ANY; 673 674 for (; b != NULL; b = b->next) { 675 if (nxt_buf_used_size(b) == 0) { 676 /* empty buffers does not affect method */ 677 continue; 678 } 679 680 if (nxt_buf_is_port_mmap(b)) { 681 hdr = b->parent; 682 683 if (m == NXT_PORT_METHOD_PLAIN) { 684 nxt_log_error(NXT_LOG_ERR, task->log, 685 "mixing plain and mmap buffers, " 686 "using plain mode"); 687 688 break; 689 } 690 691 if (port->pid != hdr->pid) { 692 nxt_log_error(NXT_LOG_ERR, task->log, 693 "send mmap buffer for %PI to %PI, " 694 "using plain mode", hdr->pid, port->pid); 695 696 m = NXT_PORT_METHOD_PLAIN; 697 698 break; 699 } 700 701 if (m == NXT_PORT_METHOD_ANY) { 702 nxt_debug(task, "using mmap mode"); 703 704 m = NXT_PORT_METHOD_MMAP; 705 } 706 } else { 707 if (m == NXT_PORT_METHOD_MMAP) { 708 nxt_log_error(NXT_LOG_ERR, task->log, 709 "mixing mmap and plain buffers, " 710 "switching to plain mode"); 711 712 m = NXT_PORT_METHOD_PLAIN; 713 714 break; 715 } 716 717 if (m == NXT_PORT_METHOD_ANY) { 718 nxt_debug(task, "using plain mode"); 719 720 m = NXT_PORT_METHOD_PLAIN; 721 } 722 } 723 } 724 725 return m; 726}
|