xref: /unit/src/nxt_sendbuf.c (revision 352:47649fbbcb53)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11     size_t *copied);
12 
13 
14 nxt_uint_t
15 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
16     struct iovec *iov, nxt_uint_t niov_max)
17 {
18     u_char      *last;
19     size_t      size, total;
20     nxt_buf_t   *b;
21     nxt_uint_t  n;
22 
23     total = sb->size;
24     last = NULL;
25     n = (nxt_uint_t) -1;
26 
27     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
28 
29         nxt_prefetch(b->next);
30 
31         if (nxt_buf_is_file(b)) {
32             break;
33         }
34 
35         if (nxt_buf_is_mem(b)) {
36 
37             size = b->mem.free - b->mem.pos;
38 
39             if (size != 0) {
40 
41                 if (total + size > sb->limit) {
42                     size = sb->limit - total;
43 
44                     if (size == 0) {
45                         break;
46                     }
47                 }
48 
49                 if (b->mem.pos != last) {
50 
51                     if (++n >= niov_max) {
52                         goto done;
53                     }
54 
55                     iov[n].iov_base =  b->mem.pos;
56                     iov[n].iov_len = size;
57 
58                 } else {
59                     iov[n].iov_len += size;
60                 }
61 
62                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
63                           n, iov[n].iov_base, iov[n].iov_len);
64 
65                 total += size;
66                 last = b->mem.pos + size;
67             }
68 
69         } else {
70             sb->sync = 1;
71             sb->last |= nxt_buf_is_last(b);
72         }
73     }
74 
75     n++;
76 
77 done:
78 
79     sb->buf = b;
80 
81     return n;
82 }
83 
84 
85 nxt_uint_t
86 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
87 {
88     u_char      *last;
89     size_t      size, total;
90     nxt_buf_t   *b;
91     nxt_uint_t  n;
92 
93     total = sb->size;
94     last = NULL;
95     n = (nxt_uint_t) -1;
96 
97     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
98 
99         nxt_prefetch(b->next);
100 
101         if (nxt_buf_is_file(b)) {
102             break;
103         }
104 
105         if (nxt_buf_is_mem(b)) {
106 
107             size = b->mem.free - b->mem.pos;
108 
109             if (size != 0) {
110 
111                 if (total + size > sb->limit) {
112                     size = sb->limit - total;
113 
114                     sb->limit_reached = 1;
115 
116                     if (nxt_slow_path(size == 0)) {
117                         break;
118                     }
119                 }
120 
121                 if (b->mem.pos != last) {
122 
123                     if (++n >= sb->nmax) {
124                         sb->nmax_reached = 1;
125 
126                         goto done;
127                     }
128 
129                     sb->iobuf[n].iov_base =  b->mem.pos;
130                     sb->iobuf[n].iov_len = size;
131 
132                 } else {
133                     sb->iobuf[n].iov_len += size;
134                 }
135 
136                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
137                           n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
138 
139                 total += size;
140                 last = b->mem.pos + size;
141             }
142 
143         } else {
144             sb->sync = 1;
145             sb->last |= nxt_buf_is_last(b);
146         }
147     }
148 
149     n++;
150 
151 done:
152 
153     sb->buf = b;
154     sb->size = total;
155     sb->niov = n;
156 
157     return n;
158 }
159 
160 
161 size_t
162 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
163 {
164     size_t     file_start, total;
165     nxt_fd_t   fd;
166     nxt_off_t  size, last;
167     nxt_buf_t  *b;
168 
169     b = sb->buf;
170     fd = b->file->fd;
171 
172     total = sb->size;
173 
174     for ( ;; ) {
175 
176         nxt_prefetch(b->next);
177 
178         size = b->file_end - b->file_pos;
179 
180         if (total + size >= sb->limit) {
181             total = sb->limit;
182             break;
183         }
184 
185         total += size;
186         last = b->file_pos + size;
187 
188         b = b->next;
189 
190         if (b == NULL || !nxt_buf_is_file(b)) {
191             break;
192         }
193 
194         if (b->file_pos != last || b->file->fd != fd) {
195             break;
196         }
197     }
198 
199     sb->buf = b;
200 
201     file_start = sb->size;
202     sb->size = total;
203 
204     return total - file_start;
205 }
206 
207 
208 ssize_t
209 nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
210     size_t limit)
211 {
212     size_t      size, bsize, copied;
213     ssize_t     n;
214     nxt_bool_t  flush;
215 
216     size = nxt_buf_mem_used_size(&b->mem);
217     bsize = nxt_buf_mem_size(bm);
218 
219     if (bsize != 0) {
220 
221         if (size > bsize && bm->pos == bm->free) {
222             /*
223              * A data buffer size is larger than the internal
224              * buffer size and the internal buffer is empty.
225              */
226             goto no_buffer;
227         }
228 
229         if (bm->pos == NULL) {
230             bm->pos = nxt_malloc(bsize);
231             if (nxt_slow_path(bm->pos == NULL)) {
232                 return NXT_ERROR;
233             }
234 
235             bm->start = bm->pos;
236             bm->free = bm->pos;
237             bm->end += (uintptr_t) bm->pos;
238         }
239 
240         copied = 0;
241 
242         flush = nxt_sendbuf_copy(bm, b, &copied);
243 
244         nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
245 
246         if (flush == 0) {
247             return copied;
248         }
249 
250         size = nxt_buf_mem_used_size(bm);
251 
252         if (size == 0 && nxt_buf_is_sync(b)) {
253             goto done;
254         }
255 
256         n = c->io->send(c, bm->pos, nxt_min(size, limit));
257 
258         nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
259 
260         if (n > 0) {
261             bm->pos += n;
262 
263             if (bm->pos == bm->free) {
264                 bm->pos = bm->start;
265                 bm->free = bm->start;
266             }
267 
268             n = 0;
269         }
270 
271         return (copied != 0) ? (ssize_t) copied : n;
272     }
273 
274     /* No internal buffering. */
275 
276     if (size == 0 && nxt_buf_is_sync(b)) {
277         goto done;
278     }
279 
280 no_buffer:
281 
282     return c->io->send(c, b->mem.pos, nxt_min(size, limit));
283 
284 done:
285 
286     nxt_log_debug(c->socket.log, "sendbuf done");
287 
288     return 0;
289 }
290 
291 
292 static nxt_bool_t
293 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
294 {
295     size_t      size, bsize;
296     nxt_bool_t  flush;
297 
298     flush = 0;
299 
300     do {
301         nxt_prefetch(b->next);
302 
303         if (nxt_buf_is_mem(b)) {
304             bsize = bm->end - bm->free;
305             size = b->mem.free - b->mem.pos;
306             size = nxt_min(size, bsize);
307 
308             nxt_memcpy(bm->free, b->mem.pos, size);
309 
310             *copied += size;
311             bm->free += size;
312 
313             if (bm->free == bm->end) {
314                 return 1;
315             }
316         }
317 
318         flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
319 
320         b = b->next;
321 
322     } while (b != NULL);
323 
324     return flush;
325 }
326 
327 
328 nxt_buf_t *
329 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
330 {
331     size_t  size;
332 
333     while (b != NULL) {
334 
335         nxt_prefetch(b->next);
336 
337         if (!nxt_buf_is_sync(b)) {
338 
339             size = nxt_buf_used_size(b);
340 
341             if (size != 0) {
342 
343                 if (sent == 0) {
344                     break;
345                 }
346 
347                 if (sent < size) {
348 
349                     if (nxt_buf_is_mem(b)) {
350                         b->mem.pos += sent;
351                     }
352 
353                     if (nxt_buf_is_file(b)) {
354                         b->file_pos += sent;
355                     }
356 
357                     break;
358                 }
359 
360                 /* b->mem.free is NULL in file-only buffer. */
361                 b->mem.pos = b->mem.free;
362 
363                 if (nxt_buf_is_file(b)) {
364                     b->file_pos = b->file_end;
365                 }
366 
367                 sent -= size;
368             }
369         }
370 
371         b = b->next;
372     }
373 
374     return b;
375 }
376 
377 
378 nxt_buf_t *
379 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
380     size_t sent, nxt_bool_t mmap_mode)
381 {
382     size_t  size;
383 
384     while (b != NULL) {
385 
386         nxt_prefetch(b->next);
387 
388         if (!nxt_buf_is_sync(b)) {
389 
390             size = nxt_buf_used_size(b);
391 
392             if (size != 0) {
393 
394                 if (sent == 0) {
395                     break;
396                 }
397 
398                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
399                     /*
400                      * buffer has been sent to other side which is now
401                      * responsible for shared memory bucket release
402                      */
403                     b->is_port_mmap_sent = 1;
404                 }
405 
406                 if (sent < size) {
407 
408                     if (nxt_buf_is_mem(b)) {
409                         b->mem.pos += sent;
410                     }
411 
412                     if (nxt_buf_is_file(b)) {
413                         b->file_pos += sent;
414                     }
415 
416                     break;
417                 }
418 
419                 /* b->mem.free is NULL in file-only buffer. */
420                 b->mem.pos = b->mem.free;
421 
422                 if (nxt_buf_is_file(b)) {
423                     b->file_pos = b->file_end;
424                 }
425 
426                 sent -= size;
427             }
428         }
429 
430         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
431 
432         b = b->next;
433     }
434 
435     return b;
436 }
437