1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, 11 size_t *copied); 12 13 14nxt_uint_t 15nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, 16 struct iovec *iov, nxt_uint_t niov_max) 17{ 18 u_char *last; 19 size_t size, total; 20 nxt_buf_t *b; 21 nxt_uint_t n; 22 23 total = sb->size; 24 last = NULL; 25 n = (nxt_uint_t) -1; 26 27 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 28 29 nxt_prefetch(b->next); 30 31 if (nxt_buf_is_file(b)) { 32 break; 33 } 34 35 if (nxt_buf_is_mem(b)) { 36 37 size = b->mem.free - b->mem.pos; 38 39 if (size != 0) { 40 41 if (total + size > sb->limit) { 42 size = sb->limit - total; 43 44 if (size == 0) { 45 break; 46 } 47 } 48 49 if (b->mem.pos != last) { 50 51 if (++n >= niov_max) { 52 goto done; 53 } 54 55 iov[n].iov_base = b->mem.pos; 56 iov[n].iov_len = size; 57 58 } else { 59 iov[n].iov_len += size; 60 } 61 62 nxt_debug(task, "sendbuf: %ui, %p, %uz", 63 n, iov[n].iov_base, iov[n].iov_len); 64 65 total += size; 66 last = b->mem.pos + size; 67 } 68 69 } else { 70 sb->sync = 1; 71 sb->last |= nxt_buf_is_last(b); 72 } 73 } 74 75 n++; 76 77done: 78 79 sb->buf = b; 80 81 return n; 82} 83 84 85nxt_uint_t 86nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) 87{ 88 u_char *last; 89 size_t size, total; 90 nxt_buf_t *b; 91 nxt_uint_t n; 92 93 total = sb->size; 94 last = NULL; 95 n = (nxt_uint_t) -1; 96 97 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 98 99 nxt_prefetch(b->next); 100 101 if (nxt_buf_is_file(b)) { 102 break; 103 } 104 105 if (nxt_buf_is_mem(b)) { 106 107 size = b->mem.free - b->mem.pos; 108 109 if (size != 0) { 110 111 if (total + size > sb->limit) { 112 size = sb->limit - total; 113 114 if (size == 0) { 115 break; 116 } 117 } 118 119 if (b->mem.pos != last) { 120 121 if (++n >= sb->nmax) { 122 goto done; 123 } 124 125 sb->iobuf[n].iov_base = b->mem.pos; 126 sb->iobuf[n].iov_len = size; 127 128 } else { 129 sb->iobuf[n].iov_len += size; 130 } 131 132 nxt_debug(task, "sendbuf: %ui, %p, %uz", 133 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len); 134 135 total += size; 136 last = b->mem.pos + size; 137 } 138 139 } else { 140 sb->sync = 1; 141 sb->last |= nxt_buf_is_last(b); 142 } 143 } 144 145 n++; 146 147done: 148 149 sb->buf = b; 150 sb->size = total; 151 sb->niov = n; 152 153 return n; 154} 155 156 157size_t 158nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) 159{ 160 size_t file_start, total; 161 nxt_fd_t fd; 162 nxt_off_t size, last; 163 nxt_buf_t *b; 164 165 b = sb->buf; 166 fd = b->file->fd; 167 168 total = sb->size; 169 170 for ( ;; ) { 171 172 nxt_prefetch(b->next); 173 174 size = b->file_end - b->file_pos; 175 176 if (total + size >= sb->limit) { 177 total = sb->limit; 178 break; 179 } 180 181 total += size; 182 last = b->file_pos + size; 183 184 b = b->next; 185 186 if (b == NULL || !nxt_buf_is_file(b)) { 187 break; 188 } 189 190 if (b->file_pos != last || b->file->fd != fd) { 191 break; 192 } 193 } 194 195 sb->buf = b; 196 197 file_start = sb->size; 198 sb->size = total; 199 200 return total - file_start; 201} 202 203 204ssize_t
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, 11 size_t *copied); 12 13 14nxt_uint_t 15nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, 16 struct iovec *iov, nxt_uint_t niov_max) 17{ 18 u_char *last; 19 size_t size, total; 20 nxt_buf_t *b; 21 nxt_uint_t n; 22 23 total = sb->size; 24 last = NULL; 25 n = (nxt_uint_t) -1; 26 27 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 28 29 nxt_prefetch(b->next); 30 31 if (nxt_buf_is_file(b)) { 32 break; 33 } 34 35 if (nxt_buf_is_mem(b)) { 36 37 size = b->mem.free - b->mem.pos; 38 39 if (size != 0) { 40 41 if (total + size > sb->limit) { 42 size = sb->limit - total; 43 44 if (size == 0) { 45 break; 46 } 47 } 48 49 if (b->mem.pos != last) { 50 51 if (++n >= niov_max) { 52 goto done; 53 } 54 55 iov[n].iov_base = b->mem.pos; 56 iov[n].iov_len = size; 57 58 } else { 59 iov[n].iov_len += size; 60 } 61 62 nxt_debug(task, "sendbuf: %ui, %p, %uz", 63 n, iov[n].iov_base, iov[n].iov_len); 64 65 total += size; 66 last = b->mem.pos + size; 67 } 68 69 } else { 70 sb->sync = 1; 71 sb->last |= nxt_buf_is_last(b); 72 } 73 } 74 75 n++; 76 77done: 78 79 sb->buf = b; 80 81 return n; 82} 83 84 85nxt_uint_t 86nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) 87{ 88 u_char *last; 89 size_t size, total; 90 nxt_buf_t *b; 91 nxt_uint_t n; 92 93 total = sb->size; 94 last = NULL; 95 n = (nxt_uint_t) -1; 96 97 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 98 99 nxt_prefetch(b->next); 100 101 if (nxt_buf_is_file(b)) { 102 break; 103 } 104 105 if (nxt_buf_is_mem(b)) { 106 107 size = b->mem.free - b->mem.pos; 108 109 if (size != 0) { 110 111 if (total + size > sb->limit) { 112 size = sb->limit - total; 113 114 if (size == 0) { 115 break; 116 } 117 } 118 119 if (b->mem.pos != last) { 120 121 if (++n >= sb->nmax) { 122 goto done; 123 } 124 125 sb->iobuf[n].iov_base = b->mem.pos; 126 sb->iobuf[n].iov_len = size; 127 128 } else { 129 sb->iobuf[n].iov_len += size; 130 } 131 132 nxt_debug(task, "sendbuf: %ui, %p, %uz", 133 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len); 134 135 total += size; 136 last = b->mem.pos + size; 137 } 138 139 } else { 140 sb->sync = 1; 141 sb->last |= nxt_buf_is_last(b); 142 } 143 } 144 145 n++; 146 147done: 148 149 sb->buf = b; 150 sb->size = total; 151 sb->niov = n; 152 153 return n; 154} 155 156 157size_t 158nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) 159{ 160 size_t file_start, total; 161 nxt_fd_t fd; 162 nxt_off_t size, last; 163 nxt_buf_t *b; 164 165 b = sb->buf; 166 fd = b->file->fd; 167 168 total = sb->size; 169 170 for ( ;; ) { 171 172 nxt_prefetch(b->next); 173 174 size = b->file_end - b->file_pos; 175 176 if (total + size >= sb->limit) { 177 total = sb->limit; 178 break; 179 } 180 181 total += size; 182 last = b->file_pos + size; 183 184 b = b->next; 185 186 if (b == NULL || !nxt_buf_is_file(b)) { 187 break; 188 } 189 190 if (b->file_pos != last || b->file->fd != fd) { 191 break; 192 } 193 } 194 195 sb->buf = b; 196 197 file_start = sb->size; 198 sb->size = total; 199 200 return total - file_start; 201} 202 203 204ssize_t
|
207{ 208 size_t size, bsize, copied; 209 ssize_t n; 210 nxt_bool_t flush; 211 212 size = nxt_buf_mem_used_size(&b->mem); 213 bsize = nxt_buf_mem_size(bm); 214 215 if (bsize != 0) { 216 217 if (size > bsize && bm->pos == bm->free) { 218 /* 219 * A data buffer size is larger than the internal 220 * buffer size and the internal buffer is empty. 221 */ 222 goto no_buffer; 223 } 224 225 if (bm->pos == NULL) { 226 bm->pos = nxt_malloc(bsize); 227 if (nxt_slow_path(bm->pos == NULL)) { 228 return NXT_ERROR; 229 } 230 231 bm->start = bm->pos; 232 bm->free = bm->pos; 233 bm->end += (uintptr_t) bm->pos; 234 } 235 236 copied = 0; 237 238 flush = nxt_sendbuf_copy(bm, b, &copied); 239 240 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); 241 242 if (flush == 0) { 243 return copied; 244 } 245 246 size = nxt_buf_mem_used_size(bm); 247 248 if (size == 0 && nxt_buf_is_sync(b)) { 249 goto done; 250 } 251 252 n = c->io->send(c, bm->pos, nxt_min(size, limit)); 253 254 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); 255 256 if (n > 0) { 257 bm->pos += n; 258 259 if (bm->pos == bm->free) { 260 bm->pos = bm->start; 261 bm->free = bm->start; 262 } 263 264 n = 0; 265 } 266 267 return (copied != 0) ? (ssize_t) copied : n; 268 } 269 270 /* No internal buffering. */ 271 272 if (size == 0 && nxt_buf_is_sync(b)) { 273 goto done; 274 } 275 276no_buffer: 277 278 return c->io->send(c, b->mem.pos, nxt_min(size, limit)); 279 280done: 281 282 nxt_log_debug(c->socket.log, "sendbuf done"); 283 284 return 0; 285} 286 287 288static nxt_bool_t 289nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) 290{ 291 size_t size, bsize; 292 nxt_bool_t flush; 293 294 flush = 0; 295 296 do { 297 nxt_prefetch(b->next); 298 299 if (nxt_buf_is_mem(b)) { 300 bsize = bm->end - bm->free; 301 size = b->mem.free - b->mem.pos; 302 size = nxt_min(size, bsize); 303 304 nxt_memcpy(bm->free, b->mem.pos, size); 305 306 *copied += size; 307 bm->free += size; 308 309 if (bm->free == bm->end) { 310 return 1; 311 } 312 } 313 314 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); 315 316 b = b->next; 317 318 } while (b != NULL); 319 320 return flush; 321} 322 323 324nxt_buf_t * 325nxt_sendbuf_update(nxt_buf_t *b, size_t sent) 326{ 327 size_t size; 328 329 while (b != NULL) { 330 331 nxt_prefetch(b->next); 332 333 if (!nxt_buf_is_sync(b)) { 334 335 size = nxt_buf_used_size(b); 336 337 if (size != 0) { 338 339 if (sent == 0) { 340 break; 341 } 342 343 if (sent < size) { 344 345 if (nxt_buf_is_mem(b)) { 346 b->mem.pos += sent; 347 } 348 349 if (nxt_buf_is_file(b)) { 350 b->file_pos += sent; 351 } 352 353 break; 354 } 355 356 /* b->mem.free is NULL in file-only buffer. */ 357 b->mem.pos = b->mem.free; 358 359 if (nxt_buf_is_file(b)) { 360 b->file_pos = b->file_end; 361 } 362 363 sent -= size; 364 } 365 } 366 367 b = b->next; 368 } 369 370 return b; 371} 372 373 374nxt_buf_t * 375nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 376 size_t sent) 377{ 378 size_t size; 379 380 while (b != NULL) { 381 382 nxt_prefetch(b->next); 383 384 if (!nxt_buf_is_sync(b)) { 385 386 size = nxt_buf_used_size(b); 387 388 if (size != 0) { 389 390 if (sent == 0) { 391 break; 392 } 393 394 if (nxt_buf_is_port_mmap(b)) { 395 /* 396 * buffer has been sent to other side which is now 397 * responsible for shared memory bucket release 398 */ 399 b->is_port_mmap_sent = 1; 400 } 401 402 if (sent < size) { 403 404 if (nxt_buf_is_mem(b)) { 405 b->mem.pos += sent; 406 } 407 408 if (nxt_buf_is_file(b)) { 409 b->file_pos += sent; 410 } 411 412 break; 413 } 414 415 /* b->mem.free is NULL in file-only buffer. */ 416 b->mem.pos = b->mem.free; 417 418 if (nxt_buf_is_file(b)) { 419 b->file_pos = b->file_end; 420 } 421 422 sent -= size; 423 } 424 } 425 426 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 427 428 b = b->next; 429 } 430 431 return b; 432}
| 207{ 208 size_t size, bsize, copied; 209 ssize_t n; 210 nxt_bool_t flush; 211 212 size = nxt_buf_mem_used_size(&b->mem); 213 bsize = nxt_buf_mem_size(bm); 214 215 if (bsize != 0) { 216 217 if (size > bsize && bm->pos == bm->free) { 218 /* 219 * A data buffer size is larger than the internal 220 * buffer size and the internal buffer is empty. 221 */ 222 goto no_buffer; 223 } 224 225 if (bm->pos == NULL) { 226 bm->pos = nxt_malloc(bsize); 227 if (nxt_slow_path(bm->pos == NULL)) { 228 return NXT_ERROR; 229 } 230 231 bm->start = bm->pos; 232 bm->free = bm->pos; 233 bm->end += (uintptr_t) bm->pos; 234 } 235 236 copied = 0; 237 238 flush = nxt_sendbuf_copy(bm, b, &copied); 239 240 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); 241 242 if (flush == 0) { 243 return copied; 244 } 245 246 size = nxt_buf_mem_used_size(bm); 247 248 if (size == 0 && nxt_buf_is_sync(b)) { 249 goto done; 250 } 251 252 n = c->io->send(c, bm->pos, nxt_min(size, limit)); 253 254 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); 255 256 if (n > 0) { 257 bm->pos += n; 258 259 if (bm->pos == bm->free) { 260 bm->pos = bm->start; 261 bm->free = bm->start; 262 } 263 264 n = 0; 265 } 266 267 return (copied != 0) ? (ssize_t) copied : n; 268 } 269 270 /* No internal buffering. */ 271 272 if (size == 0 && nxt_buf_is_sync(b)) { 273 goto done; 274 } 275 276no_buffer: 277 278 return c->io->send(c, b->mem.pos, nxt_min(size, limit)); 279 280done: 281 282 nxt_log_debug(c->socket.log, "sendbuf done"); 283 284 return 0; 285} 286 287 288static nxt_bool_t 289nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) 290{ 291 size_t size, bsize; 292 nxt_bool_t flush; 293 294 flush = 0; 295 296 do { 297 nxt_prefetch(b->next); 298 299 if (nxt_buf_is_mem(b)) { 300 bsize = bm->end - bm->free; 301 size = b->mem.free - b->mem.pos; 302 size = nxt_min(size, bsize); 303 304 nxt_memcpy(bm->free, b->mem.pos, size); 305 306 *copied += size; 307 bm->free += size; 308 309 if (bm->free == bm->end) { 310 return 1; 311 } 312 } 313 314 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); 315 316 b = b->next; 317 318 } while (b != NULL); 319 320 return flush; 321} 322 323 324nxt_buf_t * 325nxt_sendbuf_update(nxt_buf_t *b, size_t sent) 326{ 327 size_t size; 328 329 while (b != NULL) { 330 331 nxt_prefetch(b->next); 332 333 if (!nxt_buf_is_sync(b)) { 334 335 size = nxt_buf_used_size(b); 336 337 if (size != 0) { 338 339 if (sent == 0) { 340 break; 341 } 342 343 if (sent < size) { 344 345 if (nxt_buf_is_mem(b)) { 346 b->mem.pos += sent; 347 } 348 349 if (nxt_buf_is_file(b)) { 350 b->file_pos += sent; 351 } 352 353 break; 354 } 355 356 /* b->mem.free is NULL in file-only buffer. */ 357 b->mem.pos = b->mem.free; 358 359 if (nxt_buf_is_file(b)) { 360 b->file_pos = b->file_end; 361 } 362 363 sent -= size; 364 } 365 } 366 367 b = b->next; 368 } 369 370 return b; 371} 372 373 374nxt_buf_t * 375nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 376 size_t sent) 377{ 378 size_t size; 379 380 while (b != NULL) { 381 382 nxt_prefetch(b->next); 383 384 if (!nxt_buf_is_sync(b)) { 385 386 size = nxt_buf_used_size(b); 387 388 if (size != 0) { 389 390 if (sent == 0) { 391 break; 392 } 393 394 if (nxt_buf_is_port_mmap(b)) { 395 /* 396 * buffer has been sent to other side which is now 397 * responsible for shared memory bucket release 398 */ 399 b->is_port_mmap_sent = 1; 400 } 401 402 if (sent < size) { 403 404 if (nxt_buf_is_mem(b)) { 405 b->mem.pos += sent; 406 } 407 408 if (nxt_buf_is_file(b)) { 409 b->file_pos += sent; 410 } 411 412 break; 413 } 414 415 /* b->mem.free is NULL in file-only buffer. */ 416 b->mem.pos = b->mem.free; 417 418 if (nxt_buf_is_file(b)) { 419 b->file_pos = b->file_end; 420 } 421 422 sent -= size; 423 } 424 } 425 426 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 427 428 b = b->next; 429 } 430 431 return b; 432}
|