xref: /unit/src/nxt_sendbuf.c (revision 42:def41906e4a5)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11     size_t *copied);
12 
13 
14 nxt_uint_t
15 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
16     struct iovec *iov, nxt_uint_t niov_max)
17 {
18     u_char      *last;
19     size_t      size, total;
20     nxt_buf_t   *b;
21     nxt_uint_t  n;
22 
23     total = sb->size;
24     last = NULL;
25     n = (nxt_uint_t) -1;
26 
27     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
28 
29         nxt_prefetch(b->next);
30 
31         if (nxt_buf_is_file(b)) {
32             break;
33         }
34 
35         if (nxt_buf_is_mem(b)) {
36 
37             size = b->mem.free - b->mem.pos;
38 
39             if (size != 0) {
40 
41                 if (total + size > sb->limit) {
42                     size = sb->limit - total;
43 
44                     if (size == 0) {
45                         break;
46                     }
47                 }
48 
49                 if (b->mem.pos != last) {
50 
51                     if (++n >= niov_max) {
52                         goto done;
53                     }
54 
55                     iov[n].iov_base =  b->mem.pos;
56                     iov[n].iov_len = size;
57 
58                 } else {
59                     iov[n].iov_len += size;
60                 }
61 
62                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
63                           n, iov[n].iov_base, iov[n].iov_len);
64 
65                 total += size;
66                 last = b->mem.pos + size;
67             }
68 
69         } else {
70             sb->sync = 1;
71             sb->last |= nxt_buf_is_last(b);
72         }
73     }
74 
75     n++;
76 
77 done:
78 
79     sb->buf = b;
80 
81     return n;
82 }
83 
84 
85 nxt_uint_t
86 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
87 {
88     u_char      *last;
89     size_t      size, total;
90     nxt_buf_t   *b;
91     nxt_uint_t  n;
92 
93     total = sb->size;
94     last = NULL;
95     n = (nxt_uint_t) -1;
96 
97     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
98 
99         nxt_prefetch(b->next);
100 
101         if (nxt_buf_is_file(b)) {
102             break;
103         }
104 
105         if (nxt_buf_is_mem(b)) {
106 
107             size = b->mem.free - b->mem.pos;
108 
109             if (size != 0) {
110 
111                 if (total + size > sb->limit) {
112                     size = sb->limit - total;
113 
114                     if (size == 0) {
115                         break;
116                     }
117                 }
118 
119                 if (b->mem.pos != last) {
120 
121                     if (++n >= sb->nmax) {
122                         goto done;
123                     }
124 
125                     sb->iobuf[n].iov_base =  b->mem.pos;
126                     sb->iobuf[n].iov_len = size;
127 
128                 } else {
129                     sb->iobuf[n].iov_len += size;
130                 }
131 
132                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
133                           n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
134 
135                 total += size;
136                 last = b->mem.pos + size;
137             }
138 
139         } else {
140             sb->sync = 1;
141             sb->last |= nxt_buf_is_last(b);
142         }
143     }
144 
145     n++;
146 
147 done:
148 
149     sb->buf = b;
150     sb->size = total;
151     sb->niov = n;
152 
153     return n;
154 }
155 
156 
157 size_t
158 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
159 {
160     size_t     file_start, total;
161     nxt_fd_t   fd;
162     nxt_off_t  size, last;
163     nxt_buf_t  *b;
164 
165     b = sb->buf;
166     fd = b->file->fd;
167 
168     total = sb->size;
169 
170     for ( ;; ) {
171 
172         nxt_prefetch(b->next);
173 
174         size = b->file_end - b->file_pos;
175 
176         if (total + size >= sb->limit) {
177             total = sb->limit;
178             break;
179         }
180 
181         total += size;
182         last = b->file_pos + size;
183 
184         b = b->next;
185 
186         if (b == NULL || !nxt_buf_is_file(b)) {
187             break;
188         }
189 
190         if (b->file_pos != last || b->file->fd != fd) {
191             break;
192         }
193     }
194 
195     sb->buf = b;
196 
197     file_start = sb->size;
198     sb->size = total;
199 
200     return total - file_start;
201 }
202 
203 
204 ssize_t
205 nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
206     nxt_buf_t *b, size_t limit)
207 {
208     size_t      size, bsize, copied;
209     ssize_t     n;
210     nxt_bool_t  flush;
211 
212     size = nxt_buf_mem_used_size(&b->mem);
213     bsize = nxt_buf_mem_size(bm);
214 
215     if (bsize != 0) {
216 
217         if (size > bsize && bm->pos == bm->free) {
218             /*
219              * A data buffer size is larger than the internal
220              * buffer size and the internal buffer is empty.
221              */
222             goto no_buffer;
223         }
224 
225         if (bm->pos == NULL) {
226             bm->pos = nxt_malloc(bsize);
227             if (nxt_slow_path(bm->pos == NULL)) {
228                 return NXT_ERROR;
229             }
230 
231             bm->start = bm->pos;
232             bm->free = bm->pos;
233             bm->end += (uintptr_t) bm->pos;
234         }
235 
236         copied = 0;
237 
238         flush = nxt_sendbuf_copy(bm, b, &copied);
239 
240         nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
241 
242         if (flush == 0) {
243             return copied;
244         }
245 
246         size = nxt_buf_mem_used_size(bm);
247 
248         if (size == 0 && nxt_buf_is_sync(b)) {
249             goto done;
250         }
251 
252         n = c->io->send(c, bm->pos, nxt_min(size, limit));
253 
254         nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
255 
256         if (n > 0) {
257             bm->pos += n;
258 
259             if (bm->pos == bm->free) {
260                 bm->pos = bm->start;
261                 bm->free = bm->start;
262             }
263 
264             n = 0;
265         }
266 
267         return (copied != 0) ? (ssize_t) copied : n;
268     }
269 
270     /* No internal buffering. */
271 
272     if (size == 0 && nxt_buf_is_sync(b)) {
273         goto done;
274     }
275 
276 no_buffer:
277 
278     return c->io->send(c, b->mem.pos, nxt_min(size, limit));
279 
280 done:
281 
282     nxt_log_debug(c->socket.log, "sendbuf done");
283 
284     return 0;
285 }
286 
287 
288 static nxt_bool_t
289 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
290 {
291     size_t      size, bsize;
292     nxt_bool_t  flush;
293 
294     flush = 0;
295 
296     do {
297         nxt_prefetch(b->next);
298 
299         if (nxt_buf_is_mem(b)) {
300             bsize = bm->end - bm->free;
301             size = b->mem.free - b->mem.pos;
302             size = nxt_min(size, bsize);
303 
304             nxt_memcpy(bm->free, b->mem.pos, size);
305 
306             *copied += size;
307             bm->free += size;
308 
309             if (bm->free == bm->end) {
310                 return 1;
311             }
312         }
313 
314         flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
315 
316         b = b->next;
317 
318     } while (b != NULL);
319 
320     return flush;
321 }
322 
323 
324 nxt_buf_t *
325 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
326 {
327     size_t  size;
328 
329     while (b != NULL) {
330 
331         nxt_prefetch(b->next);
332 
333         if (!nxt_buf_is_sync(b)) {
334 
335             size = nxt_buf_used_size(b);
336 
337             if (size != 0) {
338 
339                 if (sent == 0) {
340                     break;
341                 }
342 
343                 if (sent < size) {
344 
345                     if (nxt_buf_is_mem(b)) {
346                         b->mem.pos += sent;
347                     }
348 
349                     if (nxt_buf_is_file(b)) {
350                         b->file_pos += sent;
351                     }
352 
353                     break;
354                 }
355 
356                 /* b->mem.free is NULL in file-only buffer. */
357                 b->mem.pos = b->mem.free;
358 
359                 if (nxt_buf_is_file(b)) {
360                     b->file_pos = b->file_end;
361                 }
362 
363                 sent -= size;
364             }
365         }
366 
367         b = b->next;
368     }
369 
370     return b;
371 }
372 
373 
374 nxt_buf_t *
375 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
376     size_t sent)
377 {
378     size_t  size;
379 
380     while (b != NULL) {
381 
382         nxt_prefetch(b->next);
383 
384         if (!nxt_buf_is_sync(b)) {
385 
386             size = nxt_buf_used_size(b);
387 
388             if (size != 0) {
389 
390                 if (sent == 0) {
391                     break;
392                 }
393 
394                 if (nxt_buf_is_port_mmap(b)) {
395                     /*
396                      * buffer has been sent to other side which is now
397                      * responsible for shared memory bucket release
398                      */
399                     b->is_port_mmap_sent = 1;
400                 }
401 
402                 if (sent < size) {
403 
404                     if (nxt_buf_is_mem(b)) {
405                         b->mem.pos += sent;
406                     }
407 
408                     if (nxt_buf_is_file(b)) {
409                         b->file_pos += sent;
410                     }
411 
412                     break;
413                 }
414 
415                 /* b->mem.free is NULL in file-only buffer. */
416                 b->mem.pos = b->mem.free;
417 
418                 if (nxt_buf_is_file(b)) {
419                     b->file_pos = b->file_end;
420                 }
421 
422                 sent -= size;
423             }
424         }
425 
426         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
427 
428         b = b->next;
429     }
430 
431     return b;
432 }
433