1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16
17 #include "nxt_websocket.h"
18
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22
23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE \
25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26
27 enum {
28 NXT_QUIT_NORMAL = 0,
29 NXT_QUIT_GRACEFUL = 1,
30 };
31
32 typedef struct nxt_unit_impl_s nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
43
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46 nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52 nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54 nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58 int *shared_port_fd, int *shared_queue_fd,
59 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60 uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62 int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64 nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71 nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73 nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76 nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79 nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83 nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86 nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90 nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95 nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97 nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99 nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101 size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108 nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111 int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113 nxt_unit_port_t *port, uint32_t size,
114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118 nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135 pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147 nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152 nxt_unit_port_t *port, int queue_fd);
153
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159 nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161 nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163 nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166 nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170 nxt_unit_port_t *port, const void *buf, size_t buf_size,
171 const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175 nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177 nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179 nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181 nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183 nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185 nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190 nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192 nxt_unit_port_id_t *port_id, int remove);
193
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195 nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
200 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
201 static void nxt_unit_lvlhsh_free(void *data, void *p);
202 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
203
204
205 struct nxt_unit_mmap_buf_s {
206 nxt_unit_buf_t buf;
207
208 nxt_unit_mmap_buf_t *next;
209 nxt_unit_mmap_buf_t **prev;
210
211 nxt_port_mmap_header_t *hdr;
212 nxt_unit_request_info_t *req;
213 nxt_unit_ctx_impl_t *ctx_impl;
214 char *free_ptr;
215 char *plain_ptr;
216 };
217
218
219 struct nxt_unit_recv_msg_s {
220 uint32_t stream;
221 nxt_pid_t pid;
222 nxt_port_id_t reply_port;
223
224 uint8_t last; /* 1 bit */
225 uint8_t mmap; /* 1 bit */
226
227 void *start;
228 uint32_t size;
229
230 int fd[2];
231
232 nxt_unit_mmap_buf_t *incoming_buf;
233 };
234
235
236 typedef enum {
237 NXT_UNIT_RS_START = 0,
238 NXT_UNIT_RS_RESPONSE_INIT,
239 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
240 NXT_UNIT_RS_RESPONSE_SENT,
241 NXT_UNIT_RS_RELEASED,
242 } nxt_unit_req_state_t;
243
244
245 struct nxt_unit_request_info_impl_s {
246 nxt_unit_request_info_t req;
247
248 uint32_t stream;
249
250 nxt_unit_mmap_buf_t *outgoing_buf;
251 nxt_unit_mmap_buf_t *incoming_buf;
252
253 nxt_unit_req_state_t state;
254 uint8_t websocket;
255 uint8_t in_hash;
256
257 /* for nxt_unit_ctx_impl_t.free_req or active_req */
258 nxt_queue_link_t link;
259 /* for nxt_unit_port_impl_t.awaiting_req */
260 nxt_queue_link_t port_wait_link;
261
262 char extra_data[];
263 };
264
265
266 struct nxt_unit_websocket_frame_impl_s {
267 nxt_unit_websocket_frame_t ws;
268
269 nxt_unit_mmap_buf_t *buf;
270
271 nxt_queue_link_t link;
272
273 nxt_unit_ctx_impl_t *ctx_impl;
274 };
275
276
277 struct nxt_unit_read_buf_s {
278 nxt_queue_link_t link;
279 nxt_unit_ctx_impl_t *ctx_impl;
280 ssize_t size;
281 nxt_recv_oob_t oob;
282 char buf[16384];
283 };
284
285
286 struct nxt_unit_ctx_impl_s {
287 nxt_unit_ctx_t ctx;
288
289 nxt_atomic_t use_count;
290 nxt_atomic_t wait_items;
291
292 pthread_mutex_t mutex;
293
294 nxt_unit_port_t *read_port;
295
296 nxt_queue_link_t link;
297
298 nxt_unit_mmap_buf_t *free_buf;
299
300 /* of nxt_unit_request_info_impl_t */
301 nxt_queue_t free_req;
302
303 /* of nxt_unit_websocket_frame_impl_t */
304 nxt_queue_t free_ws;
305
306 /* of nxt_unit_request_info_impl_t */
307 nxt_queue_t active_req;
308
309 /* of nxt_unit_request_info_impl_t */
310 nxt_lvlhsh_t requests;
311
312 /* of nxt_unit_request_info_impl_t */
313 nxt_queue_t ready_req;
314
315 /* of nxt_unit_read_buf_t */
316 nxt_queue_t pending_rbuf;
317
318 /* of nxt_unit_read_buf_t */
319 nxt_queue_t free_rbuf;
320
321 uint8_t online; /* 1 bit */
322 uint8_t ready; /* 1 bit */
323 uint8_t quit_param;
324
325 nxt_unit_mmap_buf_t ctx_buf[2];
326 nxt_unit_read_buf_t ctx_read_buf;
327
328 nxt_unit_request_info_impl_t req;
329 };
330
331
332 struct nxt_unit_mmap_s {
333 nxt_port_mmap_header_t *hdr;
334 pthread_t src_thread;
335
336 /* of nxt_unit_read_buf_t */
337 nxt_queue_t awaiting_rbuf;
338 };
339
340
341 struct nxt_unit_mmaps_s {
342 pthread_mutex_t mutex;
343 uint32_t size;
344 uint32_t cap;
345 nxt_atomic_t allocated_chunks;
346 nxt_unit_mmap_t *elts;
347 };
348
349
350 struct nxt_unit_impl_s {
351 nxt_unit_t unit;
352 nxt_unit_callbacks_t callbacks;
353
354 nxt_atomic_t use_count;
355 nxt_atomic_t request_count;
356
357 uint32_t request_data_size;
358 uint32_t shm_mmap_limit;
359 uint32_t request_limit;
360
361 pthread_mutex_t mutex;
362
363 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
364 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
365
366 nxt_unit_port_t *router_port;
367 nxt_unit_port_t *shared_port;
368
369 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
370
371 nxt_unit_mmaps_t incoming;
372 nxt_unit_mmaps_t outgoing;
373
374 pid_t pid;
375 int log_fd;
376
377 nxt_unit_ctx_impl_t main_ctx;
378 };
379
380
381 struct nxt_unit_port_impl_s {
382 nxt_unit_port_t port;
383
384 nxt_atomic_t use_count;
385
386 /* for nxt_unit_process_t.ports */
387 nxt_queue_link_t link;
388 nxt_unit_process_t *process;
389
390 /* of nxt_unit_request_info_impl_t */
391 nxt_queue_t awaiting_req;
392
393 int ready;
394
395 void *queue;
396
397 int from_socket;
398 nxt_unit_read_buf_t *socket_rbuf;
399 };
400
401
402 struct nxt_unit_process_s {
403 pid_t pid;
404
405 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
406
407 nxt_unit_impl_t *lib;
408
409 nxt_atomic_t use_count;
410
411 uint32_t next_port_id;
412 };
413
414
415 /* Explicitly using 32 bit types to avoid possible alignment. */
416 typedef struct {
417 int32_t pid;
418 uint32_t id;
419 } nxt_unit_port_hash_id_t;
420
421
422 static pid_t nxt_unit_pid;
423
424
425 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)426 nxt_unit_init(nxt_unit_init_t *init)
427 {
428 int rc, queue_fd, shared_queue_fd;
429 void *mem;
430 uint32_t ready_stream, shm_limit, request_limit;
431 nxt_unit_ctx_t *ctx;
432 nxt_unit_impl_t *lib;
433 nxt_unit_port_t ready_port, router_port, read_port, shared_port;
434
435 nxt_unit_pid = getpid();
436
437 lib = nxt_unit_create(init);
438 if (nxt_slow_path(lib == NULL)) {
439 return NULL;
440 }
441
442 queue_fd = -1;
443 mem = MAP_FAILED;
444 shared_port.out_fd = -1;
445 shared_port.data = NULL;
446
447 if (init->ready_port.id.pid != 0
448 && init->ready_stream != 0
449 && init->read_port.id.pid != 0)
450 {
451 ready_port = init->ready_port;
452 ready_stream = init->ready_stream;
453 router_port = init->router_port;
454 read_port = init->read_port;
455 lib->log_fd = init->log_fd;
456
457 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
458 ready_port.id.id);
459 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
460 router_port.id.id);
461 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
462 read_port.id.id);
463
464 shared_port.in_fd = init->shared_port_fd;
465 shared_queue_fd = init->shared_queue_fd;
466
467 } else {
468 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
469 &shared_port.in_fd, &shared_queue_fd,
470 &lib->log_fd, &ready_stream, &shm_limit,
471 &request_limit);
472 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
473 goto fail;
474 }
475
476 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
477 / PORT_MMAP_DATA_SIZE;
478 lib->request_limit = request_limit;
479 }
480
481 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
482 lib->shm_mmap_limit = 1;
483 }
484
485 lib->pid = read_port.id.pid;
486 nxt_unit_pid = lib->pid;
487
488 ctx = &lib->main_ctx.ctx;
489
490 rc = nxt_unit_fd_blocking(router_port.out_fd);
491 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
492 goto fail;
493 }
494
495 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
496 if (nxt_slow_path(lib->router_port == NULL)) {
497 nxt_unit_alert(NULL, "failed to add router_port");
498
499 goto fail;
500 }
501
502 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
503 if (nxt_slow_path(queue_fd == -1)) {
504 goto fail;
505 }
506
507 mem = mmap(NULL, sizeof(nxt_port_queue_t),
508 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
509 if (nxt_slow_path(mem == MAP_FAILED)) {
510 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
511 strerror(errno), errno);
512
513 goto fail;
514 }
515
516 nxt_port_queue_init(mem);
517
518 rc = nxt_unit_fd_blocking(read_port.in_fd);
519 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
520 goto fail;
521 }
522
523 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
524 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
525 nxt_unit_alert(NULL, "failed to add read_port");
526
527 goto fail;
528 }
529
530 rc = nxt_unit_fd_blocking(ready_port.out_fd);
531 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
532 goto fail;
533 }
534
535 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
536 NXT_UNIT_SHARED_PORT_ID);
537
538 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
539 MAP_SHARED, shared_queue_fd, 0);
540 if (nxt_slow_path(mem == MAP_FAILED)) {
541 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
542 strerror(errno), errno);
543
544 goto fail;
545 }
546
547 nxt_unit_close(shared_queue_fd);
548
549 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
550 if (nxt_slow_path(lib->shared_port == NULL)) {
551 nxt_unit_alert(NULL, "failed to add shared_port");
552
553 goto fail;
554 }
555
556 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
557 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
558 nxt_unit_alert(NULL, "failed to send READY message");
559
560 goto fail;
561 }
562
563 nxt_unit_close(ready_port.out_fd);
564 nxt_unit_close(queue_fd);
565
566 return ctx;
567
568 fail:
569
570 if (mem != MAP_FAILED) {
571 munmap(mem, sizeof(nxt_port_queue_t));
572 }
573
574 if (queue_fd != -1) {
575 nxt_unit_close(queue_fd);
576 }
577
578 nxt_unit_ctx_release(&lib->main_ctx.ctx);
579
580 return NULL;
581 }
582
583
584 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)585 nxt_unit_create(nxt_unit_init_t *init)
586 {
587 int rc;
588 nxt_unit_impl_t *lib;
589 nxt_unit_callbacks_t *cb;
590
591 lib = nxt_unit_malloc(NULL,
592 sizeof(nxt_unit_impl_t) + init->request_data_size);
593 if (nxt_slow_path(lib == NULL)) {
594 nxt_unit_alert(NULL, "failed to allocate unit struct");
595
596 return NULL;
597 }
598
599 rc = pthread_mutex_init(&lib->mutex, NULL);
600 if (nxt_slow_path(rc != 0)) {
601 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
602
603 goto fail;
604 }
605
606 lib->unit.data = init->data;
607 lib->callbacks = init->callbacks;
608
609 lib->request_data_size = init->request_data_size;
610 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
611 / PORT_MMAP_DATA_SIZE;
612 lib->request_limit = init->request_limit;
613
614 lib->processes.slot = NULL;
615 lib->ports.slot = NULL;
616
617 lib->log_fd = STDERR_FILENO;
618
619 nxt_queue_init(&lib->contexts);
620
621 lib->use_count = 0;
622 lib->request_count = 0;
623 lib->router_port = NULL;
624 lib->shared_port = NULL;
625
626 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
627 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
628 pthread_mutex_destroy(&lib->mutex);
629 goto fail;
630 }
631
632 cb = &lib->callbacks;
633
634 if (cb->request_handler == NULL) {
635 nxt_unit_alert(NULL, "request_handler is NULL");
636
637 pthread_mutex_destroy(&lib->mutex);
638 goto fail;
639 }
640
641 nxt_unit_mmaps_init(&lib->incoming);
642 nxt_unit_mmaps_init(&lib->outgoing);
643
644 return lib;
645
646 fail:
647
648 nxt_unit_free(NULL, lib);
649
650 return NULL;
651 }
652
653
654 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)655 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
656 void *data)
657 {
658 int rc;
659
660 ctx_impl->ctx.data = data;
661 ctx_impl->ctx.unit = &lib->unit;
662
663 rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
664 if (nxt_slow_path(rc != 0)) {
665 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
666
667 return NXT_UNIT_ERROR;
668 }
669
670 nxt_unit_lib_use(lib);
671
672 pthread_mutex_lock(&lib->mutex);
673
674 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
675
676 pthread_mutex_unlock(&lib->mutex);
677
678 ctx_impl->use_count = 1;
679 ctx_impl->wait_items = 0;
680 ctx_impl->online = 1;
681 ctx_impl->ready = 0;
682 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
683
684 nxt_queue_init(&ctx_impl->free_req);
685 nxt_queue_init(&ctx_impl->free_ws);
686 nxt_queue_init(&ctx_impl->active_req);
687 nxt_queue_init(&ctx_impl->ready_req);
688 nxt_queue_init(&ctx_impl->pending_rbuf);
689 nxt_queue_init(&ctx_impl->free_rbuf);
690
691 ctx_impl->free_buf = NULL;
692 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
693 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
694
695 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
696 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
697
698 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
699
700 ctx_impl->req.req.ctx = &ctx_impl->ctx;
701 ctx_impl->req.req.unit = &lib->unit;
702
703 ctx_impl->read_port = NULL;
704 ctx_impl->requests.slot = 0;
705
706 return NXT_UNIT_OK;
707 }
708
709
710 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)711 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
712 {
713 nxt_unit_ctx_impl_t *ctx_impl;
714
715 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
716
717 nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
718 }
719
720
721 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)722 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
723 {
724 long c;
725 nxt_unit_ctx_impl_t *ctx_impl;
726
727 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
728
729 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
730
731 if (c == 1) {
732 nxt_unit_ctx_free(ctx_impl);
733 }
734 }
735
736
737 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)738 nxt_unit_lib_use(nxt_unit_impl_t *lib)
739 {
740 nxt_atomic_fetch_add(&lib->use_count, 1);
741 }
742
743
744 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)745 nxt_unit_lib_release(nxt_unit_impl_t *lib)
746 {
747 long c;
748 nxt_unit_process_t *process;
749
750 c = nxt_atomic_fetch_add(&lib->use_count, -1);
751
752 if (c == 1) {
753 for ( ;; ) {
754 pthread_mutex_lock(&lib->mutex);
755
756 process = nxt_unit_process_pop_first(lib);
757 if (process == NULL) {
758 pthread_mutex_unlock(&lib->mutex);
759
760 break;
761 }
762
763 nxt_unit_remove_process(lib, process);
764 }
765
766 pthread_mutex_destroy(&lib->mutex);
767
768 if (nxt_fast_path(lib->router_port != NULL)) {
769 nxt_unit_port_release(lib->router_port);
770 }
771
772 if (nxt_fast_path(lib->shared_port != NULL)) {
773 nxt_unit_port_release(lib->shared_port);
774 }
775
776 nxt_unit_mmaps_destroy(&lib->incoming);
777 nxt_unit_mmaps_destroy(&lib->outgoing);
778
779 nxt_unit_free(NULL, lib);
780 }
781 }
782
783
784 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)785 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
786 nxt_unit_mmap_buf_t *mmap_buf)
787 {
788 mmap_buf->next = *head;
789
790 if (mmap_buf->next != NULL) {
791 mmap_buf->next->prev = &mmap_buf->next;
792 }
793
794 *head = mmap_buf;
795 mmap_buf->prev = head;
796 }
797
798
799 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)800 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
801 nxt_unit_mmap_buf_t *mmap_buf)
802 {
803 while (*prev != NULL) {
804 prev = &(*prev)->next;
805 }
806
807 nxt_unit_mmap_buf_insert(prev, mmap_buf);
808 }
809
810
811 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)812 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
813 {
814 nxt_unit_mmap_buf_t **prev;
815
816 prev = mmap_buf->prev;
817
818 if (mmap_buf->next != NULL) {
819 mmap_buf->next->prev = prev;
820 }
821
822 if (prev != NULL) {
823 *prev = mmap_buf->next;
824 }
825 }
826
827
828 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)829 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
830 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
831 int *log_fd, uint32_t *stream,
832 uint32_t *shm_limit, uint32_t *request_limit)
833 {
834 int rc;
835 int ready_fd, router_fd, read_in_fd, read_out_fd;
836 char *unit_init, *version_end, *vars;
837 size_t version_length;
838 int64_t ready_pid, router_pid, read_pid;
839 uint32_t ready_stream, router_id, ready_id, read_id;
840
841 unit_init = getenv(NXT_UNIT_INIT_ENV);
842 if (nxt_slow_path(unit_init == NULL)) {
843 nxt_unit_alert(NULL, "%s is not in the current environment",
844 NXT_UNIT_INIT_ENV);
845
846 return NXT_UNIT_ERROR;
847 }
848
849 version_end = strchr(unit_init, ';');
850 if (nxt_slow_path(version_end == NULL)) {
851 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
852 NXT_UNIT_INIT_ENV, unit_init);
853
854 return NXT_UNIT_ERROR;
855 }
856
857 version_length = version_end - unit_init;
858
859 rc = version_length != nxt_length(NXT_VERSION)
860 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
861
862 if (nxt_slow_path(rc != 0)) {
863 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
864 "%.*s, while the app was compiled with libunit %s",
865 (int) version_length, unit_init, NXT_VERSION);
866
867 return NXT_UNIT_ERROR;
868 }
869
870 vars = version_end + 1;
871
872 rc = sscanf(vars,
873 "%"PRIu32";"
874 "%"PRId64",%"PRIu32",%d;"
875 "%"PRId64",%"PRIu32",%d;"
876 "%"PRId64",%"PRIu32",%d,%d;"
877 "%d,%d;"
878 "%d,%"PRIu32",%"PRIu32,
879 &ready_stream,
880 &ready_pid, &ready_id, &ready_fd,
881 &router_pid, &router_id, &router_fd,
882 &read_pid, &read_id, &read_in_fd, &read_out_fd,
883 shared_port_fd, shared_queue_fd,
884 log_fd, shm_limit, request_limit);
885
886 if (nxt_slow_path(rc == EOF)) {
887 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
888 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
889
890 return NXT_UNIT_ERROR;
891 }
892
893 if (nxt_slow_path(rc != 16)) {
894 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
895 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
896
897 return NXT_UNIT_ERROR;
898 }
899
900 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
901
902 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
903
904 ready_port->in_fd = -1;
905 ready_port->out_fd = ready_fd;
906 ready_port->data = NULL;
907
908 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
909
910 router_port->in_fd = -1;
911 router_port->out_fd = router_fd;
912 router_port->data = NULL;
913
914 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
915
916 read_port->in_fd = read_in_fd;
917 read_port->out_fd = read_out_fd;
918 read_port->data = NULL;
919
920 *stream = ready_stream;
921
922 return NXT_UNIT_OK;
923 }
924
925
926 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)927 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
928 {
929 ssize_t res;
930 nxt_send_oob_t oob;
931 nxt_port_msg_t msg;
932 nxt_unit_impl_t *lib;
933 int fds[2] = {queue_fd, -1};
934
935 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
936
937 msg.stream = stream;
938 msg.pid = lib->pid;
939 msg.reply_port = 0;
940 msg.type = _NXT_PORT_MSG_PROCESS_READY;
941 msg.last = 1;
942 msg.mmap = 0;
943 msg.nf = 0;
944 msg.mf = 0;
945 msg.tracking = 0;
946
947 nxt_socket_msg_oob_init(&oob, fds);
948
949 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
950 if (res != sizeof(msg)) {
951 return NXT_UNIT_ERROR;
952 }
953
954 return NXT_UNIT_OK;
955 }
956
957
958 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
960 nxt_unit_request_info_t **preq)
961 {
962 int rc;
963 pid_t pid;
964 uint8_t quit_param;
965 nxt_port_msg_t *port_msg;
966 nxt_unit_impl_t *lib;
967 nxt_unit_recv_msg_t recv_msg;
968
969 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
970
971 recv_msg.incoming_buf = NULL;
972 recv_msg.fd[0] = -1;
973 recv_msg.fd[1] = -1;
974
975 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
976 if (nxt_slow_path(rc != NXT_OK)) {
977 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
978 rc = NXT_UNIT_ERROR;
979 goto done;
980 }
981
982 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
983 if (nxt_slow_path(rbuf->size == 0)) {
984 nxt_unit_debug(ctx, "read port closed");
985
986 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
987 rc = NXT_UNIT_OK;
988 goto done;
989 }
990
991 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
992
993 rc = NXT_UNIT_ERROR;
994 goto done;
995 }
996
997 port_msg = (nxt_port_msg_t *) rbuf->buf;
998
999 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1000 port_msg->stream, (int) port_msg->type,
1001 recv_msg.fd[0], recv_msg.fd[1]);
1002
1003 recv_msg.stream = port_msg->stream;
1004 recv_msg.pid = port_msg->pid;
1005 recv_msg.reply_port = port_msg->reply_port;
1006 recv_msg.last = port_msg->last;
1007 recv_msg.mmap = port_msg->mmap;
1008
1009 recv_msg.start = port_msg + 1;
1010 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1011
1012 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1013 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1014 port_msg->stream, (int) port_msg->type);
1015 rc = NXT_UNIT_ERROR;
1016 goto done;
1017 }
1018
1019 /* Fragmentation is unsupported. */
1020 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1021 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1022 port_msg->stream, (int) port_msg->type);
1023 rc = NXT_UNIT_ERROR;
1024 goto done;
1025 }
1026
1027 if (port_msg->mmap) {
1028 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1029
1030 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1031 if (rc == NXT_UNIT_AGAIN) {
1032 recv_msg.fd[0] = -1;
1033 recv_msg.fd[1] = -1;
1034 }
1035
1036 goto done;
1037 }
1038 }
1039
1040 switch (port_msg->type) {
1041
1042 case _NXT_PORT_MSG_RPC_READY:
1043 rc = NXT_UNIT_OK;
1044 break;
1045
1046 case _NXT_PORT_MSG_QUIT:
1047 if (recv_msg.size == sizeof(quit_param)) {
1048 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1049
1050 } else {
1051 quit_param = NXT_QUIT_NORMAL;
1052 }
1053
1054 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1055 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1056
1057 nxt_unit_quit(ctx, quit_param);
1058
1059 rc = NXT_UNIT_OK;
1060 break;
1061
1062 case _NXT_PORT_MSG_NEW_PORT:
1063 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1064 break;
1065
1066 case _NXT_PORT_MSG_PORT_ACK:
1067 rc = nxt_unit_ctx_ready(ctx);
1068 break;
1069
1070 case _NXT_PORT_MSG_CHANGE_FILE:
1071 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1072 port_msg->stream, recv_msg.fd[0]);
1073
1074 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1075 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1076 port_msg->stream, recv_msg.fd[0], lib->log_fd,
1077 strerror(errno), errno);
1078
1079 rc = NXT_UNIT_ERROR;
1080 goto done;
1081 }
1082
1083 rc = NXT_UNIT_OK;
1084 break;
1085
1086 case _NXT_PORT_MSG_MMAP:
1087 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1088 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1089 port_msg->stream, recv_msg.fd[0]);
1090
1091 rc = NXT_UNIT_ERROR;
1092 goto done;
1093 }
1094
1095 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1096 break;
1097
1098 case _NXT_PORT_MSG_REQ_HEADERS:
1099 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1100 break;
1101
1102 case _NXT_PORT_MSG_REQ_BODY:
1103 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1104 break;
1105
1106 case _NXT_PORT_MSG_WEBSOCKET:
1107 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1108 break;
1109
1110 case _NXT_PORT_MSG_REMOVE_PID:
1111 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1112 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1113 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1114 (int) sizeof(pid));
1115
1116 rc = NXT_UNIT_ERROR;
1117 goto done;
1118 }
1119
1120 memcpy(&pid, recv_msg.start, sizeof(pid));
1121
1122 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1123 port_msg->stream, (int) pid);
1124
1125 nxt_unit_remove_pid(lib, pid);
1126
1127 rc = NXT_UNIT_OK;
1128 break;
1129
1130 case _NXT_PORT_MSG_SHM_ACK:
1131 rc = nxt_unit_process_shm_ack(ctx);
1132 break;
1133
1134 default:
1135 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1136 port_msg->stream, (int) port_msg->type);
1137
1138 rc = NXT_UNIT_ERROR;
1139 goto done;
1140 }
1141
1142 done:
1143
1144 if (recv_msg.fd[0] != -1) {
1145 nxt_unit_close(recv_msg.fd[0]);
1146 }
1147
1148 if (recv_msg.fd[1] != -1) {
1149 nxt_unit_close(recv_msg.fd[1]);
1150 }
1151
1152 while (recv_msg.incoming_buf != NULL) {
1153 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1154 }
1155
1156 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1157 #if (NXT_DEBUG)
1158 memset(rbuf->buf, 0xAC, rbuf->size);
1159 #endif
1160 nxt_unit_read_buf_release(ctx, rbuf);
1161 }
1162
1163 return rc;
1164 }
1165
1166
1167 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1168 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1169 {
1170 void *mem;
1171 nxt_unit_port_t new_port, *port;
1172 nxt_port_msg_new_port_t *new_port_msg;
1173
1174 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1175 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1176 "invalid message size (%d)",
1177 recv_msg->stream, (int) recv_msg->size);
1178
1179 return NXT_UNIT_ERROR;
1180 }
1181
1182 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1183 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1184 recv_msg->stream, recv_msg->fd[0]);
1185
1186 return NXT_UNIT_ERROR;
1187 }
1188
1189 new_port_msg = recv_msg->start;
1190
1191 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1192 recv_msg->stream, (int) new_port_msg->pid,
1193 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1194
1195 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1196 return NXT_UNIT_ERROR;
1197 }
1198
1199 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1200
1201 new_port.in_fd = -1;
1202 new_port.out_fd = recv_msg->fd[0];
1203
1204 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1205 MAP_SHARED, recv_msg->fd[1], 0);
1206
1207 if (nxt_slow_path(mem == MAP_FAILED)) {
1208 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1209 strerror(errno), errno);
1210
1211 return NXT_UNIT_ERROR;
1212 }
1213
1214 new_port.data = NULL;
1215
1216 recv_msg->fd[0] = -1;
1217
1218 port = nxt_unit_add_port(ctx, &new_port, mem);
1219 if (nxt_slow_path(port == NULL)) {
1220 return NXT_UNIT_ERROR;
1221 }
1222
1223 nxt_unit_port_release(port);
1224
1225 return NXT_UNIT_OK;
1226 }
1227
1228
1229 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1230 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1231 {
1232 nxt_unit_impl_t *lib;
1233 nxt_unit_ctx_impl_t *ctx_impl;
1234
1235 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1236
1237 if (nxt_slow_path(ctx_impl->ready)) {
1238 return NXT_UNIT_OK;
1239 }
1240
1241 ctx_impl->ready = 1;
1242
1243 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1244
1245 /* Call ready_handler() only for main context. */
1246 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1247 return lib->callbacks.ready_handler(ctx);
1248 }
1249
1250 if (&lib->main_ctx != ctx_impl) {
1251 /* Check if the main context is already stopped or quit. */
1252 if (nxt_slow_path(!lib->main_ctx.ready)) {
1253 ctx_impl->ready = 0;
1254
1255 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1256
1257 return NXT_UNIT_OK;
1258 }
1259
1260 if (lib->callbacks.add_port != NULL) {
1261 lib->callbacks.add_port(ctx, lib->shared_port);
1262 }
1263 }
1264
1265 return NXT_UNIT_OK;
1266 }
1267
1268
1269 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1270 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1271 nxt_unit_request_info_t **preq)
1272 {
1273 int res;
1274 nxt_unit_impl_t *lib;
1275 nxt_unit_port_id_t port_id;
1276 nxt_unit_request_t *r;
1277 nxt_unit_mmap_buf_t *b;
1278 nxt_unit_request_info_t *req;
1279 nxt_unit_request_info_impl_t *req_impl;
1280
1281 if (nxt_slow_path(recv_msg->mmap == 0)) {
1282 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1283 recv_msg->stream);
1284
1285 return NXT_UNIT_ERROR;
1286 }
1287
1288 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1289 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1290 "%d expected", recv_msg->stream, (int) recv_msg->size,
1291 (int) sizeof(nxt_unit_request_t));
1292
1293 return NXT_UNIT_ERROR;
1294 }
1295
1296 req_impl = nxt_unit_request_info_get(ctx);
1297 if (nxt_slow_path(req_impl == NULL)) {
1298 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1299 recv_msg->stream);
1300
1301 return NXT_UNIT_ERROR;
1302 }
1303
1304 req = &req_impl->req;
1305
1306 req->request = recv_msg->start;
1307
1308 b = recv_msg->incoming_buf;
1309
1310 req->request_buf = &b->buf;
1311 req->response = NULL;
1312 req->response_buf = NULL;
1313
1314 r = req->request;
1315
1316 req->content_length = r->content_length;
1317
1318 req->content_buf = req->request_buf;
1319 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1320
1321 req_impl->stream = recv_msg->stream;
1322
1323 req_impl->outgoing_buf = NULL;
1324
1325 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1326 b->req = req;
1327 }
1328
1329 /* "Move" incoming buffer list to req_impl. */
1330 req_impl->incoming_buf = recv_msg->incoming_buf;
1331 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1332 recv_msg->incoming_buf = NULL;
1333
1334 req->content_fd = recv_msg->fd[0];
1335 recv_msg->fd[0] = -1;
1336
1337 req->response_max_fields = 0;
1338 req_impl->state = NXT_UNIT_RS_START;
1339 req_impl->websocket = 0;
1340 req_impl->in_hash = 0;
1341
1342 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1343 (int) r->method_length,
1344 (char *) nxt_unit_sptr_get(&r->method),
1345 (int) r->target_length,
1346 (char *) nxt_unit_sptr_get(&r->target),
1347 (int) r->content_length);
1348
1349 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1350
1351 res = nxt_unit_request_check_response_port(req, &port_id);
1352 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1353 return NXT_UNIT_ERROR;
1354 }
1355
1356 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1357 res = nxt_unit_send_req_headers_ack(req);
1358 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1359 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1360
1361 return NXT_UNIT_ERROR;
1362 }
1363
1364 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1365
1366 if (req->content_length
1367 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1368 {
1369 res = nxt_unit_request_hash_add(ctx, req);
1370 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1371 nxt_unit_req_warn(req, "failed to add request to hash");
1372
1373 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374
1375 return NXT_UNIT_ERROR;
1376 }
1377
1378 /*
1379 * If application have separate data handler, we may start
1380 * request processing and process data when it is arrived.
1381 */
1382 if (lib->callbacks.data_handler == NULL) {
1383 return NXT_UNIT_OK;
1384 }
1385 }
1386
1387 if (preq == NULL) {
1388 lib->callbacks.request_handler(req);
1389
1390 } else {
1391 *preq = req;
1392 }
1393 }
1394
1395 return NXT_UNIT_OK;
1396 }
1397
1398
1399 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1400 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1401 {
1402 uint64_t l;
1403 nxt_unit_impl_t *lib;
1404 nxt_unit_mmap_buf_t *b;
1405 nxt_unit_request_info_t *req;
1406
1407 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1408 if (req == NULL) {
1409 return NXT_UNIT_OK;
1410 }
1411
1412 l = req->content_buf->end - req->content_buf->free;
1413
1414 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1415 b->req = req;
1416 l += b->buf.end - b->buf.free;
1417 }
1418
1419 if (recv_msg->incoming_buf != NULL) {
1420 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1421
1422 while (b->next != NULL) {
1423 b = b->next;
1424 }
1425
1426 /* "Move" incoming buffer list to req_impl. */
1427 b->next = recv_msg->incoming_buf;
1428 b->next->prev = &b->next;
1429
1430 recv_msg->incoming_buf = NULL;
1431 }
1432
1433 req->content_fd = recv_msg->fd[0];
1434 recv_msg->fd[0] = -1;
1435
1436 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1437
1438 if (lib->callbacks.data_handler != NULL) {
1439 lib->callbacks.data_handler(req);
1440
1441 return NXT_UNIT_OK;
1442 }
1443
1444 if (req->content_fd != -1 || l == req->content_length) {
1445 lib->callbacks.request_handler(req);
1446 }
1447
1448 return NXT_UNIT_OK;
1449 }
1450
1451
1452 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1453 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1454 nxt_unit_port_id_t *port_id)
1455 {
1456 int res;
1457 nxt_unit_ctx_t *ctx;
1458 nxt_unit_impl_t *lib;
1459 nxt_unit_port_t *port;
1460 nxt_unit_process_t *process;
1461 nxt_unit_ctx_impl_t *ctx_impl;
1462 nxt_unit_port_impl_t *port_impl;
1463 nxt_unit_request_info_impl_t *req_impl;
1464
1465 ctx = req->ctx;
1466 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1467 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1468
1469 pthread_mutex_lock(&lib->mutex);
1470
1471 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1472 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1473
1474 if (nxt_fast_path(port != NULL)) {
1475 req->response_port = port;
1476
1477 if (nxt_fast_path(port_impl->ready)) {
1478 pthread_mutex_unlock(&lib->mutex);
1479
1480 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1481 (int) port->id.pid, (int) port->id.id);
1482
1483 return NXT_UNIT_OK;
1484 }
1485
1486 nxt_unit_debug(ctx, "check_response_port: "
1487 "port{%d,%d} already requested",
1488 (int) port->id.pid, (int) port->id.id);
1489
1490 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1491
1492 nxt_queue_insert_tail(&port_impl->awaiting_req,
1493 &req_impl->port_wait_link);
1494
1495 pthread_mutex_unlock(&lib->mutex);
1496
1497 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1498
1499 return NXT_UNIT_AGAIN;
1500 }
1501
1502 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1503 if (nxt_slow_path(port_impl == NULL)) {
1504 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1505 (int) sizeof(nxt_unit_port_impl_t));
1506
1507 pthread_mutex_unlock(&lib->mutex);
1508
1509 return NXT_UNIT_ERROR;
1510 }
1511
1512 port = &port_impl->port;
1513
1514 port->id = *port_id;
1515 port->in_fd = -1;
1516 port->out_fd = -1;
1517 port->data = NULL;
1518
1519 res = nxt_unit_port_hash_add(&lib->ports, port);
1520 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1521 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1522 port->id.pid, port->id.id);
1523
1524 pthread_mutex_unlock(&lib->mutex);
1525
1526 nxt_unit_free(ctx, port);
1527
1528 return NXT_UNIT_ERROR;
1529 }
1530
1531 process = nxt_unit_process_find(lib, port_id->pid, 0);
1532 if (nxt_slow_path(process == NULL)) {
1533 nxt_unit_alert(ctx, "check_response_port: process %d not found",
1534 port->id.pid);
1535
1536 nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1537
1538 pthread_mutex_unlock(&lib->mutex);
1539
1540 nxt_unit_free(ctx, port);
1541
1542 return NXT_UNIT_ERROR;
1543 }
1544
1545 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1546
1547 port_impl->process = process;
1548 port_impl->queue = NULL;
1549 port_impl->from_socket = 0;
1550 port_impl->socket_rbuf = NULL;
1551
1552 nxt_queue_init(&port_impl->awaiting_req);
1553
1554 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555
1556 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1557
1558 port_impl->use_count = 2;
1559 port_impl->ready = 0;
1560
1561 req->response_port = port;
1562
1563 pthread_mutex_unlock(&lib->mutex);
1564
1565 res = nxt_unit_get_port(ctx, port_id);
1566 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1567 return NXT_UNIT_ERROR;
1568 }
1569
1570 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1571
1572 return NXT_UNIT_AGAIN;
1573 }
1574
1575
1576 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1577 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1578 {
1579 ssize_t res;
1580 nxt_port_msg_t msg;
1581 nxt_unit_impl_t *lib;
1582 nxt_unit_ctx_impl_t *ctx_impl;
1583 nxt_unit_request_info_impl_t *req_impl;
1584
1585 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1586 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1587 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1588
1589 memset(&msg, 0, sizeof(nxt_port_msg_t));
1590
1591 msg.stream = req_impl->stream;
1592 msg.pid = lib->pid;
1593 msg.reply_port = ctx_impl->read_port->id.id;
1594 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1595
1596 res = nxt_unit_port_send(req->ctx, req->response_port,
1597 &msg, sizeof(msg), NULL);
1598 if (nxt_slow_path(res != sizeof(msg))) {
1599 return NXT_UNIT_ERROR;
1600 }
1601
1602 return NXT_UNIT_OK;
1603 }
1604
1605
1606 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1607 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1608 {
1609 size_t hsize;
1610 nxt_unit_impl_t *lib;
1611 nxt_unit_mmap_buf_t *b;
1612 nxt_unit_callbacks_t *cb;
1613 nxt_unit_request_info_t *req;
1614 nxt_unit_request_info_impl_t *req_impl;
1615 nxt_unit_websocket_frame_impl_t *ws_impl;
1616
1617 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1618 if (nxt_slow_path(req == NULL)) {
1619 return NXT_UNIT_OK;
1620 }
1621
1622 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1623
1624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1625 cb = &lib->callbacks;
1626
1627 if (cb->websocket_handler && recv_msg->size >= 2) {
1628 ws_impl = nxt_unit_websocket_frame_get(ctx);
1629 if (nxt_slow_path(ws_impl == NULL)) {
1630 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1631 req_impl->stream);
1632
1633 return NXT_UNIT_ERROR;
1634 }
1635
1636 ws_impl->ws.req = req;
1637
1638 ws_impl->buf = NULL;
1639
1640 if (recv_msg->mmap) {
1641 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1642 b->req = req;
1643 }
1644
1645 /* "Move" incoming buffer list to ws_impl. */
1646 ws_impl->buf = recv_msg->incoming_buf;
1647 ws_impl->buf->prev = &ws_impl->buf;
1648 recv_msg->incoming_buf = NULL;
1649
1650 b = ws_impl->buf;
1651
1652 } else {
1653 b = nxt_unit_mmap_buf_get(ctx);
1654 if (nxt_slow_path(b == NULL)) {
1655 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1656 req_impl->stream);
1657
1658 nxt_unit_websocket_frame_release(&ws_impl->ws);
1659
1660 return NXT_UNIT_ERROR;
1661 }
1662
1663 b->req = req;
1664 b->buf.start = recv_msg->start;
1665 b->buf.free = b->buf.start;
1666 b->buf.end = b->buf.start + recv_msg->size;
1667
1668 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1669 }
1670
1671 ws_impl->ws.header = (void *) b->buf.start;
1672 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1673 ws_impl->ws.header);
1674
1675 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1676
1677 if (ws_impl->ws.header->mask) {
1678 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1679
1680 } else {
1681 ws_impl->ws.mask = NULL;
1682 }
1683
1684 b->buf.free += hsize;
1685
1686 ws_impl->ws.content_buf = &b->buf;
1687 ws_impl->ws.content_length = ws_impl->ws.payload_len;
1688
1689 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1690 "payload_len=%"PRIu64,
1691 ws_impl->ws.header->opcode,
1692 ws_impl->ws.payload_len);
1693
1694 cb->websocket_handler(&ws_impl->ws);
1695 }
1696
1697 if (recv_msg->last) {
1698 if (cb->close_handler) {
1699 nxt_unit_req_debug(req, "close_handler");
1700
1701 cb->close_handler(req);
1702
1703 } else {
1704 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1705 }
1706 }
1707
1708 return NXT_UNIT_OK;
1709 }
1710
1711
1712 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1713 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1714 {
1715 nxt_unit_impl_t *lib;
1716 nxt_unit_callbacks_t *cb;
1717
1718 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1719 cb = &lib->callbacks;
1720
1721 if (cb->shm_ack_handler != NULL) {
1722 cb->shm_ack_handler(ctx);
1723 }
1724
1725 return NXT_UNIT_OK;
1726 }
1727
1728
1729 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1730 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1731 {
1732 nxt_unit_impl_t *lib;
1733 nxt_queue_link_t *lnk;
1734 nxt_unit_ctx_impl_t *ctx_impl;
1735 nxt_unit_request_info_impl_t *req_impl;
1736
1737 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1738
1739 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1740
1741 pthread_mutex_lock(&ctx_impl->mutex);
1742
1743 if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1744 pthread_mutex_unlock(&ctx_impl->mutex);
1745
1746 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1747 + lib->request_data_size);
1748 if (nxt_slow_path(req_impl == NULL)) {
1749 return NULL;
1750 }
1751
1752 req_impl->req.unit = ctx->unit;
1753 req_impl->req.ctx = ctx;
1754
1755 pthread_mutex_lock(&ctx_impl->mutex);
1756
1757 } else {
1758 lnk = nxt_queue_first(&ctx_impl->free_req);
1759 nxt_queue_remove(lnk);
1760
1761 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1762 }
1763
1764 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1765
1766 pthread_mutex_unlock(&ctx_impl->mutex);
1767
1768 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1769
1770 return req_impl;
1771 }
1772
1773
1774 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1775 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1776 {
1777 nxt_unit_ctx_t *ctx;
1778 nxt_unit_ctx_impl_t *ctx_impl;
1779 nxt_unit_request_info_impl_t *req_impl;
1780
1781 ctx = req->ctx;
1782 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784
1785 req->response = NULL;
1786 req->response_buf = NULL;
1787
1788 if (req_impl->in_hash) {
1789 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1790 }
1791
1792 while (req_impl->outgoing_buf != NULL) {
1793 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1794 }
1795
1796 while (req_impl->incoming_buf != NULL) {
1797 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1798 }
1799
1800 if (req->content_fd != -1) {
1801 nxt_unit_close(req->content_fd);
1802
1803 req->content_fd = -1;
1804 }
1805
1806 if (req->response_port != NULL) {
1807 nxt_unit_port_release(req->response_port);
1808
1809 req->response_port = NULL;
1810 }
1811
1812 req_impl->state = NXT_UNIT_RS_RELEASED;
1813
1814 pthread_mutex_lock(&ctx_impl->mutex);
1815
1816 nxt_queue_remove(&req_impl->link);
1817
1818 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1819
1820 pthread_mutex_unlock(&ctx_impl->mutex);
1821
1822 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1823 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1824 }
1825 }
1826
1827
1828 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1829 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1830 {
1831 nxt_unit_ctx_impl_t *ctx_impl;
1832
1833 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1834
1835 nxt_queue_remove(&req_impl->link);
1836
1837 if (req_impl != &ctx_impl->req) {
1838 nxt_unit_free(&ctx_impl->ctx, req_impl);
1839 }
1840 }
1841
1842
1843 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1844 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1845 {
1846 nxt_queue_link_t *lnk;
1847 nxt_unit_ctx_impl_t *ctx_impl;
1848 nxt_unit_websocket_frame_impl_t *ws_impl;
1849
1850 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1851
1852 pthread_mutex_lock(&ctx_impl->mutex);
1853
1854 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1855 pthread_mutex_unlock(&ctx_impl->mutex);
1856
1857 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1858 if (nxt_slow_path(ws_impl == NULL)) {
1859 return NULL;
1860 }
1861
1862 } else {
1863 lnk = nxt_queue_first(&ctx_impl->free_ws);
1864 nxt_queue_remove(lnk);
1865
1866 pthread_mutex_unlock(&ctx_impl->mutex);
1867
1868 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1869 }
1870
1871 ws_impl->ctx_impl = ctx_impl;
1872
1873 return ws_impl;
1874 }
1875
1876
1877 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1878 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1879 {
1880 nxt_unit_websocket_frame_impl_t *ws_impl;
1881
1882 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1883
1884 while (ws_impl->buf != NULL) {
1885 nxt_unit_mmap_buf_free(ws_impl->buf);
1886 }
1887
1888 ws->req = NULL;
1889
1890 pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1891
1892 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1893
1894 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1895 }
1896
1897
1898 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1899 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1900 nxt_unit_websocket_frame_impl_t *ws_impl)
1901 {
1902 nxt_queue_remove(&ws_impl->link);
1903
1904 nxt_unit_free(ctx, ws_impl);
1905 }
1906
1907
1908 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1909 nxt_unit_field_hash(const char *name, size_t name_length)
1910 {
1911 u_char ch;
1912 uint32_t hash;
1913 const char *p, *end;
1914
1915 hash = 159406; /* Magic value copied from nxt_http_parse.c */
1916 end = name + name_length;
1917
1918 for (p = name; p < end; p++) {
1919 ch = *p;
1920 hash = (hash << 4) + hash + nxt_lowcase(ch);
1921 }
1922
1923 hash = (hash >> 16) ^ hash;
1924
1925 return hash;
1926 }
1927
1928
1929 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1930 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1931 {
1932 char *name;
1933 uint32_t i, j;
1934 nxt_unit_field_t *fields, f;
1935 nxt_unit_request_t *r;
1936
1937 static nxt_str_t content_length = nxt_string("content-length");
1938 static nxt_str_t content_type = nxt_string("content-type");
1939 static nxt_str_t cookie = nxt_string("cookie");
1940
1941 nxt_unit_req_debug(req, "group_dup_fields");
1942
1943 r = req->request;
1944 fields = r->fields;
1945
1946 for (i = 0; i < r->fields_count; i++) {
1947 name = nxt_unit_sptr_get(&fields[i].name);
1948
1949 switch (fields[i].hash) {
1950 case NXT_UNIT_HASH_CONTENT_LENGTH:
1951 if (fields[i].name_length == content_length.length
1952 && nxt_unit_memcasecmp(name, content_length.start,
1953 content_length.length) == 0)
1954 {
1955 r->content_length_field = i;
1956 }
1957
1958 break;
1959
1960 case NXT_UNIT_HASH_CONTENT_TYPE:
1961 if (fields[i].name_length == content_type.length
1962 && nxt_unit_memcasecmp(name, content_type.start,
1963 content_type.length) == 0)
1964 {
1965 r->content_type_field = i;
1966 }
1967
1968 break;
1969
1970 case NXT_UNIT_HASH_COOKIE:
1971 if (fields[i].name_length == cookie.length
1972 && nxt_unit_memcasecmp(name, cookie.start,
1973 cookie.length) == 0)
1974 {
1975 r->cookie_field = i;
1976 }
1977
1978 break;
1979 }
1980
1981 for (j = i + 1; j < r->fields_count; j++) {
1982 if (fields[i].hash != fields[j].hash
1983 || fields[i].name_length != fields[j].name_length
1984 || nxt_unit_memcasecmp(name,
1985 nxt_unit_sptr_get(&fields[j].name),
1986 fields[j].name_length) != 0)
1987 {
1988 continue;
1989 }
1990
1991 f = fields[j];
1992 f.value.offset += (j - (i + 1)) * sizeof(f);
1993
1994 while (j > i + 1) {
1995 fields[j] = fields[j - 1];
1996 fields[j].name.offset -= sizeof(f);
1997 fields[j].value.offset -= sizeof(f);
1998 j--;
1999 }
2000
2001 fields[j] = f;
2002
2003 /* Assign the same name pointer for further grouping simplicity. */
2004 nxt_unit_sptr_set(&fields[j].name, name);
2005
2006 i++;
2007 }
2008 }
2009 }
2010
2011
2012 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2013 nxt_unit_response_init(nxt_unit_request_info_t *req,
2014 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2015 {
2016 uint32_t buf_size;
2017 nxt_unit_buf_t *buf;
2018 nxt_unit_request_info_impl_t *req_impl;
2019
2020 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2021
2022 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2023 nxt_unit_req_warn(req, "init: response already sent");
2024
2025 return NXT_UNIT_ERROR;
2026 }
2027
2028 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2029 (int) max_fields_count, (int) max_fields_size);
2030
2031 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2032 nxt_unit_req_debug(req, "duplicate response init");
2033 }
2034
2035 /*
2036 * Each field name and value 0-terminated by libunit,
2037 * this is the reason of '+ 2' below.
2038 */
2039 buf_size = sizeof(nxt_unit_response_t)
2040 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2041 + max_fields_size;
2042
2043 if (nxt_slow_path(req->response_buf != NULL)) {
2044 buf = req->response_buf;
2045
2046 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2047 goto init_response;
2048 }
2049
2050 nxt_unit_buf_free(buf);
2051
2052 req->response_buf = NULL;
2053 req->response = NULL;
2054 req->response_max_fields = 0;
2055
2056 req_impl->state = NXT_UNIT_RS_START;
2057 }
2058
2059 buf = nxt_unit_response_buf_alloc(req, buf_size);
2060 if (nxt_slow_path(buf == NULL)) {
2061 return NXT_UNIT_ERROR;
2062 }
2063
2064 init_response:
2065
2066 memset(buf->start, 0, sizeof(nxt_unit_response_t));
2067
2068 req->response_buf = buf;
2069
2070 req->response = (nxt_unit_response_t *) buf->start;
2071 req->response->status = status;
2072
2073 buf->free = buf->start + sizeof(nxt_unit_response_t)
2074 + max_fields_count * sizeof(nxt_unit_field_t);
2075
2076 req->response_max_fields = max_fields_count;
2077 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2078
2079 return NXT_UNIT_OK;
2080 }
2081
2082
2083 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2085 uint32_t max_fields_count, uint32_t max_fields_size)
2086 {
2087 char *p;
2088 uint32_t i, buf_size;
2089 nxt_unit_buf_t *buf;
2090 nxt_unit_field_t *f, *src;
2091 nxt_unit_response_t *resp;
2092 nxt_unit_request_info_impl_t *req_impl;
2093
2094 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2095
2096 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2097 nxt_unit_req_warn(req, "realloc: response not init");
2098
2099 return NXT_UNIT_ERROR;
2100 }
2101
2102 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2103 nxt_unit_req_warn(req, "realloc: response already sent");
2104
2105 return NXT_UNIT_ERROR;
2106 }
2107
2108 if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2109 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2110
2111 return NXT_UNIT_ERROR;
2112 }
2113
2114 /*
2115 * Each field name and value 0-terminated by libunit,
2116 * this is the reason of '+ 2' below.
2117 */
2118 buf_size = sizeof(nxt_unit_response_t)
2119 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2120 + max_fields_size;
2121
2122 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2123
2124 buf = nxt_unit_response_buf_alloc(req, buf_size);
2125 if (nxt_slow_path(buf == NULL)) {
2126 nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2127 return NXT_UNIT_ERROR;
2128 }
2129
2130 resp = (nxt_unit_response_t *) buf->start;
2131
2132 memset(resp, 0, sizeof(nxt_unit_response_t));
2133
2134 resp->status = req->response->status;
2135 resp->content_length = req->response->content_length;
2136
2137 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2138 f = resp->fields;
2139
2140 for (i = 0; i < req->response->fields_count; i++) {
2141 src = req->response->fields + i;
2142
2143 if (nxt_slow_path(src->skip != 0)) {
2144 continue;
2145 }
2146
2147 if (nxt_slow_path(src->name_length + src->value_length + 2
2148 > (uint32_t) (buf->end - p)))
2149 {
2150 nxt_unit_req_warn(req, "realloc: not enough space for field"
2151 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2152 i, src, src->name_length, src->value_length);
2153
2154 goto fail;
2155 }
2156
2157 nxt_unit_sptr_set(&f->name, p);
2158 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2159 *p++ = '\0';
2160
2161 nxt_unit_sptr_set(&f->value, p);
2162 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2163 *p++ = '\0';
2164
2165 f->hash = src->hash;
2166 f->skip = 0;
2167 f->name_length = src->name_length;
2168 f->value_length = src->value_length;
2169
2170 resp->fields_count++;
2171 f++;
2172 }
2173
2174 if (req->response->piggyback_content_length > 0) {
2175 if (nxt_slow_path(req->response->piggyback_content_length
2176 > (uint32_t) (buf->end - p)))
2177 {
2178 nxt_unit_req_warn(req, "realloc: not enought space for content"
2179 " #%"PRIu32", %"PRIu32" required",
2180 i, req->response->piggyback_content_length);
2181
2182 goto fail;
2183 }
2184
2185 resp->piggyback_content_length =
2186 req->response->piggyback_content_length;
2187
2188 nxt_unit_sptr_set(&resp->piggyback_content, p);
2189 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2190 req->response->piggyback_content_length);
2191 }
2192
2193 buf->free = p;
2194
2195 nxt_unit_buf_free(req->response_buf);
2196
2197 req->response = resp;
2198 req->response_buf = buf;
2199 req->response_max_fields = max_fields_count;
2200
2201 return NXT_UNIT_OK;
2202
2203 fail:
2204
2205 nxt_unit_buf_free(buf);
2206
2207 return NXT_UNIT_ERROR;
2208 }
2209
2210
2211 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2213 {
2214 nxt_unit_request_info_impl_t *req_impl;
2215
2216 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2217
2218 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2219 }
2220
2221
2222 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2224 const char *name, uint8_t name_length,
2225 const char *value, uint32_t value_length)
2226 {
2227 nxt_unit_buf_t *buf;
2228 nxt_unit_field_t *f;
2229 nxt_unit_response_t *resp;
2230 nxt_unit_request_info_impl_t *req_impl;
2231
2232 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2233
2234 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2235 nxt_unit_req_warn(req, "add_field: response not initialized or "
2236 "already sent");
2237
2238 return NXT_UNIT_ERROR;
2239 }
2240
2241 resp = req->response;
2242
2243 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2244 nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2245 (int) resp->fields_count);
2246
2247 return NXT_UNIT_ERROR;
2248 }
2249
2250 buf = req->response_buf;
2251
2252 if (nxt_slow_path(name_length + value_length + 2
2253 > (uint32_t) (buf->end - buf->free)))
2254 {
2255 nxt_unit_req_warn(req, "add_field: response buffer overflow");
2256
2257 return NXT_UNIT_ERROR;
2258 }
2259
2260 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2261 resp->fields_count,
2262 (int) name_length, name,
2263 (int) value_length, value);
2264
2265 f = resp->fields + resp->fields_count;
2266
2267 nxt_unit_sptr_set(&f->name, buf->free);
2268 buf->free = nxt_cpymem(buf->free, name, name_length);
2269 *buf->free++ = '\0';
2270
2271 nxt_unit_sptr_set(&f->value, buf->free);
2272 buf->free = nxt_cpymem(buf->free, value, value_length);
2273 *buf->free++ = '\0';
2274
2275 f->hash = nxt_unit_field_hash(name, name_length);
2276 f->skip = 0;
2277 f->name_length = name_length;
2278 f->value_length = value_length;
2279
2280 resp->fields_count++;
2281
2282 return NXT_UNIT_OK;
2283 }
2284
2285
2286 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2287 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2288 const void* src, uint32_t size)
2289 {
2290 nxt_unit_buf_t *buf;
2291 nxt_unit_response_t *resp;
2292 nxt_unit_request_info_impl_t *req_impl;
2293
2294 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2295
2296 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2297 nxt_unit_req_warn(req, "add_content: response not initialized yet");
2298
2299 return NXT_UNIT_ERROR;
2300 }
2301
2302 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2303 nxt_unit_req_warn(req, "add_content: response already sent");
2304
2305 return NXT_UNIT_ERROR;
2306 }
2307
2308 buf = req->response_buf;
2309
2310 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2311 nxt_unit_req_warn(req, "add_content: buffer overflow");
2312
2313 return NXT_UNIT_ERROR;
2314 }
2315
2316 resp = req->response;
2317
2318 if (resp->piggyback_content_length == 0) {
2319 nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2320 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2321 }
2322
2323 resp->piggyback_content_length += size;
2324
2325 buf->free = nxt_cpymem(buf->free, src, size);
2326
2327 return NXT_UNIT_OK;
2328 }
2329
2330
2331 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2332 nxt_unit_response_send(nxt_unit_request_info_t *req)
2333 {
2334 int rc;
2335 nxt_unit_mmap_buf_t *mmap_buf;
2336 nxt_unit_request_info_impl_t *req_impl;
2337
2338 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2339
2340 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2341 nxt_unit_req_warn(req, "send: response is not initialized yet");
2342
2343 return NXT_UNIT_ERROR;
2344 }
2345
2346 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2347 nxt_unit_req_warn(req, "send: response already sent");
2348
2349 return NXT_UNIT_ERROR;
2350 }
2351
2352 if (req->request->websocket_handshake && req->response->status == 101) {
2353 nxt_unit_response_upgrade(req);
2354 }
2355
2356 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2357 req->response->fields_count,
2358 (int) (req->response_buf->free
2359 - req->response_buf->start));
2360
2361 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2362
2363 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2364 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2365 req->response = NULL;
2366 req->response_buf = NULL;
2367 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2368
2369 nxt_unit_mmap_buf_free(mmap_buf);
2370 }
2371
2372 return rc;
2373 }
2374
2375
2376 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2377 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2378 {
2379 nxt_unit_request_info_impl_t *req_impl;
2380
2381 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2382
2383 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2384 }
2385
2386
2387 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2388 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2389 {
2390 int rc;
2391 nxt_unit_mmap_buf_t *mmap_buf;
2392 nxt_unit_request_info_impl_t *req_impl;
2393
2394 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2395 nxt_unit_req_warn(req, "response_buf_alloc: "
2396 "requested buffer (%"PRIu32") too big", size);
2397
2398 return NULL;
2399 }
2400
2401 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2402
2403 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2404
2405 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2406 if (nxt_slow_path(mmap_buf == NULL)) {
2407 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2408
2409 return NULL;
2410 }
2411
2412 mmap_buf->req = req;
2413
2414 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2415
2416 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2417 size, size, mmap_buf,
2418 NULL);
2419 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2420 nxt_unit_mmap_buf_release(mmap_buf);
2421
2422 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2423
2424 return NULL;
2425 }
2426
2427 return &mmap_buf->buf;
2428 }
2429
2430
2431 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2432 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2433 {
2434 nxt_unit_mmap_buf_t *mmap_buf;
2435 nxt_unit_ctx_impl_t *ctx_impl;
2436
2437 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2438
2439 pthread_mutex_lock(&ctx_impl->mutex);
2440
2441 if (ctx_impl->free_buf == NULL) {
2442 pthread_mutex_unlock(&ctx_impl->mutex);
2443
2444 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2445 if (nxt_slow_path(mmap_buf == NULL)) {
2446 return NULL;
2447 }
2448
2449 } else {
2450 mmap_buf = ctx_impl->free_buf;
2451
2452 nxt_unit_mmap_buf_unlink(mmap_buf);
2453
2454 pthread_mutex_unlock(&ctx_impl->mutex);
2455 }
2456
2457 mmap_buf->ctx_impl = ctx_impl;
2458
2459 mmap_buf->hdr = NULL;
2460 mmap_buf->free_ptr = NULL;
2461
2462 return mmap_buf;
2463 }
2464
2465
2466 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2467 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2468 {
2469 nxt_unit_mmap_buf_unlink(mmap_buf);
2470
2471 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2472
2473 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2474
2475 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2476 }
2477
2478
2479 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2480 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2481 {
2482 return req->request->websocket_handshake;
2483 }
2484
2485
2486 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2488 {
2489 int rc;
2490 nxt_unit_request_info_impl_t *req_impl;
2491
2492 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2493
2494 if (nxt_slow_path(req_impl->websocket != 0)) {
2495 nxt_unit_req_debug(req, "upgrade: already upgraded");
2496
2497 return NXT_UNIT_OK;
2498 }
2499
2500 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2501 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2502
2503 return NXT_UNIT_ERROR;
2504 }
2505
2506 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2507 nxt_unit_req_warn(req, "upgrade: response already sent");
2508
2509 return NXT_UNIT_ERROR;
2510 }
2511
2512 rc = nxt_unit_request_hash_add(req->ctx, req);
2513 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2514 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2515
2516 return NXT_UNIT_ERROR;
2517 }
2518
2519 req_impl->websocket = 1;
2520
2521 req->response->status = 101;
2522
2523 return NXT_UNIT_OK;
2524 }
2525
2526
2527 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2529 {
2530 nxt_unit_request_info_impl_t *req_impl;
2531
2532 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2533
2534 return req_impl->websocket;
2535 }
2536
2537
2538 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2539 nxt_unit_get_request_info_from_data(void *data)
2540 {
2541 nxt_unit_request_info_impl_t *req_impl;
2542
2543 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2544
2545 return &req_impl->req;
2546 }
2547
2548
2549 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2550 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2551 {
2552 int rc;
2553 nxt_unit_mmap_buf_t *mmap_buf;
2554 nxt_unit_request_info_t *req;
2555 nxt_unit_request_info_impl_t *req_impl;
2556
2557 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2558
2559 req = mmap_buf->req;
2560 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2561
2562 nxt_unit_req_debug(req, "buf_send: %d bytes",
2563 (int) (buf->free - buf->start));
2564
2565 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2566 nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2567
2568 return NXT_UNIT_ERROR;
2569 }
2570
2571 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2572 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2573
2574 return NXT_UNIT_ERROR;
2575 }
2576
2577 if (nxt_fast_path(buf->free > buf->start)) {
2578 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2579 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2580 return rc;
2581 }
2582 }
2583
2584 nxt_unit_mmap_buf_free(mmap_buf);
2585
2586 return NXT_UNIT_OK;
2587 }
2588
2589
2590 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2592 {
2593 int rc;
2594 nxt_unit_mmap_buf_t *mmap_buf;
2595 nxt_unit_request_info_t *req;
2596
2597 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2598
2599 req = mmap_buf->req;
2600
2601 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2602 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2603 nxt_unit_mmap_buf_free(mmap_buf);
2604
2605 nxt_unit_request_info_release(req);
2606
2607 } else {
2608 nxt_unit_request_done(req, rc);
2609 }
2610 }
2611
2612
2613 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2614 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2615 nxt_unit_mmap_buf_t *mmap_buf, int last)
2616 {
2617 struct {
2618 nxt_port_msg_t msg;
2619 nxt_port_mmap_msg_t mmap_msg;
2620 } m;
2621
2622 int rc;
2623 u_char *last_used, *first_free;
2624 ssize_t res;
2625 nxt_chunk_id_t first_free_chunk;
2626 nxt_unit_buf_t *buf;
2627 nxt_unit_impl_t *lib;
2628 nxt_port_mmap_header_t *hdr;
2629 nxt_unit_request_info_impl_t *req_impl;
2630
2631 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2632 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2633
2634 buf = &mmap_buf->buf;
2635 hdr = mmap_buf->hdr;
2636
2637 m.mmap_msg.size = buf->free - buf->start;
2638
2639 m.msg.stream = req_impl->stream;
2640 m.msg.pid = lib->pid;
2641 m.msg.reply_port = 0;
2642 m.msg.type = _NXT_PORT_MSG_DATA;
2643 m.msg.last = last != 0;
2644 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2645 m.msg.nf = 0;
2646 m.msg.mf = 0;
2647 m.msg.tracking = 0;
2648
2649 rc = NXT_UNIT_ERROR;
2650
2651 if (m.msg.mmap) {
2652 m.mmap_msg.mmap_id = hdr->id;
2653 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2654 (u_char *) buf->start);
2655
2656 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2657 req_impl->stream,
2658 (int) m.mmap_msg.mmap_id,
2659 (int) m.mmap_msg.chunk_id,
2660 (int) m.mmap_msg.size);
2661
2662 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2663 NULL);
2664 if (nxt_slow_path(res != sizeof(m))) {
2665 goto free_buf;
2666 }
2667
2668 last_used = (u_char *) buf->free - 1;
2669 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2670
2671 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2672 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2673
2674 buf->start = (char *) first_free;
2675 buf->free = buf->start;
2676
2677 if (buf->end < buf->start) {
2678 buf->end = buf->start;
2679 }
2680
2681 } else {
2682 buf->start = NULL;
2683 buf->free = NULL;
2684 buf->end = NULL;
2685
2686 mmap_buf->hdr = NULL;
2687 }
2688
2689 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2690 (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2691
2692 nxt_unit_debug(req->ctx, "allocated_chunks %d",
2693 (int) lib->outgoing.allocated_chunks);
2694
2695 } else {
2696 if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2697 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2698 {
2699 nxt_unit_alert(req->ctx,
2700 "#%"PRIu32": failed to send plain memory buffer"
2701 ": no space reserved for message header",
2702 req_impl->stream);
2703
2704 goto free_buf;
2705 }
2706
2707 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2708
2709 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2710 req_impl->stream,
2711 (int) (sizeof(m.msg) + m.mmap_msg.size));
2712
2713 res = nxt_unit_port_send(req->ctx, req->response_port,
2714 buf->start - sizeof(m.msg),
2715 m.mmap_msg.size + sizeof(m.msg), NULL);
2716
2717 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2718 goto free_buf;
2719 }
2720 }
2721
2722 rc = NXT_UNIT_OK;
2723
2724 free_buf:
2725
2726 nxt_unit_free_outgoing_buf(mmap_buf);
2727
2728 return rc;
2729 }
2730
2731
2732 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2733 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2734 {
2735 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2736 }
2737
2738
2739 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2740 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2741 {
2742 nxt_unit_free_outgoing_buf(mmap_buf);
2743
2744 nxt_unit_mmap_buf_release(mmap_buf);
2745 }
2746
2747
2748 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2749 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2750 {
2751 if (mmap_buf->hdr != NULL) {
2752 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2753 mmap_buf->hdr, mmap_buf->buf.start,
2754 mmap_buf->buf.end - mmap_buf->buf.start);
2755
2756 mmap_buf->hdr = NULL;
2757
2758 return;
2759 }
2760
2761 if (mmap_buf->free_ptr != NULL) {
2762 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2763
2764 mmap_buf->free_ptr = NULL;
2765 }
2766 }
2767
2768
2769 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2770 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2771 {
2772 nxt_unit_ctx_impl_t *ctx_impl;
2773 nxt_unit_read_buf_t *rbuf;
2774
2775 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2776
2777 pthread_mutex_lock(&ctx_impl->mutex);
2778
2779 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2780
2781 pthread_mutex_unlock(&ctx_impl->mutex);
2782
2783 rbuf->oob.size = 0;
2784
2785 return rbuf;
2786 }
2787
2788
2789 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2790 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2791 {
2792 nxt_queue_link_t *link;
2793 nxt_unit_read_buf_t *rbuf;
2794
2795 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2796 link = nxt_queue_first(&ctx_impl->free_rbuf);
2797 nxt_queue_remove(link);
2798
2799 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2800
2801 return rbuf;
2802 }
2803
2804 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2805
2806 if (nxt_fast_path(rbuf != NULL)) {
2807 rbuf->ctx_impl = ctx_impl;
2808 }
2809
2810 return rbuf;
2811 }
2812
2813
2814 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2815 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2816 nxt_unit_read_buf_t *rbuf)
2817 {
2818 nxt_unit_ctx_impl_t *ctx_impl;
2819
2820 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2821
2822 pthread_mutex_lock(&ctx_impl->mutex);
2823
2824 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2825
2826 pthread_mutex_unlock(&ctx_impl->mutex);
2827 }
2828
2829
2830 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2831 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2832 {
2833 nxt_unit_mmap_buf_t *mmap_buf;
2834
2835 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2836
2837 if (mmap_buf->next == NULL) {
2838 return NULL;
2839 }
2840
2841 return &mmap_buf->next->buf;
2842 }
2843
2844
2845 uint32_t
nxt_unit_buf_max(void)2846 nxt_unit_buf_max(void)
2847 {
2848 return PORT_MMAP_DATA_SIZE;
2849 }
2850
2851
2852 uint32_t
nxt_unit_buf_min(void)2853 nxt_unit_buf_min(void)
2854 {
2855 return PORT_MMAP_CHUNK_SIZE;
2856 }
2857
2858
2859 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2860 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2861 size_t size)
2862 {
2863 ssize_t res;
2864
2865 res = nxt_unit_response_write_nb(req, start, size, size);
2866
2867 return res < 0 ? -res : NXT_UNIT_OK;
2868 }
2869
2870
2871 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2872 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2873 size_t size, size_t min_size)
2874 {
2875 int rc;
2876 ssize_t sent;
2877 uint32_t part_size, min_part_size, buf_size;
2878 const char *part_start;
2879 nxt_unit_mmap_buf_t mmap_buf;
2880 nxt_unit_request_info_impl_t *req_impl;
2881 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2882
2883 nxt_unit_req_debug(req, "write: %d", (int) size);
2884
2885 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2886
2887 part_start = start;
2888 sent = 0;
2889
2890 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2891 nxt_unit_req_alert(req, "write: response not initialized yet");
2892
2893 return -NXT_UNIT_ERROR;
2894 }
2895
2896 /* Check if response is not send yet. */
2897 if (nxt_slow_path(req->response_buf != NULL)) {
2898 part_size = req->response_buf->end - req->response_buf->free;
2899 part_size = nxt_min(size, part_size);
2900
2901 rc = nxt_unit_response_add_content(req, part_start, part_size);
2902 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2903 return -rc;
2904 }
2905
2906 rc = nxt_unit_response_send(req);
2907 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2908 return -rc;
2909 }
2910
2911 size -= part_size;
2912 part_start += part_size;
2913 sent += part_size;
2914
2915 min_size -= nxt_min(min_size, part_size);
2916 }
2917
2918 while (size > 0) {
2919 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2920 min_part_size = nxt_min(min_size, part_size);
2921 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2922
2923 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2924 min_part_size, &mmap_buf, local_buf);
2925 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2926 return -rc;
2927 }
2928
2929 buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2930 if (nxt_slow_path(buf_size == 0)) {
2931 return sent;
2932 }
2933 part_size = nxt_min(buf_size, part_size);
2934
2935 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2936 part_start, part_size);
2937
2938 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2939 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2940 return -rc;
2941 }
2942
2943 size -= part_size;
2944 part_start += part_size;
2945 sent += part_size;
2946
2947 min_size -= nxt_min(min_size, part_size);
2948 }
2949
2950 return sent;
2951 }
2952
2953
2954 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2955 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2956 nxt_unit_read_info_t *read_info)
2957 {
2958 int rc;
2959 ssize_t n;
2960 uint32_t buf_size;
2961 nxt_unit_buf_t *buf;
2962 nxt_unit_mmap_buf_t mmap_buf;
2963 nxt_unit_request_info_impl_t *req_impl;
2964 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2965
2966 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2967
2968 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2969 nxt_unit_req_alert(req, "write: response not initialized yet");
2970
2971 return NXT_UNIT_ERROR;
2972 }
2973
2974 /* Check if response is not send yet. */
2975 if (nxt_slow_path(req->response_buf != NULL)) {
2976
2977 /* Enable content in headers buf. */
2978 rc = nxt_unit_response_add_content(req, "", 0);
2979 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2980 nxt_unit_req_error(req, "Failed to add piggyback content");
2981
2982 return rc;
2983 }
2984
2985 buf = req->response_buf;
2986
2987 while (buf->end - buf->free > 0) {
2988 n = read_info->read(read_info, buf->free, buf->end - buf->free);
2989 if (nxt_slow_path(n < 0)) {
2990 nxt_unit_req_error(req, "Read error");
2991
2992 return NXT_UNIT_ERROR;
2993 }
2994
2995 /* Manually increase sizes. */
2996 buf->free += n;
2997 req->response->piggyback_content_length += n;
2998
2999 if (read_info->eof) {
3000 break;
3001 }
3002 }
3003
3004 rc = nxt_unit_response_send(req);
3005 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3006 nxt_unit_req_error(req, "Failed to send headers with content");
3007
3008 return rc;
3009 }
3010
3011 if (read_info->eof) {
3012 return NXT_UNIT_OK;
3013 }
3014 }
3015
3016 while (!read_info->eof) {
3017 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3018 read_info->buf_size);
3019
3020 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3021
3022 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3023 buf_size, buf_size,
3024 &mmap_buf, local_buf);
3025 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3026 return rc;
3027 }
3028
3029 buf = &mmap_buf.buf;
3030
3031 while (!read_info->eof && buf->end > buf->free) {
3032 n = read_info->read(read_info, buf->free, buf->end - buf->free);
3033 if (nxt_slow_path(n < 0)) {
3034 nxt_unit_req_error(req, "Read error");
3035
3036 nxt_unit_free_outgoing_buf(&mmap_buf);
3037
3038 return NXT_UNIT_ERROR;
3039 }
3040
3041 buf->free += n;
3042 }
3043
3044 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3045 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3046 nxt_unit_req_error(req, "Failed to send content");
3047
3048 return rc;
3049 }
3050 }
3051
3052 return NXT_UNIT_OK;
3053 }
3054
3055
3056 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3057 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3058 {
3059 ssize_t buf_res, res;
3060
3061 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3062 dst, size);
3063
3064 if (buf_res < (ssize_t) size && req->content_fd != -1) {
3065 res = read(req->content_fd, dst, size);
3066 if (nxt_slow_path(res < 0)) {
3067 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3068 strerror(errno), errno);
3069
3070 return res;
3071 }
3072
3073 if (res < (ssize_t) size) {
3074 nxt_unit_close(req->content_fd);
3075
3076 req->content_fd = -1;
3077 }
3078
3079 req->content_length -= res;
3080 size -= res;
3081
3082 dst = nxt_pointer_to(dst, res);
3083
3084 } else {
3085 res = 0;
3086 }
3087
3088 return buf_res + res;
3089 }
3090
3091
3092 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3093 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3094 {
3095 char *p;
3096 size_t l_size, b_size;
3097 nxt_unit_buf_t *b;
3098 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
3099
3100 if (req->content_length == 0) {
3101 return 0;
3102 }
3103
3104 l_size = 0;
3105
3106 b = req->content_buf;
3107
3108 while (b != NULL) {
3109 b_size = b->end - b->free;
3110 p = memchr(b->free, '\n', b_size);
3111
3112 if (p != NUL