xref: /unit/src/nxt_sendbuf.c (revision 1:fdc027c56872)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11     size_t *copied);
12 
13 
14 nxt_uint_t
15 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
16 {
17     u_char      *last;
18     size_t      size, total;
19     nxt_buf_t   *b;
20     nxt_uint_t  n;
21 
22     total = sb->size;
23     last = NULL;
24     n = (nxt_uint_t) -1;
25 
26     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
27 
28         nxt_prefetch(b->next);
29 
30         if (nxt_buf_is_file(b)) {
31             break;
32         }
33 
34         if (nxt_buf_is_mem(b)) {
35 
36             size = b->mem.free - b->mem.pos;
37 
38             if (size != 0) {
39 
40                 if (total + size > sb->limit) {
41                     size = sb->limit - total;
42 
43                     if (size == 0) {
44                         break;
45                     }
46                 }
47 
48                 if (b->mem.pos != last) {
49 
50                     if (++n >= sb->nmax) {
51                         goto done;
52                     }
53 
54                     nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size);
55 
56                 } else {
57                     nxt_iobuf_add(&sb->iobuf[n], size);
58                 }
59 
60                 nxt_debug(task, "sendbuf: %ui, %p, %uz", n,
61                           nxt_iobuf_data(&sb->iobuf[n]),
62                           nxt_iobuf_size(&sb->iobuf[n]));
63 
64                 total += size;
65                 last = b->mem.pos + size;
66             }
67 
68         } else {
69             sb->sync = 1;
70             sb->last |= nxt_buf_is_last(b);
71         }
72     }
73 
74     n++;
75 
76 done:
77 
78     sb->buf = b;
79     sb->size = total;
80 
81     return n;
82 }
83 
84 
85 size_t
86 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
87 {
88     size_t     file_start, total;
89     nxt_fd_t   fd;
90     nxt_off_t  size, last;
91     nxt_buf_t  *b;
92 
93     b = sb->buf;
94     fd = b->file->fd;
95 
96     total = sb->size;
97 
98     for ( ;; ) {
99 
100         nxt_prefetch(b->next);
101 
102         size = b->file_end - b->file_pos;
103 
104         if (total + size >= sb->limit) {
105             total = sb->limit;
106             break;
107         }
108 
109         total += size;
110         last = b->file_pos + size;
111 
112         b = b->next;
113 
114         if (b == NULL || !nxt_buf_is_file(b)) {
115             break;
116         }
117 
118         if (b->file_pos != last || b->file->fd != fd) {
119             break;
120         }
121     }
122 
123     sb->buf = b;
124 
125     file_start = sb->size;
126     sb->size = total;
127 
128     return total - file_start;
129 }
130 
131 
132 ssize_t
133 nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
134     nxt_buf_t *b, size_t limit)
135 {
136     size_t      size, bsize, copied;
137     ssize_t     n;
138     nxt_bool_t  flush;
139 
140     size = nxt_buf_mem_used_size(&b->mem);
141     bsize = nxt_buf_mem_size(bm);
142 
143     if (bsize != 0) {
144 
145         if (size > bsize && bm->pos == bm->free) {
146             /*
147              * A data buffer size is larger than the internal
148              * buffer size and the internal buffer is empty.
149              */
150             goto no_buffer;
151         }
152 
153         if (bm->pos == NULL) {
154             bm->pos = nxt_malloc(bsize);
155             if (nxt_slow_path(bm->pos == NULL)) {
156                 return NXT_ERROR;
157             }
158 
159             bm->start = bm->pos;
160             bm->free = bm->pos;
161             bm->end += (uintptr_t) bm->pos;
162         }
163 
164         copied = 0;
165 
166         flush = nxt_sendbuf_copy(bm, b, &copied);
167 
168         nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
169 
170         if (flush == 0) {
171             return copied;
172         }
173 
174         size = nxt_buf_mem_used_size(bm);
175 
176         if (size == 0 && nxt_buf_is_sync(b)) {
177             goto done;
178         }
179 
180         n = c->io->send(c, bm->pos, nxt_min(size, limit));
181 
182         nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
183 
184         if (n > 0) {
185             bm->pos += n;
186 
187             if (bm->pos == bm->free) {
188                 bm->pos = bm->start;
189                 bm->free = bm->start;
190             }
191 
192             n = 0;
193         }
194 
195         return (copied != 0) ? (ssize_t) copied : n;
196     }
197 
198     /* No internal buffering. */
199 
200     if (size == 0 && nxt_buf_is_sync(b)) {
201         goto done;
202     }
203 
204 no_buffer:
205 
206     return c->io->send(c, b->mem.pos, nxt_min(size, limit));
207 
208 done:
209 
210     nxt_log_debug(c->socket.log, "sendbuf done");
211 
212     return 0;
213 }
214 
215 
216 static nxt_bool_t
217 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
218 {
219     size_t      size, bsize;
220     nxt_bool_t  flush;
221 
222     flush = 0;
223 
224     do {
225         nxt_prefetch(b->next);
226 
227         if (nxt_buf_is_mem(b)) {
228             bsize = bm->end - bm->free;
229             size = b->mem.free - b->mem.pos;
230             size = nxt_min(size, bsize);
231 
232             nxt_memcpy(bm->free, b->mem.pos, size);
233 
234             *copied += size;
235             bm->free += size;
236 
237             if (bm->free == bm->end) {
238                 return 1;
239             }
240         }
241 
242         flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
243 
244         b = b->next;
245 
246     } while (b != NULL);
247 
248     return flush;
249 }
250 
251 
252 nxt_buf_t *
253 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
254 {
255     size_t  size;
256 
257     while (b != NULL) {
258 
259         nxt_prefetch(b->next);
260 
261         if (!nxt_buf_is_sync(b)) {
262 
263             size = nxt_buf_used_size(b);
264 
265             if (size != 0) {
266 
267                 if (sent == 0) {
268                     break;
269                 }
270 
271                 if (sent < size) {
272 
273                     if (nxt_buf_is_mem(b)) {
274                         b->mem.pos += sent;
275                     }
276 
277                     if (nxt_buf_is_file(b)) {
278                         b->file_pos += sent;
279                     }
280 
281                     break;
282                 }
283 
284                 /* b->mem.free is NULL in file-only buffer. */
285                 b->mem.pos = b->mem.free;
286 
287                 if (nxt_buf_is_file(b)) {
288                     b->file_pos = b->file_end;
289                 }
290 
291                 sent -= size;
292             }
293         }
294 
295         b = b->next;
296     }
297 
298     return b;
299 }
300 
301 
302 nxt_buf_t *
303 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
304     size_t sent)
305 {
306     size_t  size;
307 
308     while (b != NULL) {
309 
310         nxt_prefetch(b->next);
311 
312         if (!nxt_buf_is_sync(b)) {
313 
314             size = nxt_buf_used_size(b);
315 
316             if (size != 0) {
317 
318                 if (sent == 0) {
319                     break;
320                 }
321 
322                 if (sent < size) {
323 
324                     if (nxt_buf_is_mem(b)) {
325                         b->mem.pos += sent;
326                     }
327 
328                     if (nxt_buf_is_file(b)) {
329                         b->file_pos += sent;
330                     }
331 
332                     break;
333                 }
334 
335                 /* b->mem.free is NULL in file-only buffer. */
336                 b->mem.pos = b->mem.free;
337 
338                 if (nxt_buf_is_file(b)) {
339                     b->file_pos = b->file_end;
340                 }
341 
342                 sent -= size;
343             }
344         }
345 
346         nxt_thread_work_queue_add(task->thread, wq, b->completion_handler, task,
347                                   b, b->parent);
348 
349         b = b->next;
350     }
351 
352     return b;
353 }
354