xref: /unit/src/nxt_unit.c (revision 1710:e598cd15bd91)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
/*
 * NOTE(review): presumably the largest payload that is sent inline
 * ("plain") in a port message; the users are outside this view -- confirm.
 */
24 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
/* One nxt_port_msg_t header plus NXT_UNIT_MAX_PLAIN_SIZE payload bytes. */
25 #define NXT_UNIT_LOCAL_BUF_SIZE  \
26     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
/*
 * Short type names for the library-private structures.  The structures
 * themselves are defined further down, after the function prototypes.
 */
28 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
29 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
30 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
31 typedef struct nxt_unit_process_s               nxt_unit_process_t;
32 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
33 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
34 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
35 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
36 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
37 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
38 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
39 
/*
 * Prototypes of the file-local helpers (both static and nxt_inline), so
 * that a helper can be called before the point where it is defined.
 */
40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
42     nxt_unit_ctx_impl_t *ctx_impl, void *data);
43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50     nxt_unit_mmap_buf_t *mmap_buf);
51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56     int queue_fd);
57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
58 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
59     nxt_unit_recv_msg_t *recv_msg);
60 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
61 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
62     nxt_unit_recv_msg_t *recv_msg);
63 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
64     nxt_unit_recv_msg_t *recv_msg);
65 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
66     nxt_unit_port_id_t *port_id);
67 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
68 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg);
70 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
71 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
72     nxt_unit_ctx_t *ctx);
73 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
74 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
75 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
76     nxt_unit_ctx_t *ctx);
77 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
78 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
79     nxt_unit_websocket_frame_impl_t *ws);
80 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
81 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
82 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
83     nxt_unit_mmap_buf_t *mmap_buf, int last);
84 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
85 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
86 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
88     nxt_unit_ctx_impl_t *ctx_impl);
89 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
90     nxt_unit_read_buf_t *rbuf);
91 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
92     nxt_unit_request_info_t *req, size_t size);
93 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
94     size_t size);
95 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
96     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
97 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
98 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
99 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
100 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
101     nxt_unit_port_t *port, int n);
102 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
103 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
104     int fd);
105 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
106     nxt_unit_port_t *port, uint32_t size,
107     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
108 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
109 
110 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
111     nxt_unit_ctx_impl_t *ctx_impl);
112 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
113 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
114 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
115 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
116 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
117     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
118     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
119 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
120     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
121 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
122 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
123     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
124 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
125 
126 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
127 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
128     pid_t pid, int remove);
129 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
130 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
131 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
132 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
133 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
134 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
135 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
136 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
137 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
139     nxt_unit_port_t *port);
140 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
141 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
142 
143 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
144     nxt_unit_port_t *port, int queue_fd);
145 
146 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
147 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
148 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
149     nxt_unit_port_t *port, void *queue);
150 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
151     nxt_unit_port_id_t *port_id);
152 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
153     nxt_unit_port_id_t *port_id);
154 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
155 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
156     nxt_unit_process_t *process);
157 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
158 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
159 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
160     nxt_unit_port_t *port, const void *buf, size_t buf_size,
161     const void *oob, size_t oob_size);
162 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
163     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
164 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
165     nxt_unit_read_buf_t *rbuf);
166 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
167     nxt_unit_read_buf_t *src);
168 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
169     nxt_unit_read_buf_t *rbuf);
170 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
171     nxt_unit_read_buf_t *rbuf);
172 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
173     nxt_unit_read_buf_t *rbuf);
174 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline int nxt_unit_close(int fd);
177 static int nxt_unit_fd_blocking(int fd);
178 
179 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
180     nxt_unit_port_t *port);
181 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
182     nxt_unit_port_id_t *port_id, int remove);
183 
184 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
185     nxt_unit_request_info_t *req);
186 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
187     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
188 
189 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
190 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
191 static void nxt_unit_lvlhsh_free(void *data, void *p);
192 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
193 
194 
/*
 * List node wrapping a public nxt_unit_buf_t.
 *
 * Nodes are chained through "next".  "prev" stores the address of the
 * pointer that refers to this node (the list head variable or the previous
 * node's "next"); nxt_unit_mmap_buf_insert() and nxt_unit_mmap_buf_unlink()
 * maintain both links.
 */
195 struct nxt_unit_mmap_buf_s {
196     nxt_unit_buf_t           buf;
197 
198     nxt_unit_mmap_buf_t      *next;
199     nxt_unit_mmap_buf_t      **prev;
200 
/*
 * NOTE(review): the fields below are filled in outside this view.
 * Presumably "hdr" is the shared memory segment backing "buf", and
 * "free_ptr"/"plain_ptr" refer to heap storage used when the data does
 * not live in shared memory -- confirm against the buffer allocators.
 */
201     nxt_port_mmap_header_t   *hdr;
202     nxt_unit_request_info_t  *req;
203     nxt_unit_ctx_impl_t      *ctx_impl;
204     char                     *free_ptr;
205     char                     *plain_ptr;
206 };
207 
208 
/*
 * Decoded form of one received port message, built by
 * nxt_unit_process_msg() from the nxt_port_msg_t header at the start of
 * the read buffer.
 */
209 struct nxt_unit_recv_msg_s {
/* Copied verbatim from the nxt_port_msg_t header. */
210     uint32_t                 stream;
211     nxt_pid_t                pid;
212     nxt_port_id_t            reply_port;
213 
214     uint8_t                  last;      /* 1 bit */
215     uint8_t                  mmap;      /* 1 bit */
216 
/*
 * Payload: initially the bytes that follow the header in the read buffer
 * and their count (received size minus the header size).
 */
217     void                     *start;
218     uint32_t                 size;
219 
/* Descriptors received as SCM_RIGHTS ancillary data; -1 when absent. */
220     int                      fd[2];
221 
/* Starts as NULL; NOTE(review): presumably set by nxt_unit_mmap_read(). */
222     nxt_unit_mmap_buf_t      *incoming_buf;
223 };
224 
225 
/*
 * Value type of nxt_unit_request_info_impl_t.state.
 *
 * NOTE(review): the names suggest the response life cycle of a request
 * (start -> response initialised -> has content -> sent -> released);
 * the transitions are implemented outside this view -- confirm.
 */
226 typedef enum {
227     NXT_UNIT_RS_START           = 0,
228     NXT_UNIT_RS_RESPONSE_INIT,
229     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
230     NXT_UNIT_RS_RESPONSE_SENT,
231     NXT_UNIT_RS_RELEASED,
232 } nxt_unit_req_state_t;
233 
234 
/*
 * Private part of a request.  The public nxt_unit_request_info_t is the
 * first member; the structure ends with a flexible array, so any
 * per-request extra storage has to be allocated right after it.
 */
235 struct nxt_unit_request_info_impl_s {
236     nxt_unit_request_info_t  req;
237 
238     uint32_t                 stream;
239 
/* Heads of nxt_unit_mmap_buf_t lists. */
240     nxt_unit_mmap_buf_t      *outgoing_buf;
241     nxt_unit_mmap_buf_t      *incoming_buf;
242 
243     nxt_unit_req_state_t     state;
244     uint8_t                  websocket;
/* NOTE(review): presumably set while linked in nxt_unit_ctx_impl_t.requests. */
245     uint8_t                  in_hash;
246 
247     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
248     nxt_queue_link_t         link;
249     /*  for nxt_unit_port_impl_t.awaiting_req */
250     nxt_queue_link_t         port_wait_link;
251 
/*
 * NOTE(review): presumably the application data area sized by
 * nxt_unit_init_t.request_data_size -- confirm.
 */
252     char                     extra_data[];
253 };
254 
255 
/*
 * Private part of a WebSocket frame; the public
 * nxt_unit_websocket_frame_t is the first member.
 */
256 struct nxt_unit_websocket_frame_impl_s {
257     nxt_unit_websocket_frame_t  ws;
258 
259     nxt_unit_mmap_buf_t         *buf;
260 
/* NOTE(review): presumably the link for nxt_unit_ctx_impl_t.free_ws. */
261     nxt_queue_link_t            link;
262 
263     nxt_unit_ctx_impl_t         *ctx_impl;
264 };
265 
266 
/*
 * Storage for one message read from a port.
 *
 * nxt_unit_process_msg() interprets "buf" as an nxt_port_msg_t header
 * followed by the payload, and "oob" as a struct cmsghdr that may carry
 * file descriptors.
 */
267 struct nxt_unit_read_buf_s {
/* For the pending_rbuf / free_rbuf queues of nxt_unit_ctx_impl_t. */
268     nxt_queue_link_t              link;
269     nxt_unit_ctx_impl_t           *ctx_impl;
/* Number of bytes in "buf"; 0 is treated as "read port closed". */
270     ssize_t                       size;
271     char                          buf[16384];
272     char                          oob[256];
273 };
274 
275 
/*
 * Private part of a context; the public nxt_unit_ctx_t is the first
 * member.  Initialised by nxt_unit_ctx_init().
 */
276 struct nxt_unit_ctx_impl_s {
277     nxt_unit_ctx_t                ctx;
278 
/*
 * Reference counter: starts at 1, and nxt_unit_ctx_release() frees the
 * context when the value it decrements is 1.
 */
279     nxt_atomic_t                  use_count;
280     nxt_atomic_t                  wait_items;
281 
282     pthread_mutex_t               mutex;
283 
284     nxt_unit_port_t               *read_port;
285 
/* For nxt_unit_impl_t.contexts. */
286     nxt_queue_link_t              link;
287 
/* Head of the list of unused nxt_unit_mmap_buf_t descriptors. */
288     nxt_unit_mmap_buf_t           *free_buf;
289 
290     /*  of nxt_unit_request_info_impl_t */
291     nxt_queue_t                   free_req;
292 
293     /*  of nxt_unit_websocket_frame_impl_t */
294     nxt_queue_t                   free_ws;
295 
296     /*  of nxt_unit_request_info_impl_t */
297     nxt_queue_t                   active_req;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_lvlhsh_t                  requests;
301 
302     /*  of nxt_unit_request_info_impl_t */
303     nxt_queue_t                   ready_req;
304 
305     /*  of nxt_unit_read_buf_t */
306     nxt_queue_t                   pending_rbuf;
307 
308     /*  of nxt_unit_read_buf_t */
309     nxt_queue_t                   free_rbuf;
310 
/* Set to 1 and 0 respectively by nxt_unit_ctx_init(). */
311     int                           online;
312     int                           ready;
313 
/*
 * Embedded objects that seed the free lists: ctx_buf[] goes to free_buf,
 * ctx_read_buf to free_rbuf and req to free_req.
 */
314     nxt_unit_mmap_buf_t           ctx_buf[2];
315     nxt_unit_read_buf_t           ctx_read_buf;
316 
/* Kept last: it ends with the flexible array extra_data[]. */
317     nxt_unit_request_info_impl_t  req;
318 };
319 
320 
/*
 * One element of nxt_unit_mmaps_t.elts.
 *
 * NOTE(review): presumably describes a single shared memory segment
 * ("hdr") and the thread that created it -- confirm against
 * nxt_unit_new_mmap() / nxt_unit_incoming_mmap().
 */
321 struct nxt_unit_mmap_s {
322     nxt_port_mmap_header_t   *hdr;
323     pthread_t                src_thread;
324 
325     /*  of nxt_unit_read_buf_t */
326     nxt_queue_t              awaiting_rbuf;
327 };
328 
329 
/*
 * Array of nxt_unit_mmap_t elements with its own mutex.
 *
 * NOTE(review): "size" and "cap" presumably are the used and allocated
 * element counts of "elts" -- confirm against nxt_unit_mmap_at().
 */
330 struct nxt_unit_mmaps_s {
331     pthread_mutex_t          mutex;
332     uint32_t                 size;
333     uint32_t                 cap;
334     nxt_atomic_t             allocated_chunks;
335     nxt_unit_mmap_t          *elts;
336 };
337 
338 
/*
 * The library instance; the public nxt_unit_t is the first member.
 *
 * nxt_unit_create() allocates it with request_data_size extra bytes after
 * the structure.  NOTE(review): those bytes presumably back
 * main_ctx.req.extra_data[], which is why main_ctx is the last member.
 */
339 struct nxt_unit_impl_s {
340     nxt_unit_t               unit;
/* Copy of nxt_unit_init_t.callbacks. */
341     nxt_unit_callbacks_t     callbacks;
342 
/*
 * Reference counter; each initialised context takes one reference, and
 * nxt_unit_lib_release() destroys the library when the value it
 * decrements is 1.
 */
343     nxt_atomic_t             use_count;
344 
345     uint32_t                 request_data_size;
/* Shared memory limit in units of PORT_MMAP_DATA_SIZE, at least 1. */
346     uint32_t                 shm_mmap_limit;
347 
/* Taken around changes of "contexts" and while popping "processes". */
348     pthread_mutex_t          mutex;
349 
350     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
351     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
352 
353     nxt_unit_port_t          *router_port;
354     nxt_unit_port_t          *shared_port;
355 
356     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
357 
358     nxt_unit_mmaps_t         incoming;
359     nxt_unit_mmaps_t         outgoing;
360 
/* Set by nxt_unit_init() to the pid part of the read port id. */
361     pid_t                    pid;
/* STDERR_FILENO until nxt_unit_init() replaces it. */
362     int                      log_fd;
363 
364     nxt_unit_ctx_impl_t      main_ctx;
365 };
366 
367 
/*
 * Private part of a port; the public nxt_unit_port_t is the first member.
 */
368 struct nxt_unit_port_impl_s {
369     nxt_unit_port_t          port;
370 
371     nxt_atomic_t             use_count;
372 
373     /*  for nxt_unit_process_t.ports */
374     nxt_queue_link_t         link;
375     nxt_unit_process_t       *process;
376 
377     /*  of nxt_unit_request_info_impl_t */
378     nxt_queue_t              awaiting_req;
379 
380     int                      ready;
381 
/*
 * Queue memory handed to nxt_unit_add_port(); nxt_unit_init() passes an
 * mmap()ed nxt_port_queue_t for the read port and NULL for the router
 * port.  NOTE(review): assumes nxt_unit_add_port() stores its "queue"
 * argument here -- confirm.
 */
382     void                     *queue;
383 
384     int                      from_socket;
385     nxt_unit_read_buf_t      *socket_rbuf;
386 };
387 
388 
/*
 * A peer process known to the library, stored in nxt_unit_impl_t.processes.
 * NOTE(review): presumably keyed by pid -- confirm in nxt_unit_process_find().
 */
389 struct nxt_unit_process_s {
390     pid_t                    pid;
391 
392     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
393 
394     nxt_unit_impl_t          *lib;
395 
396     nxt_atomic_t             use_count;
397 
/* NOTE(review): presumably the id for the next port created -- confirm. */
398     uint32_t                 next_port_id;
399 };
400 
401 
/*
 * (pid, id) pair identifying a port.
 * NOTE(review): presumably the key hashed by nxt_unit_port_hash_add() and
 * nxt_unit_port_hash_find(), where padding bytes would be harmful -- confirm.
 */
402 /* Explicitly using 32 bit types to avoid possible alignment padding. */
403 typedef struct {
404     int32_t   pid;
405     uint32_t  id;
406 } nxt_unit_port_hash_id_t;
407 
408 
409 nxt_unit_ctx_t *
410 nxt_unit_init(nxt_unit_init_t *init)
411 {
412     int              rc, queue_fd;
413     void             *mem;
414     uint32_t         ready_stream, shm_limit;
415     nxt_unit_ctx_t   *ctx;
416     nxt_unit_impl_t  *lib;
417     nxt_unit_port_t  ready_port, router_port, read_port;
418 
419     lib = nxt_unit_create(init);
420     if (nxt_slow_path(lib == NULL)) {
421         return NULL;
422     }
423 
424     queue_fd = -1;
425     mem = MAP_FAILED;
426 
427     if (init->ready_port.id.pid != 0
428         && init->ready_stream != 0
429         && init->read_port.id.pid != 0)
430     {
431         ready_port = init->ready_port;
432         ready_stream = init->ready_stream;
433         router_port = init->router_port;
434         read_port = init->read_port;
435         lib->log_fd = init->log_fd;
436 
437         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
438                               ready_port.id.id);
439         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
440                               router_port.id.id);
441         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
442                               read_port.id.id);
443 
444     } else {
445         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
446                                &lib->log_fd, &ready_stream, &shm_limit);
447         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
448             goto fail;
449         }
450 
451         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
452                                 / PORT_MMAP_DATA_SIZE;
453     }
454 
455     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
456         lib->shm_mmap_limit = 1;
457     }
458 
459     lib->pid = read_port.id.pid;
460 
461     ctx = &lib->main_ctx.ctx;
462 
463     rc = nxt_unit_fd_blocking(router_port.out_fd);
464     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
465         goto fail;
466     }
467 
468     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
469     if (nxt_slow_path(lib->router_port == NULL)) {
470         nxt_unit_alert(NULL, "failed to add router_port");
471 
472         goto fail;
473     }
474 
475     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
476     if (nxt_slow_path(queue_fd == -1)) {
477         goto fail;
478     }
479 
480     mem = mmap(NULL, sizeof(nxt_port_queue_t),
481                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
482     if (nxt_slow_path(mem == MAP_FAILED)) {
483         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
484                        strerror(errno), errno);
485 
486         goto fail;
487     }
488 
489     nxt_port_queue_init(mem);
490 
491     rc = nxt_unit_fd_blocking(read_port.in_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
497     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add read_port");
499 
500         goto fail;
501     }
502 
503     rc = nxt_unit_fd_blocking(ready_port.out_fd);
504     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
505         goto fail;
506     }
507 
508     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
509     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
510         nxt_unit_alert(NULL, "failed to send READY message");
511 
512         goto fail;
513     }
514 
515     nxt_unit_close(ready_port.out_fd);
516     nxt_unit_close(queue_fd);
517 
518     return ctx;
519 
520 fail:
521 
522     if (mem != MAP_FAILED) {
523         munmap(mem, sizeof(nxt_port_queue_t));
524     }
525 
526     if (queue_fd != -1) {
527         nxt_unit_close(queue_fd);
528     }
529 
530     nxt_unit_ctx_release(&lib->main_ctx.ctx);
531 
532     return NULL;
533 }
534 
535 
536 static nxt_unit_impl_t *
537 nxt_unit_create(nxt_unit_init_t *init)
538 {
539     int                   rc;
540     nxt_unit_impl_t       *lib;
541     nxt_unit_callbacks_t  *cb;
542 
543     lib = nxt_unit_malloc(NULL,
544                           sizeof(nxt_unit_impl_t) + init->request_data_size);
545     if (nxt_slow_path(lib == NULL)) {
546         nxt_unit_alert(NULL, "failed to allocate unit struct");
547 
548         return NULL;
549     }
550 
551     rc = pthread_mutex_init(&lib->mutex, NULL);
552     if (nxt_slow_path(rc != 0)) {
553         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
554 
555         goto fail;
556     }
557 
558     lib->unit.data = init->data;
559     lib->callbacks = init->callbacks;
560 
561     lib->request_data_size = init->request_data_size;
562     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
563                             / PORT_MMAP_DATA_SIZE;
564 
565     lib->processes.slot = NULL;
566     lib->ports.slot = NULL;
567 
568     lib->log_fd = STDERR_FILENO;
569 
570     nxt_queue_init(&lib->contexts);
571 
572     lib->use_count = 0;
573     lib->router_port = NULL;
574     lib->shared_port = NULL;
575 
576     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
577     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
578         pthread_mutex_destroy(&lib->mutex);
579         goto fail;
580     }
581 
582     cb = &lib->callbacks;
583 
584     if (cb->request_handler == NULL) {
585         nxt_unit_alert(NULL, "request_handler is NULL");
586 
587         pthread_mutex_destroy(&lib->mutex);
588         goto fail;
589     }
590 
591     nxt_unit_mmaps_init(&lib->incoming);
592     nxt_unit_mmaps_init(&lib->outgoing);
593 
594     return lib;
595 
596 fail:
597 
598     nxt_unit_free(NULL, lib);
599 
600     return NULL;
601 }
602 
603 
604 static int
605 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
606     void *data)
607 {
608     int  rc;
609 
610     ctx_impl->ctx.data = data;
611     ctx_impl->ctx.unit = &lib->unit;
612 
613     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
614     if (nxt_slow_path(rc != 0)) {
615         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
616 
617         return NXT_UNIT_ERROR;
618     }
619 
620     nxt_unit_lib_use(lib);
621 
622     pthread_mutex_lock(&lib->mutex);
623 
624     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
625 
626     pthread_mutex_unlock(&lib->mutex);
627 
628     ctx_impl->use_count = 1;
629     ctx_impl->wait_items = 0;
630     ctx_impl->online = 1;
631     ctx_impl->ready = 0;
632 
633     nxt_queue_init(&ctx_impl->free_req);
634     nxt_queue_init(&ctx_impl->free_ws);
635     nxt_queue_init(&ctx_impl->active_req);
636     nxt_queue_init(&ctx_impl->ready_req);
637     nxt_queue_init(&ctx_impl->pending_rbuf);
638     nxt_queue_init(&ctx_impl->free_rbuf);
639 
640     ctx_impl->free_buf = NULL;
641     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
642     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
643 
644     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
645     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
646 
647     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
648 
649     ctx_impl->req.req.ctx = &ctx_impl->ctx;
650     ctx_impl->req.req.unit = &lib->unit;
651 
652     ctx_impl->read_port = NULL;
653     ctx_impl->requests.slot = 0;
654 
655     return NXT_UNIT_OK;
656 }
657 
658 
659 nxt_inline void
660 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
661 {
662     nxt_unit_ctx_impl_t  *ctx_impl;
663 
664     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
665 
666     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
667 }
668 
669 
670 nxt_inline void
671 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
672 {
673     long                 c;
674     nxt_unit_ctx_impl_t  *ctx_impl;
675 
676     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
677 
678     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
679 
680     if (c == 1) {
681         nxt_unit_ctx_free(ctx_impl);
682     }
683 }
684 
685 
686 nxt_inline void
687 nxt_unit_lib_use(nxt_unit_impl_t *lib)
688 {
689     nxt_atomic_fetch_add(&lib->use_count, 1);
690 }
691 
692 
693 nxt_inline void
694 nxt_unit_lib_release(nxt_unit_impl_t *lib)
695 {
696     long                c;
697     nxt_unit_process_t  *process;
698 
699     c = nxt_atomic_fetch_add(&lib->use_count, -1);
700 
701     if (c == 1) {
702         for ( ;; ) {
703             pthread_mutex_lock(&lib->mutex);
704 
705             process = nxt_unit_process_pop_first(lib);
706             if (process == NULL) {
707                 pthread_mutex_unlock(&lib->mutex);
708 
709                 break;
710             }
711 
712             nxt_unit_remove_process(lib, process);
713         }
714 
715         pthread_mutex_destroy(&lib->mutex);
716 
717         if (nxt_fast_path(lib->router_port != NULL)) {
718             nxt_unit_port_release(lib->router_port);
719         }
720 
721         if (nxt_fast_path(lib->shared_port != NULL)) {
722             nxt_unit_port_release(lib->shared_port);
723         }
724 
725         nxt_unit_mmaps_destroy(&lib->incoming);
726         nxt_unit_mmaps_destroy(&lib->outgoing);
727 
728         nxt_unit_free(NULL, lib);
729     }
730 }
731 
732 
733 nxt_inline void
734 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
735     nxt_unit_mmap_buf_t *mmap_buf)
736 {
737     mmap_buf->next = *head;
738 
739     if (mmap_buf->next != NULL) {
740         mmap_buf->next->prev = &mmap_buf->next;
741     }
742 
743     *head = mmap_buf;
744     mmap_buf->prev = head;
745 }
746 
747 
748 nxt_inline void
749 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
750     nxt_unit_mmap_buf_t *mmap_buf)
751 {
752     while (*prev != NULL) {
753         prev = &(*prev)->next;
754     }
755 
756     nxt_unit_mmap_buf_insert(prev, mmap_buf);
757 }
758 
759 
760 nxt_inline void
761 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
762 {
763     nxt_unit_mmap_buf_t  **prev;
764 
765     prev = mmap_buf->prev;
766 
767     if (mmap_buf->next != NULL) {
768         mmap_buf->next->prev = prev;
769     }
770 
771     if (prev != NULL) {
772         *prev = mmap_buf->next;
773     }
774 }
775 
776 
777 static int
778 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
779     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
780     uint32_t *shm_limit)
781 {
782     int       rc;
783     int       ready_fd, router_fd, read_in_fd, read_out_fd;
784     char      *unit_init, *version_end;
785     long      version_length;
786     int64_t   ready_pid, router_pid, read_pid;
787     uint32_t  ready_stream, router_id, ready_id, read_id;
788 
789     unit_init = getenv(NXT_UNIT_INIT_ENV);
790     if (nxt_slow_path(unit_init == NULL)) {
791         nxt_unit_alert(NULL, "%s is not in the current environment",
792                        NXT_UNIT_INIT_ENV);
793 
794         return NXT_UNIT_ERROR;
795     }
796 
797     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
798 
799     version_length = nxt_length(NXT_VERSION);
800 
801     version_end = strchr(unit_init, ';');
802     if (version_end == NULL
803         || version_end - unit_init != version_length
804         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
805     {
806         nxt_unit_alert(NULL, "version check error");
807 
808         return NXT_UNIT_ERROR;
809     }
810 
811     rc = sscanf(version_end + 1,
812                 "%"PRIu32";"
813                 "%"PRId64",%"PRIu32",%d;"
814                 "%"PRId64",%"PRIu32",%d;"
815                 "%"PRId64",%"PRIu32",%d,%d;"
816                 "%d,%"PRIu32,
817                 &ready_stream,
818                 &ready_pid, &ready_id, &ready_fd,
819                 &router_pid, &router_id, &router_fd,
820                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
821                 log_fd, shm_limit);
822 
823     if (nxt_slow_path(rc != 13)) {
824         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
825 
826         return NXT_UNIT_ERROR;
827     }
828 
829     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
830 
831     ready_port->in_fd = -1;
832     ready_port->out_fd = ready_fd;
833     ready_port->data = NULL;
834 
835     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
836 
837     router_port->in_fd = -1;
838     router_port->out_fd = router_fd;
839     router_port->data = NULL;
840 
841     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
842 
843     read_port->in_fd = read_in_fd;
844     read_port->out_fd = read_out_fd;
845     read_port->data = NULL;
846 
847     *stream = ready_stream;
848 
849     return NXT_UNIT_OK;
850 }
851 
852 
853 static int
854 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
855 {
856     ssize_t          res;
857     nxt_port_msg_t   msg;
858     nxt_unit_impl_t  *lib;
859 
860     union {
861         struct cmsghdr  cm;
862         char            space[CMSG_SPACE(sizeof(int))];
863     } cmsg;
864 
865     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
866 
867     msg.stream = stream;
868     msg.pid = lib->pid;
869     msg.reply_port = 0;
870     msg.type = _NXT_PORT_MSG_PROCESS_READY;
871     msg.last = 1;
872     msg.mmap = 0;
873     msg.nf = 0;
874     msg.mf = 0;
875     msg.tracking = 0;
876 
877     memset(&cmsg, 0, sizeof(cmsg));
878 
879     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
880     cmsg.cm.cmsg_level = SOL_SOCKET;
881     cmsg.cm.cmsg_type = SCM_RIGHTS;
882 
883     /*
884      * memcpy() is used instead of simple
885      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
886      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
887      *   dereferencing type-punned pointer will break strict-aliasing rules
888      *
889      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
890      * in the same simple assignment as in the code above.
891      */
892     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
893 
894     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
895                            &cmsg, sizeof(cmsg));
896     if (res != sizeof(msg)) {
897         return NXT_UNIT_ERROR;
898     }
899 
900     return NXT_UNIT_OK;
901 }
902 
903 
904 static int
905 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
906 {
907     int                  rc;
908     pid_t                pid;
909     struct cmsghdr       *cm;
910     nxt_port_msg_t       *port_msg;
911     nxt_unit_impl_t      *lib;
912     nxt_unit_recv_msg_t  recv_msg;
913 
914     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
915 
916     recv_msg.fd[0] = -1;
917     recv_msg.fd[1] = -1;
918     port_msg = (nxt_port_msg_t *) rbuf->buf;
919     cm = (struct cmsghdr *) rbuf->oob;
920 
921     if (cm->cmsg_level == SOL_SOCKET
922         && cm->cmsg_type == SCM_RIGHTS)
923     {
924         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
925             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
926         }
927 
928         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
929             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
930         }
931     }
932 
933     recv_msg.incoming_buf = NULL;
934 
935     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
936         if (nxt_slow_path(rbuf->size == 0)) {
937             nxt_unit_debug(ctx, "read port closed");
938 
939             nxt_unit_quit(ctx);
940             rc = NXT_UNIT_OK;
941             goto done;
942         }
943 
944         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
945 
946         rc = NXT_UNIT_ERROR;
947         goto done;
948     }
949 
950     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
951                    port_msg->stream, (int) port_msg->type,
952                    recv_msg.fd[0], recv_msg.fd[1]);
953 
954     recv_msg.stream = port_msg->stream;
955     recv_msg.pid = port_msg->pid;
956     recv_msg.reply_port = port_msg->reply_port;
957     recv_msg.last = port_msg->last;
958     recv_msg.mmap = port_msg->mmap;
959 
960     recv_msg.start = port_msg + 1;
961     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
962 
963     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
964         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
965                        port_msg->stream, (int) port_msg->type);
966         rc = NXT_UNIT_ERROR;
967         goto done;
968     }
969 
970     /* Fragmentation is unsupported. */
971     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
972         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
973                        port_msg->stream, (int) port_msg->type);
974         rc = NXT_UNIT_ERROR;
975         goto done;
976     }
977 
978     if (port_msg->mmap) {
979         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
980 
981         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
982             if (rc == NXT_UNIT_AGAIN) {
983                 recv_msg.fd[0] = -1;
984                 recv_msg.fd[1] = -1;
985             }
986 
987             goto done;
988         }
989     }
990 
991     switch (port_msg->type) {
992 
993     case _NXT_PORT_MSG_RPC_READY:
994         rc = NXT_UNIT_OK;
995         break;
996 
997     case _NXT_PORT_MSG_QUIT:
998         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
999 
1000         nxt_unit_quit(ctx);
1001         rc = NXT_UNIT_OK;
1002         break;
1003 
1004     case _NXT_PORT_MSG_NEW_PORT:
1005         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1006         break;
1007 
1008     case _NXT_PORT_MSG_PORT_ACK:
1009         rc = nxt_unit_ctx_ready(ctx);
1010         break;
1011 
1012     case _NXT_PORT_MSG_CHANGE_FILE:
1013         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1014                        port_msg->stream, recv_msg.fd[0]);
1015 
1016         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1017             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1018                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1019                            strerror(errno), errno);
1020 
1021             rc = NXT_UNIT_ERROR;
1022             goto done;
1023         }
1024 
1025         rc = NXT_UNIT_OK;
1026         break;
1027 
1028     case _NXT_PORT_MSG_MMAP:
1029         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1030             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1031                            port_msg->stream, recv_msg.fd[0]);
1032 
1033             rc = NXT_UNIT_ERROR;
1034             goto done;
1035         }
1036 
1037         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1038         break;
1039 
1040     case _NXT_PORT_MSG_REQ_HEADERS:
1041         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
1042         break;
1043 
1044     case _NXT_PORT_MSG_REQ_BODY:
1045         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1046         break;
1047 
1048     case _NXT_PORT_MSG_WEBSOCKET:
1049         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1050         break;
1051 
1052     case _NXT_PORT_MSG_REMOVE_PID:
1053         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1054             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1055                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1056                            (int) sizeof(pid));
1057 
1058             rc = NXT_UNIT_ERROR;
1059             goto done;
1060         }
1061 
1062         memcpy(&pid, recv_msg.start, sizeof(pid));
1063 
1064         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1065                        port_msg->stream, (int) pid);
1066 
1067         nxt_unit_remove_pid(lib, pid);
1068 
1069         rc = NXT_UNIT_OK;
1070         break;
1071 
1072     case _NXT_PORT_MSG_SHM_ACK:
1073         rc = nxt_unit_process_shm_ack(ctx);
1074         break;
1075 
1076     default:
1077         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1078                        port_msg->stream, (int) port_msg->type);
1079 
1080         rc = NXT_UNIT_ERROR;
1081         goto done;
1082     }
1083 
1084 done:
1085 
1086     if (recv_msg.fd[0] != -1) {
1087         nxt_unit_close(recv_msg.fd[0]);
1088     }
1089 
1090     if (recv_msg.fd[1] != -1) {
1091         nxt_unit_close(recv_msg.fd[1]);
1092     }
1093 
1094     while (recv_msg.incoming_buf != NULL) {
1095         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1096     }
1097 
1098     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1099 #if (NXT_DEBUG)
1100         memset(rbuf->buf, 0xAC, rbuf->size);
1101 #endif
1102         nxt_unit_read_buf_release(ctx, rbuf);
1103     }
1104 
1105     return rc;
1106 }
1107 
1108 
1109 static int
1110 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1111 {
1112     void                     *mem;
1113     nxt_unit_impl_t          *lib;
1114     nxt_unit_port_t          new_port, *port;
1115     nxt_port_msg_new_port_t  *new_port_msg;
1116 
1117     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1118         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1119                       "invalid message size (%d)",
1120                       recv_msg->stream, (int) recv_msg->size);
1121 
1122         return NXT_UNIT_ERROR;
1123     }
1124 
1125     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1126         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1127                        recv_msg->stream, recv_msg->fd[0]);
1128 
1129         return NXT_UNIT_ERROR;
1130     }
1131 
1132     new_port_msg = recv_msg->start;
1133 
1134     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1135                    recv_msg->stream, (int) new_port_msg->pid,
1136                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1137 
1138     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1139 
1140     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1141         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1142 
1143         new_port.in_fd = recv_msg->fd[0];
1144         new_port.out_fd = -1;
1145 
1146         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1147                    MAP_SHARED, recv_msg->fd[1], 0);
1148 
1149     } else {
1150         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1151                           != NXT_UNIT_OK))
1152         {
1153             return NXT_UNIT_ERROR;
1154         }
1155 
1156         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1157                               new_port_msg->id);
1158 
1159         new_port.in_fd = -1;
1160         new_port.out_fd = recv_msg->fd[0];
1161 
1162         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1163                    MAP_SHARED, recv_msg->fd[1], 0);
1164     }
1165 
1166     if (nxt_slow_path(mem == MAP_FAILED)) {
1167         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1168                        strerror(errno), errno);
1169 
1170         return NXT_UNIT_ERROR;
1171     }
1172 
1173     new_port.data = NULL;
1174 
1175     recv_msg->fd[0] = -1;
1176 
1177     port = nxt_unit_add_port(ctx, &new_port, mem);
1178     if (nxt_slow_path(port == NULL)) {
1179         return NXT_UNIT_ERROR;
1180     }
1181 
1182     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1183         lib->shared_port = port;
1184 
1185         return nxt_unit_ctx_ready(ctx);
1186     }
1187 
1188     nxt_unit_port_release(port);
1189 
1190     return NXT_UNIT_OK;
1191 }
1192 
1193 
1194 static int
1195 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1196 {
1197     nxt_unit_impl_t      *lib;
1198     nxt_unit_ctx_impl_t  *ctx_impl;
1199 
1200     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1201     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1202 
1203     ctx_impl->ready = 1;
1204 
1205     if (lib->callbacks.ready_handler) {
1206         return lib->callbacks.ready_handler(ctx);
1207     }
1208 
1209     return NXT_UNIT_OK;
1210 }
1211 
1212 
1213 static int
1214 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1215 {
1216     int                           res;
1217     nxt_unit_impl_t               *lib;
1218     nxt_unit_port_id_t            port_id;
1219     nxt_unit_request_t            *r;
1220     nxt_unit_mmap_buf_t           *b;
1221     nxt_unit_request_info_t       *req;
1222     nxt_unit_request_info_impl_t  *req_impl;
1223 
1224     if (nxt_slow_path(recv_msg->mmap == 0)) {
1225         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1226                       recv_msg->stream);
1227 
1228         return NXT_UNIT_ERROR;
1229     }
1230 
1231     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1232         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1233                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1234                       (int) sizeof(nxt_unit_request_t));
1235 
1236         return NXT_UNIT_ERROR;
1237     }
1238 
1239     req_impl = nxt_unit_request_info_get(ctx);
1240     if (nxt_slow_path(req_impl == NULL)) {
1241         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1242                       recv_msg->stream);
1243 
1244         return NXT_UNIT_ERROR;
1245     }
1246 
1247     req = &req_impl->req;
1248 
1249     req->request = recv_msg->start;
1250 
1251     b = recv_msg->incoming_buf;
1252 
1253     req->request_buf = &b->buf;
1254     req->response = NULL;
1255     req->response_buf = NULL;
1256 
1257     r = req->request;
1258 
1259     req->content_length = r->content_length;
1260 
1261     req->content_buf = req->request_buf;
1262     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1263 
1264     req_impl->stream = recv_msg->stream;
1265 
1266     req_impl->outgoing_buf = NULL;
1267 
1268     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1269         b->req = req;
1270     }
1271 
1272     /* "Move" incoming buffer list to req_impl. */
1273     req_impl->incoming_buf = recv_msg->incoming_buf;
1274     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1275     recv_msg->incoming_buf = NULL;
1276 
1277     req->content_fd = recv_msg->fd[0];
1278     recv_msg->fd[0] = -1;
1279 
1280     req->response_max_fields = 0;
1281     req_impl->state = NXT_UNIT_RS_START;
1282     req_impl->websocket = 0;
1283     req_impl->in_hash = 0;
1284 
1285     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1286                    (int) r->method_length,
1287                    (char *) nxt_unit_sptr_get(&r->method),
1288                    (int) r->target_length,
1289                    (char *) nxt_unit_sptr_get(&r->target),
1290                    (int) r->content_length);
1291 
1292     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1293 
1294     res = nxt_unit_request_check_response_port(req, &port_id);
1295     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1296         return NXT_UNIT_ERROR;
1297     }
1298 
1299     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1300         res = nxt_unit_send_req_headers_ack(req);
1301         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1302             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1303 
1304             return NXT_UNIT_ERROR;
1305         }
1306 
1307         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1308 
1309         if (req->content_length
1310             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1311         {
1312             res = nxt_unit_request_hash_add(ctx, req);
1313             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1314                 nxt_unit_req_warn(req, "failed to add request to hash");
1315 
1316                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1317 
1318                 return NXT_UNIT_ERROR;
1319             }
1320 
1321             /*
1322              * If application have separate data handler, we may start
1323              * request processing and process data when it is arrived.
1324              */
1325             if (lib->callbacks.data_handler == NULL) {
1326                 return NXT_UNIT_OK;
1327             }
1328         }
1329 
1330         lib->callbacks.request_handler(req);
1331     }
1332 
1333     return NXT_UNIT_OK;
1334 }
1335 
1336 
1337 static int
1338 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1339 {
1340     uint64_t                 l;
1341     nxt_unit_impl_t          *lib;
1342     nxt_unit_mmap_buf_t      *b;
1343     nxt_unit_request_info_t  *req;
1344 
1345     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1346     if (req == NULL) {
1347         return NXT_UNIT_OK;
1348     }
1349 
1350     l = req->content_buf->end - req->content_buf->free;
1351 
1352     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1353         b->req = req;
1354         l += b->buf.end - b->buf.free;
1355     }
1356 
1357     if (recv_msg->incoming_buf != NULL) {
1358         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1359 
1360         while (b->next != NULL) {
1361             b = b->next;
1362         }
1363 
1364         /* "Move" incoming buffer list to req_impl. */
1365         b->next = recv_msg->incoming_buf;
1366         b->next->prev = &b->next;
1367 
1368         recv_msg->incoming_buf = NULL;
1369     }
1370 
1371     req->content_fd = recv_msg->fd[0];
1372     recv_msg->fd[0] = -1;
1373 
1374     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1375 
1376     if (lib->callbacks.data_handler != NULL) {
1377         lib->callbacks.data_handler(req);
1378 
1379         return NXT_UNIT_OK;
1380     }
1381 
1382     if (req->content_fd != -1 || l == req->content_length) {
1383         lib->callbacks.request_handler(req);
1384     }
1385 
1386     return NXT_UNIT_OK;
1387 }
1388 
1389 
1390 static int
1391 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1392     nxt_unit_port_id_t *port_id)
1393 {
1394     int                           res;
1395     nxt_unit_ctx_t                *ctx;
1396     nxt_unit_impl_t               *lib;
1397     nxt_unit_port_t               *port;
1398     nxt_unit_process_t            *process;
1399     nxt_unit_ctx_impl_t           *ctx_impl;
1400     nxt_unit_port_impl_t          *port_impl;
1401     nxt_unit_request_info_impl_t  *req_impl;
1402 
1403     ctx = req->ctx;
1404     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1405     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1406 
1407     pthread_mutex_lock(&lib->mutex);
1408 
1409     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1410     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1411 
1412     if (nxt_fast_path(port != NULL)) {
1413         req->response_port = port;
1414 
1415         if (nxt_fast_path(port_impl->ready)) {
1416             pthread_mutex_unlock(&lib->mutex);
1417 
1418             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1419                            (int) port->id.pid, (int) port->id.id);
1420 
1421             return NXT_UNIT_OK;
1422         }
1423 
1424         nxt_unit_debug(ctx, "check_response_port: "
1425                        "port{%d,%d} already requested",
1426                        (int) port->id.pid, (int) port->id.id);
1427 
1428         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1429 
1430         nxt_queue_insert_tail(&port_impl->awaiting_req,
1431                               &req_impl->port_wait_link);
1432 
1433         pthread_mutex_unlock(&lib->mutex);
1434 
1435         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1436 
1437         return NXT_UNIT_AGAIN;
1438     }
1439 
1440     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1441     if (nxt_slow_path(port_impl == NULL)) {
1442         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1443                        (int) sizeof(nxt_unit_port_impl_t));
1444 
1445         pthread_mutex_unlock(&lib->mutex);
1446 
1447         return NXT_UNIT_ERROR;
1448     }
1449 
1450     port = &port_impl->port;
1451 
1452     port->id = *port_id;
1453     port->in_fd = -1;
1454     port->out_fd = -1;
1455     port->data = NULL;
1456 
1457     res = nxt_unit_port_hash_add(&lib->ports, port);
1458     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1459         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1460                        port->id.pid, port->id.id);
1461 
1462         pthread_mutex_unlock(&lib->mutex);
1463 
1464         nxt_unit_free(ctx, port);
1465 
1466         return NXT_UNIT_ERROR;
1467     }
1468 
1469     process = nxt_unit_process_find(lib, port_id->pid, 0);
1470     if (nxt_slow_path(process == NULL)) {
1471         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1472                        port->id.pid);
1473 
1474         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1475 
1476         pthread_mutex_unlock(&lib->mutex);
1477 
1478         nxt_unit_free(ctx, port);
1479 
1480         return NXT_UNIT_ERROR;
1481     }
1482 
1483     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1484 
1485     port_impl->process = process;
1486     port_impl->queue = NULL;
1487     port_impl->from_socket = 0;
1488     port_impl->socket_rbuf = NULL;
1489 
1490     nxt_queue_init(&port_impl->awaiting_req);
1491 
1492     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1493 
1494     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1495 
1496     port_impl->use_count = 2;
1497     port_impl->ready = 0;
1498 
1499     req->response_port = port;
1500 
1501     pthread_mutex_unlock(&lib->mutex);
1502 
1503     res = nxt_unit_get_port(ctx, port_id);
1504     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1505         return NXT_UNIT_ERROR;
1506     }
1507 
1508     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1509 
1510     return NXT_UNIT_AGAIN;
1511 }
1512 
1513 
1514 static int
1515 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1516 {
1517     ssize_t                       res;
1518     nxt_port_msg_t                msg;
1519     nxt_unit_impl_t               *lib;
1520     nxt_unit_ctx_impl_t           *ctx_impl;
1521     nxt_unit_request_info_impl_t  *req_impl;
1522 
1523     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1524     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1525     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1526 
1527     memset(&msg, 0, sizeof(nxt_port_msg_t));
1528 
1529     msg.stream = req_impl->stream;
1530     msg.pid = lib->pid;
1531     msg.reply_port = ctx_impl->read_port->id.id;
1532     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1533 
1534     res = nxt_unit_port_send(req->ctx, req->response_port,
1535                              &msg, sizeof(msg), NULL, 0);
1536     if (nxt_slow_path(res != sizeof(msg))) {
1537         return NXT_UNIT_ERROR;
1538     }
1539 
1540     return NXT_UNIT_OK;
1541 }
1542 
1543 
1544 static int
1545 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1546 {
1547     size_t                           hsize;
1548     nxt_unit_impl_t                  *lib;
1549     nxt_unit_mmap_buf_t              *b;
1550     nxt_unit_callbacks_t             *cb;
1551     nxt_unit_request_info_t          *req;
1552     nxt_unit_request_info_impl_t     *req_impl;
1553     nxt_unit_websocket_frame_impl_t  *ws_impl;
1554 
1555     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1556     if (nxt_slow_path(req == NULL)) {
1557         return NXT_UNIT_OK;
1558     }
1559 
1560     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1561 
1562     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1563     cb = &lib->callbacks;
1564 
1565     if (cb->websocket_handler && recv_msg->size >= 2) {
1566         ws_impl = nxt_unit_websocket_frame_get(ctx);
1567         if (nxt_slow_path(ws_impl == NULL)) {
1568             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1569                           req_impl->stream);
1570 
1571             return NXT_UNIT_ERROR;
1572         }
1573 
1574         ws_impl->ws.req = req;
1575 
1576         ws_impl->buf = NULL;
1577 
1578         if (recv_msg->mmap) {
1579             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1580                 b->req = req;
1581             }
1582 
1583             /* "Move" incoming buffer list to ws_impl. */
1584             ws_impl->buf = recv_msg->incoming_buf;
1585             ws_impl->buf->prev = &ws_impl->buf;
1586             recv_msg->incoming_buf = NULL;
1587 
1588             b = ws_impl->buf;
1589 
1590         } else {
1591             b = nxt_unit_mmap_buf_get(ctx);
1592             if (nxt_slow_path(b == NULL)) {
1593                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1594                                req_impl->stream);
1595 
1596                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1597 
1598                 return NXT_UNIT_ERROR;
1599             }
1600 
1601             b->req = req;
1602             b->buf.start = recv_msg->start;
1603             b->buf.free = b->buf.start;
1604             b->buf.end = b->buf.start + recv_msg->size;
1605 
1606             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1607         }
1608 
1609         ws_impl->ws.header = (void *) b->buf.start;
1610         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1611             ws_impl->ws.header);
1612 
1613         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1614 
1615         if (ws_impl->ws.header->mask) {
1616             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1617 
1618         } else {
1619             ws_impl->ws.mask = NULL;
1620         }
1621 
1622         b->buf.free += hsize;
1623 
1624         ws_impl->ws.content_buf = &b->buf;
1625         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1626 
1627         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1628                            "payload_len=%"PRIu64,
1629                             ws_impl->ws.header->opcode,
1630                             ws_impl->ws.payload_len);
1631 
1632         cb->websocket_handler(&ws_impl->ws);
1633     }
1634 
1635     if (recv_msg->last) {
1636         req_impl->websocket = 0;
1637 
1638         if (cb->close_handler) {
1639             nxt_unit_req_debug(req, "close_handler");
1640 
1641             cb->close_handler(req);
1642 
1643         } else {
1644             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1645         }
1646     }
1647 
1648     return NXT_UNIT_OK;
1649 }
1650 
1651 
1652 static int
1653 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1654 {
1655     nxt_unit_impl_t       *lib;
1656     nxt_unit_callbacks_t  *cb;
1657 
1658     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1659     cb = &lib->callbacks;
1660 
1661     if (cb->shm_ack_handler != NULL) {
1662         cb->shm_ack_handler(ctx);
1663     }
1664 
1665     return NXT_UNIT_OK;
1666 }
1667 
1668 
1669 static nxt_unit_request_info_impl_t *
1670 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1671 {
1672     nxt_unit_impl_t               *lib;
1673     nxt_queue_link_t              *lnk;
1674     nxt_unit_ctx_impl_t           *ctx_impl;
1675     nxt_unit_request_info_impl_t  *req_impl;
1676 
1677     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1678 
1679     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1680 
1681     pthread_mutex_lock(&ctx_impl->mutex);
1682 
1683     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1684         pthread_mutex_unlock(&ctx_impl->mutex);
1685 
1686         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1687                                         + lib->request_data_size);
1688         if (nxt_slow_path(req_impl == NULL)) {
1689             return NULL;
1690         }
1691 
1692         req_impl->req.unit = ctx->unit;
1693         req_impl->req.ctx = ctx;
1694 
1695         pthread_mutex_lock(&ctx_impl->mutex);
1696 
1697     } else {
1698         lnk = nxt_queue_first(&ctx_impl->free_req);
1699         nxt_queue_remove(lnk);
1700 
1701         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1702     }
1703 
1704     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1705 
1706     pthread_mutex_unlock(&ctx_impl->mutex);
1707 
1708     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1709 
1710     return req_impl;
1711 }
1712 
1713 
1714 static void
1715 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1716 {
1717     nxt_unit_ctx_impl_t           *ctx_impl;
1718     nxt_unit_request_info_impl_t  *req_impl;
1719 
1720     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1721     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1722 
1723     req->response = NULL;
1724     req->response_buf = NULL;
1725 
1726     if (req_impl->in_hash) {
1727         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1728     }
1729 
1730     req_impl->websocket = 0;
1731 
1732     while (req_impl->outgoing_buf != NULL) {
1733         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1734     }
1735 
1736     while (req_impl->incoming_buf != NULL) {
1737         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1738     }
1739 
1740     if (req->content_fd != -1) {
1741         nxt_unit_close(req->content_fd);
1742 
1743         req->content_fd = -1;
1744     }
1745 
1746     if (req->response_port != NULL) {
1747         nxt_unit_port_release(req->response_port);
1748 
1749         req->response_port = NULL;
1750     }
1751 
1752     pthread_mutex_lock(&ctx_impl->mutex);
1753 
1754     nxt_queue_remove(&req_impl->link);
1755 
1756     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1757 
1758     pthread_mutex_unlock(&ctx_impl->mutex);
1759 
1760     req_impl->state = NXT_UNIT_RS_RELEASED;
1761 }
1762 
1763 
1764 static void
1765 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1766 {
1767     nxt_unit_ctx_impl_t  *ctx_impl;
1768 
1769     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1770 
1771     nxt_queue_remove(&req_impl->link);
1772 
1773     if (req_impl != &ctx_impl->req) {
1774         nxt_unit_free(&ctx_impl->ctx, req_impl);
1775     }
1776 }
1777 
1778 
1779 static nxt_unit_websocket_frame_impl_t *
1780 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1781 {
1782     nxt_queue_link_t                 *lnk;
1783     nxt_unit_ctx_impl_t              *ctx_impl;
1784     nxt_unit_websocket_frame_impl_t  *ws_impl;
1785 
1786     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1787 
1788     pthread_mutex_lock(&ctx_impl->mutex);
1789 
1790     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1791         pthread_mutex_unlock(&ctx_impl->mutex);
1792 
1793         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1794         if (nxt_slow_path(ws_impl == NULL)) {
1795             return NULL;
1796         }
1797 
1798     } else {
1799         lnk = nxt_queue_first(&ctx_impl->free_ws);
1800         nxt_queue_remove(lnk);
1801 
1802         pthread_mutex_unlock(&ctx_impl->mutex);
1803 
1804         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1805     }
1806 
1807     ws_impl->ctx_impl = ctx_impl;
1808 
1809     return ws_impl;
1810 }
1811 
1812 
1813 static void
1814 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1815 {
1816     nxt_unit_websocket_frame_impl_t  *ws_impl;
1817 
1818     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1819 
1820     while (ws_impl->buf != NULL) {
1821         nxt_unit_mmap_buf_free(ws_impl->buf);
1822     }
1823 
1824     ws->req = NULL;
1825 
1826     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1827 
1828     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1829 
1830     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1831 }
1832 
1833 
1834 static void
1835 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1836     nxt_unit_websocket_frame_impl_t *ws_impl)
1837 {
1838     nxt_queue_remove(&ws_impl->link);
1839 
1840     nxt_unit_free(ctx, ws_impl);
1841 }
1842 
1843 
1844 uint16_t
1845 nxt_unit_field_hash(const char *name, size_t name_length)
1846 {
1847     u_char      ch;
1848     uint32_t    hash;
1849     const char  *p, *end;
1850 
1851     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1852     end = name + name_length;
1853 
1854     for (p = name; p < end; p++) {
1855         ch = *p;
1856         hash = (hash << 4) + hash + nxt_lowcase(ch);
1857     }
1858 
1859     hash = (hash >> 16) ^ hash;
1860 
1861     return hash;
1862 }
1863 
1864 
1865 void
1866 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1867 {
1868     char                *name;
1869     uint32_t            i, j;
1870     nxt_unit_field_t    *fields, f;
1871     nxt_unit_request_t  *r;
1872 
1873     static nxt_str_t  content_length = nxt_string("content-length");
1874     static nxt_str_t  content_type = nxt_string("content-type");
1875     static nxt_str_t  cookie = nxt_string("cookie");
1876 
1877     nxt_unit_req_debug(req, "group_dup_fields");
1878 
1879     r = req->request;
1880     fields = r->fields;
1881 
1882     for (i = 0; i < r->fields_count; i++) {
1883         name = nxt_unit_sptr_get(&fields[i].name);
1884 
1885         switch (fields[i].hash) {
1886         case NXT_UNIT_HASH_CONTENT_LENGTH:
1887             if (fields[i].name_length == content_length.length
1888                 && nxt_unit_memcasecmp(name, content_length.start,
1889                                        content_length.length) == 0)
1890             {
1891                 r->content_length_field = i;
1892             }
1893 
1894             break;
1895 
1896         case NXT_UNIT_HASH_CONTENT_TYPE:
1897             if (fields[i].name_length == content_type.length
1898                 && nxt_unit_memcasecmp(name, content_type.start,
1899                                        content_type.length) == 0)
1900             {
1901                 r->content_type_field = i;
1902             }
1903 
1904             break;
1905 
1906         case NXT_UNIT_HASH_COOKIE:
1907             if (fields[i].name_length == cookie.length
1908                 && nxt_unit_memcasecmp(name, cookie.start,
1909                                        cookie.length) == 0)
1910             {
1911                 r->cookie_field = i;
1912             }
1913 
1914             break;
1915         }
1916 
1917         for (j = i + 1; j < r->fields_count; j++) {
1918             if (fields[i].hash != fields[j].hash
1919                 || fields[i].name_length != fields[j].name_length
1920                 || nxt_unit_memcasecmp(name,
1921                                        nxt_unit_sptr_get(&fields[j].name),
1922                                        fields[j].name_length) != 0)
1923             {
1924                 continue;
1925             }
1926 
1927             f = fields[j];
1928             f.value.offset += (j - (i + 1)) * sizeof(f);
1929 
1930             while (j > i + 1) {
1931                 fields[j] = fields[j - 1];
1932                 fields[j].name.offset -= sizeof(f);
1933                 fields[j].value.offset -= sizeof(f);
1934                 j--;
1935             }
1936 
1937             fields[j] = f;
1938 
1939             /* Assign the same name pointer for further grouping simplicity. */
1940             nxt_unit_sptr_set(&fields[j].name, name);
1941 
1942             i++;
1943         }
1944     }
1945 }
1946 
1947 
1948 int
1949 nxt_unit_response_init(nxt_unit_request_info_t *req,
1950     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1951 {
1952     uint32_t                      buf_size;
1953     nxt_unit_buf_t                *buf;
1954     nxt_unit_request_info_impl_t  *req_impl;
1955 
1956     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1957 
1958     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1959         nxt_unit_req_warn(req, "init: response already sent");
1960 
1961         return NXT_UNIT_ERROR;
1962     }
1963 
1964     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1965                        (int) max_fields_count, (int) max_fields_size);
1966 
1967     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1968         nxt_unit_req_debug(req, "duplicate response init");
1969     }
1970 
1971     /*
1972      * Each field name and value 0-terminated by libunit,
1973      * this is the reason of '+ 2' below.
1974      */
1975     buf_size = sizeof(nxt_unit_response_t)
1976                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1977                + max_fields_size;
1978 
1979     if (nxt_slow_path(req->response_buf != NULL)) {
1980         buf = req->response_buf;
1981 
1982         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1983             goto init_response;
1984         }
1985 
1986         nxt_unit_buf_free(buf);
1987 
1988         req->response_buf = NULL;
1989         req->response = NULL;
1990         req->response_max_fields = 0;
1991 
1992         req_impl->state = NXT_UNIT_RS_START;
1993     }
1994 
1995     buf = nxt_unit_response_buf_alloc(req, buf_size);
1996     if (nxt_slow_path(buf == NULL)) {
1997         return NXT_UNIT_ERROR;
1998     }
1999 
2000 init_response:
2001 
2002     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2003 
2004     req->response_buf = buf;
2005 
2006     req->response = (nxt_unit_response_t *) buf->start;
2007     req->response->status = status;
2008 
2009     buf->free = buf->start + sizeof(nxt_unit_response_t)
2010                 + max_fields_count * sizeof(nxt_unit_field_t);
2011 
2012     req->response_max_fields = max_fields_count;
2013     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2014 
2015     return NXT_UNIT_OK;
2016 }
2017 
2018 
2019 int
2020 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2021     uint32_t max_fields_count, uint32_t max_fields_size)
2022 {
2023     char                          *p;
2024     uint32_t                      i, buf_size;
2025     nxt_unit_buf_t                *buf;
2026     nxt_unit_field_t              *f, *src;
2027     nxt_unit_response_t           *resp;
2028     nxt_unit_request_info_impl_t  *req_impl;
2029 
2030     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2031 
2032     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2033         nxt_unit_req_warn(req, "realloc: response not init");
2034 
2035         return NXT_UNIT_ERROR;
2036     }
2037 
2038     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2039         nxt_unit_req_warn(req, "realloc: response already sent");
2040 
2041         return NXT_UNIT_ERROR;
2042     }
2043 
2044     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2045         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2046 
2047         return NXT_UNIT_ERROR;
2048     }
2049 
2050     /*
2051      * Each field name and value 0-terminated by libunit,
2052      * this is the reason of '+ 2' below.
2053      */
2054     buf_size = sizeof(nxt_unit_response_t)
2055                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2056                + max_fields_size;
2057 
2058     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2059 
2060     buf = nxt_unit_response_buf_alloc(req, buf_size);
2061     if (nxt_slow_path(buf == NULL)) {
2062         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2063         return NXT_UNIT_ERROR;
2064     }
2065 
2066     resp = (nxt_unit_response_t *) buf->start;
2067 
2068     memset(resp, 0, sizeof(nxt_unit_response_t));
2069 
2070     resp->status = req->response->status;
2071     resp->content_length = req->response->content_length;
2072 
2073     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2074     f = resp->fields;
2075 
2076     for (i = 0; i < req->response->fields_count; i++) {
2077         src = req->response->fields + i;
2078 
2079         if (nxt_slow_path(src->skip != 0)) {
2080             continue;
2081         }
2082 
2083         if (nxt_slow_path(src->name_length + src->value_length + 2
2084                           > (uint32_t) (buf->end - p)))
2085         {
2086             nxt_unit_req_warn(req, "realloc: not enough space for field"
2087                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2088                   i, src, src->name_length, src->value_length);
2089 
2090             goto fail;
2091         }
2092 
2093         nxt_unit_sptr_set(&f->name, p);
2094         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2095         *p++ = '\0';
2096 
2097         nxt_unit_sptr_set(&f->value, p);
2098         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2099         *p++ = '\0';
2100 
2101         f->hash = src->hash;
2102         f->skip = 0;
2103         f->name_length = src->name_length;
2104         f->value_length = src->value_length;
2105 
2106         resp->fields_count++;
2107         f++;
2108     }
2109 
2110     if (req->response->piggyback_content_length > 0) {
2111         if (nxt_slow_path(req->response->piggyback_content_length
2112                           > (uint32_t) (buf->end - p)))
2113         {
2114             nxt_unit_req_warn(req, "realloc: not enought space for content"
2115                   " #%"PRIu32", %"PRIu32" required",
2116                   i, req->response->piggyback_content_length);
2117 
2118             goto fail;
2119         }
2120 
2121         resp->piggyback_content_length =
2122                                        req->response->piggyback_content_length;
2123 
2124         nxt_unit_sptr_set(&resp->piggyback_content, p);
2125         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2126                        req->response->piggyback_content_length);
2127     }
2128 
2129     buf->free = p;
2130 
2131     nxt_unit_buf_free(req->response_buf);
2132 
2133     req->response = resp;
2134     req->response_buf = buf;
2135     req->response_max_fields = max_fields_count;
2136 
2137     return NXT_UNIT_OK;
2138 
2139 fail:
2140 
2141     nxt_unit_buf_free(buf);
2142 
2143     return NXT_UNIT_ERROR;
2144 }
2145 
2146 
2147 int
2148 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2149 {
2150     nxt_unit_request_info_impl_t  *req_impl;
2151 
2152     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2153 
2154     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2155 }
2156 
2157 
2158 int
2159 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2160     const char *name, uint8_t name_length,
2161     const char *value, uint32_t value_length)
2162 {
2163     nxt_unit_buf_t                *buf;
2164     nxt_unit_field_t              *f;
2165     nxt_unit_response_t           *resp;
2166     nxt_unit_request_info_impl_t  *req_impl;
2167 
2168     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2169 
2170     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2171         nxt_unit_req_warn(req, "add_field: response not initialized or "
2172                           "already sent");
2173 
2174         return NXT_UNIT_ERROR;
2175     }
2176 
2177     resp = req->response;
2178 
2179     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2180         nxt_unit_req_warn(req, "add_field: too many response fields");
2181 
2182         return NXT_UNIT_ERROR;
2183     }
2184 
2185     buf = req->response_buf;
2186 
2187     if (nxt_slow_path(name_length + value_length + 2
2188                       > (uint32_t) (buf->end - buf->free)))
2189     {
2190         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2191 
2192         return NXT_UNIT_ERROR;
2193     }
2194 
2195     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2196                        resp->fields_count,
2197                        (int) name_length, name,
2198                        (int) value_length, value);
2199 
2200     f = resp->fields + resp->fields_count;
2201 
2202     nxt_unit_sptr_set(&f->name, buf->free);
2203     buf->free = nxt_cpymem(buf->free, name, name_length);
2204     *buf->free++ = '\0';
2205 
2206     nxt_unit_sptr_set(&f->value, buf->free);
2207     buf->free = nxt_cpymem(buf->free, value, value_length);
2208     *buf->free++ = '\0';
2209 
2210     f->hash = nxt_unit_field_hash(name, name_length);
2211     f->skip = 0;
2212     f->name_length = name_length;
2213     f->value_length = value_length;
2214 
2215     resp->fields_count++;
2216 
2217     return NXT_UNIT_OK;
2218 }
2219 
2220 
2221 int
2222 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2223     const void* src, uint32_t size)
2224 {
2225     nxt_unit_buf_t                *buf;
2226     nxt_unit_response_t           *resp;
2227     nxt_unit_request_info_impl_t  *req_impl;
2228 
2229     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2230 
2231     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2232         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2233 
2234         return NXT_UNIT_ERROR;
2235     }
2236 
2237     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2238         nxt_unit_req_warn(req, "add_content: response already sent");
2239 
2240         return NXT_UNIT_ERROR;
2241     }
2242 
2243     buf = req->response_buf;
2244 
2245     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2246         nxt_unit_req_warn(req, "add_content: buffer overflow");
2247 
2248         return NXT_UNIT_ERROR;
2249     }
2250 
2251     resp = req->response;
2252 
2253     if (resp->piggyback_content_length == 0) {
2254         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2255         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2256     }
2257 
2258     resp->piggyback_content_length += size;
2259 
2260     buf->free = nxt_cpymem(buf->free, src, size);
2261 
2262     return NXT_UNIT_OK;
2263 }
2264 
2265 
2266 int
2267 nxt_unit_response_send(nxt_unit_request_info_t *req)
2268 {
2269     int                           rc;
2270     nxt_unit_mmap_buf_t           *mmap_buf;
2271     nxt_unit_request_info_impl_t  *req_impl;
2272 
2273     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2274 
2275     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2276         nxt_unit_req_warn(req, "send: response is not initialized yet");
2277 
2278         return NXT_UNIT_ERROR;
2279     }
2280 
2281     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2282         nxt_unit_req_warn(req, "send: response already sent");
2283 
2284         return NXT_UNIT_ERROR;
2285     }
2286 
2287     if (req->request->websocket_handshake && req->response->status == 101) {
2288         nxt_unit_response_upgrade(req);
2289     }
2290 
2291     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2292                        req->response->fields_count,
2293                        (int) (req->response_buf->free
2294                               - req->response_buf->start));
2295 
2296     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2297 
2298     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2299     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2300         req->response = NULL;
2301         req->response_buf = NULL;
2302         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2303 
2304         nxt_unit_mmap_buf_free(mmap_buf);
2305     }
2306 
2307     return rc;
2308 }
2309 
2310 
2311 int
2312 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2313 {
2314     nxt_unit_request_info_impl_t  *req_impl;
2315 
2316     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2317 
2318     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2319 }
2320 
2321 
2322 nxt_unit_buf_t *
2323 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2324 {
2325     int                           rc;
2326     nxt_unit_mmap_buf_t           *mmap_buf;
2327     nxt_unit_request_info_impl_t  *req_impl;
2328 
2329     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2330         nxt_unit_req_warn(req, "response_buf_alloc: "
2331                           "requested buffer (%"PRIu32") too big", size);
2332 
2333         return NULL;
2334     }
2335 
2336     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2337 
2338     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2339 
2340     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2341     if (nxt_slow_path(mmap_buf == NULL)) {
2342         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2343 
2344         return NULL;
2345     }
2346 
2347     mmap_buf->req = req;
2348 
2349     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2350 
2351     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2352                                    size, size, mmap_buf,
2353                                    NULL);
2354     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2355         nxt_unit_mmap_buf_release(mmap_buf);
2356 
2357         return NULL;
2358     }
2359 
2360     return &mmap_buf->buf;
2361 }
2362 
2363 
2364 static nxt_unit_mmap_buf_t *
2365 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2366 {
2367     nxt_unit_mmap_buf_t  *mmap_buf;
2368     nxt_unit_ctx_impl_t  *ctx_impl;
2369 
2370     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2371 
2372     pthread_mutex_lock(&ctx_impl->mutex);
2373 
2374     if (ctx_impl->free_buf == NULL) {
2375         pthread_mutex_unlock(&ctx_impl->mutex);
2376 
2377         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2378         if (nxt_slow_path(mmap_buf == NULL)) {
2379             return NULL;
2380         }
2381 
2382     } else {
2383         mmap_buf = ctx_impl->free_buf;
2384 
2385         nxt_unit_mmap_buf_unlink(mmap_buf);
2386 
2387         pthread_mutex_unlock(&ctx_impl->mutex);
2388     }
2389 
2390     mmap_buf->ctx_impl = ctx_impl;
2391 
2392     mmap_buf->hdr = NULL;
2393     mmap_buf->free_ptr = NULL;
2394 
2395     return mmap_buf;
2396 }
2397 
2398 
2399 static void
2400 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2401 {
2402     nxt_unit_mmap_buf_unlink(mmap_buf);
2403 
2404     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2405 
2406     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2407 
2408     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2409 }
2410 
2411 
2412 int
2413 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2414 {
2415     return req->request->websocket_handshake;
2416 }
2417 
2418 
2419 int
2420 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2421 {
2422     int                           rc;
2423     nxt_unit_request_info_impl_t  *req_impl;
2424 
2425     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2426 
2427     if (nxt_slow_path(req_impl->websocket != 0)) {
2428         nxt_unit_req_debug(req, "upgrade: already upgraded");
2429 
2430         return NXT_UNIT_OK;
2431     }
2432 
2433     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2434         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2435 
2436         return NXT_UNIT_ERROR;
2437     }
2438 
2439     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2440         nxt_unit_req_warn(req, "upgrade: response already sent");
2441 
2442         return NXT_UNIT_ERROR;
2443     }
2444 
2445     rc = nxt_unit_request_hash_add(req->ctx, req);
2446     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2447         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2448 
2449         return NXT_UNIT_ERROR;
2450     }
2451 
2452     req_impl->websocket = 1;
2453 
2454     req->response->status = 101;
2455 
2456     return NXT_UNIT_OK;
2457 }
2458 
2459 
2460 int
2461 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2462 {
2463     nxt_unit_request_info_impl_t  *req_impl;
2464 
2465     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2466 
2467     return req_impl->websocket;
2468 }
2469 
2470 
2471 nxt_unit_request_info_t *
2472 nxt_unit_get_request_info_from_data(void *data)
2473 {
2474     nxt_unit_request_info_impl_t  *req_impl;
2475 
2476     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2477 
2478     return &req_impl->req;
2479 }
2480 
2481 
2482 int
2483 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2484 {
2485     int                           rc;
2486     nxt_unit_mmap_buf_t           *mmap_buf;
2487     nxt_unit_request_info_t       *req;
2488     nxt_unit_request_info_impl_t  *req_impl;
2489 
2490     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2491 
2492     req = mmap_buf->req;
2493     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2494 
2495     nxt_unit_req_debug(req, "buf_send: %d bytes",
2496                        (int) (buf->free - buf->start));
2497 
2498     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2499         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2500 
2501         return NXT_UNIT_ERROR;
2502     }
2503 
2504     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2505         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2506 
2507         return NXT_UNIT_ERROR;
2508     }
2509 
2510     if (nxt_fast_path(buf->free > buf->start)) {
2511         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2512         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2513             return rc;
2514         }
2515     }
2516 
2517     nxt_unit_mmap_buf_free(mmap_buf);
2518 
2519     return NXT_UNIT_OK;
2520 }
2521 
2522 
2523 static void
2524 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2525 {
2526     int                      rc;
2527     nxt_unit_mmap_buf_t      *mmap_buf;
2528     nxt_unit_request_info_t  *req;
2529 
2530     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2531 
2532     req = mmap_buf->req;
2533 
2534     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2535     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2536         nxt_unit_mmap_buf_free(mmap_buf);
2537 
2538         nxt_unit_request_info_release(req);
2539 
2540     } else {
2541         nxt_unit_request_done(req, rc);
2542     }
2543 }
2544 
2545 
/*
 * Sends the bytes between buf->start and buf->free of "mmap_buf" to the
 * response port of the request.
 *
 * A non-empty buffer that has a shared memory header ("hdr") is announced
 * with a short message carrying the mmap id, chunk id and size.  Any other
 * buffer is sent as a plain message: the port message header is written
 * directly in front of the data, so space for it must have been reserved.
 *
 * "last" sets the "last" flag of the port message.
 *
 * Whatever the outcome, nxt_unit_free_outgoing_buf() is called on the
 * buffer before returning.  Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Shared memory is used only for a non-empty buffer that has a header. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    /* Pessimistic default; set to NXT_UNIT_OK once the send has succeeded. */
    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /*
         * The sent chunks are no longer described by this buffer.  Find the
         * first chunk after the last byte that was sent.
         */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least one chunk's worth of bytes is left unused: keep
             * describing the remainder, starting at the first free chunk,
             * so that the cleanup below releases only that remainder.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Nothing left to release: detach from the shared memory. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * The delta is (first chunk sent) - (first free chunk), i.e. the
         * counter is lowered by the number of chunks handed over.
         */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain message: the header is placed right before buf->start, so
         * the plain allocation must begin at least sizeof(m.msg) bytes
         * earlier.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg),
                                 NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Reached on both success and failure. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}
2663 
2664 
2665 void
2666 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2667 {
2668     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2669 }
2670 
2671 
2672 static void
2673 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2674 {
2675     nxt_unit_free_outgoing_buf(mmap_buf);
2676 
2677     nxt_unit_mmap_buf_release(mmap_buf);
2678 }
2679 
2680 
2681 static void
2682 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2683 {
2684     if (mmap_buf->hdr != NULL) {
2685         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2686                               mmap_buf->hdr, mmap_buf->buf.start,
2687                               mmap_buf->buf.end - mmap_buf->buf.start);
2688 
2689         mmap_buf->hdr = NULL;
2690 
2691         return;
2692     }
2693 
2694     if (mmap_buf->free_ptr != NULL) {
2695         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2696 
2697         mmap_buf->free_ptr = NULL;
2698     }
2699 }
2700 
2701 
2702 static nxt_unit_read_buf_t *
2703 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2704 {
2705     nxt_unit_ctx_impl_t  *ctx_impl;
2706     nxt_unit_read_buf_t  *rbuf;
2707 
2708     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2709 
2710     pthread_mutex_lock(&ctx_impl->mutex);
2711 
2712     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2713 
2714     pthread_mutex_unlock(&ctx_impl->mutex);
2715 
2716     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2717 
2718     return rbuf;
2719 }
2720 
2721 
2722 static nxt_unit_read_buf_t *
2723 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2724 {
2725     nxt_queue_link_t     *link;
2726     nxt_unit_read_buf_t  *rbuf;
2727 
2728     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2729         link = nxt_queue_first(&ctx_impl->free_rbuf);
2730         nxt_queue_remove(link);
2731 
2732         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2733 
2734         return rbuf;
2735     }
2736 
2737     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2738 
2739     if (nxt_fast_path(rbuf != NULL)) {
2740         rbuf->ctx_impl = ctx_impl;
2741     }
2742 
2743     return rbuf;
2744 }
2745 
2746 
2747 static void
2748 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2749     nxt_unit_read_buf_t *rbuf)
2750 {
2751     nxt_unit_ctx_impl_t  *ctx_impl;
2752 
2753     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2754 
2755     pthread_mutex_lock(&ctx_impl->mutex);
2756 
2757     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2758 
2759     pthread_mutex_unlock(&ctx_impl->mutex);
2760 }
2761 
2762 
2763 nxt_unit_buf_t *
2764 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2765 {
2766     nxt_unit_mmap_buf_t  *mmap_buf;
2767 
2768     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2769 
2770     if (mmap_buf->next == NULL) {
2771         return NULL;
2772     }
2773 
2774     return &mmap_buf->next->buf;
2775 }
2776 
2777 
2778 uint32_t
2779 nxt_unit_buf_max(void)
2780 {
2781     return PORT_MMAP_DATA_SIZE;
2782 }
2783 
2784 
2785 uint32_t
2786 nxt_unit_buf_min(void)
2787 {
2788     return PORT_MMAP_CHUNK_SIZE;
2789 }
2790 
2791 
2792 int
2793 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2794     size_t size)
2795 {
2796     ssize_t  res;
2797 
2798     res = nxt_unit_response_write_nb(req, start, size, size);
2799 
2800     return res < 0 ? -res : NXT_UNIT_OK;
2801 }
2802 
2803 
/*
 * Writes up to "size" bytes of response body starting at "start".
 *
 * If the headers buffer has not been sent yet, as much data as fits is
 * added to it as piggyback content and the headers are sent.  The rest is
 * sent in separate outgoing buffers of at most PORT_MMAP_DATA_SIZE bytes
 * each.
 *
 * "min_size" is passed down (capped per part) as the minimum size requested
 * from nxt_unit_get_outgoing_buf(); presumably it is the amount the caller
 * insists on sending before the call may return early -- TODO confirm
 * against nxt_unit_get_outgoing_buf().
 *
 * Returns the number of bytes sent, which may be less than "size" when an
 * outgoing buffer with no room is returned, or a negated NXT_UNIT_* error
 * code.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check whether the response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Piggyback as much as the headers buffer can still hold. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        /* Reduce the outstanding minimum, never below zero. */
        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        /*
         * mmap_buf lives on the stack; local_buf is handed over as well,
         * presumably as storage for small plain messages -- TODO confirm.
         */
        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /* No room was provided: report what has been sent so far. */
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}
2885 
2886 
/*
 * Writes a response body produced by a caller-supplied reader.
 *
 * read_info->read() is called repeatedly to fill buffers until
 * read_info->eof becomes set; a negative return value from the reader is
 * treated as an error.  If the headers buffer has not been sent yet, its
 * free space is filled with content first and the headers are sent together
 * with that content.  The remaining data is sent in outgoing buffers of
 * min(read_info->buf_size, PORT_MMAP_DATA_SIZE) bytes.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR if the response is not
 * initialized or the reader fails, or the error code of a failed send or
 * buffer allocation.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Check whether the response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /*
         * Enable content in headers buf: a zero-length add records where
         * the piggyback content starts without copying anything.
         */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        /* Let the reader write straight into the headers buffer. */
        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    /* Send the rest of the body, one outgoing buffer at a time. */
    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        /* Fill the buffer until it is full or the reader reports EOF. */
        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                /* The buffer was never sent: release its memory here. */
                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}
2987 
2988 
2989 ssize_t
2990 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2991 {
2992     ssize_t  buf_res, res;
2993 
2994     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2995                                 dst, size);
2996 
2997     if (buf_res < (ssize_t) size && req->content_fd != -1) {
2998         res = read(req->content_fd, dst, size);
2999         if (nxt_slow_path(res < 0)) {
3000             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3001                                strerror(errno), errno);
3002 
3003             return res;
3004         }
3005 
3006         if (res < (ssize_t) size) {
3007             nxt_unit_close(req->content_fd);
3008 
3009             req->content_fd = -1;
3010         }
3011 
3012         req->content_length -= res;
3013         size -= res;
3014 
3015         dst = nxt_pointer_to(dst, res);
3016 
3017     } else {
3018         res = 0;
3019     }
3020 
3021     return buf_res + res;
3022 }
3023 
3024 
3025 ssize_t
3026 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3027 {
3028     char                 *p;
3029     size_t               l_size, b_size;
3030     nxt_unit_buf_t       *b;
3031     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3032 
3033     if (req->content_length == 0) {
3034         return 0;
3035     }
3036 
3037     l_size = 0;
3038 
3039     b = req->content_buf;
3040 
3041     while (b != NULL) {
3042         b_size = b->end - b->free;
3043         p = memchr(b->free, '\n', b_size);
3044 
3045         if (p != NULL) {
3046             p++;
3047             l_size += p - b->free;
3048             break;
3049         }
3050 
3051         l_size += b_size;
3052 
3053         if (max_size <= l_size) {
3054             break;
3055         }
3056 
3057         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3058         if (mmap_buf->next == NULL
3059             && req->content_fd != -1
3060             && l_size < req->content_length)
3061         {
3062             preread_buf = nxt_unit_request_preread(req, 16384);
3063             if (nxt_slow_path(preread_buf == NULL)) {
3064                 return -1;
3065             }
3066 
3067             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3068         }
3069 
3070         b = nxt_unit_buf_next(b);
3071     }
3072 
3073     return nxt_min(max_size, l_size);
3074 }
3075 
3076 
3077 static nxt_unit_mmap_buf_t *
3078 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3079 {
3080     ssize_t              res;
3081     nxt_unit_mmap_buf_t  *mmap_buf;
3082 
3083     if (req->content_fd == -1) {
3084         nxt_unit_req_alert(req, "preread: content_fd == -1");
3085         return NULL;
3086     }
3087 
3088     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3089     if (nxt_slow_path(mmap_buf == NULL)) {
3090         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3091         return NULL;
3092     }
3093 
3094     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3095     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3096         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3097         nxt_unit_mmap_buf_release(mmap_buf);
3098         return NULL;
3099     }
3100 
3101     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3102 
3103     mmap_buf->hdr = NULL;
3104     mmap_buf->buf.start = mmap_buf->free_ptr;
3105     mmap_buf->buf.free = mmap_buf->buf.start;
3106     mmap_buf->buf.end = mmap_buf->buf.start + size;
3107 
3108     res = read(req->content_fd, mmap_buf->free_ptr, size);
3109     if (res < 0) {
3110         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3111                            strerror(errno), errno);
3112 
3113         nxt_unit_mmap_buf_free(mmap_buf);
3114 
3115         return NULL;
3116     }
3117 
3118     if (res < (ssize_t) size) {
3119         nxt_unit_close(req->content_fd);
3120 
3121         req->content_fd = -1;
3122     }
3123 
3124     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3125 
3126     mmap_buf->buf.end = mmap_buf->buf.free + res;
3127 
3128     return mmap_buf;
3129 }
3130 
3131 
3132 static ssize_t
3133 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3134 {
3135     u_char          *p;
3136     size_t          rest, copy, read;
3137     nxt_unit_buf_t  *buf, *last_buf;
3138 
3139     p = dst;
3140     rest = size;
3141 
3142     buf = *b;
3143     last_buf = buf;
3144 
3145     while (buf != NULL) {
3146         last_buf = buf;
3147 
3148         copy = buf->end - buf->free;
3149         copy = nxt_min(