nxt_sendbuf.c (42:def41906e4a5) nxt_sendbuf.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11 size_t *copied);
12
13
14nxt_uint_t
15nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
16 struct iovec *iov, nxt_uint_t niov_max)
17{
18 u_char *last;
19 size_t size, total;
20 nxt_buf_t *b;
21 nxt_uint_t n;
22
23 total = sb->size;
24 last = NULL;
25 n = (nxt_uint_t) -1;
26
27 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
28
29 nxt_prefetch(b->next);
30
31 if (nxt_buf_is_file(b)) {
32 break;
33 }
34
35 if (nxt_buf_is_mem(b)) {
36
37 size = b->mem.free - b->mem.pos;
38
39 if (size != 0) {
40
41 if (total + size > sb->limit) {
42 size = sb->limit - total;
43
44 if (size == 0) {
45 break;
46 }
47 }
48
49 if (b->mem.pos != last) {
50
51 if (++n >= niov_max) {
52 goto done;
53 }
54
55 iov[n].iov_base = b->mem.pos;
56 iov[n].iov_len = size;
57
58 } else {
59 iov[n].iov_len += size;
60 }
61
62 nxt_debug(task, "sendbuf: %ui, %p, %uz",
63 n, iov[n].iov_base, iov[n].iov_len);
64
65 total += size;
66 last = b->mem.pos + size;
67 }
68
69 } else {
70 sb->sync = 1;
71 sb->last |= nxt_buf_is_last(b);
72 }
73 }
74
75 n++;
76
77done:
78
79 sb->buf = b;
80
81 return n;
82}
83
84
85nxt_uint_t
86nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
87{
88 u_char *last;
89 size_t size, total;
90 nxt_buf_t *b;
91 nxt_uint_t n;
92
93 total = sb->size;
94 last = NULL;
95 n = (nxt_uint_t) -1;
96
97 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
98
99 nxt_prefetch(b->next);
100
101 if (nxt_buf_is_file(b)) {
102 break;
103 }
104
105 if (nxt_buf_is_mem(b)) {
106
107 size = b->mem.free - b->mem.pos;
108
109 if (size != 0) {
110
111 if (total + size > sb->limit) {
112 size = sb->limit - total;
113
114 if (size == 0) {
115 break;
116 }
117 }
118
119 if (b->mem.pos != last) {
120
121 if (++n >= sb->nmax) {
122 goto done;
123 }
124
125 sb->iobuf[n].iov_base = b->mem.pos;
126 sb->iobuf[n].iov_len = size;
127
128 } else {
129 sb->iobuf[n].iov_len += size;
130 }
131
132 nxt_debug(task, "sendbuf: %ui, %p, %uz",
133 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
134
135 total += size;
136 last = b->mem.pos + size;
137 }
138
139 } else {
140 sb->sync = 1;
141 sb->last |= nxt_buf_is_last(b);
142 }
143 }
144
145 n++;
146
147done:
148
149 sb->buf = b;
150 sb->size = total;
151 sb->niov = n;
152
153 return n;
154}
155
156
157size_t
158nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
159{
160 size_t file_start, total;
161 nxt_fd_t fd;
162 nxt_off_t size, last;
163 nxt_buf_t *b;
164
165 b = sb->buf;
166 fd = b->file->fd;
167
168 total = sb->size;
169
170 for ( ;; ) {
171
172 nxt_prefetch(b->next);
173
174 size = b->file_end - b->file_pos;
175
176 if (total + size >= sb->limit) {
177 total = sb->limit;
178 break;
179 }
180
181 total += size;
182 last = b->file_pos + size;
183
184 b = b->next;
185
186 if (b == NULL || !nxt_buf_is_file(b)) {
187 break;
188 }
189
190 if (b->file_pos != last || b->file->fd != fd) {
191 break;
192 }
193 }
194
195 sb->buf = b;
196
197 file_start = sb->size;
198 sb->size = total;
199
200 return total - file_start;
201}
202
203
204ssize_t
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11 size_t *copied);
12
13
14nxt_uint_t
15nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
16 struct iovec *iov, nxt_uint_t niov_max)
17{
18 u_char *last;
19 size_t size, total;
20 nxt_buf_t *b;
21 nxt_uint_t n;
22
23 total = sb->size;
24 last = NULL;
25 n = (nxt_uint_t) -1;
26
27 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
28
29 nxt_prefetch(b->next);
30
31 if (nxt_buf_is_file(b)) {
32 break;
33 }
34
35 if (nxt_buf_is_mem(b)) {
36
37 size = b->mem.free - b->mem.pos;
38
39 if (size != 0) {
40
41 if (total + size > sb->limit) {
42 size = sb->limit - total;
43
44 if (size == 0) {
45 break;
46 }
47 }
48
49 if (b->mem.pos != last) {
50
51 if (++n >= niov_max) {
52 goto done;
53 }
54
55 iov[n].iov_base = b->mem.pos;
56 iov[n].iov_len = size;
57
58 } else {
59 iov[n].iov_len += size;
60 }
61
62 nxt_debug(task, "sendbuf: %ui, %p, %uz",
63 n, iov[n].iov_base, iov[n].iov_len);
64
65 total += size;
66 last = b->mem.pos + size;
67 }
68
69 } else {
70 sb->sync = 1;
71 sb->last |= nxt_buf_is_last(b);
72 }
73 }
74
75 n++;
76
77done:
78
79 sb->buf = b;
80
81 return n;
82}
83
84
85nxt_uint_t
86nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
87{
88 u_char *last;
89 size_t size, total;
90 nxt_buf_t *b;
91 nxt_uint_t n;
92
93 total = sb->size;
94 last = NULL;
95 n = (nxt_uint_t) -1;
96
97 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
98
99 nxt_prefetch(b->next);
100
101 if (nxt_buf_is_file(b)) {
102 break;
103 }
104
105 if (nxt_buf_is_mem(b)) {
106
107 size = b->mem.free - b->mem.pos;
108
109 if (size != 0) {
110
111 if (total + size > sb->limit) {
112 size = sb->limit - total;
113
114 if (size == 0) {
115 break;
116 }
117 }
118
119 if (b->mem.pos != last) {
120
121 if (++n >= sb->nmax) {
122 goto done;
123 }
124
125 sb->iobuf[n].iov_base = b->mem.pos;
126 sb->iobuf[n].iov_len = size;
127
128 } else {
129 sb->iobuf[n].iov_len += size;
130 }
131
132 nxt_debug(task, "sendbuf: %ui, %p, %uz",
133 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
134
135 total += size;
136 last = b->mem.pos + size;
137 }
138
139 } else {
140 sb->sync = 1;
141 sb->last |= nxt_buf_is_last(b);
142 }
143 }
144
145 n++;
146
147done:
148
149 sb->buf = b;
150 sb->size = total;
151 sb->niov = n;
152
153 return n;
154}
155
156
157size_t
158nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
159{
160 size_t file_start, total;
161 nxt_fd_t fd;
162 nxt_off_t size, last;
163 nxt_buf_t *b;
164
165 b = sb->buf;
166 fd = b->file->fd;
167
168 total = sb->size;
169
170 for ( ;; ) {
171
172 nxt_prefetch(b->next);
173
174 size = b->file_end - b->file_pos;
175
176 if (total + size >= sb->limit) {
177 total = sb->limit;
178 break;
179 }
180
181 total += size;
182 last = b->file_pos + size;
183
184 b = b->next;
185
186 if (b == NULL || !nxt_buf_is_file(b)) {
187 break;
188 }
189
190 if (b->file_pos != last || b->file->fd != fd) {
191 break;
192 }
193 }
194
195 sb->buf = b;
196
197 file_start = sb->size;
198 sb->size = total;
199
200 return total - file_start;
201}
202
203
204ssize_t
205nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
206 nxt_buf_t *b, size_t limit)
205nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
206 size_t limit)
207{
208 size_t size, bsize, copied;
209 ssize_t n;
210 nxt_bool_t flush;
211
212 size = nxt_buf_mem_used_size(&b->mem);
213 bsize = nxt_buf_mem_size(bm);
214
215 if (bsize != 0) {
216
217 if (size > bsize && bm->pos == bm->free) {
218 /*
219 * A data buffer size is larger than the internal
220 * buffer size and the internal buffer is empty.
221 */
222 goto no_buffer;
223 }
224
225 if (bm->pos == NULL) {
226 bm->pos = nxt_malloc(bsize);
227 if (nxt_slow_path(bm->pos == NULL)) {
228 return NXT_ERROR;
229 }
230
231 bm->start = bm->pos;
232 bm->free = bm->pos;
233 bm->end += (uintptr_t) bm->pos;
234 }
235
236 copied = 0;
237
238 flush = nxt_sendbuf_copy(bm, b, &copied);
239
240 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
241
242 if (flush == 0) {
243 return copied;
244 }
245
246 size = nxt_buf_mem_used_size(bm);
247
248 if (size == 0 && nxt_buf_is_sync(b)) {
249 goto done;
250 }
251
252 n = c->io->send(c, bm->pos, nxt_min(size, limit));
253
254 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
255
256 if (n > 0) {
257 bm->pos += n;
258
259 if (bm->pos == bm->free) {
260 bm->pos = bm->start;
261 bm->free = bm->start;
262 }
263
264 n = 0;
265 }
266
267 return (copied != 0) ? (ssize_t) copied : n;
268 }
269
270 /* No internal buffering. */
271
272 if (size == 0 && nxt_buf_is_sync(b)) {
273 goto done;
274 }
275
276no_buffer:
277
278 return c->io->send(c, b->mem.pos, nxt_min(size, limit));
279
280done:
281
282 nxt_log_debug(c->socket.log, "sendbuf done");
283
284 return 0;
285}
286
287
288static nxt_bool_t
289nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
290{
291 size_t size, bsize;
292 nxt_bool_t flush;
293
294 flush = 0;
295
296 do {
297 nxt_prefetch(b->next);
298
299 if (nxt_buf_is_mem(b)) {
300 bsize = bm->end - bm->free;
301 size = b->mem.free - b->mem.pos;
302 size = nxt_min(size, bsize);
303
304 nxt_memcpy(bm->free, b->mem.pos, size);
305
306 *copied += size;
307 bm->free += size;
308
309 if (bm->free == bm->end) {
310 return 1;
311 }
312 }
313
314 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
315
316 b = b->next;
317
318 } while (b != NULL);
319
320 return flush;
321}
322
323
324nxt_buf_t *
325nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
326{
327 size_t size;
328
329 while (b != NULL) {
330
331 nxt_prefetch(b->next);
332
333 if (!nxt_buf_is_sync(b)) {
334
335 size = nxt_buf_used_size(b);
336
337 if (size != 0) {
338
339 if (sent == 0) {
340 break;
341 }
342
343 if (sent < size) {
344
345 if (nxt_buf_is_mem(b)) {
346 b->mem.pos += sent;
347 }
348
349 if (nxt_buf_is_file(b)) {
350 b->file_pos += sent;
351 }
352
353 break;
354 }
355
356 /* b->mem.free is NULL in file-only buffer. */
357 b->mem.pos = b->mem.free;
358
359 if (nxt_buf_is_file(b)) {
360 b->file_pos = b->file_end;
361 }
362
363 sent -= size;
364 }
365 }
366
367 b = b->next;
368 }
369
370 return b;
371}
372
373
374nxt_buf_t *
375nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
376 size_t sent)
377{
378 size_t size;
379
380 while (b != NULL) {
381
382 nxt_prefetch(b->next);
383
384 if (!nxt_buf_is_sync(b)) {
385
386 size = nxt_buf_used_size(b);
387
388 if (size != 0) {
389
390 if (sent == 0) {
391 break;
392 }
393
394 if (nxt_buf_is_port_mmap(b)) {
395 /*
396 * buffer has been sent to other side which is now
397 * responsible for shared memory bucket release
398 */
399 b->is_port_mmap_sent = 1;
400 }
401
402 if (sent < size) {
403
404 if (nxt_buf_is_mem(b)) {
405 b->mem.pos += sent;
406 }
407
408 if (nxt_buf_is_file(b)) {
409 b->file_pos += sent;
410 }
411
412 break;
413 }
414
415 /* b->mem.free is NULL in file-only buffer. */
416 b->mem.pos = b->mem.free;
417
418 if (nxt_buf_is_file(b)) {
419 b->file_pos = b->file_end;
420 }
421
422 sent -= size;
423 }
424 }
425
426 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
427
428 b = b->next;
429 }
430
431 return b;
432}
207{
208 size_t size, bsize, copied;
209 ssize_t n;
210 nxt_bool_t flush;
211
212 size = nxt_buf_mem_used_size(&b->mem);
213 bsize = nxt_buf_mem_size(bm);
214
215 if (bsize != 0) {
216
217 if (size > bsize && bm->pos == bm->free) {
218 /*
219 * A data buffer size is larger than the internal
220 * buffer size and the internal buffer is empty.
221 */
222 goto no_buffer;
223 }
224
225 if (bm->pos == NULL) {
226 bm->pos = nxt_malloc(bsize);
227 if (nxt_slow_path(bm->pos == NULL)) {
228 return NXT_ERROR;
229 }
230
231 bm->start = bm->pos;
232 bm->free = bm->pos;
233 bm->end += (uintptr_t) bm->pos;
234 }
235
236 copied = 0;
237
238 flush = nxt_sendbuf_copy(bm, b, &copied);
239
240 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
241
242 if (flush == 0) {
243 return copied;
244 }
245
246 size = nxt_buf_mem_used_size(bm);
247
248 if (size == 0 && nxt_buf_is_sync(b)) {
249 goto done;
250 }
251
252 n = c->io->send(c, bm->pos, nxt_min(size, limit));
253
254 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
255
256 if (n > 0) {
257 bm->pos += n;
258
259 if (bm->pos == bm->free) {
260 bm->pos = bm->start;
261 bm->free = bm->start;
262 }
263
264 n = 0;
265 }
266
267 return (copied != 0) ? (ssize_t) copied : n;
268 }
269
270 /* No internal buffering. */
271
272 if (size == 0 && nxt_buf_is_sync(b)) {
273 goto done;
274 }
275
276no_buffer:
277
278 return c->io->send(c, b->mem.pos, nxt_min(size, limit));
279
280done:
281
282 nxt_log_debug(c->socket.log, "sendbuf done");
283
284 return 0;
285}
286
287
288static nxt_bool_t
289nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
290{
291 size_t size, bsize;
292 nxt_bool_t flush;
293
294 flush = 0;
295
296 do {
297 nxt_prefetch(b->next);
298
299 if (nxt_buf_is_mem(b)) {
300 bsize = bm->end - bm->free;
301 size = b->mem.free - b->mem.pos;
302 size = nxt_min(size, bsize);
303
304 nxt_memcpy(bm->free, b->mem.pos, size);
305
306 *copied += size;
307 bm->free += size;
308
309 if (bm->free == bm->end) {
310 return 1;
311 }
312 }
313
314 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
315
316 b = b->next;
317
318 } while (b != NULL);
319
320 return flush;
321}
322
323
324nxt_buf_t *
325nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
326{
327 size_t size;
328
329 while (b != NULL) {
330
331 nxt_prefetch(b->next);
332
333 if (!nxt_buf_is_sync(b)) {
334
335 size = nxt_buf_used_size(b);
336
337 if (size != 0) {
338
339 if (sent == 0) {
340 break;
341 }
342
343 if (sent < size) {
344
345 if (nxt_buf_is_mem(b)) {
346 b->mem.pos += sent;
347 }
348
349 if (nxt_buf_is_file(b)) {
350 b->file_pos += sent;
351 }
352
353 break;
354 }
355
356 /* b->mem.free is NULL in file-only buffer. */
357 b->mem.pos = b->mem.free;
358
359 if (nxt_buf_is_file(b)) {
360 b->file_pos = b->file_end;
361 }
362
363 sent -= size;
364 }
365 }
366
367 b = b->next;
368 }
369
370 return b;
371}
372
373
374nxt_buf_t *
375nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
376 size_t sent)
377{
378 size_t size;
379
380 while (b != NULL) {
381
382 nxt_prefetch(b->next);
383
384 if (!nxt_buf_is_sync(b)) {
385
386 size = nxt_buf_used_size(b);
387
388 if (size != 0) {
389
390 if (sent == 0) {
391 break;
392 }
393
394 if (nxt_buf_is_port_mmap(b)) {
395 /*
396 * buffer has been sent to other side which is now
397 * responsible for shared memory bucket release
398 */
399 b->is_port_mmap_sent = 1;
400 }
401
402 if (sent < size) {
403
404 if (nxt_buf_is_mem(b)) {
405 b->mem.pos += sent;
406 }
407
408 if (nxt_buf_is_file(b)) {
409 b->file_pos += sent;
410 }
411
412 break;
413 }
414
415 /* b->mem.free is NULL in file-only buffer. */
416 b->mem.pos = b->mem.free;
417
418 if (nxt_buf_is_file(b)) {
419 b->file_pos = b->file_end;
420 }
421
422 sent -= size;
423 }
424 }
425
426 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
427
428 b = b->next;
429 }
430
431 return b;
432}