1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, 11 size_t *copied); 12 13 14 nxt_uint_t 15 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, 16 struct iovec *iov, nxt_uint_t niov_max) 17 { 18 u_char *last; 19 size_t size, total; 20 nxt_buf_t *b; 21 nxt_uint_t n; 22 23 total = sb->size; 24 last = NULL; 25 n = (nxt_uint_t) -1; 26 27 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 28 29 nxt_prefetch(b->next); 30 31 if (nxt_buf_is_file(b)) { 32 break; 33 } 34 35 if (nxt_buf_is_mem(b)) { 36 37 size = b->mem.free - b->mem.pos; 38 39 if (size != 0) { 40 41 if (total + size > sb->limit) { 42 size = sb->limit - total; 43 44 if (size == 0) { 45 break; 46 } 47 } 48 49 if (b->mem.pos != last) { 50 51 if (++n >= niov_max) { 52 goto done; 53 } 54 55 iov[n].iov_base = b->mem.pos; 56 iov[n].iov_len = size; 57 58 } else { 59 iov[n].iov_len += size; 60 } 61 62 nxt_debug(task, "sendbuf: %ui, %p, %uz", 63 n, iov[n].iov_base, iov[n].iov_len); 64 65 total += size; 66 last = b->mem.pos + size; 67 } 68 69 } else { 70 sb->sync = 1; 71 sb->last |= nxt_buf_is_last(b); 72 } 73 } 74 75 n++; 76 77 done: 78 79 sb->buf = b; 80 81 return n; 82 } 83 84 85 nxt_uint_t 86 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) 87 { 88 u_char *last; 89 size_t size, total; 90 nxt_buf_t *b; 91 nxt_uint_t n; 92 93 total = sb->size; 94 last = NULL; 95 n = (nxt_uint_t) -1; 96 97 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 98 99 nxt_prefetch(b->next); 100 101 if (nxt_buf_is_file(b)) { 102 break; 103 } 104 105 if (nxt_buf_is_mem(b)) { 106 107 size = b->mem.free - b->mem.pos; 108 109 if (size != 0) { 110 111 if (total + size > sb->limit) { 112 size = sb->limit - total; 113 114 sb->limit_reached = 1; 115 116 if (nxt_slow_path(size == 0)) { 117 break; 118 } 119 } 120 121 if (b->mem.pos != last) { 122 123 if (++n >= sb->nmax) { 124 sb->nmax_reached = 1; 125 126 goto done; 127 } 128 129 sb->iobuf[n].iov_base = b->mem.pos; 130 sb->iobuf[n].iov_len = size; 131 132 } else { 133 sb->iobuf[n].iov_len += size; 134 } 135 136 nxt_debug(task, "sendbuf: %ui, %p, %uz", 137 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len); 138 139 total += size; 140 last = b->mem.pos + size; 141 } 142 143 } else { 144 sb->sync = 1; 145 sb->last |= nxt_buf_is_last(b); 146 } 147 } 148 149 n++; 150 151 done: 152 153 sb->buf = b; 154 sb->size = total; 155 sb->niov = n; 156 157 return n; 158 } 159 160 161 size_t 162 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) 163 { 164 size_t file_start, total; 165 nxt_fd_t fd; 166 nxt_off_t size, last; 167 nxt_buf_t *b; 168 169 b = sb->buf; 170 fd = b->file->fd; 171 172 total = sb->size; 173 174 for ( ;; ) { 175 176 nxt_prefetch(b->next); 177 178 size = b->file_end - b->file_pos; 179 180 if (total + size >= sb->limit) { 181 total = sb->limit; 182 break; 183 } 184 185 total += size; 186 last = b->file_pos + size; 187 188 b = b->next; 189 190 if (b == NULL || !nxt_buf_is_file(b)) { 191 break; 192 } 193 194 if (b->file_pos != last || b->file->fd != fd) { 195 break; 196 } 197 } 198 199 sb->buf = b; 200 201 file_start = sb->size; 202 sb->size = total; 203 204 return total - file_start; 205 } 206 207 208 ssize_t 209 nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, 210 size_t limit) 211 { 212 size_t size, bsize, copied; 213 ssize_t n; 214 nxt_bool_t flush; 215 216 size = nxt_buf_mem_used_size(&b->mem); 217 bsize = nxt_buf_mem_size(bm); 218 219 if (bsize != 0) { 220 221 if (size > bsize && bm->pos == bm->free) { 222 /* 223 * A data buffer size is larger than the internal 224 * buffer size and the internal buffer is empty. 225 */ 226 goto no_buffer; 227 } 228 229 if (bm->pos == NULL) { 230 bm->pos = nxt_malloc(bsize); 231 if (nxt_slow_path(bm->pos == NULL)) { 232 return NXT_ERROR; 233 } 234 235 bm->start = bm->pos; 236 bm->free = bm->pos; 237 bm->end += (uintptr_t) bm->pos; 238 } 239 240 copied = 0; 241 242 flush = nxt_sendbuf_copy(bm, b, &copied); 243 244 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); 245 246 if (flush == 0) { 247 return copied; 248 } 249 250 size = nxt_buf_mem_used_size(bm); 251 252 if (size == 0 && nxt_buf_is_sync(b)) { 253 goto done; 254 } 255 256 n = c->io->send(c, bm->pos, nxt_min(size, limit)); 257 258 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); 259 260 if (n > 0) { 261 bm->pos += n; 262 263 if (bm->pos == bm->free) { 264 bm->pos = bm->start; 265 bm->free = bm->start; 266 } 267 268 n = 0; 269 } 270 271 return (copied != 0) ? (ssize_t) copied : n; 272 } 273 274 /* No internal buffering. */ 275 276 if (size == 0 && nxt_buf_is_sync(b)) { 277 goto done; 278 } 279 280 no_buffer: 281 282 return c->io->send(c, b->mem.pos, nxt_min(size, limit)); 283 284 done: 285 286 nxt_log_debug(c->socket.log, "sendbuf done"); 287 288 return 0; 289 } 290 291 292 static nxt_bool_t 293 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) 294 { 295 size_t size, bsize; 296 nxt_bool_t flush; 297 298 flush = 0; 299 300 do { 301 nxt_prefetch(b->next); 302 303 if (nxt_buf_is_mem(b)) { 304 bsize = bm->end - bm->free; 305 size = b->mem.free - b->mem.pos; 306 size = nxt_min(size, bsize); 307 308 nxt_memcpy(bm->free, b->mem.pos, size); 309 310 *copied += size; 311 bm->free += size; 312 313 if (bm->free == bm->end) { 314 return 1; 315 } 316 } 317 318 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); 319 320 b = b->next; 321 322 } while (b != NULL); 323 324 return flush; 325 } 326 327 328 nxt_buf_t * 329 nxt_sendbuf_update(nxt_buf_t *b, size_t sent) 330 { 331 size_t size; 332 333 while (b != NULL) { 334 335 nxt_prefetch(b->next); 336 337 if (!nxt_buf_is_sync(b)) { 338 339 size = nxt_buf_used_size(b); 340 341 if (size != 0) { 342 343 if (sent == 0) { 344 break; 345 } 346 347 if (sent < size) { 348 349 if (nxt_buf_is_mem(b)) { 350 b->mem.pos += sent; 351 } 352 353 if (nxt_buf_is_file(b)) { 354 b->file_pos += sent; 355 } 356 357 break; 358 } 359 360 /* b->mem.free is NULL in file-only buffer. */ 361 b->mem.pos = b->mem.free; 362 363 if (nxt_buf_is_file(b)) { 364 b->file_pos = b->file_end; 365 } 366 367 sent -= size; 368 } 369 } 370 371 b = b->next; 372 } 373 374 return b; 375 } 376 377 378 nxt_buf_t * 379 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 380 size_t sent, nxt_bool_t mmap_mode) 381 { 382 size_t size; 383 384 while (b != NULL) { 385 386 nxt_prefetch(b->next); 387 388 if (!nxt_buf_is_sync(b)) { 389 390 size = nxt_buf_used_size(b); 391 392 if (size != 0) { 393 394 if (sent == 0) { 395 break; 396 } 397 398 if (nxt_buf_is_port_mmap(b) && mmap_mode) { 399 /* 400 * buffer has been sent to other side which is now 401 * responsible for shared memory bucket release 402 */ 403 b->is_port_mmap_sent = 1; 404 } 405 406 if (sent < size) { 407 408 if (nxt_buf_is_mem(b)) { 409 b->mem.pos += sent; 410 } 411 412 if (nxt_buf_is_file(b)) { 413 b->file_pos += sent; 414 } 415 416 break; 417 } 418 419 /* b->mem.free is NULL in file-only buffer. */ 420 b->mem.pos = b->mem.free; 421 422 if (nxt_buf_is_file(b)) { 423 b->file_pos = b->file_end; 424 } 425 426 sent -= size; 427 } 428 } 429 430 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 431 432 b = b->next; 433 } 434 435 return b; 436 } 437