xref: /unit/src/nxt_sendbuf.c (revision 13:3a52b2c3d3f1)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11     size_t *copied);
12 
13 
14 nxt_uint_t
15 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
16     struct iovec *iov, nxt_uint_t niov_max)
17 {
18     u_char      *last;
19     size_t      size, total;
20     nxt_buf_t   *b;
21     nxt_uint_t  n;
22 
23     total = sb->size;
24     last = NULL;
25     n = (nxt_uint_t) -1;
26 
27     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
28 
29         nxt_prefetch(b->next);
30 
31         if (nxt_buf_is_file(b)) {
32             break;
33         }
34 
35         if (nxt_buf_is_mem(b)) {
36 
37             size = b->mem.free - b->mem.pos;
38 
39             if (size != 0) {
40 
41                 if (total + size > sb->limit) {
42                     size = sb->limit - total;
43 
44                     if (size == 0) {
45                         break;
46                     }
47                 }
48 
49                 if (b->mem.pos != last) {
50 
51                     if (++n >= niov_max) {
52                         goto done;
53                     }
54 
55                     iov[n].iov_base =  b->mem.pos;
56                     iov[n].iov_len = size;
57 
58                 } else {
59                     iov[n].iov_len += size;
60                 }
61 
62                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
63                           n, iov[n].iov_base, iov[n].iov_len);
64 
65                 total += size;
66                 last = b->mem.pos + size;
67             }
68 
69         } else {
70             sb->sync = 1;
71             sb->last |= nxt_buf_is_last(b);
72         }
73     }
74 
75     n++;
76 
77 done:
78 
79     sb->buf = b;
80 
81     return n;
82 }
83 
84 
85 nxt_uint_t
86 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
87 {
88     u_char      *last;
89     size_t      size, total;
90     nxt_buf_t   *b;
91     nxt_uint_t  n;
92 
93     total = sb->size;
94     last = NULL;
95     n = (nxt_uint_t) -1;
96 
97     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
98 
99         nxt_prefetch(b->next);
100 
101         if (nxt_buf_is_file(b)) {
102             break;
103         }
104 
105         if (nxt_buf_is_mem(b)) {
106 
107             size = b->mem.free - b->mem.pos;
108 
109             if (size != 0) {
110 
111                 if (total + size > sb->limit) {
112                     size = sb->limit - total;
113 
114                     if (size == 0) {
115                         break;
116                     }
117                 }
118 
119                 if (b->mem.pos != last) {
120 
121                     if (++n >= sb->nmax) {
122                         goto done;
123                     }
124 
125                     nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size);
126 
127                 } else {
128                     nxt_iobuf_add(&sb->iobuf[n], size);
129                 }
130 
131                 nxt_debug(task, "sendbuf: %ui, %p, %uz", n,
132                           nxt_iobuf_data(&sb->iobuf[n]),
133                           nxt_iobuf_size(&sb->iobuf[n]));
134 
135                 total += size;
136                 last = b->mem.pos + size;
137             }
138 
139         } else {
140             sb->sync = 1;
141             sb->last |= nxt_buf_is_last(b);
142         }
143     }
144 
145     n++;
146 
147 done:
148 
149     sb->buf = b;
150     sb->size = total;
151 
152     return n;
153 }
154 
155 
156 size_t
157 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
158 {
159     size_t     file_start, total;
160     nxt_fd_t   fd;
161     nxt_off_t  size, last;
162     nxt_buf_t  *b;
163 
164     b = sb->buf;
165     fd = b->file->fd;
166 
167     total = sb->size;
168 
169     for ( ;; ) {
170 
171         nxt_prefetch(b->next);
172 
173         size = b->file_end - b->file_pos;
174 
175         if (total + size >= sb->limit) {
176             total = sb->limit;
177             break;
178         }
179 
180         total += size;
181         last = b->file_pos + size;
182 
183         b = b->next;
184 
185         if (b == NULL || !nxt_buf_is_file(b)) {
186             break;
187         }
188 
189         if (b->file_pos != last || b->file->fd != fd) {
190             break;
191         }
192     }
193 
194     sb->buf = b;
195 
196     file_start = sb->size;
197     sb->size = total;
198 
199     return total - file_start;
200 }
201 
202 
203 ssize_t
204 nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
205     nxt_buf_t *b, size_t limit)
206 {
207     size_t      size, bsize, copied;
208     ssize_t     n;
209     nxt_bool_t  flush;
210 
211     size = nxt_buf_mem_used_size(&b->mem);
212     bsize = nxt_buf_mem_size(bm);
213 
214     if (bsize != 0) {
215 
216         if (size > bsize && bm->pos == bm->free) {
217             /*
218              * A data buffer size is larger than the internal
219              * buffer size and the internal buffer is empty.
220              */
221             goto no_buffer;
222         }
223 
224         if (bm->pos == NULL) {
225             bm->pos = nxt_malloc(bsize);
226             if (nxt_slow_path(bm->pos == NULL)) {
227                 return NXT_ERROR;
228             }
229 
230             bm->start = bm->pos;
231             bm->free = bm->pos;
232             bm->end += (uintptr_t) bm->pos;
233         }
234 
235         copied = 0;
236 
237         flush = nxt_sendbuf_copy(bm, b, &copied);
238 
239         nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
240 
241         if (flush == 0) {
242             return copied;
243         }
244 
245         size = nxt_buf_mem_used_size(bm);
246 
247         if (size == 0 && nxt_buf_is_sync(b)) {
248             goto done;
249         }
250 
251         n = c->io->send(c, bm->pos, nxt_min(size, limit));
252 
253         nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
254 
255         if (n > 0) {
256             bm->pos += n;
257 
258             if (bm->pos == bm->free) {
259                 bm->pos = bm->start;
260                 bm->free = bm->start;
261             }
262 
263             n = 0;
264         }
265 
266         return (copied != 0) ? (ssize_t) copied : n;
267     }
268 
269     /* No internal buffering. */
270 
271     if (size == 0 && nxt_buf_is_sync(b)) {
272         goto done;
273     }
274 
275 no_buffer:
276 
277     return c->io->send(c, b->mem.pos, nxt_min(size, limit));
278 
279 done:
280 
281     nxt_log_debug(c->socket.log, "sendbuf done");
282 
283     return 0;
284 }
285 
286 
287 static nxt_bool_t
288 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
289 {
290     size_t      size, bsize;
291     nxt_bool_t  flush;
292 
293     flush = 0;
294 
295     do {
296         nxt_prefetch(b->next);
297 
298         if (nxt_buf_is_mem(b)) {
299             bsize = bm->end - bm->free;
300             size = b->mem.free - b->mem.pos;
301             size = nxt_min(size, bsize);
302 
303             nxt_memcpy(bm->free, b->mem.pos, size);
304 
305             *copied += size;
306             bm->free += size;
307 
308             if (bm->free == bm->end) {
309                 return 1;
310             }
311         }
312 
313         flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
314 
315         b = b->next;
316 
317     } while (b != NULL);
318 
319     return flush;
320 }
321 
322 
323 nxt_buf_t *
324 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
325 {
326     size_t  size;
327 
328     while (b != NULL) {
329 
330         nxt_prefetch(b->next);
331 
332         if (!nxt_buf_is_sync(b)) {
333 
334             size = nxt_buf_used_size(b);
335 
336             if (size != 0) {
337 
338                 if (sent == 0) {
339                     break;
340                 }
341 
342                 if (sent < size) {
343 
344                     if (nxt_buf_is_mem(b)) {
345                         b->mem.pos += sent;
346                     }
347 
348                     if (nxt_buf_is_file(b)) {
349                         b->file_pos += sent;
350                     }
351 
352                     break;
353                 }
354 
355                 /* b->mem.free is NULL in file-only buffer. */
356                 b->mem.pos = b->mem.free;
357 
358                 if (nxt_buf_is_file(b)) {
359                     b->file_pos = b->file_end;
360                 }
361 
362                 sent -= size;
363             }
364         }
365 
366         b = b->next;
367     }
368 
369     return b;
370 }
371 
372 
373 nxt_buf_t *
374 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
375     size_t sent)
376 {
377     size_t  size;
378 
379     while (b != NULL) {
380 
381         nxt_prefetch(b->next);
382 
383         if (!nxt_buf_is_sync(b)) {
384 
385             size = nxt_buf_used_size(b);
386 
387             if (size != 0) {
388 
389                 if (sent == 0) {
390                     break;
391                 }
392 
393                 if (sent < size) {
394 
395                     if (nxt_buf_is_mem(b)) {
396                         b->mem.pos += sent;
397                     }
398 
399                     if (nxt_buf_is_file(b)) {
400                         b->file_pos += sent;
401                     }
402 
403                     break;
404                 }
405 
406                 /* b->mem.free is NULL in file-only buffer. */
407                 b->mem.pos = b->mem.free;
408 
409                 if (nxt_buf_is_file(b)) {
410                     b->file_pos = b->file_end;
411                 }
412 
413                 sent -= size;
414             }
415         }
416 
417         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
418 
419         b = b->next;
420     }
421 
422     return b;
423 }
424