1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16
17 #include "nxt_websocket.h"
18
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22
/*
 * NXT_UNIT_LOCAL_BUF_SIZE is NXT_UNIT_MAX_PLAIN_SIZE bytes plus room for one
 * nxt_port_msg_t.
 */
23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE \
25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26
/*
 * Values of the quit parameter given to nxt_unit_quit().  When a QUIT message
 * does not carry exactly one parameter byte, nxt_unit_process_msg() falls
 * back to NXT_QUIT_NORMAL.
 */
27 enum {
28 NXT_QUIT_NORMAL = 0,
29 NXT_QUIT_GRACEFUL = 1,
30 };
31
/* Short names for the file-local structures defined further below. */
32 typedef struct nxt_unit_impl_s nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
43
/*
 * Forward declarations of the file-local helpers.  Functions declared with
 * nxt_inline are small helpers; everything else is static to this file.
 */
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46 nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52 nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54 nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58 int *shared_port_fd, int *shared_queue_fd,
59 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60 uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62 int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64 nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71 nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73 nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76 nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79 nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83 nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86 nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90 nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95 nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97 nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99 nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101 size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108 nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111 int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113 nxt_unit_port_t *port, uint32_t size,
114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118 nxt_unit_ctx_impl_t *ctx_impl);
119 static int nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135 pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147 nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152 nxt_unit_port_t *port, int queue_fd);
153
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159 nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161 nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163 nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166 nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170 nxt_unit_port_t *port, const void *buf, size_t buf_size,
171 const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175 nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177 nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179 nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181 nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183 nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185 nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190 nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192 nxt_unit_port_id_t *port_id, int remove);
193
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195 nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid,
200 int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204
205
/*
 * Buffer descriptor wrapping the public nxt_unit_buf_t.  Descriptors are
 * chained through next/prev: "prev" holds the address of the pointer that
 * refers to this descriptor (a list head or the preceding descriptor's
 * "next"), as maintained by nxt_unit_mmap_buf_insert() and
 * nxt_unit_mmap_buf_unlink().
 */
206 struct nxt_unit_mmap_buf_s {
207 nxt_unit_buf_t buf;
208
209 nxt_unit_mmap_buf_t *next;
210 nxt_unit_mmap_buf_t **prev;
211
212 nxt_port_mmap_header_t *hdr;
213 nxt_unit_request_info_t *req;
214 nxt_unit_ctx_impl_t *ctx_impl;
215 char *free_ptr;
216 char *plain_ptr;
217 };
218
219
/*
 * Decoded view of one received port message, filled in by
 * nxt_unit_process_msg(): stream, pid, reply_port, last and mmap are copied
 * from the nxt_port_msg_t at the start of the read buffer.
 */
220 struct nxt_unit_recv_msg_s {
221 uint32_t stream;
222 nxt_pid_t pid;
223 nxt_port_id_t reply_port;
224
225 uint8_t last; /* 1 bit */
226 uint8_t mmap; /* 1 bit */
227
/*
 * Payload: initially the bytes following the nxt_port_msg_t in the read
 * buffer, "size" being the received length minus sizeof(nxt_port_msg_t).
 */
228 void *start;
229 uint32_t size;
230
/* Descriptors taken from the message's ancillary data; -1 when absent. */
231 int fd[2];
232
233 nxt_unit_mmap_buf_t *incoming_buf;
234 };
235
236
/*
 * Value of nxt_unit_request_info_impl_t.state.
 * NOTE(review): the names suggest a request moves through these states in
 * declaration order - confirm against the code that updates "state".
 */
237 typedef enum {
238 NXT_UNIT_RS_START = 0,
239 NXT_UNIT_RS_RESPONSE_INIT,
240 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
241 NXT_UNIT_RS_RESPONSE_SENT,
242 NXT_UNIT_RS_RELEASED,
243 } nxt_unit_req_state_t;
244
245
/*
 * Library-private request state; wraps the public nxt_unit_request_info_t
 * in the "req" member.
 */
246 struct nxt_unit_request_info_impl_s {
247 nxt_unit_request_info_t req;
248
249 uint32_t stream;
250
251 nxt_unit_mmap_buf_t *outgoing_buf;
252 nxt_unit_mmap_buf_t *incoming_buf;
253
254 nxt_unit_req_state_t state;
255 uint8_t websocket;
256 uint8_t in_hash;
257
258 /* for nxt_unit_ctx_impl_t.free_req or active_req */
259 nxt_queue_link_t link;
260 /* for nxt_unit_port_impl_t.awaiting_req */
261 nxt_queue_link_t port_wait_link;
262
/* Flexible array member: it has to remain the last member. */
263 char extra_data[];
264 };
265
266
/*
 * Library-private websocket frame state; wraps the public
 * nxt_unit_websocket_frame_t in the "ws" member.
 */
267 struct nxt_unit_websocket_frame_impl_s {
268 nxt_unit_websocket_frame_t ws;
269
270 nxt_unit_mmap_buf_t *buf;
271
/* nxt_unit_ctx_impl_t.free_ws is documented as a queue of this type. */
272 nxt_queue_link_t link;
273
274 nxt_unit_ctx_impl_t *ctx_impl;
275 };
276
277
/*
 * Buffer for one received port message.  nxt_unit_process_msg() reads the
 * nxt_port_msg_t from the start of "buf", treats size == 0 as "read port
 * closed" and takes passed file descriptors from "oob".
 */
278 struct nxt_unit_read_buf_s {
/*
 * The pending_rbuf, free_rbuf and awaiting_rbuf queues are documented as
 * holding this type.
 */
279 nxt_queue_link_t link;
280 nxt_unit_ctx_impl_t *ctx_impl;
281 ssize_t size;
282 nxt_recv_oob_t oob;
283 char buf[16384];
284 };
285
286
/*
 * Library-private context state; wraps the public nxt_unit_ctx_t in "ctx".
 * nxt_unit_ctx_init() sets the initial values: use_count 1, wait_items 0,
 * online 1, ready 0, quit_param NXT_QUIT_GRACEFUL, read_port NULL.
 */
287 struct nxt_unit_ctx_impl_s {
288 nxt_unit_ctx_t ctx;
289
/* The context is freed when use_count drops from 1 (nxt_unit_ctx_release()). */
290 nxt_atomic_t use_count;
291 nxt_atomic_t wait_items;
292
293 pthread_mutex_t mutex;
294
295 nxt_unit_port_t *read_port;
296
/* for nxt_unit_impl_t.contexts */
297 nxt_queue_link_t link;
298
/* Initially holds ctx_buf[0] followed by ctx_buf[1]. */
299 nxt_unit_mmap_buf_t *free_buf;
300
301 /* of nxt_unit_request_info_impl_t */
302 nxt_queue_t free_req;
303
304 /* of nxt_unit_websocket_frame_impl_t */
305 nxt_queue_t free_ws;
306
307 /* of nxt_unit_request_info_impl_t */
308 nxt_queue_t active_req;
309
310 /* of nxt_unit_request_info_impl_t */
311 nxt_lvlhsh_t requests;
312
313 /* of nxt_unit_request_info_impl_t */
314 nxt_queue_t ready_req;
315
316 /* of nxt_unit_read_buf_t */
317 nxt_queue_t pending_rbuf;
318
319 /* of nxt_unit_read_buf_t */
320 nxt_queue_t free_rbuf;
321
322 uint8_t online; /* 1 bit */
323 uint8_t ready; /* 1 bit */
324 uint8_t quit_param;
325
/* Embedded objects that nxt_unit_ctx_init() places on the free lists. */
326 nxt_unit_mmap_buf_t ctx_buf[2];
327 nxt_unit_read_buf_t ctx_read_buf;
328
/* Ends with a flexible array member, so it is kept as the last member. */
329 nxt_unit_request_info_impl_t req;
330 };
331
332
/* One element of the nxt_unit_mmaps_t.elts array. */
333 struct nxt_unit_mmap_s {
334 nxt_port_mmap_header_t *hdr;
335 pthread_t src_thread;
336
337 /* of nxt_unit_read_buf_t */
338 nxt_queue_t awaiting_rbuf;
339 };
340
341
/*
 * Table of nxt_unit_mmap_t elements; nxt_unit_impl_t keeps one for incoming
 * and one for outgoing segments.
 * NOTE(review): presumably "size" counts used elements and "cap" allocated
 * ones, both guarded by "mutex" - confirm against nxt_unit_mmap_at().
 */
342 struct nxt_unit_mmaps_s {
343 pthread_mutex_t mutex;
344 uint32_t size;
345 uint32_t cap;
346 nxt_atomic_t allocated_chunks;
347 nxt_unit_mmap_t *elts;
348 };
349
350
/*
 * Library instance; wraps the public nxt_unit_t in "unit".  Allocated and
 * initialized by nxt_unit_create(), torn down by nxt_unit_lib_release() once
 * use_count drops from 1.
 */
351 struct nxt_unit_impl_s {
352 nxt_unit_t unit;
353 nxt_unit_callbacks_t callbacks;
354
355 nxt_atomic_t use_count;
356 nxt_atomic_t request_count;
357
358 uint32_t request_data_size;
/* Shared memory limit in units of PORT_MMAP_DATA_SIZE, rounded up; at least 1. */
359 uint32_t shm_mmap_limit;
360 uint32_t request_limit;
361
/* Held while the contexts queue is modified (see nxt_unit_ctx_init()). */
362 pthread_mutex_t mutex;
363
364 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
365 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
366
367 nxt_unit_port_t *router_port;
368 nxt_unit_port_t *shared_port;
369
370 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
371
372 nxt_unit_mmaps_t incoming;
373 nxt_unit_mmaps_t outgoing;
374
/* Set by nxt_unit_init() to the pid of the read port. */
375 pid_t pid;
/* STDERR_FILENO unless nxt_unit_init() replaces it. */
376 int log_fd;
377
/*
 * Kept last: nxt_unit_create() allocates request_data_size extra bytes
 * after this structure, i.e. right after main_ctx, whose own last member
 * ends in the extra_data flexible array.
 */
378 nxt_unit_ctx_impl_t main_ctx;
379 };
380
381
/*
 * Library-private port state; wraps the public nxt_unit_port_t in "port".
 */
382 struct nxt_unit_port_impl_s {
383 nxt_unit_port_t port;
384
385 nxt_atomic_t use_count;
386
387 /* for nxt_unit_process_t.ports */
388 nxt_queue_link_t link;
389 nxt_unit_process_t *process;
390
391 /* of nxt_unit_request_info_impl_t */
392 nxt_queue_t awaiting_req;
393
394 int ready;
395
/*
 * NOTE(review): presumably the queue pointer handed to nxt_unit_add_port()
 * (a mapped nxt_port_queue_t or nxt_app_queue_t in nxt_unit_init()) -
 * confirm.
 */
396 void *queue;
397
398 int from_socket;
399 nxt_unit_read_buf_t *socket_rbuf;
400 };
401
402
/*
 * Per-process record; nxt_unit_impl_t.processes is documented as a hash of
 * these.
 */
403 struct nxt_unit_process_s {
404 pid_t pid;
405
406 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
407
408 nxt_unit_impl_t *lib;
409
410 nxt_atomic_t use_count;
411
412 uint32_t next_port_id;
413 };
414
415
416 /* Explicitly using 32-bit types to avoid possible alignment padding. */
417 typedef struct {
418 int32_t pid;
419 uint32_t id;
420 } nxt_unit_port_hash_id_t;
421
422
/*
 * Set by nxt_unit_init(): first to getpid(), then to the pid of the read
 * port once the port parameters are known.
 */
423 static pid_t nxt_unit_pid;
424
425
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429 int rc, queue_fd, shared_queue_fd;
430 void *mem;
431 uint32_t ready_stream, shm_limit, request_limit;
432 nxt_unit_ctx_t *ctx;
433 nxt_unit_impl_t *lib;
434 nxt_unit_port_t ready_port, router_port, read_port, shared_port;
435
436 nxt_unit_pid = getpid();
437
438 lib = nxt_unit_create(init);
439 if (nxt_slow_path(lib == NULL)) {
440 return NULL;
441 }
442
443 queue_fd = -1;
444 mem = MAP_FAILED;
445 shared_port.out_fd = -1;
446 shared_port.data = NULL;
447
448 if (init->ready_port.id.pid != 0
449 && init->ready_stream != 0
450 && init->read_port.id.pid != 0)
451 {
452 ready_port = init->ready_port;
453 ready_stream = init->ready_stream;
454 router_port = init->router_port;
455 read_port = init->read_port;
456 lib->log_fd = init->log_fd;
457
458 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459 ready_port.id.id);
460 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461 router_port.id.id);
462 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463 read_port.id.id);
464
465 shared_port.in_fd = init->shared_port_fd;
466 shared_queue_fd = init->shared_queue_fd;
467
468 } else {
469 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470 &shared_port.in_fd, &shared_queue_fd,
471 &lib->log_fd, &ready_stream, &shm_limit,
472 &request_limit);
473 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474 goto fail;
475 }
476
477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478 / PORT_MMAP_DATA_SIZE;
479 lib->request_limit = request_limit;
480 }
481
482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483 lib->shm_mmap_limit = 1;
484 }
485
486 lib->pid = read_port.id.pid;
487 nxt_unit_pid = lib->pid;
488
489 ctx = &lib->main_ctx.ctx;
490
491 rc = nxt_unit_fd_blocking(router_port.out_fd);
492 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493 goto fail;
494 }
495
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497 if (nxt_slow_path(lib->router_port == NULL)) {
498 nxt_unit_alert(NULL, "failed to add router_port");
499
500 goto fail;
501 }
502
503 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504 if (nxt_slow_path(queue_fd == -1)) {
505 goto fail;
506 }
507
508 mem = mmap(NULL, sizeof(nxt_port_queue_t),
509 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510 if (nxt_slow_path(mem == MAP_FAILED)) {
511 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512 strerror(errno), errno);
513
514 goto fail;
515 }
516
517 nxt_port_queue_init(mem);
518
519 rc = nxt_unit_fd_blocking(read_port.in_fd);
520 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521 goto fail;
522 }
523
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526 nxt_unit_alert(NULL, "failed to add read_port");
527
528 goto fail;
529 }
530
531 rc = nxt_unit_fd_blocking(ready_port.out_fd);
532 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533 goto fail;
534 }
535
536 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537 NXT_UNIT_SHARED_PORT_ID);
538
539 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540 MAP_SHARED, shared_queue_fd, 0);
541 if (nxt_slow_path(mem == MAP_FAILED)) {
542 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543 strerror(errno), errno);
544
545 goto fail;
546 }
547
548 nxt_unit_close(shared_queue_fd);
549
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551 if (nxt_slow_path(lib->shared_port == NULL)) {
552 nxt_unit_alert(NULL, "failed to add shared_port");
553
554 goto fail;
555 }
556
557 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559 nxt_unit_alert(NULL, "failed to send READY message");
560
561 goto fail;
562 }
563
564 nxt_unit_close(ready_port.out_fd);
565 nxt_unit_close(queue_fd);
566
567 return ctx;
568
569 fail:
570
571 if (mem != MAP_FAILED) {
572 munmap(mem, sizeof(nxt_port_queue_t));
573 }
574
575 if (queue_fd != -1) {
576 nxt_unit_close(queue_fd);
577 }
578
579 nxt_unit_ctx_release(&lib->main_ctx.ctx);
580
581 return NULL;
582 }
583
584
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588 int rc;
589 nxt_unit_impl_t *lib;
590
591 if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
592 nxt_unit_alert(NULL, "request_handler is NULL");
593
594 return NULL;
595 }
596
597 lib = nxt_unit_malloc(NULL,
598 sizeof(nxt_unit_impl_t) + init->request_data_size);
599 if (nxt_slow_path(lib == NULL)) {
600 nxt_unit_alert(NULL, "failed to allocate unit struct");
601
602 return NULL;
603 }
604
605 rc = pthread_mutex_init(&lib->mutex, NULL);
606 if (nxt_slow_path(rc != 0)) {
607 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
608
609 goto out_unit_free;
610 }
611
612 lib->unit.data = init->data;
613 lib->callbacks = init->callbacks;
614
615 lib->request_data_size = init->request_data_size;
616 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
617 / PORT_MMAP_DATA_SIZE;
618 lib->request_limit = init->request_limit;
619
620 lib->processes.slot = NULL;
621 lib->ports.slot = NULL;
622
623 lib->log_fd = STDERR_FILENO;
624
625 nxt_queue_init(&lib->contexts);
626
627 lib->use_count = 0;
628 lib->request_count = 0;
629 lib->router_port = NULL;
630 lib->shared_port = NULL;
631
632 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
633 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
634 goto out_mutex_destroy;
635 }
636
637 rc = nxt_unit_mmaps_init(&lib->incoming);
638 if (nxt_slow_path(rc != 0)) {
639 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
640
641 goto out_ctx_free;
642 }
643
644 rc = nxt_unit_mmaps_init(&lib->outgoing);
645 if (nxt_slow_path(rc != 0)) {
646 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
647
648 goto out_mmaps_destroy;
649 }
650
651 return lib;
652
653 out_mmaps_destroy:
654 nxt_unit_mmaps_destroy(&lib->incoming);
655
656 out_ctx_free:
657 nxt_unit_ctx_free(&lib->main_ctx);
658
659 out_mutex_destroy:
660 pthread_mutex_destroy(&lib->mutex);
661
662 out_unit_free:
663 nxt_unit_free(NULL, lib);
664
665 return NULL;
666 }
667
668
669 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
671 void *data)
672 {
673 int rc;
674
675 ctx_impl->ctx.data = data;
676 ctx_impl->ctx.unit = &lib->unit;
677
678 rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
679 if (nxt_slow_path(rc != 0)) {
680 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
681
682 return NXT_UNIT_ERROR;
683 }
684
685 nxt_unit_lib_use(lib);
686
687 pthread_mutex_lock(&lib->mutex);
688
689 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
690
691 pthread_mutex_unlock(&lib->mutex);
692
693 ctx_impl->use_count = 1;
694 ctx_impl->wait_items = 0;
695 ctx_impl->online = 1;
696 ctx_impl->ready = 0;
697 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
698
699 nxt_queue_init(&ctx_impl->free_req);
700 nxt_queue_init(&ctx_impl->free_ws);
701 nxt_queue_init(&ctx_impl->active_req);
702 nxt_queue_init(&ctx_impl->ready_req);
703 nxt_queue_init(&ctx_impl->pending_rbuf);
704 nxt_queue_init(&ctx_impl->free_rbuf);
705
706 ctx_impl->free_buf = NULL;
707 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
708 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
709
710 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
711 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
712
713 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
714
715 ctx_impl->req.req.ctx = &ctx_impl->ctx;
716 ctx_impl->req.req.unit = &lib->unit;
717
718 ctx_impl->read_port = NULL;
719 ctx_impl->requests.slot = 0;
720
721 return NXT_UNIT_OK;
722 }
723
724
725 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)726 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
727 {
728 nxt_unit_ctx_impl_t *ctx_impl;
729
730 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
731
732 nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
733 }
734
735
736 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)737 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
738 {
739 long c;
740 nxt_unit_ctx_impl_t *ctx_impl;
741
742 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
743
744 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
745
746 if (c == 1) {
747 nxt_unit_ctx_free(ctx_impl);
748 }
749 }
750
751
752 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)753 nxt_unit_lib_use(nxt_unit_impl_t *lib)
754 {
755 nxt_atomic_fetch_add(&lib->use_count, 1);
756 }
757
758
759 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)760 nxt_unit_lib_release(nxt_unit_impl_t *lib)
761 {
762 long c;
763 nxt_unit_process_t *process;
764
765 c = nxt_atomic_fetch_add(&lib->use_count, -1);
766
767 if (c == 1) {
768 for ( ;; ) {
769 pthread_mutex_lock(&lib->mutex);
770
771 process = nxt_unit_process_pop_first(lib);
772 if (process == NULL) {
773 pthread_mutex_unlock(&lib->mutex);
774
775 break;
776 }
777
778 nxt_unit_remove_process(lib, process);
779 }
780
781 pthread_mutex_destroy(&lib->mutex);
782
783 if (nxt_fast_path(lib->router_port != NULL)) {
784 nxt_unit_port_release(lib->router_port);
785 }
786
787 if (nxt_fast_path(lib->shared_port != NULL)) {
788 nxt_unit_port_release(lib->shared_port);
789 }
790
791 nxt_unit_mmaps_destroy(&lib->incoming);
792 nxt_unit_mmaps_destroy(&lib->outgoing);
793
794 nxt_unit_free(NULL, lib);
795 }
796 }
797
798
799 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)800 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
801 nxt_unit_mmap_buf_t *mmap_buf)
802 {
803 mmap_buf->next = *head;
804
805 if (mmap_buf->next != NULL) {
806 mmap_buf->next->prev = &mmap_buf->next;
807 }
808
809 *head = mmap_buf;
810 mmap_buf->prev = head;
811 }
812
813
814 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)815 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
816 nxt_unit_mmap_buf_t *mmap_buf)
817 {
818 while (*prev != NULL) {
819 prev = &(*prev)->next;
820 }
821
822 nxt_unit_mmap_buf_insert(prev, mmap_buf);
823 }
824
825
826 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)827 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
828 {
829 nxt_unit_mmap_buf_t **prev;
830
831 prev = mmap_buf->prev;
832
833 if (mmap_buf->next != NULL) {
834 mmap_buf->next->prev = prev;
835 }
836
837 if (prev != NULL) {
838 *prev = mmap_buf->next;
839 }
840 }
841
842
843 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)844 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
845 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
846 int *log_fd, uint32_t *stream,
847 uint32_t *shm_limit, uint32_t *request_limit)
848 {
849 int rc;
850 int ready_fd, router_fd, read_in_fd, read_out_fd;
851 char *unit_init, *version_end, *vars;
852 size_t version_length;
853 int64_t ready_pid, router_pid, read_pid;
854 uint32_t ready_stream, router_id, ready_id, read_id;
855
856 unit_init = getenv(NXT_UNIT_INIT_ENV);
857 if (nxt_slow_path(unit_init == NULL)) {
858 nxt_unit_alert(NULL, "%s is not in the current environment",
859 NXT_UNIT_INIT_ENV);
860
861 return NXT_UNIT_ERROR;
862 }
863
864 version_end = strchr(unit_init, ';');
865 if (nxt_slow_path(version_end == NULL)) {
866 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
867 NXT_UNIT_INIT_ENV, unit_init);
868
869 return NXT_UNIT_ERROR;
870 }
871
872 version_length = version_end - unit_init;
873
874 rc = version_length != nxt_length(NXT_VERSION)
875 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
876
877 if (nxt_slow_path(rc != 0)) {
878 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
879 "%.*s, while the app was compiled with libunit %s",
880 (int) version_length, unit_init, NXT_VERSION);
881
882 return NXT_UNIT_ERROR;
883 }
884
885 vars = version_end + 1;
886
887 rc = sscanf(vars,
888 "%"PRIu32";"
889 "%"PRId64",%"PRIu32",%d;"
890 "%"PRId64",%"PRIu32",%d;"
891 "%"PRId64",%"PRIu32",%d,%d;"
892 "%d,%d;"
893 "%d,%"PRIu32",%"PRIu32,
894 &ready_stream,
895 &ready_pid, &ready_id, &ready_fd,
896 &router_pid, &router_id, &router_fd,
897 &read_pid, &read_id, &read_in_fd, &read_out_fd,
898 shared_port_fd, shared_queue_fd,
899 log_fd, shm_limit, request_limit);
900
901 if (nxt_slow_path(rc == EOF)) {
902 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
903 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
904
905 return NXT_UNIT_ERROR;
906 }
907
908 if (nxt_slow_path(rc != 16)) {
909 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
910 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
911
912 return NXT_UNIT_ERROR;
913 }
914
915 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
916
917 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
918
919 ready_port->in_fd = -1;
920 ready_port->out_fd = ready_fd;
921 ready_port->data = NULL;
922
923 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
924
925 router_port->in_fd = -1;
926 router_port->out_fd = router_fd;
927 router_port->data = NULL;
928
929 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
930
931 read_port->in_fd = read_in_fd;
932 read_port->out_fd = read_out_fd;
933 read_port->data = NULL;
934
935 *stream = ready_stream;
936
937 return NXT_UNIT_OK;
938 }
939
940
941 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)942 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
943 {
944 ssize_t res;
945 nxt_send_oob_t oob;
946 nxt_port_msg_t msg;
947 nxt_unit_impl_t *lib;
948 int fds[2] = {queue_fd, -1};
949
950 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
951
952 msg.stream = stream;
953 msg.pid = lib->pid;
954 msg.reply_port = 0;
955 msg.type = _NXT_PORT_MSG_PROCESS_READY;
956 msg.last = 1;
957 msg.mmap = 0;
958 msg.nf = 0;
959 msg.mf = 0;
960
961 nxt_socket_msg_oob_init(&oob, fds);
962
963 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
964 if (res != sizeof(msg)) {
965 return NXT_UNIT_ERROR;
966 }
967
968 return NXT_UNIT_OK;
969 }
970
971
972 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)973 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
974 nxt_unit_request_info_t **preq)
975 {
976 int rc;
977 pid_t pid;
978 uint8_t quit_param;
979 nxt_port_msg_t *port_msg;
980 nxt_unit_impl_t *lib;
981 nxt_unit_recv_msg_t recv_msg;
982
983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
984
985 recv_msg.incoming_buf = NULL;
986 recv_msg.fd[0] = -1;
987 recv_msg.fd[1] = -1;
988
989 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
990 if (nxt_slow_path(rc != NXT_OK)) {
991 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
992 rc = NXT_UNIT_ERROR;
993 goto done;
994 }
995
996 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
997 if (nxt_slow_path(rbuf->size == 0)) {
998 nxt_unit_debug(ctx, "read port closed");
999
1000 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1001 rc = NXT_UNIT_OK;
1002 goto done;
1003 }
1004
1005 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
1006
1007 rc = NXT_UNIT_ERROR;
1008 goto done;
1009 }
1010
1011 port_msg = (nxt_port_msg_t *) rbuf->buf;
1012
1013 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1014 port_msg->stream, (int) port_msg->type,
1015 recv_msg.fd[0], recv_msg.fd[1]);
1016
1017 recv_msg.stream = port_msg->stream;
1018 recv_msg.pid = port_msg->pid;
1019 recv_msg.reply_port = port_msg->reply_port;
1020 recv_msg.last = port_msg->last;
1021 recv_msg.mmap = port_msg->mmap;
1022
1023 recv_msg.start = port_msg + 1;
1024 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1025
1026 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1027 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1028 port_msg->stream, (int) port_msg->type);
1029 rc = NXT_UNIT_ERROR;
1030 goto done;
1031 }
1032
1033 /* Fragmentation is unsupported. */
1034 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1035 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1036 port_msg->stream, (int) port_msg->type);
1037 rc = NXT_UNIT_ERROR;
1038 goto done;
1039 }
1040
1041 if (port_msg->mmap) {
1042 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1043
1044 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1045 if (rc == NXT_UNIT_AGAIN) {
1046 recv_msg.fd[0] = -1;
1047 recv_msg.fd[1] = -1;
1048 }
1049
1050 goto done;
1051 }
1052 }
1053
1054 switch (port_msg->type) {
1055
1056 case _NXT_PORT_MSG_RPC_READY:
1057 rc = NXT_UNIT_OK;
1058 break;
1059
1060 case _NXT_PORT_MSG_QUIT:
1061 if (recv_msg.size == sizeof(quit_param)) {
1062 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1063
1064 } else {
1065 quit_param = NXT_QUIT_NORMAL;
1066 }
1067
1068 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1069 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1070
1071 nxt_unit_quit(ctx, quit_param);
1072
1073 rc = NXT_UNIT_OK;
1074 break;
1075
1076 case _NXT_PORT_MSG_NEW_PORT:
1077 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1078 break;
1079
1080 case _NXT_PORT_MSG_PORT_ACK:
1081 rc = nxt_unit_ctx_ready(ctx);
1082 break;
1083
1084 case _NXT_PORT_MSG_CHANGE_FILE:
1085 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1086 port_msg->stream, recv_msg.fd[0]);
1087
1088 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1089 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1090 port_msg->stream, recv_msg.fd[0], lib->log_fd,
1091 strerror(errno), errno);
1092
1093 rc = NXT_UNIT_ERROR;
1094 goto done;
1095 }
1096
1097 rc = NXT_UNIT_OK;
1098 break;
1099
1100 case _NXT_PORT_MSG_MMAP:
1101 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1102 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1103 port_msg->stream, recv_msg.fd[0]);
1104
1105 rc = NXT_UNIT_ERROR;
1106 goto done;
1107 }
1108
1109 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1110 break;
1111
1112 case _NXT_PORT_MSG_REQ_HEADERS:
1113 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1114 break;
1115
1116 case _NXT_PORT_MSG_REQ_BODY:
1117 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1118 break;
1119
1120 case _NXT_PORT_MSG_WEBSOCKET:
1121 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1122 break;
1123
1124 case _NXT_PORT_MSG_REMOVE_PID:
1125 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1126 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1127 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1128 (int) sizeof(pid));
1129
1130 rc = NXT_UNIT_ERROR;
1131 goto done;
1132 }
1133
1134 memcpy(&pid, recv_msg.start, sizeof(pid));
1135
1136 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1137 port_msg->stream, (int) pid);
1138
1139 nxt_unit_remove_pid(lib, pid);
1140
1141 rc = NXT_UNIT_OK;
1142 break;
1143
1144 case _NXT_PORT_MSG_SHM_ACK:
1145 rc = nxt_unit_process_shm_ack(ctx);
1146 break;
1147
1148 default:
1149 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1150 port_msg->stream, (int) port_msg->type);
1151
1152 rc = NXT_UNIT_ERROR;
1153 goto done;
1154 }
1155
1156 done:
1157
1158 if (recv_msg.fd[0] != -1) {
1159 nxt_unit_close(recv_msg.fd[0]);
1160 }
1161
1162 if (recv_msg.fd[1] != -1) {
1163 nxt_unit_close(recv_msg.fd[1]);
1164 }
1165
1166 while (recv_msg.incoming_buf != NULL) {
1167 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1168 }
1169
1170 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1171 #if (NXT_DEBUG)
1172 memset(rbuf->buf, 0xAC, rbuf->size);
1173 #endif
1174 nxt_unit_read_buf_release(ctx, rbuf);
1175 }
1176
1177 return rc;
1178 }
1179
1180
1181 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1182 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1183 {
1184 void *mem;
1185 nxt_unit_port_t new_port, *port;
1186 nxt_port_msg_new_port_t *new_port_msg;
1187
1188 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1189 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1190 "invalid message size (%d)",
1191 recv_msg->stream, (int) recv_msg->size);
1192
1193 return NXT_UNIT_ERROR;
1194 }
1195
1196 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1197 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1198 recv_msg->stream, recv_msg->fd[0]);
1199
1200 return NXT_UNIT_ERROR;
1201 }
1202
1203 new_port_msg = recv_msg->start;
1204
1205 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1206 recv_msg->stream, (int) new_port_msg->pid,
1207 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1208
1209 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1210 return NXT_UNIT_ERROR;
1211 }
1212
1213 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1214
1215 new_port.in_fd = -1;
1216 new_port.out_fd = recv_msg->fd[0];
1217
1218 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1219 MAP_SHARED, recv_msg->fd[1], 0);
1220
1221 if (nxt_slow_path(mem == MAP_FAILED)) {
1222 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1223 strerror(errno), errno);
1224
1225 return NXT_UNIT_ERROR;
1226 }
1227
1228 new_port.data = NULL;
1229
1230 recv_msg->fd[0] = -1;
1231
1232 port = nxt_unit_add_port(ctx, &new_port, mem);
1233 if (nxt_slow_path(port == NULL)) {
1234 return NXT_UNIT_ERROR;
1235 }
1236
1237 nxt_unit_port_release(port);
1238
1239 return NXT_UNIT_OK;
1240 }
1241
1242
1243 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1244 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1245 {
1246 nxt_unit_impl_t *lib;
1247 nxt_unit_ctx_impl_t *ctx_impl;
1248
1249 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1250
1251 if (nxt_slow_path(ctx_impl->ready)) {
1252 return NXT_UNIT_OK;
1253 }
1254
1255 ctx_impl->ready = 1;
1256
1257 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1258
1259 /* Call ready_handler() only for main context. */
1260 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1261 return lib->callbacks.ready_handler(ctx);
1262 }
1263
1264 if (&lib->main_ctx != ctx_impl) {
1265 /* Check if the main context is already stopped or quit. */
1266 if (nxt_slow_path(!lib->main_ctx.ready)) {
1267 ctx_impl->ready = 0;
1268
1269 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1270
1271 return NXT_UNIT_OK;
1272 }
1273
1274 if (lib->callbacks.add_port != NULL) {
1275 lib->callbacks.add_port(ctx, lib->shared_port);
1276 }
1277 }
1278
1279 return NXT_UNIT_OK;
1280 }
1281
1282
1283 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1284 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1285 nxt_unit_request_info_t **preq)
1286 {
1287 int res;
1288 nxt_unit_impl_t *lib;
1289 nxt_unit_port_id_t port_id;
1290 nxt_unit_request_t *r;
1291 nxt_unit_mmap_buf_t *b;
1292 nxt_unit_request_info_t *req;
1293 nxt_unit_request_info_impl_t *req_impl;
1294
1295 if (nxt_slow_path(recv_msg->mmap == 0)) {
1296 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1297 recv_msg->stream);
1298
1299 return NXT_UNIT_ERROR;
1300 }
1301
1302 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1303 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1304 "%d expected", recv_msg->stream, (int) recv_msg->size,
1305 (int) sizeof(nxt_unit_request_t));
1306
1307 return NXT_UNIT_ERROR;
1308 }
1309
1310 req_impl = nxt_unit_request_info_get(ctx);
1311 if (nxt_slow_path(req_impl == NULL)) {
1312 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1313 recv_msg->stream);
1314
1315 return NXT_UNIT_ERROR;
1316 }
1317
1318 req = &req_impl->req;
1319
1320 req->request = recv_msg->start;
1321
1322 b = recv_msg->incoming_buf;
1323
1324 req->request_buf = &b->buf;
1325 req->response = NULL;
1326 req->response_buf = NULL;
1327
1328 r = req->request;
1329
1330 req->content_length = r->content_length;
1331
1332 req->content_buf = req->request_buf;
1333 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1334
1335 req_impl->stream = recv_msg->stream;
1336
1337 req_impl->outgoing_buf = NULL;
1338
1339 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1340 b->req = req;
1341 }
1342
1343 /* "Move" incoming buffer list to req_impl. */
1344 req_impl->incoming_buf = recv_msg->incoming_buf;
1345 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1346 recv_msg->incoming_buf = NULL;
1347
1348 req->content_fd = recv_msg->fd[0];
1349 recv_msg->fd[0] = -1;
1350
1351 req->response_max_fields = 0;
1352 req_impl->state = NXT_UNIT_RS_START;
1353 req_impl->websocket = 0;
1354 req_impl->in_hash = 0;
1355
1356 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1357 (int) r->method_length,
1358 (char *) nxt_unit_sptr_get(&r->method),
1359 (int) r->target_length,
1360 (char *) nxt_unit_sptr_get(&r->target),
1361 (int) r->content_length);
1362
1363 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1364
1365 res = nxt_unit_request_check_response_port(req, &port_id);
1366 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1367 return NXT_UNIT_ERROR;
1368 }
1369
1370 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1371 res = nxt_unit_send_req_headers_ack(req);
1372 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1373 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374
1375 return NXT_UNIT_ERROR;
1376 }
1377
1378 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1379
1380 if (req->content_length
1381 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1382 {
1383 res = nxt_unit_request_hash_add(ctx, req);
1384 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385 nxt_unit_req_warn(req, "failed to add request to hash");
1386
1387 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388
1389 return NXT_UNIT_ERROR;
1390 }
1391
1392 /*
1393 * If application have separate data handler, we may start
1394 * request processing and process data when it is arrived.
1395 */
1396 if (lib->callbacks.data_handler == NULL) {
1397 return NXT_UNIT_OK;
1398 }
1399 }
1400
1401 if (preq == NULL) {
1402 lib->callbacks.request_handler(req);
1403
1404 } else {
1405 *preq = req;
1406 }
1407 }
1408
1409 return NXT_UNIT_OK;
1410 }
1411
1412
1413 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1414 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1415 {
1416 uint64_t l;
1417 nxt_unit_impl_t *lib;
1418 nxt_unit_mmap_buf_t *b;
1419 nxt_unit_request_info_t *req;
1420
1421 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1422 if (req == NULL) {
1423 return NXT_UNIT_OK;
1424 }
1425
1426 l = req->content_buf->end - req->content_buf->free;
1427
1428 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1429 b->req = req;
1430 l += b->buf.end - b->buf.free;
1431 }
1432
1433 if (recv_msg->incoming_buf != NULL) {
1434 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1435
1436 while (b->next != NULL) {
1437 b = b->next;
1438 }
1439
1440 /* "Move" incoming buffer list to req_impl. */
1441 b->next = recv_msg->incoming_buf;
1442 b->next->prev = &b->next;
1443
1444 recv_msg->incoming_buf = NULL;
1445 }
1446
1447 req->content_fd = recv_msg->fd[0];
1448 recv_msg->fd[0] = -1;
1449
1450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1451
1452 if (lib->callbacks.data_handler != NULL) {
1453 lib->callbacks.data_handler(req);
1454
1455 return NXT_UNIT_OK;
1456 }
1457
1458 if (req->content_fd != -1 || l == req->content_length) {
1459 lib->callbacks.request_handler(req);
1460 }
1461
1462 return NXT_UNIT_OK;
1463 }
1464
1465
1466 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1467 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1468 nxt_unit_port_id_t *port_id)
1469 {
1470 int res;
1471 nxt_unit_ctx_t *ctx;
1472 nxt_unit_impl_t *lib;
1473 nxt_unit_port_t *port;
1474 nxt_unit_process_t *process;
1475 nxt_unit_ctx_impl_t *ctx_impl;
1476 nxt_unit_port_impl_t *port_impl;
1477 nxt_unit_request_info_impl_t *req_impl;
1478
1479 ctx = req->ctx;
1480 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1481 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1482
1483 pthread_mutex_lock(&lib->mutex);
1484
1485 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1486 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1487
1488 if (nxt_fast_path(port != NULL)) {
1489 req->response_port = port;
1490
1491 if (nxt_fast_path(port_impl->ready)) {
1492 pthread_mutex_unlock(&lib->mutex);
1493
1494 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1495 (int) port->id.pid, (int) port->id.id);
1496
1497 return NXT_UNIT_OK;
1498 }
1499
1500 nxt_unit_debug(ctx, "check_response_port: "
1501 "port{%d,%d} already requested",
1502 (int) port->id.pid, (int) port->id.id);
1503
1504 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1505
1506 nxt_queue_insert_tail(&port_impl->awaiting_req,
1507 &req_impl->port_wait_link);
1508
1509 pthread_mutex_unlock(&lib->mutex);
1510
1511 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1512
1513 return NXT_UNIT_AGAIN;
1514 }
1515
1516 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1517 if (nxt_slow_path(port_impl == NULL)) {
1518 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1519 (int) sizeof(nxt_unit_port_impl_t));
1520
1521 pthread_mutex_unlock(&lib->mutex);
1522
1523 return NXT_UNIT_ERROR;
1524 }
1525
1526 port = &port_impl->port;
1527
1528 port->id = *port_id;
1529 port->in_fd = -1;
1530 port->out_fd = -1;
1531 port->data = NULL;
1532
1533 res = nxt_unit_port_hash_add(&lib->ports, port);
1534 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1535 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1536 port->id.pid, port->id.id);
1537
1538 pthread_mutex_unlock(&lib->mutex);
1539
1540 nxt_unit_free(ctx, port);
1541
1542 return NXT_UNIT_ERROR;
1543 }
1544
1545 process = nxt_unit_process_find(lib, port_id->pid, 0);
1546 if (nxt_slow_path(process == NULL)) {
1547 nxt_unit_alert(ctx, "check_response_port: process %d not found",
1548 port->id.pid);
1549
1550 nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1551
1552 pthread_mutex_unlock(&lib->mutex);
1553
1554 nxt_unit_free(ctx, port);
1555
1556 return NXT_UNIT_ERROR;
1557 }
1558
1559 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1560
1561 port_impl->process = process;
1562 port_impl->queue = NULL;
1563 port_impl->from_socket = 0;
1564 port_impl->socket_rbuf = NULL;
1565
1566 nxt_queue_init(&port_impl->awaiting_req);
1567
1568 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1569
1570 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1571
1572 port_impl->use_count = 2;
1573 port_impl->ready = 0;
1574
1575 req->response_port = port;
1576
1577 pthread_mutex_unlock(&lib->mutex);
1578
1579 res = nxt_unit_get_port(ctx, port_id);
1580 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1581 return NXT_UNIT_ERROR;
1582 }
1583
1584 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1585
1586 return NXT_UNIT_AGAIN;
1587 }
1588
1589
1590 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1591 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1592 {
1593 ssize_t res;
1594 nxt_port_msg_t msg;
1595 nxt_unit_impl_t *lib;
1596 nxt_unit_ctx_impl_t *ctx_impl;
1597 nxt_unit_request_info_impl_t *req_impl;
1598
1599 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1600 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1601 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1602
1603 memset(&msg, 0, sizeof(nxt_port_msg_t));
1604
1605 msg.stream = req_impl->stream;
1606 msg.pid = lib->pid;
1607 msg.reply_port = ctx_impl->read_port->id.id;
1608 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1609
1610 res = nxt_unit_port_send(req->ctx, req->response_port,
1611 &msg, sizeof(msg), NULL);
1612 if (nxt_slow_path(res != sizeof(msg))) {
1613 return NXT_UNIT_ERROR;
1614 }
1615
1616 return NXT_UNIT_OK;
1617 }
1618
1619
1620 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1621 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1622 {
1623 size_t hsize;
1624 nxt_unit_impl_t *lib;
1625 nxt_unit_mmap_buf_t *b;
1626 nxt_unit_callbacks_t *cb;
1627 nxt_unit_request_info_t *req;
1628 nxt_unit_request_info_impl_t *req_impl;
1629 nxt_unit_websocket_frame_impl_t *ws_impl;
1630
1631 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1632 if (nxt_slow_path(req == NULL)) {
1633 return NXT_UNIT_OK;
1634 }
1635
1636 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1637
1638 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1639 cb = &lib->callbacks;
1640
1641 if (cb->websocket_handler && recv_msg->size >= 2) {
1642 ws_impl = nxt_unit_websocket_frame_get(ctx);
1643 if (nxt_slow_path(ws_impl == NULL)) {
1644 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1645 req_impl->stream);
1646
1647 return NXT_UNIT_ERROR;
1648 }
1649
1650 ws_impl->ws.req = req;
1651
1652 ws_impl->buf = NULL;
1653
1654 if (recv_msg->mmap) {
1655 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1656 b->req = req;
1657 }
1658
1659 /* "Move" incoming buffer list to ws_impl. */
1660 ws_impl->buf = recv_msg->incoming_buf;
1661 ws_impl->buf->prev = &ws_impl->buf;
1662 recv_msg->incoming_buf = NULL;
1663
1664 b = ws_impl->buf;
1665
1666 } else {
1667 b = nxt_unit_mmap_buf_get(ctx);
1668 if (nxt_slow_path(b == NULL)) {
1669 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1670 req_impl->stream);
1671
1672 nxt_unit_websocket_frame_release(&ws_impl->ws);
1673
1674 return NXT_UNIT_ERROR;
1675 }
1676
1677 b->req = req;
1678 b->buf.start = recv_msg->start;
1679 b->buf.free = b->buf.start;
1680 b->buf.end = b->buf.start + recv_msg->size;
1681
1682 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1683 }
1684
1685 ws_impl->ws.header = (void *) b->buf.start;
1686 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1687 ws_impl->ws.header);
1688
1689 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1690
1691 if (ws_impl->ws.header->mask) {
1692 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1693
1694 } else {
1695 ws_impl->ws.mask = NULL;
1696 }
1697
1698 b->buf.free += hsize;
1699
1700 ws_impl->ws.content_buf = &b->buf;
1701 ws_impl->ws.content_length = ws_impl->ws.payload_len;
1702
1703 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1704 "payload_len=%"PRIu64,
1705 ws_impl->ws.header->opcode,
1706 ws_impl->ws.payload_len);
1707
1708 cb->websocket_handler(&ws_impl->ws);
1709 }
1710
1711 if (recv_msg->last) {
1712 if (cb->close_handler) {
1713 nxt_unit_req_debug(req, "close_handler");
1714
1715 cb->close_handler(req);
1716
1717 } else {
1718 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1719 }
1720 }
1721
1722 return NXT_UNIT_OK;
1723 }
1724
1725
1726 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1727 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1728 {
1729 nxt_unit_impl_t *lib;
1730 nxt_unit_callbacks_t *cb;
1731
1732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1733 cb = &lib->callbacks;
1734
1735 if (cb->shm_ack_handler != NULL) {
1736 cb->shm_ack_handler(ctx);
1737 }
1738
1739 return NXT_UNIT_OK;
1740 }
1741
1742
1743 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1744 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1745 {
1746 nxt_unit_impl_t *lib;
1747 nxt_queue_link_t *lnk;
1748 nxt_unit_ctx_impl_t *ctx_impl;
1749 nxt_unit_request_info_impl_t *req_impl;
1750
1751 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1752
1753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1754
1755 pthread_mutex_lock(&ctx_impl->mutex);
1756
1757 if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1758 pthread_mutex_unlock(&ctx_impl->mutex);
1759
1760 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1761 + lib->request_data_size);
1762 if (nxt_slow_path(req_impl == NULL)) {
1763 return NULL;
1764 }
1765
1766 req_impl->req.unit = ctx->unit;
1767 req_impl->req.ctx = ctx;
1768
1769 pthread_mutex_lock(&ctx_impl->mutex);
1770
1771 } else {
1772 lnk = nxt_queue_first(&ctx_impl->free_req);
1773 nxt_queue_remove(lnk);
1774
1775 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1776 }
1777
1778 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1779
1780 pthread_mutex_unlock(&ctx_impl->mutex);
1781
1782 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1783
1784 return req_impl;
1785 }
1786
1787
1788 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1789 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1790 {
1791 nxt_unit_ctx_t *ctx;
1792 nxt_unit_ctx_impl_t *ctx_impl;
1793 nxt_unit_request_info_impl_t *req_impl;
1794
1795 ctx = req->ctx;
1796 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1797 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1798
1799 req->response = NULL;
1800 req->response_buf = NULL;
1801
1802 if (req_impl->in_hash) {
1803 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1804 }
1805
1806 while (req_impl->outgoing_buf != NULL) {
1807 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1808 }
1809
1810 while (req_impl->incoming_buf != NULL) {
1811 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1812 }
1813
1814 if (req->content_fd != -1) {
1815 nxt_unit_close(req->content_fd);
1816
1817 req->content_fd = -1;
1818 }
1819
1820 if (req->response_port != NULL) {
1821 nxt_unit_port_release(req->response_port);
1822
1823 req->response_port = NULL;
1824 }
1825
1826 req_impl->state = NXT_UNIT_RS_RELEASED;
1827
1828 pthread_mutex_lock(&ctx_impl->mutex);
1829
1830 nxt_queue_remove(&req_impl->link);
1831
1832 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1833
1834 pthread_mutex_unlock(&ctx_impl->mutex);
1835
1836 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1837 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1838 }
1839 }
1840
1841
1842 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1843 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1844 {
1845 nxt_unit_ctx_impl_t *ctx_impl;
1846
1847 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1848
1849 nxt_queue_remove(&req_impl->link);
1850
1851 if (req_impl != &ctx_impl->req) {
1852 nxt_unit_free(&ctx_impl->ctx, req_impl);
1853 }
1854 }
1855
1856
1857 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1858 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1859 {
1860 nxt_queue_link_t *lnk;
1861 nxt_unit_ctx_impl_t *ctx_impl;
1862 nxt_unit_websocket_frame_impl_t *ws_impl;
1863
1864 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1865
1866 pthread_mutex_lock(&ctx_impl->mutex);
1867
1868 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1869 pthread_mutex_unlock(&ctx_impl->mutex);
1870
1871 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1872 if (nxt_slow_path(ws_impl == NULL)) {
1873 return NULL;
1874 }
1875
1876 } else {
1877 lnk = nxt_queue_first(&ctx_impl->free_ws);
1878 nxt_queue_remove(lnk);
1879
1880 pthread_mutex_unlock(&ctx_impl->mutex);
1881
1882 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1883 }
1884
1885 ws_impl->ctx_impl = ctx_impl;
1886
1887 return ws_impl;
1888 }
1889
1890
1891 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1892 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1893 {
1894 nxt_unit_websocket_frame_impl_t *ws_impl;
1895
1896 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1897
1898 while (ws_impl->buf != NULL) {
1899 nxt_unit_mmap_buf_free(ws_impl->buf);
1900 }
1901
1902 ws->req = NULL;
1903
1904 pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1905
1906 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1907
1908 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1909 }
1910
1911
1912 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1913 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1914 nxt_unit_websocket_frame_impl_t *ws_impl)
1915 {
1916 nxt_queue_remove(&ws_impl->link);
1917
1918 nxt_unit_free(ctx, ws_impl);
1919 }
1920
1921
1922 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1923 nxt_unit_field_hash(const char *name, size_t name_length)
1924 {
1925 u_char ch;
1926 uint32_t hash;
1927 const char *p, *end;
1928
1929 hash = 159406; /* Magic value copied from nxt_http_parse.c */
1930 end = name + name_length;
1931
1932 for (p = name; p < end; p++) {
1933 ch = *p;
1934 hash = (hash << 4) + hash + nxt_lowcase(ch);
1935 }
1936
1937 hash = (hash >> 16) ^ hash;
1938
1939 return hash;
1940 }
1941
1942
1943 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1944 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1945 {
1946 char *name;
1947 uint32_t i, j;
1948 nxt_unit_field_t *fields, f;
1949 nxt_unit_request_t *r;
1950
1951 static nxt_str_t content_length = nxt_string("content-length");
1952 static nxt_str_t content_type = nxt_string("content-type");
1953 static nxt_str_t cookie = nxt_string("cookie");
1954
1955 nxt_unit_req_debug(req, "group_dup_fields");
1956
1957 r = req->request;
1958 fields = r->fields;
1959
1960 for (i = 0; i < r->fields_count; i++) {
1961 name = nxt_unit_sptr_get(&fields[i].name);
1962
1963 switch (fields[i].hash) {
1964 case NXT_UNIT_HASH_CONTENT_LENGTH:
1965 if (fields[i].name_length == content_length.length
1966 && nxt_unit_memcasecmp(name, content_length.start,
1967 content_length.length) == 0)
1968 {
1969 r->content_length_field = i;
1970 }
1971
1972 break;
1973
1974 case NXT_UNIT_HASH_CONTENT_TYPE:
1975 if (fields[i].name_length == content_type.length
1976 && nxt_unit_memcasecmp(name, content_type.start,
1977 content_type.length) == 0)
1978 {
1979 r->content_type_field = i;
1980 }
1981
1982 break;
1983
1984 case NXT_UNIT_HASH_COOKIE:
1985 if (fields[i].name_length == cookie.length
1986 && nxt_unit_memcasecmp(name, cookie.start,
1987 cookie.length) == 0)
1988 {
1989 r->cookie_field = i;
1990 }
1991
1992 break;
1993 }
1994
1995 for (j = i + 1; j < r->fields_count; j++) {
1996 if (fields[i].hash != fields[j].hash
1997 || fields[i].name_length != fields[j].name_length
1998 || nxt_unit_memcasecmp(name,
1999 nxt_unit_sptr_get(&fields[j].name),
2000 fields[j].name_length) != 0)
2001 {
2002 continue;
2003 }
2004
2005 f = fields[j];
2006 f.value.offset += (j - (i + 1)) * sizeof(f);
2007
2008 while (j > i + 1) {
2009 fields[j] = fields[j - 1];
2010 fields[j].name.offset -= sizeof(f);
2011 fields[j].value.offset -= sizeof(f);
2012 j--;
2013 }
2014
2015 fields[j] = f;
2016
2017 /* Assign the same name pointer for further grouping simplicity. */
2018 nxt_unit_sptr_set(&fields[j].name, name);
2019
2020 i++;
2021 }
2022 }
2023 }
2024
2025
2026 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2027 nxt_unit_response_init(nxt_unit_request_info_t *req,
2028 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2029 {
2030 uint32_t buf_size;
2031 nxt_unit_buf_t *buf;
2032 nxt_unit_request_info_impl_t *req_impl;
2033
2034 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2035
2036 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2037 nxt_unit_req_warn(req, "init: response already sent");
2038
2039 return NXT_UNIT_ERROR;
2040 }
2041
2042 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2043 (int) max_fields_count, (int) max_fields_size);
2044
2045 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2046 nxt_unit_req_debug(req, "duplicate response init");
2047 }
2048
2049 /*
2050 * Each field name and value 0-terminated by libunit,
2051 * this is the reason of '+ 2' below.
2052 */
2053 buf_size = sizeof(nxt_unit_response_t)
2054 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2055 + max_fields_size;
2056
2057 if (nxt_slow_path(req->response_buf != NULL)) {
2058 buf = req->response_buf;
2059
2060 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2061 goto init_response;
2062 }
2063
2064 nxt_unit_buf_free(buf);
2065
2066 req->response_buf = NULL;
2067 req->response = NULL;
2068 req->response_max_fields = 0;
2069
2070 req_impl->state = NXT_UNIT_RS_START;
2071 }
2072
2073 buf = nxt_unit_response_buf_alloc(req, buf_size);
2074 if (nxt_slow_path(buf == NULL)) {
2075 return NXT_UNIT_ERROR;
2076 }
2077
2078 init_response:
2079
2080 memset(buf->start, 0, sizeof(nxt_unit_response_t));
2081
2082 req->response_buf = buf;
2083
2084 req->response = (nxt_unit_response_t *) buf->start;
2085 req->response->status = status;
2086
2087 buf->free = buf->start + sizeof(nxt_unit_response_t)
2088 + max_fields_count * sizeof(nxt_unit_field_t);
2089
2090 req->response_max_fields = max_fields_count;
2091 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2092
2093 return NXT_UNIT_OK;
2094 }
2095
2096
2097 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2098 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2099 uint32_t max_fields_count, uint32_t max_fields_size)
2100 {
2101 char *p;
2102 uint32_t i, buf_size;
2103 nxt_unit_buf_t *buf;
2104 nxt_unit_field_t *f, *src;
2105 nxt_unit_response_t *resp;
2106 nxt_unit_request_info_impl_t *req_impl;
2107
2108 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2109
2110 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2111 nxt_unit_req_warn(req, "realloc: response not init");
2112
2113 return NXT_UNIT_ERROR;
2114 }
2115
2116 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2117 nxt_unit_req_warn(req, "realloc: response already sent");
2118
2119 return NXT_UNIT_ERROR;
2120 }
2121
2122 if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2123 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2124
2125 return NXT_UNIT_ERROR;
2126 }
2127
2128 /*
2129 * Each field name and value 0-terminated by libunit,
2130 * this is the reason of '+ 2' below.
2131 */
2132 buf_size = sizeof(nxt_unit_response_t)
2133 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2134 + max_fields_size;
2135
2136 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2137
2138 buf = nxt_unit_response_buf_alloc(req, buf_size);
2139 if (nxt_slow_path(buf == NULL)) {
2140 nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2141 return NXT_UNIT_ERROR;
2142 }
2143
2144 resp = (nxt_unit_response_t *) buf->start;
2145
2146 memset(resp, 0, sizeof(nxt_unit_response_t));
2147
2148 resp->status = req->response->status;
2149 resp->content_length = req->response->content_length;
2150
2151 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2152 f = resp->fields;
2153
2154 for (i = 0; i < req->response->fields_count; i++) {
2155 src = req->response->fields + i;
2156
2157 if (nxt_slow_path(src->skip != 0)) {
2158 continue;
2159 }
2160
2161 if (nxt_slow_path(src->name_length + src->value_length + 2
2162 > (uint32_t) (buf->end - p)))
2163 {
2164 nxt_unit_req_warn(req, "realloc: not enough space for field"
2165 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2166 i, src, src->name_length, src->value_length);
2167
2168 goto fail;
2169 }
2170
2171 nxt_unit_sptr_set(&f->name, p);
2172 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2173 *p++ = '\0';
2174
2175 nxt_unit_sptr_set(&f->value, p);
2176 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2177 *p++ = '\0';
2178
2179 f->hash = src->hash;
2180 f->skip = 0;
2181 f->name_length = src->name_length;
2182 f->value_length = src->value_length;
2183
2184 resp->fields_count++;
2185 f++;
2186 }
2187
2188 if (req->response->piggyback_content_length > 0) {
2189 if (nxt_slow_path(req->response->piggyback_content_length
2190 > (uint32_t) (buf->end - p)))
2191 {
2192 nxt_unit_req_warn(req, "realloc: not enought space for content"
2193 " #%"PRIu32", %"PRIu32" required",
2194 i, req->response->piggyback_content_length);
2195
2196 goto fail;
2197 }
2198
2199 resp->piggyback_content_length =
2200 req->response->piggyback_content_length;
2201
2202 nxt_unit_sptr_set(&resp->piggyback_content, p);
2203 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2204 req->response->piggyback_content_length);
2205 }
2206
2207 buf->free = p;
2208
2209 nxt_unit_buf_free(req->response_buf);
2210
2211 req->response = resp;
2212 req->response_buf = buf;
2213 req->response_max_fields = max_fields_count;
2214
2215 return NXT_UNIT_OK;
2216
2217 fail:
2218
2219 nxt_unit_buf_free(buf);
2220
2221 return NXT_UNIT_ERROR;
2222 }
2223
2224
2225 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2226 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2227 {
2228 nxt_unit_request_info_impl_t *req_impl;
2229
2230 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2231
2232 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2233 }
2234
2235
2236 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2237 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2238 const char *name, uint8_t name_length,
2239 const char *value, uint32_t value_length)
2240 {
2241 nxt_unit_buf_t *buf;
2242 nxt_unit_field_t *f;
2243 nxt_unit_response_t *resp;
2244 nxt_unit_request_info_impl_t *req_impl;
2245
2246 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2247
2248 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2249 nxt_unit_req_warn(req, "add_field: response not initialized or "
2250 "already sent");
2251
2252 return NXT_UNIT_ERROR;
2253 }
2254
2255 resp = req->response;
2256
2257 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2258 nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2259 (int) resp->fields_count);
2260
2261 return NXT_UNIT_ERROR;
2262 }
2263
2264 buf = req->response_buf;
2265
2266 if (nxt_slow_path(name_length + value_length + 2
2267 > (uint32_t) (buf->end - buf->free)))
2268 {
2269 nxt_unit_req_warn(req, "add_field: response buffer overflow");
2270
2271 return NXT_UNIT_ERROR;
2272 }
2273
2274 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2275 resp->fields_count,
2276 (int) name_length, name,
2277 (int) value_length, value);
2278
2279 f = resp->fields + resp->fields_count;
2280
2281 nxt_unit_sptr_set(&f->name, buf->free);
2282 buf->free = nxt_cpymem(buf->free, name, name_length);
2283 *buf->free++ = '\0';
2284
2285 nxt_unit_sptr_set(&f->value, buf->free);
2286 buf->free = nxt_cpymem(buf->free, value, value_length);
2287 *buf->free++ = '\0';
2288
2289 f->hash = nxt_unit_field_hash(name, name_length);
2290 f->skip = 0;
2291 f->name_length = name_length;
2292 f->value_length = value_length;
2293
2294 resp->fields_count++;
2295
2296 return NXT_UNIT_OK;
2297 }
2298
2299
2300 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2301 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2302 const void* src, uint32_t size)
2303 {
2304 nxt_unit_buf_t *buf;
2305 nxt_unit_response_t *resp;
2306 nxt_unit_request_info_impl_t *req_impl;
2307
2308 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2309
2310 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2311 nxt_unit_req_warn(req, "add_content: response not initialized yet");
2312
2313 return NXT_UNIT_ERROR;
2314 }
2315
2316 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2317 nxt_unit_req_warn(req, "add_content: response already sent");
2318
2319 return NXT_UNIT_ERROR;
2320 }
2321
2322 buf = req->response_buf;
2323
2324 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2325 nxt_unit_req_warn(req, "add_content: buffer overflow");
2326
2327 return NXT_UNIT_ERROR;
2328 }
2329
2330 resp = req->response;
2331
2332 if (resp->piggyback_content_length == 0) {
2333 nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2334 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2335 }
2336
2337 resp->piggyback_content_length += size;
2338
2339 buf->free = nxt_cpymem(buf->free, src, size);
2340
2341 return NXT_UNIT_OK;
2342 }
2343
2344
2345 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2346 nxt_unit_response_send(nxt_unit_request_info_t *req)
2347 {
2348 int rc;
2349 nxt_unit_mmap_buf_t *mmap_buf;
2350 nxt_unit_request_info_impl_t *req_impl;
2351
2352 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2353
2354 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2355 nxt_unit_req_warn(req, "send: response is not initialized yet");
2356
2357 return NXT_UNIT_ERROR;
2358 }
2359
2360 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2361 nxt_unit_req_warn(req, "send: response already sent");
2362
2363 return NXT_UNIT_ERROR;
2364 }
2365
2366 if (req->request->websocket_handshake && req->response->status == 101) {
2367 nxt_unit_response_upgrade(req);
2368 }
2369
2370 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2371 req->response->fields_count,
2372 (int) (req->response_buf->free
2373 - req->response_buf->start));
2374
2375 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2376
2377 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2378 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2379 req->response = NULL;
2380 req->response_buf = NULL;
2381 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2382
2383 nxt_unit_mmap_buf_free(mmap_buf);
2384 }
2385
2386 return rc;
2387 }
2388
2389
2390 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2391 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2392 {
2393 nxt_unit_request_info_impl_t *req_impl;
2394
2395 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2396
2397 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2398 }
2399
2400
2401 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2402 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2403 {
2404 int rc;
2405 nxt_unit_mmap_buf_t *mmap_buf;
2406 nxt_unit_request_info_impl_t *req_impl;
2407
2408 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2409 nxt_unit_req_warn(req, "response_buf_alloc: "
2410 "requested buffer (%"PRIu32") too big", size);
2411
2412 return NULL;
2413 }
2414
2415 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2416
2417 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2418
2419 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2420 if (nxt_slow_path(mmap_buf == NULL)) {
2421 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2422
2423 return NULL;
2424 }
2425
2426 mmap_buf->req = req;
2427
2428 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2429
2430 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2431 size, size, mmap_buf,
2432 NULL);
2433 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2434 nxt_unit_mmap_buf_release(mmap_buf);
2435
2436 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2437
2438 return NULL;
2439 }
2440
2441 return &mmap_buf->buf;
2442 }
2443
2444
2445 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2446 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2447 {
2448 nxt_unit_mmap_buf_t *mmap_buf;
2449 nxt_unit_ctx_impl_t *ctx_impl;
2450
2451 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2452
2453 pthread_mutex_lock(&ctx_impl->mutex);
2454
2455 if (ctx_impl->free_buf == NULL) {
2456 pthread_mutex_unlock(&ctx_impl->mutex);
2457
2458 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2459 if (nxt_slow_path(mmap_buf == NULL)) {
2460 return NULL;
2461 }
2462
2463 } else {
2464 mmap_buf = ctx_impl->free_buf;
2465
2466 nxt_unit_mmap_buf_unlink(mmap_buf);
2467
2468 pthread_mutex_unlock(&ctx_impl->mutex);
2469 }
2470
2471 mmap_buf->ctx_impl = ctx_impl;
2472
2473 mmap_buf->hdr = NULL;
2474 mmap_buf->free_ptr = NULL;
2475
2476 return mmap_buf;
2477 }
2478
2479
2480 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2481 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2482 {
2483 nxt_unit_mmap_buf_unlink(mmap_buf);
2484
2485 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2486
2487 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2488
2489 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2490 }
2491
2492
2493 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2494 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2495 {
2496 return req->request->websocket_handshake;
2497 }
2498
2499
2500 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2501 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2502 {
2503 int rc;
2504 nxt_unit_request_info_impl_t *req_impl;
2505
2506 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2507
2508 if (nxt_slow_path(req_impl->websocket != 0)) {
2509 nxt_unit_req_debug(req, "upgrade: already upgraded");
2510
2511 return NXT_UNIT_OK;
2512 }
2513
2514 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2515 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2516
2517 return NXT_UNIT_ERROR;
2518 }
2519
2520 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2521 nxt_unit_req_warn(req, "upgrade: response already sent");
2522
2523 return NXT_UNIT_ERROR;
2524 }
2525
2526 rc = nxt_unit_request_hash_add(req->ctx, req);
2527 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2528 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2529
2530 return NXT_UNIT_ERROR;
2531 }
2532
2533 req_impl->websocket = 1;
2534
2535 req->response->status = 101;
2536
2537 return NXT_UNIT_OK;
2538 }
2539
2540
2541 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2542 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2543 {
2544 nxt_unit_request_info_impl_t *req_impl;
2545
2546 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2547
2548 return req_impl->websocket;
2549 }
2550
2551
2552 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2553 nxt_unit_get_request_info_from_data(void *data)
2554 {
2555 nxt_unit_request_info_impl_t *req_impl;
2556
2557 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2558
2559 return &req_impl->req;
2560 }
2561
2562
2563 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2564 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2565 {
2566 int rc;
2567 nxt_unit_mmap_buf_t *mmap_buf;
2568 nxt_unit_request_info_t *req;
2569 nxt_unit_request_info_impl_t *req_impl;
2570
2571 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2572
2573 req = mmap_buf->req;
2574 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2575
2576 nxt_unit_req_debug(req, "buf_send: %d bytes",
2577 (int) (buf->free - buf->start));
2578
2579 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2580 nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2581
2582 return NXT_UNIT_ERROR;
2583 }
2584
2585 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2586 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2587
2588 return NXT_UNIT_ERROR;
2589 }
2590
2591 if (nxt_fast_path(buf->free > buf->start)) {
2592 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2593 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2594 return rc;
2595 }
2596 }
2597
2598 nxt_unit_mmap_buf_free(mmap_buf);
2599
2600 return NXT_UNIT_OK;
2601 }
2602
2603
2604 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2605 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2606 {
2607 int rc;
2608 nxt_unit_mmap_buf_t *mmap_buf;
2609 nxt_unit_request_info_t *req;
2610
2611 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2612
2613 req = mmap_buf->req;
2614
2615 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2616 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2617 nxt_unit_mmap_buf_free(mmap_buf);
2618
2619 nxt_unit_request_info_release(req);
2620
2621 } else {
2622 nxt_unit_request_done(req, rc);
2623 }
2624 }
2625
2626
/*
 * Sends the filled part of an outgoing buffer (buf->start .. buf->free) to
 * req->response_port as a _NXT_PORT_MSG_DATA message of the request stream.
 *
 * If the buffer has a shared memory header (mmap_buf->hdr) and holds data,
 * only a descriptor (mmap id, chunk id, size) is sent.  Otherwise the data
 * itself is sent, prefixed with the port message header that is written in
 * place right before buf->start.
 *
 * "last" is stored as 0 or 1 in the "last" flag of the message header.
 *
 * Returns NXT_UNIT_OK on success and NXT_UNIT_ERROR on failure.  On every
 * path nxt_unit_free_outgoing_buf() is called for mmap_buf before returning.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* The descriptor form is used only for a non-empty mmap-backed buffer. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;

    /* Every "goto free_buf" below leaves with this error result. */
    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* The chunk right after the one that holds the last sent byte. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least a chunk size of space is left unused: keep it in the
             * buffer by moving the start to the first chunk that holds no
             * sent data.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /*
             * Nothing worth keeping: detach the buffer from the segment.
             * With hdr cleared, nxt_unit_free_outgoing_buf() below does not
             * release any shared memory for this buffer.
             */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * Decrease the allocated_chunks counter by the number of chunks
         * spanned by the sent data (the added value is negative).
         */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain send: sizeof(m.msg) bytes must be available between
         * plain_ptr and buf->start for the message header.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        /* Header and data are sent as one contiguous region. */
        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg), NULL);

        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}
2743
2744
2745 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2746 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2747 {
2748 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2749 }
2750
2751
/*
 * Frees a buffer completely: first the memory behind it is given back with
 * nxt_unit_free_outgoing_buf(), then the descriptor itself is returned to
 * its context's free list with nxt_unit_mmap_buf_release().
 */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}
2759
2760
2761 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2762 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2763 {
2764 if (mmap_buf->hdr != NULL) {
2765 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2766 mmap_buf->hdr, mmap_buf->buf.start,
2767 mmap_buf->buf.end - mmap_buf->buf.start);
2768
2769 mmap_buf->hdr = NULL;
2770
2771 return;
2772 }
2773
2774 if (mmap_buf->free_ptr != NULL) {
2775 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2776
2777 mmap_buf->free_ptr = NULL;
2778 }
2779 }
2780
2781
2782 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2783 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2784 {
2785 nxt_unit_ctx_impl_t *ctx_impl;
2786 nxt_unit_read_buf_t *rbuf;
2787
2788 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2789
2790 pthread_mutex_lock(&ctx_impl->mutex);
2791
2792 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2793
2794 pthread_mutex_unlock(&ctx_impl->mutex);
2795
2796 rbuf->oob.size = 0;
2797
2798 return rbuf;
2799 }
2800
2801
2802 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2803 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2804 {
2805 nxt_queue_link_t *link;
2806 nxt_unit_read_buf_t *rbuf;
2807
2808 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2809 link = nxt_queue_first(&ctx_impl->free_rbuf);
2810 nxt_queue_remove(link);
2811
2812 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2813
2814 return rbuf;
2815 }
2816
2817 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2818
2819 if (nxt_fast_path(rbuf != NULL)) {
2820 rbuf->ctx_impl = ctx_impl;
2821 }
2822
2823 return rbuf;
2824 }
2825
2826
2827 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2828 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2829 nxt_unit_read_buf_t *rbuf)
2830 {
2831 nxt_unit_ctx_impl_t *ctx_impl;
2832
2833 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2834
2835 pthread_mutex_lock(&ctx_impl->mutex);
2836
2837 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2838
2839 pthread_mutex_unlock(&ctx_impl->mutex);
2840 }
2841
2842
2843 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2844 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2845 {
2846 nxt_unit_mmap_buf_t *mmap_buf;
2847
2848 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2849
2850 if (mmap_buf->next == NULL) {
2851 return NULL;
2852 }
2853
2854 return &mmap_buf->next->buf;
2855 }
2856
2857
/*
 * Returns PORT_MMAP_DATA_SIZE.  This is the same limit that
 * nxt_unit_response_buf_alloc() applies to a requested buffer size.
 */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}
2863
2864
/*
 * Returns PORT_MMAP_CHUNK_SIZE, the size of one shared memory chunk.
 */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}
2870
2871
2872 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2873 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2874 size_t size)
2875 {
2876 ssize_t res;
2877
2878 res = nxt_unit_response_write_nb(req, start, size, size);
2879
2880 return res < 0 ? -res : NXT_UNIT_OK;
2881 }
2882
2883
/*
 * Writes up to "size" bytes of response body starting at "start".
 *
 * If the response headers are still pending (req->response_buf is set), as
 * much data as fits is appended to the headers buffer and the response is
 * sent first.  The remaining data is copied into outgoing buffers obtained
 * from nxt_unit_get_outgoing_buf() and sent piece by piece.
 *
 * "min_size", capped by the piece size and by PORT_MMAP_CHUNK_SIZE, is
 * passed to nxt_unit_get_outgoing_buf() and is decreased as data is sent.
 * NOTE(review): presumably it is the minimal acceptable buffer size and a
 * zero value is what lets an empty buffer come back, ending the call early
 * with a partial count - confirm against nxt_unit_get_outgoing_buf().
 *
 * Returns the number of bytes sent, which may be less than "size", or a
 * negated NXT_UNIT_* error code.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* The response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Fill the free space of the headers buffer with body data. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        /* Never let min_size go below zero. */
        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        /* An empty buffer ends the call with what has been sent so far. */
        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            return sent;
        }
        /* The buffer may be smaller than requested. */
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}
2965
2966
/*
 * Writes the response body by pulling data from a reader callback.
 *
 * read_info->read() is called repeatedly to fill buffers until
 * read_info->eof becomes set; a negative result of the callback is treated
 * as a read error.
 *
 * If the response headers are still pending (req->response_buf is set), the
 * free space of the headers buffer is filled first and the response is sent
 * together with that content.  After that, outgoing buffers of
 * min(read_info->buf_size, PORT_MMAP_DATA_SIZE) bytes are requested, filled
 * and sent until eof.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR when the response is not
 * initialized or the callback fails, or the error code of a failed
 * allocation or send.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* The response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /*
         * Enable content in the headers buffer: a zero-length add records
         * the content start and switches the request state.
         */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        /* Read straight into the free space of the headers buffer. */
        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /*
             * Manually account for the bytes the callback has written,
             * since nxt_unit_response_add_content() was bypassed.
             */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        /* The whole body went out together with the headers. */
        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        /* Fill the buffer until it is full or the reader reports eof. */
        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                /* The buffer was never sent, so give its memory back. */
                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}
3067
3068
3069 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3070 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3071 {
3072 ssize_t buf_res, res;
3073
3074 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3075 dst, size);
3076
3077 if (buf_res < (ssize_t) size && req->content_fd != -1) {
3078 res = read(req->content_fd, dst, size);
3079 if (nxt_slow_path(res < 0)) {
3080 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3081 strerror(errno), errno);
3082
3083 return res;
3084 }
3085
3086 if (res < (ssize_t) size) {
3087 nxt_unit_close(req->content_fd);
3088
3089 req->content_fd = -1;
3090 }
3091
3092 req->content_length -= res;
3093
3094 dst = nxt_pointer_to(dst, res);
3095
3096 } else {
3097 res = 0;
3098 }
3099
3100 return buf_res + res;
3101 }
3102
3103
3104 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3105 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3106 {
3107 char *p;
3108 size_t l_size, b_size;
3109 nxt_unit_buf_t *b;
3110 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
3111
3112 if (req->content_length == 0) {
3113 return 0;
3114 }
3115
3116 l_size = 0;
3117
3118