xref: /unit/src/nxt_unit.c (revision 2078:0996dd223cdd)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
/* Upper bound, in bytes, used for "plain" payloads. */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024

/* Room for one port message header followed by a maximum plain payload. */
#define NXT_UNIT_LOCAL_BUF_SIZE                                               \
    (sizeof(nxt_port_msg_t) + NXT_UNIT_MAX_PLAIN_SIZE)
26 
/* Values stored in nxt_unit_ctx_impl_t.quit_param. */
enum {
    NXT_QUIT_NORMAL = 0,
    NXT_QUIT_GRACEFUL,      /* == 1 */
};
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
200 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
201 static void nxt_unit_lvlhsh_free(void *data, void *p);
202 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
203 
204 
205 struct nxt_unit_mmap_buf_s {
206     nxt_unit_buf_t           buf;
207 
208     nxt_unit_mmap_buf_t      *next;
209     nxt_unit_mmap_buf_t      **prev;
210 
211     nxt_port_mmap_header_t   *hdr;
212     nxt_unit_request_info_t  *req;
213     nxt_unit_ctx_impl_t      *ctx_impl;
214     char                     *free_ptr;
215     char                     *plain_ptr;
216 };
217 
218 
219 struct nxt_unit_recv_msg_s {
220     uint32_t                 stream;
221     nxt_pid_t                pid;
222     nxt_port_id_t            reply_port;
223 
224     uint8_t                  last;      /* 1 bit */
225     uint8_t                  mmap;      /* 1 bit */
226 
227     void                     *start;
228     uint32_t                 size;
229 
230     int                      fd[2];
231 
232     nxt_unit_mmap_buf_t      *incoming_buf;
233 };
234 
235 
/* Request processing states, listed in ascending numeric order. */
typedef enum {
    NXT_UNIT_RS_START                = 0,
    NXT_UNIT_RS_RESPONSE_INIT        = 1,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT = 2,
    NXT_UNIT_RS_RESPONSE_SENT        = 3,
    NXT_UNIT_RS_RELEASED             = 4,
} nxt_unit_req_state_t;
243 
244 
245 struct nxt_unit_request_info_impl_s {
246     nxt_unit_request_info_t  req;
247 
248     uint32_t                 stream;
249 
250     nxt_unit_mmap_buf_t      *outgoing_buf;
251     nxt_unit_mmap_buf_t      *incoming_buf;
252 
253     nxt_unit_req_state_t     state;
254     uint8_t                  websocket;
255     uint8_t                  in_hash;
256 
257     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
258     nxt_queue_link_t         link;
259     /*  for nxt_unit_port_impl_t.awaiting_req */
260     nxt_queue_link_t         port_wait_link;
261 
262     char                     extra_data[];
263 };
264 
265 
266 struct nxt_unit_websocket_frame_impl_s {
267     nxt_unit_websocket_frame_t  ws;
268 
269     nxt_unit_mmap_buf_t         *buf;
270 
271     nxt_queue_link_t            link;
272 
273     nxt_unit_ctx_impl_t         *ctx_impl;
274 };
275 
276 
277 struct nxt_unit_read_buf_s {
278     nxt_queue_link_t              link;
279     nxt_unit_ctx_impl_t           *ctx_impl;
280     ssize_t                       size;
281     nxt_recv_oob_t                oob;
282     char                          buf[16384];
283 };
284 
285 
286 struct nxt_unit_ctx_impl_s {
287     nxt_unit_ctx_t                ctx;
288 
289     nxt_atomic_t                  use_count;
290     nxt_atomic_t                  wait_items;
291 
292     pthread_mutex_t               mutex;
293 
294     nxt_unit_port_t               *read_port;
295 
296     nxt_queue_link_t              link;
297 
298     nxt_unit_mmap_buf_t           *free_buf;
299 
300     /*  of nxt_unit_request_info_impl_t */
301     nxt_queue_t                   free_req;
302 
303     /*  of nxt_unit_websocket_frame_impl_t */
304     nxt_queue_t                   free_ws;
305 
306     /*  of nxt_unit_request_info_impl_t */
307     nxt_queue_t                   active_req;
308 
309     /*  of nxt_unit_request_info_impl_t */
310     nxt_lvlhsh_t                  requests;
311 
312     /*  of nxt_unit_request_info_impl_t */
313     nxt_queue_t                   ready_req;
314 
315     /*  of nxt_unit_read_buf_t */
316     nxt_queue_t                   pending_rbuf;
317 
318     /*  of nxt_unit_read_buf_t */
319     nxt_queue_t                   free_rbuf;
320 
321     uint8_t                       online;       /* 1 bit */
322     uint8_t                       ready;        /* 1 bit */
323     uint8_t                       quit_param;
324 
325     nxt_unit_mmap_buf_t           ctx_buf[2];
326     nxt_unit_read_buf_t           ctx_read_buf;
327 
328     nxt_unit_request_info_impl_t  req;
329 };
330 
331 
332 struct nxt_unit_mmap_s {
333     nxt_port_mmap_header_t   *hdr;
334     pthread_t                src_thread;
335 
336     /*  of nxt_unit_read_buf_t */
337     nxt_queue_t              awaiting_rbuf;
338 };
339 
340 
341 struct nxt_unit_mmaps_s {
342     pthread_mutex_t          mutex;
343     uint32_t                 size;
344     uint32_t                 cap;
345     nxt_atomic_t             allocated_chunks;
346     nxt_unit_mmap_t          *elts;
347 };
348 
349 
350 struct nxt_unit_impl_s {
351     nxt_unit_t               unit;
352     nxt_unit_callbacks_t     callbacks;
353 
354     nxt_atomic_t             use_count;
355     nxt_atomic_t             request_count;
356 
357     uint32_t                 request_data_size;
358     uint32_t                 shm_mmap_limit;
359     uint32_t                 request_limit;
360 
361     pthread_mutex_t          mutex;
362 
363     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
364     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
365 
366     nxt_unit_port_t          *router_port;
367     nxt_unit_port_t          *shared_port;
368 
369     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
370 
371     nxt_unit_mmaps_t         incoming;
372     nxt_unit_mmaps_t         outgoing;
373 
374     pid_t                    pid;
375     int                      log_fd;
376 
377     nxt_unit_ctx_impl_t      main_ctx;
378 };
379 
380 
381 struct nxt_unit_port_impl_s {
382     nxt_unit_port_t          port;
383 
384     nxt_atomic_t             use_count;
385 
386     /*  for nxt_unit_process_t.ports */
387     nxt_queue_link_t         link;
388     nxt_unit_process_t       *process;
389 
390     /*  of nxt_unit_request_info_impl_t */
391     nxt_queue_t              awaiting_req;
392 
393     int                      ready;
394 
395     void                     *queue;
396 
397     int                      from_socket;
398     nxt_unit_read_buf_t      *socket_rbuf;
399 };
400 
401 
402 struct nxt_unit_process_s {
403     pid_t                    pid;
404 
405     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
406 
407     nxt_unit_impl_t          *lib;
408 
409     nxt_atomic_t             use_count;
410 
411     uint32_t                 next_port_id;
412 };
413 
414 
/*
 * Port identifier in the form of a (pid, id) pair.  Both members are
 * deliberately fixed 32-bit types so that the structure contains no padding.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
420 
421 
/*
 * Process id kept at file scope.  nxt_unit_init() first sets it from
 * getpid() and then overwrites it with the pid of the read port.
 */
static pid_t  nxt_unit_pid;
423 
424 
425 nxt_unit_ctx_t *
426 nxt_unit_init(nxt_unit_init_t *init)
427 {
428     int              rc, queue_fd, shared_queue_fd;
429     void             *mem;
430     uint32_t         ready_stream, shm_limit, request_limit;
431     nxt_unit_ctx_t   *ctx;
432     nxt_unit_impl_t  *lib;
433     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
434 
435     nxt_unit_pid = getpid();
436 
437     lib = nxt_unit_create(init);
438     if (nxt_slow_path(lib == NULL)) {
439         return NULL;
440     }
441 
442     queue_fd = -1;
443     mem = MAP_FAILED;
444     shared_port.out_fd = -1;
445     shared_port.data = NULL;
446 
447     if (init->ready_port.id.pid != 0
448         && init->ready_stream != 0
449         && init->read_port.id.pid != 0)
450     {
451         ready_port = init->ready_port;
452         ready_stream = init->ready_stream;
453         router_port = init->router_port;
454         read_port = init->read_port;
455         lib->log_fd = init->log_fd;
456 
457         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
458                               ready_port.id.id);
459         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
460                               router_port.id.id);
461         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
462                               read_port.id.id);
463 
464         shared_port.in_fd = init->shared_port_fd;
465         shared_queue_fd = init->shared_queue_fd;
466 
467     } else {
468         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
469                                &shared_port.in_fd, &shared_queue_fd,
470                                &lib->log_fd, &ready_stream, &shm_limit,
471                                &request_limit);
472         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
473             goto fail;
474         }
475 
476         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
477                                 / PORT_MMAP_DATA_SIZE;
478         lib->request_limit = request_limit;
479     }
480 
481     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
482         lib->shm_mmap_limit = 1;
483     }
484 
485     lib->pid = read_port.id.pid;
486     nxt_unit_pid = lib->pid;
487 
488     ctx = &lib->main_ctx.ctx;
489 
490     rc = nxt_unit_fd_blocking(router_port.out_fd);
491     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
492         goto fail;
493     }
494 
495     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
496     if (nxt_slow_path(lib->router_port == NULL)) {
497         nxt_unit_alert(NULL, "failed to add router_port");
498 
499         goto fail;
500     }
501 
502     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
503     if (nxt_slow_path(queue_fd == -1)) {
504         goto fail;
505     }
506 
507     mem = mmap(NULL, sizeof(nxt_port_queue_t),
508                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
509     if (nxt_slow_path(mem == MAP_FAILED)) {
510         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
511                        strerror(errno), errno);
512 
513         goto fail;
514     }
515 
516     nxt_port_queue_init(mem);
517 
518     rc = nxt_unit_fd_blocking(read_port.in_fd);
519     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
520         goto fail;
521     }
522 
523     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
524     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
525         nxt_unit_alert(NULL, "failed to add read_port");
526 
527         goto fail;
528     }
529 
530     rc = nxt_unit_fd_blocking(ready_port.out_fd);
531     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
532         goto fail;
533     }
534 
535     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
536                           NXT_UNIT_SHARED_PORT_ID);
537 
538     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
539                MAP_SHARED, shared_queue_fd, 0);
540     if (nxt_slow_path(mem == MAP_FAILED)) {
541         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
542                        strerror(errno), errno);
543 
544         goto fail;
545     }
546 
547     nxt_unit_close(shared_queue_fd);
548 
549     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
550     if (nxt_slow_path(lib->shared_port == NULL)) {
551         nxt_unit_alert(NULL, "failed to add shared_port");
552 
553         goto fail;
554     }
555 
556     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
557     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
558         nxt_unit_alert(NULL, "failed to send READY message");
559 
560         goto fail;
561     }
562 
563     nxt_unit_close(ready_port.out_fd);
564     nxt_unit_close(queue_fd);
565 
566     return ctx;
567 
568 fail:
569 
570     if (mem != MAP_FAILED) {
571         munmap(mem, sizeof(nxt_port_queue_t));
572     }
573 
574     if (queue_fd != -1) {
575         nxt_unit_close(queue_fd);
576     }
577 
578     nxt_unit_ctx_release(&lib->main_ctx.ctx);
579 
580     return NULL;
581 }
582 
583 
584 static nxt_unit_impl_t *
585 nxt_unit_create(nxt_unit_init_t *init)
586 {
587     int                   rc;
588     nxt_unit_impl_t       *lib;
589     nxt_unit_callbacks_t  *cb;
590 
591     lib = nxt_unit_malloc(NULL,
592                           sizeof(nxt_unit_impl_t) + init->request_data_size);
593     if (nxt_slow_path(lib == NULL)) {
594         nxt_unit_alert(NULL, "failed to allocate unit struct");
595 
596         return NULL;
597     }
598 
599     rc = pthread_mutex_init(&lib->mutex, NULL);
600     if (nxt_slow_path(rc != 0)) {
601         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
602 
603         goto fail;
604     }
605 
606     lib->unit.data = init->data;
607     lib->callbacks = init->callbacks;
608 
609     lib->request_data_size = init->request_data_size;
610     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
611                             / PORT_MMAP_DATA_SIZE;
612     lib->request_limit = init->request_limit;
613 
614     lib->processes.slot = NULL;
615     lib->ports.slot = NULL;
616 
617     lib->log_fd = STDERR_FILENO;
618 
619     nxt_queue_init(&lib->contexts);
620 
621     lib->use_count = 0;
622     lib->request_count = 0;
623     lib->router_port = NULL;
624     lib->shared_port = NULL;
625 
626     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
627     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
628         pthread_mutex_destroy(&lib->mutex);
629         goto fail;
630     }
631 
632     cb = &lib->callbacks;
633 
634     if (cb->request_handler == NULL) {
635         nxt_unit_alert(NULL, "request_handler is NULL");
636 
637         pthread_mutex_destroy(&lib->mutex);
638         goto fail;
639     }
640 
641     nxt_unit_mmaps_init(&lib->incoming);
642     nxt_unit_mmaps_init(&lib->outgoing);
643 
644     return lib;
645 
646 fail:
647 
648     nxt_unit_free(NULL, lib);
649 
650     return NULL;
651 }
652 
653 
654 static int
655 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
656     void *data)
657 {
658     int  rc;
659 
660     ctx_impl->ctx.data = data;
661     ctx_impl->ctx.unit = &lib->unit;
662 
663     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
664     if (nxt_slow_path(rc != 0)) {
665         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
666 
667         return NXT_UNIT_ERROR;
668     }
669 
670     nxt_unit_lib_use(lib);
671 
672     pthread_mutex_lock(&lib->mutex);
673 
674     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
675 
676     pthread_mutex_unlock(&lib->mutex);
677 
678     ctx_impl->use_count = 1;
679     ctx_impl->wait_items = 0;
680     ctx_impl->online = 1;
681     ctx_impl->ready = 0;
682     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
683 
684     nxt_queue_init(&ctx_impl->free_req);
685     nxt_queue_init(&ctx_impl->free_ws);
686     nxt_queue_init(&ctx_impl->active_req);
687     nxt_queue_init(&ctx_impl->ready_req);
688     nxt_queue_init(&ctx_impl->pending_rbuf);
689     nxt_queue_init(&ctx_impl->free_rbuf);
690 
691     ctx_impl->free_buf = NULL;
692     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
693     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
694 
695     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
696     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
697 
698     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
699 
700     ctx_impl->req.req.ctx = &ctx_impl->ctx;
701     ctx_impl->req.req.unit = &lib->unit;
702 
703     ctx_impl->read_port = NULL;
704     ctx_impl->requests.slot = 0;
705 
706     return NXT_UNIT_OK;
707 }
708 
709 
710 nxt_inline void
711 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
712 {
713     nxt_unit_ctx_impl_t  *ctx_impl;
714 
715     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
716 
717     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
718 }
719 
720 
721 nxt_inline void
722 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
723 {
724     long                 c;
725     nxt_unit_ctx_impl_t  *ctx_impl;
726 
727     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
728 
729     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
730 
731     if (c == 1) {
732         nxt_unit_ctx_free(ctx_impl);
733     }
734 }
735 
736 
737 nxt_inline void
738 nxt_unit_lib_use(nxt_unit_impl_t *lib)
739 {
740     nxt_atomic_fetch_add(&lib->use_count, 1);
741 }
742 
743 
744 nxt_inline void
745 nxt_unit_lib_release(nxt_unit_impl_t *lib)
746 {
747     long                c;
748     nxt_unit_process_t  *process;
749 
750     c = nxt_atomic_fetch_add(&lib->use_count, -1);
751 
752     if (c == 1) {
753         for ( ;; ) {
754             pthread_mutex_lock(&lib->mutex);
755 
756             process = nxt_unit_process_pop_first(lib);
757             if (process == NULL) {
758                 pthread_mutex_unlock(&lib->mutex);
759 
760                 break;
761             }
762 
763             nxt_unit_remove_process(lib, process);
764         }
765 
766         pthread_mutex_destroy(&lib->mutex);
767 
768         if (nxt_fast_path(lib->router_port != NULL)) {
769             nxt_unit_port_release(lib->router_port);
770         }
771 
772         if (nxt_fast_path(lib->shared_port != NULL)) {
773             nxt_unit_port_release(lib->shared_port);
774         }
775 
776         nxt_unit_mmaps_destroy(&lib->incoming);
777         nxt_unit_mmaps_destroy(&lib->outgoing);
778 
779         nxt_unit_free(NULL, lib);
780     }
781 }
782 
783 
784 nxt_inline void
785 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
786     nxt_unit_mmap_buf_t *mmap_buf)
787 {
788     mmap_buf->next = *head;
789 
790     if (mmap_buf->next != NULL) {
791         mmap_buf->next->prev = &mmap_buf->next;
792     }
793 
794     *head = mmap_buf;
795     mmap_buf->prev = head;
796 }
797 
798 
799 nxt_inline void
800 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
801     nxt_unit_mmap_buf_t *mmap_buf)
802 {
803     while (*prev != NULL) {
804         prev = &(*prev)->next;
805     }
806 
807     nxt_unit_mmap_buf_insert(prev, mmap_buf);
808 }
809 
810 
811 nxt_inline void
812 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
813 {
814     nxt_unit_mmap_buf_t  **prev;
815 
816     prev = mmap_buf->prev;
817 
818     if (mmap_buf->next != NULL) {
819         mmap_buf->next->prev = prev;
820     }
821 
822     if (prev != NULL) {
823         *prev = mmap_buf->next;
824     }
825 }
826 
827 
828 static int
829 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
830     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
831     int *log_fd, uint32_t *stream,
832     uint32_t *shm_limit, uint32_t *request_limit)
833 {
834     int       rc;
835     int       ready_fd, router_fd, read_in_fd, read_out_fd;
836     char      *unit_init, *version_end, *vars;
837     size_t    version_length;
838     int64_t   ready_pid, router_pid, read_pid;
839     uint32_t  ready_stream, router_id, ready_id, read_id;
840 
841     unit_init = getenv(NXT_UNIT_INIT_ENV);
842     if (nxt_slow_path(unit_init == NULL)) {
843         nxt_unit_alert(NULL, "%s is not in the current environment",
844                        NXT_UNIT_INIT_ENV);
845 
846         return NXT_UNIT_ERROR;
847     }
848 
849     version_end = strchr(unit_init, ';');
850     if (nxt_slow_path(version_end == NULL)) {
851         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
852                        NXT_UNIT_INIT_ENV, unit_init);
853 
854         return NXT_UNIT_ERROR;
855     }
856 
857     version_length = version_end - unit_init;
858 
859     rc = version_length != nxt_length(NXT_VERSION)
860          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
861 
862     if (nxt_slow_path(rc != 0)) {
863         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
864                        "%.*s, while the app was compiled with libunit %s",
865                        (int) version_length, unit_init, NXT_VERSION);
866 
867         return NXT_UNIT_ERROR;
868     }
869 
870     vars = version_end + 1;
871 
872     rc = sscanf(vars,
873                 "%"PRIu32";"
874                 "%"PRId64",%"PRIu32",%d;"
875                 "%"PRId64",%"PRIu32",%d;"
876                 "%"PRId64",%"PRIu32",%d,%d;"
877                 "%d,%d;"
878                 "%d,%"PRIu32",%"PRIu32,
879                 &ready_stream,
880                 &ready_pid, &ready_id, &ready_fd,
881                 &router_pid, &router_id, &router_fd,
882                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
883                 shared_port_fd, shared_queue_fd,
884                 log_fd, shm_limit, request_limit);
885 
886     if (nxt_slow_path(rc == EOF)) {
887         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
888                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
889 
890         return NXT_UNIT_ERROR;
891     }
892 
893     if (nxt_slow_path(rc != 16)) {
894         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
895                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
896 
897         return NXT_UNIT_ERROR;
898     }
899 
900     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
901 
902     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
903 
904     ready_port->in_fd = -1;
905     ready_port->out_fd = ready_fd;
906     ready_port->data = NULL;
907 
908     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
909 
910     router_port->in_fd = -1;
911     router_port->out_fd = router_fd;
912     router_port->data = NULL;
913 
914     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
915 
916     read_port->in_fd = read_in_fd;
917     read_port->out_fd = read_out_fd;
918     read_port->data = NULL;
919 
920     *stream = ready_stream;
921 
922     return NXT_UNIT_OK;
923 }
924 
925 
926 static int
927 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
928 {
929     ssize_t          res;
930     nxt_send_oob_t   oob;
931     nxt_port_msg_t   msg;
932     nxt_unit_impl_t  *lib;
933     int              fds[2] = {queue_fd, -1};
934 
935     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
936 
937     msg.stream = stream;
938     msg.pid = lib->pid;
939     msg.reply_port = 0;
940     msg.type = _NXT_PORT_MSG_PROCESS_READY;
941     msg.last = 1;
942     msg.mmap = 0;
943     msg.nf = 0;
944     msg.mf = 0;
945     msg.tracking = 0;
946 
947     nxt_socket_msg_oob_init(&oob, fds);
948 
949     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
950     if (res != sizeof(msg)) {
951         return NXT_UNIT_ERROR;
952     }
953 
954     return NXT_UNIT_OK;
955 }
956 
957 
958 static int
959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
960     nxt_unit_request_info_t **preq)
961 {
962     int                  rc;
963     pid_t                pid;
964     uint8_t              quit_param;
965     nxt_port_msg_t       *port_msg;
966     nxt_unit_impl_t      *lib;
967     nxt_unit_recv_msg_t  recv_msg;
968 
969     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
970 
971     recv_msg.incoming_buf = NULL;
972     recv_msg.fd[0] = -1;
973     recv_msg.fd[1] = -1;
974 
975     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
976     if (nxt_slow_path(rc != NXT_OK)) {
977         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
978         rc = NXT_UNIT_ERROR;
979         goto done;
980     }
981 
982     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
983         if (nxt_slow_path(rbuf->size == 0)) {
984             nxt_unit_debug(ctx, "read port closed");
985 
986             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
987             rc = NXT_UNIT_OK;
988             goto done;
989         }
990 
991         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
992 
993         rc = NXT_UNIT_ERROR;
994         goto done;
995     }
996 
997     port_msg = (nxt_port_msg_t *) rbuf->buf;
998 
999     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1000                    port_msg->stream, (int) port_msg->type,
1001                    recv_msg.fd[0], recv_msg.fd[1]);
1002 
1003     recv_msg.stream = port_msg->stream;
1004     recv_msg.pid = port_msg->pid;
1005     recv_msg.reply_port = port_msg->reply_port;
1006     recv_msg.last = port_msg->last;
1007     recv_msg.mmap = port_msg->mmap;
1008 
1009     recv_msg.start = port_msg + 1;
1010     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1011 
1012     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1013         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1014                        port_msg->stream, (int) port_msg->type);
1015         rc = NXT_UNIT_ERROR;
1016         goto done;
1017     }
1018 
1019     /* Fragmentation is unsupported. */
1020     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1021         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1022                        port_msg->stream, (int) port_msg->type);
1023         rc = NXT_UNIT_ERROR;
1024         goto done;
1025     }
1026 
1027     if (port_msg->mmap) {
1028         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1029 
1030         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1031             if (rc == NXT_UNIT_AGAIN) {
1032                 recv_msg.fd[0] = -1;
1033                 recv_msg.fd[1] = -1;
1034             }
1035 
1036             goto done;
1037         }
1038     }
1039 
1040     switch (port_msg->type) {
1041 
1042     case _NXT_PORT_MSG_RPC_READY:
1043         rc = NXT_UNIT_OK;
1044         break;
1045 
1046     case _NXT_PORT_MSG_QUIT:
1047         if (recv_msg.size == sizeof(quit_param)) {
1048             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1049 
1050         } else {
1051             quit_param = NXT_QUIT_NORMAL;
1052         }
1053 
1054         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1055                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1056 
1057         nxt_unit_quit(ctx, quit_param);
1058 
1059         rc = NXT_UNIT_OK;
1060         break;
1061 
1062     case _NXT_PORT_MSG_NEW_PORT:
1063         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1064         break;
1065 
1066     case _NXT_PORT_MSG_PORT_ACK:
1067         rc = nxt_unit_ctx_ready(ctx);
1068         break;
1069 
1070     case _NXT_PORT_MSG_CHANGE_FILE:
1071         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1072                        port_msg->stream, recv_msg.fd[0]);
1073 
1074         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1075             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1076                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1077                            strerror(errno), errno);
1078 
1079             rc = NXT_UNIT_ERROR;
1080             goto done;
1081         }
1082 
1083         rc = NXT_UNIT_OK;
1084         break;
1085 
1086     case _NXT_PORT_MSG_MMAP:
1087         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1088             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1089                            port_msg->stream, recv_msg.fd[0]);
1090 
1091             rc = NXT_UNIT_ERROR;
1092             goto done;
1093         }
1094 
1095         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1096         break;
1097 
1098     case _NXT_PORT_MSG_REQ_HEADERS:
1099         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1100         break;
1101 
1102     case _NXT_PORT_MSG_REQ_BODY:
1103         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1104         break;
1105 
1106     case _NXT_PORT_MSG_WEBSOCKET:
1107         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1108         break;
1109 
1110     case _NXT_PORT_MSG_REMOVE_PID:
1111         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1112             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1113                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1114                            (int) sizeof(pid));
1115 
1116             rc = NXT_UNIT_ERROR;
1117             goto done;
1118         }
1119 
1120         memcpy(&pid, recv_msg.start, sizeof(pid));
1121 
1122         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1123                        port_msg->stream, (int) pid);
1124 
1125         nxt_unit_remove_pid(lib, pid);
1126 
1127         rc = NXT_UNIT_OK;
1128         break;
1129 
1130     case _NXT_PORT_MSG_SHM_ACK:
1131         rc = nxt_unit_process_shm_ack(ctx);
1132         break;
1133 
1134     default:
1135         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1136                        port_msg->stream, (int) port_msg->type);
1137 
1138         rc = NXT_UNIT_ERROR;
1139         goto done;
1140     }
1141 
1142 done:
1143 
1144     if (recv_msg.fd[0] != -1) {
1145         nxt_unit_close(recv_msg.fd[0]);
1146     }
1147 
1148     if (recv_msg.fd[1] != -1) {
1149         nxt_unit_close(recv_msg.fd[1]);
1150     }
1151 
1152     while (recv_msg.incoming_buf != NULL) {
1153         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1154     }
1155 
1156     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1157 #if (NXT_DEBUG)
1158         memset(rbuf->buf, 0xAC, rbuf->size);
1159 #endif
1160         nxt_unit_read_buf_release(ctx, rbuf);
1161     }
1162 
1163     return rc;
1164 }
1165 
1166 
1167 static int
1168 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1169 {
1170     void                     *mem;
1171     nxt_unit_port_t          new_port, *port;
1172     nxt_port_msg_new_port_t  *new_port_msg;
1173 
1174     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1175         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1176                       "invalid message size (%d)",
1177                       recv_msg->stream, (int) recv_msg->size);
1178 
1179         return NXT_UNIT_ERROR;
1180     }
1181 
1182     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1183         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1184                        recv_msg->stream, recv_msg->fd[0]);
1185 
1186         return NXT_UNIT_ERROR;
1187     }
1188 
1189     new_port_msg = recv_msg->start;
1190 
1191     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1192                    recv_msg->stream, (int) new_port_msg->pid,
1193                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1194 
1195     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1196         return NXT_UNIT_ERROR;
1197     }
1198 
1199     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1200 
1201     new_port.in_fd = -1;
1202     new_port.out_fd = recv_msg->fd[0];
1203 
1204     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1205                MAP_SHARED, recv_msg->fd[1], 0);
1206 
1207     if (nxt_slow_path(mem == MAP_FAILED)) {
1208         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1209                        strerror(errno), errno);
1210 
1211         return NXT_UNIT_ERROR;
1212     }
1213 
1214     new_port.data = NULL;
1215 
1216     recv_msg->fd[0] = -1;
1217 
1218     port = nxt_unit_add_port(ctx, &new_port, mem);
1219     if (nxt_slow_path(port == NULL)) {
1220         return NXT_UNIT_ERROR;
1221     }
1222 
1223     nxt_unit_port_release(port);
1224 
1225     return NXT_UNIT_OK;
1226 }
1227 
1228 
1229 static int
1230 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1231 {
1232     nxt_unit_impl_t      *lib;
1233     nxt_unit_ctx_impl_t  *ctx_impl;
1234 
1235     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1236 
1237     if (nxt_slow_path(ctx_impl->ready)) {
1238         return NXT_UNIT_OK;
1239     }
1240 
1241     ctx_impl->ready = 1;
1242 
1243     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1244 
1245     /* Call ready_handler() only for main context. */
1246     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1247         return lib->callbacks.ready_handler(ctx);
1248     }
1249 
1250     if (&lib->main_ctx != ctx_impl) {
1251         /* Check if the main context is already stopped or quit. */
1252         if (nxt_slow_path(!lib->main_ctx.ready)) {
1253             ctx_impl->ready = 0;
1254 
1255             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1256 
1257             return NXT_UNIT_OK;
1258         }
1259 
1260         if (lib->callbacks.add_port != NULL) {
1261             lib->callbacks.add_port(ctx, lib->shared_port);
1262         }
1263     }
1264 
1265     return NXT_UNIT_OK;
1266 }
1267 
1268 
1269 static int
1270 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1271     nxt_unit_request_info_t **preq)
1272 {
1273     int                           res;
1274     nxt_unit_impl_t               *lib;
1275     nxt_unit_port_id_t            port_id;
1276     nxt_unit_request_t            *r;
1277     nxt_unit_mmap_buf_t           *b;
1278     nxt_unit_request_info_t       *req;
1279     nxt_unit_request_info_impl_t  *req_impl;
1280 
1281     if (nxt_slow_path(recv_msg->mmap == 0)) {
1282         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1283                       recv_msg->stream);
1284 
1285         return NXT_UNIT_ERROR;
1286     }
1287 
1288     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1289         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1290                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1291                       (int) sizeof(nxt_unit_request_t));
1292 
1293         return NXT_UNIT_ERROR;
1294     }
1295 
1296     req_impl = nxt_unit_request_info_get(ctx);
1297     if (nxt_slow_path(req_impl == NULL)) {
1298         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1299                       recv_msg->stream);
1300 
1301         return NXT_UNIT_ERROR;
1302     }
1303 
1304     req = &req_impl->req;
1305 
1306     req->request = recv_msg->start;
1307 
1308     b = recv_msg->incoming_buf;
1309 
1310     req->request_buf = &b->buf;
1311     req->response = NULL;
1312     req->response_buf = NULL;
1313 
1314     r = req->request;
1315 
1316     req->content_length = r->content_length;
1317 
1318     req->content_buf = req->request_buf;
1319     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1320 
1321     req_impl->stream = recv_msg->stream;
1322 
1323     req_impl->outgoing_buf = NULL;
1324 
1325     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1326         b->req = req;
1327     }
1328 
1329     /* "Move" incoming buffer list to req_impl. */
1330     req_impl->incoming_buf = recv_msg->incoming_buf;
1331     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1332     recv_msg->incoming_buf = NULL;
1333 
1334     req->content_fd = recv_msg->fd[0];
1335     recv_msg->fd[0] = -1;
1336 
1337     req->response_max_fields = 0;
1338     req_impl->state = NXT_UNIT_RS_START;
1339     req_impl->websocket = 0;
1340     req_impl->in_hash = 0;
1341 
1342     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1343                    (int) r->method_length,
1344                    (char *) nxt_unit_sptr_get(&r->method),
1345                    (int) r->target_length,
1346                    (char *) nxt_unit_sptr_get(&r->target),
1347                    (int) r->content_length);
1348 
1349     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1350 
1351     res = nxt_unit_request_check_response_port(req, &port_id);
1352     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1353         return NXT_UNIT_ERROR;
1354     }
1355 
1356     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1357         res = nxt_unit_send_req_headers_ack(req);
1358         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1359             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1360 
1361             return NXT_UNIT_ERROR;
1362         }
1363 
1364         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1365 
1366         if (req->content_length
1367             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1368         {
1369             res = nxt_unit_request_hash_add(ctx, req);
1370             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1371                 nxt_unit_req_warn(req, "failed to add request to hash");
1372 
1373                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374 
1375                 return NXT_UNIT_ERROR;
1376             }
1377 
1378             /*
1379              * If application have separate data handler, we may start
1380              * request processing and process data when it is arrived.
1381              */
1382             if (lib->callbacks.data_handler == NULL) {
1383                 return NXT_UNIT_OK;
1384             }
1385         }
1386 
1387         if (preq == NULL) {
1388             lib->callbacks.request_handler(req);
1389 
1390         } else {
1391             *preq = req;
1392         }
1393     }
1394 
1395     return NXT_UNIT_OK;
1396 }
1397 
1398 
1399 static int
1400 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1401 {
1402     uint64_t                 l;
1403     nxt_unit_impl_t          *lib;
1404     nxt_unit_mmap_buf_t      *b;
1405     nxt_unit_request_info_t  *req;
1406 
1407     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1408     if (req == NULL) {
1409         return NXT_UNIT_OK;
1410     }
1411 
1412     l = req->content_buf->end - req->content_buf->free;
1413 
1414     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1415         b->req = req;
1416         l += b->buf.end - b->buf.free;
1417     }
1418 
1419     if (recv_msg->incoming_buf != NULL) {
1420         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1421 
1422         while (b->next != NULL) {
1423             b = b->next;
1424         }
1425 
1426         /* "Move" incoming buffer list to req_impl. */
1427         b->next = recv_msg->incoming_buf;
1428         b->next->prev = &b->next;
1429 
1430         recv_msg->incoming_buf = NULL;
1431     }
1432 
1433     req->content_fd = recv_msg->fd[0];
1434     recv_msg->fd[0] = -1;
1435 
1436     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1437 
1438     if (lib->callbacks.data_handler != NULL) {
1439         lib->callbacks.data_handler(req);
1440 
1441         return NXT_UNIT_OK;
1442     }
1443 
1444     if (req->content_fd != -1 || l == req->content_length) {
1445         lib->callbacks.request_handler(req);
1446     }
1447 
1448     return NXT_UNIT_OK;
1449 }
1450 
1451 
1452 static int
1453 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1454     nxt_unit_port_id_t *port_id)
1455 {
1456     int                           res;
1457     nxt_unit_ctx_t                *ctx;
1458     nxt_unit_impl_t               *lib;
1459     nxt_unit_port_t               *port;
1460     nxt_unit_process_t            *process;
1461     nxt_unit_ctx_impl_t           *ctx_impl;
1462     nxt_unit_port_impl_t          *port_impl;
1463     nxt_unit_request_info_impl_t  *req_impl;
1464 
1465     ctx = req->ctx;
1466     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1467     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1468 
1469     pthread_mutex_lock(&lib->mutex);
1470 
1471     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1472     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1473 
1474     if (nxt_fast_path(port != NULL)) {
1475         req->response_port = port;
1476 
1477         if (nxt_fast_path(port_impl->ready)) {
1478             pthread_mutex_unlock(&lib->mutex);
1479 
1480             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1481                            (int) port->id.pid, (int) port->id.id);
1482 
1483             return NXT_UNIT_OK;
1484         }
1485 
1486         nxt_unit_debug(ctx, "check_response_port: "
1487                        "port{%d,%d} already requested",
1488                        (int) port->id.pid, (int) port->id.id);
1489 
1490         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1491 
1492         nxt_queue_insert_tail(&port_impl->awaiting_req,
1493                               &req_impl->port_wait_link);
1494 
1495         pthread_mutex_unlock(&lib->mutex);
1496 
1497         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1498 
1499         return NXT_UNIT_AGAIN;
1500     }
1501 
1502     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1503     if (nxt_slow_path(port_impl == NULL)) {
1504         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1505                        (int) sizeof(nxt_unit_port_impl_t));
1506 
1507         pthread_mutex_unlock(&lib->mutex);
1508 
1509         return NXT_UNIT_ERROR;
1510     }
1511 
1512     port = &port_impl->port;
1513 
1514     port->id = *port_id;
1515     port->in_fd = -1;
1516     port->out_fd = -1;
1517     port->data = NULL;
1518 
1519     res = nxt_unit_port_hash_add(&lib->ports, port);
1520     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1521         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1522                        port->id.pid, port->id.id);
1523 
1524         pthread_mutex_unlock(&lib->mutex);
1525 
1526         nxt_unit_free(ctx, port);
1527 
1528         return NXT_UNIT_ERROR;
1529     }
1530 
1531     process = nxt_unit_process_find(lib, port_id->pid, 0);
1532     if (nxt_slow_path(process == NULL)) {
1533         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1534                        port->id.pid);
1535 
1536         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1537 
1538         pthread_mutex_unlock(&lib->mutex);
1539 
1540         nxt_unit_free(ctx, port);
1541 
1542         return NXT_UNIT_ERROR;
1543     }
1544 
1545     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1546 
1547     port_impl->process = process;
1548     port_impl->queue = NULL;
1549     port_impl->from_socket = 0;
1550     port_impl->socket_rbuf = NULL;
1551 
1552     nxt_queue_init(&port_impl->awaiting_req);
1553 
1554     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555 
1556     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1557 
1558     port_impl->use_count = 2;
1559     port_impl->ready = 0;
1560 
1561     req->response_port = port;
1562 
1563     pthread_mutex_unlock(&lib->mutex);
1564 
1565     res = nxt_unit_get_port(ctx, port_id);
1566     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1567         return NXT_UNIT_ERROR;
1568     }
1569 
1570     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1571 
1572     return NXT_UNIT_AGAIN;
1573 }
1574 
1575 
1576 static int
1577 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1578 {
1579     ssize_t                       res;
1580     nxt_port_msg_t                msg;
1581     nxt_unit_impl_t               *lib;
1582     nxt_unit_ctx_impl_t           *ctx_impl;
1583     nxt_unit_request_info_impl_t  *req_impl;
1584 
1585     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1586     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1587     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1588 
1589     memset(&msg, 0, sizeof(nxt_port_msg_t));
1590 
1591     msg.stream = req_impl->stream;
1592     msg.pid = lib->pid;
1593     msg.reply_port = ctx_impl->read_port->id.id;
1594     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1595 
1596     res = nxt_unit_port_send(req->ctx, req->response_port,
1597                              &msg, sizeof(msg), NULL);
1598     if (nxt_slow_path(res != sizeof(msg))) {
1599         return NXT_UNIT_ERROR;
1600     }
1601 
1602     return NXT_UNIT_OK;
1603 }
1604 
1605 
1606 static int
1607 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1608 {
1609     size_t                           hsize;
1610     nxt_unit_impl_t                  *lib;
1611     nxt_unit_mmap_buf_t              *b;
1612     nxt_unit_callbacks_t             *cb;
1613     nxt_unit_request_info_t          *req;
1614     nxt_unit_request_info_impl_t     *req_impl;
1615     nxt_unit_websocket_frame_impl_t  *ws_impl;
1616 
1617     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1618     if (nxt_slow_path(req == NULL)) {
1619         return NXT_UNIT_OK;
1620     }
1621 
1622     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1623 
1624     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1625     cb = &lib->callbacks;
1626 
1627     if (cb->websocket_handler && recv_msg->size >= 2) {
1628         ws_impl = nxt_unit_websocket_frame_get(ctx);
1629         if (nxt_slow_path(ws_impl == NULL)) {
1630             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1631                           req_impl->stream);
1632 
1633             return NXT_UNIT_ERROR;
1634         }
1635 
1636         ws_impl->ws.req = req;
1637 
1638         ws_impl->buf = NULL;
1639 
1640         if (recv_msg->mmap) {
1641             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1642                 b->req = req;
1643             }
1644 
1645             /* "Move" incoming buffer list to ws_impl. */
1646             ws_impl->buf = recv_msg->incoming_buf;
1647             ws_impl->buf->prev = &ws_impl->buf;
1648             recv_msg->incoming_buf = NULL;
1649 
1650             b = ws_impl->buf;
1651 
1652         } else {
1653             b = nxt_unit_mmap_buf_get(ctx);
1654             if (nxt_slow_path(b == NULL)) {
1655                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1656                                req_impl->stream);
1657 
1658                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1659 
1660                 return NXT_UNIT_ERROR;
1661             }
1662 
1663             b->req = req;
1664             b->buf.start = recv_msg->start;
1665             b->buf.free = b->buf.start;
1666             b->buf.end = b->buf.start + recv_msg->size;
1667 
1668             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1669         }
1670 
1671         ws_impl->ws.header = (void *) b->buf.start;
1672         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1673             ws_impl->ws.header);
1674 
1675         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1676 
1677         if (ws_impl->ws.header->mask) {
1678             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1679 
1680         } else {
1681             ws_impl->ws.mask = NULL;
1682         }
1683 
1684         b->buf.free += hsize;
1685 
1686         ws_impl->ws.content_buf = &b->buf;
1687         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1688 
1689         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1690                            "payload_len=%"PRIu64,
1691                             ws_impl->ws.header->opcode,
1692                             ws_impl->ws.payload_len);
1693 
1694         cb->websocket_handler(&ws_impl->ws);
1695     }
1696 
1697     if (recv_msg->last) {
1698         if (cb->close_handler) {
1699             nxt_unit_req_debug(req, "close_handler");
1700 
1701             cb->close_handler(req);
1702 
1703         } else {
1704             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1705         }
1706     }
1707 
1708     return NXT_UNIT_OK;
1709 }
1710 
1711 
1712 static int
1713 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1714 {
1715     nxt_unit_impl_t       *lib;
1716     nxt_unit_callbacks_t  *cb;
1717 
1718     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1719     cb = &lib->callbacks;
1720 
1721     if (cb->shm_ack_handler != NULL) {
1722         cb->shm_ack_handler(ctx);
1723     }
1724 
1725     return NXT_UNIT_OK;
1726 }
1727 
1728 
1729 static nxt_unit_request_info_impl_t *
1730 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1731 {
1732     nxt_unit_impl_t               *lib;
1733     nxt_queue_link_t              *lnk;
1734     nxt_unit_ctx_impl_t           *ctx_impl;
1735     nxt_unit_request_info_impl_t  *req_impl;
1736 
1737     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1738 
1739     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1740 
1741     pthread_mutex_lock(&ctx_impl->mutex);
1742 
1743     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1744         pthread_mutex_unlock(&ctx_impl->mutex);
1745 
1746         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1747                                         + lib->request_data_size);
1748         if (nxt_slow_path(req_impl == NULL)) {
1749             return NULL;
1750         }
1751 
1752         req_impl->req.unit = ctx->unit;
1753         req_impl->req.ctx = ctx;
1754 
1755         pthread_mutex_lock(&ctx_impl->mutex);
1756 
1757     } else {
1758         lnk = nxt_queue_first(&ctx_impl->free_req);
1759         nxt_queue_remove(lnk);
1760 
1761         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1762     }
1763 
1764     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1765 
1766     pthread_mutex_unlock(&ctx_impl->mutex);
1767 
1768     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1769 
1770     return req_impl;
1771 }
1772 
1773 
1774 static void
1775 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1776 {
1777     nxt_unit_ctx_t                *ctx;
1778     nxt_unit_ctx_impl_t           *ctx_impl;
1779     nxt_unit_request_info_impl_t  *req_impl;
1780 
1781     ctx = req->ctx;
1782     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1783     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784 
1785     req->response = NULL;
1786     req->response_buf = NULL;
1787 
1788     if (req_impl->in_hash) {
1789         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1790     }
1791 
1792     while (req_impl->outgoing_buf != NULL) {
1793         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1794     }
1795 
1796     while (req_impl->incoming_buf != NULL) {
1797         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1798     }
1799 
1800     if (req->content_fd != -1) {
1801         nxt_unit_close(req->content_fd);
1802 
1803         req->content_fd = -1;
1804     }
1805 
1806     if (req->response_port != NULL) {
1807         nxt_unit_port_release(req->response_port);
1808 
1809         req->response_port = NULL;
1810     }
1811 
1812     req_impl->state = NXT_UNIT_RS_RELEASED;
1813 
1814     pthread_mutex_lock(&ctx_impl->mutex);
1815 
1816     nxt_queue_remove(&req_impl->link);
1817 
1818     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1819 
1820     pthread_mutex_unlock(&ctx_impl->mutex);
1821 
1822     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1823         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1824     }
1825 }
1826 
1827 
1828 static void
1829 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1830 {
1831     nxt_unit_ctx_impl_t  *ctx_impl;
1832 
1833     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1834 
1835     nxt_queue_remove(&req_impl->link);
1836 
1837     if (req_impl != &ctx_impl->req) {
1838         nxt_unit_free(&ctx_impl->ctx, req_impl);
1839     }
1840 }
1841 
1842 
1843 static nxt_unit_websocket_frame_impl_t *
1844 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1845 {
1846     nxt_queue_link_t                 *lnk;
1847     nxt_unit_ctx_impl_t              *ctx_impl;
1848     nxt_unit_websocket_frame_impl_t  *ws_impl;
1849 
1850     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1851 
1852     pthread_mutex_lock(&ctx_impl->mutex);
1853 
1854     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1855         pthread_mutex_unlock(&ctx_impl->mutex);
1856 
1857         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1858         if (nxt_slow_path(ws_impl == NULL)) {
1859             return NULL;
1860         }
1861 
1862     } else {
1863         lnk = nxt_queue_first(&ctx_impl->free_ws);
1864         nxt_queue_remove(lnk);
1865 
1866         pthread_mutex_unlock(&ctx_impl->mutex);
1867 
1868         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1869     }
1870 
1871     ws_impl->ctx_impl = ctx_impl;
1872 
1873     return ws_impl;
1874 }
1875 
1876 
1877 static void
1878 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1879 {
1880     nxt_unit_websocket_frame_impl_t  *ws_impl;
1881 
1882     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1883 
1884     while (ws_impl->buf != NULL) {
1885         nxt_unit_mmap_buf_free(ws_impl->buf);
1886     }
1887 
1888     ws->req = NULL;
1889 
1890     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1891 
1892     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1893 
1894     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1895 }
1896 
1897 
1898 static void
1899 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1900     nxt_unit_websocket_frame_impl_t *ws_impl)
1901 {
1902     nxt_queue_remove(&ws_impl->link);
1903 
1904     nxt_unit_free(ctx, ws_impl);
1905 }
1906 
1907 
1908 uint16_t
1909 nxt_unit_field_hash(const char *name, size_t name_length)
1910 {
1911     u_char      ch;
1912     uint32_t    hash;
1913     const char  *p, *end;
1914 
1915     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1916     end = name + name_length;
1917 
1918     for (p = name; p < end; p++) {
1919         ch = *p;
1920         hash = (hash << 4) + hash + nxt_lowcase(ch);
1921     }
1922 
1923     hash = (hash >> 16) ^ hash;
1924 
1925     return hash;
1926 }
1927 
1928 
1929 void
1930 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1931 {
1932     char                *name;
1933     uint32_t            i, j;
1934     nxt_unit_field_t    *fields, f;
1935     nxt_unit_request_t  *r;
1936 
1937     static nxt_str_t  content_length = nxt_string("content-length");
1938     static nxt_str_t  content_type = nxt_string("content-type");
1939     static nxt_str_t  cookie = nxt_string("cookie");
1940 
1941     nxt_unit_req_debug(req, "group_dup_fields");
1942 
1943     r = req->request;
1944     fields = r->fields;
1945 
1946     for (i = 0; i < r->fields_count; i++) {
1947         name = nxt_unit_sptr_get(&fields[i].name);
1948 
1949         switch (fields[i].hash) {
1950         case NXT_UNIT_HASH_CONTENT_LENGTH:
1951             if (fields[i].name_length == content_length.length
1952                 && nxt_unit_memcasecmp(name, content_length.start,
1953                                        content_length.length) == 0)
1954             {
1955                 r->content_length_field = i;
1956             }
1957 
1958             break;
1959 
1960         case NXT_UNIT_HASH_CONTENT_TYPE:
1961             if (fields[i].name_length == content_type.length
1962                 && nxt_unit_memcasecmp(name, content_type.start,
1963                                        content_type.length) == 0)
1964             {
1965                 r->content_type_field = i;
1966             }
1967 
1968             break;
1969 
1970         case NXT_UNIT_HASH_COOKIE:
1971             if (fields[i].name_length == cookie.length
1972                 && nxt_unit_memcasecmp(name, cookie.start,
1973                                        cookie.length) == 0)
1974             {
1975                 r->cookie_field = i;
1976             }
1977 
1978             break;
1979         }
1980 
1981         for (j = i + 1; j < r->fields_count; j++) {
1982             if (fields[i].hash != fields[j].hash
1983                 || fields[i].name_length != fields[j].name_length
1984                 || nxt_unit_memcasecmp(name,
1985                                        nxt_unit_sptr_get(&fields[j].name),
1986                                        fields[j].name_length) != 0)
1987             {
1988                 continue;
1989             }
1990 
1991             f = fields[j];
1992             f.value.offset += (j - (i + 1)) * sizeof(f);
1993 
1994             while (j > i + 1) {
1995                 fields[j] = fields[j - 1];
1996                 fields[j].name.offset -= sizeof(f);
1997                 fields[j].value.offset -= sizeof(f);
1998                 j--;
1999             }
2000 
2001             fields[j] = f;
2002 
2003             /* Assign the same name pointer for further grouping simplicity. */
2004             nxt_unit_sptr_set(&fields[j].name, name);
2005 
2006             i++;
2007         }
2008     }
2009 }
2010 
2011 
2012 int
2013 nxt_unit_response_init(nxt_unit_request_info_t *req,
2014     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2015 {
2016     uint32_t                      buf_size;
2017     nxt_unit_buf_t                *buf;
2018     nxt_unit_request_info_impl_t  *req_impl;
2019 
2020     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2021 
2022     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2023         nxt_unit_req_warn(req, "init: response already sent");
2024 
2025         return NXT_UNIT_ERROR;
2026     }
2027 
2028     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2029                        (int) max_fields_count, (int) max_fields_size);
2030 
2031     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2032         nxt_unit_req_debug(req, "duplicate response init");
2033     }
2034 
2035     /*
2036      * Each field name and value 0-terminated by libunit,
2037      * this is the reason of '+ 2' below.
2038      */
2039     buf_size = sizeof(nxt_unit_response_t)
2040                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2041                + max_fields_size;
2042 
2043     if (nxt_slow_path(req->response_buf != NULL)) {
2044         buf = req->response_buf;
2045 
2046         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2047             goto init_response;
2048         }
2049 
2050         nxt_unit_buf_free(buf);
2051 
2052         req->response_buf = NULL;
2053         req->response = NULL;
2054         req->response_max_fields = 0;
2055 
2056         req_impl->state = NXT_UNIT_RS_START;
2057     }
2058 
2059     buf = nxt_unit_response_buf_alloc(req, buf_size);
2060     if (nxt_slow_path(buf == NULL)) {
2061         return NXT_UNIT_ERROR;
2062     }
2063 
2064 init_response:
2065 
2066     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2067 
2068     req->response_buf = buf;
2069 
2070     req->response = (nxt_unit_response_t *) buf->start;
2071     req->response->status = status;
2072 
2073     buf->free = buf->start + sizeof(nxt_unit_response_t)
2074                 + max_fields_count * sizeof(nxt_unit_field_t);
2075 
2076     req->response_max_fields = max_fields_count;
2077     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2078 
2079     return NXT_UNIT_OK;
2080 }
2081 
2082 
2083 int
2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2085     uint32_t max_fields_count, uint32_t max_fields_size)
2086 {
2087     char                          *p;
2088     uint32_t                      i, buf_size;
2089     nxt_unit_buf_t                *buf;
2090     nxt_unit_field_t              *f, *src;
2091     nxt_unit_response_t           *resp;
2092     nxt_unit_request_info_impl_t  *req_impl;
2093 
2094     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2095 
2096     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2097         nxt_unit_req_warn(req, "realloc: response not init");
2098 
2099         return NXT_UNIT_ERROR;
2100     }
2101 
2102     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2103         nxt_unit_req_warn(req, "realloc: response already sent");
2104 
2105         return NXT_UNIT_ERROR;
2106     }
2107 
2108     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2109         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2110 
2111         return NXT_UNIT_ERROR;
2112     }
2113 
2114     /*
2115      * Each field name and value 0-terminated by libunit,
2116      * this is the reason of '+ 2' below.
2117      */
2118     buf_size = sizeof(nxt_unit_response_t)
2119                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2120                + max_fields_size;
2121 
2122     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2123 
2124     buf = nxt_unit_response_buf_alloc(req, buf_size);
2125     if (nxt_slow_path(buf == NULL)) {
2126         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2127         return NXT_UNIT_ERROR;
2128     }
2129 
2130     resp = (nxt_unit_response_t *) buf->start;
2131 
2132     memset(resp, 0, sizeof(nxt_unit_response_t));
2133 
2134     resp->status = req->response->status;
2135     resp->content_length = req->response->content_length;
2136 
2137     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2138     f = resp->fields;
2139 
2140     for (i = 0; i < req->response->fields_count; i++) {
2141         src = req->response->fields + i;
2142 
2143         if (nxt_slow_path(src->skip != 0)) {
2144             continue;
2145         }
2146 
2147         if (nxt_slow_path(src->name_length + src->value_length + 2
2148                           > (uint32_t) (buf->end - p)))
2149         {
2150             nxt_unit_req_warn(req, "realloc: not enough space for field"
2151                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2152                   i, src, src->name_length, src->value_length);
2153 
2154             goto fail;
2155         }
2156 
2157         nxt_unit_sptr_set(&f->name, p);
2158         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2159         *p++ = '\0';
2160 
2161         nxt_unit_sptr_set(&f->value, p);
2162         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2163         *p++ = '\0';
2164 
2165         f->hash = src->hash;
2166         f->skip = 0;
2167         f->name_length = src->name_length;
2168         f->value_length = src->value_length;
2169 
2170         resp->fields_count++;
2171         f++;
2172     }
2173 
2174     if (req->response->piggyback_content_length > 0) {
2175         if (nxt_slow_path(req->response->piggyback_content_length
2176                           > (uint32_t) (buf->end - p)))
2177         {
2178             nxt_unit_req_warn(req, "realloc: not enought space for content"
2179                   " #%"PRIu32", %"PRIu32" required",
2180                   i, req->response->piggyback_content_length);
2181 
2182             goto fail;
2183         }
2184 
2185         resp->piggyback_content_length =
2186                                        req->response->piggyback_content_length;
2187 
2188         nxt_unit_sptr_set(&resp->piggyback_content, p);
2189         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2190                        req->response->piggyback_content_length);
2191     }
2192 
2193     buf->free = p;
2194 
2195     nxt_unit_buf_free(req->response_buf);
2196 
2197     req->response = resp;
2198     req->response_buf = buf;
2199     req->response_max_fields = max_fields_count;
2200 
2201     return NXT_UNIT_OK;
2202 
2203 fail:
2204 
2205     nxt_unit_buf_free(buf);
2206 
2207     return NXT_UNIT_ERROR;
2208 }
2209 
2210 
2211 int
2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2213 {
2214     nxt_unit_request_info_impl_t  *req_impl;
2215 
2216     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2217 
2218     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2219 }
2220 
2221 
2222 int
2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2224     const char *name, uint8_t name_length,
2225     const char *value, uint32_t value_length)
2226 {
2227     nxt_unit_buf_t                *buf;
2228     nxt_unit_field_t              *f;
2229     nxt_unit_response_t           *resp;
2230     nxt_unit_request_info_impl_t  *req_impl;
2231 
2232     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2233 
2234     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2235         nxt_unit_req_warn(req, "add_field: response not initialized or "
2236                           "already sent");
2237 
2238         return NXT_UNIT_ERROR;
2239     }
2240 
2241     resp = req->response;
2242 
2243     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2244         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2245                           (int) resp->fields_count);
2246 
2247         return NXT_UNIT_ERROR;
2248     }
2249 
2250     buf = req->response_buf;
2251 
2252     if (nxt_slow_path(name_length + value_length + 2
2253                       > (uint32_t) (buf->end - buf->free)))
2254     {
2255         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2256 
2257         return NXT_UNIT_ERROR;
2258     }
2259 
2260     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2261                        resp->fields_count,
2262                        (int) name_length, name,
2263                        (int) value_length, value);
2264 
2265     f = resp->fields + resp->fields_count;
2266 
2267     nxt_unit_sptr_set(&f->name, buf->free);
2268     buf->free = nxt_cpymem(buf->free, name, name_length);
2269     *buf->free++ = '\0';
2270 
2271     nxt_unit_sptr_set(&f->value, buf->free);
2272     buf->free = nxt_cpymem(buf->free, value, value_length);
2273     *buf->free++ = '\0';
2274 
2275     f->hash = nxt_unit_field_hash(name, name_length);
2276     f->skip = 0;
2277     f->name_length = name_length;
2278     f->value_length = value_length;
2279 
2280     resp->fields_count++;
2281 
2282     return NXT_UNIT_OK;
2283 }
2284 
2285 
2286 int
2287 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2288     const void* src, uint32_t size)
2289 {
2290     nxt_unit_buf_t                *buf;
2291     nxt_unit_response_t           *resp;
2292     nxt_unit_request_info_impl_t  *req_impl;
2293 
2294     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2295 
2296     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2297         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2298 
2299         return NXT_UNIT_ERROR;
2300     }
2301 
2302     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2303         nxt_unit_req_warn(req, "add_content: response already sent");
2304 
2305         return NXT_UNIT_ERROR;
2306     }
2307 
2308     buf = req->response_buf;
2309 
2310     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2311         nxt_unit_req_warn(req, "add_content: buffer overflow");
2312 
2313         return NXT_UNIT_ERROR;
2314     }
2315 
2316     resp = req->response;
2317 
2318     if (resp->piggyback_content_length == 0) {
2319         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2320         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2321     }
2322 
2323     resp->piggyback_content_length += size;
2324 
2325     buf->free = nxt_cpymem(buf->free, src, size);
2326 
2327     return NXT_UNIT_OK;
2328 }
2329 
2330 
2331 int
2332 nxt_unit_response_send(nxt_unit_request_info_t *req)
2333 {
2334     int                           rc;
2335     nxt_unit_mmap_buf_t           *mmap_buf;
2336     nxt_unit_request_info_impl_t  *req_impl;
2337 
2338     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2339 
2340     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2341         nxt_unit_req_warn(req, "send: response is not initialized yet");
2342 
2343         return NXT_UNIT_ERROR;
2344     }
2345 
2346     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2347         nxt_unit_req_warn(req, "send: response already sent");
2348 
2349         return NXT_UNIT_ERROR;
2350     }
2351 
2352     if (req->request->websocket_handshake && req->response->status == 101) {
2353         nxt_unit_response_upgrade(req);
2354     }
2355 
2356     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2357                        req->response->fields_count,
2358                        (int) (req->response_buf->free
2359                               - req->response_buf->start));
2360 
2361     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2362 
2363     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2364     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2365         req->response = NULL;
2366         req->response_buf = NULL;
2367         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2368 
2369         nxt_unit_mmap_buf_free(mmap_buf);
2370     }
2371 
2372     return rc;
2373 }
2374 
2375 
2376 int
2377 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2378 {
2379     nxt_unit_request_info_impl_t  *req_impl;
2380 
2381     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2382 
2383     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2384 }
2385 
2386 
2387 nxt_unit_buf_t *
2388 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2389 {
2390     int                           rc;
2391     nxt_unit_mmap_buf_t           *mmap_buf;
2392     nxt_unit_request_info_impl_t  *req_impl;
2393 
2394     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2395         nxt_unit_req_warn(req, "response_buf_alloc: "
2396                           "requested buffer (%"PRIu32") too big", size);
2397 
2398         return NULL;
2399     }
2400 
2401     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2402 
2403     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2404 
2405     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2406     if (nxt_slow_path(mmap_buf == NULL)) {
2407         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2408 
2409         return NULL;
2410     }
2411 
2412     mmap_buf->req = req;
2413 
2414     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2415 
2416     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2417                                    size, size, mmap_buf,
2418                                    NULL);
2419     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2420         nxt_unit_mmap_buf_release(mmap_buf);
2421 
2422         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2423 
2424         return NULL;
2425     }
2426 
2427     return &mmap_buf->buf;
2428 }
2429 
2430 
2431 static nxt_unit_mmap_buf_t *
2432 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2433 {
2434     nxt_unit_mmap_buf_t  *mmap_buf;
2435     nxt_unit_ctx_impl_t  *ctx_impl;
2436 
2437     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2438 
2439     pthread_mutex_lock(&ctx_impl->mutex);
2440 
2441     if (ctx_impl->free_buf == NULL) {
2442         pthread_mutex_unlock(&ctx_impl->mutex);
2443 
2444         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2445         if (nxt_slow_path(mmap_buf == NULL)) {
2446             return NULL;
2447         }
2448 
2449     } else {
2450         mmap_buf = ctx_impl->free_buf;
2451 
2452         nxt_unit_mmap_buf_unlink(mmap_buf);
2453 
2454         pthread_mutex_unlock(&ctx_impl->mutex);
2455     }
2456 
2457     mmap_buf->ctx_impl = ctx_impl;
2458 
2459     mmap_buf->hdr = NULL;
2460     mmap_buf->free_ptr = NULL;
2461 
2462     return mmap_buf;
2463 }
2464 
2465 
2466 static void
2467 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2468 {
2469     nxt_unit_mmap_buf_unlink(mmap_buf);
2470 
2471     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2472 
2473     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2474 
2475     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2476 }
2477 
2478 
2479 int
2480 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2481 {
2482     return req->request->websocket_handshake;
2483 }
2484 
2485 
2486 int
2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2488 {
2489     int                           rc;
2490     nxt_unit_request_info_impl_t  *req_impl;
2491 
2492     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2493 
2494     if (nxt_slow_path(req_impl->websocket != 0)) {
2495         nxt_unit_req_debug(req, "upgrade: already upgraded");
2496 
2497         return NXT_UNIT_OK;
2498     }
2499 
2500     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2501         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2502 
2503         return NXT_UNIT_ERROR;
2504     }
2505 
2506     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2507         nxt_unit_req_warn(req, "upgrade: response already sent");
2508 
2509         return NXT_UNIT_ERROR;
2510     }
2511 
2512     rc = nxt_unit_request_hash_add(req->ctx, req);
2513     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2514         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2515 
2516         return NXT_UNIT_ERROR;
2517     }
2518 
2519     req_impl->websocket = 1;
2520 
2521     req->response->status = 101;
2522 
2523     return NXT_UNIT_OK;
2524 }
2525 
2526 
2527 int
2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2529 {
2530     nxt_unit_request_info_impl_t  *req_impl;
2531 
2532     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2533 
2534     return req_impl->websocket;
2535 }
2536 
2537 
2538 nxt_unit_request_info_t *
2539 nxt_unit_get_request_info_from_data(void *data)
2540 {
2541     nxt_unit_request_info_impl_t  *req_impl;
2542 
2543     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2544 
2545     return &req_impl->req;
2546 }
2547 
2548 
2549 int
2550 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2551 {
2552     int                           rc;
2553     nxt_unit_mmap_buf_t           *mmap_buf;
2554     nxt_unit_request_info_t       *req;
2555     nxt_unit_request_info_impl_t  *req_impl;
2556 
2557     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2558 
2559     req = mmap_buf->req;
2560     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2561 
2562     nxt_unit_req_debug(req, "buf_send: %d bytes",
2563                        (int) (buf->free - buf->start));
2564 
2565     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2566         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2567 
2568         return NXT_UNIT_ERROR;
2569     }
2570 
2571     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2572         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2573 
2574         return NXT_UNIT_ERROR;
2575     }
2576 
2577     if (nxt_fast_path(buf->free > buf->start)) {
2578         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2579         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2580             return rc;
2581         }
2582     }
2583 
2584     nxt_unit_mmap_buf_free(mmap_buf);
2585 
2586     return NXT_UNIT_OK;
2587 }
2588 
2589 
2590 static void
2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2592 {
2593     int                      rc;
2594     nxt_unit_mmap_buf_t      *mmap_buf;
2595     nxt_unit_request_info_t  *req;
2596 
2597     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2598 
2599     req = mmap_buf->req;
2600 
2601     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2602     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2603         nxt_unit_mmap_buf_free(mmap_buf);
2604 
2605         nxt_unit_request_info_release(req);
2606 
2607     } else {
2608         nxt_unit_request_done(req, rc);
2609     }
2610 }
2611 
2612 
2613 static int
2614 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2615     nxt_unit_mmap_buf_t *mmap_buf, int last)
2616 {
2617     struct {
2618         nxt_port_msg_t       msg;
2619         nxt_port_mmap_msg_t  mmap_msg;
2620     } m;
2621 
2622     int                           rc;
2623     u_char                        *last_used, *first_free;
2624     ssize_t                       res;
2625     nxt_chunk_id_t                first_free_chunk;
2626     nxt_unit_buf_t                *buf;
2627     nxt_unit_impl_t               *lib;
2628     nxt_port_mmap_header_t        *hdr;
2629     nxt_unit_request_info_impl_t  *req_impl;
2630 
2631     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2632     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2633 
2634     buf = &mmap_buf->buf;
2635     hdr = mmap_buf->hdr;
2636 
2637     m.mmap_msg.size = buf->free - buf->start;
2638 
2639     m.msg.stream = req_impl->stream;
2640     m.msg.pid = lib->pid;
2641     m.msg.reply_port = 0;
2642     m.msg.type = _NXT_PORT_MSG_DATA;
2643     m.msg.last = last != 0;
2644     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2645     m.msg.nf = 0;
2646     m.msg.mf = 0;
2647     m.msg.tracking = 0;
2648 
2649     rc = NXT_UNIT_ERROR;
2650 
2651     if (m.msg.mmap) {
2652         m.mmap_msg.mmap_id = hdr->id;
2653         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2654                                                      (u_char *) buf->start);
2655 
2656         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2657                        req_impl->stream,
2658                        (int) m.mmap_msg.mmap_id,
2659                        (int) m.mmap_msg.chunk_id,
2660                        (int) m.mmap_msg.size);
2661 
2662         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2663                                  NULL);
2664         if (nxt_slow_path(res != sizeof(m))) {
2665             goto free_buf;
2666         }
2667 
2668         last_used = (u_char *) buf->free - 1;
2669         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2670 
2671         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2672             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2673 
2674             buf->start = (char *) first_free;
2675             buf->free = buf->start;
2676 
2677             if (buf->end < buf->start) {
2678                 buf->end = buf->start;
2679             }
2680 
2681         } else {
2682             buf->start = NULL;
2683             buf->free = NULL;
2684             buf->end = NULL;
2685 
2686             mmap_buf->hdr = NULL;
2687         }
2688 
2689         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2690                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2691 
2692         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2693                        (int) lib->outgoing.allocated_chunks);
2694 
2695     } else {
2696         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2697                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2698         {
2699             nxt_unit_alert(req->ctx,
2700                            "#%"PRIu32": failed to send plain memory buffer"
2701                            ": no space reserved for message header",
2702                            req_impl->stream);
2703 
2704             goto free_buf;
2705         }
2706 
2707         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2708 
2709         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2710                        req_impl->stream,
2711                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2712 
2713         res = nxt_unit_port_send(req->ctx, req->response_port,
2714                                  buf->start - sizeof(m.msg),
2715                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2716 
2717         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2718             goto free_buf;
2719         }
2720     }
2721 
2722     rc = NXT_UNIT_OK;
2723 
2724 free_buf:
2725 
2726     nxt_unit_free_outgoing_buf(mmap_buf);
2727 
2728     return rc;
2729 }
2730 
2731 
2732 void
2733 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2734 {
2735     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2736 }
2737 
2738 
2739 static void
2740 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2741 {
2742     nxt_unit_free_outgoing_buf(mmap_buf);
2743 
2744     nxt_unit_mmap_buf_release(mmap_buf);
2745 }
2746 
2747 
2748 static void
2749 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2750 {
2751     if (mmap_buf->hdr != NULL) {
2752         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2753                               mmap_buf->hdr, mmap_buf->buf.start,
2754                               mmap_buf->buf.end - mmap_buf->buf.start);
2755 
2756         mmap_buf->hdr = NULL;
2757 
2758         return;
2759     }
2760 
2761     if (mmap_buf->free_ptr != NULL) {
2762         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2763 
2764         mmap_buf->free_ptr = NULL;
2765     }
2766 }
2767 
2768 
2769 static nxt_unit_read_buf_t *
2770 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2771 {
2772     nxt_unit_ctx_impl_t  *ctx_impl;
2773     nxt_unit_read_buf_t  *rbuf;
2774 
2775     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2776 
2777     pthread_mutex_lock(&ctx_impl->mutex);
2778 
2779     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2780 
2781     pthread_mutex_unlock(&ctx_impl->mutex);
2782 
2783     rbuf->oob.size = 0;
2784 
2785     return rbuf;
2786 }
2787 
2788 
2789 static nxt_unit_read_buf_t *
2790 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2791 {
2792     nxt_queue_link_t     *link;
2793     nxt_unit_read_buf_t  *rbuf;
2794 
2795     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2796         link = nxt_queue_first(&ctx_impl->free_rbuf);
2797         nxt_queue_remove(link);
2798 
2799         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2800 
2801         return rbuf;
2802     }
2803 
2804     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2805 
2806     if (nxt_fast_path(rbuf != NULL)) {
2807         rbuf->ctx_impl = ctx_impl;
2808     }
2809 
2810     return rbuf;
2811 }
2812 
2813 
2814 static void
2815 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2816     nxt_unit_read_buf_t *rbuf)
2817 {
2818     nxt_unit_ctx_impl_t  *ctx_impl;
2819 
2820     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2821 
2822     pthread_mutex_lock(&ctx_impl->mutex);
2823 
2824     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2825 
2826     pthread_mutex_unlock(&ctx_impl->mutex);
2827 }
2828 
2829 
2830 nxt_unit_buf_t *
2831 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2832 {
2833     nxt_unit_mmap_buf_t  *mmap_buf;
2834 
2835     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2836 
2837     if (mmap_buf->next == NULL) {
2838         return NULL;
2839     }
2840 
2841     return &mmap_buf->next->buf;
2842 }
2843 
2844 
2845 uint32_t
2846 nxt_unit_buf_max(void)
2847 {
2848     return PORT_MMAP_DATA_SIZE;
2849 }
2850 
2851 
2852 uint32_t
2853 nxt_unit_buf_min(void)
2854 {
2855     return PORT_MMAP_CHUNK_SIZE;
2856 }
2857 
2858 
2859 int
2860 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2861     size_t size)
2862 {
2863     ssize_t  res;
2864 
2865     res = nxt_unit_response_write_nb(req, start, size, size);
2866 
2867     return res < 0 ? -res : NXT_UNIT_OK;
2868 }
2869 
2870 
2871 ssize_t
2872 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2873     size_t size, size_t min_size)
2874 {
2875     int                           rc;
2876     ssize_t                       sent;
2877     uint32_t                      part_size, min_part_size, buf_size;
2878     const char                    *part_start;
2879     nxt_unit_mmap_buf_t           mmap_buf;
2880     nxt_unit_request_info_impl_t  *req_impl;
2881     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2882 
2883     nxt_unit_req_debug(req, "write: %d", (int) size);
2884 
2885     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2886 
2887     part_start = start;
2888     sent = 0;
2889 
2890     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2891         nxt_unit_req_alert(req, "write: response not initialized yet");
2892 
2893         return -NXT_UNIT_ERROR;
2894     }
2895 
2896     /* Check if response is not send yet. */
2897     if (nxt_slow_path(req->response_buf != NULL)) {
2898         part_size = req->response_buf->end - req->response_buf->free;
2899         part_size = nxt_min(size, part_size);
2900 
2901         rc = nxt_unit_response_add_content(req, part_start, part_size);
2902         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2903             return -rc;
2904         }
2905 
2906         rc = nxt_unit_response_send(req);
2907         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2908             return -rc;
2909         }
2910 
2911         size -= part_size;
2912         part_start += part_size;
2913         sent += part_size;
2914 
2915         min_size -= nxt_min(min_size, part_size);
2916     }
2917 
2918     while (size > 0) {
2919         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2920         min_part_size = nxt_min(min_size, part_size);
2921         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2922 
2923         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2924                                        min_part_size, &mmap_buf, local_buf);
2925         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2926             return -rc;
2927         }
2928 
2929         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2930         if (nxt_slow_path(buf_size == 0)) {
2931             return sent;
2932         }
2933         part_size = nxt_min(buf_size, part_size);
2934 
2935         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2936                                        part_start, part_size);
2937 
2938         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2939         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2940             return -rc;
2941         }
2942 
2943         size -= part_size;
2944         part_start += part_size;
2945         sent += part_size;
2946 
2947         min_size -= nxt_min(min_size, part_size);
2948     }
2949 
2950     return sent;
2951 }
2952 
2953 
2954 int
2955 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2956     nxt_unit_read_info_t *read_info)
2957 {
2958     int                           rc;
2959     ssize_t                       n;
2960     uint32_t                      buf_size;
2961     nxt_unit_buf_t                *buf;
2962     nxt_unit_mmap_buf_t           mmap_buf;
2963     nxt_unit_request_info_impl_t  *req_impl;
2964     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2965 
2966     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2967 
2968     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2969         nxt_unit_req_alert(req, "write: response not initialized yet");
2970 
2971         return NXT_UNIT_ERROR;
2972     }
2973 
2974     /* Check if response is not send yet. */
2975     if (nxt_slow_path(req->response_buf != NULL)) {
2976 
2977         /* Enable content in headers buf. */
2978         rc = nxt_unit_response_add_content(req, "", 0);
2979         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2980             nxt_unit_req_error(req, "Failed to add piggyback content");
2981 
2982             return rc;
2983         }
2984 
2985         buf = req->response_buf;
2986 
2987         while (buf->end - buf->free > 0) {
2988             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2989             if (nxt_slow_path(n < 0)) {
2990                 nxt_unit_req_error(req, "Read error");
2991 
2992                 return NXT_UNIT_ERROR;
2993             }
2994 
2995             /* Manually increase sizes. */
2996             buf->free += n;
2997             req->response->piggyback_content_length += n;
2998 
2999             if (read_info->eof) {
3000                 break;
3001             }
3002         }
3003 
3004         rc = nxt_unit_response_send(req);
3005         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3006             nxt_unit_req_error(req, "Failed to send headers with content");
3007 
3008             return rc;
3009         }
3010 
3011         if (read_info->eof) {
3012             return NXT_UNIT_OK;
3013         }
3014     }
3015 
3016     while (!read_info->eof) {
3017         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3018                            read_info->buf_size);
3019 
3020         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3021 
3022         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3023                                        buf_size, buf_size,
3024                                        &mmap_buf, local_buf);
3025         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3026             return rc;
3027         }
3028 
3029         buf = &mmap_buf.buf;
3030 
3031         while (!read_info->eof && buf->end > buf->free) {
3032             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3033             if (nxt_slow_path(n < 0)) {
3034                 nxt_unit_req_error(req, "Read error");
3035 
3036                 nxt_unit_free_outgoing_buf(&mmap_buf);
3037 
3038                 return NXT_UNIT_ERROR;
3039             }
3040 
3041             buf->free += n;
3042         }
3043 
3044         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3045         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3046             nxt_unit_req_error(req, "Failed to send content");
3047 
3048             return rc;
3049         }
3050     }
3051 
3052     return NXT_UNIT_OK;
3053 }
3054 
3055 
3056 ssize_t
3057 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3058 {
3059     ssize_t  buf_res, res;
3060 
3061     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3062                                 dst, size);
3063 
3064     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3065         res = read(req->content_fd, dst, size);
3066         if (nxt_slow_path(res < 0)) {
3067             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3068                                strerror(errno), errno);
3069 
3070             return res;
3071         }
3072 
3073         if (res < (ssize_t) size) {
3074             nxt_unit_close(req->content_fd);
3075 
3076             req->content_fd = -1;
3077         }
3078 
3079         req->content_length -= res;
3080         size -= res;
3081 
3082         dst = nxt_pointer_to(dst, res);
3083 
3084     } else {
3085         res = 0;
3086     }
3087 
3088     return buf_res + res;
3089 }
3090 
3091 
3092 ssize_t
3093 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3094 {
3095     char                 *p;
3096     size_t               l_size, b_size;
3097     nxt_unit_buf_t       *b;
3098     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3099 
3100     if (req->content_length == 0) {
3101         return 0;
3102     }
3103 
3104     l_size = 0;
3105 
3106     b = req->content_buf;
3107 
3108     while (b != NULL) {
3109         b_size = b->end - b->free;
3110         p = memchr(b->free, '\n', b_size);
3111 
3112         if (p != NULL) {
3113             p++;
3114             l_size += p - b->free;
3115             break;
3116         }
3117 
3118         l_size += b_size;
3119 
3120         if (max_size <= l_size) {
3121             break;
3122         }
3123 
3124         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3125         if (mmap_buf->next == NULL
3126             && req->content_fd != -1
3127             && l_size < req->content_length)
3128         {
3129             preread_buf = nxt_unit_request_preread(req, 16384);
3130             if (nxt_slow_path(preread_buf == NULL)) {
3131                 return -1;
3132             }
3133 
3134             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3135         }
3136 
3137         b =