xref: /unit/src/nxt_unit.c (revision 1998:c8790d2a89bb)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
/*
 * NXT_UNIT_MAX_PLAIN_SIZE is a payload size limit in bytes.
 * NOTE(review): presumably the largest payload sent inline in a port
 * message instead of through shared memory -- usage is not visible here,
 * confirm.
 *
 * NXT_UNIT_LOCAL_BUF_SIZE is that payload limit plus room for one
 * nxt_port_msg_t header.
 */
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
/*
 * Quit modes: the values passed to nxt_unit_quit() and stored in
 * nxt_unit_ctx_impl_t.quit_param.
 */
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
/*
 * Forward typedefs for the private structures of this file, so that the
 * prototypes and structure definitions below may refer to each other in
 * any order.
 */
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
/*
 * Prototypes of the file-local helpers.  Functions declared nxt_inline
 * are small list/reference-count/predicate helpers; everything else is
 * static.
 */
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
59     uint32_t *request_limit);
60 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
61     int queue_fd);
62 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
63     nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
65     nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
67 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
68     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
69 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
72     nxt_unit_port_id_t *port_id);
73 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
74 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
75     nxt_unit_recv_msg_t *recv_msg);
76 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
77 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
78     nxt_unit_ctx_t *ctx);
79 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
80 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
81 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
82     nxt_unit_ctx_t *ctx);
83 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
84 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
85     nxt_unit_websocket_frame_impl_t *ws);
86 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
87 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
88 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
89     nxt_unit_mmap_buf_t *mmap_buf, int last);
90 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
91 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
92 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
94     nxt_unit_ctx_impl_t *ctx_impl);
95 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
96     nxt_unit_read_buf_t *rbuf);
97 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
98     nxt_unit_request_info_t *req, size_t size);
99 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
100     size_t size);
101 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
102     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
103 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
104 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
105 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
106 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
107     nxt_unit_port_t *port, int n);
108 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
109 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
110     int fd);
111 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
112     nxt_unit_port_t *port, uint32_t size,
113     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
114 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
115 
116 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
117     nxt_unit_ctx_impl_t *ctx_impl);
118 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
119 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
120 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
121 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
122 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
123     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
124     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
125 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
126     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
127 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
128 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
129     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
130 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
131 
132 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
133 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
134     pid_t pid, int remove);
135 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
136 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
137 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
139 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
140 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
141 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
142 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
145 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
146     nxt_unit_port_t *port);
147 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
148 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
149 
150 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
151     nxt_unit_port_t *port, int queue_fd);
152 
153 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
154 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
155 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
156     nxt_unit_port_t *port, void *queue);
157 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
158     nxt_queue_t *awaiting_req);
159 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
160     nxt_unit_port_id_t *port_id);
161 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
162     nxt_unit_port_id_t *port_id);
163 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
164 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
165     nxt_unit_process_t *process);
166 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
167 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
168 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
169     nxt_unit_port_t *port, const void *buf, size_t buf_size,
170     const nxt_send_oob_t *oob);
171 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
172     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
173 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174     nxt_unit_read_buf_t *rbuf);
175 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
176     nxt_unit_read_buf_t *src);
177 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
178     nxt_unit_read_buf_t *rbuf);
179 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
180     nxt_unit_read_buf_t *rbuf);
181 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
182     nxt_unit_read_buf_t *rbuf);
183 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
184     nxt_unit_read_buf_t *rbuf);
185 nxt_inline int nxt_unit_close(int fd);
186 static int nxt_unit_fd_blocking(int fd);
187 
188 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
189     nxt_unit_port_t *port);
190 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
191     nxt_unit_port_id_t *port_id, int remove);
192 
193 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
194     nxt_unit_request_info_t *req);
195 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
196     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
197 
198 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
199 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
200 static void nxt_unit_lvlhsh_free(void *data, void *p);
201 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
202 
203 
/*
 * Private wrapper around a public nxt_unit_buf_t.  Instances are chained
 * into lists by nxt_unit_mmap_buf_insert(), nxt_unit_mmap_buf_insert_tail()
 * and nxt_unit_mmap_buf_unlink().
 */
204 struct nxt_unit_mmap_buf_s {
205     nxt_unit_buf_t           buf;
206 
    /*
     * List linkage: "next" is the following node; "prev" holds the address
     * of the pointer that refers to this node (the list head or the
     * previous node's "next").
     */
207     nxt_unit_mmap_buf_t      *next;
208     nxt_unit_mmap_buf_t      **prev;
209 
    /*
     * NOTE(review): the roles below are inferred from names and types only;
     * confirm against the code that fills them.  hdr looks like the shared
     * memory segment the data lives in, req/ctx_impl the owning request and
     * context, free_ptr/plain_ptr heap or inline storage for non-shm data.
     */
210     nxt_port_mmap_header_t   *hdr;
211     nxt_unit_request_info_t  *req;
212     nxt_unit_ctx_impl_t      *ctx_impl;
213     char                     *free_ptr;
214     char                     *plain_ptr;
215 };
216 
217 
/*
 * Decoded view of one received port message, filled in by
 * nxt_unit_process_msg() from the nxt_port_msg_t header at the start of
 * the read buffer.
 */
218 struct nxt_unit_recv_msg_s {
    /* Copied verbatim from the message header. */
219     uint32_t                 stream;
220     nxt_pid_t                pid;
221     nxt_port_id_t            reply_port;
222 
223     uint8_t                  last;      /* 1 bit */
224     uint8_t                  mmap;      /* 1 bit */
225 
    /* Payload: the bytes that follow the header, and their count. */
226     void                     *start;
227     uint32_t                 size;
228 
    /* Descriptors received as ancillary data; -1 when absent. */
229     int                      fd[2];
230 
231     nxt_unit_mmap_buf_t      *incoming_buf;
232 };
233 
234 
/*
 * Value of nxt_unit_request_info_impl_t.state.
 * NOTE(review): the names suggest the progress of a request's response
 * (nothing yet, initialised, has content, sent, released); the
 * transitions are not visible here -- confirm.
 */
235 typedef enum {
236     NXT_UNIT_RS_START           = 0,
237     NXT_UNIT_RS_RESPONSE_INIT,
238     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
239     NXT_UNIT_RS_RESPONSE_SENT,
240     NXT_UNIT_RS_RELEASED,
241 } nxt_unit_req_state_t;
242 
243 
/*
 * Private part of a request: the public nxt_unit_request_info_t followed
 * by the library's bookkeeping.
 */
244 struct nxt_unit_request_info_impl_s {
245     nxt_unit_request_info_t  req;
246 
247     uint32_t                 stream;
248 
    /* Heads of nxt_unit_mmap_buf_t lists. */
249     nxt_unit_mmap_buf_t      *outgoing_buf;
250     nxt_unit_mmap_buf_t      *incoming_buf;
251 
252     nxt_unit_req_state_t     state;
253     uint8_t                  websocket;
254     uint8_t                  in_hash;
255 
256     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
257     nxt_queue_link_t         link;
258     /*  for nxt_unit_port_impl_t.awaiting_req */
259     nxt_queue_link_t         port_wait_link;
260 
    /*
     * Flexible trailing storage.  For the request embedded in the main
     * context, nxt_unit_create() allocates request_data_size bytes past
     * the end of nxt_unit_impl_t, whose last-member chain ends here.
     */
261     char                     extra_data[];
262 };
263 
264 
/*
 * Private part of a WebSocket frame: the public nxt_unit_websocket_frame_t
 * followed by the library's bookkeeping.
 */
265 struct nxt_unit_websocket_frame_impl_s {
266     nxt_unit_websocket_frame_t  ws;
267 
268     nxt_unit_mmap_buf_t         *buf;
269 
    /* NOTE(review): presumably for nxt_unit_ctx_impl_t.free_ws -- confirm. */
270     nxt_queue_link_t            link;
271 
272     nxt_unit_ctx_impl_t         *ctx_impl;
273 };
274 
275 
/*
 * One received message: raw bytes plus the ancillary (out-of-band) data
 * that came with it.
 */
276 struct nxt_unit_read_buf_s {
    /* Link for the context's pending_rbuf / free_rbuf queues. */
277     nxt_queue_link_t              link;
278     nxt_unit_ctx_impl_t           *ctx_impl;
    /*
     * Number of bytes in buf.  nxt_unit_process_msg() treats 0 as "read
     * port closed" and anything shorter than nxt_port_msg_t as an error.
     */
279     ssize_t                       size;
280     nxt_recv_oob_t                oob;
    /* Starts with an nxt_port_msg_t header, followed by the payload. */
281     char                          buf[16384];
282 };
283 
284 
/*
 * Private part of a context: the public nxt_unit_ctx_t followed by the
 * library's per-context state.  Initialised by nxt_unit_ctx_init().
 */
285 struct nxt_unit_ctx_impl_s {
286     nxt_unit_ctx_t                ctx;
287 
    /*
     * Reference count; starts at 1 and the context is passed to
     * nxt_unit_ctx_free() when nxt_unit_ctx_release() drops the last
     * reference.
     */
288     nxt_atomic_t                  use_count;
289     nxt_atomic_t                  wait_items;
290 
291     pthread_mutex_t               mutex;
292 
293     nxt_unit_port_t               *read_port;
294 
    /* for nxt_unit_impl_t.contexts */
295     nxt_queue_link_t              link;
296 
    /* Head of the free buffer list; seeded with ctx_buf[0] and ctx_buf[1]. */
297     nxt_unit_mmap_buf_t           *free_buf;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_queue_t                   free_req;
301 
302     /*  of nxt_unit_websocket_frame_impl_t */
303     nxt_queue_t                   free_ws;
304 
305     /*  of nxt_unit_request_info_impl_t */
306     nxt_queue_t                   active_req;
307 
308     /*  of nxt_unit_request_info_impl_t */
309     nxt_lvlhsh_t                  requests;
310 
311     /*  of nxt_unit_request_info_impl_t */
312     nxt_queue_t                   ready_req;
313 
314     /*  of nxt_unit_read_buf_t */
315     nxt_queue_t                   pending_rbuf;
316 
317     /*  of nxt_unit_read_buf_t */
318     nxt_queue_t                   free_rbuf;
319 
    /* Initial values: online = 1, ready = 0, quit_param = NXT_QUIT_GRACEFUL. */
320     uint8_t                       online;       /* 1 bit */
321     uint8_t                       ready;        /* 1 bit */
322     uint8_t                       quit_param;
323 
    /* Embedded objects that pre-populate free_buf and free_rbuf. */
324     nxt_unit_mmap_buf_t           ctx_buf[2];
325     nxt_unit_read_buf_t           ctx_read_buf;
326 
    /*
     * Embedded request that pre-populates free_req.  Must stay the last
     * member: it ends in a flexible array.
     */
327     nxt_unit_request_info_impl_t  req;
328 };
329 
330 
/*
 * One element of nxt_unit_mmaps_t.elts, describing a single shared
 * memory segment by its header.
 */
331 struct nxt_unit_mmap_s {
332     nxt_port_mmap_header_t   *hdr;
    /* NOTE(review): presumably the thread that sent the segment -- confirm. */
333     pthread_t                src_thread;
334 
335     /*  of nxt_unit_read_buf_t */
336     nxt_queue_t              awaiting_rbuf;
337 };
338 
339 
/*
 * A set of shared memory segments with its own mutex; the library keeps
 * one set for incoming and one for outgoing segments.
 */
340 struct nxt_unit_mmaps_s {
341     pthread_mutex_t          mutex;
    /*
     * NOTE(review): size/cap look like the used and allocated element
     * counts of elts -- confirm against nxt_unit_mmap_at().
     */
342     uint32_t                 size;
343     uint32_t                 cap;
344     nxt_atomic_t             allocated_chunks;
345     nxt_unit_mmap_t          *elts;
346 };
347 
348 
/*
 * The library instance: the public nxt_unit_t followed by process-wide
 * state.  Allocated and initialised by nxt_unit_create().
 */
349 struct nxt_unit_impl_s {
350     nxt_unit_t               unit;
    /* Copy of the callbacks supplied in nxt_unit_init_t. */
351     nxt_unit_callbacks_t     callbacks;
352 
    /*
     * Reference count; every initialised context takes one reference and
     * nxt_unit_lib_release() destroys the instance when it reaches zero.
     */
353     nxt_atomic_t             use_count;
354     nxt_atomic_t             request_count;
355 
356     uint32_t                 request_data_size;
    /* Shared memory limit in PORT_MMAP_DATA_SIZE units, at least 1. */
357     uint32_t                 shm_mmap_limit;
358     uint32_t                 request_limit;
359 
360     pthread_mutex_t          mutex;
361 
362     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
363     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
364 
365     nxt_unit_port_t          *router_port;
366     nxt_unit_port_t          *shared_port;
367 
368     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
369 
370     nxt_unit_mmaps_t         incoming;
371     nxt_unit_mmaps_t         outgoing;
372 
    /* Taken from the read port id in nxt_unit_init(). */
373     pid_t                    pid;
    /* Defaults to STDERR_FILENO. */
374     int                      log_fd;
375 
    /*
     * Must stay the last member: it ends in a flexible array for which
     * nxt_unit_create() allocates request_data_size extra bytes.
     */
376     nxt_unit_ctx_impl_t      main_ctx;
377 };
378 
379 
/*
 * Private part of a port: the public nxt_unit_port_t followed by the
 * library's bookkeeping.
 */
380 struct nxt_unit_port_impl_s {
381     nxt_unit_port_t          port;
382 
383     nxt_atomic_t             use_count;
384 
385     /*  for nxt_unit_process_t.ports */
386     nxt_queue_link_t         link;
387     nxt_unit_process_t       *process;
388 
389     /*  of nxt_unit_request_info_impl_t */
390     nxt_queue_t              awaiting_req;
391 
392     int                      ready;
393 
    /*
     * Queue memory handed to nxt_unit_add_port(); nxt_unit_init() passes a
     * mapped nxt_port_queue_t for the read port and NULL for the router
     * port.
     */
394     void                     *queue;
395 
    /* NOTE(review): semantics of the two fields below are not visible here. */
396     int                      from_socket;
397     nxt_unit_read_buf_t      *socket_rbuf;
398 };
399 
400 
/*
 * A peer process known to the library, stored in nxt_unit_impl_t.processes.
 */
401 struct nxt_unit_process_s {
402     pid_t                    pid;
403 
404     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
405 
406     nxt_unit_impl_t          *lib;
407 
408     nxt_atomic_t             use_count;
409 
    /* NOTE(review): presumably the next port id to hand out -- confirm. */
410     uint32_t                 next_port_id;
411 };
412 
413 
414 /*
 * Port identifier in a fixed layout.  Both members are explicitly 32 bits
 * wide so that no alignment padding can appear between or after them.
 * NOTE(review): presumably because the structure is hashed or compared as
 * raw bytes -- confirm against the port hash code.
 */
415 typedef struct {
416     int32_t   pid;
417     uint32_t  id;
418 } nxt_unit_port_hash_id_t;
419 
420 
/*
 * Process id cached by nxt_unit_init(): first getpid(), then replaced by
 * the pid carried in the read port id.
 */
421 static pid_t  nxt_unit_pid;
422 
423 
424 nxt_unit_ctx_t *
425 nxt_unit_init(nxt_unit_init_t *init)
426 {
427     int              rc, queue_fd;
428     void             *mem;
429     uint32_t         ready_stream, shm_limit, request_limit;
430     nxt_unit_ctx_t   *ctx;
431     nxt_unit_impl_t  *lib;
432     nxt_unit_port_t  ready_port, router_port, read_port;
433 
434     nxt_unit_pid = getpid();
435 
436     lib = nxt_unit_create(init);
437     if (nxt_slow_path(lib == NULL)) {
438         return NULL;
439     }
440 
441     queue_fd = -1;
442     mem = MAP_FAILED;
443 
444     if (init->ready_port.id.pid != 0
445         && init->ready_stream != 0
446         && init->read_port.id.pid != 0)
447     {
448         ready_port = init->ready_port;
449         ready_stream = init->ready_stream;
450         router_port = init->router_port;
451         read_port = init->read_port;
452         lib->log_fd = init->log_fd;
453 
454         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
455                               ready_port.id.id);
456         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
457                               router_port.id.id);
458         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
459                               read_port.id.id);
460 
461     } else {
462         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
463                                &lib->log_fd, &ready_stream, &shm_limit,
464                                &request_limit);
465         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
466             goto fail;
467         }
468 
469         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
470                                 / PORT_MMAP_DATA_SIZE;
471         lib->request_limit = request_limit;
472     }
473 
474     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
475         lib->shm_mmap_limit = 1;
476     }
477 
478     lib->pid = read_port.id.pid;
479     nxt_unit_pid = lib->pid;
480 
481     ctx = &lib->main_ctx.ctx;
482 
483     rc = nxt_unit_fd_blocking(router_port.out_fd);
484     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
485         goto fail;
486     }
487 
488     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
489     if (nxt_slow_path(lib->router_port == NULL)) {
490         nxt_unit_alert(NULL, "failed to add router_port");
491 
492         goto fail;
493     }
494 
495     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
496     if (nxt_slow_path(queue_fd == -1)) {
497         goto fail;
498     }
499 
500     mem = mmap(NULL, sizeof(nxt_port_queue_t),
501                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
502     if (nxt_slow_path(mem == MAP_FAILED)) {
503         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
504                        strerror(errno), errno);
505 
506         goto fail;
507     }
508 
509     nxt_port_queue_init(mem);
510 
511     rc = nxt_unit_fd_blocking(read_port.in_fd);
512     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
513         goto fail;
514     }
515 
516     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
517     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
518         nxt_unit_alert(NULL, "failed to add read_port");
519 
520         goto fail;
521     }
522 
523     rc = nxt_unit_fd_blocking(ready_port.out_fd);
524     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
525         goto fail;
526     }
527 
528     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
529     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
530         nxt_unit_alert(NULL, "failed to send READY message");
531 
532         goto fail;
533     }
534 
535     nxt_unit_close(ready_port.out_fd);
536     nxt_unit_close(queue_fd);
537 
538     return ctx;
539 
540 fail:
541 
542     if (mem != MAP_FAILED) {
543         munmap(mem, sizeof(nxt_port_queue_t));
544     }
545 
546     if (queue_fd != -1) {
547         nxt_unit_close(queue_fd);
548     }
549 
550     nxt_unit_ctx_release(&lib->main_ctx.ctx);
551 
552     return NULL;
553 }
554 
555 
556 static nxt_unit_impl_t *
557 nxt_unit_create(nxt_unit_init_t *init)
558 {
559     int                   rc;
560     nxt_unit_impl_t       *lib;
561     nxt_unit_callbacks_t  *cb;
562 
563     lib = nxt_unit_malloc(NULL,
564                           sizeof(nxt_unit_impl_t) + init->request_data_size);
565     if (nxt_slow_path(lib == NULL)) {
566         nxt_unit_alert(NULL, "failed to allocate unit struct");
567 
568         return NULL;
569     }
570 
571     rc = pthread_mutex_init(&lib->mutex, NULL);
572     if (nxt_slow_path(rc != 0)) {
573         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
574 
575         goto fail;
576     }
577 
578     lib->unit.data = init->data;
579     lib->callbacks = init->callbacks;
580 
581     lib->request_data_size = init->request_data_size;
582     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
583                             / PORT_MMAP_DATA_SIZE;
584     lib->request_limit = init->request_limit;
585 
586     lib->processes.slot = NULL;
587     lib->ports.slot = NULL;
588 
589     lib->log_fd = STDERR_FILENO;
590 
591     nxt_queue_init(&lib->contexts);
592 
593     lib->use_count = 0;
594     lib->request_count = 0;
595     lib->router_port = NULL;
596     lib->shared_port = NULL;
597 
598     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
599     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
600         pthread_mutex_destroy(&lib->mutex);
601         goto fail;
602     }
603 
604     cb = &lib->callbacks;
605 
606     if (cb->request_handler == NULL) {
607         nxt_unit_alert(NULL, "request_handler is NULL");
608 
609         pthread_mutex_destroy(&lib->mutex);
610         goto fail;
611     }
612 
613     nxt_unit_mmaps_init(&lib->incoming);
614     nxt_unit_mmaps_init(&lib->outgoing);
615 
616     return lib;
617 
618 fail:
619 
620     nxt_unit_free(NULL, lib);
621 
622     return NULL;
623 }
624 
625 
626 static int
627 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
628     void *data)
629 {
630     int  rc;
631 
632     ctx_impl->ctx.data = data;
633     ctx_impl->ctx.unit = &lib->unit;
634 
635     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
636     if (nxt_slow_path(rc != 0)) {
637         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
638 
639         return NXT_UNIT_ERROR;
640     }
641 
642     nxt_unit_lib_use(lib);
643 
644     pthread_mutex_lock(&lib->mutex);
645 
646     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
647 
648     pthread_mutex_unlock(&lib->mutex);
649 
650     ctx_impl->use_count = 1;
651     ctx_impl->wait_items = 0;
652     ctx_impl->online = 1;
653     ctx_impl->ready = 0;
654     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
655 
656     nxt_queue_init(&ctx_impl->free_req);
657     nxt_queue_init(&ctx_impl->free_ws);
658     nxt_queue_init(&ctx_impl->active_req);
659     nxt_queue_init(&ctx_impl->ready_req);
660     nxt_queue_init(&ctx_impl->pending_rbuf);
661     nxt_queue_init(&ctx_impl->free_rbuf);
662 
663     ctx_impl->free_buf = NULL;
664     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
665     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
666 
667     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
668     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
669 
670     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
671 
672     ctx_impl->req.req.ctx = &ctx_impl->ctx;
673     ctx_impl->req.req.unit = &lib->unit;
674 
675     ctx_impl->read_port = NULL;
676     ctx_impl->requests.slot = 0;
677 
678     return NXT_UNIT_OK;
679 }
680 
681 
682 nxt_inline void
683 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
684 {
685     nxt_unit_ctx_impl_t  *ctx_impl;
686 
687     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
688 
689     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
690 }
691 
692 
693 nxt_inline void
694 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
695 {
696     long                 c;
697     nxt_unit_ctx_impl_t  *ctx_impl;
698 
699     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
700 
701     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
702 
703     if (c == 1) {
704         nxt_unit_ctx_free(ctx_impl);
705     }
706 }
707 
708 
709 nxt_inline void
710 nxt_unit_lib_use(nxt_unit_impl_t *lib)
711 {
712     nxt_atomic_fetch_add(&lib->use_count, 1);
713 }
714 
715 
716 nxt_inline void
717 nxt_unit_lib_release(nxt_unit_impl_t *lib)
718 {
719     long                c;
720     nxt_unit_process_t  *process;
721 
722     c = nxt_atomic_fetch_add(&lib->use_count, -1);
723 
724     if (c == 1) {
725         for ( ;; ) {
726             pthread_mutex_lock(&lib->mutex);
727 
728             process = nxt_unit_process_pop_first(lib);
729             if (process == NULL) {
730                 pthread_mutex_unlock(&lib->mutex);
731 
732                 break;
733             }
734 
735             nxt_unit_remove_process(lib, process);
736         }
737 
738         pthread_mutex_destroy(&lib->mutex);
739 
740         if (nxt_fast_path(lib->router_port != NULL)) {
741             nxt_unit_port_release(lib->router_port);
742         }
743 
744         if (nxt_fast_path(lib->shared_port != NULL)) {
745             nxt_unit_port_release(lib->shared_port);
746         }
747 
748         nxt_unit_mmaps_destroy(&lib->incoming);
749         nxt_unit_mmaps_destroy(&lib->outgoing);
750 
751         nxt_unit_free(NULL, lib);
752     }
753 }
754 
755 
756 nxt_inline void
757 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
758     nxt_unit_mmap_buf_t *mmap_buf)
759 {
760     mmap_buf->next = *head;
761 
762     if (mmap_buf->next != NULL) {
763         mmap_buf->next->prev = &mmap_buf->next;
764     }
765 
766     *head = mmap_buf;
767     mmap_buf->prev = head;
768 }
769 
770 
771 nxt_inline void
772 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
773     nxt_unit_mmap_buf_t *mmap_buf)
774 {
775     while (*prev != NULL) {
776         prev = &(*prev)->next;
777     }
778 
779     nxt_unit_mmap_buf_insert(prev, mmap_buf);
780 }
781 
782 
783 nxt_inline void
784 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
785 {
786     nxt_unit_mmap_buf_t  **prev;
787 
788     prev = mmap_buf->prev;
789 
790     if (mmap_buf->next != NULL) {
791         mmap_buf->next->prev = prev;
792     }
793 
794     if (prev != NULL) {
795         *prev = mmap_buf->next;
796     }
797 }
798 
799 
800 static int
801 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
802     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
803     uint32_t *shm_limit, uint32_t *request_limit)
804 {
805     int       rc;
806     int       ready_fd, router_fd, read_in_fd, read_out_fd;
807     char      *unit_init, *version_end, *vars;
808     size_t    version_length;
809     int64_t   ready_pid, router_pid, read_pid;
810     uint32_t  ready_stream, router_id, ready_id, read_id;
811 
812     unit_init = getenv(NXT_UNIT_INIT_ENV);
813     if (nxt_slow_path(unit_init == NULL)) {
814         nxt_unit_alert(NULL, "%s is not in the current environment",
815                        NXT_UNIT_INIT_ENV);
816 
817         return NXT_UNIT_ERROR;
818     }
819 
820     version_end = strchr(unit_init, ';');
821     if (nxt_slow_path(version_end == NULL)) {
822         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
823                        NXT_UNIT_INIT_ENV, unit_init);
824 
825         return NXT_UNIT_ERROR;
826     }
827 
828     version_length = version_end - unit_init;
829 
830     rc = version_length != nxt_length(NXT_VERSION)
831          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
832 
833     if (nxt_slow_path(rc != 0)) {
834         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
835                        "%.*s, while the app was compiled with libunit %s",
836                        (int) version_length, unit_init, NXT_VERSION);
837 
838         return NXT_UNIT_ERROR;
839     }
840 
841     vars = version_end + 1;
842 
843     rc = sscanf(vars,
844                 "%"PRIu32";"
845                 "%"PRId64",%"PRIu32",%d;"
846                 "%"PRId64",%"PRIu32",%d;"
847                 "%"PRId64",%"PRIu32",%d,%d;"
848                 "%d,%"PRIu32",%"PRIu32,
849                 &ready_stream,
850                 &ready_pid, &ready_id, &ready_fd,
851                 &router_pid, &router_id, &router_fd,
852                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
853                 log_fd, shm_limit, request_limit);
854 
855     if (nxt_slow_path(rc == EOF)) {
856         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
857                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
858 
859         return NXT_UNIT_ERROR;
860     }
861 
862     if (nxt_slow_path(rc != 14)) {
863         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
864                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
870 
871     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
872 
873     ready_port->in_fd = -1;
874     ready_port->out_fd = ready_fd;
875     ready_port->data = NULL;
876 
877     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
878 
879     router_port->in_fd = -1;
880     router_port->out_fd = router_fd;
881     router_port->data = NULL;
882 
883     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
884 
885     read_port->in_fd = read_in_fd;
886     read_port->out_fd = read_out_fd;
887     read_port->data = NULL;
888 
889     *stream = ready_stream;
890 
891     return NXT_UNIT_OK;
892 }
893 
894 
895 static int
896 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
897 {
898     ssize_t          res;
899     nxt_send_oob_t   oob;
900     nxt_port_msg_t   msg;
901     nxt_unit_impl_t  *lib;
902     int              fds[2] = {queue_fd, -1};
903 
904     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
905 
906     msg.stream = stream;
907     msg.pid = lib->pid;
908     msg.reply_port = 0;
909     msg.type = _NXT_PORT_MSG_PROCESS_READY;
910     msg.last = 1;
911     msg.mmap = 0;
912     msg.nf = 0;
913     msg.mf = 0;
914     msg.tracking = 0;
915 
916     nxt_socket_msg_oob_init(&oob, fds);
917 
918     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
919     if (res != sizeof(msg)) {
920         return NXT_UNIT_ERROR;
921     }
922 
923     return NXT_UNIT_OK;
924 }
925 
926 
927 static int
928 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
929     nxt_unit_request_info_t **preq)
930 {
931     int                  rc;
932     pid_t                pid;
933     uint8_t              quit_param;
934     nxt_port_msg_t       *port_msg;
935     nxt_unit_impl_t      *lib;
936     nxt_unit_recv_msg_t  recv_msg;
937 
938     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
939 
940     recv_msg.fd[0] = -1;
941     recv_msg.fd[1] = -1;
942     port_msg = (nxt_port_msg_t *) rbuf->buf;
943 
944     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
945     if (nxt_slow_path(rc != NXT_OK)) {
946         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
947         rc = NXT_UNIT_ERROR;
948         goto done;
949     }
950 
951     recv_msg.incoming_buf = NULL;
952 
953     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
954         if (nxt_slow_path(rbuf->size == 0)) {
955             nxt_unit_debug(ctx, "read port closed");
956 
957             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
958             rc = NXT_UNIT_OK;
959             goto done;
960         }
961 
962         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
963 
964         rc = NXT_UNIT_ERROR;
965         goto done;
966     }
967 
968     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
969                    port_msg->stream, (int) port_msg->type,
970                    recv_msg.fd[0], recv_msg.fd[1]);
971 
972     recv_msg.stream = port_msg->stream;
973     recv_msg.pid = port_msg->pid;
974     recv_msg.reply_port = port_msg->reply_port;
975     recv_msg.last = port_msg->last;
976     recv_msg.mmap = port_msg->mmap;
977 
978     recv_msg.start = port_msg + 1;
979     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
980 
981     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
982         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
983                        port_msg->stream, (int) port_msg->type);
984         rc = NXT_UNIT_ERROR;
985         goto done;
986     }
987 
988     /* Fragmentation is unsupported. */
989     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
990         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
991                        port_msg->stream, (int) port_msg->type);
992         rc = NXT_UNIT_ERROR;
993         goto done;
994     }
995 
996     if (port_msg->mmap) {
997         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
998 
999         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1000             if (rc == NXT_UNIT_AGAIN) {
1001                 recv_msg.fd[0] = -1;
1002                 recv_msg.fd[1] = -1;
1003             }
1004 
1005             goto done;
1006         }
1007     }
1008 
1009     switch (port_msg->type) {
1010 
1011     case _NXT_PORT_MSG_RPC_READY:
1012         rc = NXT_UNIT_OK;
1013         break;
1014 
1015     case _NXT_PORT_MSG_QUIT:
1016         if (recv_msg.size == sizeof(quit_param)) {
1017             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1018 
1019         } else {
1020             quit_param = NXT_QUIT_NORMAL;
1021         }
1022 
1023         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1024                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1025 
1026         nxt_unit_quit(ctx, quit_param);
1027 
1028         rc = NXT_UNIT_OK;
1029         break;
1030 
1031     case _NXT_PORT_MSG_NEW_PORT:
1032         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1033         break;
1034 
1035     case _NXT_PORT_MSG_PORT_ACK:
1036         rc = nxt_unit_ctx_ready(ctx);
1037         break;
1038 
1039     case _NXT_PORT_MSG_CHANGE_FILE:
1040         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1041                        port_msg->stream, recv_msg.fd[0]);
1042 
1043         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1044             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1045                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1046                            strerror(errno), errno);
1047 
1048             rc = NXT_UNIT_ERROR;
1049             goto done;
1050         }
1051 
1052         rc = NXT_UNIT_OK;
1053         break;
1054 
1055     case _NXT_PORT_MSG_MMAP:
1056         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1057             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1058                            port_msg->stream, recv_msg.fd[0]);
1059 
1060             rc = NXT_UNIT_ERROR;
1061             goto done;
1062         }
1063 
1064         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1065         break;
1066 
1067     case _NXT_PORT_MSG_REQ_HEADERS:
1068         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1069         break;
1070 
1071     case _NXT_PORT_MSG_REQ_BODY:
1072         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1073         break;
1074 
1075     case _NXT_PORT_MSG_WEBSOCKET:
1076         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1077         break;
1078 
1079     case _NXT_PORT_MSG_REMOVE_PID:
1080         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1081             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1082                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1083                            (int) sizeof(pid));
1084 
1085             rc = NXT_UNIT_ERROR;
1086             goto done;
1087         }
1088 
1089         memcpy(&pid, recv_msg.start, sizeof(pid));
1090 
1091         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1092                        port_msg->stream, (int) pid);
1093 
1094         nxt_unit_remove_pid(lib, pid);
1095 
1096         rc = NXT_UNIT_OK;
1097         break;
1098 
1099     case _NXT_PORT_MSG_SHM_ACK:
1100         rc = nxt_unit_process_shm_ack(ctx);
1101         break;
1102 
1103     default:
1104         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1105                        port_msg->stream, (int) port_msg->type);
1106 
1107         rc = NXT_UNIT_ERROR;
1108         goto done;
1109     }
1110 
1111 done:
1112 
1113     if (recv_msg.fd[0] != -1) {
1114         nxt_unit_close(recv_msg.fd[0]);
1115     }
1116 
1117     if (recv_msg.fd[1] != -1) {
1118         nxt_unit_close(recv_msg.fd[1]);
1119     }
1120 
1121     while (recv_msg.incoming_buf != NULL) {
1122         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1123     }
1124 
1125     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1126 #if (NXT_DEBUG)
1127         memset(rbuf->buf, 0xAC, rbuf->size);
1128 #endif
1129         nxt_unit_read_buf_release(ctx, rbuf);
1130     }
1131 
1132     return rc;
1133 }
1134 
1135 
1136 static int
1137 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1138 {
1139     void                     *mem;
1140     nxt_unit_impl_t          *lib;
1141     nxt_unit_port_t          new_port, *port;
1142     nxt_port_msg_new_port_t  *new_port_msg;
1143 
1144     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1145         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1146                       "invalid message size (%d)",
1147                       recv_msg->stream, (int) recv_msg->size);
1148 
1149         return NXT_UNIT_ERROR;
1150     }
1151 
1152     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1153         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1154                        recv_msg->stream, recv_msg->fd[0]);
1155 
1156         return NXT_UNIT_ERROR;
1157     }
1158 
1159     new_port_msg = recv_msg->start;
1160 
1161     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1162                    recv_msg->stream, (int) new_port_msg->pid,
1163                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1164 
1165     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1166 
1167     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1168         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1169 
1170         new_port.in_fd = recv_msg->fd[0];
1171         new_port.out_fd = -1;
1172 
1173         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1174                    MAP_SHARED, recv_msg->fd[1], 0);
1175 
1176     } else {
1177         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1178                           != NXT_UNIT_OK))
1179         {
1180             return NXT_UNIT_ERROR;
1181         }
1182 
1183         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1184                               new_port_msg->id);
1185 
1186         new_port.in_fd = -1;
1187         new_port.out_fd = recv_msg->fd[0];
1188 
1189         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1190                    MAP_SHARED, recv_msg->fd[1], 0);
1191     }
1192 
1193     if (nxt_slow_path(mem == MAP_FAILED)) {
1194         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1195                        strerror(errno), errno);
1196 
1197         return NXT_UNIT_ERROR;
1198     }
1199 
1200     new_port.data = NULL;
1201 
1202     recv_msg->fd[0] = -1;
1203 
1204     port = nxt_unit_add_port(ctx, &new_port, mem);
1205     if (nxt_slow_path(port == NULL)) {
1206         return NXT_UNIT_ERROR;
1207     }
1208 
1209     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1210         lib->shared_port = port;
1211 
1212         return nxt_unit_ctx_ready(ctx);
1213     }
1214 
1215     nxt_unit_port_release(port);
1216 
1217     return NXT_UNIT_OK;
1218 }
1219 
1220 
1221 static int
1222 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1223 {
1224     nxt_unit_impl_t      *lib;
1225     nxt_unit_ctx_impl_t  *ctx_impl;
1226 
1227     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1228 
1229     if (nxt_slow_path(ctx_impl->ready)) {
1230         return NXT_UNIT_OK;
1231     }
1232 
1233     ctx_impl->ready = 1;
1234 
1235     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1236 
1237     /* Call ready_handler() only for main context. */
1238     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1239         return lib->callbacks.ready_handler(ctx);
1240     }
1241 
1242     if (&lib->main_ctx != ctx_impl) {
1243         /* Check if the main context is already stopped or quit. */
1244         if (nxt_slow_path(!lib->main_ctx.ready)) {
1245             ctx_impl->ready = 0;
1246 
1247             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1248 
1249             return NXT_UNIT_OK;
1250         }
1251 
1252         if (lib->callbacks.add_port != NULL) {
1253             lib->callbacks.add_port(ctx, lib->shared_port);
1254         }
1255     }
1256 
1257     return NXT_UNIT_OK;
1258 }
1259 
1260 
1261 static int
1262 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1263     nxt_unit_request_info_t **preq)
1264 {
1265     int                           res;
1266     nxt_unit_impl_t               *lib;
1267     nxt_unit_port_id_t            port_id;
1268     nxt_unit_request_t            *r;
1269     nxt_unit_mmap_buf_t           *b;
1270     nxt_unit_request_info_t       *req;
1271     nxt_unit_request_info_impl_t  *req_impl;
1272 
1273     if (nxt_slow_path(recv_msg->mmap == 0)) {
1274         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1275                       recv_msg->stream);
1276 
1277         return NXT_UNIT_ERROR;
1278     }
1279 
1280     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1281         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1282                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1283                       (int) sizeof(nxt_unit_request_t));
1284 
1285         return NXT_UNIT_ERROR;
1286     }
1287 
1288     req_impl = nxt_unit_request_info_get(ctx);
1289     if (nxt_slow_path(req_impl == NULL)) {
1290         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1291                       recv_msg->stream);
1292 
1293         return NXT_UNIT_ERROR;
1294     }
1295 
1296     req = &req_impl->req;
1297 
1298     req->request = recv_msg->start;
1299 
1300     b = recv_msg->incoming_buf;
1301 
1302     req->request_buf = &b->buf;
1303     req->response = NULL;
1304     req->response_buf = NULL;
1305 
1306     r = req->request;
1307 
1308     req->content_length = r->content_length;
1309 
1310     req->content_buf = req->request_buf;
1311     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1312 
1313     req_impl->stream = recv_msg->stream;
1314 
1315     req_impl->outgoing_buf = NULL;
1316 
1317     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1318         b->req = req;
1319     }
1320 
1321     /* "Move" incoming buffer list to req_impl. */
1322     req_impl->incoming_buf = recv_msg->incoming_buf;
1323     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1324     recv_msg->incoming_buf = NULL;
1325 
1326     req->content_fd = recv_msg->fd[0];
1327     recv_msg->fd[0] = -1;
1328 
1329     req->response_max_fields = 0;
1330     req_impl->state = NXT_UNIT_RS_START;
1331     req_impl->websocket = 0;
1332     req_impl->in_hash = 0;
1333 
1334     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1335                    (int) r->method_length,
1336                    (char *) nxt_unit_sptr_get(&r->method),
1337                    (int) r->target_length,
1338                    (char *) nxt_unit_sptr_get(&r->target),
1339                    (int) r->content_length);
1340 
1341     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1342 
1343     res = nxt_unit_request_check_response_port(req, &port_id);
1344     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1345         return NXT_UNIT_ERROR;
1346     }
1347 
1348     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1349         res = nxt_unit_send_req_headers_ack(req);
1350         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1351             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1352 
1353             return NXT_UNIT_ERROR;
1354         }
1355 
1356         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1357 
1358         if (req->content_length
1359             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1360         {
1361             res = nxt_unit_request_hash_add(ctx, req);
1362             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1363                 nxt_unit_req_warn(req, "failed to add request to hash");
1364 
1365                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1366 
1367                 return NXT_UNIT_ERROR;
1368             }
1369 
1370             /*
1371              * If application have separate data handler, we may start
1372              * request processing and process data when it is arrived.
1373              */
1374             if (lib->callbacks.data_handler == NULL) {
1375                 return NXT_UNIT_OK;
1376             }
1377         }
1378 
1379         if (preq == NULL) {
1380             lib->callbacks.request_handler(req);
1381 
1382         } else {
1383             *preq = req;
1384         }
1385     }
1386 
1387     return NXT_UNIT_OK;
1388 }
1389 
1390 
1391 static int
1392 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1393 {
1394     uint64_t                 l;
1395     nxt_unit_impl_t          *lib;
1396     nxt_unit_mmap_buf_t      *b;
1397     nxt_unit_request_info_t  *req;
1398 
1399     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1400     if (req == NULL) {
1401         return NXT_UNIT_OK;
1402     }
1403 
1404     l = req->content_buf->end - req->content_buf->free;
1405 
1406     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1407         b->req = req;
1408         l += b->buf.end - b->buf.free;
1409     }
1410 
1411     if (recv_msg->incoming_buf != NULL) {
1412         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1413 
1414         while (b->next != NULL) {
1415             b = b->next;
1416         }
1417 
1418         /* "Move" incoming buffer list to req_impl. */
1419         b->next = recv_msg->incoming_buf;
1420         b->next->prev = &b->next;
1421 
1422         recv_msg->incoming_buf = NULL;
1423     }
1424 
1425     req->content_fd = recv_msg->fd[0];
1426     recv_msg->fd[0] = -1;
1427 
1428     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1429 
1430     if (lib->callbacks.data_handler != NULL) {
1431         lib->callbacks.data_handler(req);
1432 
1433         return NXT_UNIT_OK;
1434     }
1435 
1436     if (req->content_fd != -1 || l == req->content_length) {
1437         lib->callbacks.request_handler(req);
1438     }
1439 
1440     return NXT_UNIT_OK;
1441 }
1442 
1443 
1444 static int
1445 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1446     nxt_unit_port_id_t *port_id)
1447 {
1448     int                           res;
1449     nxt_unit_ctx_t                *ctx;
1450     nxt_unit_impl_t               *lib;
1451     nxt_unit_port_t               *port;
1452     nxt_unit_process_t            *process;
1453     nxt_unit_ctx_impl_t           *ctx_impl;
1454     nxt_unit_port_impl_t          *port_impl;
1455     nxt_unit_request_info_impl_t  *req_impl;
1456 
1457     ctx = req->ctx;
1458     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1459     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1460 
1461     pthread_mutex_lock(&lib->mutex);
1462 
1463     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1464     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1465 
1466     if (nxt_fast_path(port != NULL)) {
1467         req->response_port = port;
1468 
1469         if (nxt_fast_path(port_impl->ready)) {
1470             pthread_mutex_unlock(&lib->mutex);
1471 
1472             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1473                            (int) port->id.pid, (int) port->id.id);
1474 
1475             return NXT_UNIT_OK;
1476         }
1477 
1478         nxt_unit_debug(ctx, "check_response_port: "
1479                        "port{%d,%d} already requested",
1480                        (int) port->id.pid, (int) port->id.id);
1481 
1482         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1483 
1484         nxt_queue_insert_tail(&port_impl->awaiting_req,
1485                               &req_impl->port_wait_link);
1486 
1487         pthread_mutex_unlock(&lib->mutex);
1488 
1489         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1490 
1491         return NXT_UNIT_AGAIN;
1492     }
1493 
1494     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1495     if (nxt_slow_path(port_impl == NULL)) {
1496         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1497                        (int) sizeof(nxt_unit_port_impl_t));
1498 
1499         pthread_mutex_unlock(&lib->mutex);
1500 
1501         return NXT_UNIT_ERROR;
1502     }
1503 
1504     port = &port_impl->port;
1505 
1506     port->id = *port_id;
1507     port->in_fd = -1;
1508     port->out_fd = -1;
1509     port->data = NULL;
1510 
1511     res = nxt_unit_port_hash_add(&lib->ports, port);
1512     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1513         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1514                        port->id.pid, port->id.id);
1515 
1516         pthread_mutex_unlock(&lib->mutex);
1517 
1518         nxt_unit_free(ctx, port);
1519 
1520         return NXT_UNIT_ERROR;
1521     }
1522 
1523     process = nxt_unit_process_find(lib, port_id->pid, 0);
1524     if (nxt_slow_path(process == NULL)) {
1525         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1526                        port->id.pid);
1527 
1528         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1529 
1530         pthread_mutex_unlock(&lib->mutex);
1531 
1532         nxt_unit_free(ctx, port);
1533 
1534         return NXT_UNIT_ERROR;
1535     }
1536 
1537     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1538 
1539     port_impl->process = process;
1540     port_impl->queue = NULL;
1541     port_impl->from_socket = 0;
1542     port_impl->socket_rbuf = NULL;
1543 
1544     nxt_queue_init(&port_impl->awaiting_req);
1545 
1546     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1547 
1548     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1549 
1550     port_impl->use_count = 2;
1551     port_impl->ready = 0;
1552 
1553     req->response_port = port;
1554 
1555     pthread_mutex_unlock(&lib->mutex);
1556 
1557     res = nxt_unit_get_port(ctx, port_id);
1558     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1559         return NXT_UNIT_ERROR;
1560     }
1561 
1562     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1563 
1564     return NXT_UNIT_AGAIN;
1565 }
1566 
1567 
1568 static int
1569 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1570 {
1571     ssize_t                       res;
1572     nxt_port_msg_t                msg;
1573     nxt_unit_impl_t               *lib;
1574     nxt_unit_ctx_impl_t           *ctx_impl;
1575     nxt_unit_request_info_impl_t  *req_impl;
1576 
1577     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1578     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1579     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1580 
1581     memset(&msg, 0, sizeof(nxt_port_msg_t));
1582 
1583     msg.stream = req_impl->stream;
1584     msg.pid = lib->pid;
1585     msg.reply_port = ctx_impl->read_port->id.id;
1586     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1587 
1588     res = nxt_unit_port_send(req->ctx, req->response_port,
1589                              &msg, sizeof(msg), NULL);
1590     if (nxt_slow_path(res != sizeof(msg))) {
1591         return NXT_UNIT_ERROR;
1592     }
1593 
1594     return NXT_UNIT_OK;
1595 }
1596 
1597 
1598 static int
1599 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1600 {
1601     size_t                           hsize;
1602     nxt_unit_impl_t                  *lib;
1603     nxt_unit_mmap_buf_t              *b;
1604     nxt_unit_callbacks_t             *cb;
1605     nxt_unit_request_info_t          *req;
1606     nxt_unit_request_info_impl_t     *req_impl;
1607     nxt_unit_websocket_frame_impl_t  *ws_impl;
1608 
1609     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1610     if (nxt_slow_path(req == NULL)) {
1611         return NXT_UNIT_OK;
1612     }
1613 
1614     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1615 
1616     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1617     cb = &lib->callbacks;
1618 
1619     if (cb->websocket_handler && recv_msg->size >= 2) {
1620         ws_impl = nxt_unit_websocket_frame_get(ctx);
1621         if (nxt_slow_path(ws_impl == NULL)) {
1622             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1623                           req_impl->stream);
1624 
1625             return NXT_UNIT_ERROR;
1626         }
1627 
1628         ws_impl->ws.req = req;
1629 
1630         ws_impl->buf = NULL;
1631 
1632         if (recv_msg->mmap) {
1633             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1634                 b->req = req;
1635             }
1636 
1637             /* "Move" incoming buffer list to ws_impl. */
1638             ws_impl->buf = recv_msg->incoming_buf;
1639             ws_impl->buf->prev = &ws_impl->buf;
1640             recv_msg->incoming_buf = NULL;
1641 
1642             b = ws_impl->buf;
1643 
1644         } else {
1645             b = nxt_unit_mmap_buf_get(ctx);
1646             if (nxt_slow_path(b == NULL)) {
1647                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1648                                req_impl->stream);
1649 
1650                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1651 
1652                 return NXT_UNIT_ERROR;
1653             }
1654 
1655             b->req = req;
1656             b->buf.start = recv_msg->start;
1657             b->buf.free = b->buf.start;
1658             b->buf.end = b->buf.start + recv_msg->size;
1659 
1660             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1661         }
1662 
1663         ws_impl->ws.header = (void *) b->buf.start;
1664         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1665             ws_impl->ws.header);
1666 
1667         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1668 
1669         if (ws_impl->ws.header->mask) {
1670             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1671 
1672         } else {
1673             ws_impl->ws.mask = NULL;
1674         }
1675 
1676         b->buf.free += hsize;
1677 
1678         ws_impl->ws.content_buf = &b->buf;
1679         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1680 
1681         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1682                            "payload_len=%"PRIu64,
1683                             ws_impl->ws.header->opcode,
1684                             ws_impl->ws.payload_len);
1685 
1686         cb->websocket_handler(&ws_impl->ws);
1687     }
1688 
1689     if (recv_msg->last) {
1690         if (cb->close_handler) {
1691             nxt_unit_req_debug(req, "close_handler");
1692 
1693             cb->close_handler(req);
1694 
1695         } else {
1696             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1697         }
1698     }
1699 
1700     return NXT_UNIT_OK;
1701 }
1702 
1703 
1704 static int
1705 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1706 {
1707     nxt_unit_impl_t       *lib;
1708     nxt_unit_callbacks_t  *cb;
1709 
1710     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1711     cb = &lib->callbacks;
1712 
1713     if (cb->shm_ack_handler != NULL) {
1714         cb->shm_ack_handler(ctx);
1715     }
1716 
1717     return NXT_UNIT_OK;
1718 }
1719 
1720 
1721 static nxt_unit_request_info_impl_t *
1722 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1723 {
1724     nxt_unit_impl_t               *lib;
1725     nxt_queue_link_t              *lnk;
1726     nxt_unit_ctx_impl_t           *ctx_impl;
1727     nxt_unit_request_info_impl_t  *req_impl;
1728 
1729     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1730 
1731     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1732 
1733     pthread_mutex_lock(&ctx_impl->mutex);
1734 
1735     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1736         pthread_mutex_unlock(&ctx_impl->mutex);
1737 
1738         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1739                                         + lib->request_data_size);
1740         if (nxt_slow_path(req_impl == NULL)) {
1741             return NULL;
1742         }
1743 
1744         req_impl->req.unit = ctx->unit;
1745         req_impl->req.ctx = ctx;
1746 
1747         pthread_mutex_lock(&ctx_impl->mutex);
1748 
1749     } else {
1750         lnk = nxt_queue_first(&ctx_impl->free_req);
1751         nxt_queue_remove(lnk);
1752 
1753         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1754     }
1755 
1756     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1757 
1758     pthread_mutex_unlock(&ctx_impl->mutex);
1759 
1760     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1761 
1762     return req_impl;
1763 }
1764 
1765 
1766 static void
1767 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1768 {
1769     nxt_unit_ctx_t                *ctx;
1770     nxt_unit_ctx_impl_t           *ctx_impl;
1771     nxt_unit_request_info_impl_t  *req_impl;
1772 
1773     ctx = req->ctx;
1774     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1775     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1776 
1777     req->response = NULL;
1778     req->response_buf = NULL;
1779 
1780     if (req_impl->in_hash) {
1781         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1782     }
1783 
1784     while (req_impl->outgoing_buf != NULL) {
1785         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1786     }
1787 
1788     while (req_impl->incoming_buf != NULL) {
1789         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1790     }
1791 
1792     if (req->content_fd != -1) {
1793         nxt_unit_close(req->content_fd);
1794 
1795         req->content_fd = -1;
1796     }
1797 
1798     if (req->response_port != NULL) {
1799         nxt_unit_port_release(req->response_port);
1800 
1801         req->response_port = NULL;
1802     }
1803 
1804     req_impl->state = NXT_UNIT_RS_RELEASED;
1805 
1806     pthread_mutex_lock(&ctx_impl->mutex);
1807 
1808     nxt_queue_remove(&req_impl->link);
1809 
1810     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1811 
1812     pthread_mutex_unlock(&ctx_impl->mutex);
1813 
1814     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1815         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1816     }
1817 }
1818 
1819 
1820 static void
1821 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1822 {
1823     nxt_unit_ctx_impl_t  *ctx_impl;
1824 
1825     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1826 
1827     nxt_queue_remove(&req_impl->link);
1828 
1829     if (req_impl != &ctx_impl->req) {
1830         nxt_unit_free(&ctx_impl->ctx, req_impl);
1831     }
1832 }
1833 
1834 
1835 static nxt_unit_websocket_frame_impl_t *
1836 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1837 {
1838     nxt_queue_link_t                 *lnk;
1839     nxt_unit_ctx_impl_t              *ctx_impl;
1840     nxt_unit_websocket_frame_impl_t  *ws_impl;
1841 
1842     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1843 
1844     pthread_mutex_lock(&ctx_impl->mutex);
1845 
1846     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1847         pthread_mutex_unlock(&ctx_impl->mutex);
1848 
1849         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1850         if (nxt_slow_path(ws_impl == NULL)) {
1851             return NULL;
1852         }
1853 
1854     } else {
1855         lnk = nxt_queue_first(&ctx_impl->free_ws);
1856         nxt_queue_remove(lnk);
1857 
1858         pthread_mutex_unlock(&ctx_impl->mutex);
1859 
1860         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1861     }
1862 
1863     ws_impl->ctx_impl = ctx_impl;
1864 
1865     return ws_impl;
1866 }
1867 
1868 
1869 static void
1870 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1871 {
1872     nxt_unit_websocket_frame_impl_t  *ws_impl;
1873 
1874     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1875 
1876     while (ws_impl->buf != NULL) {
1877         nxt_unit_mmap_buf_free(ws_impl->buf);
1878     }
1879 
1880     ws->req = NULL;
1881 
1882     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1883 
1884     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1885 
1886     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1887 }
1888 
1889 
1890 static void
1891 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1892     nxt_unit_websocket_frame_impl_t *ws_impl)
1893 {
1894     nxt_queue_remove(&ws_impl->link);
1895 
1896     nxt_unit_free(ctx, ws_impl);
1897 }
1898 
1899 
1900 uint16_t
1901 nxt_unit_field_hash(const char *name, size_t name_length)
1902 {
1903     u_char      ch;
1904     uint32_t    hash;
1905     const char  *p, *end;
1906 
1907     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1908     end = name + name_length;
1909 
1910     for (p = name; p < end; p++) {
1911         ch = *p;
1912         hash = (hash << 4) + hash + nxt_lowcase(ch);
1913     }
1914 
1915     hash = (hash >> 16) ^ hash;
1916 
1917     return hash;
1918 }
1919 
1920 
1921 void
1922 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1923 {
1924     char                *name;
1925     uint32_t            i, j;
1926     nxt_unit_field_t    *fields, f;
1927     nxt_unit_request_t  *r;
1928 
1929     static nxt_str_t  content_length = nxt_string("content-length");
1930     static nxt_str_t  content_type = nxt_string("content-type");
1931     static nxt_str_t  cookie = nxt_string("cookie");
1932 
1933     nxt_unit_req_debug(req, "group_dup_fields");
1934 
1935     r = req->request;
1936     fields = r->fields;
1937 
1938     for (i = 0; i < r->fields_count; i++) {
1939         name = nxt_unit_sptr_get(&fields[i].name);
1940 
1941         switch (fields[i].hash) {
1942         case NXT_UNIT_HASH_CONTENT_LENGTH:
1943             if (fields[i].name_length == content_length.length
1944                 && nxt_unit_memcasecmp(name, content_length.start,
1945                                        content_length.length) == 0)
1946             {
1947                 r->content_length_field = i;
1948             }
1949 
1950             break;
1951 
1952         case NXT_UNIT_HASH_CONTENT_TYPE:
1953             if (fields[i].name_length == content_type.length
1954                 && nxt_unit_memcasecmp(name, content_type.start,
1955                                        content_type.length) == 0)
1956             {
1957                 r->content_type_field = i;
1958             }
1959 
1960             break;
1961 
1962         case NXT_UNIT_HASH_COOKIE:
1963             if (fields[i].name_length == cookie.length
1964                 && nxt_unit_memcasecmp(name, cookie.start,
1965                                        cookie.length) == 0)
1966             {
1967                 r->cookie_field = i;
1968             }
1969 
1970             break;
1971         }
1972 
1973         for (j = i + 1; j < r->fields_count; j++) {
1974             if (fields[i].hash != fields[j].hash
1975                 || fields[i].name_length != fields[j].name_length
1976                 || nxt_unit_memcasecmp(name,
1977                                        nxt_unit_sptr_get(&fields[j].name),
1978                                        fields[j].name_length) != 0)
1979             {
1980                 continue;
1981             }
1982 
1983             f = fields[j];
1984             f.value.offset += (j - (i + 1)) * sizeof(f);
1985 
1986             while (j > i + 1) {
1987                 fields[j] = fields[j - 1];
1988                 fields[j].name.offset -= sizeof(f);
1989                 fields[j].value.offset -= sizeof(f);
1990                 j--;
1991             }
1992 
1993             fields[j] = f;
1994 
1995             /* Assign the same name pointer for further grouping simplicity. */
1996             nxt_unit_sptr_set(&fields[j].name, name);
1997 
1998             i++;
1999         }
2000     }
2001 }
2002 
2003 
2004 int
2005 nxt_unit_response_init(nxt_unit_request_info_t *req,
2006     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2007 {
2008     uint32_t                      buf_size;
2009     nxt_unit_buf_t                *buf;
2010     nxt_unit_request_info_impl_t  *req_impl;
2011 
2012     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2013 
2014     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2015         nxt_unit_req_warn(req, "init: response already sent");
2016 
2017         return NXT_UNIT_ERROR;
2018     }
2019 
2020     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2021                        (int) max_fields_count, (int) max_fields_size);
2022 
2023     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2024         nxt_unit_req_debug(req, "duplicate response init");
2025     }
2026 
2027     /*
2028      * Each field name and value 0-terminated by libunit,
2029      * this is the reason of '+ 2' below.
2030      */
2031     buf_size = sizeof(nxt_unit_response_t)
2032                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2033                + max_fields_size;
2034 
2035     if (nxt_slow_path(req->response_buf != NULL)) {
2036         buf = req->response_buf;
2037 
2038         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2039             goto init_response;
2040         }
2041 
2042         nxt_unit_buf_free(buf);
2043 
2044         req->response_buf = NULL;
2045         req->response = NULL;
2046         req->response_max_fields = 0;
2047 
2048         req_impl->state = NXT_UNIT_RS_START;
2049     }
2050 
2051     buf = nxt_unit_response_buf_alloc(req, buf_size);
2052     if (nxt_slow_path(buf == NULL)) {
2053         return NXT_UNIT_ERROR;
2054     }
2055 
2056 init_response:
2057 
2058     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2059 
2060     req->response_buf = buf;
2061 
2062     req->response = (nxt_unit_response_t *) buf->start;
2063     req->response->status = status;
2064 
2065     buf->free = buf->start + sizeof(nxt_unit_response_t)
2066                 + max_fields_count * sizeof(nxt_unit_field_t);
2067 
2068     req->response_max_fields = max_fields_count;
2069     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2070 
2071     return NXT_UNIT_OK;
2072 }
2073 
2074 
2075 int
2076 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2077     uint32_t max_fields_count, uint32_t max_fields_size)
2078 {
2079     char                          *p;
2080     uint32_t                      i, buf_size;
2081     nxt_unit_buf_t                *buf;
2082     nxt_unit_field_t              *f, *src;
2083     nxt_unit_response_t           *resp;
2084     nxt_unit_request_info_impl_t  *req_impl;
2085 
2086     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2087 
2088     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2089         nxt_unit_req_warn(req, "realloc: response not init");
2090 
2091         return NXT_UNIT_ERROR;
2092     }
2093 
2094     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2095         nxt_unit_req_warn(req, "realloc: response already sent");
2096 
2097         return NXT_UNIT_ERROR;
2098     }
2099 
2100     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2101         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2102 
2103         return NXT_UNIT_ERROR;
2104     }
2105 
2106     /*
2107      * Each field name and value 0-terminated by libunit,
2108      * this is the reason of '+ 2' below.
2109      */
2110     buf_size = sizeof(nxt_unit_response_t)
2111                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2112                + max_fields_size;
2113 
2114     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2115 
2116     buf = nxt_unit_response_buf_alloc(req, buf_size);
2117     if (nxt_slow_path(buf == NULL)) {
2118         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2119         return NXT_UNIT_ERROR;
2120     }
2121 
2122     resp = (nxt_unit_response_t *) buf->start;
2123 
2124     memset(resp, 0, sizeof(nxt_unit_response_t));
2125 
2126     resp->status = req->response->status;
2127     resp->content_length = req->response->content_length;
2128 
2129     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2130     f = resp->fields;
2131 
2132     for (i = 0; i < req->response->fields_count; i++) {
2133         src = req->response->fields + i;
2134 
2135         if (nxt_slow_path(src->skip != 0)) {
2136             continue;
2137         }
2138 
2139         if (nxt_slow_path(src->name_length + src->value_length + 2
2140                           > (uint32_t) (buf->end - p)))
2141         {
2142             nxt_unit_req_warn(req, "realloc: not enough space for field"
2143                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2144                   i, src, src->name_length, src->value_length);
2145 
2146             goto fail;
2147         }
2148 
2149         nxt_unit_sptr_set(&f->name, p);
2150         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2151         *p++ = '\0';
2152 
2153         nxt_unit_sptr_set(&f->value, p);
2154         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2155         *p++ = '\0';
2156 
2157         f->hash = src->hash;
2158         f->skip = 0;
2159         f->name_length = src->name_length;
2160         f->value_length = src->value_length;
2161 
2162         resp->fields_count++;
2163         f++;
2164     }
2165 
2166     if (req->response->piggyback_content_length > 0) {
2167         if (nxt_slow_path(req->response->piggyback_content_length
2168                           > (uint32_t) (buf->end - p)))
2169         {
2170             nxt_unit_req_warn(req, "realloc: not enought space for content"
2171                   " #%"PRIu32", %"PRIu32" required",
2172                   i, req->response->piggyback_content_length);
2173 
2174             goto fail;
2175         }
2176 
2177         resp->piggyback_content_length =
2178                                        req->response->piggyback_content_length;
2179 
2180         nxt_unit_sptr_set(&resp->piggyback_content, p);
2181         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2182                        req->response->piggyback_content_length);
2183     }
2184 
2185     buf->free = p;
2186 
2187     nxt_unit_buf_free(req->response_buf);
2188 
2189     req->response = resp;
2190     req->response_buf = buf;
2191     req->response_max_fields = max_fields_count;
2192 
2193     return NXT_UNIT_OK;
2194 
2195 fail:
2196 
2197     nxt_unit_buf_free(buf);
2198 
2199     return NXT_UNIT_ERROR;
2200 }
2201 
2202 
2203 int
2204 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2205 {
2206     nxt_unit_request_info_impl_t  *req_impl;
2207 
2208     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2209 
2210     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2211 }
2212 
2213 
2214 int
2215 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2216     const char *name, uint8_t name_length,
2217     const char *value, uint32_t value_length)
2218 {
2219     nxt_unit_buf_t                *buf;
2220     nxt_unit_field_t              *f;
2221     nxt_unit_response_t           *resp;
2222     nxt_unit_request_info_impl_t  *req_impl;
2223 
2224     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2225 
2226     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2227         nxt_unit_req_warn(req, "add_field: response not initialized or "
2228                           "already sent");
2229 
2230         return NXT_UNIT_ERROR;
2231     }
2232 
2233     resp = req->response;
2234 
2235     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2236         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2237                           (int) resp->fields_count);
2238 
2239         return NXT_UNIT_ERROR;
2240     }
2241 
2242     buf = req->response_buf;
2243 
2244     if (nxt_slow_path(name_length + value_length + 2
2245                       > (uint32_t) (buf->end - buf->free)))
2246     {
2247         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2248 
2249         return NXT_UNIT_ERROR;
2250     }
2251 
2252     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2253                        resp->fields_count,
2254                        (int) name_length, name,
2255                        (int) value_length, value);
2256 
2257     f = resp->fields + resp->fields_count;
2258 
2259     nxt_unit_sptr_set(&f->name, buf->free);
2260     buf->free = nxt_cpymem(buf->free, name, name_length);
2261     *buf->free++ = '\0';
2262 
2263     nxt_unit_sptr_set(&f->value, buf->free);
2264     buf->free = nxt_cpymem(buf->free, value, value_length);
2265     *buf->free++ = '\0';
2266 
2267     f->hash = nxt_unit_field_hash(name, name_length);
2268     f->skip = 0;
2269     f->name_length = name_length;
2270     f->value_length = value_length;
2271 
2272     resp->fields_count++;
2273 
2274     return NXT_UNIT_OK;
2275 }
2276 
2277 
2278 int
2279 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2280     const void* src, uint32_t size)
2281 {
2282     nxt_unit_buf_t                *buf;
2283     nxt_unit_response_t           *resp;
2284     nxt_unit_request_info_impl_t  *req_impl;
2285 
2286     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2287 
2288     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2289         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2290 
2291         return NXT_UNIT_ERROR;
2292     }
2293 
2294     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2295         nxt_unit_req_warn(req, "add_content: response already sent");
2296 
2297         return NXT_UNIT_ERROR;
2298     }
2299 
2300     buf = req->response_buf;
2301 
2302     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2303         nxt_unit_req_warn(req, "add_content: buffer overflow");
2304 
2305         return NXT_UNIT_ERROR;
2306     }
2307 
2308     resp = req->response;
2309 
2310     if (resp->piggyback_content_length == 0) {
2311         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2312         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2313     }
2314 
2315     resp->piggyback_content_length += size;
2316 
2317     buf->free = nxt_cpymem(buf->free, src, size);
2318 
2319     return NXT_UNIT_OK;
2320 }
2321 
2322 
2323 int
2324 nxt_unit_response_send(nxt_unit_request_info_t *req)
2325 {
2326     int                           rc;
2327     nxt_unit_mmap_buf_t           *mmap_buf;
2328     nxt_unit_request_info_impl_t  *req_impl;
2329 
2330     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2331 
2332     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2333         nxt_unit_req_warn(req, "send: response is not initialized yet");
2334 
2335         return NXT_UNIT_ERROR;
2336     }
2337 
2338     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2339         nxt_unit_req_warn(req, "send: response already sent");
2340 
2341         return NXT_UNIT_ERROR;
2342     }
2343 
2344     if (req->request->websocket_handshake && req->response->status == 101) {
2345         nxt_unit_response_upgrade(req);
2346     }
2347 
2348     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2349                        req->response->fields_count,
2350                        (int) (req->response_buf->free
2351                               - req->response_buf->start));
2352 
2353     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2354 
2355     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2356     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2357         req->response = NULL;
2358         req->response_buf = NULL;
2359         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2360 
2361         nxt_unit_mmap_buf_free(mmap_buf);
2362     }
2363 
2364     return rc;
2365 }
2366 
2367 
2368 int
2369 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2370 {
2371     nxt_unit_request_info_impl_t  *req_impl;
2372 
2373     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2374 
2375     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2376 }
2377 
2378 
2379 nxt_unit_buf_t *
2380 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2381 {
2382     int                           rc;
2383     nxt_unit_mmap_buf_t           *mmap_buf;
2384     nxt_unit_request_info_impl_t  *req_impl;
2385 
2386     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2387         nxt_unit_req_warn(req, "response_buf_alloc: "
2388                           "requested buffer (%"PRIu32") too big", size);
2389 
2390         return NULL;
2391     }
2392 
2393     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2394 
2395     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2396 
2397     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2398     if (nxt_slow_path(mmap_buf == NULL)) {
2399         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2400 
2401         return NULL;
2402     }
2403 
2404     mmap_buf->req = req;
2405 
2406     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2407 
2408     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2409                                    size, size, mmap_buf,
2410                                    NULL);
2411     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2412         nxt_unit_mmap_buf_release(mmap_buf);
2413 
2414         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2415 
2416         return NULL;
2417     }
2418 
2419     return &mmap_buf->buf;
2420 }
2421 
2422 
2423 static nxt_unit_mmap_buf_t *
2424 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2425 {
2426     nxt_unit_mmap_buf_t  *mmap_buf;
2427     nxt_unit_ctx_impl_t  *ctx_impl;
2428 
2429     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2430 
2431     pthread_mutex_lock(&ctx_impl->mutex);
2432 
2433     if (ctx_impl->free_buf == NULL) {
2434         pthread_mutex_unlock(&ctx_impl->mutex);
2435 
2436         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2437         if (nxt_slow_path(mmap_buf == NULL)) {
2438             return NULL;
2439         }
2440 
2441     } else {
2442         mmap_buf = ctx_impl->free_buf;
2443 
2444         nxt_unit_mmap_buf_unlink(mmap_buf);
2445 
2446         pthread_mutex_unlock(&ctx_impl->mutex);
2447     }
2448 
2449     mmap_buf->ctx_impl = ctx_impl;
2450 
2451     mmap_buf->hdr = NULL;
2452     mmap_buf->free_ptr = NULL;
2453 
2454     return mmap_buf;
2455 }
2456 
2457 
2458 static void
2459 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2460 {
2461     nxt_unit_mmap_buf_unlink(mmap_buf);
2462 
2463     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2464 
2465     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2466 
2467     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2468 }
2469 
2470 
2471 int
2472 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2473 {
2474     return req->request->websocket_handshake;
2475 }
2476 
2477 
2478 int
2479 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2480 {
2481     int                           rc;
2482     nxt_unit_request_info_impl_t  *req_impl;
2483 
2484     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2485 
2486     if (nxt_slow_path(req_impl->websocket != 0)) {
2487         nxt_unit_req_debug(req, "upgrade: already upgraded");
2488 
2489         return NXT_UNIT_OK;
2490     }
2491 
2492     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2493         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2494 
2495         return NXT_UNIT_ERROR;
2496     }
2497 
2498     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2499         nxt_unit_req_warn(req, "upgrade: response already sent");
2500 
2501         return NXT_UNIT_ERROR;
2502     }
2503 
2504     rc = nxt_unit_request_hash_add(req->ctx, req);
2505     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2506         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2507 
2508         return NXT_UNIT_ERROR;
2509     }
2510 
2511     req_impl->websocket = 1;
2512 
2513     req->response->status = 101;
2514 
2515     return NXT_UNIT_OK;
2516 }
2517 
2518 
2519 int
2520 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2521 {
2522     nxt_unit_request_info_impl_t  *req_impl;
2523 
2524     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2525 
2526     return req_impl->websocket;
2527 }
2528 
2529 
2530 nxt_unit_request_info_t *
2531 nxt_unit_get_request_info_from_data(void *data)
2532 {
2533     nxt_unit_request_info_impl_t  *req_impl;
2534 
2535     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2536 
2537     return &req_impl->req;
2538 }
2539 
2540 
2541 int
2542 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2543 {
2544     int                           rc;
2545     nxt_unit_mmap_buf_t           *mmap_buf;
2546     nxt_unit_request_info_t       *req;
2547     nxt_unit_request_info_impl_t  *req_impl;
2548 
2549     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2550 
2551     req = mmap_buf->req;
2552     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2553 
2554     nxt_unit_req_debug(req, "buf_send: %d bytes",
2555                        (int) (buf->free - buf->start));
2556 
2557     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2558         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2559 
2560         return NXT_UNIT_ERROR;
2561     }
2562 
2563     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2564         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2565 
2566         return NXT_UNIT_ERROR;
2567     }
2568 
2569     if (nxt_fast_path(buf->free > buf->start)) {
2570         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2571         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2572             return rc;
2573         }
2574     }
2575 
2576     nxt_unit_mmap_buf_free(mmap_buf);
2577 
2578     return NXT_UNIT_OK;
2579 }
2580 
2581 
2582 static void
2583 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2584 {
2585     int                      rc;
2586     nxt_unit_mmap_buf_t      *mmap_buf;
2587     nxt_unit_request_info_t  *req;
2588 
2589     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2590 
2591     req = mmap_buf->req;
2592 
2593     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2594     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2595         nxt_unit_mmap_buf_free(mmap_buf);
2596 
2597         nxt_unit_request_info_release(req);
2598 
2599     } else {
2600         nxt_unit_request_done(req, rc);
2601     }
2602 }
2603 
2604 
2605 static int
2606 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2607     nxt_unit_mmap_buf_t *mmap_buf, int last)
2608 {
2609     struct {
2610         nxt_port_msg_t       msg;
2611         nxt_port_mmap_msg_t  mmap_msg;
2612     } m;
2613 
2614     int                           rc;
2615     u_char                        *last_used, *first_free;
2616     ssize_t                       res;
2617     nxt_chunk_id_t                first_free_chunk;
2618     nxt_unit_buf_t                *buf;
2619     nxt_unit_impl_t               *lib;
2620     nxt_port_mmap_header_t        *hdr;
2621     nxt_unit_request_info_impl_t  *req_impl;
2622 
2623     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2624     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2625 
2626     buf = &mmap_buf->buf;
2627     hdr = mmap_buf->hdr;
2628 
2629     m.mmap_msg.size = buf->free - buf->start;
2630 
2631     m.msg.stream = req_impl->stream;
2632     m.msg.pid = lib->pid;
2633     m.msg.reply_port = 0;
2634     m.msg.type = _NXT_PORT_MSG_DATA;
2635     m.msg.last = last != 0;
2636     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2637     m.msg.nf = 0;
2638     m.msg.mf = 0;
2639     m.msg.tracking = 0;
2640 
2641     rc = NXT_UNIT_ERROR;
2642 
2643     if (m.msg.mmap) {
2644         m.mmap_msg.mmap_id = hdr->id;
2645         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2646                                                      (u_char *) buf->start);
2647 
2648         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2649                        req_impl->stream,
2650                        (int) m.mmap_msg.mmap_id,
2651                        (int) m.mmap_msg.chunk_id,
2652                        (int) m.mmap_msg.size);
2653 
2654         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2655                                  NULL);
2656         if (nxt_slow_path(res != sizeof(m))) {
2657             goto free_buf;
2658         }
2659 
2660         last_used = (u_char *) buf->free - 1;
2661         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2662 
2663         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2664             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2665 
2666             buf->start = (char *) first_free;
2667             buf->free = buf->start;
2668 
2669             if (buf->end < buf->start) {
2670                 buf->end = buf->start;
2671             }
2672 
2673         } else {
2674             buf->start = NULL;
2675             buf->free = NULL;
2676             buf->end = NULL;
2677 
2678             mmap_buf->hdr = NULL;
2679         }
2680 
2681         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2682                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2683 
2684         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2685                        (int) lib->outgoing.allocated_chunks);
2686 
2687     } else {
2688         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2689                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2690         {
2691             nxt_unit_alert(req->ctx,
2692                            "#%"PRIu32": failed to send plain memory buffer"
2693                            ": no space reserved for message header",
2694                            req_impl->stream);
2695 
2696             goto free_buf;
2697         }
2698 
2699         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2700 
2701         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2702                        req_impl->stream,
2703                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2704 
2705         res = nxt_unit_port_send(req->ctx, req->response_port,
2706                                  buf->start - sizeof(m.msg),
2707                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2708 
2709         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2710             goto free_buf;
2711         }
2712     }
2713 
2714     rc = NXT_UNIT_OK;
2715 
2716 free_buf:
2717 
2718     nxt_unit_free_outgoing_buf(mmap_buf);
2719 
2720     return rc;
2721 }
2722 
2723 
2724 void
2725 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2726 {
2727     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2728 }
2729 
2730 
2731 static void
2732 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2733 {
2734     nxt_unit_free_outgoing_buf(mmap_buf);
2735 
2736     nxt_unit_mmap_buf_release(mmap_buf);
2737 }
2738 
2739 
2740 static void
2741 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2742 {
2743     if (mmap_buf->hdr != NULL) {
2744         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2745                               mmap_buf->hdr, mmap_buf->buf.start,
2746                               mmap_buf->buf.end - mmap_buf->buf.start);
2747 
2748         mmap_buf->hdr = NULL;
2749 
2750         return;
2751     }
2752 
2753     if (mmap_buf->free_ptr != NULL) {
2754         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2755 
2756         mmap_buf->free_ptr = NULL;
2757     }
2758 }
2759 
2760 
2761 static nxt_unit_read_buf_t *
2762 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2763 {
2764     nxt_unit_ctx_impl_t  *ctx_impl;
2765     nxt_unit_read_buf_t  *rbuf;
2766 
2767     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2768 
2769     pthread_mutex_lock(&ctx_impl->mutex);
2770 
2771     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2772 
2773     pthread_mutex_unlock(&ctx_impl->mutex);
2774 
2775     rbuf->oob.size = 0;
2776 
2777     return rbuf;
2778 }
2779 
2780 
2781 static nxt_unit_read_buf_t *
2782 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2783 {
2784     nxt_queue_link_t     *link;
2785     nxt_unit_read_buf_t  *rbuf;
2786 
2787     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2788         link = nxt_queue_first(&ctx_impl->free_rbuf);
2789         nxt_queue_remove(link);
2790 
2791         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2792 
2793         return rbuf;
2794     }
2795 
2796     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2797 
2798     if (nxt_fast_path(rbuf != NULL)) {
2799         rbuf->ctx_impl = ctx_impl;
2800     }
2801 
2802     return rbuf;
2803 }
2804 
2805 
2806 static void
2807 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2808     nxt_unit_read_buf_t *rbuf)
2809 {
2810     nxt_unit_ctx_impl_t  *ctx_impl;
2811 
2812     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2813 
2814     pthread_mutex_lock(&ctx_impl->mutex);
2815 
2816     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2817 
2818     pthread_mutex_unlock(&ctx_impl->mutex);
2819 }
2820 
2821 
2822 nxt_unit_buf_t *
2823 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2824 {
2825     nxt_unit_mmap_buf_t  *mmap_buf;
2826 
2827     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2828 
2829     if (mmap_buf->next == NULL) {
2830         return NULL;
2831     }
2832 
2833     return &mmap_buf->next->buf;
2834 }
2835 
2836 
2837 uint32_t
2838 nxt_unit_buf_max(void)
2839 {
2840     return PORT_MMAP_DATA_SIZE;
2841 }
2842 
2843 
2844 uint32_t
2845 nxt_unit_buf_min(void)
2846 {
2847     return PORT_MMAP_CHUNK_SIZE;
2848 }
2849 
2850 
2851 int
2852 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2853     size_t size)
2854 {
2855     ssize_t  res;
2856 
2857     res = nxt_unit_response_write_nb(req, start, size, size);
2858 
2859     return res < 0 ? -res : NXT_UNIT_OK;
2860 }
2861 
2862 
2863 ssize_t
2864 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2865     size_t size, size_t min_size)
2866 {
2867     int                           rc;
2868     ssize_t                       sent;
2869     uint32_t                      part_size, min_part_size, buf_size;
2870     const char                    *part_start;
2871     nxt_unit_mmap_buf_t           mmap_buf;
2872     nxt_unit_request_info_impl_t  *req_impl;
2873     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2874 
2875     nxt_unit_req_debug(req, "write: %d", (int) size);
2876 
2877     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2878 
2879     part_start = start;
2880     sent = 0;
2881 
2882     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2883         nxt_unit_req_alert(req, "write: response not initialized yet");
2884 
2885         return -NXT_UNIT_ERROR;
2886     }
2887 
2888     /* Check if response is not send yet. */
2889     if (nxt_slow_path(req->response_buf != NULL)) {
2890         part_size = req->response_buf->end - req->response_buf->free;
2891         part_size = nxt_min(size, part_size);
2892 
2893         rc = nxt_unit_response_add_content(req, part_start, part_size);
2894         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2895             return -rc;
2896         }
2897 
2898         rc = nxt_unit_response_send(req);
2899         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2900             return -rc;
2901         }
2902 
2903         size -= part_size;
2904         part_start += part_size;
2905         sent += part_size;
2906 
2907         min_size -= nxt_min(min_size, part_size);
2908     }
2909 
2910     while (size > 0) {
2911         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2912         min_part_size = nxt_min(min_size, part_size);
2913         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2914 
2915         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2916                                        min_part_size, &mmap_buf, local_buf);
2917         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2918             return -rc;
2919         }
2920 
2921         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2922         if (nxt_slow_path(buf_size == 0)) {
2923             return sent;
2924         }
2925         part_size = nxt_min(buf_size, part_size);
2926 
2927         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2928                                        part_start, part_size);
2929 
2930         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2931         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2932             return -rc;
2933         }
2934 
2935         size -= part_size;
2936         part_start += part_size;
2937         sent += part_size;
2938 
2939         min_size -= nxt_min(min_size, part_size);
2940     }
2941 
2942     return sent;
2943 }
2944 
2945 
2946 int
2947 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2948     nxt_unit_read_info_t *read_info)
2949 {
2950     int                           rc;
2951     ssize_t                       n;
2952     uint32_t                      buf_size;
2953     nxt_unit_buf_t                *buf;
2954     nxt_unit_mmap_buf_t           mmap_buf;
2955     nxt_unit_request_info_impl_t  *req_impl;
2956     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2957 
2958     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2959 
2960     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2961         nxt_unit_req_alert(req, "write: response not initialized yet");
2962 
2963         return NXT_UNIT_ERROR;
2964     }
2965 
2966     /* Check if response is not send yet. */
2967     if (nxt_slow_path(req->response_buf != NULL)) {
2968 
2969         /* Enable content in headers buf. */
2970         rc = nxt_unit_response_add_content(req, "", 0);
2971         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2972             nxt_unit_req_error(req, "Failed to add piggyback content");
2973 
2974             return rc;
2975         }
2976 
2977         buf = req->response_buf;
2978 
2979         while (buf->end - buf->free > 0) {
2980             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2981             if (nxt_slow_path(n < 0)) {
2982                 nxt_unit_req_error(req, "Read error");
2983 
2984                 return NXT_UNIT_ERROR;
2985             }
2986 
2987             /* Manually increase sizes. */
2988             buf->free += n;
2989             req->response->piggyback_content_length += n;
2990 
2991             if (read_info->eof) {
2992                 break;
2993             }
2994         }
2995 
2996         rc = nxt_unit_response_send(req);
2997         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2998             nxt_unit_req_error(req, "Failed to send headers with content");
2999 
3000             return rc;
3001         }
3002 
3003         if (read_info->eof) {
3004             return NXT_UNIT_OK;
3005         }
3006     }
3007 
3008     while (!read_info->eof) {
3009         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3010                            read_info->buf_size);
3011 
3012         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3013 
3014         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3015                                        buf_size, buf_size,
3016                                        &mmap_buf, local_buf);
3017         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3018             return rc;
3019         }
3020 
3021         buf = &mmap_buf.buf;
3022 
3023         while (!read_info->eof && buf->end > buf->free) {
3024             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3025             if (nxt_slow_path(n < 0)) {
3026                 nxt_unit_req_error(req, "Read error");
3027 
3028                 nxt_unit_free_outgoing_buf(&mmap_buf);
3029 
3030                 return NXT_UNIT_ERROR;
3031             }
3032 
3033             buf->free += n;
3034         }
3035 
3036         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3037         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3038             nxt_unit_req_error(req, "Failed to send content");
3039 
3040             return rc;
3041         }
3042     }
3043 
3044     return NXT_UNIT_OK;
3045 }
3046 
3047 
3048 ssize_t
3049 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3050 {
3051     ssize_t  buf_res, res;
3052 
3053     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3054                                 dst, size);
3055 
3056     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3057         res = read(req->content_fd, dst, size);
3058         if (nxt_slow_path(res < 0)) {
3059             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3060                                strerror(errno), errno);
3061 
3062             return res;
3063         }
3064 
3065         if (res < (ssize_t) size) {
3066             nxt_unit_close(req->content_fd);
3067 
3068             req->content_fd = -1;
3069         }
3070 
3071         req->content_length -= res;
3072         size -= res;
3073 
3074         dst = nxt_pointer_to(dst, res);
3075 
3076     } else {
3077         res = 0;
3078     }
3079 
3080     return buf_res + res;
3081 }
3082 
3083 
3084 ssize_t
3085 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3086 {
3087     char                 *p;
3088     size_t               l_size, b_size;
3089     nxt_unit_buf_t       *b;
3090     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3091 
3092     if (req->content_length == 0) {
3093         return 0;
3094     }
3095 
3096     l_size = 0;
3097 
3098     b = req->content_buf;
3099 
3100     while (b != NULL) {
3101         b_size = b->end - b->free;
3102         p = memchr(b->free, '\n', b_size);
3103 
3104         if (p != NULL) {
3105             p++;
3106             l_size += p - b->free;
3107             break;
3108         }
3109 
3110         l_size += b_size;
3111 
3112         if (max_size <= l_size) {
3113             break;
3114         }
3115 
3116         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3117         if (mmap_buf->next == NULL
3118             && req->content_fd != -1
3119             && l_size < req->content_length)
3120         {
3121             preread_buf = nxt_unit_request_preread(req, 16384);
3122             if (nxt_slow_path(preread_buf == NULL)) {
3123                 return -1;
3124             }
3125 
3126             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3127         }
3128 
3129         b = nxt_unit_buf_next(b);
3130     }
3131 
3132     return nxt_min(max_size, l_size);
3133 }
3134 
3135 
3136 static nxt_unit_mmap_buf_t *
3137 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3138 {
3139     ssize_t              res;
3140     nxt_unit_mmap_buf_t  *mmap_buf;
3141 
3142     if (req->content_fd == -1) {
3143         nxt_unit_req_alert(req, "preread: content_fd == -1");
3144         return