xref: /unit/src/nxt_unit.c (revision 1996:35873fa78fed)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
/*
 * NOTE(review): presumably the largest payload that is carried inline in a
 * port message instead of through shared memory -- confirm against the users
 * of this constant.
 */
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
/* NXT_UNIT_MAX_PLAIN_SIZE payload bytes plus one nxt_port_msg_t. */
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
/*
 * Quit modes: stored in nxt_unit_ctx_impl_t.quit_param and passed as the
 * quit_param argument of nxt_unit_quit().
 */
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
/* Short type names for the implementation structures defined below. */
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
59     uint32_t *request_limit);
60 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
61     int queue_fd);
62 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
63     nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
65     nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
67 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
68     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
69 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
72     nxt_unit_port_id_t *port_id);
73 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
74 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
75     nxt_unit_recv_msg_t *recv_msg);
76 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
77 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
78     nxt_unit_ctx_t *ctx);
79 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
80 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
81 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
82     nxt_unit_ctx_t *ctx);
83 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
84 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
85     nxt_unit_websocket_frame_impl_t *ws);
86 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
87 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
88 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
89     nxt_unit_mmap_buf_t *mmap_buf, int last);
90 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
91 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
92 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
94     nxt_unit_ctx_impl_t *ctx_impl);
95 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
96     nxt_unit_read_buf_t *rbuf);
97 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
98     nxt_unit_request_info_t *req, size_t size);
99 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
100     size_t size);
101 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
102     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
103 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
104 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
105 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
106 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
107     nxt_unit_port_t *port, int n);
108 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
109 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
110     int fd);
111 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
112     nxt_unit_port_t *port, uint32_t size,
113     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
114 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
115 
116 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
117     nxt_unit_ctx_impl_t *ctx_impl);
118 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
119 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
120 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
121 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
122 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
123     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
124     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
125 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
126     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
127 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
128 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
129     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
130 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
131 
132 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
133 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
134     pid_t pid, int remove);
135 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
136 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
137 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
139 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
140 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
141 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
142 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
145 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
146     nxt_unit_port_t *port);
147 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
148 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
149 
150 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
151     nxt_unit_port_t *port, int queue_fd);
152 
153 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
154 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
155 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
156     nxt_unit_port_t *port, void *queue);
157 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
158     nxt_queue_t *awaiting_req);
159 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
160     nxt_unit_port_id_t *port_id);
161 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
162     nxt_unit_port_id_t *port_id);
163 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
164 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
165     nxt_unit_process_t *process);
166 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
167 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
168 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
169     nxt_unit_port_t *port, const void *buf, size_t buf_size,
170     const nxt_send_oob_t *oob);
171 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
172     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
173 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174     nxt_unit_read_buf_t *rbuf);
175 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
176     nxt_unit_read_buf_t *src);
177 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178     nxt_unit_read_buf_t *rbuf);
179 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
180     nxt_unit_read_buf_t *rbuf);
181 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
182     nxt_unit_read_buf_t *rbuf);
183 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
184     nxt_unit_read_buf_t *rbuf);
185 nxt_inline int nxt_unit_close(int fd);
186 static int nxt_unit_fd_blocking(int fd);
187 
188 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
189     nxt_unit_port_t *port);
190 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
191     nxt_unit_port_id_t *port_id, int remove);
192 
193 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
194     nxt_unit_request_info_t *req);
195 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
196     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
197 
198 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
199 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
200 static void nxt_unit_lvlhsh_free(void *data, void *p);
201 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
202 
203 
/*
 * Buffer descriptor that wraps the public nxt_unit_buf_t and can be chained
 * into a list with nxt_unit_mmap_buf_insert(), nxt_unit_mmap_buf_insert_tail()
 * and nxt_unit_mmap_buf_unlink().
 */
204 struct nxt_unit_mmap_buf_s {
205     nxt_unit_buf_t           buf;
206 
    /*
     * List linkage: "next" is the following buffer; "prev" is the address of
     * the pointer that refers to this buffer (the list head or the previous
     * buffer's "next"), which lets a buffer be unlinked without knowing the
     * list head.
     */
207     nxt_unit_mmap_buf_t      *next;
208     nxt_unit_mmap_buf_t      **prev;
209 
    /*
     * NOTE(review): presumably hdr is the shared memory segment backing the
     * buffer, req the owning request, and free_ptr/plain_ptr heap or inline
     * storage used when the data is not in shared memory -- confirm against
     * the allocation and release code.
     */
210     nxt_port_mmap_header_t   *hdr;
211     nxt_unit_request_info_t  *req;
212     nxt_unit_ctx_impl_t      *ctx_impl;
213     char                     *free_ptr;
214     char                     *plain_ptr;
215 };
216 
217 
/*
 * Decoded view of one received port message; filled in by
 * nxt_unit_process_msg() from the nxt_port_msg_t header at the start of the
 * read buffer.
 */
218 struct nxt_unit_recv_msg_s {
    /* Copied from the same-named nxt_port_msg_t header fields. */
219     uint32_t                 stream;
220     nxt_pid_t                pid;
221     nxt_port_id_t            reply_port;
222 
223     uint8_t                  last;      /* 1 bit */
224     uint8_t                  mmap;      /* 1 bit */
225 
    /* Payload: the bytes that follow the header, and their count. */
226     void                     *start;
227     uint32_t                 size;
228 
    /* Descriptors extracted from the message's OOB data; -1 when absent. */
229     int                      fd[2];
230 
231     nxt_unit_mmap_buf_t      *incoming_buf;
232 };
233 
234 
/*
 * Value type of nxt_unit_request_info_impl_t.state.
 * NOTE(review): the names suggest a progression from a fresh request through
 * response preparation and sending to release -- confirm the transitions in
 * the request/response code.
 */
235 typedef enum {
236     NXT_UNIT_RS_START           = 0,
237     NXT_UNIT_RS_RESPONSE_INIT,
238     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
239     NXT_UNIT_RS_RESPONSE_SENT,
240     NXT_UNIT_RS_RELEASED,
241 } nxt_unit_req_state_t;
242 
243 
/*
 * Library-private wrapper around the public nxt_unit_request_info_t.  The
 * public part is the first member, so the wrapper can be recovered from a
 * pointer to it with nxt_container_of().
 */
244 struct nxt_unit_request_info_impl_s {
245     nxt_unit_request_info_t  req;
246 
247     uint32_t                 stream;
248 
    /* Heads of nxt_unit_mmap_buf_t lists attached to this request. */
249     nxt_unit_mmap_buf_t      *outgoing_buf;
250     nxt_unit_mmap_buf_t      *incoming_buf;
251 
252     nxt_unit_req_state_t     state;
253     uint8_t                  websocket;
    /*
     * NOTE(review): presumably set while the request is stored in
     * nxt_unit_ctx_impl_t.requests -- confirm.
     */
254     uint8_t                  in_hash;
255 
256     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
257     nxt_queue_link_t         link;
258     /*  for nxt_unit_port_impl_t.awaiting_req */
259     nxt_queue_link_t         port_wait_link;
260 
    /*
     * Flexible array member: storage that trails the structure.  For the
     * request embedded in the main context, nxt_unit_create() reserves
     * request_data_size bytes here.
     */
261     char                     extra_data[];
262 };
263 
264 
/*
 * Library-private wrapper around the public nxt_unit_websocket_frame_t (the
 * first member).
 */
265 struct nxt_unit_websocket_frame_impl_s {
266     nxt_unit_websocket_frame_t  ws;
267 
268     nxt_unit_mmap_buf_t         *buf;
269 
    /*
     * NOTE(review): presumably the linkage for nxt_unit_ctx_impl_t.free_ws,
     * which is documented as a queue of this type -- confirm.
     */
270     nxt_queue_link_t            link;
271 
272     nxt_unit_ctx_impl_t         *ctx_impl;
273 };
274 
275 
/*
 * Receive buffer for one port message: the raw bytes, their count and the
 * accompanying OOB (ancillary) data.
 */
276 struct nxt_unit_read_buf_s {
    /* Linkage for the queues documented as holding nxt_unit_read_buf_t. */
277     nxt_queue_link_t              link;
278     nxt_unit_ctx_impl_t           *ctx_impl;
    /*
     * Number of valid bytes in buf.  nxt_unit_process_msg() treats 0 as
     * "read port closed" and rejects anything shorter than nxt_port_msg_t.
     */
279     ssize_t                       size;
    /* Source of the file descriptors passed along with the message. */
280     nxt_recv_oob_t                oob;
    /* Message bytes; a nxt_port_msg_t header followed by the payload. */
281     char                          buf[16384];
282 };
283 
284 
/*
 * Library-private state of one context.  The public nxt_unit_ctx_t is the
 * first member, so the implementation is recovered from a public pointer with
 * nxt_container_of().
 */
285 struct nxt_unit_ctx_impl_s {
286     nxt_unit_ctx_t                ctx;
287 
    /*
     * Reference counter: starts at 1 in nxt_unit_ctx_init(); when
     * nxt_unit_ctx_release() drops the last reference the context is handed
     * to nxt_unit_ctx_free().
     */
288     nxt_atomic_t                  use_count;
289     nxt_atomic_t                  wait_items;
290 
291     pthread_mutex_t               mutex;
292 
293     nxt_unit_port_t               *read_port;
294 
    /* Linkage for nxt_unit_impl_t.contexts. */
295     nxt_queue_link_t              link;
296 
    /* List of unused buffer descriptors; seeded with ctx_buf[0..1]. */
297     nxt_unit_mmap_buf_t           *free_buf;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_queue_t                   free_req;
301 
302     /*  of nxt_unit_websocket_frame_impl_t */
303     nxt_queue_t                   free_ws;
304 
305     /*  of nxt_unit_request_info_impl_t */
306     nxt_queue_t                   active_req;
307 
308     /*  of nxt_unit_request_info_impl_t */
309     nxt_lvlhsh_t                  requests;
310 
311     /*  of nxt_unit_request_info_impl_t */
312     nxt_queue_t                   ready_req;
313 
314     /*  of nxt_unit_read_buf_t */
315     nxt_queue_t                   pending_rbuf;
316 
317     /*  of nxt_unit_read_buf_t */
318     nxt_queue_t                   free_rbuf;
319 
    /* Initialised by nxt_unit_ctx_init() to 1, 0 and NXT_QUIT_GRACEFUL. */
320     uint8_t                       online;       /* 1 bit */
321     uint8_t                       ready;        /* 1 bit */
322     uint8_t                       quit_param;
323 
    /*
     * Embedded objects that nxt_unit_ctx_init() places on free_buf and
     * free_rbuf respectively.
     */
324     nxt_unit_mmap_buf_t           ctx_buf[2];
325     nxt_unit_read_buf_t           ctx_read_buf;
326 
    /*
     * Embedded request placed on free_req by nxt_unit_ctx_init().  It ends
     * with a flexible array member, so it has to stay the last member.
     */
327     nxt_unit_request_info_impl_t  req;
328 };
329 
330 
/* One element of the nxt_unit_mmaps_t.elts array. */
331 struct nxt_unit_mmap_s {
332     nxt_port_mmap_header_t   *hdr;
    /*
     * NOTE(review): presumably the thread that uses this segment for sending
     * -- confirm against the code that sets it.
     */
333     pthread_t                src_thread;
334 
335     /*  of nxt_unit_read_buf_t */
336     nxt_queue_t              awaiting_rbuf;
337 };
338 
339 
/*
 * Array of nxt_unit_mmap_t with its own mutex; nxt_unit_impl_t keeps one
 * instance for incoming and one for outgoing segments.
 */
340 struct nxt_unit_mmaps_s {
341     pthread_mutex_t          mutex;
    /*
     * NOTE(review): presumably size is the number of elements in use and cap
     * the allocated capacity of elts -- confirm in nxt_unit_mmap_at().
     */
342     uint32_t                 size;
343     uint32_t                 cap;
344     nxt_atomic_t             allocated_chunks;
345     nxt_unit_mmap_t          *elts;
346 };
347 
348 
/*
 * Library-wide state, created by nxt_unit_create().  The public nxt_unit_t is
 * the first member, so the implementation is recovered from a public pointer
 * with nxt_container_of().
 */
349 struct nxt_unit_impl_s {
350     nxt_unit_t               unit;
    /* Copy of the callbacks supplied in nxt_unit_init_t. */
351     nxt_unit_callbacks_t     callbacks;
352 
    /*
     * Reference counter: raised by nxt_unit_ctx_init() for every context;
     * nxt_unit_lib_release() destroys the library when the last reference
     * goes away.
     */
353     nxt_atomic_t             use_count;
354     nxt_atomic_t             request_count;
355 
356     uint32_t                 request_data_size;
    /*
     * Shared memory limit expressed in units of PORT_MMAP_DATA_SIZE (rounded
     * up); nxt_unit_init() forces it to be at least 1.
     */
357     uint32_t                 shm_mmap_limit;
358     uint32_t                 request_limit;
359 
360     pthread_mutex_t          mutex;
361 
362     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
363     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
364 
365     nxt_unit_port_t          *router_port;
366     nxt_unit_port_t          *shared_port;
367 
368     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
369 
370     nxt_unit_mmaps_t         incoming;
371     nxt_unit_mmaps_t         outgoing;
372 
    /* Set by nxt_unit_init() to the pid of the read port id. */
373     pid_t                    pid;
    /* Defaults to STDERR_FILENO; replaced by nxt_unit_init(). */
374     int                      log_fd;
375 
    /*
     * Must stay the last member: nxt_unit_create() allocates
     * sizeof(nxt_unit_impl_t) + request_data_size bytes, and the extra bytes
     * follow main_ctx.req, which ends with a flexible array member.
     */
376     nxt_unit_ctx_impl_t      main_ctx;
377 };
378 
379 
/*
 * Library-private wrapper around the public nxt_unit_port_t (the first
 * member).
 */
380 struct nxt_unit_port_impl_s {
381     nxt_unit_port_t          port;
382 
383     nxt_atomic_t             use_count;
384 
385     /*  for nxt_unit_process_t.ports */
386     nxt_queue_link_t         link;
387     nxt_unit_process_t       *process;
388 
389     /*  of nxt_unit_request_info_impl_t */
390     nxt_queue_t              awaiting_req;
391 
392     int                      ready;
393 
    /*
     * Queue memory handed to nxt_unit_add_port(); nxt_unit_init() passes a
     * shared mapping of nxt_port_queue_t for the read port and NULL for the
     * router port.
     */
394     void                     *queue;
395 
    /*
     * NOTE(review): presumably bookkeeping for messages read from the socket
     * rather than from the queue -- confirm in the receive path.
     */
396     int                      from_socket;
397     nxt_unit_read_buf_t      *socket_rbuf;
398 };
399 
400 
/* Per-process record kept in nxt_unit_impl_t.processes. */
401 struct nxt_unit_process_s {
402     pid_t                    pid;
403 
404     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
405 
406     nxt_unit_impl_t          *lib;
407 
408     nxt_atomic_t             use_count;
409 
    /*
     * NOTE(review): presumably the id to assign to the next port created for
     * this process -- confirm in nxt_unit_create_port().
     */
410     uint32_t                 next_port_id;
411 };
412 
413 
/*
 * NOTE(review): presumably the key used for lookups in the ports hash --
 * confirm in nxt_unit_port_hash_add()/nxt_unit_port_hash_find().
 */
414 /* Explicitly using 32 bit types to avoid possible alignment padding. */
415 typedef struct {
416     int32_t   pid;
417     uint32_t  id;
418 } nxt_unit_port_hash_id_t;
419 
420 
421 nxt_unit_ctx_t *
422 nxt_unit_init(nxt_unit_init_t *init)
423 {
424     int              rc, queue_fd;
425     void             *mem;
426     uint32_t         ready_stream, shm_limit, request_limit;
427     nxt_unit_ctx_t   *ctx;
428     nxt_unit_impl_t  *lib;
429     nxt_unit_port_t  ready_port, router_port, read_port;
430 
431     lib = nxt_unit_create(init);
432     if (nxt_slow_path(lib == NULL)) {
433         return NULL;
434     }
435 
436     queue_fd = -1;
437     mem = MAP_FAILED;
438 
439     if (init->ready_port.id.pid != 0
440         && init->ready_stream != 0
441         && init->read_port.id.pid != 0)
442     {
443         ready_port = init->ready_port;
444         ready_stream = init->ready_stream;
445         router_port = init->router_port;
446         read_port = init->read_port;
447         lib->log_fd = init->log_fd;
448 
449         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
450                               ready_port.id.id);
451         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
452                               router_port.id.id);
453         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
454                               read_port.id.id);
455 
456     } else {
457         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
458                                &lib->log_fd, &ready_stream, &shm_limit,
459                                &request_limit);
460         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
461             goto fail;
462         }
463 
464         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
465                                 / PORT_MMAP_DATA_SIZE;
466         lib->request_limit = request_limit;
467     }
468 
469     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
470         lib->shm_mmap_limit = 1;
471     }
472 
473     lib->pid = read_port.id.pid;
474 
475     ctx = &lib->main_ctx.ctx;
476 
477     rc = nxt_unit_fd_blocking(router_port.out_fd);
478     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
479         goto fail;
480     }
481 
482     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
483     if (nxt_slow_path(lib->router_port == NULL)) {
484         nxt_unit_alert(NULL, "failed to add router_port");
485 
486         goto fail;
487     }
488 
489     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
490     if (nxt_slow_path(queue_fd == -1)) {
491         goto fail;
492     }
493 
494     mem = mmap(NULL, sizeof(nxt_port_queue_t),
495                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
496     if (nxt_slow_path(mem == MAP_FAILED)) {
497         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
498                        strerror(errno), errno);
499 
500         goto fail;
501     }
502 
503     nxt_port_queue_init(mem);
504 
505     rc = nxt_unit_fd_blocking(read_port.in_fd);
506     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
507         goto fail;
508     }
509 
510     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
511     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
512         nxt_unit_alert(NULL, "failed to add read_port");
513 
514         goto fail;
515     }
516 
517     rc = nxt_unit_fd_blocking(ready_port.out_fd);
518     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
519         goto fail;
520     }
521 
522     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
523     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
524         nxt_unit_alert(NULL, "failed to send READY message");
525 
526         goto fail;
527     }
528 
529     nxt_unit_close(ready_port.out_fd);
530     nxt_unit_close(queue_fd);
531 
532     return ctx;
533 
534 fail:
535 
536     if (mem != MAP_FAILED) {
537         munmap(mem, sizeof(nxt_port_queue_t));
538     }
539 
540     if (queue_fd != -1) {
541         nxt_unit_close(queue_fd);
542     }
543 
544     nxt_unit_ctx_release(&lib->main_ctx.ctx);
545 
546     return NULL;
547 }
548 
549 
550 static nxt_unit_impl_t *
551 nxt_unit_create(nxt_unit_init_t *init)
552 {
553     int                   rc;
554     nxt_unit_impl_t       *lib;
555     nxt_unit_callbacks_t  *cb;
556 
557     lib = nxt_unit_malloc(NULL,
558                           sizeof(nxt_unit_impl_t) + init->request_data_size);
559     if (nxt_slow_path(lib == NULL)) {
560         nxt_unit_alert(NULL, "failed to allocate unit struct");
561 
562         return NULL;
563     }
564 
565     rc = pthread_mutex_init(&lib->mutex, NULL);
566     if (nxt_slow_path(rc != 0)) {
567         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
568 
569         goto fail;
570     }
571 
572     lib->unit.data = init->data;
573     lib->callbacks = init->callbacks;
574 
575     lib->request_data_size = init->request_data_size;
576     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
577                             / PORT_MMAP_DATA_SIZE;
578     lib->request_limit = init->request_limit;
579 
580     lib->processes.slot = NULL;
581     lib->ports.slot = NULL;
582 
583     lib->log_fd = STDERR_FILENO;
584 
585     nxt_queue_init(&lib->contexts);
586 
587     lib->use_count = 0;
588     lib->request_count = 0;
589     lib->router_port = NULL;
590     lib->shared_port = NULL;
591 
592     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
593     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
594         pthread_mutex_destroy(&lib->mutex);
595         goto fail;
596     }
597 
598     cb = &lib->callbacks;
599 
600     if (cb->request_handler == NULL) {
601         nxt_unit_alert(NULL, "request_handler is NULL");
602 
603         pthread_mutex_destroy(&lib->mutex);
604         goto fail;
605     }
606 
607     nxt_unit_mmaps_init(&lib->incoming);
608     nxt_unit_mmaps_init(&lib->outgoing);
609 
610     return lib;
611 
612 fail:
613 
614     nxt_unit_free(NULL, lib);
615 
616     return NULL;
617 }
618 
619 
620 static int
621 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
622     void *data)
623 {
624     int  rc;
625 
626     ctx_impl->ctx.data = data;
627     ctx_impl->ctx.unit = &lib->unit;
628 
629     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
630     if (nxt_slow_path(rc != 0)) {
631         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
632 
633         return NXT_UNIT_ERROR;
634     }
635 
636     nxt_unit_lib_use(lib);
637 
638     pthread_mutex_lock(&lib->mutex);
639 
640     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
641 
642     pthread_mutex_unlock(&lib->mutex);
643 
644     ctx_impl->use_count = 1;
645     ctx_impl->wait_items = 0;
646     ctx_impl->online = 1;
647     ctx_impl->ready = 0;
648     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
649 
650     nxt_queue_init(&ctx_impl->free_req);
651     nxt_queue_init(&ctx_impl->free_ws);
652     nxt_queue_init(&ctx_impl->active_req);
653     nxt_queue_init(&ctx_impl->ready_req);
654     nxt_queue_init(&ctx_impl->pending_rbuf);
655     nxt_queue_init(&ctx_impl->free_rbuf);
656 
657     ctx_impl->free_buf = NULL;
658     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
659     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
660 
661     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
662     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
663 
664     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
665 
666     ctx_impl->req.req.ctx = &ctx_impl->ctx;
667     ctx_impl->req.req.unit = &lib->unit;
668 
669     ctx_impl->read_port = NULL;
670     ctx_impl->requests.slot = 0;
671 
672     return NXT_UNIT_OK;
673 }
674 
675 
676 nxt_inline void
677 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
678 {
679     nxt_unit_ctx_impl_t  *ctx_impl;
680 
681     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
682 
683     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
684 }
685 
686 
687 nxt_inline void
688 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
689 {
690     long                 c;
691     nxt_unit_ctx_impl_t  *ctx_impl;
692 
693     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
694 
695     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
696 
697     if (c == 1) {
698         nxt_unit_ctx_free(ctx_impl);
699     }
700 }
701 
702 
703 nxt_inline void
704 nxt_unit_lib_use(nxt_unit_impl_t *lib)
705 {
706     nxt_atomic_fetch_add(&lib->use_count, 1);
707 }
708 
709 
710 nxt_inline void
711 nxt_unit_lib_release(nxt_unit_impl_t *lib)
712 {
713     long                c;
714     nxt_unit_process_t  *process;
715 
716     c = nxt_atomic_fetch_add(&lib->use_count, -1);
717 
718     if (c == 1) {
719         for ( ;; ) {
720             pthread_mutex_lock(&lib->mutex);
721 
722             process = nxt_unit_process_pop_first(lib);
723             if (process == NULL) {
724                 pthread_mutex_unlock(&lib->mutex);
725 
726                 break;
727             }
728 
729             nxt_unit_remove_process(lib, process);
730         }
731 
732         pthread_mutex_destroy(&lib->mutex);
733 
734         if (nxt_fast_path(lib->router_port != NULL)) {
735             nxt_unit_port_release(lib->router_port);
736         }
737 
738         if (nxt_fast_path(lib->shared_port != NULL)) {
739             nxt_unit_port_release(lib->shared_port);
740         }
741 
742         nxt_unit_mmaps_destroy(&lib->incoming);
743         nxt_unit_mmaps_destroy(&lib->outgoing);
744 
745         nxt_unit_free(NULL, lib);
746     }
747 }
748 
749 
750 nxt_inline void
751 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
752     nxt_unit_mmap_buf_t *mmap_buf)
753 {
754     mmap_buf->next = *head;
755 
756     if (mmap_buf->next != NULL) {
757         mmap_buf->next->prev = &mmap_buf->next;
758     }
759 
760     *head = mmap_buf;
761     mmap_buf->prev = head;
762 }
763 
764 
765 nxt_inline void
766 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
767     nxt_unit_mmap_buf_t *mmap_buf)
768 {
769     while (*prev != NULL) {
770         prev = &(*prev)->next;
771     }
772 
773     nxt_unit_mmap_buf_insert(prev, mmap_buf);
774 }
775 
776 
777 nxt_inline void
778 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
779 {
780     nxt_unit_mmap_buf_t  **prev;
781 
782     prev = mmap_buf->prev;
783 
784     if (mmap_buf->next != NULL) {
785         mmap_buf->next->prev = prev;
786     }
787 
788     if (prev != NULL) {
789         *prev = mmap_buf->next;
790     }
791 }
792 
793 
794 static int
795 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
796     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
797     uint32_t *shm_limit, uint32_t *request_limit)
798 {
799     int       rc;
800     int       ready_fd, router_fd, read_in_fd, read_out_fd;
801     char      *unit_init, *version_end, *vars;
802     size_t    version_length;
803     int64_t   ready_pid, router_pid, read_pid;
804     uint32_t  ready_stream, router_id, ready_id, read_id;
805 
806     unit_init = getenv(NXT_UNIT_INIT_ENV);
807     if (nxt_slow_path(unit_init == NULL)) {
808         nxt_unit_alert(NULL, "%s is not in the current environment",
809                        NXT_UNIT_INIT_ENV);
810 
811         return NXT_UNIT_ERROR;
812     }
813 
814     version_end = strchr(unit_init, ';');
815     if (nxt_slow_path(version_end == NULL)) {
816         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
817                        NXT_UNIT_INIT_ENV, unit_init);
818 
819         return NXT_UNIT_ERROR;
820     }
821 
822     version_length = version_end - unit_init;
823 
824     rc = version_length != nxt_length(NXT_VERSION)
825          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
826 
827     if (nxt_slow_path(rc != 0)) {
828         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
829                        "%.*s, while the app was compiled with libunit %s",
830                        (int) version_length, unit_init, NXT_VERSION);
831 
832         return NXT_UNIT_ERROR;
833     }
834 
835     vars = version_end + 1;
836 
837     rc = sscanf(vars,
838                 "%"PRIu32";"
839                 "%"PRId64",%"PRIu32",%d;"
840                 "%"PRId64",%"PRIu32",%d;"
841                 "%"PRId64",%"PRIu32",%d,%d;"
842                 "%d,%"PRIu32",%"PRIu32,
843                 &ready_stream,
844                 &ready_pid, &ready_id, &ready_fd,
845                 &router_pid, &router_id, &router_fd,
846                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
847                 log_fd, shm_limit, request_limit);
848 
849     if (nxt_slow_path(rc == EOF)) {
850         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
851                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
852 
853         return NXT_UNIT_ERROR;
854     }
855 
856     if (nxt_slow_path(rc != 14)) {
857         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
858                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
859 
860         return NXT_UNIT_ERROR;
861     }
862 
863     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
864 
865     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
866 
867     ready_port->in_fd = -1;
868     ready_port->out_fd = ready_fd;
869     ready_port->data = NULL;
870 
871     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
872 
873     router_port->in_fd = -1;
874     router_port->out_fd = router_fd;
875     router_port->data = NULL;
876 
877     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
878 
879     read_port->in_fd = read_in_fd;
880     read_port->out_fd = read_out_fd;
881     read_port->data = NULL;
882 
883     *stream = ready_stream;
884 
885     return NXT_UNIT_OK;
886 }
887 
888 
889 static int
890 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
891 {
892     ssize_t          res;
893     nxt_send_oob_t   oob;
894     nxt_port_msg_t   msg;
895     nxt_unit_impl_t  *lib;
896     int              fds[2] = {queue_fd, -1};
897 
898     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
899 
900     msg.stream = stream;
901     msg.pid = lib->pid;
902     msg.reply_port = 0;
903     msg.type = _NXT_PORT_MSG_PROCESS_READY;
904     msg.last = 1;
905     msg.mmap = 0;
906     msg.nf = 0;
907     msg.mf = 0;
908     msg.tracking = 0;
909 
910     nxt_socket_msg_oob_init(&oob, fds);
911 
912     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
913     if (res != sizeof(msg)) {
914         return NXT_UNIT_ERROR;
915     }
916 
917     return NXT_UNIT_OK;
918 }
919 
920 
921 static int
922 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
923     nxt_unit_request_info_t **preq)
924 {
925     int                  rc;
926     pid_t                pid;
927     uint8_t              quit_param;
928     nxt_port_msg_t       *port_msg;
929     nxt_unit_impl_t      *lib;
930     nxt_unit_recv_msg_t  recv_msg;
931 
932     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
933 
934     recv_msg.fd[0] = -1;
935     recv_msg.fd[1] = -1;
936     port_msg = (nxt_port_msg_t *) rbuf->buf;
937 
938     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
939     if (nxt_slow_path(rc != NXT_OK)) {
940         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
941         rc = NXT_UNIT_ERROR;
942         goto done;
943     }
944 
945     recv_msg.incoming_buf = NULL;
946 
947     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
948         if (nxt_slow_path(rbuf->size == 0)) {
949             nxt_unit_debug(ctx, "read port closed");
950 
951             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
952             rc = NXT_UNIT_OK;
953             goto done;
954         }
955 
956         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
957 
958         rc = NXT_UNIT_ERROR;
959         goto done;
960     }
961 
962     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
963                    port_msg->stream, (int) port_msg->type,
964                    recv_msg.fd[0], recv_msg.fd[1]);
965 
966     recv_msg.stream = port_msg->stream;
967     recv_msg.pid = port_msg->pid;
968     recv_msg.reply_port = port_msg->reply_port;
969     recv_msg.last = port_msg->last;
970     recv_msg.mmap = port_msg->mmap;
971 
972     recv_msg.start = port_msg + 1;
973     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
974 
975     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
976         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
977                        port_msg->stream, (int) port_msg->type);
978         rc = NXT_UNIT_ERROR;
979         goto done;
980     }
981 
982     /* Fragmentation is unsupported. */
983     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
984         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
985                        port_msg->stream, (int) port_msg->type);
986         rc = NXT_UNIT_ERROR;
987         goto done;
988     }
989 
990     if (port_msg->mmap) {
991         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
992 
993         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
994             if (rc == NXT_UNIT_AGAIN) {
995                 recv_msg.fd[0] = -1;
996                 recv_msg.fd[1] = -1;
997             }
998 
999             goto done;
1000         }
1001     }
1002 
1003     switch (port_msg->type) {
1004 
1005     case _NXT_PORT_MSG_RPC_READY:
1006         rc = NXT_UNIT_OK;
1007         break;
1008 
1009     case _NXT_PORT_MSG_QUIT:
1010         if (recv_msg.size == sizeof(quit_param)) {
1011             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1012 
1013         } else {
1014             quit_param = NXT_QUIT_NORMAL;
1015         }
1016 
1017         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1018                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1019 
1020         nxt_unit_quit(ctx, quit_param);
1021 
1022         rc = NXT_UNIT_OK;
1023         break;
1024 
1025     case _NXT_PORT_MSG_NEW_PORT:
1026         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1027         break;
1028 
1029     case _NXT_PORT_MSG_PORT_ACK:
1030         rc = nxt_unit_ctx_ready(ctx);
1031         break;
1032 
1033     case _NXT_PORT_MSG_CHANGE_FILE:
1034         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1035                        port_msg->stream, recv_msg.fd[0]);
1036 
1037         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1038             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1039                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1040                            strerror(errno), errno);
1041 
1042             rc = NXT_UNIT_ERROR;
1043             goto done;
1044         }
1045 
1046         rc = NXT_UNIT_OK;
1047         break;
1048 
1049     case _NXT_PORT_MSG_MMAP:
1050         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1051             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1052                            port_msg->stream, recv_msg.fd[0]);
1053 
1054             rc = NXT_UNIT_ERROR;
1055             goto done;
1056         }
1057 
1058         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1059         break;
1060 
1061     case _NXT_PORT_MSG_REQ_HEADERS:
1062         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1063         break;
1064 
1065     case _NXT_PORT_MSG_REQ_BODY:
1066         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1067         break;
1068 
1069     case _NXT_PORT_MSG_WEBSOCKET:
1070         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1071         break;
1072 
1073     case _NXT_PORT_MSG_REMOVE_PID:
1074         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1075             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1076                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1077                            (int) sizeof(pid));
1078 
1079             rc = NXT_UNIT_ERROR;
1080             goto done;
1081         }
1082 
1083         memcpy(&pid, recv_msg.start, sizeof(pid));
1084 
1085         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1086                        port_msg->stream, (int) pid);
1087 
1088         nxt_unit_remove_pid(lib, pid);
1089 
1090         rc = NXT_UNIT_OK;
1091         break;
1092 
1093     case _NXT_PORT_MSG_SHM_ACK:
1094         rc = nxt_unit_process_shm_ack(ctx);
1095         break;
1096 
1097     default:
1098         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1099                        port_msg->stream, (int) port_msg->type);
1100 
1101         rc = NXT_UNIT_ERROR;
1102         goto done;
1103     }
1104 
1105 done:
1106 
1107     if (recv_msg.fd[0] != -1) {
1108         nxt_unit_close(recv_msg.fd[0]);
1109     }
1110 
1111     if (recv_msg.fd[1] != -1) {
1112         nxt_unit_close(recv_msg.fd[1]);
1113     }
1114 
1115     while (recv_msg.incoming_buf != NULL) {
1116         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1117     }
1118 
1119     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1120 #if (NXT_DEBUG)
1121         memset(rbuf->buf, 0xAC, rbuf->size);
1122 #endif
1123         nxt_unit_read_buf_release(ctx, rbuf);
1124     }
1125 
1126     return rc;
1127 }
1128 
1129 
1130 static int
1131 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1132 {
1133     void                     *mem;
1134     nxt_unit_impl_t          *lib;
1135     nxt_unit_port_t          new_port, *port;
1136     nxt_port_msg_new_port_t  *new_port_msg;
1137 
1138     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1139         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1140                       "invalid message size (%d)",
1141                       recv_msg->stream, (int) recv_msg->size);
1142 
1143         return NXT_UNIT_ERROR;
1144     }
1145 
1146     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1147         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1148                        recv_msg->stream, recv_msg->fd[0]);
1149 
1150         return NXT_UNIT_ERROR;
1151     }
1152 
1153     new_port_msg = recv_msg->start;
1154 
1155     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1156                    recv_msg->stream, (int) new_port_msg->pid,
1157                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1158 
1159     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1160 
1161     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1162         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1163 
1164         new_port.in_fd = recv_msg->fd[0];
1165         new_port.out_fd = -1;
1166 
1167         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1168                    MAP_SHARED, recv_msg->fd[1], 0);
1169 
1170     } else {
1171         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1172                           != NXT_UNIT_OK))
1173         {
1174             return NXT_UNIT_ERROR;
1175         }
1176 
1177         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1178                               new_port_msg->id);
1179 
1180         new_port.in_fd = -1;
1181         new_port.out_fd = recv_msg->fd[0];
1182 
1183         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1184                    MAP_SHARED, recv_msg->fd[1], 0);
1185     }
1186 
1187     if (nxt_slow_path(mem == MAP_FAILED)) {
1188         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1189                        strerror(errno), errno);
1190 
1191         return NXT_UNIT_ERROR;
1192     }
1193 
1194     new_port.data = NULL;
1195 
1196     recv_msg->fd[0] = -1;
1197 
1198     port = nxt_unit_add_port(ctx, &new_port, mem);
1199     if (nxt_slow_path(port == NULL)) {
1200         return NXT_UNIT_ERROR;
1201     }
1202 
1203     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1204         lib->shared_port = port;
1205 
1206         return nxt_unit_ctx_ready(ctx);
1207     }
1208 
1209     nxt_unit_port_release(port);
1210 
1211     return NXT_UNIT_OK;
1212 }
1213 
1214 
1215 static int
1216 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1217 {
1218     nxt_unit_impl_t      *lib;
1219     nxt_unit_ctx_impl_t  *ctx_impl;
1220 
1221     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1222 
1223     if (nxt_slow_path(ctx_impl->ready)) {
1224         return NXT_UNIT_OK;
1225     }
1226 
1227     ctx_impl->ready = 1;
1228 
1229     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1230 
1231     /* Call ready_handler() only for main context. */
1232     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1233         return lib->callbacks.ready_handler(ctx);
1234     }
1235 
1236     if (&lib->main_ctx != ctx_impl) {
1237         /* Check if the main context is already stopped or quit. */
1238         if (nxt_slow_path(!lib->main_ctx.ready)) {
1239             ctx_impl->ready = 0;
1240 
1241             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1242 
1243             return NXT_UNIT_OK;
1244         }
1245 
1246         if (lib->callbacks.add_port != NULL) {
1247             lib->callbacks.add_port(ctx, lib->shared_port);
1248         }
1249     }
1250 
1251     return NXT_UNIT_OK;
1252 }
1253 
1254 
1255 static int
1256 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1257     nxt_unit_request_info_t **preq)
1258 {
1259     int                           res;
1260     nxt_unit_impl_t               *lib;
1261     nxt_unit_port_id_t            port_id;
1262     nxt_unit_request_t            *r;
1263     nxt_unit_mmap_buf_t           *b;
1264     nxt_unit_request_info_t       *req;
1265     nxt_unit_request_info_impl_t  *req_impl;
1266 
1267     if (nxt_slow_path(recv_msg->mmap == 0)) {
1268         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1269                       recv_msg->stream);
1270 
1271         return NXT_UNIT_ERROR;
1272     }
1273 
1274     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1275         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1276                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1277                       (int) sizeof(nxt_unit_request_t));
1278 
1279         return NXT_UNIT_ERROR;
1280     }
1281 
1282     req_impl = nxt_unit_request_info_get(ctx);
1283     if (nxt_slow_path(req_impl == NULL)) {
1284         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1285                       recv_msg->stream);
1286 
1287         return NXT_UNIT_ERROR;
1288     }
1289 
1290     req = &req_impl->req;
1291 
1292     req->request = recv_msg->start;
1293 
1294     b = recv_msg->incoming_buf;
1295 
1296     req->request_buf = &b->buf;
1297     req->response = NULL;
1298     req->response_buf = NULL;
1299 
1300     r = req->request;
1301 
1302     req->content_length = r->content_length;
1303 
1304     req->content_buf = req->request_buf;
1305     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1306 
1307     req_impl->stream = recv_msg->stream;
1308 
1309     req_impl->outgoing_buf = NULL;
1310 
1311     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1312         b->req = req;
1313     }
1314 
1315     /* "Move" incoming buffer list to req_impl. */
1316     req_impl->incoming_buf = recv_msg->incoming_buf;
1317     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1318     recv_msg->incoming_buf = NULL;
1319 
1320     req->content_fd = recv_msg->fd[0];
1321     recv_msg->fd[0] = -1;
1322 
1323     req->response_max_fields = 0;
1324     req_impl->state = NXT_UNIT_RS_START;
1325     req_impl->websocket = 0;
1326     req_impl->in_hash = 0;
1327 
1328     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1329                    (int) r->method_length,
1330                    (char *) nxt_unit_sptr_get(&r->method),
1331                    (int) r->target_length,
1332                    (char *) nxt_unit_sptr_get(&r->target),
1333                    (int) r->content_length);
1334 
1335     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1336 
1337     res = nxt_unit_request_check_response_port(req, &port_id);
1338     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1339         return NXT_UNIT_ERROR;
1340     }
1341 
1342     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1343         res = nxt_unit_send_req_headers_ack(req);
1344         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1345             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1346 
1347             return NXT_UNIT_ERROR;
1348         }
1349 
1350         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1351 
1352         if (req->content_length
1353             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1354         {
1355             res = nxt_unit_request_hash_add(ctx, req);
1356             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1357                 nxt_unit_req_warn(req, "failed to add request to hash");
1358 
1359                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1360 
1361                 return NXT_UNIT_ERROR;
1362             }
1363 
1364             /*
1365              * If application have separate data handler, we may start
1366              * request processing and process data when it is arrived.
1367              */
1368             if (lib->callbacks.data_handler == NULL) {
1369                 return NXT_UNIT_OK;
1370             }
1371         }
1372 
1373         if (preq == NULL) {
1374             lib->callbacks.request_handler(req);
1375 
1376         } else {
1377             *preq = req;
1378         }
1379     }
1380 
1381     return NXT_UNIT_OK;
1382 }
1383 
1384 
1385 static int
1386 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1387 {
1388     uint64_t                 l;
1389     nxt_unit_impl_t          *lib;
1390     nxt_unit_mmap_buf_t      *b;
1391     nxt_unit_request_info_t  *req;
1392 
1393     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1394     if (req == NULL) {
1395         return NXT_UNIT_OK;
1396     }
1397 
1398     l = req->content_buf->end - req->content_buf->free;
1399 
1400     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1401         b->req = req;
1402         l += b->buf.end - b->buf.free;
1403     }
1404 
1405     if (recv_msg->incoming_buf != NULL) {
1406         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1407 
1408         while (b->next != NULL) {
1409             b = b->next;
1410         }
1411 
1412         /* "Move" incoming buffer list to req_impl. */
1413         b->next = recv_msg->incoming_buf;
1414         b->next->prev = &b->next;
1415 
1416         recv_msg->incoming_buf = NULL;
1417     }
1418 
1419     req->content_fd = recv_msg->fd[0];
1420     recv_msg->fd[0] = -1;
1421 
1422     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1423 
1424     if (lib->callbacks.data_handler != NULL) {
1425         lib->callbacks.data_handler(req);
1426 
1427         return NXT_UNIT_OK;
1428     }
1429 
1430     if (req->content_fd != -1 || l == req->content_length) {
1431         lib->callbacks.request_handler(req);
1432     }
1433 
1434     return NXT_UNIT_OK;
1435 }
1436 
1437 
1438 static int
1439 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1440     nxt_unit_port_id_t *port_id)
1441 {
1442     int                           res;
1443     nxt_unit_ctx_t                *ctx;
1444     nxt_unit_impl_t               *lib;
1445     nxt_unit_port_t               *port;
1446     nxt_unit_process_t            *process;
1447     nxt_unit_ctx_impl_t           *ctx_impl;
1448     nxt_unit_port_impl_t          *port_impl;
1449     nxt_unit_request_info_impl_t  *req_impl;
1450 
1451     ctx = req->ctx;
1452     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1453     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1454 
1455     pthread_mutex_lock(&lib->mutex);
1456 
1457     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1458     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1459 
1460     if (nxt_fast_path(port != NULL)) {
1461         req->response_port = port;
1462 
1463         if (nxt_fast_path(port_impl->ready)) {
1464             pthread_mutex_unlock(&lib->mutex);
1465 
1466             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1467                            (int) port->id.pid, (int) port->id.id);
1468 
1469             return NXT_UNIT_OK;
1470         }
1471 
1472         nxt_unit_debug(ctx, "check_response_port: "
1473                        "port{%d,%d} already requested",
1474                        (int) port->id.pid, (int) port->id.id);
1475 
1476         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1477 
1478         nxt_queue_insert_tail(&port_impl->awaiting_req,
1479                               &req_impl->port_wait_link);
1480 
1481         pthread_mutex_unlock(&lib->mutex);
1482 
1483         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1484 
1485         return NXT_UNIT_AGAIN;
1486     }
1487 
1488     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1489     if (nxt_slow_path(port_impl == NULL)) {
1490         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1491                        (int) sizeof(nxt_unit_port_impl_t));
1492 
1493         pthread_mutex_unlock(&lib->mutex);
1494 
1495         return NXT_UNIT_ERROR;
1496     }
1497 
1498     port = &port_impl->port;
1499 
1500     port->id = *port_id;
1501     port->in_fd = -1;
1502     port->out_fd = -1;
1503     port->data = NULL;
1504 
1505     res = nxt_unit_port_hash_add(&lib->ports, port);
1506     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1507         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1508                        port->id.pid, port->id.id);
1509 
1510         pthread_mutex_unlock(&lib->mutex);
1511 
1512         nxt_unit_free(ctx, port);
1513 
1514         return NXT_UNIT_ERROR;
1515     }
1516 
1517     process = nxt_unit_process_find(lib, port_id->pid, 0);
1518     if (nxt_slow_path(process == NULL)) {
1519         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1520                        port->id.pid);
1521 
1522         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1523 
1524         pthread_mutex_unlock(&lib->mutex);
1525 
1526         nxt_unit_free(ctx, port);
1527 
1528         return NXT_UNIT_ERROR;
1529     }
1530 
1531     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1532 
1533     port_impl->process = process;
1534     port_impl->queue = NULL;
1535     port_impl->from_socket = 0;
1536     port_impl->socket_rbuf = NULL;
1537 
1538     nxt_queue_init(&port_impl->awaiting_req);
1539 
1540     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1541 
1542     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1543 
1544     port_impl->use_count = 2;
1545     port_impl->ready = 0;
1546 
1547     req->response_port = port;
1548 
1549     pthread_mutex_unlock(&lib->mutex);
1550 
1551     res = nxt_unit_get_port(ctx, port_id);
1552     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1553         return NXT_UNIT_ERROR;
1554     }
1555 
1556     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1557 
1558     return NXT_UNIT_AGAIN;
1559 }
1560 
1561 
1562 static int
1563 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1564 {
1565     ssize_t                       res;
1566     nxt_port_msg_t                msg;
1567     nxt_unit_impl_t               *lib;
1568     nxt_unit_ctx_impl_t           *ctx_impl;
1569     nxt_unit_request_info_impl_t  *req_impl;
1570 
1571     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1572     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1573     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1574 
1575     memset(&msg, 0, sizeof(nxt_port_msg_t));
1576 
1577     msg.stream = req_impl->stream;
1578     msg.pid = lib->pid;
1579     msg.reply_port = ctx_impl->read_port->id.id;
1580     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1581 
1582     res = nxt_unit_port_send(req->ctx, req->response_port,
1583                              &msg, sizeof(msg), NULL);
1584     if (nxt_slow_path(res != sizeof(msg))) {
1585         return NXT_UNIT_ERROR;
1586     }
1587 
1588     return NXT_UNIT_OK;
1589 }
1590 
1591 
1592 static int
1593 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1594 {
1595     size_t                           hsize;
1596     nxt_unit_impl_t                  *lib;
1597     nxt_unit_mmap_buf_t              *b;
1598     nxt_unit_callbacks_t             *cb;
1599     nxt_unit_request_info_t          *req;
1600     nxt_unit_request_info_impl_t     *req_impl;
1601     nxt_unit_websocket_frame_impl_t  *ws_impl;
1602 
1603     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1604     if (nxt_slow_path(req == NULL)) {
1605         return NXT_UNIT_OK;
1606     }
1607 
1608     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1609 
1610     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1611     cb = &lib->callbacks;
1612 
1613     if (cb->websocket_handler && recv_msg->size >= 2) {
1614         ws_impl = nxt_unit_websocket_frame_get(ctx);
1615         if (nxt_slow_path(ws_impl == NULL)) {
1616             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1617                           req_impl->stream);
1618 
1619             return NXT_UNIT_ERROR;
1620         }
1621 
1622         ws_impl->ws.req = req;
1623 
1624         ws_impl->buf = NULL;
1625 
1626         if (recv_msg->mmap) {
1627             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1628                 b->req = req;
1629             }
1630 
1631             /* "Move" incoming buffer list to ws_impl. */
1632             ws_impl->buf = recv_msg->incoming_buf;
1633             ws_impl->buf->prev = &ws_impl->buf;
1634             recv_msg->incoming_buf = NULL;
1635 
1636             b = ws_impl->buf;
1637 
1638         } else {
1639             b = nxt_unit_mmap_buf_get(ctx);
1640             if (nxt_slow_path(b == NULL)) {
1641                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1642                                req_impl->stream);
1643 
1644                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1645 
1646                 return NXT_UNIT_ERROR;
1647             }
1648 
1649             b->req = req;
1650             b->buf.start = recv_msg->start;
1651             b->buf.free = b->buf.start;
1652             b->buf.end = b->buf.start + recv_msg->size;
1653 
1654             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1655         }
1656 
1657         ws_impl->ws.header = (void *) b->buf.start;
1658         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1659             ws_impl->ws.header);
1660 
1661         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1662 
1663         if (ws_impl->ws.header->mask) {
1664             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1665 
1666         } else {
1667             ws_impl->ws.mask = NULL;
1668         }
1669 
1670         b->buf.free += hsize;
1671 
1672         ws_impl->ws.content_buf = &b->buf;
1673         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1674 
1675         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1676                            "payload_len=%"PRIu64,
1677                             ws_impl->ws.header->opcode,
1678                             ws_impl->ws.payload_len);
1679 
1680         cb->websocket_handler(&ws_impl->ws);
1681     }
1682 
1683     if (recv_msg->last) {
1684         if (cb->close_handler) {
1685             nxt_unit_req_debug(req, "close_handler");
1686 
1687             cb->close_handler(req);
1688 
1689         } else {
1690             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1691         }
1692     }
1693 
1694     return NXT_UNIT_OK;
1695 }
1696 
1697 
1698 static int
1699 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1700 {
1701     nxt_unit_impl_t       *lib;
1702     nxt_unit_callbacks_t  *cb;
1703 
1704     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1705     cb = &lib->callbacks;
1706 
1707     if (cb->shm_ack_handler != NULL) {
1708         cb->shm_ack_handler(ctx);
1709     }
1710 
1711     return NXT_UNIT_OK;
1712 }
1713 
1714 
1715 static nxt_unit_request_info_impl_t *
1716 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1717 {
1718     nxt_unit_impl_t               *lib;
1719     nxt_queue_link_t              *lnk;
1720     nxt_unit_ctx_impl_t           *ctx_impl;
1721     nxt_unit_request_info_impl_t  *req_impl;
1722 
1723     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1724 
1725     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1726 
1727     pthread_mutex_lock(&ctx_impl->mutex);
1728 
1729     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1730         pthread_mutex_unlock(&ctx_impl->mutex);
1731 
1732         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1733                                         + lib->request_data_size);
1734         if (nxt_slow_path(req_impl == NULL)) {
1735             return NULL;
1736         }
1737 
1738         req_impl->req.unit = ctx->unit;
1739         req_impl->req.ctx = ctx;
1740 
1741         pthread_mutex_lock(&ctx_impl->mutex);
1742 
1743     } else {
1744         lnk = nxt_queue_first(&ctx_impl->free_req);
1745         nxt_queue_remove(lnk);
1746 
1747         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1748     }
1749 
1750     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1751 
1752     pthread_mutex_unlock(&ctx_impl->mutex);
1753 
1754     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1755 
1756     return req_impl;
1757 }
1758 
1759 
1760 static void
1761 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1762 {
1763     nxt_unit_ctx_t                *ctx;
1764     nxt_unit_ctx_impl_t           *ctx_impl;
1765     nxt_unit_request_info_impl_t  *req_impl;
1766 
1767     ctx = req->ctx;
1768     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1769     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1770 
1771     req->response = NULL;
1772     req->response_buf = NULL;
1773 
1774     if (req_impl->in_hash) {
1775         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1776     }
1777 
1778     while (req_impl->outgoing_buf != NULL) {
1779         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1780     }
1781 
1782     while (req_impl->incoming_buf != NULL) {
1783         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1784     }
1785 
1786     if (req->content_fd != -1) {
1787         nxt_unit_close(req->content_fd);
1788 
1789         req->content_fd = -1;
1790     }
1791 
1792     if (req->response_port != NULL) {
1793         nxt_unit_port_release(req->response_port);
1794 
1795         req->response_port = NULL;
1796     }
1797 
1798     req_impl->state = NXT_UNIT_RS_RELEASED;
1799 
1800     pthread_mutex_lock(&ctx_impl->mutex);
1801 
1802     nxt_queue_remove(&req_impl->link);
1803 
1804     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1805 
1806     pthread_mutex_unlock(&ctx_impl->mutex);
1807 
1808     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1809         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1810     }
1811 }
1812 
1813 
1814 static void
1815 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1816 {
1817     nxt_unit_ctx_impl_t  *ctx_impl;
1818 
1819     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1820 
1821     nxt_queue_remove(&req_impl->link);
1822 
1823     if (req_impl != &ctx_impl->req) {
1824         nxt_unit_free(&ctx_impl->ctx, req_impl);
1825     }
1826 }
1827 
1828 
1829 static nxt_unit_websocket_frame_impl_t *
1830 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1831 {
1832     nxt_queue_link_t                 *lnk;
1833     nxt_unit_ctx_impl_t              *ctx_impl;
1834     nxt_unit_websocket_frame_impl_t  *ws_impl;
1835 
1836     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1837 
1838     pthread_mutex_lock(&ctx_impl->mutex);
1839 
1840     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1841         pthread_mutex_unlock(&ctx_impl->mutex);
1842 
1843         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1844         if (nxt_slow_path(ws_impl == NULL)) {
1845             return NULL;
1846         }
1847 
1848     } else {
1849         lnk = nxt_queue_first(&ctx_impl->free_ws);
1850         nxt_queue_remove(lnk);
1851 
1852         pthread_mutex_unlock(&ctx_impl->mutex);
1853 
1854         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1855     }
1856 
1857     ws_impl->ctx_impl = ctx_impl;
1858 
1859     return ws_impl;
1860 }
1861 
1862 
1863 static void
1864 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1865 {
1866     nxt_unit_websocket_frame_impl_t  *ws_impl;
1867 
1868     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1869 
1870     while (ws_impl->buf != NULL) {
1871         nxt_unit_mmap_buf_free(ws_impl->buf);
1872     }
1873 
1874     ws->req = NULL;
1875 
1876     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1877 
1878     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1879 
1880     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1881 }
1882 
1883 
1884 static void
1885 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1886     nxt_unit_websocket_frame_impl_t *ws_impl)
1887 {
1888     nxt_queue_remove(&ws_impl->link);
1889 
1890     nxt_unit_free(ctx, ws_impl);
1891 }
1892 
1893 
1894 uint16_t
1895 nxt_unit_field_hash(const char *name, size_t name_length)
1896 {
1897     u_char      ch;
1898     uint32_t    hash;
1899     const char  *p, *end;
1900 
1901     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1902     end = name + name_length;
1903 
1904     for (p = name; p < end; p++) {
1905         ch = *p;
1906         hash = (hash << 4) + hash + nxt_lowcase(ch);
1907     }
1908 
1909     hash = (hash >> 16) ^ hash;
1910 
1911     return hash;
1912 }
1913 
1914 
1915 void
1916 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1917 {
1918     char                *name;
1919     uint32_t            i, j;
1920     nxt_unit_field_t    *fields, f;
1921     nxt_unit_request_t  *r;
1922 
1923     static nxt_str_t  content_length = nxt_string("content-length");
1924     static nxt_str_t  content_type = nxt_string("content-type");
1925     static nxt_str_t  cookie = nxt_string("cookie");
1926 
1927     nxt_unit_req_debug(req, "group_dup_fields");
1928 
1929     r = req->request;
1930     fields = r->fields;
1931 
1932     for (i = 0; i < r->fields_count; i++) {
1933         name = nxt_unit_sptr_get(&fields[i].name);
1934 
1935         switch (fields[i].hash) {
1936         case NXT_UNIT_HASH_CONTENT_LENGTH:
1937             if (fields[i].name_length == content_length.length
1938                 && nxt_unit_memcasecmp(name, content_length.start,
1939                                        content_length.length) == 0)
1940             {
1941                 r->content_length_field = i;
1942             }
1943 
1944             break;
1945 
1946         case NXT_UNIT_HASH_CONTENT_TYPE:
1947             if (fields[i].name_length == content_type.length
1948                 && nxt_unit_memcasecmp(name, content_type.start,
1949                                        content_type.length) == 0)
1950             {
1951                 r->content_type_field = i;
1952             }
1953 
1954             break;
1955 
1956         case NXT_UNIT_HASH_COOKIE:
1957             if (fields[i].name_length == cookie.length
1958                 && nxt_unit_memcasecmp(name, cookie.start,
1959                                        cookie.length) == 0)
1960             {
1961                 r->cookie_field = i;
1962             }
1963 
1964             break;
1965         }
1966 
1967         for (j = i + 1; j < r->fields_count; j++) {
1968             if (fields[i].hash != fields[j].hash
1969                 || fields[i].name_length != fields[j].name_length
1970                 || nxt_unit_memcasecmp(name,
1971                                        nxt_unit_sptr_get(&fields[j].name),
1972                                        fields[j].name_length) != 0)
1973             {
1974                 continue;
1975             }
1976 
1977             f = fields[j];
1978             f.value.offset += (j - (i + 1)) * sizeof(f);
1979 
1980             while (j > i + 1) {
1981                 fields[j] = fields[j - 1];
1982                 fields[j].name.offset -= sizeof(f);
1983                 fields[j].value.offset -= sizeof(f);
1984                 j--;
1985             }
1986 
1987             fields[j] = f;
1988 
1989             /* Assign the same name pointer for further grouping simplicity. */
1990             nxt_unit_sptr_set(&fields[j].name, name);
1991 
1992             i++;
1993         }
1994     }
1995 }
1996 
1997 
1998 int
1999 nxt_unit_response_init(nxt_unit_request_info_t *req,
2000     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2001 {
2002     uint32_t                      buf_size;
2003     nxt_unit_buf_t                *buf;
2004     nxt_unit_request_info_impl_t  *req_impl;
2005 
2006     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2007 
2008     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2009         nxt_unit_req_warn(req, "init: response already sent");
2010 
2011         return NXT_UNIT_ERROR;
2012     }
2013 
2014     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2015                        (int) max_fields_count, (int) max_fields_size);
2016 
2017     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2018         nxt_unit_req_debug(req, "duplicate response init");
2019     }
2020 
2021     /*
2022      * Each field name and value 0-terminated by libunit,
2023      * this is the reason of '+ 2' below.
2024      */
2025     buf_size = sizeof(nxt_unit_response_t)
2026                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2027                + max_fields_size;
2028 
2029     if (nxt_slow_path(req->response_buf != NULL)) {
2030         buf = req->response_buf;
2031 
2032         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2033             goto init_response;
2034         }
2035 
2036         nxt_unit_buf_free(buf);
2037 
2038         req->response_buf = NULL;
2039         req->response = NULL;
2040         req->response_max_fields = 0;
2041 
2042         req_impl->state = NXT_UNIT_RS_START;
2043     }
2044 
2045     buf = nxt_unit_response_buf_alloc(req, buf_size);
2046     if (nxt_slow_path(buf == NULL)) {
2047         return NXT_UNIT_ERROR;
2048     }
2049 
2050 init_response:
2051 
2052     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2053 
2054     req->response_buf = buf;
2055 
2056     req->response = (nxt_unit_response_t *) buf->start;
2057     req->response->status = status;
2058 
2059     buf->free = buf->start + sizeof(nxt_unit_response_t)
2060                 + max_fields_count * sizeof(nxt_unit_field_t);
2061 
2062     req->response_max_fields = max_fields_count;
2063     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2064 
2065     return NXT_UNIT_OK;
2066 }
2067 
2068 
2069 int
2070 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2071     uint32_t max_fields_count, uint32_t max_fields_size)
2072 {
2073     char                          *p;
2074     uint32_t                      i, buf_size;
2075     nxt_unit_buf_t                *buf;
2076     nxt_unit_field_t              *f, *src;
2077     nxt_unit_response_t           *resp;
2078     nxt_unit_request_info_impl_t  *req_impl;
2079 
2080     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2081 
2082     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2083         nxt_unit_req_warn(req, "realloc: response not init");
2084 
2085         return NXT_UNIT_ERROR;
2086     }
2087 
2088     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2089         nxt_unit_req_warn(req, "realloc: response already sent");
2090 
2091         return NXT_UNIT_ERROR;
2092     }
2093 
2094     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2095         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2096 
2097         return NXT_UNIT_ERROR;
2098     }
2099 
2100     /*
2101      * Each field name and value 0-terminated by libunit,
2102      * this is the reason of '+ 2' below.
2103      */
2104     buf_size = sizeof(nxt_unit_response_t)
2105                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2106                + max_fields_size;
2107 
2108     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2109 
2110     buf = nxt_unit_response_buf_alloc(req, buf_size);
2111     if (nxt_slow_path(buf == NULL)) {
2112         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2113         return NXT_UNIT_ERROR;
2114     }
2115 
2116     resp = (nxt_unit_response_t *) buf->start;
2117 
2118     memset(resp, 0, sizeof(nxt_unit_response_t));
2119 
2120     resp->status = req->response->status;
2121     resp->content_length = req->response->content_length;
2122 
2123     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2124     f = resp->fields;
2125 
2126     for (i = 0; i < req->response->fields_count; i++) {
2127         src = req->response->fields + i;
2128 
2129         if (nxt_slow_path(src->skip != 0)) {
2130             continue;
2131         }
2132 
2133         if (nxt_slow_path(src->name_length + src->value_length + 2
2134                           > (uint32_t) (buf->end - p)))
2135         {
2136             nxt_unit_req_warn(req, "realloc: not enough space for field"
2137                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2138                   i, src, src->name_length, src->value_length);
2139 
2140             goto fail;
2141         }
2142 
2143         nxt_unit_sptr_set(&f->name, p);
2144         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2145         *p++ = '\0';
2146 
2147         nxt_unit_sptr_set(&f->value, p);
2148         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2149         *p++ = '\0';
2150 
2151         f->hash = src->hash;
2152         f->skip = 0;
2153         f->name_length = src->name_length;
2154         f->value_length = src->value_length;
2155 
2156         resp->fields_count++;
2157         f++;
2158     }
2159 
2160     if (req->response->piggyback_content_length > 0) {
2161         if (nxt_slow_path(req->response->piggyback_content_length
2162                           > (uint32_t) (buf->end - p)))
2163         {
2164             nxt_unit_req_warn(req, "realloc: not enought space for content"
2165                   " #%"PRIu32", %"PRIu32" required",
2166                   i, req->response->piggyback_content_length);
2167 
2168             goto fail;
2169         }
2170 
2171         resp->piggyback_content_length =
2172                                        req->response->piggyback_content_length;
2173 
2174         nxt_unit_sptr_set(&resp->piggyback_content, p);
2175         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2176                        req->response->piggyback_content_length);
2177     }
2178 
2179     buf->free = p;
2180 
2181     nxt_unit_buf_free(req->response_buf);
2182 
2183     req->response = resp;
2184     req->response_buf = buf;
2185     req->response_max_fields = max_fields_count;
2186 
2187     return NXT_UNIT_OK;
2188 
2189 fail:
2190 
2191     nxt_unit_buf_free(buf);
2192 
2193     return NXT_UNIT_ERROR;
2194 }
2195 
2196 
2197 int
2198 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2199 {
2200     nxt_unit_request_info_impl_t  *req_impl;
2201 
2202     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2203 
2204     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2205 }
2206 
2207 
2208 int
2209 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2210     const char *name, uint8_t name_length,
2211     const char *value, uint32_t value_length)
2212 {
2213     nxt_unit_buf_t                *buf;
2214     nxt_unit_field_t              *f;
2215     nxt_unit_response_t           *resp;
2216     nxt_unit_request_info_impl_t  *req_impl;
2217 
2218     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2219 
2220     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2221         nxt_unit_req_warn(req, "add_field: response not initialized or "
2222                           "already sent");
2223 
2224         return NXT_UNIT_ERROR;
2225     }
2226 
2227     resp = req->response;
2228 
2229     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2230         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2231                           (int) resp->fields_count);
2232 
2233         return NXT_UNIT_ERROR;
2234     }
2235 
2236     buf = req->response_buf;
2237 
2238     if (nxt_slow_path(name_length + value_length + 2
2239                       > (uint32_t) (buf->end - buf->free)))
2240     {
2241         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2242 
2243         return NXT_UNIT_ERROR;
2244     }
2245 
2246     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2247                        resp->fields_count,
2248                        (int) name_length, name,
2249                        (int) value_length, value);
2250 
2251     f = resp->fields + resp->fields_count;
2252 
2253     nxt_unit_sptr_set(&f->name, buf->free);
2254     buf->free = nxt_cpymem(buf->free, name, name_length);
2255     *buf->free++ = '\0';
2256 
2257     nxt_unit_sptr_set(&f->value, buf->free);
2258     buf->free = nxt_cpymem(buf->free, value, value_length);
2259     *buf->free++ = '\0';
2260 
2261     f->hash = nxt_unit_field_hash(name, name_length);
2262     f->skip = 0;
2263     f->name_length = name_length;
2264     f->value_length = value_length;
2265 
2266     resp->fields_count++;
2267 
2268     return NXT_UNIT_OK;
2269 }
2270 
2271 
2272 int
2273 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2274     const void* src, uint32_t size)
2275 {
2276     nxt_unit_buf_t                *buf;
2277     nxt_unit_response_t           *resp;
2278     nxt_unit_request_info_impl_t  *req_impl;
2279 
2280     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2281 
2282     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2283         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2284 
2285         return NXT_UNIT_ERROR;
2286     }
2287 
2288     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2289         nxt_unit_req_warn(req, "add_content: response already sent");
2290 
2291         return NXT_UNIT_ERROR;
2292     }
2293 
2294     buf = req->response_buf;
2295 
2296     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2297         nxt_unit_req_warn(req, "add_content: buffer overflow");
2298 
2299         return NXT_UNIT_ERROR;
2300     }
2301 
2302     resp = req->response;
2303 
2304     if (resp->piggyback_content_length == 0) {
2305         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2306         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2307     }
2308 
2309     resp->piggyback_content_length += size;
2310 
2311     buf->free = nxt_cpymem(buf->free, src, size);
2312 
2313     return NXT_UNIT_OK;
2314 }
2315 
2316 
2317 int
2318 nxt_unit_response_send(nxt_unit_request_info_t *req)
2319 {
2320     int                           rc;
2321     nxt_unit_mmap_buf_t           *mmap_buf;
2322     nxt_unit_request_info_impl_t  *req_impl;
2323 
2324     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2325 
2326     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2327         nxt_unit_req_warn(req, "send: response is not initialized yet");
2328 
2329         return NXT_UNIT_ERROR;
2330     }
2331 
2332     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2333         nxt_unit_req_warn(req, "send: response already sent");
2334 
2335         return NXT_UNIT_ERROR;
2336     }
2337 
2338     if (req->request->websocket_handshake && req->response->status == 101) {
2339         nxt_unit_response_upgrade(req);
2340     }
2341 
2342     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2343                        req->response->fields_count,
2344                        (int) (req->response_buf->free
2345                               - req->response_buf->start));
2346 
2347     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2348 
2349     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2350     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2351         req->response = NULL;
2352         req->response_buf = NULL;
2353         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2354 
2355         nxt_unit_mmap_buf_free(mmap_buf);
2356     }
2357 
2358     return rc;
2359 }
2360 
2361 
2362 int
2363 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2364 {
2365     nxt_unit_request_info_impl_t  *req_impl;
2366 
2367     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2368 
2369     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2370 }
2371 
2372 
2373 nxt_unit_buf_t *
2374 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2375 {
2376     int                           rc;
2377     nxt_unit_mmap_buf_t           *mmap_buf;
2378     nxt_unit_request_info_impl_t  *req_impl;
2379 
2380     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2381         nxt_unit_req_warn(req, "response_buf_alloc: "
2382                           "requested buffer (%"PRIu32") too big", size);
2383 
2384         return NULL;
2385     }
2386 
2387     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2388 
2389     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2390 
2391     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2392     if (nxt_slow_path(mmap_buf == NULL)) {
2393         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2394 
2395         return NULL;
2396     }
2397 
2398     mmap_buf->req = req;
2399 
2400     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2401 
2402     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2403                                    size, size, mmap_buf,
2404                                    NULL);
2405     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2406         nxt_unit_mmap_buf_release(mmap_buf);
2407 
2408         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2409 
2410         return NULL;
2411     }
2412 
2413     return &mmap_buf->buf;
2414 }
2415 
2416 
2417 static nxt_unit_mmap_buf_t *
2418 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2419 {
2420     nxt_unit_mmap_buf_t  *mmap_buf;
2421     nxt_unit_ctx_impl_t  *ctx_impl;
2422 
2423     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2424 
2425     pthread_mutex_lock(&ctx_impl->mutex);
2426 
2427     if (ctx_impl->free_buf == NULL) {
2428         pthread_mutex_unlock(&ctx_impl->mutex);
2429 
2430         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2431         if (nxt_slow_path(mmap_buf == NULL)) {
2432             return NULL;
2433         }
2434 
2435     } else {
2436         mmap_buf = ctx_impl->free_buf;
2437 
2438         nxt_unit_mmap_buf_unlink(mmap_buf);
2439 
2440         pthread_mutex_unlock(&ctx_impl->mutex);
2441     }
2442 
2443     mmap_buf->ctx_impl = ctx_impl;
2444 
2445     mmap_buf->hdr = NULL;
2446     mmap_buf->free_ptr = NULL;
2447 
2448     return mmap_buf;
2449 }
2450 
2451 
2452 static void
2453 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2454 {
2455     nxt_unit_mmap_buf_unlink(mmap_buf);
2456 
2457     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2458 
2459     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2460 
2461     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2462 }
2463 
2464 
2465 int
2466 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2467 {
2468     return req->request->websocket_handshake;
2469 }
2470 
2471 
2472 int
2473 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2474 {
2475     int                           rc;
2476     nxt_unit_request_info_impl_t  *req_impl;
2477 
2478     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2479 
2480     if (nxt_slow_path(req_impl->websocket != 0)) {
2481         nxt_unit_req_debug(req, "upgrade: already upgraded");
2482 
2483         return NXT_UNIT_OK;
2484     }
2485 
2486     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2487         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2488 
2489         return NXT_UNIT_ERROR;
2490     }
2491 
2492     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2493         nxt_unit_req_warn(req, "upgrade: response already sent");
2494 
2495         return NXT_UNIT_ERROR;
2496     }
2497 
2498     rc = nxt_unit_request_hash_add(req->ctx, req);
2499     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2500         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2501 
2502         return NXT_UNIT_ERROR;
2503     }
2504 
2505     req_impl->websocket = 1;
2506 
2507     req->response->status = 101;
2508 
2509     return NXT_UNIT_OK;
2510 }
2511 
2512 
2513 int
2514 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2515 {
2516     nxt_unit_request_info_impl_t  *req_impl;
2517 
2518     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2519 
2520     return req_impl->websocket;
2521 }
2522 
2523 
2524 nxt_unit_request_info_t *
2525 nxt_unit_get_request_info_from_data(void *data)
2526 {
2527     nxt_unit_request_info_impl_t  *req_impl;
2528 
2529     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2530 
2531     return &req_impl->req;
2532 }
2533 
2534 
2535 int
2536 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2537 {
2538     int                           rc;
2539     nxt_unit_mmap_buf_t           *mmap_buf;
2540     nxt_unit_request_info_t       *req;
2541     nxt_unit_request_info_impl_t  *req_impl;
2542 
2543     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2544 
2545     req = mmap_buf->req;
2546     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2547 
2548     nxt_unit_req_debug(req, "buf_send: %d bytes",
2549                        (int) (buf->free - buf->start));
2550 
2551     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2552         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2553 
2554         return NXT_UNIT_ERROR;
2555     }
2556 
2557     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2558         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2559 
2560         return NXT_UNIT_ERROR;
2561     }
2562 
2563     if (nxt_fast_path(buf->free > buf->start)) {
2564         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2565         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2566             return rc;
2567         }
2568     }
2569 
2570     nxt_unit_mmap_buf_free(mmap_buf);
2571 
2572     return NXT_UNIT_OK;
2573 }
2574 
2575 
2576 static void
2577 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2578 {
2579     int                      rc;
2580     nxt_unit_mmap_buf_t      *mmap_buf;
2581     nxt_unit_request_info_t  *req;
2582 
2583     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2584 
2585     req = mmap_buf->req;
2586 
2587     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2588     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2589         nxt_unit_mmap_buf_free(mmap_buf);
2590 
2591         nxt_unit_request_info_release(req);
2592 
2593     } else {
2594         nxt_unit_request_done(req, rc);
2595     }
2596 }
2597 
2598 
2599 static int
2600 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2601     nxt_unit_mmap_buf_t *mmap_buf, int last)
2602 {
2603     struct {
2604         nxt_port_msg_t       msg;
2605         nxt_port_mmap_msg_t  mmap_msg;
2606     } m;
2607 
2608     int                           rc;
2609     u_char                        *last_used, *first_free;
2610     ssize_t                       res;
2611     nxt_chunk_id_t                first_free_chunk;
2612     nxt_unit_buf_t                *buf;
2613     nxt_unit_impl_t               *lib;
2614     nxt_port_mmap_header_t        *hdr;
2615     nxt_unit_request_info_impl_t  *req_impl;
2616 
2617     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2618     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2619 
2620     buf = &mmap_buf->buf;
2621     hdr = mmap_buf->hdr;
2622 
2623     m.mmap_msg.size = buf->free - buf->start;
2624 
2625     m.msg.stream = req_impl->stream;
2626     m.msg.pid = lib->pid;
2627     m.msg.reply_port = 0;
2628     m.msg.type = _NXT_PORT_MSG_DATA;
2629     m.msg.last = last != 0;
2630     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2631     m.msg.nf = 0;
2632     m.msg.mf = 0;
2633     m.msg.tracking = 0;
2634 
2635     rc = NXT_UNIT_ERROR;
2636 
2637     if (m.msg.mmap) {
2638         m.mmap_msg.mmap_id = hdr->id;
2639         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2640                                                      (u_char *) buf->start);
2641 
2642         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2643                        req_impl->stream,
2644                        (int) m.mmap_msg.mmap_id,
2645                        (int) m.mmap_msg.chunk_id,
2646                        (int) m.mmap_msg.size);
2647 
2648         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2649                                  NULL);
2650         if (nxt_slow_path(res != sizeof(m))) {
2651             goto free_buf;
2652         }
2653 
2654         last_used = (u_char *) buf->free - 1;
2655         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2656 
2657         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2658             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2659 
2660             buf->start = (char *) first_free;
2661             buf->free = buf->start;
2662 
2663             if (buf->end < buf->start) {
2664                 buf->end = buf->start;
2665             }
2666 
2667         } else {
2668             buf->start = NULL;
2669             buf->free = NULL;
2670             buf->end = NULL;
2671 
2672             mmap_buf->hdr = NULL;
2673         }
2674 
2675         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2676                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2677 
2678         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2679                        (int) lib->outgoing.allocated_chunks);
2680 
2681     } else {
2682         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2683                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2684         {
2685             nxt_unit_alert(req->ctx,
2686                            "#%"PRIu32": failed to send plain memory buffer"
2687                            ": no space reserved for message header",
2688                            req_impl->stream);
2689 
2690             goto free_buf;
2691         }
2692 
2693         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2694 
2695         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2696                        req_impl->stream,
2697                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2698 
2699         res = nxt_unit_port_send(req->ctx, req->response_port,
2700                                  buf->start - sizeof(m.msg),
2701                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2702 
2703         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2704             goto free_buf;
2705         }
2706     }
2707 
2708     rc = NXT_UNIT_OK;
2709 
2710 free_buf:
2711 
2712     nxt_unit_free_outgoing_buf(mmap_buf);
2713 
2714     return rc;
2715 }
2716 
2717 
2718 void
2719 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2720 {
2721     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2722 }
2723 
2724 
2725 static void
2726 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2727 {
2728     nxt_unit_free_outgoing_buf(mmap_buf);
2729 
2730     nxt_unit_mmap_buf_release(mmap_buf);
2731 }
2732 
2733 
2734 static void
2735 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2736 {
2737     if (mmap_buf->hdr != NULL) {
2738         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2739                               mmap_buf->hdr, mmap_buf->buf.start,
2740                               mmap_buf->buf.end - mmap_buf->buf.start);
2741 
2742         mmap_buf->hdr = NULL;
2743 
2744         return;
2745     }
2746 
2747     if (mmap_buf->free_ptr != NULL) {
2748         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2749 
2750         mmap_buf->free_ptr = NULL;
2751     }
2752 }
2753 
2754 
2755 static nxt_unit_read_buf_t *
2756 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2757 {
2758     nxt_unit_ctx_impl_t  *ctx_impl;
2759     nxt_unit_read_buf_t  *rbuf;
2760 
2761     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2762 
2763     pthread_mutex_lock(&ctx_impl->mutex);
2764 
2765     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2766 
2767     pthread_mutex_unlock(&ctx_impl->mutex);
2768 
2769     rbuf->oob.size = 0;
2770 
2771     return rbuf;
2772 }
2773 
2774 
2775 static nxt_unit_read_buf_t *
2776 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2777 {
2778     nxt_queue_link_t     *link;
2779     nxt_unit_read_buf_t  *rbuf;
2780 
2781     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2782         link = nxt_queue_first(&ctx_impl->free_rbuf);
2783         nxt_queue_remove(link);
2784 
2785         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2786 
2787         return rbuf;
2788     }
2789 
2790     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2791 
2792     if (nxt_fast_path(rbuf != NULL)) {
2793         rbuf->ctx_impl = ctx_impl;
2794     }
2795 
2796     return rbuf;
2797 }
2798 
2799 
2800 static void
2801 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2802     nxt_unit_read_buf_t *rbuf)
2803 {
2804     nxt_unit_ctx_impl_t  *ctx_impl;
2805 
2806     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2807 
2808     pthread_mutex_lock(&ctx_impl->mutex);
2809 
2810     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2811 
2812     pthread_mutex_unlock(&ctx_impl->mutex);
2813 }
2814 
2815 
2816 nxt_unit_buf_t *
2817 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2818 {
2819     nxt_unit_mmap_buf_t  *mmap_buf;
2820 
2821     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2822 
2823     if (mmap_buf->next == NULL) {
2824         return NULL;
2825     }
2826 
2827     return &mmap_buf->next->buf;
2828 }
2829 
2830 
2831 uint32_t
2832 nxt_unit_buf_max(void)
2833 {
2834     return PORT_MMAP_DATA_SIZE;
2835 }
2836 
2837 
2838 uint32_t
2839 nxt_unit_buf_min(void)
2840 {
2841     return PORT_MMAP_CHUNK_SIZE;
2842 }
2843 
2844 
2845 int
2846 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2847     size_t size)
2848 {
2849     ssize_t  res;
2850 
2851     res = nxt_unit_response_write_nb(req, start, size, size);
2852 
2853     return res < 0 ? -res : NXT_UNIT_OK;
2854 }
2855 
2856 
2857 ssize_t
2858 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2859     size_t size, size_t min_size)
2860 {
2861     int                           rc;
2862     ssize_t                       sent;
2863     uint32_t                      part_size, min_part_size, buf_size;
2864     const char                    *part_start;
2865     nxt_unit_mmap_buf_t           mmap_buf;
2866     nxt_unit_request_info_impl_t  *req_impl;
2867     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2868 
2869     nxt_unit_req_debug(req, "write: %d", (int) size);
2870 
2871     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2872 
2873     part_start = start;
2874     sent = 0;
2875 
2876     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2877         nxt_unit_req_alert(req, "write: response not initialized yet");
2878 
2879         return -NXT_UNIT_ERROR;
2880     }
2881 
2882     /* Check if response is not send yet. */
2883     if (nxt_slow_path(req->response_buf != NULL)) {
2884         part_size = req->response_buf->end - req->response_buf->free;
2885         part_size = nxt_min(size, part_size);
2886 
2887         rc = nxt_unit_response_add_content(req, part_start, part_size);
2888         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2889             return -rc;
2890         }
2891 
2892         rc = nxt_unit_response_send(req);
2893         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2894             return -rc;
2895         }
2896 
2897         size -= part_size;
2898         part_start += part_size;
2899         sent += part_size;
2900 
2901         min_size -= nxt_min(min_size, part_size);
2902     }
2903 
2904     while (size > 0) {
2905         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2906         min_part_size = nxt_min(min_size, part_size);
2907         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2908 
2909         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2910                                        min_part_size, &mmap_buf, local_buf);
2911         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2912             return -rc;
2913         }
2914 
2915         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2916         if (nxt_slow_path(buf_size == 0)) {
2917             return sent;
2918         }
2919         part_size = nxt_min(buf_size, part_size);
2920 
2921         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2922                                        part_start, part_size);
2923 
2924         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2925         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2926             return -rc;
2927         }
2928 
2929         size -= part_size;
2930         part_start += part_size;
2931         sent += part_size;
2932 
2933         min_size -= nxt_min(min_size, part_size);
2934     }
2935 
2936     return sent;
2937 }
2938 
2939 
2940 int
2941 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2942     nxt_unit_read_info_t *read_info)
2943 {
2944     int                           rc;
2945     ssize_t                       n;
2946     uint32_t                      buf_size;
2947     nxt_unit_buf_t                *buf;
2948     nxt_unit_mmap_buf_t           mmap_buf;
2949     nxt_unit_request_info_impl_t  *req_impl;
2950     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2951 
2952     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2953 
2954     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2955         nxt_unit_req_alert(req, "write: response not initialized yet");
2956 
2957         return NXT_UNIT_ERROR;
2958     }
2959 
2960     /* Check if response is not send yet. */
2961     if (nxt_slow_path(req->response_buf != NULL)) {
2962 
2963         /* Enable content in headers buf. */
2964         rc = nxt_unit_response_add_content(req, "", 0);
2965         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2966             nxt_unit_req_error(req, "Failed to add piggyback content");
2967 
2968             return rc;
2969         }
2970 
2971         buf = req->response_buf;
2972 
2973         while (buf->end - buf->free > 0) {
2974             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2975             if (nxt_slow_path(n < 0)) {
2976                 nxt_unit_req_error(req, "Read error");
2977 
2978                 return NXT_UNIT_ERROR;
2979             }
2980 
2981             /* Manually increase sizes. */
2982             buf->free += n;
2983             req->response->piggyback_content_length += n;
2984 
2985             if (read_info->eof) {
2986                 break;
2987             }
2988         }
2989 
2990         rc = nxt_unit_response_send(req);
2991         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2992             nxt_unit_req_error(req, "Failed to send headers with content");
2993 
2994             return rc;
2995         }
2996 
2997         if (read_info->eof) {
2998             return NXT_UNIT_OK;
2999         }
3000     }
3001 
3002     while (!read_info->eof) {
3003         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3004                            read_info->buf_size);
3005 
3006         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3007 
3008         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3009                                        buf_size, buf_size,
3010                                        &mmap_buf, local_buf);
3011         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3012             return rc;
3013         }
3014 
3015         buf = &mmap_buf.buf;
3016 
3017         while (!read_info->eof && buf->end > buf->free) {
3018             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3019             if (nxt_slow_path(n < 0)) {
3020                 nxt_unit_req_error(req, "Read error");
3021 
3022                 nxt_unit_free_outgoing_buf(&mmap_buf);
3023 
3024                 return NXT_UNIT_ERROR;
3025             }
3026 
3027             buf->free += n;
3028         }
3029 
3030         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3031         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3032             nxt_unit_req_error(req, "Failed to send content");
3033 
3034             return rc;
3035         }
3036     }
3037 
3038     return NXT_UNIT_OK;
3039 }
3040 
3041 
3042 ssize_t
3043 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3044 {
3045     ssize_t  buf_res, res;
3046 
3047     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3048                                 dst, size);
3049 
3050     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3051         res = read(req->content_fd, dst, size);
3052         if (nxt_slow_path(res < 0)) {
3053             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3054                                strerror(errno), errno);
3055 
3056             return res;
3057         }
3058 
3059         if (res < (ssize_t) size) {
3060             nxt_unit_close(req->content_fd);
3061 
3062             req->content_fd = -1;
3063         }
3064 
3065         req->content_length -= res;
3066         size -= res;
3067 
3068         dst = nxt_pointer_to(dst, res);
3069 
3070     } else {
3071         res = 0;
3072     }
3073 
3074     return buf_res + res;
3075 }
3076 
3077 
3078 ssize_t
3079 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3080 {
3081     char                 *p;
3082     size_t               l_size, b_size;
3083     nxt_unit_buf_t       *b;
3084     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3085 
3086     if (req->content_length == 0) {
3087         return 0;
3088     }
3089 
3090     l_size = 0;
3091 
3092     b = req->content_buf;
3093 
3094     while (b != NULL) {
3095         b_size = b->end - b->free;
3096         p = memchr(b->free, '\n', b_size);
3097 
3098         if (p != NULL) {
3099             p++;
3100             l_size += p - b->free;
3101             break;
3102         }
3103 
3104         l_size += b_size;
3105 
3106         if (max_size <= l_size) {
3107             break;
3108         }
3109 
3110         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3111         if (mmap_buf->next == NULL
3112             && req->content_fd != -1
3113             && l_size < req->content_length)
3114         {
3115             preread_buf = nxt_unit_request_preread(req, 16384);
3116             if (nxt_slow_path(preread_buf == NULL)) {
3117                 return -1;
3118             }
3119 
3120             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3121         }
3122 
3123         b = nxt_unit_buf_next(b);
3124     }
3125 
3126     return nxt_min(max_size, l_size);
3127 }
3128 
3129 
3130 static nxt_unit_mmap_buf_t *
3131 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3132 {
3133     ssize_t              res;
3134     nxt_unit_mmap_buf_t  *mmap_buf;
3135 
3136     if (req->content_fd == -1) {
3137         nxt_unit_req_alert(req, "preread: content_fd == -1");
3138         return NULL;
3139     }
3140 
3141     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3142     if (nxt_slow_path(mmap_buf == NULL)) {