xref: /unit/src/nxt_unit.c (revision 2014:f8a0992944df)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
200 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
201 static void nxt_unit_lvlhsh_free(void *data, void *p);
202 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
203 
204 
205 struct nxt_unit_mmap_buf_s {
206     nxt_unit_buf_t           buf;
207 
208     nxt_unit_mmap_buf_t      *next;
209     nxt_unit_mmap_buf_t      **prev;
210 
211     nxt_port_mmap_header_t   *hdr;
212     nxt_unit_request_info_t  *req;
213     nxt_unit_ctx_impl_t      *ctx_impl;
214     char                     *free_ptr;
215     char                     *plain_ptr;
216 };
217 
218 
219 struct nxt_unit_recv_msg_s {
220     uint32_t                 stream;
221     nxt_pid_t                pid;
222     nxt_port_id_t            reply_port;
223 
224     uint8_t                  last;      /* 1 bit */
225     uint8_t                  mmap;      /* 1 bit */
226 
227     void                     *start;
228     uint32_t                 size;
229 
230     int                      fd[2];
231 
232     nxt_unit_mmap_buf_t      *incoming_buf;
233 };
234 
235 
236 typedef enum {
237     NXT_UNIT_RS_START           = 0,
238     NXT_UNIT_RS_RESPONSE_INIT,
239     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
240     NXT_UNIT_RS_RESPONSE_SENT,
241     NXT_UNIT_RS_RELEASED,
242 } nxt_unit_req_state_t;
243 
244 
245 struct nxt_unit_request_info_impl_s {
246     nxt_unit_request_info_t  req;
247 
248     uint32_t                 stream;
249 
250     nxt_unit_mmap_buf_t      *outgoing_buf;
251     nxt_unit_mmap_buf_t      *incoming_buf;
252 
253     nxt_unit_req_state_t     state;
254     uint8_t                  websocket;
255     uint8_t                  in_hash;
256 
257     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
258     nxt_queue_link_t         link;
259     /*  for nxt_unit_port_impl_t.awaiting_req */
260     nxt_queue_link_t         port_wait_link;
261 
262     char                     extra_data[];
263 };
264 
265 
266 struct nxt_unit_websocket_frame_impl_s {
267     nxt_unit_websocket_frame_t  ws;
268 
269     nxt_unit_mmap_buf_t         *buf;
270 
271     nxt_queue_link_t            link;
272 
273     nxt_unit_ctx_impl_t         *ctx_impl;
274 };
275 
276 
277 struct nxt_unit_read_buf_s {
278     nxt_queue_link_t              link;
279     nxt_unit_ctx_impl_t           *ctx_impl;
280     ssize_t                       size;
281     nxt_recv_oob_t                oob;
282     char                          buf[16384];
283 };
284 
285 
286 struct nxt_unit_ctx_impl_s {
287     nxt_unit_ctx_t                ctx;
288 
289     nxt_atomic_t                  use_count;
290     nxt_atomic_t                  wait_items;
291 
292     pthread_mutex_t               mutex;
293 
294     nxt_unit_port_t               *read_port;
295 
296     nxt_queue_link_t              link;
297 
298     nxt_unit_mmap_buf_t           *free_buf;
299 
300     /*  of nxt_unit_request_info_impl_t */
301     nxt_queue_t                   free_req;
302 
303     /*  of nxt_unit_websocket_frame_impl_t */
304     nxt_queue_t                   free_ws;
305 
306     /*  of nxt_unit_request_info_impl_t */
307     nxt_queue_t                   active_req;
308 
309     /*  of nxt_unit_request_info_impl_t */
310     nxt_lvlhsh_t                  requests;
311 
312     /*  of nxt_unit_request_info_impl_t */
313     nxt_queue_t                   ready_req;
314 
315     /*  of nxt_unit_read_buf_t */
316     nxt_queue_t                   pending_rbuf;
317 
318     /*  of nxt_unit_read_buf_t */
319     nxt_queue_t                   free_rbuf;
320 
321     uint8_t                       online;       /* 1 bit */
322     uint8_t                       ready;        /* 1 bit */
323     uint8_t                       quit_param;
324 
325     nxt_unit_mmap_buf_t           ctx_buf[2];
326     nxt_unit_read_buf_t           ctx_read_buf;
327 
328     nxt_unit_request_info_impl_t  req;
329 };
330 
331 
332 struct nxt_unit_mmap_s {
333     nxt_port_mmap_header_t   *hdr;
334     pthread_t                src_thread;
335 
336     /*  of nxt_unit_read_buf_t */
337     nxt_queue_t              awaiting_rbuf;
338 };
339 
340 
341 struct nxt_unit_mmaps_s {
342     pthread_mutex_t          mutex;
343     uint32_t                 size;
344     uint32_t                 cap;
345     nxt_atomic_t             allocated_chunks;
346     nxt_unit_mmap_t          *elts;
347 };
348 
349 
350 struct nxt_unit_impl_s {
351     nxt_unit_t               unit;
352     nxt_unit_callbacks_t     callbacks;
353 
354     nxt_atomic_t             use_count;
355     nxt_atomic_t             request_count;
356 
357     uint32_t                 request_data_size;
358     uint32_t                 shm_mmap_limit;
359     uint32_t                 request_limit;
360 
361     pthread_mutex_t          mutex;
362 
363     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
364     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
365 
366     nxt_unit_port_t          *router_port;
367     nxt_unit_port_t          *shared_port;
368 
369     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
370 
371     nxt_unit_mmaps_t         incoming;
372     nxt_unit_mmaps_t         outgoing;
373 
374     pid_t                    pid;
375     int                      log_fd;
376 
377     nxt_unit_ctx_impl_t      main_ctx;
378 };
379 
380 
381 struct nxt_unit_port_impl_s {
382     nxt_unit_port_t          port;
383 
384     nxt_atomic_t             use_count;
385 
386     /*  for nxt_unit_process_t.ports */
387     nxt_queue_link_t         link;
388     nxt_unit_process_t       *process;
389 
390     /*  of nxt_unit_request_info_impl_t */
391     nxt_queue_t              awaiting_req;
392 
393     int                      ready;
394 
395     void                     *queue;
396 
397     int                      from_socket;
398     nxt_unit_read_buf_t      *socket_rbuf;
399 };
400 
401 
402 struct nxt_unit_process_s {
403     pid_t                    pid;
404 
405     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
406 
407     nxt_unit_impl_t          *lib;
408 
409     nxt_atomic_t             use_count;
410 
411     uint32_t                 next_port_id;
412 };
413 
414 
415 /* Explicitly using 32 bit types to avoid possible alignment. */
416 typedef struct {
417     int32_t   pid;
418     uint32_t  id;
419 } nxt_unit_port_hash_id_t;
420 
421 
422 static pid_t  nxt_unit_pid;
423 
424 
425 nxt_unit_ctx_t *
426 nxt_unit_init(nxt_unit_init_t *init)
427 {
428     int              rc, queue_fd, shared_queue_fd;
429     void             *mem;
430     uint32_t         ready_stream, shm_limit, request_limit;
431     nxt_unit_ctx_t   *ctx;
432     nxt_unit_impl_t  *lib;
433     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
434 
435     nxt_unit_pid = getpid();
436 
437     lib = nxt_unit_create(init);
438     if (nxt_slow_path(lib == NULL)) {
439         return NULL;
440     }
441 
442     queue_fd = -1;
443     mem = MAP_FAILED;
444     shared_port.out_fd = -1;
445 
446     if (init->ready_port.id.pid != 0
447         && init->ready_stream != 0
448         && init->read_port.id.pid != 0)
449     {
450         ready_port = init->ready_port;
451         ready_stream = init->ready_stream;
452         router_port = init->router_port;
453         read_port = init->read_port;
454         lib->log_fd = init->log_fd;
455 
456         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
457                               ready_port.id.id);
458         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
459                               router_port.id.id);
460         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
461                               read_port.id.id);
462 
463         shared_port.in_fd = init->shared_port_fd;
464         shared_queue_fd = init->shared_queue_fd;
465 
466     } else {
467         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
468                                &shared_port.in_fd, &shared_queue_fd,
469                                &lib->log_fd, &ready_stream, &shm_limit,
470                                &request_limit);
471         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
472             goto fail;
473         }
474 
475         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
476                                 / PORT_MMAP_DATA_SIZE;
477         lib->request_limit = request_limit;
478     }
479 
480     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
481         lib->shm_mmap_limit = 1;
482     }
483 
484     lib->pid = read_port.id.pid;
485     nxt_unit_pid = lib->pid;
486 
487     ctx = &lib->main_ctx.ctx;
488 
489     rc = nxt_unit_fd_blocking(router_port.out_fd);
490     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
491         goto fail;
492     }
493 
494     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
495     if (nxt_slow_path(lib->router_port == NULL)) {
496         nxt_unit_alert(NULL, "failed to add router_port");
497 
498         goto fail;
499     }
500 
501     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
502     if (nxt_slow_path(queue_fd == -1)) {
503         goto fail;
504     }
505 
506     mem = mmap(NULL, sizeof(nxt_port_queue_t),
507                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
508     if (nxt_slow_path(mem == MAP_FAILED)) {
509         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
510                        strerror(errno), errno);
511 
512         goto fail;
513     }
514 
515     nxt_port_queue_init(mem);
516 
517     rc = nxt_unit_fd_blocking(read_port.in_fd);
518     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
519         goto fail;
520     }
521 
522     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
523     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
524         nxt_unit_alert(NULL, "failed to add read_port");
525 
526         goto fail;
527     }
528 
529     rc = nxt_unit_fd_blocking(ready_port.out_fd);
530     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
531         goto fail;
532     }
533 
534     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
535                           NXT_UNIT_SHARED_PORT_ID);
536 
537     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
538                MAP_SHARED, shared_queue_fd, 0);
539     if (nxt_slow_path(mem == MAP_FAILED)) {
540         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
541                        strerror(errno), errno);
542 
543         goto fail;
544     }
545 
546     nxt_unit_close(shared_queue_fd);
547 
548     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
549     if (nxt_slow_path(lib->shared_port == NULL)) {
550         nxt_unit_alert(NULL, "failed to add shared_port");
551 
552         goto fail;
553     }
554 
555     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
556     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
557         nxt_unit_alert(NULL, "failed to send READY message");
558 
559         goto fail;
560     }
561 
562     nxt_unit_close(ready_port.out_fd);
563     nxt_unit_close(queue_fd);
564 
565     return ctx;
566 
567 fail:
568 
569     if (mem != MAP_FAILED) {
570         munmap(mem, sizeof(nxt_port_queue_t));
571     }
572 
573     if (queue_fd != -1) {
574         nxt_unit_close(queue_fd);
575     }
576 
577     nxt_unit_ctx_release(&lib->main_ctx.ctx);
578 
579     return NULL;
580 }
581 
582 
583 static nxt_unit_impl_t *
584 nxt_unit_create(nxt_unit_init_t *init)
585 {
586     int                   rc;
587     nxt_unit_impl_t       *lib;
588     nxt_unit_callbacks_t  *cb;
589 
590     lib = nxt_unit_malloc(NULL,
591                           sizeof(nxt_unit_impl_t) + init->request_data_size);
592     if (nxt_slow_path(lib == NULL)) {
593         nxt_unit_alert(NULL, "failed to allocate unit struct");
594 
595         return NULL;
596     }
597 
598     rc = pthread_mutex_init(&lib->mutex, NULL);
599     if (nxt_slow_path(rc != 0)) {
600         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
601 
602         goto fail;
603     }
604 
605     lib->unit.data = init->data;
606     lib->callbacks = init->callbacks;
607 
608     lib->request_data_size = init->request_data_size;
609     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
610                             / PORT_MMAP_DATA_SIZE;
611     lib->request_limit = init->request_limit;
612 
613     lib->processes.slot = NULL;
614     lib->ports.slot = NULL;
615 
616     lib->log_fd = STDERR_FILENO;
617 
618     nxt_queue_init(&lib->contexts);
619 
620     lib->use_count = 0;
621     lib->request_count = 0;
622     lib->router_port = NULL;
623     lib->shared_port = NULL;
624 
625     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
626     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
627         pthread_mutex_destroy(&lib->mutex);
628         goto fail;
629     }
630 
631     cb = &lib->callbacks;
632 
633     if (cb->request_handler == NULL) {
634         nxt_unit_alert(NULL, "request_handler is NULL");
635 
636         pthread_mutex_destroy(&lib->mutex);
637         goto fail;
638     }
639 
640     nxt_unit_mmaps_init(&lib->incoming);
641     nxt_unit_mmaps_init(&lib->outgoing);
642 
643     return lib;
644 
645 fail:
646 
647     nxt_unit_free(NULL, lib);
648 
649     return NULL;
650 }
651 
652 
653 static int
654 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
655     void *data)
656 {
657     int  rc;
658 
659     ctx_impl->ctx.data = data;
660     ctx_impl->ctx.unit = &lib->unit;
661 
662     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
663     if (nxt_slow_path(rc != 0)) {
664         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
665 
666         return NXT_UNIT_ERROR;
667     }
668 
669     nxt_unit_lib_use(lib);
670 
671     pthread_mutex_lock(&lib->mutex);
672 
673     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
674 
675     pthread_mutex_unlock(&lib->mutex);
676 
677     ctx_impl->use_count = 1;
678     ctx_impl->wait_items = 0;
679     ctx_impl->online = 1;
680     ctx_impl->ready = 0;
681     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
682 
683     nxt_queue_init(&ctx_impl->free_req);
684     nxt_queue_init(&ctx_impl->free_ws);
685     nxt_queue_init(&ctx_impl->active_req);
686     nxt_queue_init(&ctx_impl->ready_req);
687     nxt_queue_init(&ctx_impl->pending_rbuf);
688     nxt_queue_init(&ctx_impl->free_rbuf);
689 
690     ctx_impl->free_buf = NULL;
691     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
692     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
693 
694     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
695     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
696 
697     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
698 
699     ctx_impl->req.req.ctx = &ctx_impl->ctx;
700     ctx_impl->req.req.unit = &lib->unit;
701 
702     ctx_impl->read_port = NULL;
703     ctx_impl->requests.slot = 0;
704 
705     return NXT_UNIT_OK;
706 }
707 
708 
709 nxt_inline void
710 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
711 {
712     nxt_unit_ctx_impl_t  *ctx_impl;
713 
714     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
715 
716     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
717 }
718 
719 
720 nxt_inline void
721 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
722 {
723     long                 c;
724     nxt_unit_ctx_impl_t  *ctx_impl;
725 
726     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
727 
728     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
729 
730     if (c == 1) {
731         nxt_unit_ctx_free(ctx_impl);
732     }
733 }
734 
735 
736 nxt_inline void
737 nxt_unit_lib_use(nxt_unit_impl_t *lib)
738 {
739     nxt_atomic_fetch_add(&lib->use_count, 1);
740 }
741 
742 
743 nxt_inline void
744 nxt_unit_lib_release(nxt_unit_impl_t *lib)
745 {
746     long                c;
747     nxt_unit_process_t  *process;
748 
749     c = nxt_atomic_fetch_add(&lib->use_count, -1);
750 
751     if (c == 1) {
752         for ( ;; ) {
753             pthread_mutex_lock(&lib->mutex);
754 
755             process = nxt_unit_process_pop_first(lib);
756             if (process == NULL) {
757                 pthread_mutex_unlock(&lib->mutex);
758 
759                 break;
760             }
761 
762             nxt_unit_remove_process(lib, process);
763         }
764 
765         pthread_mutex_destroy(&lib->mutex);
766 
767         if (nxt_fast_path(lib->router_port != NULL)) {
768             nxt_unit_port_release(lib->router_port);
769         }
770 
771         if (nxt_fast_path(lib->shared_port != NULL)) {
772             nxt_unit_port_release(lib->shared_port);
773         }
774 
775         nxt_unit_mmaps_destroy(&lib->incoming);
776         nxt_unit_mmaps_destroy(&lib->outgoing);
777 
778         nxt_unit_free(NULL, lib);
779     }
780 }
781 
782 
783 nxt_inline void
784 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
785     nxt_unit_mmap_buf_t *mmap_buf)
786 {
787     mmap_buf->next = *head;
788 
789     if (mmap_buf->next != NULL) {
790         mmap_buf->next->prev = &mmap_buf->next;
791     }
792 
793     *head = mmap_buf;
794     mmap_buf->prev = head;
795 }
796 
797 
798 nxt_inline void
799 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
800     nxt_unit_mmap_buf_t *mmap_buf)
801 {
802     while (*prev != NULL) {
803         prev = &(*prev)->next;
804     }
805 
806     nxt_unit_mmap_buf_insert(prev, mmap_buf);
807 }
808 
809 
810 nxt_inline void
811 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
812 {
813     nxt_unit_mmap_buf_t  **prev;
814 
815     prev = mmap_buf->prev;
816 
817     if (mmap_buf->next != NULL) {
818         mmap_buf->next->prev = prev;
819     }
820 
821     if (prev != NULL) {
822         *prev = mmap_buf->next;
823     }
824 }
825 
826 
827 static int
828 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
829     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
830     int *log_fd, uint32_t *stream,
831     uint32_t *shm_limit, uint32_t *request_limit)
832 {
833     int       rc;
834     int       ready_fd, router_fd, read_in_fd, read_out_fd;
835     char      *unit_init, *version_end, *vars;
836     size_t    version_length;
837     int64_t   ready_pid, router_pid, read_pid;
838     uint32_t  ready_stream, router_id, ready_id, read_id;
839 
840     unit_init = getenv(NXT_UNIT_INIT_ENV);
841     if (nxt_slow_path(unit_init == NULL)) {
842         nxt_unit_alert(NULL, "%s is not in the current environment",
843                        NXT_UNIT_INIT_ENV);
844 
845         return NXT_UNIT_ERROR;
846     }
847 
848     version_end = strchr(unit_init, ';');
849     if (nxt_slow_path(version_end == NULL)) {
850         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
851                        NXT_UNIT_INIT_ENV, unit_init);
852 
853         return NXT_UNIT_ERROR;
854     }
855 
856     version_length = version_end - unit_init;
857 
858     rc = version_length != nxt_length(NXT_VERSION)
859          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
860 
861     if (nxt_slow_path(rc != 0)) {
862         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
863                        "%.*s, while the app was compiled with libunit %s",
864                        (int) version_length, unit_init, NXT_VERSION);
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     vars = version_end + 1;
870 
871     rc = sscanf(vars,
872                 "%"PRIu32";"
873                 "%"PRId64",%"PRIu32",%d;"
874                 "%"PRId64",%"PRIu32",%d;"
875                 "%"PRId64",%"PRIu32",%d,%d;"
876                 "%d,%d;"
877                 "%d,%"PRIu32",%"PRIu32,
878                 &ready_stream,
879                 &ready_pid, &ready_id, &ready_fd,
880                 &router_pid, &router_id, &router_fd,
881                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
882                 shared_port_fd, shared_queue_fd,
883                 log_fd, shm_limit, request_limit);
884 
885     if (nxt_slow_path(rc == EOF)) {
886         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
887                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
888 
889         return NXT_UNIT_ERROR;
890     }
891 
892     if (nxt_slow_path(rc != 16)) {
893         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
894                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
895 
896         return NXT_UNIT_ERROR;
897     }
898 
899     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
900 
901     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
902 
903     ready_port->in_fd = -1;
904     ready_port->out_fd = ready_fd;
905     ready_port->data = NULL;
906 
907     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
908 
909     router_port->in_fd = -1;
910     router_port->out_fd = router_fd;
911     router_port->data = NULL;
912 
913     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
914 
915     read_port->in_fd = read_in_fd;
916     read_port->out_fd = read_out_fd;
917     read_port->data = NULL;
918 
919     *stream = ready_stream;
920 
921     return NXT_UNIT_OK;
922 }
923 
924 
925 static int
926 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
927 {
928     ssize_t          res;
929     nxt_send_oob_t   oob;
930     nxt_port_msg_t   msg;
931     nxt_unit_impl_t  *lib;
932     int              fds[2] = {queue_fd, -1};
933 
934     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
935 
936     msg.stream = stream;
937     msg.pid = lib->pid;
938     msg.reply_port = 0;
939     msg.type = _NXT_PORT_MSG_PROCESS_READY;
940     msg.last = 1;
941     msg.mmap = 0;
942     msg.nf = 0;
943     msg.mf = 0;
944     msg.tracking = 0;
945 
946     nxt_socket_msg_oob_init(&oob, fds);
947 
948     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
949     if (res != sizeof(msg)) {
950         return NXT_UNIT_ERROR;
951     }
952 
953     return NXT_UNIT_OK;
954 }
955 
956 
957 static int
958 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
959     nxt_unit_request_info_t **preq)
960 {
961     int                  rc;
962     pid_t                pid;
963     uint8_t              quit_param;
964     nxt_port_msg_t       *port_msg;
965     nxt_unit_impl_t      *lib;
966     nxt_unit_recv_msg_t  recv_msg;
967 
968     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
969 
970     recv_msg.incoming_buf = NULL;
971     recv_msg.fd[0] = -1;
972     recv_msg.fd[1] = -1;
973 
974     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
975     if (nxt_slow_path(rc != NXT_OK)) {
976         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
977         rc = NXT_UNIT_ERROR;
978         goto done;
979     }
980 
981     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
982         if (nxt_slow_path(rbuf->size == 0)) {
983             nxt_unit_debug(ctx, "read port closed");
984 
985             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
986             rc = NXT_UNIT_OK;
987             goto done;
988         }
989 
990         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
991 
992         rc = NXT_UNIT_ERROR;
993         goto done;
994     }
995 
996     port_msg = (nxt_port_msg_t *) rbuf->buf;
997 
998     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
999                    port_msg->stream, (int) port_msg->type,
1000                    recv_msg.fd[0], recv_msg.fd[1]);
1001 
1002     recv_msg.stream = port_msg->stream;
1003     recv_msg.pid = port_msg->pid;
1004     recv_msg.reply_port = port_msg->reply_port;
1005     recv_msg.last = port_msg->last;
1006     recv_msg.mmap = port_msg->mmap;
1007 
1008     recv_msg.start = port_msg + 1;
1009     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1010 
1011     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1012         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1013                        port_msg->stream, (int) port_msg->type);
1014         rc = NXT_UNIT_ERROR;
1015         goto done;
1016     }
1017 
1018     /* Fragmentation is unsupported. */
1019     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1020         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1021                        port_msg->stream, (int) port_msg->type);
1022         rc = NXT_UNIT_ERROR;
1023         goto done;
1024     }
1025 
1026     if (port_msg->mmap) {
1027         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1028 
1029         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1030             if (rc == NXT_UNIT_AGAIN) {
1031                 recv_msg.fd[0] = -1;
1032                 recv_msg.fd[1] = -1;
1033             }
1034 
1035             goto done;
1036         }
1037     }
1038 
1039     switch (port_msg->type) {
1040 
1041     case _NXT_PORT_MSG_RPC_READY:
1042         rc = NXT_UNIT_OK;
1043         break;
1044 
1045     case _NXT_PORT_MSG_QUIT:
1046         if (recv_msg.size == sizeof(quit_param)) {
1047             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1048 
1049         } else {
1050             quit_param = NXT_QUIT_NORMAL;
1051         }
1052 
1053         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1054                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1055 
1056         nxt_unit_quit(ctx, quit_param);
1057 
1058         rc = NXT_UNIT_OK;
1059         break;
1060 
1061     case _NXT_PORT_MSG_NEW_PORT:
1062         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1063         break;
1064 
1065     case _NXT_PORT_MSG_PORT_ACK:
1066         rc = nxt_unit_ctx_ready(ctx);
1067         break;
1068 
1069     case _NXT_PORT_MSG_CHANGE_FILE:
1070         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1071                        port_msg->stream, recv_msg.fd[0]);
1072 
1073         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1074             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1075                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1076                            strerror(errno), errno);
1077 
1078             rc = NXT_UNIT_ERROR;
1079             goto done;
1080         }
1081 
1082         rc = NXT_UNIT_OK;
1083         break;
1084 
1085     case _NXT_PORT_MSG_MMAP:
1086         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1087             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1088                            port_msg->stream, recv_msg.fd[0]);
1089 
1090             rc = NXT_UNIT_ERROR;
1091             goto done;
1092         }
1093 
1094         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1095         break;
1096 
1097     case _NXT_PORT_MSG_REQ_HEADERS:
1098         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1099         break;
1100 
1101     case _NXT_PORT_MSG_REQ_BODY:
1102         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1103         break;
1104 
1105     case _NXT_PORT_MSG_WEBSOCKET:
1106         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1107         break;
1108 
1109     case _NXT_PORT_MSG_REMOVE_PID:
1110         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1111             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1112                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1113                            (int) sizeof(pid));
1114 
1115             rc = NXT_UNIT_ERROR;
1116             goto done;
1117         }
1118 
1119         memcpy(&pid, recv_msg.start, sizeof(pid));
1120 
1121         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1122                        port_msg->stream, (int) pid);
1123 
1124         nxt_unit_remove_pid(lib, pid);
1125 
1126         rc = NXT_UNIT_OK;
1127         break;
1128 
1129     case _NXT_PORT_MSG_SHM_ACK:
1130         rc = nxt_unit_process_shm_ack(ctx);
1131         break;
1132 
1133     default:
1134         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1135                        port_msg->stream, (int) port_msg->type);
1136 
1137         rc = NXT_UNIT_ERROR;
1138         goto done;
1139     }
1140 
1141 done:
1142 
1143     if (recv_msg.fd[0] != -1) {
1144         nxt_unit_close(recv_msg.fd[0]);
1145     }
1146 
1147     if (recv_msg.fd[1] != -1) {
1148         nxt_unit_close(recv_msg.fd[1]);
1149     }
1150 
1151     while (recv_msg.incoming_buf != NULL) {
1152         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1153     }
1154 
1155     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1156 #if (NXT_DEBUG)
1157         memset(rbuf->buf, 0xAC, rbuf->size);
1158 #endif
1159         nxt_unit_read_buf_release(ctx, rbuf);
1160     }
1161 
1162     return rc;
1163 }
1164 
1165 
1166 static int
1167 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1168 {
1169     void                     *mem;
1170     nxt_unit_port_t          new_port, *port;
1171     nxt_port_msg_new_port_t  *new_port_msg;
1172 
1173     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1174         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1175                       "invalid message size (%d)",
1176                       recv_msg->stream, (int) recv_msg->size);
1177 
1178         return NXT_UNIT_ERROR;
1179     }
1180 
1181     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1182         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1183                        recv_msg->stream, recv_msg->fd[0]);
1184 
1185         return NXT_UNIT_ERROR;
1186     }
1187 
1188     new_port_msg = recv_msg->start;
1189 
1190     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1191                    recv_msg->stream, (int) new_port_msg->pid,
1192                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1193 
1194     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1195         return NXT_UNIT_ERROR;
1196     }
1197 
1198     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1199 
1200     new_port.in_fd = -1;
1201     new_port.out_fd = recv_msg->fd[0];
1202 
1203     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1204                MAP_SHARED, recv_msg->fd[1], 0);
1205 
1206     if (nxt_slow_path(mem == MAP_FAILED)) {
1207         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1208                        strerror(errno), errno);
1209 
1210         return NXT_UNIT_ERROR;
1211     }
1212 
1213     new_port.data = NULL;
1214 
1215     recv_msg->fd[0] = -1;
1216 
1217     port = nxt_unit_add_port(ctx, &new_port, mem);
1218     if (nxt_slow_path(port == NULL)) {
1219         return NXT_UNIT_ERROR;
1220     }
1221 
1222     nxt_unit_port_release(port);
1223 
1224     return NXT_UNIT_OK;
1225 }
1226 
1227 
1228 static int
1229 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1230 {
1231     nxt_unit_impl_t      *lib;
1232     nxt_unit_ctx_impl_t  *ctx_impl;
1233 
1234     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1235 
1236     if (nxt_slow_path(ctx_impl->ready)) {
1237         return NXT_UNIT_OK;
1238     }
1239 
1240     ctx_impl->ready = 1;
1241 
1242     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1243 
1244     /* Call ready_handler() only for main context. */
1245     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1246         return lib->callbacks.ready_handler(ctx);
1247     }
1248 
1249     if (&lib->main_ctx != ctx_impl) {
1250         /* Check if the main context is already stopped or quit. */
1251         if (nxt_slow_path(!lib->main_ctx.ready)) {
1252             ctx_impl->ready = 0;
1253 
1254             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1255 
1256             return NXT_UNIT_OK;
1257         }
1258 
1259         if (lib->callbacks.add_port != NULL) {
1260             lib->callbacks.add_port(ctx, lib->shared_port);
1261         }
1262     }
1263 
1264     return NXT_UNIT_OK;
1265 }
1266 
1267 
1268 static int
1269 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1270     nxt_unit_request_info_t **preq)
1271 {
1272     int                           res;
1273     nxt_unit_impl_t               *lib;
1274     nxt_unit_port_id_t            port_id;
1275     nxt_unit_request_t            *r;
1276     nxt_unit_mmap_buf_t           *b;
1277     nxt_unit_request_info_t       *req;
1278     nxt_unit_request_info_impl_t  *req_impl;
1279 
1280     if (nxt_slow_path(recv_msg->mmap == 0)) {
1281         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1282                       recv_msg->stream);
1283 
1284         return NXT_UNIT_ERROR;
1285     }
1286 
1287     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1288         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1289                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1290                       (int) sizeof(nxt_unit_request_t));
1291 
1292         return NXT_UNIT_ERROR;
1293     }
1294 
1295     req_impl = nxt_unit_request_info_get(ctx);
1296     if (nxt_slow_path(req_impl == NULL)) {
1297         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1298                       recv_msg->stream);
1299 
1300         return NXT_UNIT_ERROR;
1301     }
1302 
1303     req = &req_impl->req;
1304 
1305     req->request = recv_msg->start;
1306 
1307     b = recv_msg->incoming_buf;
1308 
1309     req->request_buf = &b->buf;
1310     req->response = NULL;
1311     req->response_buf = NULL;
1312 
1313     r = req->request;
1314 
1315     req->content_length = r->content_length;
1316 
1317     req->content_buf = req->request_buf;
1318     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1319 
1320     req_impl->stream = recv_msg->stream;
1321 
1322     req_impl->outgoing_buf = NULL;
1323 
1324     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1325         b->req = req;
1326     }
1327 
1328     /* "Move" incoming buffer list to req_impl. */
1329     req_impl->incoming_buf = recv_msg->incoming_buf;
1330     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1331     recv_msg->incoming_buf = NULL;
1332 
1333     req->content_fd = recv_msg->fd[0];
1334     recv_msg->fd[0] = -1;
1335 
1336     req->response_max_fields = 0;
1337     req_impl->state = NXT_UNIT_RS_START;
1338     req_impl->websocket = 0;
1339     req_impl->in_hash = 0;
1340 
1341     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1342                    (int) r->method_length,
1343                    (char *) nxt_unit_sptr_get(&r->method),
1344                    (int) r->target_length,
1345                    (char *) nxt_unit_sptr_get(&r->target),
1346                    (int) r->content_length);
1347 
1348     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1349 
1350     res = nxt_unit_request_check_response_port(req, &port_id);
1351     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1352         return NXT_UNIT_ERROR;
1353     }
1354 
1355     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1356         res = nxt_unit_send_req_headers_ack(req);
1357         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1358             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1359 
1360             return NXT_UNIT_ERROR;
1361         }
1362 
1363         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1364 
1365         if (req->content_length
1366             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1367         {
1368             res = nxt_unit_request_hash_add(ctx, req);
1369             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1370                 nxt_unit_req_warn(req, "failed to add request to hash");
1371 
1372                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1373 
1374                 return NXT_UNIT_ERROR;
1375             }
1376 
1377             /*
1378              * If application have separate data handler, we may start
1379              * request processing and process data when it is arrived.
1380              */
1381             if (lib->callbacks.data_handler == NULL) {
1382                 return NXT_UNIT_OK;
1383             }
1384         }
1385 
1386         if (preq == NULL) {
1387             lib->callbacks.request_handler(req);
1388 
1389         } else {
1390             *preq = req;
1391         }
1392     }
1393 
1394     return NXT_UNIT_OK;
1395 }
1396 
1397 
1398 static int
1399 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1400 {
1401     uint64_t                 l;
1402     nxt_unit_impl_t          *lib;
1403     nxt_unit_mmap_buf_t      *b;
1404     nxt_unit_request_info_t  *req;
1405 
1406     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1407     if (req == NULL) {
1408         return NXT_UNIT_OK;
1409     }
1410 
1411     l = req->content_buf->end - req->content_buf->free;
1412 
1413     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1414         b->req = req;
1415         l += b->buf.end - b->buf.free;
1416     }
1417 
1418     if (recv_msg->incoming_buf != NULL) {
1419         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1420 
1421         while (b->next != NULL) {
1422             b = b->next;
1423         }
1424 
1425         /* "Move" incoming buffer list to req_impl. */
1426         b->next = recv_msg->incoming_buf;
1427         b->next->prev = &b->next;
1428 
1429         recv_msg->incoming_buf = NULL;
1430     }
1431 
1432     req->content_fd = recv_msg->fd[0];
1433     recv_msg->fd[0] = -1;
1434 
1435     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1436 
1437     if (lib->callbacks.data_handler != NULL) {
1438         lib->callbacks.data_handler(req);
1439 
1440         return NXT_UNIT_OK;
1441     }
1442 
1443     if (req->content_fd != -1 || l == req->content_length) {
1444         lib->callbacks.request_handler(req);
1445     }
1446 
1447     return NXT_UNIT_OK;
1448 }
1449 
1450 
1451 static int
1452 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1453     nxt_unit_port_id_t *port_id)
1454 {
1455     int                           res;
1456     nxt_unit_ctx_t                *ctx;
1457     nxt_unit_impl_t               *lib;
1458     nxt_unit_port_t               *port;
1459     nxt_unit_process_t            *process;
1460     nxt_unit_ctx_impl_t           *ctx_impl;
1461     nxt_unit_port_impl_t          *port_impl;
1462     nxt_unit_request_info_impl_t  *req_impl;
1463 
1464     ctx = req->ctx;
1465     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1466     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1467 
1468     pthread_mutex_lock(&lib->mutex);
1469 
1470     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1471     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1472 
1473     if (nxt_fast_path(port != NULL)) {
1474         req->response_port = port;
1475 
1476         if (nxt_fast_path(port_impl->ready)) {
1477             pthread_mutex_unlock(&lib->mutex);
1478 
1479             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1480                            (int) port->id.pid, (int) port->id.id);
1481 
1482             return NXT_UNIT_OK;
1483         }
1484 
1485         nxt_unit_debug(ctx, "check_response_port: "
1486                        "port{%d,%d} already requested",
1487                        (int) port->id.pid, (int) port->id.id);
1488 
1489         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1490 
1491         nxt_queue_insert_tail(&port_impl->awaiting_req,
1492                               &req_impl->port_wait_link);
1493 
1494         pthread_mutex_unlock(&lib->mutex);
1495 
1496         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1497 
1498         return NXT_UNIT_AGAIN;
1499     }
1500 
1501     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1502     if (nxt_slow_path(port_impl == NULL)) {
1503         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1504                        (int) sizeof(nxt_unit_port_impl_t));
1505 
1506         pthread_mutex_unlock(&lib->mutex);
1507 
1508         return NXT_UNIT_ERROR;
1509     }
1510 
1511     port = &port_impl->port;
1512 
1513     port->id = *port_id;
1514     port->in_fd = -1;
1515     port->out_fd = -1;
1516     port->data = NULL;
1517 
1518     res = nxt_unit_port_hash_add(&lib->ports, port);
1519     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1520         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1521                        port->id.pid, port->id.id);
1522 
1523         pthread_mutex_unlock(&lib->mutex);
1524 
1525         nxt_unit_free(ctx, port);
1526 
1527         return NXT_UNIT_ERROR;
1528     }
1529 
1530     process = nxt_unit_process_find(lib, port_id->pid, 0);
1531     if (nxt_slow_path(process == NULL)) {
1532         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1533                        port->id.pid);
1534 
1535         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1536 
1537         pthread_mutex_unlock(&lib->mutex);
1538 
1539         nxt_unit_free(ctx, port);
1540 
1541         return NXT_UNIT_ERROR;
1542     }
1543 
1544     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1545 
1546     port_impl->process = process;
1547     port_impl->queue = NULL;
1548     port_impl->from_socket = 0;
1549     port_impl->socket_rbuf = NULL;
1550 
1551     nxt_queue_init(&port_impl->awaiting_req);
1552 
1553     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1554 
1555     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1556 
1557     port_impl->use_count = 2;
1558     port_impl->ready = 0;
1559 
1560     req->response_port = port;
1561 
1562     pthread_mutex_unlock(&lib->mutex);
1563 
1564     res = nxt_unit_get_port(ctx, port_id);
1565     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1566         return NXT_UNIT_ERROR;
1567     }
1568 
1569     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1570 
1571     return NXT_UNIT_AGAIN;
1572 }
1573 
1574 
1575 static int
1576 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1577 {
1578     ssize_t                       res;
1579     nxt_port_msg_t                msg;
1580     nxt_unit_impl_t               *lib;
1581     nxt_unit_ctx_impl_t           *ctx_impl;
1582     nxt_unit_request_info_impl_t  *req_impl;
1583 
1584     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1585     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1586     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1587 
1588     memset(&msg, 0, sizeof(nxt_port_msg_t));
1589 
1590     msg.stream = req_impl->stream;
1591     msg.pid = lib->pid;
1592     msg.reply_port = ctx_impl->read_port->id.id;
1593     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1594 
1595     res = nxt_unit_port_send(req->ctx, req->response_port,
1596                              &msg, sizeof(msg), NULL);
1597     if (nxt_slow_path(res != sizeof(msg))) {
1598         return NXT_UNIT_ERROR;
1599     }
1600 
1601     return NXT_UNIT_OK;
1602 }
1603 
1604 
1605 static int
1606 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1607 {
1608     size_t                           hsize;
1609     nxt_unit_impl_t                  *lib;
1610     nxt_unit_mmap_buf_t              *b;
1611     nxt_unit_callbacks_t             *cb;
1612     nxt_unit_request_info_t          *req;
1613     nxt_unit_request_info_impl_t     *req_impl;
1614     nxt_unit_websocket_frame_impl_t  *ws_impl;
1615 
1616     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1617     if (nxt_slow_path(req == NULL)) {
1618         return NXT_UNIT_OK;
1619     }
1620 
1621     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1622 
1623     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1624     cb = &lib->callbacks;
1625 
1626     if (cb->websocket_handler && recv_msg->size >= 2) {
1627         ws_impl = nxt_unit_websocket_frame_get(ctx);
1628         if (nxt_slow_path(ws_impl == NULL)) {
1629             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1630                           req_impl->stream);
1631 
1632             return NXT_UNIT_ERROR;
1633         }
1634 
1635         ws_impl->ws.req = req;
1636 
1637         ws_impl->buf = NULL;
1638 
1639         if (recv_msg->mmap) {
1640             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1641                 b->req = req;
1642             }
1643 
1644             /* "Move" incoming buffer list to ws_impl. */
1645             ws_impl->buf = recv_msg->incoming_buf;
1646             ws_impl->buf->prev = &ws_impl->buf;
1647             recv_msg->incoming_buf = NULL;
1648 
1649             b = ws_impl->buf;
1650 
1651         } else {
1652             b = nxt_unit_mmap_buf_get(ctx);
1653             if (nxt_slow_path(b == NULL)) {
1654                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1655                                req_impl->stream);
1656 
1657                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1658 
1659                 return NXT_UNIT_ERROR;
1660             }
1661 
1662             b->req = req;
1663             b->buf.start = recv_msg->start;
1664             b->buf.free = b->buf.start;
1665             b->buf.end = b->buf.start + recv_msg->size;
1666 
1667             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1668         }
1669 
1670         ws_impl->ws.header = (void *) b->buf.start;
1671         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1672             ws_impl->ws.header);
1673 
1674         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1675 
1676         if (ws_impl->ws.header->mask) {
1677             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1678 
1679         } else {
1680             ws_impl->ws.mask = NULL;
1681         }
1682 
1683         b->buf.free += hsize;
1684 
1685         ws_impl->ws.content_buf = &b->buf;
1686         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1687 
1688         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1689                            "payload_len=%"PRIu64,
1690                             ws_impl->ws.header->opcode,
1691                             ws_impl->ws.payload_len);
1692 
1693         cb->websocket_handler(&ws_impl->ws);
1694     }
1695 
1696     if (recv_msg->last) {
1697         if (cb->close_handler) {
1698             nxt_unit_req_debug(req, "close_handler");
1699 
1700             cb->close_handler(req);
1701 
1702         } else {
1703             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1704         }
1705     }
1706 
1707     return NXT_UNIT_OK;
1708 }
1709 
1710 
1711 static int
1712 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1713 {
1714     nxt_unit_impl_t       *lib;
1715     nxt_unit_callbacks_t  *cb;
1716 
1717     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1718     cb = &lib->callbacks;
1719 
1720     if (cb->shm_ack_handler != NULL) {
1721         cb->shm_ack_handler(ctx);
1722     }
1723 
1724     return NXT_UNIT_OK;
1725 }
1726 
1727 
1728 static nxt_unit_request_info_impl_t *
1729 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1730 {
1731     nxt_unit_impl_t               *lib;
1732     nxt_queue_link_t              *lnk;
1733     nxt_unit_ctx_impl_t           *ctx_impl;
1734     nxt_unit_request_info_impl_t  *req_impl;
1735 
1736     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1737 
1738     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1739 
1740     pthread_mutex_lock(&ctx_impl->mutex);
1741 
1742     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1743         pthread_mutex_unlock(&ctx_impl->mutex);
1744 
1745         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1746                                         + lib->request_data_size);
1747         if (nxt_slow_path(req_impl == NULL)) {
1748             return NULL;
1749         }
1750 
1751         req_impl->req.unit = ctx->unit;
1752         req_impl->req.ctx = ctx;
1753 
1754         pthread_mutex_lock(&ctx_impl->mutex);
1755 
1756     } else {
1757         lnk = nxt_queue_first(&ctx_impl->free_req);
1758         nxt_queue_remove(lnk);
1759 
1760         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1761     }
1762 
1763     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1764 
1765     pthread_mutex_unlock(&ctx_impl->mutex);
1766 
1767     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1768 
1769     return req_impl;
1770 }
1771 
1772 
1773 static void
1774 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1775 {
1776     nxt_unit_ctx_t                *ctx;
1777     nxt_unit_ctx_impl_t           *ctx_impl;
1778     nxt_unit_request_info_impl_t  *req_impl;
1779 
1780     ctx = req->ctx;
1781     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1782     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1783 
1784     req->response = NULL;
1785     req->response_buf = NULL;
1786 
1787     if (req_impl->in_hash) {
1788         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1789     }
1790 
1791     while (req_impl->outgoing_buf != NULL) {
1792         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1793     }
1794 
1795     while (req_impl->incoming_buf != NULL) {
1796         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1797     }
1798 
1799     if (req->content_fd != -1) {
1800         nxt_unit_close(req->content_fd);
1801 
1802         req->content_fd = -1;
1803     }
1804 
1805     if (req->response_port != NULL) {
1806         nxt_unit_port_release(req->response_port);
1807 
1808         req->response_port = NULL;
1809     }
1810 
1811     req_impl->state = NXT_UNIT_RS_RELEASED;
1812 
1813     pthread_mutex_lock(&ctx_impl->mutex);
1814 
1815     nxt_queue_remove(&req_impl->link);
1816 
1817     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1818 
1819     pthread_mutex_unlock(&ctx_impl->mutex);
1820 
1821     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1822         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1823     }
1824 }
1825 
1826 
1827 static void
1828 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1829 {
1830     nxt_unit_ctx_impl_t  *ctx_impl;
1831 
1832     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1833 
1834     nxt_queue_remove(&req_impl->link);
1835 
1836     if (req_impl != &ctx_impl->req) {
1837         nxt_unit_free(&ctx_impl->ctx, req_impl);
1838     }
1839 }
1840 
1841 
1842 static nxt_unit_websocket_frame_impl_t *
1843 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1844 {
1845     nxt_queue_link_t                 *lnk;
1846     nxt_unit_ctx_impl_t              *ctx_impl;
1847     nxt_unit_websocket_frame_impl_t  *ws_impl;
1848 
1849     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1850 
1851     pthread_mutex_lock(&ctx_impl->mutex);
1852 
1853     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1854         pthread_mutex_unlock(&ctx_impl->mutex);
1855 
1856         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1857         if (nxt_slow_path(ws_impl == NULL)) {
1858             return NULL;
1859         }
1860 
1861     } else {
1862         lnk = nxt_queue_first(&ctx_impl->free_ws);
1863         nxt_queue_remove(lnk);
1864 
1865         pthread_mutex_unlock(&ctx_impl->mutex);
1866 
1867         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1868     }
1869 
1870     ws_impl->ctx_impl = ctx_impl;
1871 
1872     return ws_impl;
1873 }
1874 
1875 
1876 static void
1877 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1878 {
1879     nxt_unit_websocket_frame_impl_t  *ws_impl;
1880 
1881     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1882 
1883     while (ws_impl->buf != NULL) {
1884         nxt_unit_mmap_buf_free(ws_impl->buf);
1885     }
1886 
1887     ws->req = NULL;
1888 
1889     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1890 
1891     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1892 
1893     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1894 }
1895 
1896 
1897 static void
1898 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1899     nxt_unit_websocket_frame_impl_t *ws_impl)
1900 {
1901     nxt_queue_remove(&ws_impl->link);
1902 
1903     nxt_unit_free(ctx, ws_impl);
1904 }
1905 
1906 
1907 uint16_t
1908 nxt_unit_field_hash(const char *name, size_t name_length)
1909 {
1910     u_char      ch;
1911     uint32_t    hash;
1912     const char  *p, *end;
1913 
1914     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1915     end = name + name_length;
1916 
1917     for (p = name; p < end; p++) {
1918         ch = *p;
1919         hash = (hash << 4) + hash + nxt_lowcase(ch);
1920     }
1921 
1922     hash = (hash >> 16) ^ hash;
1923 
1924     return hash;
1925 }
1926 
1927 
1928 void
1929 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1930 {
1931     char                *name;
1932     uint32_t            i, j;
1933     nxt_unit_field_t    *fields, f;
1934     nxt_unit_request_t  *r;
1935 
1936     static nxt_str_t  content_length = nxt_string("content-length");
1937     static nxt_str_t  content_type = nxt_string("content-type");
1938     static nxt_str_t  cookie = nxt_string("cookie");
1939 
1940     nxt_unit_req_debug(req, "group_dup_fields");
1941 
1942     r = req->request;
1943     fields = r->fields;
1944 
1945     for (i = 0; i < r->fields_count; i++) {
1946         name = nxt_unit_sptr_get(&fields[i].name);
1947 
1948         switch (fields[i].hash) {
1949         case NXT_UNIT_HASH_CONTENT_LENGTH:
1950             if (fields[i].name_length == content_length.length
1951                 && nxt_unit_memcasecmp(name, content_length.start,
1952                                        content_length.length) == 0)
1953             {
1954                 r->content_length_field = i;
1955             }
1956 
1957             break;
1958 
1959         case NXT_UNIT_HASH_CONTENT_TYPE:
1960             if (fields[i].name_length == content_type.length
1961                 && nxt_unit_memcasecmp(name, content_type.start,
1962                                        content_type.length) == 0)
1963             {
1964                 r->content_type_field = i;
1965             }
1966 
1967             break;
1968 
1969         case NXT_UNIT_HASH_COOKIE:
1970             if (fields[i].name_length == cookie.length
1971                 && nxt_unit_memcasecmp(name, cookie.start,
1972                                        cookie.length) == 0)
1973             {
1974                 r->cookie_field = i;
1975             }
1976 
1977             break;
1978         }
1979 
1980         for (j = i + 1; j < r->fields_count; j++) {
1981             if (fields[i].hash != fields[j].hash
1982                 || fields[i].name_length != fields[j].name_length
1983                 || nxt_unit_memcasecmp(name,
1984                                        nxt_unit_sptr_get(&fields[j].name),
1985                                        fields[j].name_length) != 0)
1986             {
1987                 continue;
1988             }
1989 
1990             f = fields[j];
1991             f.value.offset += (j - (i + 1)) * sizeof(f);
1992 
1993             while (j > i + 1) {
1994                 fields[j] = fields[j - 1];
1995                 fields[j].name.offset -= sizeof(f);
1996                 fields[j].value.offset -= sizeof(f);
1997                 j--;
1998             }
1999 
2000             fields[j] = f;
2001 
2002             /* Assign the same name pointer for further grouping simplicity. */
2003             nxt_unit_sptr_set(&fields[j].name, name);
2004 
2005             i++;
2006         }
2007     }
2008 }
2009 
2010 
2011 int
2012 nxt_unit_response_init(nxt_unit_request_info_t *req,
2013     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2014 {
2015     uint32_t                      buf_size;
2016     nxt_unit_buf_t                *buf;
2017     nxt_unit_request_info_impl_t  *req_impl;
2018 
2019     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2020 
2021     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2022         nxt_unit_req_warn(req, "init: response already sent");
2023 
2024         return NXT_UNIT_ERROR;
2025     }
2026 
2027     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2028                        (int) max_fields_count, (int) max_fields_size);
2029 
2030     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2031         nxt_unit_req_debug(req, "duplicate response init");
2032     }
2033 
2034     /*
2035      * Each field name and value 0-terminated by libunit,
2036      * this is the reason of '+ 2' below.
2037      */
2038     buf_size = sizeof(nxt_unit_response_t)
2039                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2040                + max_fields_size;
2041 
2042     if (nxt_slow_path(req->response_buf != NULL)) {
2043         buf = req->response_buf;
2044 
2045         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2046             goto init_response;
2047         }
2048 
2049         nxt_unit_buf_free(buf);
2050 
2051         req->response_buf = NULL;
2052         req->response = NULL;
2053         req->response_max_fields = 0;
2054 
2055         req_impl->state = NXT_UNIT_RS_START;
2056     }
2057 
2058     buf = nxt_unit_response_buf_alloc(req, buf_size);
2059     if (nxt_slow_path(buf == NULL)) {
2060         return NXT_UNIT_ERROR;
2061     }
2062 
2063 init_response:
2064 
2065     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2066 
2067     req->response_buf = buf;
2068 
2069     req->response = (nxt_unit_response_t *) buf->start;
2070     req->response->status = status;
2071 
2072     buf->free = buf->start + sizeof(nxt_unit_response_t)
2073                 + max_fields_count * sizeof(nxt_unit_field_t);
2074 
2075     req->response_max_fields = max_fields_count;
2076     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2077 
2078     return NXT_UNIT_OK;
2079 }
2080 
2081 
2082 int
2083 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2084     uint32_t max_fields_count, uint32_t max_fields_size)
2085 {
2086     char                          *p;
2087     uint32_t                      i, buf_size;
2088     nxt_unit_buf_t                *buf;
2089     nxt_unit_field_t              *f, *src;
2090     nxt_unit_response_t           *resp;
2091     nxt_unit_request_info_impl_t  *req_impl;
2092 
2093     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2094 
2095     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2096         nxt_unit_req_warn(req, "realloc: response not init");
2097 
2098         return NXT_UNIT_ERROR;
2099     }
2100 
2101     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2102         nxt_unit_req_warn(req, "realloc: response already sent");
2103 
2104         return NXT_UNIT_ERROR;
2105     }
2106 
2107     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2108         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2109 
2110         return NXT_UNIT_ERROR;
2111     }
2112 
2113     /*
2114      * Each field name and value 0-terminated by libunit,
2115      * this is the reason of '+ 2' below.
2116      */
2117     buf_size = sizeof(nxt_unit_response_t)
2118                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2119                + max_fields_size;
2120 
2121     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2122 
2123     buf = nxt_unit_response_buf_alloc(req, buf_size);
2124     if (nxt_slow_path(buf == NULL)) {
2125         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2126         return NXT_UNIT_ERROR;
2127     }
2128 
2129     resp = (nxt_unit_response_t *) buf->start;
2130 
2131     memset(resp, 0, sizeof(nxt_unit_response_t));
2132 
2133     resp->status = req->response->status;
2134     resp->content_length = req->response->content_length;
2135 
2136     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2137     f = resp->fields;
2138 
2139     for (i = 0; i < req->response->fields_count; i++) {
2140         src = req->response->fields + i;
2141 
2142         if (nxt_slow_path(src->skip != 0)) {
2143             continue;
2144         }
2145 
2146         if (nxt_slow_path(src->name_length + src->value_length + 2
2147                           > (uint32_t) (buf->end - p)))
2148         {
2149             nxt_unit_req_warn(req, "realloc: not enough space for field"
2150                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2151                   i, src, src->name_length, src->value_length);
2152 
2153             goto fail;
2154         }
2155 
2156         nxt_unit_sptr_set(&f->name, p);
2157         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2158         *p++ = '\0';
2159 
2160         nxt_unit_sptr_set(&f->value, p);
2161         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2162         *p++ = '\0';
2163 
2164         f->hash = src->hash;
2165         f->skip = 0;
2166         f->name_length = src->name_length;
2167         f->value_length = src->value_length;
2168 
2169         resp->fields_count++;
2170         f++;
2171     }
2172 
2173     if (req->response->piggyback_content_length > 0) {
2174         if (nxt_slow_path(req->response->piggyback_content_length
2175                           > (uint32_t) (buf->end - p)))
2176         {
2177             nxt_unit_req_warn(req, "realloc: not enought space for content"
2178                   " #%"PRIu32", %"PRIu32" required",
2179                   i, req->response->piggyback_content_length);
2180 
2181             goto fail;
2182         }
2183 
2184         resp->piggyback_content_length =
2185                                        req->response->piggyback_content_length;
2186 
2187         nxt_unit_sptr_set(&resp->piggyback_content, p);
2188         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2189                        req->response->piggyback_content_length);
2190     }
2191 
2192     buf->free = p;
2193 
2194     nxt_unit_buf_free(req->response_buf);
2195 
2196     req->response = resp;
2197     req->response_buf = buf;
2198     req->response_max_fields = max_fields_count;
2199 
2200     return NXT_UNIT_OK;
2201 
2202 fail:
2203 
2204     nxt_unit_buf_free(buf);
2205 
2206     return NXT_UNIT_ERROR;
2207 }
2208 
2209 
2210 int
2211 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2212 {
2213     nxt_unit_request_info_impl_t  *req_impl;
2214 
2215     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2216 
2217     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2218 }
2219 
2220 
2221 int
2222 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2223     const char *name, uint8_t name_length,
2224     const char *value, uint32_t value_length)
2225 {
2226     nxt_unit_buf_t                *buf;
2227     nxt_unit_field_t              *f;
2228     nxt_unit_response_t           *resp;
2229     nxt_unit_request_info_impl_t  *req_impl;
2230 
2231     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2232 
2233     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2234         nxt_unit_req_warn(req, "add_field: response not initialized or "
2235                           "already sent");
2236 
2237         return NXT_UNIT_ERROR;
2238     }
2239 
2240     resp = req->response;
2241 
2242     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2243         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2244                           (int) resp->fields_count);
2245 
2246         return NXT_UNIT_ERROR;
2247     }
2248 
2249     buf = req->response_buf;
2250 
2251     if (nxt_slow_path(name_length + value_length + 2
2252                       > (uint32_t) (buf->end - buf->free)))
2253     {
2254         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2255 
2256         return NXT_UNIT_ERROR;
2257     }
2258 
2259     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2260                        resp->fields_count,
2261                        (int) name_length, name,
2262                        (int) value_length, value);
2263 
2264     f = resp->fields + resp->fields_count;
2265 
2266     nxt_unit_sptr_set(&f->name, buf->free);
2267     buf->free = nxt_cpymem(buf->free, name, name_length);
2268     *buf->free++ = '\0';
2269 
2270     nxt_unit_sptr_set(&f->value, buf->free);
2271     buf->free = nxt_cpymem(buf->free, value, value_length);
2272     *buf->free++ = '\0';
2273 
2274     f->hash = nxt_unit_field_hash(name, name_length);
2275     f->skip = 0;
2276     f->name_length = name_length;
2277     f->value_length = value_length;
2278 
2279     resp->fields_count++;
2280 
2281     return NXT_UNIT_OK;
2282 }
2283 
2284 
2285 int
2286 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2287     const void* src, uint32_t size)
2288 {
2289     nxt_unit_buf_t                *buf;
2290     nxt_unit_response_t           *resp;
2291     nxt_unit_request_info_impl_t  *req_impl;
2292 
2293     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2294 
2295     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2296         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2297 
2298         return NXT_UNIT_ERROR;
2299     }
2300 
2301     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2302         nxt_unit_req_warn(req, "add_content: response already sent");
2303 
2304         return NXT_UNIT_ERROR;
2305     }
2306 
2307     buf = req->response_buf;
2308 
2309     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2310         nxt_unit_req_warn(req, "add_content: buffer overflow");
2311 
2312         return NXT_UNIT_ERROR;
2313     }
2314 
2315     resp = req->response;
2316 
2317     if (resp->piggyback_content_length == 0) {
2318         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2319         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2320     }
2321 
2322     resp->piggyback_content_length += size;
2323 
2324     buf->free = nxt_cpymem(buf->free, src, size);
2325 
2326     return NXT_UNIT_OK;
2327 }
2328 
2329 
2330 int
2331 nxt_unit_response_send(nxt_unit_request_info_t *req)
2332 {
2333     int                           rc;
2334     nxt_unit_mmap_buf_t           *mmap_buf;
2335     nxt_unit_request_info_impl_t  *req_impl;
2336 
2337     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2338 
2339     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2340         nxt_unit_req_warn(req, "send: response is not initialized yet");
2341 
2342         return NXT_UNIT_ERROR;
2343     }
2344 
2345     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2346         nxt_unit_req_warn(req, "send: response already sent");
2347 
2348         return NXT_UNIT_ERROR;
2349     }
2350 
2351     if (req->request->websocket_handshake && req->response->status == 101) {
2352         nxt_unit_response_upgrade(req);
2353     }
2354 
2355     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2356                        req->response->fields_count,
2357                        (int) (req->response_buf->free
2358                               - req->response_buf->start));
2359 
2360     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2361 
2362     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2363     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2364         req->response = NULL;
2365         req->response_buf = NULL;
2366         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2367 
2368         nxt_unit_mmap_buf_free(mmap_buf);
2369     }
2370 
2371     return rc;
2372 }
2373 
2374 
2375 int
2376 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2377 {
2378     nxt_unit_request_info_impl_t  *req_impl;
2379 
2380     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2381 
2382     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2383 }
2384 
2385 
2386 nxt_unit_buf_t *
2387 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2388 {
2389     int                           rc;
2390     nxt_unit_mmap_buf_t           *mmap_buf;
2391     nxt_unit_request_info_impl_t  *req_impl;
2392 
2393     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2394         nxt_unit_req_warn(req, "response_buf_alloc: "
2395                           "requested buffer (%"PRIu32") too big", size);
2396 
2397         return NULL;
2398     }
2399 
2400     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2401 
2402     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2403 
2404     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2405     if (nxt_slow_path(mmap_buf == NULL)) {
2406         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2407 
2408         return NULL;
2409     }
2410 
2411     mmap_buf->req = req;
2412 
2413     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2414 
2415     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2416                                    size, size, mmap_buf,
2417                                    NULL);
2418     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2419         nxt_unit_mmap_buf_release(mmap_buf);
2420 
2421         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2422 
2423         return NULL;
2424     }
2425 
2426     return &mmap_buf->buf;
2427 }
2428 
2429 
2430 static nxt_unit_mmap_buf_t *
2431 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2432 {
2433     nxt_unit_mmap_buf_t  *mmap_buf;
2434     nxt_unit_ctx_impl_t  *ctx_impl;
2435 
2436     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2437 
2438     pthread_mutex_lock(&ctx_impl->mutex);
2439 
2440     if (ctx_impl->free_buf == NULL) {
2441         pthread_mutex_unlock(&ctx_impl->mutex);
2442 
2443         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2444         if (nxt_slow_path(mmap_buf == NULL)) {
2445             return NULL;
2446         }
2447 
2448     } else {
2449         mmap_buf = ctx_impl->free_buf;
2450 
2451         nxt_unit_mmap_buf_unlink(mmap_buf);
2452 
2453         pthread_mutex_unlock(&ctx_impl->mutex);
2454     }
2455 
2456     mmap_buf->ctx_impl = ctx_impl;
2457 
2458     mmap_buf->hdr = NULL;
2459     mmap_buf->free_ptr = NULL;
2460 
2461     return mmap_buf;
2462 }
2463 
2464 
2465 static void
2466 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2467 {
2468     nxt_unit_mmap_buf_unlink(mmap_buf);
2469 
2470     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2471 
2472     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2473 
2474     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2475 }
2476 
2477 
2478 int
2479 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2480 {
2481     return req->request->websocket_handshake;
2482 }
2483 
2484 
2485 int
2486 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2487 {
2488     int                           rc;
2489     nxt_unit_request_info_impl_t  *req_impl;
2490 
2491     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2492 
2493     if (nxt_slow_path(req_impl->websocket != 0)) {
2494         nxt_unit_req_debug(req, "upgrade: already upgraded");
2495 
2496         return NXT_UNIT_OK;
2497     }
2498 
2499     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2500         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2501 
2502         return NXT_UNIT_ERROR;
2503     }
2504 
2505     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2506         nxt_unit_req_warn(req, "upgrade: response already sent");
2507 
2508         return NXT_UNIT_ERROR;
2509     }
2510 
2511     rc = nxt_unit_request_hash_add(req->ctx, req);
2512     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2513         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2514 
2515         return NXT_UNIT_ERROR;
2516     }
2517 
2518     req_impl->websocket = 1;
2519 
2520     req->response->status = 101;
2521 
2522     return NXT_UNIT_OK;
2523 }
2524 
2525 
2526 int
2527 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2528 {
2529     nxt_unit_request_info_impl_t  *req_impl;
2530 
2531     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2532 
2533     return req_impl->websocket;
2534 }
2535 
2536 
2537 nxt_unit_request_info_t *
2538 nxt_unit_get_request_info_from_data(void *data)
2539 {
2540     nxt_unit_request_info_impl_t  *req_impl;
2541 
2542     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2543 
2544     return &req_impl->req;
2545 }
2546 
2547 
2548 int
2549 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2550 {
2551     int                           rc;
2552     nxt_unit_mmap_buf_t           *mmap_buf;
2553     nxt_unit_request_info_t       *req;
2554     nxt_unit_request_info_impl_t  *req_impl;
2555 
2556     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2557 
2558     req = mmap_buf->req;
2559     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2560 
2561     nxt_unit_req_debug(req, "buf_send: %d bytes",
2562                        (int) (buf->free - buf->start));
2563 
2564     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2565         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2566 
2567         return NXT_UNIT_ERROR;
2568     }
2569 
2570     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2571         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2572 
2573         return NXT_UNIT_ERROR;
2574     }
2575 
2576     if (nxt_fast_path(buf->free > buf->start)) {
2577         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2578         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2579             return rc;
2580         }
2581     }
2582 
2583     nxt_unit_mmap_buf_free(mmap_buf);
2584 
2585     return NXT_UNIT_OK;
2586 }
2587 
2588 
2589 static void
2590 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2591 {
2592     int                      rc;
2593     nxt_unit_mmap_buf_t      *mmap_buf;
2594     nxt_unit_request_info_t  *req;
2595 
2596     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2597 
2598     req = mmap_buf->req;
2599 
2600     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2601     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2602         nxt_unit_mmap_buf_free(mmap_buf);
2603 
2604         nxt_unit_request_info_release(req);
2605 
2606     } else {
2607         nxt_unit_request_done(req, rc);
2608     }
2609 }
2610 
2611 
2612 static int
2613 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2614     nxt_unit_mmap_buf_t *mmap_buf, int last)
2615 {
2616     struct {
2617         nxt_port_msg_t       msg;
2618         nxt_port_mmap_msg_t  mmap_msg;
2619     } m;
2620 
2621     int                           rc;
2622     u_char                        *last_used, *first_free;
2623     ssize_t                       res;
2624     nxt_chunk_id_t                first_free_chunk;
2625     nxt_unit_buf_t                *buf;
2626     nxt_unit_impl_t               *lib;
2627     nxt_port_mmap_header_t        *hdr;
2628     nxt_unit_request_info_impl_t  *req_impl;
2629 
2630     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2631     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2632 
2633     buf = &mmap_buf->buf;
2634     hdr = mmap_buf->hdr;
2635 
2636     m.mmap_msg.size = buf->free - buf->start;
2637 
2638     m.msg.stream = req_impl->stream;
2639     m.msg.pid = lib->pid;
2640     m.msg.reply_port = 0;
2641     m.msg.type = _NXT_PORT_MSG_DATA;
2642     m.msg.last = last != 0;
2643     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2644     m.msg.nf = 0;
2645     m.msg.mf = 0;
2646     m.msg.tracking = 0;
2647 
2648     rc = NXT_UNIT_ERROR;
2649 
2650     if (m.msg.mmap) {
2651         m.mmap_msg.mmap_id = hdr->id;
2652         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2653                                                      (u_char *) buf->start);
2654 
2655         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2656                        req_impl->stream,
2657                        (int) m.mmap_msg.mmap_id,
2658                        (int) m.mmap_msg.chunk_id,
2659                        (int) m.mmap_msg.size);
2660 
2661         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2662                                  NULL);
2663         if (nxt_slow_path(res != sizeof(m))) {
2664             goto free_buf;
2665         }
2666 
2667         last_used = (u_char *) buf->free - 1;
2668         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2669 
2670         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2671             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2672 
2673             buf->start = (char *) first_free;
2674             buf->free = buf->start;
2675 
2676             if (buf->end < buf->start) {
2677                 buf->end = buf->start;
2678             }
2679 
2680         } else {
2681             buf->start = NULL;
2682             buf->free = NULL;
2683             buf->end = NULL;
2684 
2685             mmap_buf->hdr = NULL;
2686         }
2687 
2688         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2689                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2690 
2691         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2692                        (int) lib->outgoing.allocated_chunks);
2693 
2694     } else {
2695         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2696                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2697         {
2698             nxt_unit_alert(req->ctx,
2699                            "#%"PRIu32": failed to send plain memory buffer"
2700                            ": no space reserved for message header",
2701                            req_impl->stream);
2702 
2703             goto free_buf;
2704         }
2705 
2706         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2707 
2708         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2709                        req_impl->stream,
2710                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2711 
2712         res = nxt_unit_port_send(req->ctx, req->response_port,
2713                                  buf->start - sizeof(m.msg),
2714                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2715 
2716         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2717             goto free_buf;
2718         }
2719     }
2720 
2721     rc = NXT_UNIT_OK;
2722 
2723 free_buf:
2724 
2725     nxt_unit_free_outgoing_buf(mmap_buf);
2726 
2727     return rc;
2728 }
2729 
2730 
2731 void
2732 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2733 {
2734     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2735 }
2736 
2737 
2738 static void
2739 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2740 {
2741     nxt_unit_free_outgoing_buf(mmap_buf);
2742 
2743     nxt_unit_mmap_buf_release(mmap_buf);
2744 }
2745 
2746 
2747 static void
2748 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2749 {
2750     if (mmap_buf->hdr != NULL) {
2751         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2752                               mmap_buf->hdr, mmap_buf->buf.start,
2753                               mmap_buf->buf.end - mmap_buf->buf.start);
2754 
2755         mmap_buf->hdr = NULL;
2756 
2757         return;
2758     }
2759 
2760     if (mmap_buf->free_ptr != NULL) {
2761         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2762 
2763         mmap_buf->free_ptr = NULL;
2764     }
2765 }
2766 
2767 
2768 static nxt_unit_read_buf_t *
2769 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2770 {
2771     nxt_unit_ctx_impl_t  *ctx_impl;
2772     nxt_unit_read_buf_t  *rbuf;
2773 
2774     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2775 
2776     pthread_mutex_lock(&ctx_impl->mutex);
2777 
2778     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2779 
2780     pthread_mutex_unlock(&ctx_impl->mutex);
2781 
2782     rbuf->oob.size = 0;
2783 
2784     return rbuf;
2785 }
2786 
2787 
2788 static nxt_unit_read_buf_t *
2789 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2790 {
2791     nxt_queue_link_t     *link;
2792     nxt_unit_read_buf_t  *rbuf;
2793 
2794     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2795         link = nxt_queue_first(&ctx_impl->free_rbuf);
2796         nxt_queue_remove(link);
2797 
2798         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2799 
2800         return rbuf;
2801     }
2802 
2803     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2804 
2805     if (nxt_fast_path(rbuf != NULL)) {
2806         rbuf->ctx_impl = ctx_impl;
2807     }
2808 
2809     return rbuf;
2810 }
2811 
2812 
2813 static void
2814 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2815     nxt_unit_read_buf_t *rbuf)
2816 {
2817     nxt_unit_ctx_impl_t  *ctx_impl;
2818 
2819     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2820 
2821     pthread_mutex_lock(&ctx_impl->mutex);
2822 
2823     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2824 
2825     pthread_mutex_unlock(&ctx_impl->mutex);
2826 }
2827 
2828 
2829 nxt_unit_buf_t *
2830 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2831 {
2832     nxt_unit_mmap_buf_t  *mmap_buf;
2833 
2834     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2835 
2836     if (mmap_buf->next == NULL) {
2837         return NULL;
2838     }
2839 
2840     return &mmap_buf->next->buf;
2841 }
2842 
2843 
2844 uint32_t
2845 nxt_unit_buf_max(void)
2846 {
2847     return PORT_MMAP_DATA_SIZE;
2848 }
2849 
2850 
2851 uint32_t
2852 nxt_unit_buf_min(void)
2853 {
2854     return PORT_MMAP_CHUNK_SIZE;
2855 }
2856 
2857 
2858 int
2859 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2860     size_t size)
2861 {
2862     ssize_t  res;
2863 
2864     res = nxt_unit_response_write_nb(req, start, size, size);
2865 
2866     return res < 0 ? -res : NXT_UNIT_OK;
2867 }
2868 
2869 
2870 ssize_t
2871 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2872     size_t size, size_t min_size)
2873 {
2874     int                           rc;
2875     ssize_t                       sent;
2876     uint32_t                      part_size, min_part_size, buf_size;
2877     const char                    *part_start;
2878     nxt_unit_mmap_buf_t           mmap_buf;
2879     nxt_unit_request_info_impl_t  *req_impl;
2880     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2881 
2882     nxt_unit_req_debug(req, "write: %d", (int) size);
2883 
2884     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2885 
2886     part_start = start;
2887     sent = 0;
2888 
2889     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2890         nxt_unit_req_alert(req, "write: response not initialized yet");
2891 
2892         return -NXT_UNIT_ERROR;
2893     }
2894 
2895     /* Check if response is not send yet. */
2896     if (nxt_slow_path(req->response_buf != NULL)) {
2897         part_size = req->response_buf->end - req->response_buf->free;
2898         part_size = nxt_min(size, part_size);
2899 
2900         rc = nxt_unit_response_add_content(req, part_start, part_size);
2901         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2902             return -rc;
2903         }
2904 
2905         rc = nxt_unit_response_send(req);
2906         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2907             return -rc;
2908         }
2909 
2910         size -= part_size;
2911         part_start += part_size;
2912         sent += part_size;
2913 
2914         min_size -= nxt_min(min_size, part_size);
2915     }
2916 
2917     while (size > 0) {
2918         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2919         min_part_size = nxt_min(min_size, part_size);
2920         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2921 
2922         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2923                                        min_part_size, &mmap_buf, local_buf);
2924         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2925             return -rc;
2926         }
2927 
2928         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2929         if (nxt_slow_path(buf_size == 0)) {
2930             return sent;
2931         }
2932         part_size = nxt_min(buf_size, part_size);
2933 
2934         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2935                                        part_start, part_size);
2936 
2937         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2938         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2939             return -rc;
2940         }
2941 
2942         size -= part_size;
2943         part_start += part_size;
2944         sent += part_size;
2945 
2946         min_size -= nxt_min(min_size, part_size);
2947     }
2948 
2949     return sent;
2950 }
2951 
2952 
2953 int
2954 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2955     nxt_unit_read_info_t *read_info)
2956 {
2957     int                           rc;
2958     ssize_t                       n;
2959     uint32_t                      buf_size;
2960     nxt_unit_buf_t                *buf;
2961     nxt_unit_mmap_buf_t           mmap_buf;
2962     nxt_unit_request_info_impl_t  *req_impl;
2963     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2964 
2965     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2966 
2967     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2968         nxt_unit_req_alert(req, "write: response not initialized yet");
2969 
2970         return NXT_UNIT_ERROR;
2971     }
2972 
2973     /* Check if response is not send yet. */
2974     if (nxt_slow_path(req->response_buf != NULL)) {
2975 
2976         /* Enable content in headers buf. */
2977         rc = nxt_unit_response_add_content(req, "", 0);
2978         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2979             nxt_unit_req_error(req, "Failed to add piggyback content");
2980 
2981             return rc;
2982         }
2983 
2984         buf = req->response_buf;
2985 
2986         while (buf->end - buf->free > 0) {
2987             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2988             if (nxt_slow_path(n < 0)) {
2989                 nxt_unit_req_error(req, "Read error");
2990 
2991                 return NXT_UNIT_ERROR;
2992             }
2993 
2994             /* Manually increase sizes. */
2995             buf->free += n;
2996             req->response->piggyback_content_length += n;
2997 
2998             if (read_info->eof) {
2999                 break;
3000             }
3001         }
3002 
3003         rc = nxt_unit_response_send(req);
3004         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3005             nxt_unit_req_error(req, "Failed to send headers with content");
3006 
3007             return rc;
3008         }
3009 
3010         if (read_info->eof) {
3011             return NXT_UNIT_OK;
3012         }
3013     }
3014 
3015     while (!read_info->eof) {
3016         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3017                            read_info->buf_size);
3018 
3019         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3020 
3021         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3022                                        buf_size, buf_size,
3023                                        &mmap_buf, local_buf);
3024         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3025             return rc;
3026         }
3027 
3028         buf = &mmap_buf.buf;
3029 
3030         while (!read_info->eof && buf->end > buf->free) {
3031             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3032             if (nxt_slow_path(n < 0)) {
3033                 nxt_unit_req_error(req, "Read error");
3034 
3035                 nxt_unit_free_outgoing_buf(&mmap_buf);
3036 
3037                 return NXT_UNIT_ERROR;
3038             }
3039 
3040             buf->free += n;
3041         }
3042 
3043         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3044         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3045             nxt_unit_req_error(req, "Failed to send content");
3046 
3047             return rc;
3048         }
3049     }
3050 
3051     return NXT_UNIT_OK;
3052 }
3053 
3054 
3055 ssize_t
3056 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3057 {
3058     ssize_t  buf_res, res;
3059 
3060     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3061                                 dst, size);
3062 
3063     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3064         res = read(req->content_fd, dst, size);
3065         if (nxt_slow_path(res < 0)) {
3066             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3067                                strerror(errno), errno);
3068 
3069             return res;
3070         }
3071 
3072         if (res < (ssize_t) size) {
3073             nxt_unit_close(req->content_fd);
3074 
3075             req->content_fd = -1;
3076         }
3077 
3078         req->content_length -= res;
3079         size -= res;
3080 
3081         dst = nxt_pointer_to(dst, res);
3082 
3083     } else {
3084         res = 0;
3085     }
3086 
3087     return buf_res + res;
3088 }
3089 
3090 
3091 ssize_t
3092 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3093 {
3094     char                 *p;
3095     size_t               l_size, b_size;
3096     nxt_unit_buf_t       *b;
3097     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3098 
3099     if (req->content_length == 0) {
3100         return 0;
3101     }
3102 
3103     l_size = 0;
3104 
3105     b = req->content_buf;
3106 
3107     while (b != NULL) {
3108         b_size = b->end - b->free;
3109         p = memchr(b->free, '\n', b_size);
3110 
3111         if (p != NULL) {
3112             p++;
3113             l_size += p - b->free;
3114             break;
3115         }
3116 
3117         l_size += b_size;
3118 
3119         if (max_size <= l_size) {
3120             break;
3121         }
3122 
3123         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3124         if (mmap_buf->next == NULL
3125             && req->content_fd != -1
3126             && l_size < req->content_length)
3127         {
3128             preread_buf = nxt_unit_request_preread(req, 16384);
3129             if (nxt_slow_path(preread_buf == NULL)) {
3130                 return -1;
3131             }
3132 
3133             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3134         }
3135 
3136         b = nxt_unit_buf_next(b);
3137     }
3138 
3139     return