1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, 11 size_t *copied); 12 13 14 nxt_uint_t 15 nxt_sendbuf_mem_coalesce(nxt_sendbuf_coalesce_t *sb) 16 { 17 u_char *last; 18 size_t size, total; 19 nxt_buf_t *b; 20 nxt_uint_t n; 21 22 total = sb->size; 23 last = NULL; 24 n = (nxt_uint_t) -1; 25 26 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { 27 28 nxt_prefetch(b->next); 29 30 if (nxt_buf_is_file(b)) { 31 break; 32 } 33 34 if (nxt_buf_is_mem(b)) { 35 36 size = b->mem.free - b->mem.pos; 37 38 if (size != 0) { 39 40 if (total + size > sb->limit) { 41 size = sb->limit - total; 42 43 if (size == 0) { 44 break; 45 } 46 } 47 48 if (b->mem.pos != last) { 49 50 if (++n >= sb->nmax) { 51 goto done; 52 } 53 54 nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size); 55 56 } else { 57 nxt_iobuf_add(&sb->iobuf[n], size); 58 } 59 60 nxt_thread_log_debug("sendbuf: %ui, %p, %uz", n, 61 nxt_iobuf_data(&sb->iobuf[n]), 62 nxt_iobuf_size(&sb->iobuf[n])); 63 64 total += size; 65 last = b->mem.pos + size; 66 } 67 68 } else { 69 sb->sync = 1; 70 sb->last |= nxt_buf_is_last(b); 71 } 72 } 73 74 n++; 75 76 done: 77 78 sb->buf = b; 79 sb->size = total; 80 81 return n; 82 } 83 84 85 size_t 86 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) 87 { 88 size_t file_start, total; 89 nxt_fd_t fd; 90 nxt_off_t size, last; 91 nxt_buf_t *b; 92 93 b = sb->buf; 94 fd = b->file->fd; 95 96 total = sb->size; 97 98 for ( ;; ) { 99 100 nxt_prefetch(b->next); 101 102 size = b->file_end - b->file_pos; 103 104 if (total + size >= sb->limit) { 105 total = sb->limit; 106 break; 107 } 108 109 total += size; 110 last = b->file_pos + size; 111 112 b = b->next; 113 114 if (b == NULL || !nxt_buf_is_file(b)) { 115 break; 116 } 117 118 if (b->file_pos != last || b->file->fd != fd) { 119 break; 120 } 121 } 122 123 sb->buf = b; 124 125 file_start = sb->size; 126 sb->size = total; 127 128 return total - file_start; 129 } 130 131 132 ssize_t 133 nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, 134 nxt_buf_t *b, size_t limit) 135 { 136 size_t size, bsize, copied; 137 ssize_t n; 138 nxt_bool_t flush; 139 140 size = nxt_buf_mem_used_size(&b->mem); 141 bsize = nxt_buf_mem_size(bm); 142 143 if (bsize != 0) { 144 145 if (size > bsize && bm->pos == bm->free) { 146 /* 147 * A data buffer size is larger than the internal 148 * buffer size and the internal buffer is empty. 149 */ 150 goto no_buffer; 151 } 152 153 if (bm->pos == NULL) { 154 bm->pos = nxt_malloc(bsize); 155 if (nxt_slow_path(bm->pos == NULL)) { 156 return NXT_ERROR; 157 } 158 159 bm->start = bm->pos; 160 bm->free = bm->pos; 161 bm->end += (uintptr_t) bm->pos; 162 } 163 164 copied = 0; 165 166 flush = nxt_sendbuf_copy(bm, b, &copied); 167 168 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); 169 170 if (flush == 0) { 171 return copied; 172 } 173 174 size = nxt_buf_mem_used_size(bm); 175 176 if (size == 0 && nxt_buf_is_sync(b)) { 177 goto done; 178 } 179 180 n = c->io->send(c, bm->pos, nxt_min(size, limit)); 181 182 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); 183 184 if (n > 0) { 185 bm->pos += n; 186 187 if (bm->pos == bm->free) { 188 bm->pos = bm->start; 189 bm->free = bm->start; 190 } 191 192 n = 0; 193 } 194 195 return (copied != 0) ? (ssize_t) copied : n; 196 } 197 198 /* No internal buffering. */ 199 200 if (size == 0 && nxt_buf_is_sync(b)) { 201 goto done; 202 } 203 204 no_buffer: 205 206 return c->io->send(c, b->mem.pos, nxt_min(size, limit)); 207 208 done: 209 210 nxt_log_debug(c->socket.log, "sendbuf done"); 211 212 return 0; 213 } 214 215 216 static nxt_bool_t 217 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) 218 { 219 size_t size, bsize; 220 nxt_bool_t flush; 221 222 flush = 0; 223 224 do { 225 nxt_prefetch(b->next); 226 227 if (nxt_buf_is_mem(b)) { 228 bsize = bm->end - bm->free; 229 size = b->mem.free - b->mem.pos; 230 size = nxt_min(size, bsize); 231 232 nxt_memcpy(bm->free, b->mem.pos, size); 233 234 *copied += size; 235 bm->free += size; 236 237 if (bm->free == bm->end) { 238 return 1; 239 } 240 } 241 242 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); 243 244 b = b->next; 245 246 } while (b != NULL); 247 248 return flush; 249 } 250 251 252 nxt_buf_t * 253 nxt_sendbuf_update(nxt_buf_t *b, size_t sent) 254 { 255 size_t size; 256 257 while (b != NULL) { 258 259 nxt_prefetch(b->next); 260 261 if (!nxt_buf_is_sync(b)) { 262 263 size = nxt_buf_used_size(b); 264 265 if (size != 0) { 266 267 if (sent == 0) { 268 break; 269 } 270 271 if (sent < size) { 272 273 if (nxt_buf_is_mem(b)) { 274 b->mem.pos += sent; 275 } 276 277 if (nxt_buf_is_file(b)) { 278 b->file_pos += sent; 279 } 280 281 break; 282 } 283 284 /* b->mem.free is NULL in file-only buffer. */ 285 b->mem.pos = b->mem.free; 286 287 if (nxt_buf_is_file(b)) { 288 b->file_pos = b->file_end; 289 } 290 291 sent -= size; 292 } 293 } 294 295 b = b->next; 296 } 297 298 return b; 299 } 300 301 302 nxt_buf_t * 303 nxt_sendbuf_completion(nxt_thread_t *thr, nxt_work_queue_t *wq, nxt_buf_t *b, 304 size_t sent) 305 { 306 size_t size; 307 308 while (b != NULL) { 309 310 nxt_prefetch(b->next); 311 312 if (!nxt_buf_is_sync(b)) { 313 314 size = nxt_buf_used_size(b); 315 316 if (size != 0) { 317 318 if (sent == 0) { 319 break; 320 } 321 322 if (sent < size) { 323 324 if (nxt_buf_is_mem(b)) { 325 b->mem.pos += sent; 326 } 327 328 if (nxt_buf_is_file(b)) { 329 b->file_pos += sent; 330 } 331 332 break; 333 } 334 335 /* b->mem.free is NULL in file-only buffer. */ 336 b->mem.pos = b->mem.free; 337 338 if (nxt_buf_is_file(b)) { 339 b->file_pos = b->file_end; 340 } 341 342 sent -= size; 343 } 344 } 345 346 nxt_thread_work_queue_add(thr, wq, b->completion_handler, 347 b, b->parent, thr->log); 348 349 b = b->next; 350 } 351 352 return b; 353 } 354