1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16
17 #include "nxt_websocket.h"
18
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22
/* Size bound, in bytes, for "plain" data. */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024

/* NXT_UNIT_MAX_PLAIN_SIZE bytes of data plus one nxt_port_msg_t header. */
#define NXT_UNIT_LOCAL_BUF_SIZE                                               \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26
/*
 * Quit modes.  A value of this kind is stored in
 * nxt_unit_ctx_impl_t.quit_param and passed to nxt_unit_quit().
 */
enum {
    NXT_QUIT_NORMAL   = 0,
    NXT_QUIT_GRACEFUL = 1,
};
31
/* Short names for the implementation structures defined further down. */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46 nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52 nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54 nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58 int *shared_port_fd, int *shared_queue_fd,
59 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60 uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62 int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64 nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71 nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73 nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76 nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79 nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83 nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86 nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90 nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95 nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97 nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99 nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101 size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108 nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111 int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113 nxt_unit_port_t *port, uint32_t size,
114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118 nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135 pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147 nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152 nxt_unit_port_t *port, int queue_fd);
153
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159 nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161 nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163 nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166 nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170 nxt_unit_port_t *port, const void *buf, size_t buf_size,
171 const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175 nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177 nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179 nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181 nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183 nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185 nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190 nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192 nxt_unit_port_id_t *port_id, int remove);
193
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195 nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198
199 static char * nxt_unit_snprint_prefix(char *p, const char *end, pid_t pid,
200 int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204
205
206 struct nxt_unit_mmap_buf_s {
207 nxt_unit_buf_t buf;
208
209 nxt_unit_mmap_buf_t *next;
210 nxt_unit_mmap_buf_t **prev;
211
212 nxt_port_mmap_header_t *hdr;
213 nxt_unit_request_info_t *req;
214 nxt_unit_ctx_impl_t *ctx_impl;
215 char *free_ptr;
216 char *plain_ptr;
217 };
218
219
220 struct nxt_unit_recv_msg_s {
221 uint32_t stream;
222 nxt_pid_t pid;
223 nxt_port_id_t reply_port;
224
225 uint8_t last; /* 1 bit */
226 uint8_t mmap; /* 1 bit */
227
228 void *start;
229 uint32_t size;
230
231 int fd[2];
232
233 nxt_unit_mmap_buf_t *incoming_buf;
234 };
235
236
/* Request states; the type of nxt_unit_request_info_impl_t.state. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
244
245
246 struct nxt_unit_request_info_impl_s {
247 nxt_unit_request_info_t req;
248
249 uint32_t stream;
250
251 nxt_unit_mmap_buf_t *outgoing_buf;
252 nxt_unit_mmap_buf_t *incoming_buf;
253
254 nxt_unit_req_state_t state;
255 uint8_t websocket;
256 uint8_t in_hash;
257
258 /* for nxt_unit_ctx_impl_t.free_req or active_req */
259 nxt_queue_link_t link;
260 /* for nxt_unit_port_impl_t.awaiting_req */
261 nxt_queue_link_t port_wait_link;
262
263 char extra_data[];
264 };
265
266
267 struct nxt_unit_websocket_frame_impl_s {
268 nxt_unit_websocket_frame_t ws;
269
270 nxt_unit_mmap_buf_t *buf;
271
272 nxt_queue_link_t link;
273
274 nxt_unit_ctx_impl_t *ctx_impl;
275 };
276
277
278 struct nxt_unit_read_buf_s {
279 nxt_queue_link_t link;
280 nxt_unit_ctx_impl_t *ctx_impl;
281 ssize_t size;
282 nxt_recv_oob_t oob;
283 char buf[16384];
284 };
285
286
287 struct nxt_unit_ctx_impl_s {
288 nxt_unit_ctx_t ctx;
289
290 nxt_atomic_t use_count;
291 nxt_atomic_t wait_items;
292
293 pthread_mutex_t mutex;
294
295 nxt_unit_port_t *read_port;
296
297 nxt_queue_link_t link;
298
299 nxt_unit_mmap_buf_t *free_buf;
300
301 /* of nxt_unit_request_info_impl_t */
302 nxt_queue_t free_req;
303
304 /* of nxt_unit_websocket_frame_impl_t */
305 nxt_queue_t free_ws;
306
307 /* of nxt_unit_request_info_impl_t */
308 nxt_queue_t active_req;
309
310 /* of nxt_unit_request_info_impl_t */
311 nxt_lvlhsh_t requests;
312
313 /* of nxt_unit_request_info_impl_t */
314 nxt_queue_t ready_req;
315
316 /* of nxt_unit_read_buf_t */
317 nxt_queue_t pending_rbuf;
318
319 /* of nxt_unit_read_buf_t */
320 nxt_queue_t free_rbuf;
321
322 uint8_t online; /* 1 bit */
323 uint8_t ready; /* 1 bit */
324 uint8_t quit_param;
325
326 nxt_unit_mmap_buf_t ctx_buf[2];
327 nxt_unit_read_buf_t ctx_read_buf;
328
329 nxt_unit_request_info_impl_t req;
330 };
331
332
333 struct nxt_unit_mmap_s {
334 nxt_port_mmap_header_t *hdr;
335 pthread_t src_thread;
336
337 /* of nxt_unit_read_buf_t */
338 nxt_queue_t awaiting_rbuf;
339 };
340
341
342 struct nxt_unit_mmaps_s {
343 pthread_mutex_t mutex;
344 uint32_t size;
345 uint32_t cap;
346 nxt_atomic_t allocated_chunks;
347 nxt_unit_mmap_t *elts;
348 };
349
350
351 struct nxt_unit_impl_s {
352 nxt_unit_t unit;
353 nxt_unit_callbacks_t callbacks;
354
355 nxt_atomic_t use_count;
356 nxt_atomic_t request_count;
357
358 uint32_t request_data_size;
359 uint32_t shm_mmap_limit;
360 uint32_t request_limit;
361
362 pthread_mutex_t mutex;
363
364 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
365 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
366
367 nxt_unit_port_t *router_port;
368 nxt_unit_port_t *shared_port;
369
370 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
371
372 nxt_unit_mmaps_t incoming;
373 nxt_unit_mmaps_t outgoing;
374
375 pid_t pid;
376 int log_fd;
377
378 nxt_unit_ctx_impl_t main_ctx;
379 };
380
381
382 struct nxt_unit_port_impl_s {
383 nxt_unit_port_t port;
384
385 nxt_atomic_t use_count;
386
387 /* for nxt_unit_process_t.ports */
388 nxt_queue_link_t link;
389 nxt_unit_process_t *process;
390
391 /* of nxt_unit_request_info_impl_t */
392 nxt_queue_t awaiting_req;
393
394 int ready;
395
396 void *queue;
397
398 int from_socket;
399 nxt_unit_read_buf_t *socket_rbuf;
400 };
401
402
403 struct nxt_unit_process_s {
404 pid_t pid;
405
406 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
407
408 nxt_unit_impl_t *lib;
409
410 nxt_atomic_t use_count;
411
412 uint32_t next_port_id;
413 };
414
415
/*
 * Both members are explicitly 32 bits wide so that no alignment padding
 * can appear between or after them.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
421
422
/*
 * Process id kept at file scope.  nxt_unit_init() sets it to getpid() first
 * and then to the pid taken from the read port id.
 */
static pid_t  nxt_unit_pid;
424
425
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429 int rc, queue_fd, shared_queue_fd;
430 void *mem;
431 uint32_t ready_stream, shm_limit, request_limit;
432 nxt_unit_ctx_t *ctx;
433 nxt_unit_impl_t *lib;
434 nxt_unit_port_t ready_port, router_port, read_port, shared_port;
435
436 nxt_unit_pid = getpid();
437
438 lib = nxt_unit_create(init);
439 if (nxt_slow_path(lib == NULL)) {
440 return NULL;
441 }
442
443 queue_fd = -1;
444 mem = MAP_FAILED;
445 shared_port.out_fd = -1;
446 shared_port.data = NULL;
447
448 if (init->ready_port.id.pid != 0
449 && init->ready_stream != 0
450 && init->read_port.id.pid != 0)
451 {
452 ready_port = init->ready_port;
453 ready_stream = init->ready_stream;
454 router_port = init->router_port;
455 read_port = init->read_port;
456 lib->log_fd = init->log_fd;
457
458 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459 ready_port.id.id);
460 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461 router_port.id.id);
462 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463 read_port.id.id);
464
465 shared_port.in_fd = init->shared_port_fd;
466 shared_queue_fd = init->shared_queue_fd;
467
468 } else {
469 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470 &shared_port.in_fd, &shared_queue_fd,
471 &lib->log_fd, &ready_stream, &shm_limit,
472 &request_limit);
473 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474 goto fail;
475 }
476
477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478 / PORT_MMAP_DATA_SIZE;
479 lib->request_limit = request_limit;
480 }
481
482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483 lib->shm_mmap_limit = 1;
484 }
485
486 lib->pid = read_port.id.pid;
487 nxt_unit_pid = lib->pid;
488
489 ctx = &lib->main_ctx.ctx;
490
491 rc = nxt_unit_fd_blocking(router_port.out_fd);
492 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493 goto fail;
494 }
495
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497 if (nxt_slow_path(lib->router_port == NULL)) {
498 nxt_unit_alert(NULL, "failed to add router_port");
499
500 goto fail;
501 }
502
503 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504 if (nxt_slow_path(queue_fd == -1)) {
505 goto fail;
506 }
507
508 mem = mmap(NULL, sizeof(nxt_port_queue_t),
509 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510 if (nxt_slow_path(mem == MAP_FAILED)) {
511 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512 strerror(errno), errno);
513
514 goto fail;
515 }
516
517 nxt_port_queue_init(mem);
518
519 rc = nxt_unit_fd_blocking(read_port.in_fd);
520 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521 goto fail;
522 }
523
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526 nxt_unit_alert(NULL, "failed to add read_port");
527
528 goto fail;
529 }
530
531 rc = nxt_unit_fd_blocking(ready_port.out_fd);
532 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533 goto fail;
534 }
535
536 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537 NXT_UNIT_SHARED_PORT_ID);
538
539 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540 MAP_SHARED, shared_queue_fd, 0);
541 if (nxt_slow_path(mem == MAP_FAILED)) {
542 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543 strerror(errno), errno);
544
545 goto fail;
546 }
547
548 nxt_unit_close(shared_queue_fd);
549
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551 if (nxt_slow_path(lib->shared_port == NULL)) {
552 nxt_unit_alert(NULL, "failed to add shared_port");
553
554 goto fail;
555 }
556
557 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559 nxt_unit_alert(NULL, "failed to send READY message");
560
561 goto fail;
562 }
563
564 nxt_unit_close(ready_port.out_fd);
565 nxt_unit_close(queue_fd);
566
567 return ctx;
568
569 fail:
570
571 if (mem != MAP_FAILED) {
572 munmap(mem, sizeof(nxt_port_queue_t));
573 }
574
575 if (queue_fd != -1) {
576 nxt_unit_close(queue_fd);
577 }
578
579 nxt_unit_ctx_release(&lib->main_ctx.ctx);
580
581 return NULL;
582 }
583
584
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588 int rc;
589 nxt_unit_impl_t *lib;
590 nxt_unit_callbacks_t *cb;
591
592 lib = nxt_unit_malloc(NULL,
593 sizeof(nxt_unit_impl_t) + init->request_data_size);
594 if (nxt_slow_path(lib == NULL)) {
595 nxt_unit_alert(NULL, "failed to allocate unit struct");
596
597 return NULL;
598 }
599
600 rc = pthread_mutex_init(&lib->mutex, NULL);
601 if (nxt_slow_path(rc != 0)) {
602 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
603
604 goto fail;
605 }
606
607 lib->unit.data = init->data;
608 lib->callbacks = init->callbacks;
609
610 lib->request_data_size = init->request_data_size;
611 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
612 / PORT_MMAP_DATA_SIZE;
613 lib->request_limit = init->request_limit;
614
615 lib->processes.slot = NULL;
616 lib->ports.slot = NULL;
617
618 lib->log_fd = STDERR_FILENO;
619
620 nxt_queue_init(&lib->contexts);
621
622 lib->use_count = 0;
623 lib->request_count = 0;
624 lib->router_port = NULL;
625 lib->shared_port = NULL;
626
627 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
628 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
629 pthread_mutex_destroy(&lib->mutex);
630 goto fail;
631 }
632
633 cb = &lib->callbacks;
634
635 if (cb->request_handler == NULL) {
636 nxt_unit_alert(NULL, "request_handler is NULL");
637
638 pthread_mutex_destroy(&lib->mutex);
639 goto fail;
640 }
641
642 nxt_unit_mmaps_init(&lib->incoming);
643 nxt_unit_mmaps_init(&lib->outgoing);
644
645 return lib;
646
647 fail:
648
649 nxt_unit_free(NULL, lib);
650
651 return NULL;
652 }
653
654
655 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)656 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
657 void *data)
658 {
659 int rc;
660
661 ctx_impl->ctx.data = data;
662 ctx_impl->ctx.unit = &lib->unit;
663
664 rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
665 if (nxt_slow_path(rc != 0)) {
666 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
667
668 return NXT_UNIT_ERROR;
669 }
670
671 nxt_unit_lib_use(lib);
672
673 pthread_mutex_lock(&lib->mutex);
674
675 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
676
677 pthread_mutex_unlock(&lib->mutex);
678
679 ctx_impl->use_count = 1;
680 ctx_impl->wait_items = 0;
681 ctx_impl->online = 1;
682 ctx_impl->ready = 0;
683 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
684
685 nxt_queue_init(&ctx_impl->free_req);
686 nxt_queue_init(&ctx_impl->free_ws);
687 nxt_queue_init(&ctx_impl->active_req);
688 nxt_queue_init(&ctx_impl->ready_req);
689 nxt_queue_init(&ctx_impl->pending_rbuf);
690 nxt_queue_init(&ctx_impl->free_rbuf);
691
692 ctx_impl->free_buf = NULL;
693 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
694 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
695
696 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
697 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
698
699 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
700
701 ctx_impl->req.req.ctx = &ctx_impl->ctx;
702 ctx_impl->req.req.unit = &lib->unit;
703
704 ctx_impl->read_port = NULL;
705 ctx_impl->requests.slot = 0;
706
707 return NXT_UNIT_OK;
708 }
709
710
711 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)712 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
713 {
714 nxt_unit_ctx_impl_t *ctx_impl;
715
716 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
717
718 nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
719 }
720
721
722 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)723 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
724 {
725 long c;
726 nxt_unit_ctx_impl_t *ctx_impl;
727
728 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
729
730 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
731
732 if (c == 1) {
733 nxt_unit_ctx_free(ctx_impl);
734 }
735 }
736
737
738 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)739 nxt_unit_lib_use(nxt_unit_impl_t *lib)
740 {
741 nxt_atomic_fetch_add(&lib->use_count, 1);
742 }
743
744
745 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)746 nxt_unit_lib_release(nxt_unit_impl_t *lib)
747 {
748 long c;
749 nxt_unit_process_t *process;
750
751 c = nxt_atomic_fetch_add(&lib->use_count, -1);
752
753 if (c == 1) {
754 for ( ;; ) {
755 pthread_mutex_lock(&lib->mutex);
756
757 process = nxt_unit_process_pop_first(lib);
758 if (process == NULL) {
759 pthread_mutex_unlock(&lib->mutex);
760
761 break;
762 }
763
764 nxt_unit_remove_process(lib, process);
765 }
766
767 pthread_mutex_destroy(&lib->mutex);
768
769 if (nxt_fast_path(lib->router_port != NULL)) {
770 nxt_unit_port_release(lib->router_port);
771 }
772
773 if (nxt_fast_path(lib->shared_port != NULL)) {
774 nxt_unit_port_release(lib->shared_port);
775 }
776
777 nxt_unit_mmaps_destroy(&lib->incoming);
778 nxt_unit_mmaps_destroy(&lib->outgoing);
779
780 nxt_unit_free(NULL, lib);
781 }
782 }
783
784
785 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)786 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
787 nxt_unit_mmap_buf_t *mmap_buf)
788 {
789 mmap_buf->next = *head;
790
791 if (mmap_buf->next != NULL) {
792 mmap_buf->next->prev = &mmap_buf->next;
793 }
794
795 *head = mmap_buf;
796 mmap_buf->prev = head;
797 }
798
799
800 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)801 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
802 nxt_unit_mmap_buf_t *mmap_buf)
803 {
804 while (*prev != NULL) {
805 prev = &(*prev)->next;
806 }
807
808 nxt_unit_mmap_buf_insert(prev, mmap_buf);
809 }
810
811
812 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)813 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
814 {
815 nxt_unit_mmap_buf_t **prev;
816
817 prev = mmap_buf->prev;
818
819 if (mmap_buf->next != NULL) {
820 mmap_buf->next->prev = prev;
821 }
822
823 if (prev != NULL) {
824 *prev = mmap_buf->next;
825 }
826 }
827
828
829 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)830 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
831 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
832 int *log_fd, uint32_t *stream,
833 uint32_t *shm_limit, uint32_t *request_limit)
834 {
835 int rc;
836 int ready_fd, router_fd, read_in_fd, read_out_fd;
837 char *unit_init, *version_end, *vars;
838 size_t version_length;
839 int64_t ready_pid, router_pid, read_pid;
840 uint32_t ready_stream, router_id, ready_id, read_id;
841
842 unit_init = getenv(NXT_UNIT_INIT_ENV);
843 if (nxt_slow_path(unit_init == NULL)) {
844 nxt_unit_alert(NULL, "%s is not in the current environment",
845 NXT_UNIT_INIT_ENV);
846
847 return NXT_UNIT_ERROR;
848 }
849
850 version_end = strchr(unit_init, ';');
851 if (nxt_slow_path(version_end == NULL)) {
852 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
853 NXT_UNIT_INIT_ENV, unit_init);
854
855 return NXT_UNIT_ERROR;
856 }
857
858 version_length = version_end - unit_init;
859
860 rc = version_length != nxt_length(NXT_VERSION)
861 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
862
863 if (nxt_slow_path(rc != 0)) {
864 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
865 "%.*s, while the app was compiled with libunit %s",
866 (int) version_length, unit_init, NXT_VERSION);
867
868 return NXT_UNIT_ERROR;
869 }
870
871 vars = version_end + 1;
872
873 rc = sscanf(vars,
874 "%"PRIu32";"
875 "%"PRId64",%"PRIu32",%d;"
876 "%"PRId64",%"PRIu32",%d;"
877 "%"PRId64",%"PRIu32",%d,%d;"
878 "%d,%d;"
879 "%d,%"PRIu32",%"PRIu32,
880 &ready_stream,
881 &ready_pid, &ready_id, &ready_fd,
882 &router_pid, &router_id, &router_fd,
883 &read_pid, &read_id, &read_in_fd, &read_out_fd,
884 shared_port_fd, shared_queue_fd,
885 log_fd, shm_limit, request_limit);
886
887 if (nxt_slow_path(rc == EOF)) {
888 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
889 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
890
891 return NXT_UNIT_ERROR;
892 }
893
894 if (nxt_slow_path(rc != 16)) {
895 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
896 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
897
898 return NXT_UNIT_ERROR;
899 }
900
901 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
902
903 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
904
905 ready_port->in_fd = -1;
906 ready_port->out_fd = ready_fd;
907 ready_port->data = NULL;
908
909 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
910
911 router_port->in_fd = -1;
912 router_port->out_fd = router_fd;
913 router_port->data = NULL;
914
915 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
916
917 read_port->in_fd = read_in_fd;
918 read_port->out_fd = read_out_fd;
919 read_port->data = NULL;
920
921 *stream = ready_stream;
922
923 return NXT_UNIT_OK;
924 }
925
926
927 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)928 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
929 {
930 ssize_t res;
931 nxt_send_oob_t oob;
932 nxt_port_msg_t msg;
933 nxt_unit_impl_t *lib;
934 int fds[2] = {queue_fd, -1};
935
936 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
937
938 msg.stream = stream;
939 msg.pid = lib->pid;
940 msg.reply_port = 0;
941 msg.type = _NXT_PORT_MSG_PROCESS_READY;
942 msg.last = 1;
943 msg.mmap = 0;
944 msg.nf = 0;
945 msg.mf = 0;
946
947 nxt_socket_msg_oob_init(&oob, fds);
948
949 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
950 if (res != sizeof(msg)) {
951 return NXT_UNIT_ERROR;
952 }
953
954 return NXT_UNIT_OK;
955 }
956
957
958 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
960 nxt_unit_request_info_t **preq)
961 {
962 int rc;
963 pid_t pid;
964 uint8_t quit_param;
965 nxt_port_msg_t *port_msg;
966 nxt_unit_impl_t *lib;
967 nxt_unit_recv_msg_t recv_msg;
968
969 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
970
971 recv_msg.incoming_buf = NULL;
972 recv_msg.fd[0] = -1;
973 recv_msg.fd[1] = -1;
974
975 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
976 if (nxt_slow_path(rc != NXT_OK)) {
977 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
978 rc = NXT_UNIT_ERROR;
979 goto done;
980 }
981
982 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
983 if (nxt_slow_path(rbuf->size == 0)) {
984 nxt_unit_debug(ctx, "read port closed");
985
986 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
987 rc = NXT_UNIT_OK;
988 goto done;
989 }
990
991 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
992
993 rc = NXT_UNIT_ERROR;
994 goto done;
995 }
996
997 port_msg = (nxt_port_msg_t *) rbuf->buf;
998
999 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1000 port_msg->stream, (int) port_msg->type,
1001 recv_msg.fd[0], recv_msg.fd[1]);
1002
1003 recv_msg.stream = port_msg->stream;
1004 recv_msg.pid = port_msg->pid;
1005 recv_msg.reply_port = port_msg->reply_port;
1006 recv_msg.last = port_msg->last;
1007 recv_msg.mmap = port_msg->mmap;
1008
1009 recv_msg.start = port_msg + 1;
1010 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1011
1012 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1013 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1014 port_msg->stream, (int) port_msg->type);
1015 rc = NXT_UNIT_ERROR;
1016 goto done;
1017 }
1018
1019 /* Fragmentation is unsupported. */
1020 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1021 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1022 port_msg->stream, (int) port_msg->type);
1023 rc = NXT_UNIT_ERROR;
1024 goto done;
1025 }
1026
1027 if (port_msg->mmap) {
1028 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1029
1030 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1031 if (rc == NXT_UNIT_AGAIN) {
1032 recv_msg.fd[0] = -1;
1033 recv_msg.fd[1] = -1;
1034 }
1035
1036 goto done;
1037 }
1038 }
1039
1040 switch (port_msg->type) {
1041
1042 case _NXT_PORT_MSG_RPC_READY:
1043 rc = NXT_UNIT_OK;
1044 break;
1045
1046 case _NXT_PORT_MSG_QUIT:
1047 if (recv_msg.size == sizeof(quit_param)) {
1048 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1049
1050 } else {
1051 quit_param = NXT_QUIT_NORMAL;
1052 }
1053
1054 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1055 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1056
1057 nxt_unit_quit(ctx, quit_param);
1058
1059 rc = NXT_UNIT_OK;
1060 break;
1061
1062 case _NXT_PORT_MSG_NEW_PORT:
1063 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1064 break;
1065
1066 case _NXT_PORT_MSG_PORT_ACK:
1067 rc = nxt_unit_ctx_ready(ctx);
1068 break;
1069
1070 case _NXT_PORT_MSG_CHANGE_FILE:
1071 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1072 port_msg->stream, recv_msg.fd[0]);
1073
1074 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1075 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1076 port_msg->stream, recv_msg.fd[0], lib->log_fd,
1077 strerror(errno), errno);
1078
1079 rc = NXT_UNIT_ERROR;
1080 goto done;
1081 }
1082
1083 rc = NXT_UNIT_OK;
1084 break;
1085
1086 case _NXT_PORT_MSG_MMAP:
1087 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1088 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1089 port_msg->stream, recv_msg.fd[0]);
1090
1091 rc = NXT_UNIT_ERROR;
1092 goto done;
1093 }
1094
1095 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1096 break;
1097
1098 case _NXT_PORT_MSG_REQ_HEADERS:
1099 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1100 break;
1101
1102 case _NXT_PORT_MSG_REQ_BODY:
1103 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1104 break;
1105
1106 case _NXT_PORT_MSG_WEBSOCKET:
1107 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1108 break;
1109
1110 case _NXT_PORT_MSG_REMOVE_PID:
1111 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1112 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1113 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1114 (int) sizeof(pid));
1115
1116 rc = NXT_UNIT_ERROR;
1117 goto done;
1118 }
1119
1120 memcpy(&pid, recv_msg.start, sizeof(pid));
1121
1122 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1123 port_msg->stream, (int) pid);
1124
1125 nxt_unit_remove_pid(lib, pid);
1126
1127 rc = NXT_UNIT_OK;
1128 break;
1129
1130 case _NXT_PORT_MSG_SHM_ACK:
1131 rc = nxt_unit_process_shm_ack(ctx);
1132 break;
1133
1134 default:
1135 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1136 port_msg->stream, (int) port_msg->type);
1137
1138 rc = NXT_UNIT_ERROR;
1139 goto done;
1140 }
1141
1142 done:
1143
1144 if (recv_msg.fd[0] != -1) {
1145 nxt_unit_close(recv_msg.fd[0]);
1146 }
1147
1148 if (recv_msg.fd[1] != -1) {
1149 nxt_unit_close(recv_msg.fd[1]);
1150 }
1151
1152 while (recv_msg.incoming_buf != NULL) {
1153 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1154 }
1155
1156 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1157 #if (NXT_DEBUG)
1158 memset(rbuf->buf, 0xAC, rbuf->size);
1159 #endif
1160 nxt_unit_read_buf_release(ctx, rbuf);
1161 }
1162
1163 return rc;
1164 }
1165
1166
1167 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1168 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1169 {
1170 void *mem;
1171 nxt_unit_port_t new_port, *port;
1172 nxt_port_msg_new_port_t *new_port_msg;
1173
1174 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1175 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1176 "invalid message size (%d)",
1177 recv_msg->stream, (int) recv_msg->size);
1178
1179 return NXT_UNIT_ERROR;
1180 }
1181
1182 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1183 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1184 recv_msg->stream, recv_msg->fd[0]);
1185
1186 return NXT_UNIT_ERROR;
1187 }
1188
1189 new_port_msg = recv_msg->start;
1190
1191 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1192 recv_msg->stream, (int) new_port_msg->pid,
1193 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1194
1195 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1196 return NXT_UNIT_ERROR;
1197 }
1198
1199 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1200
1201 new_port.in_fd = -1;
1202 new_port.out_fd = recv_msg->fd[0];
1203
1204 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1205 MAP_SHARED, recv_msg->fd[1], 0);
1206
1207 if (nxt_slow_path(mem == MAP_FAILED)) {
1208 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1209 strerror(errno), errno);
1210
1211 return NXT_UNIT_ERROR;
1212 }
1213
1214 new_port.data = NULL;
1215
1216 recv_msg->fd[0] = -1;
1217
1218 port = nxt_unit_add_port(ctx, &new_port, mem);
1219 if (nxt_slow_path(port == NULL)) {
1220 return NXT_UNIT_ERROR;
1221 }
1222
1223 nxt_unit_port_release(port);
1224
1225 return NXT_UNIT_OK;
1226 }
1227
1228
1229 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1230 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1231 {
1232 nxt_unit_impl_t *lib;
1233 nxt_unit_ctx_impl_t *ctx_impl;
1234
1235 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1236
1237 if (nxt_slow_path(ctx_impl->ready)) {
1238 return NXT_UNIT_OK;
1239 }
1240
1241 ctx_impl->ready = 1;
1242
1243 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1244
1245 /* Call ready_handler() only for main context. */
1246 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1247 return lib->callbacks.ready_handler(ctx);
1248 }
1249
1250 if (&lib->main_ctx != ctx_impl) {
1251 /* Check if the main context is already stopped or quit. */
1252 if (nxt_slow_path(!lib->main_ctx.ready)) {
1253 ctx_impl->ready = 0;
1254
1255 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1256
1257 return NXT_UNIT_OK;
1258 }
1259
1260 if (lib->callbacks.add_port != NULL) {
1261 lib->callbacks.add_port(ctx, lib->shared_port);
1262 }
1263 }
1264
1265 return NXT_UNIT_OK;
1266 }
1267
1268
1269 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1270 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1271 nxt_unit_request_info_t **preq)
1272 {
1273 int res;
1274 nxt_unit_impl_t *lib;
1275 nxt_unit_port_id_t port_id;
1276 nxt_unit_request_t *r;
1277 nxt_unit_mmap_buf_t *b;
1278 nxt_unit_request_info_t *req;
1279 nxt_unit_request_info_impl_t *req_impl;
1280
1281 if (nxt_slow_path(recv_msg->mmap == 0)) {
1282 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1283 recv_msg->stream);
1284
1285 return NXT_UNIT_ERROR;
1286 }
1287
1288 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1289 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1290 "%d expected", recv_msg->stream, (int) recv_msg->size,
1291 (int) sizeof(nxt_unit_request_t));
1292
1293 return NXT_UNIT_ERROR;
1294 }
1295
1296 req_impl = nxt_unit_request_info_get(ctx);
1297 if (nxt_slow_path(req_impl == NULL)) {
1298 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1299 recv_msg->stream);
1300
1301 return NXT_UNIT_ERROR;
1302 }
1303
1304 req = &req_impl->req;
1305
1306 req->request = recv_msg->start;
1307
1308 b = recv_msg->incoming_buf;
1309
1310 req->request_buf = &b->buf;
1311 req->response = NULL;
1312 req->response_buf = NULL;
1313
1314 r = req->request;
1315
1316 req->content_length = r->content_length;
1317
1318 req->content_buf = req->request_buf;
1319 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1320
1321 req_impl->stream = recv_msg->stream;
1322
1323 req_impl->outgoing_buf = NULL;
1324
1325 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1326 b->req = req;
1327 }
1328
1329 /* "Move" incoming buffer list to req_impl. */
1330 req_impl->incoming_buf = recv_msg->incoming_buf;
1331 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1332 recv_msg->incoming_buf = NULL;
1333
1334 req->content_fd = recv_msg->fd[0];
1335 recv_msg->fd[0] = -1;
1336
1337 req->response_max_fields = 0;
1338 req_impl->state = NXT_UNIT_RS_START;
1339 req_impl->websocket = 0;
1340 req_impl->in_hash = 0;
1341
1342 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1343 (int) r->method_length,
1344 (char *) nxt_unit_sptr_get(&r->method),
1345 (int) r->target_length,
1346 (char *) nxt_unit_sptr_get(&r->target),
1347 (int) r->content_length);
1348
1349 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1350
1351 res = nxt_unit_request_check_response_port(req, &port_id);
1352 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1353 return NXT_UNIT_ERROR;
1354 }
1355
1356 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1357 res = nxt_unit_send_req_headers_ack(req);
1358 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1359 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1360
1361 return NXT_UNIT_ERROR;
1362 }
1363
1364 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1365
1366 if (req->content_length
1367 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1368 {
1369 res = nxt_unit_request_hash_add(ctx, req);
1370 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1371 nxt_unit_req_warn(req, "failed to add request to hash");
1372
1373 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374
1375 return NXT_UNIT_ERROR;
1376 }
1377
1378 /*
1379 * If application have separate data handler, we may start
1380 * request processing and process data when it is arrived.
1381 */
1382 if (lib->callbacks.data_handler == NULL) {
1383 return NXT_UNIT_OK;
1384 }
1385 }
1386
1387 if (preq == NULL) {
1388 lib->callbacks.request_handler(req);
1389
1390 } else {
1391 *preq = req;
1392 }
1393 }
1394
1395 return NXT_UNIT_OK;
1396 }
1397
1398
1399 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1400 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1401 {
1402 uint64_t l;
1403 nxt_unit_impl_t *lib;
1404 nxt_unit_mmap_buf_t *b;
1405 nxt_unit_request_info_t *req;
1406
1407 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1408 if (req == NULL) {
1409 return NXT_UNIT_OK;
1410 }
1411
1412 l = req->content_buf->end - req->content_buf->free;
1413
1414 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1415 b->req = req;
1416 l += b->buf.end - b->buf.free;
1417 }
1418
1419 if (recv_msg->incoming_buf != NULL) {
1420 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1421
1422 while (b->next != NULL) {
1423 b = b->next;
1424 }
1425
1426 /* "Move" incoming buffer list to req_impl. */
1427 b->next = recv_msg->incoming_buf;
1428 b->next->prev = &b->next;
1429
1430 recv_msg->incoming_buf = NULL;
1431 }
1432
1433 req->content_fd = recv_msg->fd[0];
1434 recv_msg->fd[0] = -1;
1435
1436 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1437
1438 if (lib->callbacks.data_handler != NULL) {
1439 lib->callbacks.data_handler(req);
1440
1441 return NXT_UNIT_OK;
1442 }
1443
1444 if (req->content_fd != -1 || l == req->content_length) {
1445 lib->callbacks.request_handler(req);
1446 }
1447
1448 return NXT_UNIT_OK;
1449 }
1450
1451
1452 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1453 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1454 nxt_unit_port_id_t *port_id)
1455 {
1456 int res;
1457 nxt_unit_ctx_t *ctx;
1458 nxt_unit_impl_t *lib;
1459 nxt_unit_port_t *port;
1460 nxt_unit_process_t *process;
1461 nxt_unit_ctx_impl_t *ctx_impl;
1462 nxt_unit_port_impl_t *port_impl;
1463 nxt_unit_request_info_impl_t *req_impl;
1464
1465 ctx = req->ctx;
1466 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1467 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1468
1469 pthread_mutex_lock(&lib->mutex);
1470
1471 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1472 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1473
1474 if (nxt_fast_path(port != NULL)) {
1475 req->response_port = port;
1476
1477 if (nxt_fast_path(port_impl->ready)) {
1478 pthread_mutex_unlock(&lib->mutex);
1479
1480 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1481 (int) port->id.pid, (int) port->id.id);
1482
1483 return NXT_UNIT_OK;
1484 }
1485
1486 nxt_unit_debug(ctx, "check_response_port: "
1487 "port{%d,%d} already requested",
1488 (int) port->id.pid, (int) port->id.id);
1489
1490 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1491
1492 nxt_queue_insert_tail(&port_impl->awaiting_req,
1493 &req_impl->port_wait_link);
1494
1495 pthread_mutex_unlock(&lib->mutex);
1496
1497 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1498
1499 return NXT_UNIT_AGAIN;
1500 }
1501
1502 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1503 if (nxt_slow_path(port_impl == NULL)) {
1504 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1505 (int) sizeof(nxt_unit_port_impl_t));
1506
1507 pthread_mutex_unlock(&lib->mutex);
1508
1509 return NXT_UNIT_ERROR;
1510 }
1511
1512 port = &port_impl->port;
1513
1514 port->id = *port_id;
1515 port->in_fd = -1;
1516 port->out_fd = -1;
1517 port->data = NULL;
1518
1519 res = nxt_unit_port_hash_add(&lib->ports, port);
1520 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1521 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1522 port->id.pid, port->id.id);
1523
1524 pthread_mutex_unlock(&lib->mutex);
1525
1526 nxt_unit_free(ctx, port);
1527
1528 return NXT_UNIT_ERROR;
1529 }
1530
1531 process = nxt_unit_process_find(lib, port_id->pid, 0);
1532 if (nxt_slow_path(process == NULL)) {
1533 nxt_unit_alert(ctx, "check_response_port: process %d not found",
1534 port->id.pid);
1535
1536 nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1537
1538 pthread_mutex_unlock(&lib->mutex);
1539
1540 nxt_unit_free(ctx, port);
1541
1542 return NXT_UNIT_ERROR;
1543 }
1544
1545 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1546
1547 port_impl->process = process;
1548 port_impl->queue = NULL;
1549 port_impl->from_socket = 0;
1550 port_impl->socket_rbuf = NULL;
1551
1552 nxt_queue_init(&port_impl->awaiting_req);
1553
1554 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555
1556 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1557
1558 port_impl->use_count = 2;
1559 port_impl->ready = 0;
1560
1561 req->response_port = port;
1562
1563 pthread_mutex_unlock(&lib->mutex);
1564
1565 res = nxt_unit_get_port(ctx, port_id);
1566 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1567 return NXT_UNIT_ERROR;
1568 }
1569
1570 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1571
1572 return NXT_UNIT_AGAIN;
1573 }
1574
1575
1576 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1577 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1578 {
1579 ssize_t res;
1580 nxt_port_msg_t msg;
1581 nxt_unit_impl_t *lib;
1582 nxt_unit_ctx_impl_t *ctx_impl;
1583 nxt_unit_request_info_impl_t *req_impl;
1584
1585 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1586 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1587 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1588
1589 memset(&msg, 0, sizeof(nxt_port_msg_t));
1590
1591 msg.stream = req_impl->stream;
1592 msg.pid = lib->pid;
1593 msg.reply_port = ctx_impl->read_port->id.id;
1594 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1595
1596 res = nxt_unit_port_send(req->ctx, req->response_port,
1597 &msg, sizeof(msg), NULL);
1598 if (nxt_slow_path(res != sizeof(msg))) {
1599 return NXT_UNIT_ERROR;
1600 }
1601
1602 return NXT_UNIT_OK;
1603 }
1604
1605
1606 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1607 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1608 {
1609 size_t hsize;
1610 nxt_unit_impl_t *lib;
1611 nxt_unit_mmap_buf_t *b;
1612 nxt_unit_callbacks_t *cb;
1613 nxt_unit_request_info_t *req;
1614 nxt_unit_request_info_impl_t *req_impl;
1615 nxt_unit_websocket_frame_impl_t *ws_impl;
1616
1617 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1618 if (nxt_slow_path(req == NULL)) {
1619 return NXT_UNIT_OK;
1620 }
1621
1622 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1623
1624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1625 cb = &lib->callbacks;
1626
1627 if (cb->websocket_handler && recv_msg->size >= 2) {
1628 ws_impl = nxt_unit_websocket_frame_get(ctx);
1629 if (nxt_slow_path(ws_impl == NULL)) {
1630 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1631 req_impl->stream);
1632
1633 return NXT_UNIT_ERROR;
1634 }
1635
1636 ws_impl->ws.req = req;
1637
1638 ws_impl->buf = NULL;
1639
1640 if (recv_msg->mmap) {
1641 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1642 b->req = req;
1643 }
1644
1645 /* "Move" incoming buffer list to ws_impl. */
1646 ws_impl->buf = recv_msg->incoming_buf;
1647 ws_impl->buf->prev = &ws_impl->buf;
1648 recv_msg->incoming_buf = NULL;
1649
1650 b = ws_impl->buf;
1651
1652 } else {
1653 b = nxt_unit_mmap_buf_get(ctx);
1654 if (nxt_slow_path(b == NULL)) {
1655 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1656 req_impl->stream);
1657
1658 nxt_unit_websocket_frame_release(&ws_impl->ws);
1659
1660 return NXT_UNIT_ERROR;
1661 }
1662
1663 b->req = req;
1664 b->buf.start = recv_msg->start;
1665 b->buf.free = b->buf.start;
1666 b->buf.end = b->buf.start + recv_msg->size;
1667
1668 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1669 }
1670
1671 ws_impl->ws.header = (void *) b->buf.start;
1672 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1673 ws_impl->ws.header);
1674
1675 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1676
1677 if (ws_impl->ws.header->mask) {
1678 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1679
1680 } else {
1681 ws_impl->ws.mask = NULL;
1682 }
1683
1684 b->buf.free += hsize;
1685
1686 ws_impl->ws.content_buf = &b->buf;
1687 ws_impl->ws.content_length = ws_impl->ws.payload_len;
1688
1689 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1690 "payload_len=%"PRIu64,
1691 ws_impl->ws.header->opcode,
1692 ws_impl->ws.payload_len);
1693
1694 cb->websocket_handler(&ws_impl->ws);
1695 }
1696
1697 if (recv_msg->last) {
1698 if (cb->close_handler) {
1699 nxt_unit_req_debug(req, "close_handler");
1700
1701 cb->close_handler(req);
1702
1703 } else {
1704 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1705 }
1706 }
1707
1708 return NXT_UNIT_OK;
1709 }
1710
1711
1712 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1713 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1714 {
1715 nxt_unit_impl_t *lib;
1716 nxt_unit_callbacks_t *cb;
1717
1718 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1719 cb = &lib->callbacks;
1720
1721 if (cb->shm_ack_handler != NULL) {
1722 cb->shm_ack_handler(ctx);
1723 }
1724
1725 return NXT_UNIT_OK;
1726 }
1727
1728
1729 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1730 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1731 {
1732 nxt_unit_impl_t *lib;
1733 nxt_queue_link_t *lnk;
1734 nxt_unit_ctx_impl_t *ctx_impl;
1735 nxt_unit_request_info_impl_t *req_impl;
1736
1737 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1738
1739 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1740
1741 pthread_mutex_lock(&ctx_impl->mutex);
1742
1743 if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1744 pthread_mutex_unlock(&ctx_impl->mutex);
1745
1746 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1747 + lib->request_data_size);
1748 if (nxt_slow_path(req_impl == NULL)) {
1749 return NULL;
1750 }
1751
1752 req_impl->req.unit = ctx->unit;
1753 req_impl->req.ctx = ctx;
1754
1755 pthread_mutex_lock(&ctx_impl->mutex);
1756
1757 } else {
1758 lnk = nxt_queue_first(&ctx_impl->free_req);
1759 nxt_queue_remove(lnk);
1760
1761 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1762 }
1763
1764 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1765
1766 pthread_mutex_unlock(&ctx_impl->mutex);
1767
1768 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1769
1770 return req_impl;
1771 }
1772
1773
1774 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1775 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1776 {
1777 nxt_unit_ctx_t *ctx;
1778 nxt_unit_ctx_impl_t *ctx_impl;
1779 nxt_unit_request_info_impl_t *req_impl;
1780
1781 ctx = req->ctx;
1782 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784
1785 req->response = NULL;
1786 req->response_buf = NULL;
1787
1788 if (req_impl->in_hash) {
1789 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1790 }
1791
1792 while (req_impl->outgoing_buf != NULL) {
1793 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1794 }
1795
1796 while (req_impl->incoming_buf != NULL) {
1797 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1798 }
1799
1800 if (req->content_fd != -1) {
1801 nxt_unit_close(req->content_fd);
1802
1803 req->content_fd = -1;
1804 }
1805
1806 if (req->response_port != NULL) {
1807 nxt_unit_port_release(req->response_port);
1808
1809 req->response_port = NULL;
1810 }
1811
1812 req_impl->state = NXT_UNIT_RS_RELEASED;
1813
1814 pthread_mutex_lock(&ctx_impl->mutex);
1815
1816 nxt_queue_remove(&req_impl->link);
1817
1818 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1819
1820 pthread_mutex_unlock(&ctx_impl->mutex);
1821
1822 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1823 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1824 }
1825 }
1826
1827
1828 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1829 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1830 {
1831 nxt_unit_ctx_impl_t *ctx_impl;
1832
1833 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1834
1835 nxt_queue_remove(&req_impl->link);
1836
1837 if (req_impl != &ctx_impl->req) {
1838 nxt_unit_free(&ctx_impl->ctx, req_impl);
1839 }
1840 }
1841
1842
1843 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1844 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1845 {
1846 nxt_queue_link_t *lnk;
1847 nxt_unit_ctx_impl_t *ctx_impl;
1848 nxt_unit_websocket_frame_impl_t *ws_impl;
1849
1850 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1851
1852 pthread_mutex_lock(&ctx_impl->mutex);
1853
1854 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1855 pthread_mutex_unlock(&ctx_impl->mutex);
1856
1857 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1858 if (nxt_slow_path(ws_impl == NULL)) {
1859 return NULL;
1860 }
1861
1862 } else {
1863 lnk = nxt_queue_first(&ctx_impl->free_ws);
1864 nxt_queue_remove(lnk);
1865
1866 pthread_mutex_unlock(&ctx_impl->mutex);
1867
1868 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1869 }
1870
1871 ws_impl->ctx_impl = ctx_impl;
1872
1873 return ws_impl;
1874 }
1875
1876
1877 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1878 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1879 {
1880 nxt_unit_websocket_frame_impl_t *ws_impl;
1881
1882 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1883
1884 while (ws_impl->buf != NULL) {
1885 nxt_unit_mmap_buf_free(ws_impl->buf);
1886 }
1887
1888 ws->req = NULL;
1889
1890 pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1891
1892 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1893
1894 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1895 }
1896
1897
1898 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1899 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1900 nxt_unit_websocket_frame_impl_t *ws_impl)
1901 {
1902 nxt_queue_remove(&ws_impl->link);
1903
1904 nxt_unit_free(ctx, ws_impl);
1905 }
1906
1907
1908 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1909 nxt_unit_field_hash(const char *name, size_t name_length)
1910 {
1911 u_char ch;
1912 uint32_t hash;
1913 const char *p, *end;
1914
1915 hash = 159406; /* Magic value copied from nxt_http_parse.c */
1916 end = name + name_length;
1917
1918 for (p = name; p < end; p++) {
1919 ch = *p;
1920 hash = (hash << 4) + hash + nxt_lowcase(ch);
1921 }
1922
1923 hash = (hash >> 16) ^ hash;
1924
1925 return hash;
1926 }
1927
1928
1929 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1930 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1931 {
1932 char *name;
1933 uint32_t i, j;
1934 nxt_unit_field_t *fields, f;
1935 nxt_unit_request_t *r;
1936
1937 static nxt_str_t content_length = nxt_string("content-length");
1938 static nxt_str_t content_type = nxt_string("content-type");
1939 static nxt_str_t cookie = nxt_string("cookie");
1940
1941 nxt_unit_req_debug(req, "group_dup_fields");
1942
1943 r = req->request;
1944 fields = r->fields;
1945
1946 for (i = 0; i < r->fields_count; i++) {
1947 name = nxt_unit_sptr_get(&fields[i].name);
1948
1949 switch (fields[i].hash) {
1950 case NXT_UNIT_HASH_CONTENT_LENGTH:
1951 if (fields[i].name_length == content_length.length
1952 && nxt_unit_memcasecmp(name, content_length.start,
1953 content_length.length) == 0)
1954 {
1955 r->content_length_field = i;
1956 }
1957
1958 break;
1959
1960 case NXT_UNIT_HASH_CONTENT_TYPE:
1961 if (fields[i].name_length == content_type.length
1962 && nxt_unit_memcasecmp(name, content_type.start,
1963 content_type.length) == 0)
1964 {
1965 r->content_type_field = i;
1966 }
1967
1968 break;
1969
1970 case NXT_UNIT_HASH_COOKIE:
1971 if (fields[i].name_length == cookie.length
1972 && nxt_unit_memcasecmp(name, cookie.start,
1973 cookie.length) == 0)
1974 {
1975 r->cookie_field = i;
1976 }
1977
1978 break;
1979 }
1980
1981 for (j = i + 1; j < r->fields_count; j++) {
1982 if (fields[i].hash != fields[j].hash
1983 || fields[i].name_length != fields[j].name_length
1984 || nxt_unit_memcasecmp(name,
1985 nxt_unit_sptr_get(&fields[j].name),
1986 fields[j].name_length) != 0)
1987 {
1988 continue;
1989 }
1990
1991 f = fields[j];
1992 f.value.offset += (j - (i + 1)) * sizeof(f);
1993
1994 while (j > i + 1) {
1995 fields[j] = fields[j - 1];
1996 fields[j].name.offset -= sizeof(f);
1997 fields[j].value.offset -= sizeof(f);
1998 j--;
1999 }
2000
2001 fields[j] = f;
2002
2003 /* Assign the same name pointer for further grouping simplicity. */
2004 nxt_unit_sptr_set(&fields[j].name, name);
2005
2006 i++;
2007 }
2008 }
2009 }
2010
2011
2012 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2013 nxt_unit_response_init(nxt_unit_request_info_t *req,
2014 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2015 {
2016 uint32_t buf_size;
2017 nxt_unit_buf_t *buf;
2018 nxt_unit_request_info_impl_t *req_impl;
2019
2020 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2021
2022 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2023 nxt_unit_req_warn(req, "init: response already sent");
2024
2025 return NXT_UNIT_ERROR;
2026 }
2027
2028 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2029 (int) max_fields_count, (int) max_fields_size);
2030
2031 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2032 nxt_unit_req_debug(req, "duplicate response init");
2033 }
2034
2035 /*
2036 * Each field name and value 0-terminated by libunit,
2037 * this is the reason of '+ 2' below.
2038 */
2039 buf_size = sizeof(nxt_unit_response_t)
2040 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2041 + max_fields_size;
2042
2043 if (nxt_slow_path(req->response_buf != NULL)) {
2044 buf = req->response_buf;
2045
2046 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2047 goto init_response;
2048 }
2049
2050 nxt_unit_buf_free(buf);
2051
2052 req->response_buf = NULL;
2053 req->response = NULL;
2054 req->response_max_fields = 0;
2055
2056 req_impl->state = NXT_UNIT_RS_START;
2057 }
2058
2059 buf = nxt_unit_response_buf_alloc(req, buf_size);
2060 if (nxt_slow_path(buf == NULL)) {
2061 return NXT_UNIT_ERROR;
2062 }
2063
2064 init_response:
2065
2066 memset(buf->start, 0, sizeof(nxt_unit_response_t));
2067
2068 req->response_buf = buf;
2069
2070 req->response = (nxt_unit_response_t *) buf->start;
2071 req->response->status = status;
2072
2073 buf->free = buf->start + sizeof(nxt_unit_response_t)
2074 + max_fields_count * sizeof(nxt_unit_field_t);
2075
2076 req->response_max_fields = max_fields_count;
2077 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2078
2079 return NXT_UNIT_OK;
2080 }
2081
2082
2083 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2085 uint32_t max_fields_count, uint32_t max_fields_size)
2086 {
2087 char *p;
2088 uint32_t i, buf_size;
2089 nxt_unit_buf_t *buf;
2090 nxt_unit_field_t *f, *src;
2091 nxt_unit_response_t *resp;
2092 nxt_unit_request_info_impl_t *req_impl;
2093
2094 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2095
2096 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2097 nxt_unit_req_warn(req, "realloc: response not init");
2098
2099 return NXT_UNIT_ERROR;
2100 }
2101
2102 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2103 nxt_unit_req_warn(req, "realloc: response already sent");
2104
2105 return NXT_UNIT_ERROR;
2106 }
2107
2108 if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2109 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2110
2111 return NXT_UNIT_ERROR;
2112 }
2113
2114 /*
2115 * Each field name and value 0-terminated by libunit,
2116 * this is the reason of '+ 2' below.
2117 */
2118 buf_size = sizeof(nxt_unit_response_t)
2119 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2120 + max_fields_size;
2121
2122 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2123
2124 buf = nxt_unit_response_buf_alloc(req, buf_size);
2125 if (nxt_slow_path(buf == NULL)) {
2126 nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2127 return NXT_UNIT_ERROR;
2128 }
2129
2130 resp = (nxt_unit_response_t *) buf->start;
2131
2132 memset(resp, 0, sizeof(nxt_unit_response_t));
2133
2134 resp->status = req->response->status;
2135 resp->content_length = req->response->content_length;
2136
2137 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2138 f = resp->fields;
2139
2140 for (i = 0; i < req->response->fields_count; i++) {
2141 src = req->response->fields + i;
2142
2143 if (nxt_slow_path(src->skip != 0)) {
2144 continue;
2145 }
2146
2147 if (nxt_slow_path(src->name_length + src->value_length + 2
2148 > (uint32_t) (buf->end - p)))
2149 {
2150 nxt_unit_req_warn(req, "realloc: not enough space for field"
2151 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2152 i, src, src->name_length, src->value_length);
2153
2154 goto fail;
2155 }
2156
2157 nxt_unit_sptr_set(&f->name, p);
2158 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2159 *p++ = '\0';
2160
2161 nxt_unit_sptr_set(&f->value, p);
2162 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2163 *p++ = '\0';
2164
2165 f->hash = src->hash;
2166 f->skip = 0;
2167 f->name_length = src->name_length;
2168 f->value_length = src->value_length;
2169
2170 resp->fields_count++;
2171 f++;
2172 }
2173
2174 if (req->response->piggyback_content_length > 0) {
2175 if (nxt_slow_path(req->response->piggyback_content_length
2176 > (uint32_t) (buf->end - p)))
2177 {
2178 nxt_unit_req_warn(req, "realloc: not enought space for content"
2179 " #%"PRIu32", %"PRIu32" required",
2180 i, req->response->piggyback_content_length);
2181
2182 goto fail;
2183 }
2184
2185 resp->piggyback_content_length =
2186 req->response->piggyback_content_length;
2187
2188 nxt_unit_sptr_set(&resp->piggyback_content, p);
2189 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2190 req->response->piggyback_content_length);
2191 }
2192
2193 buf->free = p;
2194
2195 nxt_unit_buf_free(req->response_buf);
2196
2197 req->response = resp;
2198 req->response_buf = buf;
2199 req->response_max_fields = max_fields_count;
2200
2201 return NXT_UNIT_OK;
2202
2203 fail:
2204
2205 nxt_unit_buf_free(buf);
2206
2207 return NXT_UNIT_ERROR;
2208 }
2209
2210
2211 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2213 {
2214 nxt_unit_request_info_impl_t *req_impl;
2215
2216 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2217
2218 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2219 }
2220
2221
2222 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2224 const char *name, uint8_t name_length,
2225 const char *value, uint32_t value_length)
2226 {
2227 nxt_unit_buf_t *buf;
2228 nxt_unit_field_t *f;
2229 nxt_unit_response_t *resp;
2230 nxt_unit_request_info_impl_t *req_impl;
2231
2232 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2233
2234 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2235 nxt_unit_req_warn(req, "add_field: response not initialized or "
2236 "already sent");
2237
2238 return NXT_UNIT_ERROR;
2239 }
2240
2241 resp = req->response;
2242
2243 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2244 nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2245 (int) resp->fields_count);
2246
2247 return NXT_UNIT_ERROR;
2248 }
2249
2250 buf = req->response_buf;
2251
2252 if (nxt_slow_path(name_length + value_length + 2
2253 > (uint32_t) (buf->end - buf->free)))
2254 {
2255 nxt_unit_req_warn(req, "add_field: response buffer overflow");
2256
2257 return NXT_UNIT_ERROR;
2258 }
2259
2260 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2261 resp->fields_count,
2262 (int) name_length, name,
2263 (int) value_length, value);
2264
2265 f = resp->fields + resp->fields_count;
2266
2267 nxt_unit_sptr_set(&f->name, buf->free);
2268 buf->free = nxt_cpymem(buf->free, name, name_length);
2269 *buf->free++ = '\0';
2270
2271 nxt_unit_sptr_set(&f->value, buf->free);
2272 buf->free = nxt_cpymem(buf->free, value, value_length);
2273 *buf->free++ = '\0';
2274
2275 f->hash = nxt_unit_field_hash(name, name_length);
2276 f->skip = 0;
2277 f->name_length = name_length;
2278 f->value_length = value_length;
2279
2280 resp->fields_count++;
2281
2282 return NXT_UNIT_OK;
2283 }
2284
2285
2286 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2287 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2288 const void* src, uint32_t size)
2289 {
2290 nxt_unit_buf_t *buf;
2291 nxt_unit_response_t *resp;
2292 nxt_unit_request_info_impl_t *req_impl;
2293
2294 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2295
2296 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2297 nxt_unit_req_warn(req, "add_content: response not initialized yet");
2298
2299 return NXT_UNIT_ERROR;
2300 }
2301
2302 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2303 nxt_unit_req_warn(req, "add_content: response already sent");
2304
2305 return NXT_UNIT_ERROR;
2306 }
2307
2308 buf = req->response_buf;
2309
2310 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2311 nxt_unit_req_warn(req, "add_content: buffer overflow");
2312
2313 return NXT_UNIT_ERROR;
2314 }
2315
2316 resp = req->response;
2317
2318 if (resp->piggyback_content_length == 0) {
2319 nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2320 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2321 }
2322
2323 resp->piggyback_content_length += size;
2324
2325 buf->free = nxt_cpymem(buf->free, src, size);
2326
2327 return NXT_UNIT_OK;
2328 }
2329
2330
2331 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2332 nxt_unit_response_send(nxt_unit_request_info_t *req)
2333 {
2334 int rc;
2335 nxt_unit_mmap_buf_t *mmap_buf;
2336 nxt_unit_request_info_impl_t *req_impl;
2337
2338 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2339
2340 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2341 nxt_unit_req_warn(req, "send: response is not initialized yet");
2342
2343 return NXT_UNIT_ERROR;
2344 }
2345
2346 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2347 nxt_unit_req_warn(req, "send: response already sent");
2348
2349 return NXT_UNIT_ERROR;
2350 }
2351
2352 if (req->request->websocket_handshake && req->response->status == 101) {
2353 nxt_unit_response_upgrade(req);
2354 }
2355
2356 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2357 req->response->fields_count,
2358 (int) (req->response_buf->free
2359 - req->response_buf->start));
2360
2361 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2362
2363 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2364 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2365 req->response = NULL;
2366 req->response_buf = NULL;
2367 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2368
2369 nxt_unit_mmap_buf_free(mmap_buf);
2370 }
2371
2372 return rc;
2373 }
2374
2375
2376 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2377 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2378 {
2379 nxt_unit_request_info_impl_t *req_impl;
2380
2381 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2382
2383 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2384 }
2385
2386
2387 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2388 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2389 {
2390 int rc;
2391 nxt_unit_mmap_buf_t *mmap_buf;
2392 nxt_unit_request_info_impl_t *req_impl;
2393
2394 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2395 nxt_unit_req_warn(req, "response_buf_alloc: "
2396 "requested buffer (%"PRIu32") too big", size);
2397
2398 return NULL;
2399 }
2400
2401 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2402
2403 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2404
2405 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2406 if (nxt_slow_path(mmap_buf == NULL)) {
2407 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2408
2409 return NULL;
2410 }
2411
2412 mmap_buf->req = req;
2413
2414 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2415
2416 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2417 size, size, mmap_buf,
2418 NULL);
2419 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2420 nxt_unit_mmap_buf_release(mmap_buf);
2421
2422 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2423
2424 return NULL;
2425 }
2426
2427 return &mmap_buf->buf;
2428 }
2429
2430
2431 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2432 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2433 {
2434 nxt_unit_mmap_buf_t *mmap_buf;
2435 nxt_unit_ctx_impl_t *ctx_impl;
2436
2437 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2438
2439 pthread_mutex_lock(&ctx_impl->mutex);
2440
2441 if (ctx_impl->free_buf == NULL) {
2442 pthread_mutex_unlock(&ctx_impl->mutex);
2443
2444 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2445 if (nxt_slow_path(mmap_buf == NULL)) {
2446 return NULL;
2447 }
2448
2449 } else {
2450 mmap_buf = ctx_impl->free_buf;
2451
2452 nxt_unit_mmap_buf_unlink(mmap_buf);
2453
2454 pthread_mutex_unlock(&ctx_impl->mutex);
2455 }
2456
2457 mmap_buf->ctx_impl = ctx_impl;
2458
2459 mmap_buf->hdr = NULL;
2460 mmap_buf->free_ptr = NULL;
2461
2462 return mmap_buf;
2463 }
2464
2465
2466 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2467 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2468 {
2469 nxt_unit_mmap_buf_unlink(mmap_buf);
2470
2471 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2472
2473 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2474
2475 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2476 }
2477
2478
2479 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2480 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2481 {
2482 return req->request->websocket_handshake;
2483 }
2484
2485
2486 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2488 {
2489 int rc;
2490 nxt_unit_request_info_impl_t *req_impl;
2491
2492 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2493
2494 if (nxt_slow_path(req_impl->websocket != 0)) {
2495 nxt_unit_req_debug(req, "upgrade: already upgraded");
2496
2497 return NXT_UNIT_OK;
2498 }
2499
2500 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2501 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2502
2503 return NXT_UNIT_ERROR;
2504 }
2505
2506 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2507 nxt_unit_req_warn(req, "upgrade: response already sent");
2508
2509 return NXT_UNIT_ERROR;
2510 }
2511
2512 rc = nxt_unit_request_hash_add(req->ctx, req);
2513 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2514 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2515
2516 return NXT_UNIT_ERROR;
2517 }
2518
2519 req_impl->websocket = 1;
2520
2521 req->response->status = 101;
2522
2523 return NXT_UNIT_OK;
2524 }
2525
2526
2527 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2529 {
2530 nxt_unit_request_info_impl_t *req_impl;
2531
2532 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2533
2534 return req_impl->websocket;
2535 }
2536
2537
2538 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2539 nxt_unit_get_request_info_from_data(void *data)
2540 {
2541 nxt_unit_request_info_impl_t *req_impl;
2542
2543 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2544
2545 return &req_impl->req;
2546 }
2547
2548
2549 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2550 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2551 {
2552 int rc;
2553 nxt_unit_mmap_buf_t *mmap_buf;
2554 nxt_unit_request_info_t *req;
2555 nxt_unit_request_info_impl_t *req_impl;
2556
2557 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2558
2559 req = mmap_buf->req;
2560 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2561
2562 nxt_unit_req_debug(req, "buf_send: %d bytes",
2563 (int) (buf->free - buf->start));
2564
2565 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2566 nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2567
2568 return NXT_UNIT_ERROR;
2569 }
2570
2571 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2572 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2573
2574 return NXT_UNIT_ERROR;
2575 }
2576
2577 if (nxt_fast_path(buf->free > buf->start)) {
2578 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2579 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2580 return rc;
2581 }
2582 }
2583
2584 nxt_unit_mmap_buf_free(mmap_buf);
2585
2586 return NXT_UNIT_OK;
2587 }
2588
2589
2590 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2592 {
2593 int rc;
2594 nxt_unit_mmap_buf_t *mmap_buf;
2595 nxt_unit_request_info_t *req;
2596
2597 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2598
2599 req = mmap_buf->req;
2600
2601 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2602 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2603 nxt_unit_mmap_buf_free(mmap_buf);
2604
2605 nxt_unit_request_info_release(req);
2606
2607 } else {
2608 nxt_unit_request_done(req, rc);
2609 }
2610 }
2611
2612
/*
 * Sends the filled part of an outgoing buffer, [buf->start, buf->free), to
 * req->response_port as a _NXT_PORT_MSG_DATA message on the request's
 * stream.
 *
 * Two transports are used:
 *
 *  - if the buffer belongs to a shared memory segment (mmap_buf->hdr is
 *    not NULL) and is not empty, only a descriptor (mmap id, chunk id,
 *    size) is written to the port;
 *
 *  - otherwise the data is written to the port inline, preceded by the
 *    message header, which is copied into the bytes right before
 *    buf->start.
 *
 * A non-zero "last" sets the "last" flag of the message.
 *
 * Returns NXT_UNIT_OK on success and NXT_UNIT_ERROR if the port write does
 * not transfer the whole message or no room for the header was reserved.
 * On every path nxt_unit_free_outgoing_buf() is called for mmap_buf before
 * returning.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    /* Number of bytes written to the buffer so far. */
    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Shared memory is used only if there is a segment and data to send. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;

    /* Pessimistic default; switched to NXT_UNIT_OK only after a full send. */
    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /*
         * The chunks up to and including the one holding the last sent
         * byte must not be released below, so the buffer is narrowed to
         * start at the first chunk after them.
         * NOTE(review): presumably the receiver releases the sent chunks;
         * confirm against the port mmap protocol.
         */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /* At least one whole chunk is left unused behind the data. */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /*
             * Nothing is left over: clear the buffer and detach it from the
             * segment, so nxt_unit_free_outgoing_buf() below releases
             * nothing.
             */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * chunk_id - first_free_chunk is negative: the counter is decreased
         * by the number of chunks occupied by the data just sent.
         */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Inline send: the header is placed right before the data, so
         * plain_ptr must be set and must leave at least sizeof(m.msg)
         * bytes in front of buf->start.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        /* Header and data are written to the port as one contiguous span. */
        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg), NULL);

        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Reached on success and on failure alike. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}
2729
2730
2731 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2732 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2733 {
2734 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2735 }
2736
2737
2738 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2739 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2740 {
2741 nxt_unit_free_outgoing_buf(mmap_buf);
2742
2743 nxt_unit_mmap_buf_release(mmap_buf);
2744 }
2745
2746
2747 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2748 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2749 {
2750 if (mmap_buf->hdr != NULL) {
2751 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2752 mmap_buf->hdr, mmap_buf->buf.start,
2753 mmap_buf->buf.end - mmap_buf->buf.start);
2754
2755 mmap_buf->hdr = NULL;
2756
2757 return;
2758 }
2759
2760 if (mmap_buf->free_ptr != NULL) {
2761 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2762
2763 mmap_buf->free_ptr = NULL;
2764 }
2765 }
2766
2767
2768 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2769 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2770 {
2771 nxt_unit_ctx_impl_t *ctx_impl;
2772 nxt_unit_read_buf_t *rbuf;
2773
2774 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2775
2776 pthread_mutex_lock(&ctx_impl->mutex);
2777
2778 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2779
2780 pthread_mutex_unlock(&ctx_impl->mutex);
2781
2782 rbuf->oob.size = 0;
2783
2784 return rbuf;
2785 }
2786
2787
2788 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2789 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2790 {
2791 nxt_queue_link_t *link;
2792 nxt_unit_read_buf_t *rbuf;
2793
2794 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2795 link = nxt_queue_first(&ctx_impl->free_rbuf);
2796 nxt_queue_remove(link);
2797
2798 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2799
2800 return rbuf;
2801 }
2802
2803 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2804
2805 if (nxt_fast_path(rbuf != NULL)) {
2806 rbuf->ctx_impl = ctx_impl;
2807 }
2808
2809 return rbuf;
2810 }
2811
2812
2813 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2814 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2815 nxt_unit_read_buf_t *rbuf)
2816 {
2817 nxt_unit_ctx_impl_t *ctx_impl;
2818
2819 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2820
2821 pthread_mutex_lock(&ctx_impl->mutex);
2822
2823 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2824
2825 pthread_mutex_unlock(&ctx_impl->mutex);
2826 }
2827
2828
2829 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2830 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2831 {
2832 nxt_unit_mmap_buf_t *mmap_buf;
2833
2834 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2835
2836 if (mmap_buf->next == NULL) {
2837 return NULL;
2838 }
2839
2840 return &mmap_buf->next->buf;
2841 }
2842
2843
2844 uint32_t
nxt_unit_buf_max(void)2845 nxt_unit_buf_max(void)
2846 {
2847 return PORT_MMAP_DATA_SIZE;
2848 }
2849
2850
2851 uint32_t
nxt_unit_buf_min(void)2852 nxt_unit_buf_min(void)
2853 {
2854 return PORT_MMAP_CHUNK_SIZE;
2855 }
2856
2857
2858 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2859 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2860 size_t size)
2861 {
2862 ssize_t res;
2863
2864 res = nxt_unit_response_write_nb(req, start, size, size);
2865
2866 return res < 0 ? -res : NXT_UNIT_OK;
2867 }
2868
2869
2870 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2871 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2872 size_t size, size_t min_size)
2873 {
2874 int rc;
2875 ssize_t sent;
2876 uint32_t part_size, min_part_size, buf_size;
2877 const char *part_start;
2878 nxt_unit_mmap_buf_t mmap_buf;
2879 nxt_unit_request_info_impl_t *req_impl;
2880 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2881
2882 nxt_unit_req_debug(req, "write: %d", (int) size);
2883
2884 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2885
2886 part_start = start;
2887 sent = 0;
2888
2889 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2890 nxt_unit_req_alert(req, "write: response not initialized yet");
2891
2892 return -NXT_UNIT_ERROR;
2893 }
2894
2895 /* Check if response is not send yet. */
2896 if (nxt_slow_path(req->response_buf != NULL)) {
2897 part_size = req->response_buf->end - req->response_buf->free;
2898 part_size = nxt_min(size, part_size);
2899
2900 rc = nxt_unit_response_add_content(req, part_start, part_size);
2901 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2902 return -rc;
2903 }
2904
2905 rc = nxt_unit_response_send(req);
2906 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2907 return -rc;
2908 }
2909
2910 size -= part_size;
2911 part_start += part_size;
2912 sent += part_size;
2913
2914 min_size -= nxt_min(min_size, part_size);
2915 }
2916
2917 while (size > 0) {
2918 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2919 min_part_size = nxt_min(min_size, part_size);
2920 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2921
2922 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2923 min_part_size, &mmap_buf, local_buf);
2924 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2925 return -rc;
2926 }
2927
2928 buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2929 if (nxt_slow_path(buf_size == 0)) {
2930 return sent;
2931 }
2932 part_size = nxt_min(buf_size, part_size);
2933
2934 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2935 part_start, part_size);
2936
2937 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2938 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2939 return -rc;
2940 }
2941
2942 size -= part_size;
2943 part_start += part_size;
2944 sent += part_size;
2945
2946 min_size -= nxt_min(min_size, part_size);
2947 }
2948
2949 return sent;
2950 }
2951
2952
2953 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2954 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2955 nxt_unit_read_info_t *read_info)
2956 {
2957 int rc;
2958 ssize_t n;
2959 uint32_t buf_size;
2960 nxt_unit_buf_t *buf;
2961 nxt_unit_mmap_buf_t mmap_buf;
2962 nxt_unit_request_info_impl_t *req_impl;
2963 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2964
2965 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2966
2967 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2968 nxt_unit_req_alert(req, "write: response not initialized yet");
2969
2970 return NXT_UNIT_ERROR;
2971 }
2972
2973 /* Check if response is not send yet. */
2974 if (nxt_slow_path(req->response_buf != NULL)) {
2975
2976 /* Enable content in headers buf. */
2977 rc = nxt_unit_response_add_content(req, "", 0);
2978 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2979 nxt_unit_req_error(req, "Failed to add piggyback content");
2980
2981 return rc;
2982 }
2983
2984 buf = req->response_buf;
2985
2986 while (buf->end - buf->free > 0) {
2987 n = read_info->read(read_info, buf->free, buf->end - buf->free);
2988 if (nxt_slow_path(n < 0)) {
2989 nxt_unit_req_error(req, "Read error");
2990
2991 return NXT_UNIT_ERROR;
2992 }
2993
2994 /* Manually increase sizes. */
2995 buf->free += n;
2996 req->response->piggyback_content_length += n;
2997
2998 if (read_info->eof) {
2999 break;
3000 }
3001 }
3002
3003 rc = nxt_unit_response_send(req);
3004 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3005 nxt_unit_req_error(req, "Failed to send headers with content");
3006
3007 return rc;
3008 }
3009
3010 if (read_info->eof) {
3011 return NXT_UNIT_OK;
3012 }
3013 }
3014
3015 while (!read_info->eof) {
3016 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3017 read_info->buf_size);
3018
3019 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3020
3021 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3022 buf_size, buf_size,
3023 &mmap_buf, local_buf);
3024 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3025 return rc;
3026 }
3027
3028 buf = &mmap_buf.buf;
3029
3030 while (!read_info->eof && buf->end > buf->free) {
3031 n = read_info->read(read_info, buf->free, buf->end - buf->free);
3032 if (nxt_slow_path(n < 0)) {
3033 nxt_unit_req_error(req, "Read error");
3034
3035 nxt_unit_free_outgoing_buf(&mmap_buf);
3036
3037 return NXT_UNIT_ERROR;
3038 }
3039
3040 buf->free += n;
3041 }
3042
3043 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3044 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3045 nxt_unit_req_error(req, "Failed to send content");
3046
3047 return rc;
3048 }
3049 }
3050
3051 return NXT_UNIT_OK;
3052 }
3053
3054
3055 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3056 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3057 {
3058 ssize_t buf_res, res;
3059
3060 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3061 dst, size);
3062
3063 if (buf_res < (ssize_t) size && req->content_fd != -1) {
3064 res = read(req->content_fd, dst, size);
3065 if (nxt_slow_path(res < 0)) {
3066 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3067 strerror(errno), errno);
3068
3069 return res;
3070 }
3071
3072 if (res < (ssize_t) size) {
3073 nxt_unit_close(req->content_fd);
3074
3075 req->content_fd = -1;
3076 }
3077
3078 req->content_length -= res;
3079
3080 dst = nxt_pointer_to(dst, res);
3081
3082 } else {
3083 res = 0;
3084 }
3085
3086 return buf_res + res;
3087 }
3088
3089
3090 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3091 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3092 {
3093 char *p;
3094 size_t l_size, b_size;
3095 nxt_unit_buf_t *b;
3096 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
3097
3098 if (req->content_length == 0) {
3099 return 0;
3100 }
3101
3102 l_size = 0;
3103
3104 b = req->content_buf;
3105
3106 while (b != NULL) {
3107 b_size = b->end - b->free;
3108 p = memchr(b->free, '\n', b_size);
3109
3110 if (p != NULL) {
3111 p++;
3112 l_size += p - b->free;
3113 break;
3114 }
3115