xref: /unit/src/nxt_unit.c (revision 1668:03fa2be97871)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
/*
 * Size limits for "plain" buffers.  NXT_UNIT_LOCAL_BUF_SIZE reserves room
 * for one nxt_port_msg_t header in front of NXT_UNIT_MAX_PLAIN_SIZE bytes.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE                                               \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
/*
 * Opaque names for the implementation structures defined further down in
 * this file, declared up front so the prototypes below can refer to them.
 */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
39 
40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
42     nxt_unit_ctx_impl_t *ctx_impl, void *data);
43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50     nxt_unit_mmap_buf_t *mmap_buf);
51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56     int queue_fd);
57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
58 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
59     nxt_unit_recv_msg_t *recv_msg);
60 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
61 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
62     nxt_unit_recv_msg_t *recv_msg);
63 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
64     nxt_unit_recv_msg_t *recv_msg);
65 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
66     nxt_unit_port_id_t *port_id);
67 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
68 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg);
70 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
71 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
72     nxt_unit_ctx_t *ctx);
73 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
74 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
75 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
76     nxt_unit_ctx_t *ctx);
77 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
78 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
79     nxt_unit_websocket_frame_impl_t *ws);
80 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
81 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
82 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
83     nxt_unit_mmap_buf_t *mmap_buf, int last);
84 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
85 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
86 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
88     nxt_unit_ctx_impl_t *ctx_impl);
89 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
90     nxt_unit_read_buf_t *rbuf);
91 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
92     nxt_unit_request_info_t *req, size_t size);
93 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
94     size_t size);
95 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
96     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
97 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
98 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
99 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
100 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
101     nxt_unit_port_t *port, int n);
102 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
103 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
104     int fd);
105 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
106     nxt_unit_port_t *port, uint32_t size,
107     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
108 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
109 
110 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
111     nxt_unit_ctx_impl_t *ctx_impl);
112 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
113 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
114 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
115 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
116 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
117     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
118     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
119 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
120     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
121 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
122 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
123     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
124 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
125 
126 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
127 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
128     pid_t pid, int remove);
129 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
130 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
131 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
132 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
133 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
134 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
135 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
136 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
137 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
139     nxt_unit_port_t *port);
140 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
141 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
142 
143 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
144     nxt_unit_port_t *port, int queue_fd);
145 
146 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
147 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
148 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
149     nxt_unit_port_t *port, void *queue);
150 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
151     nxt_unit_port_id_t *port_id);
152 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
153     nxt_unit_port_id_t *port_id);
154 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
155 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
156     nxt_unit_process_t *process);
157 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
158 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
159 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
160     nxt_unit_port_t *port, const void *buf, size_t buf_size,
161     const void *oob, size_t oob_size);
162 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
163     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
164 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
165     nxt_unit_read_buf_t *rbuf);
166 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
167     nxt_unit_read_buf_t *src);
168 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
169     nxt_unit_read_buf_t *rbuf);
170 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
171     nxt_unit_read_buf_t *rbuf);
172 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
173     nxt_unit_read_buf_t *rbuf);
174 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline int nxt_unit_close(int fd);
177 static int nxt_unit_fd_blocking(int fd);
178 
179 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
180     nxt_unit_port_t *port);
181 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
182     nxt_unit_port_id_t *port_id, int remove);
183 
184 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
185     nxt_unit_request_info_t *req);
186 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
187     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
188 
/*
 * Forward declarations: log prefix formatting, level hash allocator
 * callbacks and case-insensitive memory comparison.
 */
static char *nxt_unit_snprint_prefix(
    char *p, char *end, pid_t pid, int level);
static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
static void nxt_unit_lvlhsh_free(void *data, void *p);
static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
193 
194 
195 struct nxt_unit_mmap_buf_s {
196     nxt_unit_buf_t           buf;
197 
198     nxt_unit_mmap_buf_t      *next;
199     nxt_unit_mmap_buf_t      **prev;
200 
201     nxt_port_mmap_header_t   *hdr;
202     nxt_unit_request_info_t  *req;
203     nxt_unit_ctx_impl_t      *ctx_impl;
204     char                     *free_ptr;
205     char                     *plain_ptr;
206 };
207 
208 
209 struct nxt_unit_recv_msg_s {
210     uint32_t                 stream;
211     nxt_pid_t                pid;
212     nxt_port_id_t            reply_port;
213 
214     uint8_t                  last;      /* 1 bit */
215     uint8_t                  mmap;      /* 1 bit */
216 
217     void                     *start;
218     uint32_t                 size;
219 
220     int                      fd[2];
221 
222     nxt_unit_mmap_buf_t      *incoming_buf;
223 };
224 
225 
/* Request states, stored in nxt_unit_request_info_impl_t.state. */
typedef enum {
    NXT_UNIT_RS_START                 = 0,
    NXT_UNIT_RS_RESPONSE_INIT         = 1,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT  = 2,
    NXT_UNIT_RS_RESPONSE_SENT         = 3,
    NXT_UNIT_RS_RELEASED              = 4,
} nxt_unit_req_state_t;
233 
234 
235 struct nxt_unit_request_info_impl_s {
236     nxt_unit_request_info_t  req;
237 
238     uint32_t                 stream;
239 
240     nxt_unit_mmap_buf_t      *outgoing_buf;
241     nxt_unit_mmap_buf_t      *incoming_buf;
242 
243     nxt_unit_req_state_t     state;
244     uint8_t                  websocket;
245     uint8_t                  in_hash;
246 
247     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
248     nxt_queue_link_t         link;
249     /*  for nxt_unit_port_impl_t.awaiting_req */
250     nxt_queue_link_t         port_wait_link;
251 
252     char                     extra_data[];
253 };
254 
255 
256 struct nxt_unit_websocket_frame_impl_s {
257     nxt_unit_websocket_frame_t  ws;
258 
259     nxt_unit_mmap_buf_t         *buf;
260 
261     nxt_queue_link_t            link;
262 
263     nxt_unit_ctx_impl_t         *ctx_impl;
264 };
265 
266 
267 struct nxt_unit_read_buf_s {
268     nxt_queue_link_t              link;
269     nxt_unit_ctx_impl_t           *ctx_impl;
270     ssize_t                       size;
271     char                          buf[16384];
272     char                          oob[256];
273 };
274 
275 
276 struct nxt_unit_ctx_impl_s {
277     nxt_unit_ctx_t                ctx;
278 
279     nxt_atomic_t                  use_count;
280     nxt_atomic_t                  wait_items;
281 
282     pthread_mutex_t               mutex;
283 
284     nxt_unit_port_t               *read_port;
285 
286     nxt_queue_link_t              link;
287 
288     nxt_unit_mmap_buf_t           *free_buf;
289 
290     /*  of nxt_unit_request_info_impl_t */
291     nxt_queue_t                   free_req;
292 
293     /*  of nxt_unit_websocket_frame_impl_t */
294     nxt_queue_t                   free_ws;
295 
296     /*  of nxt_unit_request_info_impl_t */
297     nxt_queue_t                   active_req;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_lvlhsh_t                  requests;
301 
302     /*  of nxt_unit_request_info_impl_t */
303     nxt_queue_t                   ready_req;
304 
305     /*  of nxt_unit_read_buf_t */
306     nxt_queue_t                   pending_rbuf;
307 
308     /*  of nxt_unit_read_buf_t */
309     nxt_queue_t                   free_rbuf;
310 
311     int                           online;
312     int                           ready;
313 
314     nxt_unit_mmap_buf_t           ctx_buf[2];
315     nxt_unit_read_buf_t           ctx_read_buf;
316 
317     nxt_unit_request_info_impl_t  req;
318 };
319 
320 
321 struct nxt_unit_mmap_s {
322     nxt_port_mmap_header_t   *hdr;
323     pthread_t                src_thread;
324 
325     /*  of nxt_unit_read_buf_t */
326     nxt_queue_t              awaiting_rbuf;
327 };
328 
329 
330 struct nxt_unit_mmaps_s {
331     pthread_mutex_t          mutex;
332     uint32_t                 size;
333     uint32_t                 cap;
334     nxt_atomic_t             allocated_chunks;
335     nxt_unit_mmap_t          *elts;
336 };
337 
338 
339 struct nxt_unit_impl_s {
340     nxt_unit_t               unit;
341     nxt_unit_callbacks_t     callbacks;
342 
343     nxt_atomic_t             use_count;
344 
345     uint32_t                 request_data_size;
346     uint32_t                 shm_mmap_limit;
347 
348     pthread_mutex_t          mutex;
349 
350     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
351     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
352 
353     nxt_unit_port_t          *router_port;
354     nxt_unit_port_t          *shared_port;
355 
356     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
357 
358     nxt_unit_mmaps_t         incoming;
359     nxt_unit_mmaps_t         outgoing;
360 
361     pid_t                    pid;
362     int                      log_fd;
363 
364     nxt_unit_ctx_impl_t      main_ctx;
365 };
366 
367 
368 struct nxt_unit_port_impl_s {
369     nxt_unit_port_t          port;
370 
371     nxt_atomic_t             use_count;
372 
373     /*  for nxt_unit_process_t.ports */
374     nxt_queue_link_t         link;
375     nxt_unit_process_t       *process;
376 
377     /*  of nxt_unit_request_info_impl_t */
378     nxt_queue_t              awaiting_req;
379 
380     int                      ready;
381 
382     void                     *queue;
383 
384     int                      from_socket;
385     nxt_unit_read_buf_t      *socket_rbuf;
386 };
387 
388 
389 struct nxt_unit_process_s {
390     pid_t                    pid;
391 
392     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
393 
394     nxt_unit_impl_t          *lib;
395 
396     nxt_atomic_t             use_count;
397 
398     uint32_t                 next_port_id;
399 };
400 
401 
/*
 * Port identifier in a fixed layout.  Both members are explicitly 32 bits
 * wide so that no alignment padding can appear between or after them.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
407 
408 
409 nxt_unit_ctx_t *
410 nxt_unit_init(nxt_unit_init_t *init)
411 {
412     int              rc, queue_fd;
413     void             *mem;
414     uint32_t         ready_stream, shm_limit;
415     nxt_unit_ctx_t   *ctx;
416     nxt_unit_impl_t  *lib;
417     nxt_unit_port_t  ready_port, router_port, read_port;
418 
419     lib = nxt_unit_create(init);
420     if (nxt_slow_path(lib == NULL)) {
421         return NULL;
422     }
423 
424     queue_fd = -1;
425     mem = MAP_FAILED;
426 
427     if (init->ready_port.id.pid != 0
428         && init->ready_stream != 0
429         && init->read_port.id.pid != 0)
430     {
431         ready_port = init->ready_port;
432         ready_stream = init->ready_stream;
433         router_port = init->router_port;
434         read_port = init->read_port;
435         lib->log_fd = init->log_fd;
436 
437         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
438                               ready_port.id.id);
439         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
440                               router_port.id.id);
441         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
442                               read_port.id.id);
443 
444     } else {
445         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
446                                &lib->log_fd, &ready_stream, &shm_limit);
447         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
448             goto fail;
449         }
450 
451         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
452                                 / PORT_MMAP_DATA_SIZE;
453     }
454 
455     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
456         lib->shm_mmap_limit = 1;
457     }
458 
459     lib->pid = read_port.id.pid;
460 
461     ctx = &lib->main_ctx.ctx;
462 
463     rc = nxt_unit_fd_blocking(router_port.out_fd);
464     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
465         goto fail;
466     }
467 
468     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
469     if (nxt_slow_path(lib->router_port == NULL)) {
470         nxt_unit_alert(NULL, "failed to add router_port");
471 
472         goto fail;
473     }
474 
475     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
476     if (nxt_slow_path(queue_fd == -1)) {
477         goto fail;
478     }
479 
480     mem = mmap(NULL, sizeof(nxt_port_queue_t),
481                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
482     if (nxt_slow_path(mem == MAP_FAILED)) {
483         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
484                        strerror(errno), errno);
485 
486         goto fail;
487     }
488 
489     nxt_port_queue_init(mem);
490 
491     rc = nxt_unit_fd_blocking(read_port.in_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
497     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add read_port");
499 
500         goto fail;
501     }
502 
503     rc = nxt_unit_fd_blocking(ready_port.out_fd);
504     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
505         goto fail;
506     }
507 
508     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
509     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
510         nxt_unit_alert(NULL, "failed to send READY message");
511 
512         goto fail;
513     }
514 
515     nxt_unit_close(ready_port.out_fd);
516     nxt_unit_close(queue_fd);
517 
518     return ctx;
519 
520 fail:
521 
522     if (mem != MAP_FAILED) {
523         munmap(mem, sizeof(nxt_port_queue_t));
524     }
525 
526     if (queue_fd != -1) {
527         nxt_unit_close(queue_fd);
528     }
529 
530     nxt_unit_ctx_release(&lib->main_ctx.ctx);
531 
532     return NULL;
533 }
534 
535 
536 static nxt_unit_impl_t *
537 nxt_unit_create(nxt_unit_init_t *init)
538 {
539     int                   rc;
540     nxt_unit_impl_t       *lib;
541     nxt_unit_callbacks_t  *cb;
542 
543     lib = nxt_unit_malloc(NULL,
544                           sizeof(nxt_unit_impl_t) + init->request_data_size);
545     if (nxt_slow_path(lib == NULL)) {
546         nxt_unit_alert(NULL, "failed to allocate unit struct");
547 
548         return NULL;
549     }
550 
551     rc = pthread_mutex_init(&lib->mutex, NULL);
552     if (nxt_slow_path(rc != 0)) {
553         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
554 
555         goto fail;
556     }
557 
558     lib->unit.data = init->data;
559     lib->callbacks = init->callbacks;
560 
561     lib->request_data_size = init->request_data_size;
562     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
563                             / PORT_MMAP_DATA_SIZE;
564 
565     lib->processes.slot = NULL;
566     lib->ports.slot = NULL;
567 
568     lib->log_fd = STDERR_FILENO;
569 
570     nxt_queue_init(&lib->contexts);
571 
572     lib->use_count = 0;
573     lib->router_port = NULL;
574     lib->shared_port = NULL;
575 
576     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
577     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
578         pthread_mutex_destroy(&lib->mutex);
579         goto fail;
580     }
581 
582     cb = &lib->callbacks;
583 
584     if (cb->request_handler == NULL) {
585         nxt_unit_alert(NULL, "request_handler is NULL");
586 
587         pthread_mutex_destroy(&lib->mutex);
588         goto fail;
589     }
590 
591     nxt_unit_mmaps_init(&lib->incoming);
592     nxt_unit_mmaps_init(&lib->outgoing);
593 
594     return lib;
595 
596 fail:
597 
598     nxt_unit_free(NULL, lib);
599 
600     return NULL;
601 }
602 
603 
604 static int
605 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
606     void *data)
607 {
608     int  rc;
609 
610     ctx_impl->ctx.data = data;
611     ctx_impl->ctx.unit = &lib->unit;
612 
613     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
614     if (nxt_slow_path(rc != 0)) {
615         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
616 
617         return NXT_UNIT_ERROR;
618     }
619 
620     nxt_unit_lib_use(lib);
621 
622     pthread_mutex_lock(&lib->mutex);
623 
624     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
625 
626     pthread_mutex_unlock(&lib->mutex);
627 
628     ctx_impl->use_count = 1;
629     ctx_impl->wait_items = 0;
630     ctx_impl->online = 1;
631     ctx_impl->ready = 0;
632 
633     nxt_queue_init(&ctx_impl->free_req);
634     nxt_queue_init(&ctx_impl->free_ws);
635     nxt_queue_init(&ctx_impl->active_req);
636     nxt_queue_init(&ctx_impl->ready_req);
637     nxt_queue_init(&ctx_impl->pending_rbuf);
638     nxt_queue_init(&ctx_impl->free_rbuf);
639 
640     ctx_impl->free_buf = NULL;
641     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
642     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
643 
644     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
645     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
646 
647     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
648 
649     ctx_impl->req.req.ctx = &ctx_impl->ctx;
650     ctx_impl->req.req.unit = &lib->unit;
651 
652     ctx_impl->read_port = NULL;
653     ctx_impl->requests.slot = 0;
654 
655     return NXT_UNIT_OK;
656 }
657 
658 
659 nxt_inline void
660 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
661 {
662     nxt_unit_ctx_impl_t  *ctx_impl;
663 
664     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
665 
666     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
667 }
668 
669 
670 nxt_inline void
671 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
672 {
673     long                 c;
674     nxt_unit_ctx_impl_t  *ctx_impl;
675 
676     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
677 
678     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
679 
680     if (c == 1) {
681         nxt_unit_ctx_free(ctx_impl);
682     }
683 }
684 
685 
686 nxt_inline void
687 nxt_unit_lib_use(nxt_unit_impl_t *lib)
688 {
689     nxt_atomic_fetch_add(&lib->use_count, 1);
690 }
691 
692 
693 nxt_inline void
694 nxt_unit_lib_release(nxt_unit_impl_t *lib)
695 {
696     long                c;
697     nxt_unit_process_t  *process;
698 
699     c = nxt_atomic_fetch_add(&lib->use_count, -1);
700 
701     if (c == 1) {
702         for ( ;; ) {
703             pthread_mutex_lock(&lib->mutex);
704 
705             process = nxt_unit_process_pop_first(lib);
706             if (process == NULL) {
707                 pthread_mutex_unlock(&lib->mutex);
708 
709                 break;
710             }
711 
712             nxt_unit_remove_process(lib, process);
713         }
714 
715         pthread_mutex_destroy(&lib->mutex);
716 
717         if (nxt_fast_path(lib->router_port != NULL)) {
718             nxt_unit_port_release(lib->router_port);
719         }
720 
721         if (nxt_fast_path(lib->shared_port != NULL)) {
722             nxt_unit_port_release(lib->shared_port);
723         }
724 
725         nxt_unit_mmaps_destroy(&lib->incoming);
726         nxt_unit_mmaps_destroy(&lib->outgoing);
727 
728         nxt_unit_free(NULL, lib);
729     }
730 }
731 
732 
733 nxt_inline void
734 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
735     nxt_unit_mmap_buf_t *mmap_buf)
736 {
737     mmap_buf->next = *head;
738 
739     if (mmap_buf->next != NULL) {
740         mmap_buf->next->prev = &mmap_buf->next;
741     }
742 
743     *head = mmap_buf;
744     mmap_buf->prev = head;
745 }
746 
747 
748 nxt_inline void
749 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
750     nxt_unit_mmap_buf_t *mmap_buf)
751 {
752     while (*prev != NULL) {
753         prev = &(*prev)->next;
754     }
755 
756     nxt_unit_mmap_buf_insert(prev, mmap_buf);
757 }
758 
759 
760 nxt_inline void
761 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
762 {
763     nxt_unit_mmap_buf_t  **prev;
764 
765     prev = mmap_buf->prev;
766 
767     if (mmap_buf->next != NULL) {
768         mmap_buf->next->prev = prev;
769     }
770 
771     if (prev != NULL) {
772         *prev = mmap_buf->next;
773     }
774 }
775 
776 
777 static int
778 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
779     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
780     uint32_t *shm_limit)
781 {
782     int       rc;
783     int       ready_fd, router_fd, read_in_fd, read_out_fd;
784     char      *unit_init, *version_end;
785     long      version_length;
786     int64_t   ready_pid, router_pid, read_pid;
787     uint32_t  ready_stream, router_id, ready_id, read_id;
788 
789     unit_init = getenv(NXT_UNIT_INIT_ENV);
790     if (nxt_slow_path(unit_init == NULL)) {
791         nxt_unit_alert(NULL, "%s is not in the current environment",
792                        NXT_UNIT_INIT_ENV);
793 
794         return NXT_UNIT_ERROR;
795     }
796 
797     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
798 
799     version_length = nxt_length(NXT_VERSION);
800 
801     version_end = strchr(unit_init, ';');
802     if (version_end == NULL
803         || version_end - unit_init != version_length
804         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
805     {
806         nxt_unit_alert(NULL, "version check error");
807 
808         return NXT_UNIT_ERROR;
809     }
810 
811     rc = sscanf(version_end + 1,
812                 "%"PRIu32";"
813                 "%"PRId64",%"PRIu32",%d;"
814                 "%"PRId64",%"PRIu32",%d;"
815                 "%"PRId64",%"PRIu32",%d,%d;"
816                 "%d,%"PRIu32,
817                 &ready_stream,
818                 &ready_pid, &ready_id, &ready_fd,
819                 &router_pid, &router_id, &router_fd,
820                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
821                 log_fd, shm_limit);
822 
823     if (nxt_slow_path(rc != 13)) {
824         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
825 
826         return NXT_UNIT_ERROR;
827     }
828 
829     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
830 
831     ready_port->in_fd = -1;
832     ready_port->out_fd = ready_fd;
833     ready_port->data = NULL;
834 
835     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
836 
837     router_port->in_fd = -1;
838     router_port->out_fd = router_fd;
839     router_port->data = NULL;
840 
841     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
842 
843     read_port->in_fd = read_in_fd;
844     read_port->out_fd = read_out_fd;
845     read_port->data = NULL;
846 
847     *stream = ready_stream;
848 
849     return NXT_UNIT_OK;
850 }
851 
852 
853 static int
854 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
855 {
856     ssize_t          res;
857     nxt_port_msg_t   msg;
858     nxt_unit_impl_t  *lib;
859 
860     union {
861         struct cmsghdr  cm;
862         char            space[CMSG_SPACE(sizeof(int))];
863     } cmsg;
864 
865     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
866 
867     msg.stream = stream;
868     msg.pid = lib->pid;
869     msg.reply_port = 0;
870     msg.type = _NXT_PORT_MSG_PROCESS_READY;
871     msg.last = 1;
872     msg.mmap = 0;
873     msg.nf = 0;
874     msg.mf = 0;
875     msg.tracking = 0;
876 
877     memset(&cmsg, 0, sizeof(cmsg));
878 
879     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
880     cmsg.cm.cmsg_level = SOL_SOCKET;
881     cmsg.cm.cmsg_type = SCM_RIGHTS;
882 
883     /*
884      * memcpy() is used instead of simple
885      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
886      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
887      *   dereferencing type-punned pointer will break strict-aliasing rules
888      *
889      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
890      * in the same simple assignment as in the code above.
891      */
892     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
893 
894     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
895                            &cmsg, sizeof(cmsg));
896     if (res != sizeof(msg)) {
897         return NXT_UNIT_ERROR;
898     }
899 
900     return NXT_UNIT_OK;
901 }
902 
903 
904 static int
905 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
906 {
907     int                  rc;
908     pid_t                pid;
909     struct cmsghdr       *cm;
910     nxt_port_msg_t       *port_msg;
911     nxt_unit_impl_t      *lib;
912     nxt_unit_recv_msg_t  recv_msg;
913 
914     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
915 
916     recv_msg.fd[0] = -1;
917     recv_msg.fd[1] = -1;
918     port_msg = (nxt_port_msg_t *) rbuf->buf;
919     cm = (struct cmsghdr *) rbuf->oob;
920 
921     if (cm->cmsg_level == SOL_SOCKET
922         && cm->cmsg_type == SCM_RIGHTS)
923     {
924         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
925             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
926         }
927 
928         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
929             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
930         }
931     }
932 
933     recv_msg.incoming_buf = NULL;
934 
935     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
936         if (nxt_slow_path(rbuf->size == 0)) {
937             nxt_unit_debug(ctx, "read port closed");
938 
939             nxt_unit_quit(ctx);
940             rc = NXT_UNIT_OK;
941             goto done;
942         }
943 
944         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
945 
946         rc = NXT_UNIT_ERROR;
947         goto done;
948     }
949 
950     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
951                    port_msg->stream, (int) port_msg->type,
952                    recv_msg.fd[0], recv_msg.fd[1]);
953 
954     recv_msg.stream = port_msg->stream;
955     recv_msg.pid = port_msg->pid;
956     recv_msg.reply_port = port_msg->reply_port;
957     recv_msg.last = port_msg->last;
958     recv_msg.mmap = port_msg->mmap;
959 
960     recv_msg.start = port_msg + 1;
961     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
962 
963     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
964         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
965                        port_msg->stream, (int) port_msg->type);
966         rc = NXT_UNIT_ERROR;
967         goto done;
968     }
969 
970     /* Fragmentation is unsupported. */
971     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
972         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
973                        port_msg->stream, (int) port_msg->type);
974         rc = NXT_UNIT_ERROR;
975         goto done;
976     }
977 
978     if (port_msg->mmap) {
979         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
980 
981         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
982             if (rc == NXT_UNIT_AGAIN) {
983                 recv_msg.fd[0] = -1;
984                 recv_msg.fd[1] = -1;
985             }
986 
987             goto done;
988         }
989     }
990 
991     switch (port_msg->type) {
992 
993     case _NXT_PORT_MSG_RPC_READY:
994         rc = NXT_UNIT_OK;
995         break;
996 
997     case _NXT_PORT_MSG_QUIT:
998         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
999 
1000         nxt_unit_quit(ctx);
1001         rc = NXT_UNIT_OK;
1002         break;
1003 
1004     case _NXT_PORT_MSG_NEW_PORT:
1005         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1006         break;
1007 
1008     case _NXT_PORT_MSG_PORT_ACK:
1009         rc = nxt_unit_ctx_ready(ctx);
1010         break;
1011 
1012     case _NXT_PORT_MSG_CHANGE_FILE:
1013         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1014                        port_msg->stream, recv_msg.fd[0]);
1015 
1016         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1017             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1018                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1019                            strerror(errno), errno);
1020 
1021             rc = NXT_UNIT_ERROR;
1022             goto done;
1023         }
1024 
1025         rc = NXT_UNIT_OK;
1026         break;
1027 
1028     case _NXT_PORT_MSG_MMAP:
1029         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1030             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1031                            port_msg->stream, recv_msg.fd[0]);
1032 
1033             rc = NXT_UNIT_ERROR;
1034             goto done;
1035         }
1036 
1037         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1038         break;
1039 
1040     case _NXT_PORT_MSG_REQ_HEADERS:
1041         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
1042         break;
1043 
1044     case _NXT_PORT_MSG_REQ_BODY:
1045         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1046         break;
1047 
1048     case _NXT_PORT_MSG_WEBSOCKET:
1049         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1050         break;
1051 
1052     case _NXT_PORT_MSG_REMOVE_PID:
1053         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1054             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1055                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1056                            (int) sizeof(pid));
1057 
1058             rc = NXT_UNIT_ERROR;
1059             goto done;
1060         }
1061 
1062         memcpy(&pid, recv_msg.start, sizeof(pid));
1063 
1064         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1065                        port_msg->stream, (int) pid);
1066 
1067         nxt_unit_remove_pid(lib, pid);
1068 
1069         rc = NXT_UNIT_OK;
1070         break;
1071 
1072     case _NXT_PORT_MSG_SHM_ACK:
1073         rc = nxt_unit_process_shm_ack(ctx);
1074         break;
1075 
1076     default:
1077         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1078                        port_msg->stream, (int) port_msg->type);
1079 
1080         rc = NXT_UNIT_ERROR;
1081         goto done;
1082     }
1083 
1084 done:
1085 
1086     if (recv_msg.fd[0] != -1) {
1087         nxt_unit_close(recv_msg.fd[0]);
1088     }
1089 
1090     if (recv_msg.fd[1] != -1) {
1091         nxt_unit_close(recv_msg.fd[1]);
1092     }
1093 
1094     while (recv_msg.incoming_buf != NULL) {
1095         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1096     }
1097 
1098     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1099 #if (NXT_DEBUG)
1100         memset(rbuf->buf, 0xAC, rbuf->size);
1101 #endif
1102         nxt_unit_read_buf_release(ctx, rbuf);
1103     }
1104 
1105     return rc;
1106 }
1107 
1108 
1109 static int
1110 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1111 {
1112     void                     *mem;
1113     nxt_unit_impl_t          *lib;
1114     nxt_unit_port_t          new_port, *port;
1115     nxt_port_msg_new_port_t  *new_port_msg;
1116 
1117     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1118         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1119                       "invalid message size (%d)",
1120                       recv_msg->stream, (int) recv_msg->size);
1121 
1122         return NXT_UNIT_ERROR;
1123     }
1124 
1125     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1126         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1127                        recv_msg->stream, recv_msg->fd[0]);
1128 
1129         return NXT_UNIT_ERROR;
1130     }
1131 
1132     new_port_msg = recv_msg->start;
1133 
1134     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1135                    recv_msg->stream, (int) new_port_msg->pid,
1136                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1137 
1138     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1139 
1140     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1141         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1142 
1143         new_port.in_fd = recv_msg->fd[0];
1144         new_port.out_fd = -1;
1145 
1146         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1147                    MAP_SHARED, recv_msg->fd[1], 0);
1148 
1149     } else {
1150         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1151                           != NXT_UNIT_OK))
1152         {
1153             return NXT_UNIT_ERROR;
1154         }
1155 
1156         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1157                               new_port_msg->id);
1158 
1159         new_port.in_fd = -1;
1160         new_port.out_fd = recv_msg->fd[0];
1161 
1162         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1163                    MAP_SHARED, recv_msg->fd[1], 0);
1164     }
1165 
1166     if (nxt_slow_path(mem == MAP_FAILED)) {
1167         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1168                        strerror(errno), errno);
1169 
1170         return NXT_UNIT_ERROR;
1171     }
1172 
1173     new_port.data = NULL;
1174 
1175     recv_msg->fd[0] = -1;
1176 
1177     port = nxt_unit_add_port(ctx, &new_port, mem);
1178     if (nxt_slow_path(port == NULL)) {
1179         return NXT_UNIT_ERROR;
1180     }
1181 
1182     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1183         lib->shared_port = port;
1184 
1185         return nxt_unit_ctx_ready(ctx);
1186     }
1187 
1188     nxt_unit_port_release(port);
1189 
1190     return NXT_UNIT_OK;
1191 }
1192 
1193 
1194 static int
1195 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1196 {
1197     nxt_unit_impl_t      *lib;
1198     nxt_unit_ctx_impl_t  *ctx_impl;
1199 
1200     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1201     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1202 
1203     ctx_impl->ready = 1;
1204 
1205     if (lib->callbacks.ready_handler) {
1206         return lib->callbacks.ready_handler(ctx);
1207     }
1208 
1209     return NXT_UNIT_OK;
1210 }
1211 
1212 
1213 static int
1214 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1215 {
1216     int                           res;
1217     nxt_unit_impl_t               *lib;
1218     nxt_unit_port_id_t            port_id;
1219     nxt_unit_request_t            *r;
1220     nxt_unit_mmap_buf_t           *b;
1221     nxt_unit_request_info_t       *req;
1222     nxt_unit_request_info_impl_t  *req_impl;
1223 
1224     if (nxt_slow_path(recv_msg->mmap == 0)) {
1225         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1226                       recv_msg->stream);
1227 
1228         return NXT_UNIT_ERROR;
1229     }
1230 
1231     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1232         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1233                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1234                       (int) sizeof(nxt_unit_request_t));
1235 
1236         return NXT_UNIT_ERROR;
1237     }
1238 
1239     req_impl = nxt_unit_request_info_get(ctx);
1240     if (nxt_slow_path(req_impl == NULL)) {
1241         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1242                       recv_msg->stream);
1243 
1244         return NXT_UNIT_ERROR;
1245     }
1246 
1247     req = &req_impl->req;
1248 
1249     req->request = recv_msg->start;
1250 
1251     b = recv_msg->incoming_buf;
1252 
1253     req->request_buf = &b->buf;
1254     req->response = NULL;
1255     req->response_buf = NULL;
1256 
1257     r = req->request;
1258 
1259     req->content_length = r->content_length;
1260 
1261     req->content_buf = req->request_buf;
1262     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1263 
1264     req_impl->stream = recv_msg->stream;
1265 
1266     req_impl->outgoing_buf = NULL;
1267 
1268     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1269         b->req = req;
1270     }
1271 
1272     /* "Move" incoming buffer list to req_impl. */
1273     req_impl->incoming_buf = recv_msg->incoming_buf;
1274     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1275     recv_msg->incoming_buf = NULL;
1276 
1277     req->content_fd = recv_msg->fd[0];
1278     recv_msg->fd[0] = -1;
1279 
1280     req->response_max_fields = 0;
1281     req_impl->state = NXT_UNIT_RS_START;
1282     req_impl->websocket = 0;
1283     req_impl->in_hash = 0;
1284 
1285     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1286                    (int) r->method_length,
1287                    (char *) nxt_unit_sptr_get(&r->method),
1288                    (int) r->target_length,
1289                    (char *) nxt_unit_sptr_get(&r->target),
1290                    (int) r->content_length);
1291 
1292     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1293 
1294     res = nxt_unit_request_check_response_port(req, &port_id);
1295     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1296         return NXT_UNIT_ERROR;
1297     }
1298 
1299     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1300         res = nxt_unit_send_req_headers_ack(req);
1301         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1302             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1303 
1304             return NXT_UNIT_ERROR;
1305         }
1306 
1307         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1308 
1309         if (req->content_length
1310             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1311         {
1312             res = nxt_unit_request_hash_add(ctx, req);
1313             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1314                 nxt_unit_req_warn(req, "failed to add request to hash");
1315 
1316                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1317 
1318                 return NXT_UNIT_ERROR;
1319             }
1320 
1321             /*
1322              * If application have separate data handler, we may start
1323              * request processing and process data when it is arrived.
1324              */
1325             if (lib->callbacks.data_handler == NULL) {
1326                 return NXT_UNIT_OK;
1327             }
1328         }
1329 
1330         lib->callbacks.request_handler(req);
1331     }
1332 
1333     return NXT_UNIT_OK;
1334 }
1335 
1336 
1337 static int
1338 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1339 {
1340     uint64_t                 l;
1341     nxt_unit_impl_t          *lib;
1342     nxt_unit_mmap_buf_t      *b;
1343     nxt_unit_request_info_t  *req;
1344 
1345     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1346     if (req == NULL) {
1347         return NXT_UNIT_OK;
1348     }
1349 
1350     l = req->content_buf->end - req->content_buf->free;
1351 
1352     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1353         b->req = req;
1354         l += b->buf.end - b->buf.free;
1355     }
1356 
1357     if (recv_msg->incoming_buf != NULL) {
1358         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1359 
1360         /* "Move" incoming buffer list to req_impl. */
1361         nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
1362         recv_msg->incoming_buf = NULL;
1363     }
1364 
1365     req->content_fd = recv_msg->fd[0];
1366     recv_msg->fd[0] = -1;
1367 
1368     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1369 
1370     if (lib->callbacks.data_handler != NULL) {
1371         lib->callbacks.data_handler(req);
1372 
1373         return NXT_UNIT_OK;
1374     }
1375 
1376     if (req->content_fd != -1 || l == req->content_length) {
1377         lib->callbacks.request_handler(req);
1378     }
1379 
1380     return NXT_UNIT_OK;
1381 }
1382 
1383 
1384 static int
1385 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1386     nxt_unit_port_id_t *port_id)
1387 {
1388     int                           res;
1389     nxt_unit_ctx_t                *ctx;
1390     nxt_unit_impl_t               *lib;
1391     nxt_unit_port_t               *port;
1392     nxt_unit_process_t            *process;
1393     nxt_unit_ctx_impl_t           *ctx_impl;
1394     nxt_unit_port_impl_t          *port_impl;
1395     nxt_unit_request_info_impl_t  *req_impl;
1396 
1397     ctx = req->ctx;
1398     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1399     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1400 
1401     pthread_mutex_lock(&lib->mutex);
1402 
1403     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1404     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1405 
1406     if (nxt_fast_path(port != NULL)) {
1407         req->response_port = port;
1408 
1409         if (nxt_fast_path(port_impl->ready)) {
1410             pthread_mutex_unlock(&lib->mutex);
1411 
1412             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1413                            (int) port->id.pid, (int) port->id.id);
1414 
1415             return NXT_UNIT_OK;
1416         }
1417 
1418         nxt_unit_debug(ctx, "check_response_port: "
1419                        "port{%d,%d} already requested",
1420                        (int) port->id.pid, (int) port->id.id);
1421 
1422         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1423 
1424         nxt_queue_insert_tail(&port_impl->awaiting_req,
1425                               &req_impl->port_wait_link);
1426 
1427         pthread_mutex_unlock(&lib->mutex);
1428 
1429         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1430 
1431         return NXT_UNIT_AGAIN;
1432     }
1433 
1434     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1435     if (nxt_slow_path(port_impl == NULL)) {
1436         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1437                        (int) sizeof(nxt_unit_port_impl_t));
1438 
1439         pthread_mutex_unlock(&lib->mutex);
1440 
1441         return NXT_UNIT_ERROR;
1442     }
1443 
1444     port = &port_impl->port;
1445 
1446     port->id = *port_id;
1447     port->in_fd = -1;
1448     port->out_fd = -1;
1449     port->data = NULL;
1450 
1451     res = nxt_unit_port_hash_add(&lib->ports, port);
1452     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1453         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1454                        port->id.pid, port->id.id);
1455 
1456         pthread_mutex_unlock(&lib->mutex);
1457 
1458         nxt_unit_free(ctx, port);
1459 
1460         return NXT_UNIT_ERROR;
1461     }
1462 
1463     process = nxt_unit_process_find(lib, port_id->pid, 0);
1464     if (nxt_slow_path(process == NULL)) {
1465         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1466                        port->id.pid);
1467 
1468         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1469 
1470         pthread_mutex_unlock(&lib->mutex);
1471 
1472         nxt_unit_free(ctx, port);
1473 
1474         return NXT_UNIT_ERROR;
1475     }
1476 
1477     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1478 
1479     port_impl->process = process;
1480     port_impl->queue = NULL;
1481     port_impl->from_socket = 0;
1482     port_impl->socket_rbuf = NULL;
1483 
1484     nxt_queue_init(&port_impl->awaiting_req);
1485 
1486     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1487 
1488     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1489 
1490     port_impl->use_count = 2;
1491     port_impl->ready = 0;
1492 
1493     req->response_port = port;
1494 
1495     pthread_mutex_unlock(&lib->mutex);
1496 
1497     res = nxt_unit_get_port(ctx, port_id);
1498     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1499         return NXT_UNIT_ERROR;
1500     }
1501 
1502     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1503 
1504     return NXT_UNIT_AGAIN;
1505 }
1506 
1507 
1508 static int
1509 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1510 {
1511     ssize_t                       res;
1512     nxt_port_msg_t                msg;
1513     nxt_unit_impl_t               *lib;
1514     nxt_unit_ctx_impl_t           *ctx_impl;
1515     nxt_unit_request_info_impl_t  *req_impl;
1516 
1517     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1518     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1519     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1520 
1521     memset(&msg, 0, sizeof(nxt_port_msg_t));
1522 
1523     msg.stream = req_impl->stream;
1524     msg.pid = lib->pid;
1525     msg.reply_port = ctx_impl->read_port->id.id;
1526     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1527 
1528     res = nxt_unit_port_send(req->ctx, req->response_port,
1529                              &msg, sizeof(msg), NULL, 0);
1530     if (nxt_slow_path(res != sizeof(msg))) {
1531         return NXT_UNIT_ERROR;
1532     }
1533 
1534     return NXT_UNIT_OK;
1535 }
1536 
1537 
1538 static int
1539 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1540 {
1541     size_t                           hsize;
1542     nxt_unit_impl_t                  *lib;
1543     nxt_unit_mmap_buf_t              *b;
1544     nxt_unit_callbacks_t             *cb;
1545     nxt_unit_request_info_t          *req;
1546     nxt_unit_request_info_impl_t     *req_impl;
1547     nxt_unit_websocket_frame_impl_t  *ws_impl;
1548 
1549     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1550     if (nxt_slow_path(req == NULL)) {
1551         return NXT_UNIT_OK;
1552     }
1553 
1554     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555 
1556     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1557     cb = &lib->callbacks;
1558 
1559     if (cb->websocket_handler && recv_msg->size >= 2) {
1560         ws_impl = nxt_unit_websocket_frame_get(ctx);
1561         if (nxt_slow_path(ws_impl == NULL)) {
1562             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1563                           req_impl->stream);
1564 
1565             return NXT_UNIT_ERROR;
1566         }
1567 
1568         ws_impl->ws.req = req;
1569 
1570         ws_impl->buf = NULL;
1571 
1572         if (recv_msg->mmap) {
1573             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1574                 b->req = req;
1575             }
1576 
1577             /* "Move" incoming buffer list to ws_impl. */
1578             ws_impl->buf = recv_msg->incoming_buf;
1579             ws_impl->buf->prev = &ws_impl->buf;
1580             recv_msg->incoming_buf = NULL;
1581 
1582             b = ws_impl->buf;
1583 
1584         } else {
1585             b = nxt_unit_mmap_buf_get(ctx);
1586             if (nxt_slow_path(b == NULL)) {
1587                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1588                                req_impl->stream);
1589 
1590                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1591 
1592                 return NXT_UNIT_ERROR;
1593             }
1594 
1595             b->req = req;
1596             b->buf.start = recv_msg->start;
1597             b->buf.free = b->buf.start;
1598             b->buf.end = b->buf.start + recv_msg->size;
1599 
1600             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1601         }
1602 
1603         ws_impl->ws.header = (void *) b->buf.start;
1604         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1605             ws_impl->ws.header);
1606 
1607         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1608 
1609         if (ws_impl->ws.header->mask) {
1610             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1611 
1612         } else {
1613             ws_impl->ws.mask = NULL;
1614         }
1615 
1616         b->buf.free += hsize;
1617 
1618         ws_impl->ws.content_buf = &b->buf;
1619         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1620 
1621         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1622                            "payload_len=%"PRIu64,
1623                             ws_impl->ws.header->opcode,
1624                             ws_impl->ws.payload_len);
1625 
1626         cb->websocket_handler(&ws_impl->ws);
1627     }
1628 
1629     if (recv_msg->last) {
1630         req_impl->websocket = 0;
1631 
1632         if (cb->close_handler) {
1633             nxt_unit_req_debug(req, "close_handler");
1634 
1635             cb->close_handler(req);
1636 
1637         } else {
1638             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1639         }
1640     }
1641 
1642     return NXT_UNIT_OK;
1643 }
1644 
1645 
1646 static int
1647 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1648 {
1649     nxt_unit_impl_t       *lib;
1650     nxt_unit_callbacks_t  *cb;
1651 
1652     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1653     cb = &lib->callbacks;
1654 
1655     if (cb->shm_ack_handler != NULL) {
1656         cb->shm_ack_handler(ctx);
1657     }
1658 
1659     return NXT_UNIT_OK;
1660 }
1661 
1662 
1663 static nxt_unit_request_info_impl_t *
1664 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1665 {
1666     nxt_unit_impl_t               *lib;
1667     nxt_queue_link_t              *lnk;
1668     nxt_unit_ctx_impl_t           *ctx_impl;
1669     nxt_unit_request_info_impl_t  *req_impl;
1670 
1671     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1672 
1673     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1674 
1675     pthread_mutex_lock(&ctx_impl->mutex);
1676 
1677     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1678         pthread_mutex_unlock(&ctx_impl->mutex);
1679 
1680         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1681                                         + lib->request_data_size);
1682         if (nxt_slow_path(req_impl == NULL)) {
1683             return NULL;
1684         }
1685 
1686         req_impl->req.unit = ctx->unit;
1687         req_impl->req.ctx = ctx;
1688 
1689         pthread_mutex_lock(&ctx_impl->mutex);
1690 
1691     } else {
1692         lnk = nxt_queue_first(&ctx_impl->free_req);
1693         nxt_queue_remove(lnk);
1694 
1695         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1696     }
1697 
1698     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1699 
1700     pthread_mutex_unlock(&ctx_impl->mutex);
1701 
1702     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1703 
1704     return req_impl;
1705 }
1706 
1707 
1708 static void
1709 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1710 {
1711     nxt_unit_ctx_impl_t           *ctx_impl;
1712     nxt_unit_request_info_impl_t  *req_impl;
1713 
1714     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1715     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1716 
1717     req->response = NULL;
1718     req->response_buf = NULL;
1719 
1720     if (req_impl->in_hash) {
1721         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1722     }
1723 
1724     req_impl->websocket = 0;
1725 
1726     while (req_impl->outgoing_buf != NULL) {
1727         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1728     }
1729 
1730     while (req_impl->incoming_buf != NULL) {
1731         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1732     }
1733 
1734     if (req->content_fd != -1) {
1735         nxt_unit_close(req->content_fd);
1736 
1737         req->content_fd = -1;
1738     }
1739 
1740     if (req->response_port != NULL) {
1741         nxt_unit_port_release(req->response_port);
1742 
1743         req->response_port = NULL;
1744     }
1745 
1746     pthread_mutex_lock(&ctx_impl->mutex);
1747 
1748     nxt_queue_remove(&req_impl->link);
1749 
1750     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1751 
1752     pthread_mutex_unlock(&ctx_impl->mutex);
1753 
1754     req_impl->state = NXT_UNIT_RS_RELEASED;
1755 }
1756 
1757 
1758 static void
1759 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1760 {
1761     nxt_unit_ctx_impl_t  *ctx_impl;
1762 
1763     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1764 
1765     nxt_queue_remove(&req_impl->link);
1766 
1767     if (req_impl != &ctx_impl->req) {
1768         nxt_unit_free(&ctx_impl->ctx, req_impl);
1769     }
1770 }
1771 
1772 
1773 static nxt_unit_websocket_frame_impl_t *
1774 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1775 {
1776     nxt_queue_link_t                 *lnk;
1777     nxt_unit_ctx_impl_t              *ctx_impl;
1778     nxt_unit_websocket_frame_impl_t  *ws_impl;
1779 
1780     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1781 
1782     pthread_mutex_lock(&ctx_impl->mutex);
1783 
1784     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1785         pthread_mutex_unlock(&ctx_impl->mutex);
1786 
1787         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1788         if (nxt_slow_path(ws_impl == NULL)) {
1789             return NULL;
1790         }
1791 
1792     } else {
1793         lnk = nxt_queue_first(&ctx_impl->free_ws);
1794         nxt_queue_remove(lnk);
1795 
1796         pthread_mutex_unlock(&ctx_impl->mutex);
1797 
1798         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1799     }
1800 
1801     ws_impl->ctx_impl = ctx_impl;
1802 
1803     return ws_impl;
1804 }
1805 
1806 
1807 static void
1808 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1809 {
1810     nxt_unit_websocket_frame_impl_t  *ws_impl;
1811 
1812     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1813 
1814     while (ws_impl->buf != NULL) {
1815         nxt_unit_mmap_buf_free(ws_impl->buf);
1816     }
1817 
1818     ws->req = NULL;
1819 
1820     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1821 
1822     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1823 
1824     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1825 }
1826 
1827 
1828 static void
1829 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1830     nxt_unit_websocket_frame_impl_t *ws_impl)
1831 {
1832     nxt_queue_remove(&ws_impl->link);
1833 
1834     nxt_unit_free(ctx, ws_impl);
1835 }
1836 
1837 
1838 uint16_t
1839 nxt_unit_field_hash(const char *name, size_t name_length)
1840 {
1841     u_char      ch;
1842     uint32_t    hash;
1843     const char  *p, *end;
1844 
1845     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1846     end = name + name_length;
1847 
1848     for (p = name; p < end; p++) {
1849         ch = *p;
1850         hash = (hash << 4) + hash + nxt_lowcase(ch);
1851     }
1852 
1853     hash = (hash >> 16) ^ hash;
1854 
1855     return hash;
1856 }
1857 
1858 
1859 void
1860 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1861 {
1862     char                *name;
1863     uint32_t            i, j;
1864     nxt_unit_field_t    *fields, f;
1865     nxt_unit_request_t  *r;
1866 
1867     static nxt_str_t  content_length = nxt_string("content-length");
1868     static nxt_str_t  content_type = nxt_string("content-type");
1869     static nxt_str_t  cookie = nxt_string("cookie");
1870 
1871     nxt_unit_req_debug(req, "group_dup_fields");
1872 
1873     r = req->request;
1874     fields = r->fields;
1875 
1876     for (i = 0; i < r->fields_count; i++) {
1877         name = nxt_unit_sptr_get(&fields[i].name);
1878 
1879         switch (fields[i].hash) {
1880         case NXT_UNIT_HASH_CONTENT_LENGTH:
1881             if (fields[i].name_length == content_length.length
1882                 && nxt_unit_memcasecmp(name, content_length.start,
1883                                        content_length.length) == 0)
1884             {
1885                 r->content_length_field = i;
1886             }
1887 
1888             break;
1889 
1890         case NXT_UNIT_HASH_CONTENT_TYPE:
1891             if (fields[i].name_length == content_type.length
1892                 && nxt_unit_memcasecmp(name, content_type.start,
1893                                        content_type.length) == 0)
1894             {
1895                 r->content_type_field = i;
1896             }
1897 
1898             break;
1899 
1900         case NXT_UNIT_HASH_COOKIE:
1901             if (fields[i].name_length == cookie.length
1902                 && nxt_unit_memcasecmp(name, cookie.start,
1903                                        cookie.length) == 0)
1904             {
1905                 r->cookie_field = i;
1906             }
1907 
1908             break;
1909         }
1910 
1911         for (j = i + 1; j < r->fields_count; j++) {
1912             if (fields[i].hash != fields[j].hash
1913                 || fields[i].name_length != fields[j].name_length
1914                 || nxt_unit_memcasecmp(name,
1915                                        nxt_unit_sptr_get(&fields[j].name),
1916                                        fields[j].name_length) != 0)
1917             {
1918                 continue;
1919             }
1920 
1921             f = fields[j];
1922             f.value.offset += (j - (i + 1)) * sizeof(f);
1923 
1924             while (j > i + 1) {
1925                 fields[j] = fields[j - 1];
1926                 fields[j].name.offset -= sizeof(f);
1927                 fields[j].value.offset -= sizeof(f);
1928                 j--;
1929             }
1930 
1931             fields[j] = f;
1932 
1933             /* Assign the same name pointer for further grouping simplicity. */
1934             nxt_unit_sptr_set(&fields[j].name, name);
1935 
1936             i++;
1937         }
1938     }
1939 }
1940 
1941 
1942 int
1943 nxt_unit_response_init(nxt_unit_request_info_t *req,
1944     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1945 {
1946     uint32_t                      buf_size;
1947     nxt_unit_buf_t                *buf;
1948     nxt_unit_request_info_impl_t  *req_impl;
1949 
1950     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1951 
1952     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1953         nxt_unit_req_warn(req, "init: response already sent");
1954 
1955         return NXT_UNIT_ERROR;
1956     }
1957 
1958     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1959                        (int) max_fields_count, (int) max_fields_size);
1960 
1961     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1962         nxt_unit_req_debug(req, "duplicate response init");
1963     }
1964 
1965     /*
1966      * Each field name and value 0-terminated by libunit,
1967      * this is the reason of '+ 2' below.
1968      */
1969     buf_size = sizeof(nxt_unit_response_t)
1970                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1971                + max_fields_size;
1972 
1973     if (nxt_slow_path(req->response_buf != NULL)) {
1974         buf = req->response_buf;
1975 
1976         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1977             goto init_response;
1978         }
1979 
1980         nxt_unit_buf_free(buf);
1981 
1982         req->response_buf = NULL;
1983         req->response = NULL;
1984         req->response_max_fields = 0;
1985 
1986         req_impl->state = NXT_UNIT_RS_START;
1987     }
1988 
1989     buf = nxt_unit_response_buf_alloc(req, buf_size);
1990     if (nxt_slow_path(buf == NULL)) {
1991         return NXT_UNIT_ERROR;
1992     }
1993 
1994 init_response:
1995 
1996     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1997 
1998     req->response_buf = buf;
1999 
2000     req->response = (nxt_unit_response_t *) buf->start;
2001     req->response->status = status;
2002 
2003     buf->free = buf->start + sizeof(nxt_unit_response_t)
2004                 + max_fields_count * sizeof(nxt_unit_field_t);
2005 
2006     req->response_max_fields = max_fields_count;
2007     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2008 
2009     return NXT_UNIT_OK;
2010 }
2011 
2012 
2013 int
2014 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2015     uint32_t max_fields_count, uint32_t max_fields_size)
2016 {
2017     char                          *p;
2018     uint32_t                      i, buf_size;
2019     nxt_unit_buf_t                *buf;
2020     nxt_unit_field_t              *f, *src;
2021     nxt_unit_response_t           *resp;
2022     nxt_unit_request_info_impl_t  *req_impl;
2023 
2024     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2025 
2026     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2027         nxt_unit_req_warn(req, "realloc: response not init");
2028 
2029         return NXT_UNIT_ERROR;
2030     }
2031 
2032     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2033         nxt_unit_req_warn(req, "realloc: response already sent");
2034 
2035         return NXT_UNIT_ERROR;
2036     }
2037 
2038     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2039         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2040 
2041         return NXT_UNIT_ERROR;
2042     }
2043 
2044     /*
2045      * Each field name and value 0-terminated by libunit,
2046      * this is the reason of '+ 2' below.
2047      */
2048     buf_size = sizeof(nxt_unit_response_t)
2049                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2050                + max_fields_size;
2051 
2052     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2053 
2054     buf = nxt_unit_response_buf_alloc(req, buf_size);
2055     if (nxt_slow_path(buf == NULL)) {
2056         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2057         return NXT_UNIT_ERROR;
2058     }
2059 
2060     resp = (nxt_unit_response_t *) buf->start;
2061 
2062     memset(resp, 0, sizeof(nxt_unit_response_t));
2063 
2064     resp->status = req->response->status;
2065     resp->content_length = req->response->content_length;
2066 
2067     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2068     f = resp->fields;
2069 
2070     for (i = 0; i < req->response->fields_count; i++) {
2071         src = req->response->fields + i;
2072 
2073         if (nxt_slow_path(src->skip != 0)) {
2074             continue;
2075         }
2076 
2077         if (nxt_slow_path(src->name_length + src->value_length + 2
2078                           > (uint32_t) (buf->end - p)))
2079         {
2080             nxt_unit_req_warn(req, "realloc: not enough space for field"
2081                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2082                   i, src, src->name_length, src->value_length);
2083 
2084             goto fail;
2085         }
2086 
2087         nxt_unit_sptr_set(&f->name, p);
2088         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2089         *p++ = '\0';
2090 
2091         nxt_unit_sptr_set(&f->value, p);
2092         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2093         *p++ = '\0';
2094 
2095         f->hash = src->hash;
2096         f->skip = 0;
2097         f->name_length = src->name_length;
2098         f->value_length = src->value_length;
2099 
2100         resp->fields_count++;
2101         f++;
2102     }
2103 
2104     if (req->response->piggyback_content_length > 0) {
2105         if (nxt_slow_path(req->response->piggyback_content_length
2106                           > (uint32_t) (buf->end - p)))
2107         {
2108             nxt_unit_req_warn(req, "realloc: not enought space for content"
2109                   " #%"PRIu32", %"PRIu32" required",
2110                   i, req->response->piggyback_content_length);
2111 
2112             goto fail;
2113         }
2114 
2115         resp->piggyback_content_length =
2116                                        req->response->piggyback_content_length;
2117 
2118         nxt_unit_sptr_set(&resp->piggyback_content, p);
2119         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2120                        req->response->piggyback_content_length);
2121     }
2122 
2123     buf->free = p;
2124 
2125     nxt_unit_buf_free(req->response_buf);
2126 
2127     req->response = resp;
2128     req->response_buf = buf;
2129     req->response_max_fields = max_fields_count;
2130 
2131     return NXT_UNIT_OK;
2132 
2133 fail:
2134 
2135     nxt_unit_buf_free(buf);
2136 
2137     return NXT_UNIT_ERROR;
2138 }
2139 
2140 
2141 int
2142 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2143 {
2144     nxt_unit_request_info_impl_t  *req_impl;
2145 
2146     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2147 
2148     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2149 }
2150 
2151 
2152 int
2153 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2154     const char *name, uint8_t name_length,
2155     const char *value, uint32_t value_length)
2156 {
2157     nxt_unit_buf_t                *buf;
2158     nxt_unit_field_t              *f;
2159     nxt_unit_response_t           *resp;
2160     nxt_unit_request_info_impl_t  *req_impl;
2161 
2162     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2163 
2164     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2165         nxt_unit_req_warn(req, "add_field: response not initialized or "
2166                           "already sent");
2167 
2168         return NXT_UNIT_ERROR;
2169     }
2170 
2171     resp = req->response;
2172 
2173     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2174         nxt_unit_req_warn(req, "add_field: too many response fields");
2175 
2176         return NXT_UNIT_ERROR;
2177     }
2178 
2179     buf = req->response_buf;
2180 
2181     if (nxt_slow_path(name_length + value_length + 2
2182                       > (uint32_t) (buf->end - buf->free)))
2183     {
2184         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2185 
2186         return NXT_UNIT_ERROR;
2187     }
2188 
2189     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2190                        resp->fields_count,
2191                        (int) name_length, name,
2192                        (int) value_length, value);
2193 
2194     f = resp->fields + resp->fields_count;
2195 
2196     nxt_unit_sptr_set(&f->name, buf->free);
2197     buf->free = nxt_cpymem(buf->free, name, name_length);
2198     *buf->free++ = '\0';
2199 
2200     nxt_unit_sptr_set(&f->value, buf->free);
2201     buf->free = nxt_cpymem(buf->free, value, value_length);
2202     *buf->free++ = '\0';
2203 
2204     f->hash = nxt_unit_field_hash(name, name_length);
2205     f->skip = 0;
2206     f->name_length = name_length;
2207     f->value_length = value_length;
2208 
2209     resp->fields_count++;
2210 
2211     return NXT_UNIT_OK;
2212 }
2213 
2214 
2215 int
2216 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2217     const void* src, uint32_t size)
2218 {
2219     nxt_unit_buf_t                *buf;
2220     nxt_unit_response_t           *resp;
2221     nxt_unit_request_info_impl_t  *req_impl;
2222 
2223     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2224 
2225     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2226         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2227 
2228         return NXT_UNIT_ERROR;
2229     }
2230 
2231     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2232         nxt_unit_req_warn(req, "add_content: response already sent");
2233 
2234         return NXT_UNIT_ERROR;
2235     }
2236 
2237     buf = req->response_buf;
2238 
2239     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2240         nxt_unit_req_warn(req, "add_content: buffer overflow");
2241 
2242         return NXT_UNIT_ERROR;
2243     }
2244 
2245     resp = req->response;
2246 
2247     if (resp->piggyback_content_length == 0) {
2248         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2249         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2250     }
2251 
2252     resp->piggyback_content_length += size;
2253 
2254     buf->free = nxt_cpymem(buf->free, src, size);
2255 
2256     return NXT_UNIT_OK;
2257 }
2258 
2259 
2260 int
2261 nxt_unit_response_send(nxt_unit_request_info_t *req)
2262 {
2263     int                           rc;
2264     nxt_unit_mmap_buf_t           *mmap_buf;
2265     nxt_unit_request_info_impl_t  *req_impl;
2266 
2267     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2268 
2269     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2270         nxt_unit_req_warn(req, "send: response is not initialized yet");
2271 
2272         return NXT_UNIT_ERROR;
2273     }
2274 
2275     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2276         nxt_unit_req_warn(req, "send: response already sent");
2277 
2278         return NXT_UNIT_ERROR;
2279     }
2280 
2281     if (req->request->websocket_handshake && req->response->status == 101) {
2282         nxt_unit_response_upgrade(req);
2283     }
2284 
2285     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2286                        req->response->fields_count,
2287                        (int) (req->response_buf->free
2288                               - req->response_buf->start));
2289 
2290     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2291 
2292     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2293     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2294         req->response = NULL;
2295         req->response_buf = NULL;
2296         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2297 
2298         nxt_unit_mmap_buf_free(mmap_buf);
2299     }
2300 
2301     return rc;
2302 }
2303 
2304 
2305 int
2306 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2307 {
2308     nxt_unit_request_info_impl_t  *req_impl;
2309 
2310     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2311 
2312     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2313 }
2314 
2315 
2316 nxt_unit_buf_t *
2317 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2318 {
2319     int                           rc;
2320     nxt_unit_mmap_buf_t           *mmap_buf;
2321     nxt_unit_request_info_impl_t  *req_impl;
2322 
2323     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2324         nxt_unit_req_warn(req, "response_buf_alloc: "
2325                           "requested buffer (%"PRIu32") too big", size);
2326 
2327         return NULL;
2328     }
2329 
2330     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2331 
2332     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2333 
2334     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2335     if (nxt_slow_path(mmap_buf == NULL)) {
2336         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2337 
2338         return NULL;
2339     }
2340 
2341     mmap_buf->req = req;
2342 
2343     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2344 
2345     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2346                                    size, size, mmap_buf,
2347                                    NULL);
2348     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2349         nxt_unit_mmap_buf_release(mmap_buf);
2350 
2351         return NULL;
2352     }
2353 
2354     return &mmap_buf->buf;
2355 }
2356 
2357 
2358 static nxt_unit_mmap_buf_t *
2359 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2360 {
2361     nxt_unit_mmap_buf_t  *mmap_buf;
2362     nxt_unit_ctx_impl_t  *ctx_impl;
2363 
2364     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2365 
2366     pthread_mutex_lock(&ctx_impl->mutex);
2367 
2368     if (ctx_impl->free_buf == NULL) {
2369         pthread_mutex_unlock(&ctx_impl->mutex);
2370 
2371         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2372         if (nxt_slow_path(mmap_buf == NULL)) {
2373             return NULL;
2374         }
2375 
2376     } else {
2377         mmap_buf = ctx_impl->free_buf;
2378 
2379         nxt_unit_mmap_buf_unlink(mmap_buf);
2380 
2381         pthread_mutex_unlock(&ctx_impl->mutex);
2382     }
2383 
2384     mmap_buf->ctx_impl = ctx_impl;
2385 
2386     mmap_buf->hdr = NULL;
2387     mmap_buf->free_ptr = NULL;
2388 
2389     return mmap_buf;
2390 }
2391 
2392 
2393 static void
2394 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2395 {
2396     nxt_unit_mmap_buf_unlink(mmap_buf);
2397 
2398     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2399 
2400     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2401 
2402     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2403 }
2404 
2405 
2406 int
2407 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2408 {
2409     return req->request->websocket_handshake;
2410 }
2411 
2412 
2413 int
2414 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2415 {
2416     int                           rc;
2417     nxt_unit_request_info_impl_t  *req_impl;
2418 
2419     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2420 
2421     if (nxt_slow_path(req_impl->websocket != 0)) {
2422         nxt_unit_req_debug(req, "upgrade: already upgraded");
2423 
2424         return NXT_UNIT_OK;
2425     }
2426 
2427     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2428         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2429 
2430         return NXT_UNIT_ERROR;
2431     }
2432 
2433     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2434         nxt_unit_req_warn(req, "upgrade: response already sent");
2435 
2436         return NXT_UNIT_ERROR;
2437     }
2438 
2439     rc = nxt_unit_request_hash_add(req->ctx, req);
2440     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2441         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2442 
2443         return NXT_UNIT_ERROR;
2444     }
2445 
2446     req_impl->websocket = 1;
2447 
2448     req->response->status = 101;
2449 
2450     return NXT_UNIT_OK;
2451 }
2452 
2453 
2454 int
2455 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2456 {
2457     nxt_unit_request_info_impl_t  *req_impl;
2458 
2459     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2460 
2461     return req_impl->websocket;
2462 }
2463 
2464 
2465 nxt_unit_request_info_t *
2466 nxt_unit_get_request_info_from_data(void *data)
2467 {
2468     nxt_unit_request_info_impl_t  *req_impl;
2469 
2470     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2471 
2472     return &req_impl->req;
2473 }
2474 
2475 
2476 int
2477 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2478 {
2479     int                           rc;
2480     nxt_unit_mmap_buf_t           *mmap_buf;
2481     nxt_unit_request_info_t       *req;
2482     nxt_unit_request_info_impl_t  *req_impl;
2483 
2484     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2485 
2486     req = mmap_buf->req;
2487     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2488 
2489     nxt_unit_req_debug(req, "buf_send: %d bytes",
2490                        (int) (buf->free - buf->start));
2491 
2492     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2493         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2494 
2495         return NXT_UNIT_ERROR;
2496     }
2497 
2498     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2499         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2500 
2501         return NXT_UNIT_ERROR;
2502     }
2503 
2504     if (nxt_fast_path(buf->free > buf->start)) {
2505         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2506         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2507             return rc;
2508         }
2509     }
2510 
2511     nxt_unit_mmap_buf_free(mmap_buf);
2512 
2513     return NXT_UNIT_OK;
2514 }
2515 
2516 
2517 static void
2518 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2519 {
2520     int                      rc;
2521     nxt_unit_mmap_buf_t      *mmap_buf;
2522     nxt_unit_request_info_t  *req;
2523 
2524     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2525 
2526     req = mmap_buf->req;
2527 
2528     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2529     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2530         nxt_unit_mmap_buf_free(mmap_buf);
2531 
2532         nxt_unit_request_info_release(req);
2533 
2534     } else {
2535         nxt_unit_request_done(req, rc);
2536     }
2537 }
2538 
2539 
2540 static int
2541 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2542     nxt_unit_mmap_buf_t *mmap_buf, int last)
2543 {
2544     struct {
2545         nxt_port_msg_t       msg;
2546         nxt_port_mmap_msg_t  mmap_msg;
2547     } m;
2548 
2549     int                           rc;
2550     u_char                        *last_used, *first_free;
2551     ssize_t                       res;
2552     nxt_chunk_id_t                first_free_chunk;
2553     nxt_unit_buf_t                *buf;
2554     nxt_unit_impl_t               *lib;
2555     nxt_port_mmap_header_t        *hdr;
2556     nxt_unit_request_info_impl_t  *req_impl;
2557 
2558     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2559     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2560 
2561     buf = &mmap_buf->buf;
2562     hdr = mmap_buf->hdr;
2563 
2564     m.mmap_msg.size = buf->free - buf->start;
2565 
2566     m.msg.stream = req_impl->stream;
2567     m.msg.pid = lib->pid;
2568     m.msg.reply_port = 0;
2569     m.msg.type = _NXT_PORT_MSG_DATA;
2570     m.msg.last = last != 0;
2571     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2572     m.msg.nf = 0;
2573     m.msg.mf = 0;
2574     m.msg.tracking = 0;
2575 
2576     rc = NXT_UNIT_ERROR;
2577 
2578     if (m.msg.mmap) {
2579         m.mmap_msg.mmap_id = hdr->id;
2580         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2581                                                      (u_char *) buf->start);
2582 
2583         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2584                        req_impl->stream,
2585                        (int) m.mmap_msg.mmap_id,
2586                        (int) m.mmap_msg.chunk_id,
2587                        (int) m.mmap_msg.size);
2588 
2589         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2590                                  NULL, 0);
2591         if (nxt_slow_path(res != sizeof(m))) {
2592             goto free_buf;
2593         }
2594 
2595         last_used = (u_char *) buf->free - 1;
2596         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2597 
2598         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2599             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2600 
2601             buf->start = (char *) first_free;
2602             buf->free = buf->start;
2603 
2604             if (buf->end < buf->start) {
2605                 buf->end = buf->start;
2606             }
2607 
2608         } else {
2609             buf->start = NULL;
2610             buf->free = NULL;
2611             buf->end = NULL;
2612 
2613             mmap_buf->hdr = NULL;
2614         }
2615 
2616         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2617                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2618 
2619         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2620                        (int) lib->outgoing.allocated_chunks);
2621 
2622     } else {
2623         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2624                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2625         {
2626             nxt_unit_alert(req->ctx,
2627                            "#%"PRIu32": failed to send plain memory buffer"
2628                            ": no space reserved for message header",
2629                            req_impl->stream);
2630 
2631             goto free_buf;
2632         }
2633 
2634         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2635 
2636         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2637                        req_impl->stream,
2638                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2639 
2640         res = nxt_unit_port_send(req->ctx, req->response_port,
2641                                  buf->start - sizeof(m.msg),
2642                                  m.mmap_msg.size + sizeof(m.msg),
2643                                  NULL, 0);
2644         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2645             goto free_buf;
2646         }
2647     }
2648 
2649     rc = NXT_UNIT_OK;
2650 
2651 free_buf:
2652 
2653     nxt_unit_free_outgoing_buf(mmap_buf);
2654 
2655     return rc;
2656 }
2657 
2658 
2659 void
2660 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2661 {
2662     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2663 }
2664 
2665 
2666 static void
2667 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2668 {
2669     nxt_unit_free_outgoing_buf(mmap_buf);
2670 
2671     nxt_unit_mmap_buf_release(mmap_buf);
2672 }
2673 
2674 
2675 static void
2676 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2677 {
2678     if (mmap_buf->hdr != NULL) {
2679         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2680                               mmap_buf->hdr, mmap_buf->buf.start,
2681                               mmap_buf->buf.end - mmap_buf->buf.start);
2682 
2683         mmap_buf->hdr = NULL;
2684 
2685         return;
2686     }
2687 
2688     if (mmap_buf->free_ptr != NULL) {
2689         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2690 
2691         mmap_buf->free_ptr = NULL;
2692     }
2693 }
2694 
2695 
2696 static nxt_unit_read_buf_t *
2697 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2698 {
2699     nxt_unit_ctx_impl_t  *ctx_impl;
2700     nxt_unit_read_buf_t  *rbuf;
2701 
2702     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2703 
2704     pthread_mutex_lock(&ctx_impl->mutex);
2705 
2706     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2707 
2708     pthread_mutex_unlock(&ctx_impl->mutex);
2709 
2710     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2711 
2712     return rbuf;
2713 }
2714 
2715 
2716 static nxt_unit_read_buf_t *
2717 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2718 {
2719     nxt_queue_link_t     *link;
2720     nxt_unit_read_buf_t  *rbuf;
2721 
2722     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2723         link = nxt_queue_first(&ctx_impl->free_rbuf);
2724         nxt_queue_remove(link);
2725 
2726         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2727 
2728         return rbuf;
2729     }
2730 
2731     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2732 
2733     if (nxt_fast_path(rbuf != NULL)) {
2734         rbuf->ctx_impl = ctx_impl;
2735     }
2736 
2737     return rbuf;
2738 }
2739 
2740 
2741 static void
2742 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2743     nxt_unit_read_buf_t *rbuf)
2744 {
2745     nxt_unit_ctx_impl_t  *ctx_impl;
2746 
2747     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2748 
2749     pthread_mutex_lock(&ctx_impl->mutex);
2750 
2751     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2752 
2753     pthread_mutex_unlock(&ctx_impl->mutex);
2754 }
2755 
2756 
2757 nxt_unit_buf_t *
2758 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2759 {
2760     nxt_unit_mmap_buf_t  *mmap_buf;
2761 
2762     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2763 
2764     if (mmap_buf->next == NULL) {
2765         return NULL;
2766     }
2767 
2768     return &mmap_buf->next->buf;
2769 }
2770 
2771 
2772 uint32_t
2773 nxt_unit_buf_max(void)
2774 {
2775     return PORT_MMAP_DATA_SIZE;
2776 }
2777 
2778 
2779 uint32_t
2780 nxt_unit_buf_min(void)
2781 {
2782     return PORT_MMAP_CHUNK_SIZE;
2783 }
2784 
2785 
2786 int
2787 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2788     size_t size)
2789 {
2790     ssize_t  res;
2791 
2792     res = nxt_unit_response_write_nb(req, start, size, size);
2793 
2794     return res < 0 ? -res : NXT_UNIT_OK;
2795 }
2796 
2797 
2798 ssize_t
2799 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2800     size_t size, size_t min_size)
2801 {
2802     int                           rc;
2803     ssize_t                       sent;
2804     uint32_t                      part_size, min_part_size, buf_size;
2805     const char                    *part_start;
2806     nxt_unit_mmap_buf_t           mmap_buf;
2807     nxt_unit_request_info_impl_t  *req_impl;
2808     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2809 
2810     nxt_unit_req_debug(req, "write: %d", (int) size);
2811 
2812     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2813 
2814     part_start = start;
2815     sent = 0;
2816 
2817     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2818         nxt_unit_req_alert(req, "write: response not initialized yet");
2819 
2820         return -NXT_UNIT_ERROR;
2821     }
2822 
2823     /* Check if response is not send yet. */
2824     if (nxt_slow_path(req->response_buf != NULL)) {
2825         part_size = req->response_buf->end - req->response_buf->free;
2826         part_size = nxt_min(size, part_size);
2827 
2828         rc = nxt_unit_response_add_content(req, part_start, part_size);
2829         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2830             return -rc;
2831         }
2832 
2833         rc = nxt_unit_response_send(req);
2834         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2835             return -rc;
2836         }
2837 
2838         size -= part_size;
2839         part_start += part_size;
2840         sent += part_size;
2841 
2842         min_size -= nxt_min(min_size, part_size);
2843     }
2844 
2845     while (size > 0) {
2846         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2847         min_part_size = nxt_min(min_size, part_size);
2848         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2849 
2850         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2851                                        min_part_size, &mmap_buf, local_buf);
2852         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2853             return -rc;
2854         }
2855 
2856         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2857         if (nxt_slow_path(buf_size == 0)) {
2858             return sent;
2859         }
2860         part_size = nxt_min(buf_size, part_size);
2861 
2862         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2863                                        part_start, part_size);
2864 
2865         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2866         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2867             return -rc;
2868         }
2869 
2870         size -= part_size;
2871         part_start += part_size;
2872         sent += part_size;
2873 
2874         min_size -= nxt_min(min_size, part_size);
2875     }
2876 
2877     return sent;
2878 }
2879 
2880 
2881 int
2882 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2883     nxt_unit_read_info_t *read_info)
2884 {
2885     int                           rc;
2886     ssize_t                       n;
2887     uint32_t                      buf_size;
2888     nxt_unit_buf_t                *buf;
2889     nxt_unit_mmap_buf_t           mmap_buf;
2890     nxt_unit_request_info_impl_t  *req_impl;
2891     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2892 
2893     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2894 
2895     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2896         nxt_unit_req_alert(req, "write: response not initialized yet");
2897 
2898         return NXT_UNIT_ERROR;
2899     }
2900 
2901     /* Check if response is not send yet. */
2902     if (nxt_slow_path(req->response_buf != NULL)) {
2903 
2904         /* Enable content in headers buf. */
2905         rc = nxt_unit_response_add_content(req, "", 0);
2906         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2907             nxt_unit_req_error(req, "Failed to add piggyback content");
2908 
2909             return rc;
2910         }
2911 
2912         buf = req->response_buf;
2913 
2914         while (buf->end - buf->free > 0) {
2915             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2916             if (nxt_slow_path(n < 0)) {
2917                 nxt_unit_req_error(req, "Read error");
2918 
2919                 return NXT_UNIT_ERROR;
2920             }
2921 
2922             /* Manually increase sizes. */
2923             buf->free += n;
2924             req->response->piggyback_content_length += n;
2925 
2926             if (read_info->eof) {
2927                 break;
2928             }
2929         }
2930 
2931         rc = nxt_unit_response_send(req);
2932         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2933             nxt_unit_req_error(req, "Failed to send headers with content");
2934 
2935             return rc;
2936         }
2937 
2938         if (read_info->eof) {
2939             return NXT_UNIT_OK;
2940         }
2941     }
2942 
2943     while (!read_info->eof) {
2944         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2945                            read_info->buf_size);
2946 
2947         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2948 
2949         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2950                                        buf_size, buf_size,
2951                                        &mmap_buf, local_buf);
2952         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2953             return rc;
2954         }
2955 
2956         buf = &mmap_buf.buf;
2957 
2958         while (!read_info->eof && buf->end > buf->free) {
2959             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2960             if (nxt_slow_path(n < 0)) {
2961                 nxt_unit_req_error(req, "Read error");
2962 
2963                 nxt_unit_free_outgoing_buf(&mmap_buf);
2964 
2965                 return NXT_UNIT_ERROR;
2966             }
2967 
2968             buf->free += n;
2969         }
2970 
2971         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2972         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2973             nxt_unit_req_error(req, "Failed to send content");
2974 
2975             return rc;
2976         }
2977     }
2978 
2979     return NXT_UNIT_OK;
2980 }
2981 
2982 
2983 ssize_t
2984 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2985 {
2986     ssize_t  buf_res, res;
2987 
2988     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2989                                 dst, size);
2990 
2991     nxt_unit_req_debug(req, "read: %d", (int) buf_res);
2992 
2993     if (buf_res < (ssize_t) size && req->content_fd != -1) {
2994         res = read(req->content_fd, dst, size);
2995         if (nxt_slow_path(res < 0)) {
2996             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2997                                strerror(errno), errno);
2998 
2999             return res;
3000         }
3001 
3002         if (res < (ssize_t) size) {
3003             nxt_unit_close(req->content_fd);
3004 
3005             req->content_fd = -1;
3006         }
3007 
3008         req->content_length -= res;
3009         size -= res;
3010 
3011         dst = nxt_pointer_to(dst, res);
3012 
3013     } else {
3014         res = 0;
3015     }
3016 
3017     return buf_res + res;
3018 }
3019 
3020 
3021 ssize_t
3022 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3023 {
3024     char                 *p;
3025     size_t               l_size, b_size;
3026     nxt_unit_buf_t       *b;
3027     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3028 
3029     if (req->content_length == 0) {
3030         return 0;
3031     }
3032 
3033     l_size = 0;
3034 
3035     b = req->content_buf;
3036 
3037     while (b != NULL) {
3038         b_size = b->end - b->free;
3039         p = memchr(b->free, '\n', b_size);
3040 
3041         if (p != NULL) {
3042             p++;
3043             l_size += p - b->free;
3044             break;
3045         }
3046 
3047         l_size += b_size;
3048 
3049         if (max_size <= l_size) {
3050             break;
3051         }
3052 
3053         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3054         if (mmap_buf->next == NULL
3055             && req->content_fd != -1
3056             && l_size < req->content_length)
3057         {
3058             preread_buf = nxt_unit_request_preread(req, 16384);
3059             if (nxt_slow_path(preread_buf == NULL)) {
3060                 return -1;
3061             }
3062 
3063             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3064         }
3065 
3066         b = nxt_unit_buf_next(b);
3067     }
3068 
3069     return nxt_min(max_size, l_size);
3070 }
3071 
3072 
3073 static nxt_unit_mmap_buf_t *
3074 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3075 {
3076     ssize_t              res;
3077     nxt_unit_mmap_buf_t  *mmap_buf;
3078 
3079     if (req->content_fd == -1) {
3080         nxt_unit_req_alert(req, "preread: content_fd == -1");
3081         return NULL;
3082     }
3083 
3084     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3085     if (nxt_slow_path(mmap_buf == NULL)) {
3086         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3087         return NULL;
3088     }
3089 
3090     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3091     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3092         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3093         nxt_unit_mmap_buf_release(mmap_buf);
3094         return NULL;
3095     }
3096 
3097     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3098 
3099     mmap_buf->hdr = NULL;
3100     mmap_buf->buf.start = mmap_buf->free_ptr;
3101     mmap_buf->buf.free = mmap_buf->buf.start;
3102     mmap_buf->buf.end = mmap_buf->buf.start + size;
3103 
3104     res = read(req->content_fd, mmap_buf->free_ptr, size);
3105     if (res < 0) {
3106         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3107                            strerror(errno), errno);
3108 
3109         nxt_unit_mmap_buf_free(mmap_buf);
3110 
3111         return NULL;
3112     }
3113 
3114     if (res < (ssize_t) size) {
3115         nxt_unit_close(req->content_fd);
3116 
3117         req->content_fd = -1;
3118     }
3119 
3120     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3121 
3122     mmap_buf->buf.end = mmap_buf->buf.free + res;
3123 
3124     return mmap_buf;
3125 }
3126 
3127 
3128 static ssize_t
3129 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3130 {
3131     u_char          *p;
3132     size_t          rest, copy, read;
3133     nxt_unit_buf_t  *buf, *last_buf;
3134 
3135     p = dst;
3136     rest = size;
3137 
3138     buf = *b;
3139     last_buf = buf;
3140 
3141     while (buf != NULL) {
3142         last_buf = buf;
3143 
3144         copy = buf->end - buf->free;
3145         copy = nxt_min(rest, copy);
3146 
3147         p = nxt_cpymem(p, buf->free, copy);
3148 
3149         buf->free += copy;
3150         rest -= copy;
3151 
3152         if (rest == 0) {
3153             if (buf->end == buf->free) {
3154                 buf = nxt_unit_buf_next(buf);
3155             }
3156 
3157             break;
3158         }
3159 
3160         buf = nxt_unit_buf_next(buf);
3161     }
3162 
3163     *b = last_buf;
3164 
3165     read = size - rest;
3166 
3167     *len -= read;
3168 
3169     return read;
3170 }
3171 
3172 
3173 void
3174 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3175 {
3176     uint32_t                      size;
3177     nxt_port_msg_t                msg;
3178     nxt_unit_impl_t               *lib;
3179     nxt_unit_request_info_impl_t  *req_impl;
3180 
3181     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3182 
3183     nxt_unit_req_debug(req, "done: %d", rc);
3184 
3185     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3186         goto skip_response_send;
3187     }
3188 
3189     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3190 
3191         size = nxt_length("Content-Type") + nxt_length("text/plain");
3192 
3193         rc = nxt_unit_response_init(req, 200, 1, size);
3194         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3195             goto skip_response_send;
3196         }
3197 
3198         rc = nxt_unit_response_add_field(req, "Content-Type",
3199                                    nxt_length("Content-Type"),
3200                                    "text/plain", nxt_length("text/plain"));
3201         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3202             goto skip_response_send;
3203         }
3204     }
3205 
3206     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3207 
3208         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3209 
3210         nxt_unit_buf_send_done(req->response_buf);
3211 
3212         return;
3213     }
3214 
3215 skip_response_send:
3216 
3217     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3218 
3219     msg.stream = req_impl->stream;
3220     msg.pid = lib->pid;
3221     msg.reply_port = 0;
3222     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3223                                    : _NXT_PORT_MSG_RPC_ERROR;
3224     msg.last = 1;
3225     msg.mmap = 0;
3226     msg.nf = 0;
3227     msg.mf = 0;
3228     msg.tracking = 0;
3229 
3230     (void) nxt_unit_port_send(req->ctx, req->response_port,
3231                               &msg, sizeof(msg), NULL, 0);
3232 
3233     nxt_unit_request_info_release(req);
3234 }
3235 
3236 
3237 int
3238 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3239     uint8_t last, const void *start, size_t size)
3240 {
3241     const struct iovec  iov = { (void *) start, size };
3242 
3243     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3244 }
3245 
3246 
3247 int
3248 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3249     uint8_t last, const struct iovec *iov, int iovcnt)
3250 {
3251     int                     i, rc;
3252     size_t                  l, copy;
3253     uint32_t                payload_len, buf_size, alloc_size;
3254     const uint8_t           *b;
3255     nxt_unit_buf_t          *buf;
3256     nxt_unit_mmap_buf_t     mmap_buf;
3257     nxt_websocket_header_t  *wh;
3258     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3259 
3260     payload_len = 0;
3261 
3262     for (i = 0; i < iovcnt; i++) {
3263         payload_len += iov[i].iov_len;
3264     }
3265 
3266     buf_size = 10 + payload_len;
3267     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3268 
3269     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3270                                    alloc_size, alloc_size,
3271                                    &mmap_buf, local_buf);
3272     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3273         return rc;
3274     }
3275 
3276     buf = &mmap_buf.buf;
3277 
3278     buf->start[0] = 0;
3279     buf->start[1] = 0;
3280 
3281     buf_size -= buf->end - buf->start;
3282 
3283     wh = (void *) buf->free;
3284 
3285     buf->free = nxt_websocket_frame_init(wh, payload_len);
3286     wh->fin = last;
3287     wh->opcode = opcode;
3288 
3289     for (i = 0; i < iovcnt; i++) {
3290         b = iov[i].iov_base;
3291         l = iov[i].iov_len;
3292 
3293         while (l > 0) {
3294             copy = buf->end - buf->free;
3295             copy = nxt_min(l, copy);
3296 
3297             buf->free = nxt_cpymem(buf->free, b, copy);
3298             b += copy;
3299             l -= copy;
3300 
3301             if (l > 0) {
3302                 if (nxt_fast_path(buf->free > buf->start)) {
3303                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3304 
3305                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3306                         return rc;
3307                     }
3308                 }
3309 
3310                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3311 
3312                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3313                                                alloc_size, alloc_size,
3314                                                &mmap_buf, local_buf);
3315                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3316                     return rc;
3317                 }
3318 
3319                 buf_size -= buf->end - buf->start;
3320             }
3321         }
3322     }
3323 
3324     if (buf->free > buf->start) {
3325         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3326     }
3327 
3328     return rc;
3329 }
3330 
3331 
3332 ssize_t
3333 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3334     size_t size)
3335 {
3336     ssize_t   res;
3337     uint8_t   *b;
3338     uint64_t  i, d;
3339 
3340     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3341                             dst, size);
3342 
3343     if (ws->mask == NULL) {
3344         return res;
3345     }
3346 
3347     b = dst;
3348     d = (ws->payload_len - ws->content_length - res) % 4;
3349 
3350     for (i = 0; i < (uint64_t) res; i++) {
3351         b[i] ^= ws->mask[ (i + d) % 4 ];
3352     }
3353 
3354     return res;
3355 }
3356 
3357 
3358 int
3359 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3360 {
3361     char                             *b;
3362     size_t                           size, hsize;
3363     nxt_unit_websocket_frame_impl_t  *ws_impl;
3364 
3365     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3366 
3367     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3368         return NXT_UNIT_OK;
3369     }
3370 
3371     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3372 
3373     b = nxt_unit_malloc(ws->req->ctx, size);
3374     if (nxt_slow_path(b == NULL)) {
3375         return NXT_UNIT_ERROR;
3376     }
3377 
3378     memcpy(b, ws_impl->buf->buf.start, size);
3379 
3380     hsize = nxt_websocket_frame_header_size(b);
3381 
3382     ws_impl->buf->buf.start = b;
3383     ws_impl->buf->buf.free = b + hsize;
3384     ws_impl->buf->buf.end = b + size;
3385 
3386     ws_impl->buf->free_ptr = b;
3387 
3388     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3389 
3390     if (ws_impl->ws.header->mask) {
3391         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3392 
3393     } else {
3394         ws_impl->ws.mask = NULL;
3395     }
3396 
3397     return NXT_UNIT_OK;
3398 }
3399 
3400 
3401 void
3402 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3403 {
3404     nxt_unit_websocket_frame_release(ws);
3405 }
3406 
3407 
3408 static nxt_port_mmap_header_t *
3409 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3410     nxt_chunk_id_t *c, int *n, int min_n)
3411 {
3412     int                     res, nchunks, i;
3413     uint32_t                outgoing_size;
3414     nxt_unit_mmap_t         *mm, *mm_end;
3415     nxt_unit_impl_t         *lib;
3416     nxt_port_mmap_header_t  *hdr;
3417 
3418     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3419 
3420     pthread_mutex_lock(&lib->outgoing.mutex);
3421 
3422 retry:
3423 
3424     outgoing_size = lib->outgoing.size;
3425 
3426     mm_end = lib->outgoing.elts + outgoing_size;
3427 
3428     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3429         hdr = mm->hdr;
3430 
3431         if (hdr->sent_over != 0xFFFFu
3432             && (hdr->sent_over != port->id.id
3433                 || mm->src_thread != pthread_self()))
3434         {
3435             continue;
3436         }
3437 
3438         *c = 0;
3439 
3440         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3441             nchunks = 1;
3442 
3443             while (nchunks < *n) {
3444                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3445                                                        *c + nchunks);
3446 
3447                 if (res == 0) {
3448                     if (nchunks >= min_n) {
3449                         *n = nchunks;
3450 
3451                         goto unlock;
3452                     }
3453 
3454                     for (i = 0; i < nchunks; i++) {
3455                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3456                     }
3457 
3458                     *c += nchunks + 1;
3459                     nchunks = 0;
3460                     break;
3461                 }
3462 
3463                 nchunks++;
3464             }
3465 
3466             if (nchunks >= min_n) {
3467                 *n = nchunks;
3468 
3469                 goto unlock;
3470             }
3471         }
3472 
3473         hdr->oosm = 1;
3474     }
3475 
3476     if (outgoing_size >= lib->shm_mmap_limit) {
3477         /* Cannot allocate more shared memory. */
3478         pthread_mutex_unlock(&lib->outgoing.mutex);
3479 
3480         if (min_n == 0) {
3481             *n = 0;
3482         }
3483 
3484         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3485                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3486         {
3487             /* Memory allocated by application, but not send to router. */
3488             return NULL;
3489         }
3490 
3491         /* Notify router about OOSM condition. */
3492 
3493         res = nxt_unit_send_oosm(ctx, port);
3494         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3495             return NULL;
3496         }
3497 
3498         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3499 
3500         if (min_n == 0) {
3501             return NULL;
3502         }
3503 
3504         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3505 
3506         res = nxt_unit_wait_shm_ack(ctx);
3507         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3508             return NULL;
3509         }
3510 
3511         nxt_unit_debug(ctx, "oosm: retry");
3512 
3513         pthread_mutex_lock(&lib->outgoing.mutex);
3514 
3515         goto retry;
3516     }
3517 
3518     *c = 0;
3519     hdr = nxt_unit_new_mmap(ctx, port, *n);
3520 
3521 unlock:
3522 
3523     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3524 
3525     nxt_unit_debug(ctx, "allocated_chunks %d",
3526                    (int) lib->outgoing.allocated_chunks);
3527 
3528     pthread_mutex_unlock(&lib->outgoing.mutex);
3529 
3530     return hdr;
3531 }
3532 
3533 
3534 static int
3535 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3536 {
3537     ssize_t          res;
3538     nxt_port_msg_t   msg;
3539     nxt_unit_impl_t  *lib;
3540 
3541     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3542 
3543     msg.stream = 0;
3544     msg.pid = lib->pid;
3545     msg.reply_port = 0;
3546     msg.type = _NXT_PORT_MSG_OOSM;
3547     msg.last = 0;
3548     msg.mmap = 0;
3549     msg.nf = 0;
3550     msg.mf = 0;
3551     msg.tracking = 0;
3552 
3553     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3554     if (nxt_slow_path(res != sizeof(msg))) {
3555         return NXT_UNIT_ERROR;
3556     }
3557 
3558     return NXT_UNIT_OK;
3559 }
3560 
3561 
3562 static int
3563 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3564 {
3565     int                  res;
3566     nxt_unit_ctx_impl_t  *ctx_impl;
3567     nxt_unit_read_buf_t  *rbuf;
3568 
3569     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3570 
3571     while (1) {
3572         rbuf = nxt_unit_read_buf_get(ctx);
3573         if (nxt_slow_path(rbuf == NULL)) {
3574             return NXT_UNIT_ERROR;
3575         }
3576 
3577         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3578         if (res == NXT_UNIT_ERROR) {
3579             nxt_unit_read_buf_release(ctx, rbuf);
3580 
3581             return NXT_UNIT_ERROR;
3582         }
3583 
3584         if (nxt_unit_is_shm_ack(rbuf)) {
3585             nxt_unit_read_buf_release(ctx, rbuf);
3586             break;
3587         }
3588 
3589         pthread_mutex_lock(&ctx_impl->mutex);
3590 
3591         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3592 
3593         pthread_mutex_unlock(&ctx_impl->mutex);
3594 
3595         if (nxt_unit_is_quit(rbuf)) {
3596             nxt_unit_debug(ctx, "oosm: quit received");
3597 
3598             return NXT_UNIT_ERROR;
3599         }
3600     }
3601 
3602     return NXT_UNIT_OK;
3603 }
3604 
3605 
3606 static nxt_unit_mmap_t *
3607 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3608 {
3609     uint32_t         cap, n;
3610     nxt_unit_mmap_t  *e;
3611 
3612     if (nxt_fast_path(mmaps->size > i)) {
3613         return mmaps->elts + i;
3614     }
3615 
3616     cap = mmaps->cap;
3617 
3618     if (cap == 0) {
3619         cap = i + 1;
3620     }
3621 
3622     while (i + 1 > cap) {
3623 
3624         if (cap < 16) {
3625             cap = cap * 2;
3626 
3627         } else {
3628             cap = cap + cap / 2;
3629         }
3630     }
3631 
3632     if (cap != mmaps->cap) {
3633 
3634         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3635         if (nxt_slow_path(e == NULL)) {
3636             return NULL;
3637         }
3638 
3639         mmaps->elts = e;
3640 
3641         for (n = mmaps->cap; n < cap; n++) {
3642             e = mmaps->elts + n;
3643 
3644             e->hdr = NULL;
3645             nxt_queue_init(&e->awaiting_rbuf);
3646         }
3647 
3648         mmaps->cap = cap;
3649     }
3650 
3651     if (i + 1 > mmaps->size) {
3652         mmaps->size = i + 1;
3653     }
3654 
3655     return mmaps->elts + i;
3656 }
3657 
3658 
3659 static nxt_port_mmap_header_t *
3660 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3661 {
3662     int                     i, fd, rc;
3663     void                    *mem;
3664     nxt_unit_mmap_t         *mm;
3665     nxt_unit_impl_t         *lib;
3666     nxt_port_mmap_header_t  *hdr;
3667 
3668     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3669 
3670     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3671     if (nxt_slow_path(mm == NULL)) {
3672         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3673 
3674         return NULL;
3675     }
3676 
3677     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3678     if (nxt_slow_path(fd == -1)) {
3679         goto remove_fail;
3680     }
3681 
3682     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3683     if (nxt_slow_path(mem == MAP_FAILED)) {
3684         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3685                        strerror(errno), errno);
3686 
3687         nxt_unit_close(fd);
3688 
3689         goto remove_fail;
3690     }
3691 
3692     mm->hdr = mem;
3693     hdr = mem;
3694 
3695     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3696     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3697 
3698     hdr->id = lib->outgoing.size - 1;
3699     hdr->src_pid = lib->pid;
3700     hdr->dst_pid = port->id.pid;
3701     hdr->sent_over = port->id.id;
3702     mm->src_thread = pthread_self();
3703 
3704     /* Mark first n chunk(s) as busy */
3705     for (i = 0; i < n; i++) {
3706         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3707     }
3708 
3709     /* Mark as busy chunk followed the last available chunk. */
3710     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3711     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3712 
3713     pthread_mutex_unlock(&lib->outgoing.mutex);
3714 
3715     rc = nxt_unit_send_mmap(ctx, port, fd);
3716     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3717         munmap(mem, PORT_MMAP_SIZE);
3718         hdr = NULL;
3719 
3720     } else {
3721         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3722                        hdr->id, (int) lib->pid, (int) port->id.pid);
3723     }
3724 
3725     nxt_unit_close(fd);
3726 
3727     pthread_mutex_lock(&lib->outgoing.mutex);
3728 
3729     if (nxt_fast_path(hdr != NULL)) {
3730         return hdr;
3731     }
3732 
3733 remove_fail:
3734 
3735     lib->outgoing.size--;
3736 
3737     return NULL;
3738 }
3739 
3740 
3741 static int
3742 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3743 {
3744     int              fd;
3745     nxt_unit_impl_t  *lib;
3746 
3747     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3748 
3749 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3750     char             name[64];
3751 
3752     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3753              lib->pid, (void *) pthread_self());
3754 #endif
3755 
3756 #if (NXT_HAVE_MEMFD_CREATE)
3757 
3758     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3759     if (nxt_slow_path(fd == -1)) {
3760         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3761                        strerror(errno), errno);
3762 
3763         return -1;
3764     }
3765 
3766     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3767 
3768 #elif (NXT_HAVE_SHM_OPEN_ANON)
3769 
3770     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3771     if (nxt_slow_path(fd == -1)) {
3772         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3773                        strerror(errno), errno);
3774 
3775         return -1;
3776     }
3777 
3778 #elif (NXT_HAVE_SHM_OPEN)
3779 
3780     /* Just in case. */
3781     shm_unlink(name);
3782 
3783     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3784     if (nxt_slow_path(fd == -1)) {
3785         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3786                        strerror(errno), errno);
3787 
3788         return -1;
3789     }
3790 
3791     if (nxt_slow_path(shm_unlink(name) == -1)) {
3792         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3793                        strerror(errno), errno);
3794     }
3795 
3796 #else
3797 
3798 #error No working shared memory implementation.
3799 
3800 #endif
3801 
3802     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3803         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3804                        strerror(errno), errno);
3805 
3806         nxt_unit_close(fd);
3807 
3808         return -1;
3809     }
3810 
3811     return fd;
3812 }
3813 
3814 
3815 static int
3816 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3817 {
3818     ssize_t          res;
3819     nxt_port_msg_t   msg;
3820     nxt_unit_impl_t  *lib;
3821     union {
3822         struct cmsghdr  cm;
3823         char            space[CMSG_SPACE(sizeof(int))];
3824     } cmsg;
3825 
3826     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3827 
3828     msg.stream = 0;
3829     msg.pid = lib->pid;
3830     msg.reply_port = 0;
3831     msg.type = _NXT_PORT_MSG_MMAP;
3832     msg.last = 0;
3833     msg.mmap = 0;
3834     msg.nf = 0;
3835     msg.mf = 0;
3836     msg.tracking = 0;
3837 
3838     /*
3839      * Fill all padding fields with 0.
3840      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3841      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3842      */
3843     memset(&cmsg, 0, sizeof(cmsg));
3844 
3845     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3846     cmsg.cm.cmsg_level = SOL_SOCKET;
3847     cmsg.cm.cmsg_type = SCM_RIGHTS;
3848 
3849     /*
3850      * memcpy() is used instead of simple
3851      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3852      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3853      *   dereferencing type-punned pointer will break strict-aliasing rules
3854      *
3855      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3856      * in the same simple assignment as in the code above.
3857      */
3858     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3859 
3860     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
3861                              &cmsg, sizeof(cmsg));
3862     if (nxt_slow_path(res != sizeof(msg))) {
3863         return NXT_UNIT_ERROR;
3864     }
3865 
3866     return NXT_UNIT_OK;
3867 }
3868 
3869 
3870 static int
3871 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3872     uint32_t size, uint32_t min_size,
3873     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3874 {
3875     int                     nchunks, min_nchunks;
3876     nxt_chunk_id_t          c;
3877     nxt_port_mmap_header_t  *hdr;
3878 
3879     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3880         if (local_buf != NULL) {
3881             mmap_buf->free_ptr = NULL;
3882             mmap_buf->plain_ptr = local_buf;
3883 
3884         } else {
3885             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3886                                                  size + sizeof(nxt_port_msg_t));
3887             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3888                 return NXT_UNIT_ERROR;
3889             }
3890 
3891             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3892         }
3893 
3894         mmap_buf->hdr = NULL;
3895         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3896         mmap_buf->buf.free = mmap_buf->buf.start;
3897         mmap_buf->buf.end = mmap_buf->buf.start + size;
3898 
3899         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3900                        mmap_buf->buf.start, (int) size);
3901 
3902         return NXT_UNIT_OK;
3903     }
3904 
3905     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3906     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3907 
3908     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3909     if (nxt_slow_path(hdr == NULL)) {
3910         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3911             mmap_buf->hdr = NULL;
3912             mmap_buf->buf.start = NULL;
3913             mmap_buf->buf.free = NULL;
3914             mmap_buf->buf.end = NULL;
3915             mmap_buf->free_ptr = NULL;
3916 
3917             return NXT_UNIT_OK;
3918         }
3919 
3920         return NXT_UNIT_ERROR;
3921     }
3922 
3923     mmap_buf->hdr = hdr;
3924     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3925     mmap_buf->buf.free = mmap_buf->buf.start;
3926     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3927     mmap_buf->free_ptr = NULL;
3928     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3929 
3930     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3931                   (int) hdr->id, (int) c,
3932                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3933 
3934     return NXT_UNIT_OK;
3935 }
3936 
3937 
3938 static int
3939 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3940 {
3941     int                     rc;
3942     void                    *mem;
3943     nxt_queue_t             awaiting_rbuf;
3944     struct stat             mmap_stat;
3945     nxt_unit_mmap_t         *mm;
3946     nxt_unit_impl_t         *lib;
3947     nxt_unit_ctx_impl_t     *ctx_impl;
3948     nxt_unit_read_buf_t     *rbuf;
3949     nxt_port_mmap_header_t  *hdr;
3950 
3951     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3952 
3953     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3954 
3955     if (fstat(fd, &mmap_stat) == -1) {
3956         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3957                        strerror(errno), errno);
3958 
3959         return NXT_UNIT_ERROR;
3960     }
3961 
3962     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3963                MAP_SHARED, fd, 0);
3964     if (nxt_slow_path(mem == MAP_FAILED)) {
3965         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3966                        strerror(errno), errno);
3967 
3968         return NXT_UNIT_ERROR;
3969     }
3970 
3971     hdr = mem;
3972 
3973     if (nxt_slow_path(hdr->src_pid != pid)) {
3974 
3975         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
3976                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3977                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3978 
3979         munmap(mem, PORT_MMAP_SIZE);
3980 
3981         return NXT_UNIT_ERROR;
3982     }
3983 
3984     nxt_queue_init(&awaiting_rbuf);
3985 
3986     pthread_mutex_lock(&lib->incoming.mutex);
3987 
3988     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
3989     if (nxt_slow_path(mm == NULL)) {
3990         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
3991 
3992         munmap(mem, PORT_MMAP_SIZE);
3993 
3994         rc = NXT_UNIT_ERROR;
3995 
3996     } else {
3997         mm->hdr = hdr;
3998 
3999         hdr->sent_over = 0xFFFFu;
4000 
4001         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4002         nxt_queue_init(&mm->awaiting_rbuf);
4003 
4004         rc = NXT_UNIT_OK;
4005     }
4006 
4007     pthread_mutex_unlock(&lib->incoming.mutex);
4008 
4009     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4010 
4011         ctx_impl = rbuf->ctx_impl;
4012 
4013         pthread_mutex_lock(&ctx_impl->mutex);
4014 
4015         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4016 
4017         pthread_mutex_unlock(&ctx_impl->mutex);
4018 
4019         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4020 
4021         nxt_unit_awake_ctx(ctx, ctx_impl);
4022 
4023     } nxt_queue_loop;
4024 
4025     return rc;
4026 }
4027 
4028 
4029 static void
4030 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4031 {
4032     nxt_port_msg_t  msg;
4033 
4034     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4035         return;
4036     }
4037 
4038     if (nxt_slow_path(ctx_impl->read_port == NULL
4039                       || ctx_impl->read_port->out_fd == -1))
4040     {
4041         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4042 
4043         return;
4044     }
4045 
4046     memset(&msg, 0, sizeof(nxt_port_msg_t));
4047 
4048     msg.type = _NXT_PORT_MSG_RPC_READY;
4049 
4050     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4051                               &msg, sizeof(msg), NULL, 0);
4052 }
4053 
4054 
4055 static void
4056 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4057 {
4058     pthread_mutex_init(&mmaps->mutex, NULL);
4059 
4060     mmaps->size = 0;
4061     mmaps->cap = 0;
4062     mmaps->elts = NULL;
4063     mmaps->allocated_chunks = 0;
4064 }
4065 
4066 
4067 nxt_inline void
4068 nxt_unit_process_use(nxt_unit_process_t *process)
4069 {
4070     nxt_atomic_fetch_add(&process->use_count, 1);
4071 }
4072 
4073 
4074 nxt_inline void
4075 nxt_unit_process_release(nxt_unit_process_t *process)
4076 {
4077     long c;
4078 
4079     c = nxt_atomic_fetch_add(&process->use_count, -1);
4080 
4081     if (c == 1) {
4082         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4083 
4084         nxt_unit_free(NULL, process);
4085     }
4086 }
4087 
4088 
4089 static void
4090 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4091 {
4092     nxt_unit_mmap_t  *mm, *end;
4093 
4094     if (mmaps->elts != NULL) {
4095         end = mmaps->elts + mmaps->size;
4096 
4097         for (mm = mmaps->elts; mm < end; mm++) {
4098             munmap(mm->hdr, PORT_MMAP_SIZE);
4099         }
4100 
4101         nxt_unit_free(NULL, mmaps->elts);
4102     }
4103 
4104     pthread_mutex_destroy(&mmaps->mutex);
4105 }
4106 
4107 
4108 static int
4109 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4110     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4111     nxt_unit_read_buf_t *rbuf)
4112 {
4113     int                  res, need_rbuf;
4114     nxt_unit_mmap_t      *mm;
4115     nxt_unit_ctx_impl_t  *ctx_impl;
4116 
4117     mm = nxt_unit_mmap_at(mmaps, id);
4118     if (nxt_slow_path(mm == NULL)) {
4119         nxt_unit_alert(ctx, "failed to allocate mmap");
4120 
4121         pthread_mutex_unlock(&mmaps->mutex);
4122 
4123         *hdr = NULL;
4124 
4125         return NXT_UNIT_ERROR;
4126     }
4127 
4128     *hdr = mm->hdr;
4129 
4130     if (nxt_fast_path(*hdr != NULL)) {
4131         return NXT_UNIT_OK;
4132     }
4133 
4134     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4135 
4136     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4137 
4138     pthread_mutex_unlock(&mmaps->mutex);
4139 
4140     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4141 
4142     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4143 
4144     if (need_rbuf) {
4145         res = nxt_unit_get_mmap(ctx, pid, id);
4146         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4147             return NXT_UNIT_ERROR;
4148         }
4149     }
4150 
4151     return NXT_UNIT_AGAIN;
4152 }
4153 
4154 
4155 static int
4156 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4157     nxt_unit_read_buf_t *rbuf)
4158 {
4159     int                     res;
4160     void                    *start;
4161     uint32_t                size;
4162     nxt_unit_impl_t         *lib;
4163     nxt_unit_mmaps_t        *mmaps;
4164     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4165     nxt_port_mmap_msg_t     *mmap_msg, *end;
4166     nxt_port_mmap_header_t  *hdr;
4167 
4168     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4169         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4170                       recv_msg->stream, (int) recv_msg->size);
4171 
4172         return NXT_UNIT_ERROR;
4173     }
4174 
4175     mmap_msg = recv_msg->start;
4176     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4177 
4178     incoming_tail = &recv_msg->incoming_buf;
4179 
4180     /* Allocating buffer structures. */
4181     for (; mmap_msg < end; mmap_msg++) {
4182         b = nxt_unit_mmap_buf_get(ctx);
4183         if (nxt_slow_path(b == NULL)) {
4184             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4185                           recv_msg->stream);
4186 
4187             while (recv_msg->incoming_buf != NULL) {
4188                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4189             }
4190 
4191             return NXT_UNIT_ERROR;
4192         }
4193 
4194         nxt_unit_mmap_buf_insert(incoming_tail, b);
4195         incoming_tail = &b->next;
4196     }
4197 
4198     b = recv_msg->incoming_buf;
4199     mmap_msg = recv_msg->start;
4200 
4201     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4202 
4203     mmaps = &lib->incoming;
4204 
4205     pthread_mutex_lock(&mmaps->mutex);
4206 
4207     for (; mmap_msg < end; mmap_msg++) {
4208         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4209                                        recv_msg->pid, mmap_msg->mmap_id,
4210                                        &hdr, rbuf);
4211 
4212         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4213             while (recv_msg->incoming_buf != NULL) {
4214                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4215             }
4216 
4217             return res;
4218         }
4219 
4220         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4221         size = mmap_msg->size;
4222 
4223         if (recv_msg->start == mmap_msg) {
4224             recv_msg->start = start;
4225             recv_msg->size = size;
4226         }
4227 
4228         b->buf.start = start;
4229         b->buf.free = start;
4230         b->buf.end = b->buf.start + size;
4231         b->hdr = hdr;
4232 
4233         b = b->next;
4234 
4235         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4236                        recv_msg->stream,
4237                        start, (int) size,
4238                        (int) hdr->src_pid, (int) hdr->dst_pid,
4239                        (int) hdr->id, (int) mmap_msg->chunk_id,
4240                        (int) mmap_msg->size);
4241     }
4242 
4243     pthread_mutex_unlock(&mmaps->mutex);
4244 
4245     return NXT_UNIT_OK;
4246 }
4247 
4248 
4249 static int
4250 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4251 {
4252     ssize_t              res;
4253     nxt_unit_impl_t      *lib;
4254     nxt_unit_ctx_impl_t  *ctx_impl;
4255 
4256     struct {
4257         nxt_port_msg_t           msg;
4258         nxt_port_msg_get_mmap_t  get_mmap;
4259     } m;
4260 
4261     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4262     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4263 
4264     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4265 
4266     m.msg.pid = lib->pid;
4267     m.msg.reply_port = ctx_impl->read_port->id.id;
4268     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4269 
4270     m.get_mmap.id = id;
4271 
4272     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4273 
4274     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
4275     if (nxt_slow_path(res != sizeof(m))) {
4276         return NXT_UNIT_ERROR;
4277     }
4278 
4279     return NXT_UNIT_OK;
4280 }
4281 
4282 
4283 static void
4284 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4285     void *start, uint32_t size)
4286 {
4287     int              freed_chunks;
4288     u_char           *p, *end;
4289     nxt_chunk_id_t   c;
4290     nxt_unit_impl_t  *lib;
4291 
4292     memset(start, 0xA5, size);
4293 
4294     p = start;
4295     end = p + size;
4296     c = nxt_port_mmap_chunk_id(hdr, p);
4297     freed_chunks = 0;
4298 
4299     while (p < end) {
4300         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4301 
4302         p += PORT_MMAP_CHUNK_SIZE;
4303         c++;
4304         freed_chunks++;
4305     }
4306 
4307     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4308 
4309     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4310         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4311 
4312         nxt_unit_debug(ctx, "allocated_chunks %d",
4313                        (int) lib->outgoing.allocated_chunks);
4314     }
4315 
4316     if (hdr->dst_pid == lib->pid
4317         && freed_chunks != 0
4318         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4319     {
4320         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4321     }
4322 }
4323 
4324 
4325 static int
4326 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4327 {
4328     ssize_t          res;
4329     nxt_port_msg_t   msg;
4330     nxt_unit_impl_t  *lib;
4331 
4332     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4333 
4334     msg.stream = 0;
4335     msg.pid = lib->pid;
4336     msg.reply_port = 0;
4337     msg.type = _NXT_PORT_MSG_SHM_ACK;
4338     msg.last = 0;
4339     msg.mmap = 0;
4340     msg.nf = 0;
4341     msg.mf = 0;
4342     msg.tracking = 0;
4343 
4344     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
4345     if (nxt_slow_path(res != sizeof(msg))) {
4346         return NXT_UNIT_ERROR;
4347     }
4348 
4349     return NXT_UNIT_OK;
4350 }
4351 
4352 
4353 static nxt_int_t
4354 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4355 {
4356     nxt_process_t  *process;
4357 
4358     process = data;
4359 
4360     if (lhq->key.length == sizeof(pid_t)
4361         && *(pid_t *) lhq->key.start == process->pid)
4362     {
4363         return NXT_OK;
4364     }
4365 
4366     return NXT_DECLINED;
4367 }
4368 
4369 
4370 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4371     NXT_LVLHSH_DEFAULT,
4372     nxt_unit_lvlhsh_pid_test,
4373     nxt_unit_lvlhsh_alloc,
4374     nxt_unit_lvlhsh_free,
4375 };
4376 
4377 
4378 static inline void
4379 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4380 {
4381     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4382     lhq->key.length = sizeof(*pid);
4383     lhq->key.start = (u_char *) pid;
4384     lhq->proto = &lvlhsh_processes_proto;
4385 }
4386 
4387 
4388 static nxt_unit_process_t *
4389 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4390 {
4391     nxt_unit_impl_t     *lib;
4392     nxt_unit_process_t  *process;
4393     nxt_lvlhsh_query_t  lhq;
4394 
4395     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4396 
4397     nxt_unit_process_lhq_pid(&lhq, &pid);
4398 
4399     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4400         process = lhq.value;
4401         nxt_unit_process_use(process);
4402 
4403         return process;
4404     }
4405 
4406     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4407     if (nxt_slow_path(process == NULL)) {
4408         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4409 
4410         return NULL;
4411     }
4412 
4413     process->pid = pid;
4414     process->use_count = 2;
4415     process->next_port_id = 0;
4416     process->lib = lib;
4417 
4418     nxt_queue_init(&process->ports);
4419 
4420     lhq.replace = 0;
4421     lhq.value = process;
4422 
4423     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4424 
4425     case NXT_OK:
4426         break;
4427 
4428     default:
4429         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4430 
4431         nxt_unit_free(ctx, process);
4432         process = NULL;
4433         break;
4434     }
4435 
4436     return process;
4437 }
4438 
4439 
4440 static nxt_unit_process_t *
4441 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4442 {
4443     int                 rc;
4444     nxt_lvlhsh_query_t  lhq;
4445 
4446     nxt_unit_process_lhq_pid(&lhq, &pid);
4447 
4448     if (remove) {
4449         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4450 
4451     } else {
4452         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4453     }
4454 
4455     if (rc == NXT_OK) {
4456         if (!remove) {
4457             nxt_unit_process_use(lhq.value);
4458         }
4459 
4460         return lhq.value;
4461     }
4462 
4463     return NULL;
4464 }
4465 
4466 
4467 static nxt_unit_process_t *
4468 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4469 {
4470     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4471 }
4472 
4473 
4474 int
4475 nxt_unit_run(nxt_unit_ctx_t *ctx)
4476 {
4477     int                  rc;
4478     nxt_unit_ctx_impl_t  *ctx_impl;
4479 
4480     nxt_unit_ctx_use(ctx);
4481 
4482     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4483 
4484     rc = NXT_UNIT_OK;
4485 
4486     while (nxt_fast_path(ctx_impl->online)) {
4487         rc = nxt_unit_run_once_impl(ctx);
4488 
4489         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4490             nxt_unit_quit(ctx);
4491             break;
4492         }
4493     }
4494 
4495     nxt_unit_ctx_release(ctx);
4496 
4497     return rc;
4498 }
4499 
4500 
4501 int
4502 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4503 {
4504     int  rc;
4505 
4506     nxt_unit_ctx_use(ctx);
4507 
4508     rc = nxt_unit_run_once_impl(ctx);
4509 
4510     nxt_unit_ctx_release(ctx);
4511 
4512     return rc;
4513 }
4514 
4515 
4516 static int
4517 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4518 {
4519     int                  rc;
4520     nxt_unit_read_buf_t  *rbuf;
4521 
4522     rbuf = nxt_unit_read_buf_get(ctx);
4523     if (nxt_slow_path(rbuf == NULL)) {
4524         return NXT_UNIT_ERROR;
4525     }
4526 
4527     rc = nxt_unit_read_buf(ctx, rbuf);
4528     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4529         nxt_unit_read_buf_release(ctx, rbuf);
4530 
4531         return rc;
4532     }
4533 
4534     rc = nxt_unit_process_msg(ctx, rbuf);
4535     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4536         return NXT_UNIT_ERROR;
4537     }
4538 
4539     rc = nxt_unit_process_pending_rbuf(ctx);
4540     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4541         return NXT_UNIT_ERROR;
4542     }
4543 
4544     nxt_unit_process_ready_req(ctx);
4545 
4546     return rc;
4547 }
4548 
4549 
4550 static int
4551 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4552 {
4553     int                   nevents, res, err;
4554     nxt_unit_impl_t       *lib;
4555     nxt_unit_ctx_impl_t   *ctx_impl;
4556     nxt_unit_port_impl_t  *port_impl;
4557     struct pollfd         fds[2];
4558 
4559     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4560 
4561     if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4562         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4563     }
4564 
4565     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4566                                  port);
4567 
4568     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4569 
4570 retry:
4571 
4572     if (port_impl->from_socket == 0) {
4573         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4574         if (res == NXT_UNIT_OK) {
4575             if (nxt_unit_is_read_socket(rbuf)) {
4576                 port_impl->from_socket++;
4577 
4578                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4579                                (int) ctx_impl->read_port->id.pid,
4580                                (int) ctx_impl->read_port->id.id,
4581                                port_impl->from_socket);
4582 
4583             } else {
4584                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4585                                (int) ctx_impl->read_port->id.pid,
4586                                (int) ctx_impl->read_port->id.id,
4587                                (int) rbuf->size);
4588 
4589                 return NXT_UNIT_OK;
4590             }
4591         }
4592     }
4593 
4594     res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4595     if (res == NXT_UNIT_OK) {
4596         return NXT_UNIT_OK;
4597     }
4598 
4599     fds[0].fd = ctx_impl->read_port->in_fd;
4600     fds[0].events = POLLIN;
4601     fds[0].revents = 0;
4602 
4603     fds[1].fd = lib->shared_port->in_fd;
4604     fds[1].events = POLLIN;
4605     fds[1].revents = 0;
4606 
4607     nevents = poll(fds, 2, -1);
4608     if (nxt_slow_path(nevents == -1)) {
4609         err = errno;
4610 
4611         if (err == EINTR) {
4612             goto retry;
4613         }
4614 
4615         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4616                        fds[0].fd, fds[1].fd, strerror(err), err);
4617 
4618         rbuf->size = -1;
4619 
4620         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4621     }
4622 
4623     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4624                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4625                    fds[1].revents);
4626 
4627     if ((fds[0].revents & POLLIN) != 0) {
4628         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4629         if (res == NXT_UNIT_AGAIN) {
4630             goto retry;
4631         }
4632 
4633         return res;
4634     }
4635 
4636     if ((fds[1].revents & POLLIN) != 0) {
4637         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4638         if (res == NXT_UNIT_AGAIN) {
4639             goto retry;
4640         }
4641 
4642         return res;
4643     }
4644 
4645     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4646                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4647                    fds[1].revents);
4648 
4649     return NXT_UNIT_ERROR;
4650 }
4651 
4652 
4653 static int
4654 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4655 {
4656     int                  rc;
4657     nxt_queue_t          pending_rbuf;
4658     nxt_unit_ctx_impl_t  *ctx_impl;
4659     nxt_unit_read_buf_t  *rbuf;
4660 
4661     nxt_queue_init(&pending_rbuf);
4662 
4663     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4664 
4665     pthread_mutex_lock(&ctx_impl->mutex);
4666 
4667     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4668         pthread_mutex_unlock(&ctx_impl->mutex);
4669 
4670         return NXT_UNIT_OK;
4671     }
4672 
4673     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4674     nxt_queue_init(&ctx_impl->pending_rbuf);
4675 
4676     pthread_mutex_unlock(&ctx_impl->mutex);
4677 
4678     rc = NXT_UNIT_OK;
4679 
4680     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4681 
4682         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4683             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
4684 
4685         } else {
4686             nxt_unit_read_buf_release(ctx, rbuf);
4687         }
4688 
4689     } nxt_queue_loop;
4690 
4691     return rc;
4692 }
4693 
4694 
4695 static void
4696 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4697 {
4698     int                           res;
4699     nxt_queue_t                   ready_req;
4700     nxt_unit_impl_t               *lib;
4701     nxt_unit_ctx_impl_t           *ctx_impl;
4702     nxt_unit_request_info_t       *req;
4703     nxt_unit_request_info_impl_t  *req_impl;
4704 
4705     nxt_queue_init(&ready_req);
4706 
4707     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4708 
4709     pthread_mutex_lock(&ctx_impl->mutex);
4710 
4711     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4712         pthread_mutex_unlock(&ctx_impl->mutex);
4713 
4714         return;
4715     }
4716 
4717     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4718     nxt_queue_init(&ctx_impl->ready_req);
4719 
4720     pthread_mutex_unlock(&ctx_impl->mutex);
4721 
4722     nxt_queue_each(req_impl, &ready_req,
4723                    nxt_unit_request_info_impl_t, port_wait_link)
4724     {
4725         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4726 
4727         req = &req_impl->req;
4728 
4729         res = nxt_unit_send_req_headers_ack(req);
4730         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4731             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4732 
4733             continue;
4734         }
4735 
4736         if (req->content_length
4737             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4738         {
4739             res = nxt_unit_request_hash_add(ctx, req);
4740             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4741                 nxt_unit_req_warn(req, "failed to add request to hash");
4742 
4743                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4744 
4745                 continue;
4746             }
4747 
4748             /*
4749              * If application have separate data handler, we may start
4750              * request processing and process data when it is arrived.
4751              */
4752             if (lib->callbacks.data_handler == NULL) {
4753                 continue;
4754             }
4755         }
4756 
4757         lib->callbacks.request_handler(&req_impl->req);
4758 
4759     } nxt_queue_loop;
4760 }
4761 
4762 
4763 int
4764 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4765 {
4766     int                  rc;
4767     nxt_unit_read_buf_t  *rbuf;
4768     nxt_unit_ctx_impl_t  *ctx_impl;
4769 
4770     nxt_unit_ctx_use(ctx);
4771 
4772     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4773 
4774     rc = NXT_UNIT_OK;
4775 
4776     while (nxt_fast_path(ctx_impl->online)) {
4777         rbuf = nxt_unit_read_buf_get(ctx);
4778         if (nxt_slow_path(rbuf == NULL)) {
4779             rc = NXT_UNIT_ERROR;
4780             break;
4781         }
4782 
4783     retry:
4784 
4785         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4786         if (rc == NXT_UNIT_AGAIN) {
4787             goto retry;
4788         }
4789 
4790         rc = nxt_unit_process_msg(ctx, rbuf);
4791         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4792             break;
4793         }
4794 
4795         rc = nxt_unit_process_pending_rbuf(ctx);
4796         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4797             break;
4798         }
4799 
4800         nxt_unit_process_ready_req(ctx);
4801     }
4802 
4803     nxt_unit_ctx_release(ctx);
4804 
4805     return rc;
4806 }
4807 
4808 
4809 nxt_inline int
4810 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4811 {
4812     nxt_port_msg_t  *port_msg;
4813 
4814     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4815         port_msg = (nxt_port_msg_t *) rbuf->buf;
4816 
4817         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4818     }
4819 
4820     return 0;
4821 }
4822 
4823 
4824 nxt_inline int
4825 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4826 {
4827     if (nxt_fast_path(rbuf->size == 1)) {
4828         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4829     }
4830 
4831     return 0;
4832 }
4833 
4834 
4835 nxt_inline int
4836 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4837 {
4838     nxt_port_msg_t  *port_msg;
4839 
4840     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4841         port_msg = (nxt_port_msg_t *) rbuf->buf;
4842 
4843         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4844     }
4845 
4846     return 0;
4847 }
4848 
4849 
4850 nxt_inline int
4851 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4852 {
4853     nxt_port_msg_t  *port_msg;
4854 
4855     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4856         port_msg = (nxt_port_msg_t *) rbuf->buf;
4857 
4858         return port_msg->type == _NXT_PORT_MSG_QUIT;
4859     }
4860 
4861     return 0;
4862 }
4863 
4864 
4865 int
4866 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4867 {
4868     int                  rc;
4869     nxt_unit_impl_t      *lib;
4870     nxt_unit_read_buf_t  *rbuf;
4871     nxt_unit_ctx_impl_t  *ctx_impl;
4872 
4873     nxt_unit_ctx_use(ctx);
4874 
4875     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4876     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4877 
4878     rc = NXT_UNIT_OK;
4879 
4880     while (nxt_fast_path(ctx_impl->online)) {
4881         rbuf = nxt_unit_read_buf_get(ctx);
4882         if (nxt_slow_path(rbuf == NULL)) {
4883             rc = NXT_UNIT_ERROR;
4884             break;
4885         }
4886 
4887     retry:
4888 
4889         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4890         if (rc == NXT_UNIT_AGAIN) {
4891             goto retry;
4892         }
4893 
4894         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4895             nxt_unit_read_buf_release(ctx, rbuf);
4896             break;
4897         }
4898 
4899         rc = nxt_unit_process_msg(ctx, rbuf);
4900         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4901             break;
4902         }
4903 
4904         rc = nxt_unit_process_pending_rbuf(ctx);
4905         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4906             break;
4907         }
4908 
4909         nxt_unit_process_ready_req(ctx);
4910     }
4911 
4912     nxt_unit_ctx_release(ctx);
4913 
4914     return rc;
4915 }
4916 
4917 
4918 int
4919 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4920 {
4921     nxt_unit_impl_t  *lib;
4922 
4923     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4924 
4925     return (ctx == &lib->main_ctx.ctx);
4926 }
4927 
4928 
4929 int
4930 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4931 {
4932     int  rc;
4933 
4934     nxt_unit_ctx_use(ctx);
4935 
4936     rc = nxt_unit_process_port_msg_impl(ctx, port);
4937 
4938     nxt_unit_ctx_release(ctx);
4939 
4940     return rc;
4941 }
4942 
4943 
4944 static int
4945 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4946 {
4947     int                  rc;
4948     nxt_unit_impl_t      *lib;
4949     nxt_unit_read_buf_t  *rbuf;
4950     nxt_unit_ctx_impl_t  *ctx_impl;
4951 
4952     rbuf = nxt_unit_read_buf_get(ctx);
4953     if (nxt_slow_path(rbuf == NULL)) {
4954         return NXT_UNIT_ERROR;
4955     }
4956 
4957     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4958     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4959 
4960 retry:
4961 
4962     if (port == lib->shared_port) {
4963         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
4964 
4965     } else {
4966         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
4967     }
4968 
4969     if (rc != NXT_UNIT_OK) {
4970         nxt_unit_read_buf_release(ctx, rbuf);
4971         return rc;
4972     }
4973 
4974     rc = nxt_unit_process_msg(ctx, rbuf);
4975     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4976         return NXT_UNIT_ERROR;
4977     }
4978 
4979     rc = nxt_unit_process_pending_rbuf(ctx);
4980     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4981         return NXT_UNIT_ERROR;
4982     }
4983 
4984     nxt_unit_process_ready_req(ctx);
4985 
4986     rbuf = nxt_unit_read_buf_get(ctx);
4987     if (nxt_slow_path(rbuf == NULL)) {
4988         return NXT_UNIT_ERROR;
4989     }
4990 
4991     if (ctx_impl->online) {
4992         goto retry;
4993     }
4994 
4995     return rc;
4996 }
4997 
4998 
4999 void
5000 nxt_unit_done(nxt_unit_ctx_t *ctx)
5001 {
5002     nxt_unit_ctx_release(ctx);
5003 }
5004 
5005 
5006 nxt_unit_ctx_t *
5007 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5008 {
5009     int                   rc, queue_fd;
5010     void                  *mem;
5011     nxt_unit_impl_t       *lib;
5012     nxt_unit_port_t       *port;
5013     nxt_unit_ctx_impl_t   *new_ctx;
5014     nxt_unit_port_impl_t  *port_impl;
5015 
5016     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5017 
5018     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5019                                    + lib->request_data_size);
5020     if (nxt_slow_path(new_ctx == NULL)) {
5021         nxt_unit_alert(ctx, "failed to allocate context");
5022 
5023         return NULL;
5024     }
5025 
5026     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5027     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5028          nxt_unit_free(ctx, new_ctx);
5029 
5030          return NULL;
5031     }
5032 
5033     queue_fd = -1;
5034 
5035     port = nxt_unit_create_port(&new_ctx->ctx);
5036     if (nxt_slow_path(port == NULL)) {
5037         goto fail;
5038     }
5039 
5040     new_ctx->read_port = port;
5041 
5042     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5043     if (nxt_slow_path(queue_fd == -1)) {
5044         goto fail;
5045     }
5046 
5047     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5048                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5049     if (nxt_slow_path(mem == MAP_FAILED)) {
5050         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5051                        strerror(errno), errno);
5052 
5053         goto fail;
5054     }
5055 
5056     nxt_port_queue_init(mem);
5057 
5058     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5059     port_impl->queue = mem;
5060 
5061     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5062     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5063         goto fail;
5064     }
5065 
5066     nxt_unit_close(queue_fd);
5067 
5068     return &new_ctx->ctx;
5069 
5070 fail:
5071 
5072     if (queue_fd != -1) {
5073         nxt_unit_close(queue_fd);
5074     }
5075 
5076     nxt_unit_ctx_release(&new_ctx->ctx);
5077 
5078     return NULL;
5079 }
5080 
5081 
5082 static void
5083 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5084 {
5085     nxt_unit_impl_t                  *lib;
5086     nxt_unit_mmap_buf_t              *mmap_buf;
5087     nxt_unit_read_buf_t              *rbuf;
5088     nxt_unit_request_info_impl_t     *req_impl;
5089     nxt_unit_websocket_frame_impl_t  *ws_impl;
5090 
5091     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5092 
5093     nxt_queue_each(req_impl, &ctx_impl->active_req,
5094                    nxt_unit_request_info_impl_t, link)
5095     {
5096         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5097 
5098         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5099 
5100     } nxt_queue_loop;
5101 
5102     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5103     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5104 
5105     while (ctx_impl->free_buf != NULL) {
5106         mmap_buf = ctx_impl->free_buf;
5107         nxt_unit_mmap_buf_unlink(mmap_buf);
5108         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5109     }
5110 
5111     nxt_queue_each(req_impl, &ctx_impl->free_req,
5112                    nxt_unit_request_info_impl_t, link)
5113     {
5114         nxt_unit_request_info_free(req_impl);
5115 
5116     } nxt_queue_loop;
5117 
5118     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5119                    nxt_unit_websocket_frame_impl_t, link)
5120     {
5121         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5122 
5123     } nxt_queue_loop;
5124 
5125     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5126     {
5127         if (rbuf != &ctx_impl->ctx_read_buf) {
5128             nxt_unit_free(&ctx_impl->ctx, rbuf);
5129         }
5130     } nxt_queue_loop;
5131 
5132     pthread_mutex_destroy(&ctx_impl->mutex);
5133 
5134     pthread_mutex_lock(&lib->mutex);
5135 
5136     nxt_queue_remove(&ctx_impl->link);
5137 
5138     pthread_mutex_unlock(&lib->mutex);
5139 
5140     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5141         nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5142         nxt_unit_port_release(ctx_impl->read_port);
5143     }
5144 
5145     if (ctx_impl != &lib->main_ctx) {
5146         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5147     }
5148 
5149     nxt_unit_lib_release(lib);
5150 }
5151 
5152 
/*
 * SOCK_SEQPACKET is used where available.  Replace "0 ||" with "0 &&"
 * below to force SOCK_DGRAM on all platforms for testing.
 */
5154 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5155 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5156 #else
5157 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5158 #endif
5159 
5160 
5161 void
5162 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5163 {
5164     nxt_unit_port_hash_id_t  port_hash_id;
5165 
5166     port_hash_id.pid = pid;
5167     port_hash_id.id = id;
5168 
5169     port_id->pid = pid;
5170     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5171     port_id->id = id;
5172 }
5173 
5174 
5175 static nxt_unit_port_t *
5176 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5177 {
5178     int                 rc, port_sockets[2];
5179     nxt_unit_impl_t     *lib;
5180     nxt_unit_port_t     new_port, *port;
5181     nxt_unit_process_t  *process;
5182 
5183     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5184 
5185     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5186     if (nxt_slow_path(rc != 0)) {
5187         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5188                       strerror(errno), errno);
5189 
5190         return NULL;
5191     }
5192 
5193     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5194                    port_sockets[0], port_sockets[1]);
5195 
5196     pthread_mutex_lock(&lib->mutex);
5197 
5198     process = nxt_unit_process_get(ctx, lib->pid);
5199     if (nxt_slow_path(process == NULL)) {
5200         pthread_mutex_unlock(&lib->mutex);
5201 
5202         nxt_unit_close(port_sockets[0]);
5203         nxt_unit_close(port_sockets[1]);
5204 
5205         return NULL;
5206     }
5207 
5208     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5209 
5210     new_port.in_fd = port_sockets[0];
5211     new_port.out_fd = port_sockets[1];
5212     new_port.data = NULL;
5213 
5214     pthread_mutex_unlock(&lib->mutex);
5215 
5216     nxt_unit_process_release(process);
5217 
5218     port = nxt_unit_add_port(ctx, &new_port, NULL);
5219     if (nxt_slow_path(port == NULL)) {
5220         nxt_unit_close(port_sockets[0]);
5221         nxt_unit_close(port_sockets[1]);
5222     }
5223 
5224     return port;
5225 }
5226 
5227 
5228 static int
5229 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5230     nxt_unit_port_t *port, int queue_fd)
5231 {
5232     ssize_t          res;
5233     nxt_unit_impl_t  *lib;
5234     int              fds[2] = { port->out_fd, queue_fd };
5235 
5236     struct {
5237         nxt_port_msg_t            msg;
5238         nxt_port_msg_new_port_t   new_port;
5239     } m;
5240 
5241     union {
5242         struct cmsghdr  cm;
5243         char            space[CMSG_SPACE(sizeof(int) * 2)];
5244     } cmsg;
5245 
5246     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5247 
5248     m.msg.stream = 0;
5249     m.msg.pid = lib->pid;
5250     m.msg.reply_port = 0;
5251     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5252     m.msg.last = 0;
5253     m.msg.mmap = 0;
5254     m.msg.nf = 0;
5255     m.msg.mf = 0;
5256     m.msg.tracking = 0;
5257 
5258     m.new_port.id = port->id.id;
5259     m.new_port.pid = port->id.pid;
5260     m.new_port.type = NXT_PROCESS_APP;
5261     m.new_port.max_size = 16 * 1024;
5262     m.new_port.max_share = 64 * 1024;
5263 
5264     memset(&cmsg, 0, sizeof(cmsg));
5265 
5266     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
5267     cmsg.cm.cmsg_level = SOL_SOCKET;
5268     cmsg.cm.cmsg_type = SCM_RIGHTS;
5269 
5270     /*
5271      * memcpy() is used instead of simple
5272      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
5273      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5274      *   dereferencing type-punned pointer will break strict-aliasing rules
5275      *
5276      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5277      * in the same simple assignment as in the code above.
5278      */
5279     memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
5280 
5281     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5282 
5283     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5284 }
5285 
5286 
5287 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5288 {
5289     nxt_unit_port_impl_t  *port_impl;
5290 
5291     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5292 
5293     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5294 }
5295 
5296 
5297 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5298 {
5299     long                  c;
5300     nxt_unit_port_impl_t  *port_impl;
5301 
5302     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5303 
5304     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5305 
5306     if (c == 1) {
5307         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5308                        (int) port->id.pid, (int) port->id.id,
5309                        port->in_fd, port->out_fd);
5310 
5311         nxt_unit_process_release(port_impl->process);
5312 
5313         if (port->in_fd != -1) {
5314             nxt_unit_close(port->in_fd);
5315 
5316             port->in_fd = -1;
5317         }
5318 
5319         if (port->out_fd != -1) {
5320             nxt_unit_close(port->out_fd);
5321 
5322             port->out_fd = -1;
5323         }
5324 
5325         if (port_impl->queue != NULL) {
5326             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5327                                      ? sizeof(nxt_app_queue_t)
5328                                      : sizeof(nxt_port_queue_t));
5329         }
5330 
5331         nxt_unit_free(NULL, port_impl);
5332     }
5333 }
5334 
5335 
5336 static nxt_unit_port_t *
5337 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5338 {
5339     int                           rc;
5340     nxt_queue_t                   awaiting_req;
5341     nxt_unit_impl_t               *lib;
5342     nxt_unit_port_t               *old_port;
5343     nxt_unit_process_t            *process;
5344     nxt_unit_ctx_impl_t           *ctx_impl;
5345     nxt_unit_port_impl_t          *new_port, *old_port_impl;
5346     nxt_unit_request_info_impl_t  *req_impl;
5347 
5348     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5349 
5350     pthread_mutex_lock(&lib->mutex);
5351 
5352     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5353 
5354     if (nxt_slow_path(old_port != NULL)) {
5355         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5356                             "in_fd %d out_fd %d queue %p",
5357                             port->id.pid, port->id.id,
5358                             port->in_fd, port->out_fd, queue);
5359 
5360         if (old_port->data == NULL) {
5361             old_port->data = port->data;
5362             port->data = NULL;
5363         }
5364 
5365         if (old_port->in_fd == -1) {
5366             old_port->in_fd = port->in_fd;
5367             port->in_fd = -1;
5368         }
5369 
5370         if (port->in_fd != -1) {
5371             nxt_unit_close(port->in_fd);
5372             port->in_fd = -1;
5373         }
5374 
5375         if (old_port->out_fd == -1) {
5376             old_port->out_fd = port->out_fd;
5377             port->out_fd = -1;
5378         }
5379 
5380         if (port->out_fd != -1) {
5381             nxt_unit_close(port->out_fd);
5382             port->out_fd = -1;
5383         }
5384 
5385         *port = *old_port;
5386 
5387         nxt_queue_init(&awaiting_req);
5388 
5389         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5390 
5391         if (old_port_impl->queue == NULL) {
5392             old_port_impl->queue = queue;
5393         }
5394 
5395         if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5396             nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5397             nxt_queue_init(&old_port_impl->awaiting_req);
5398         }
5399 
5400         old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
5401 
5402         pthread_mutex_unlock(&lib->mutex);
5403 
5404         if (lib->callbacks.add_port != NULL
5405             && (port->in_fd != -1 || port->out_fd != -1))
5406         {
5407             lib->callbacks.add_port(ctx, old_port);
5408         }
5409 
5410         nxt_queue_each(req_impl, &awaiting_req,
5411                        nxt_unit_request_info_impl_t, port_wait_link)
5412         {
5413             nxt_queue_remove(&req_impl->port_wait_link);
5414 
5415             ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5416                                         ctx);
5417 
5418             pthread_mutex_lock(&ctx_impl->mutex);
5419 
5420             nxt_queue_insert_tail(&ctx_impl->ready_req,
5421                                   &req_impl->port_wait_link);
5422 
5423             pthread_mutex_unlock(&ctx_impl->mutex);
5424 
5425             nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5426 
5427             nxt_unit_awake_ctx(ctx, ctx_impl);
5428 
5429         } nxt_queue_loop;
5430 
5431         return old_port;
5432     }
5433 
5434     new_port = NULL;
5435 
5436     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5437                    port->id.pid, port->id.id,
5438                    port->in_fd, port->out_fd, queue);
5439 
5440     process = nxt_unit_process_get(ctx, port->id.pid);
5441     if (nxt_slow_path(process == NULL)) {
5442         goto unlock;
5443     }
5444 
5445     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5446         && port->id.id >= process->next_port_id)
5447     {
5448         process->next_port_id = port->id.id + 1;
5449     }
5450 
5451     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5452     if (nxt_slow_path(new_port == NULL)) {
5453         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5454                        port->id.pid, port->id.id);
5455 
5456         goto unlock;
5457     }
5458 
5459     new_port->port = *port;
5460 
5461     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5462     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5463         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5464                        port->id.pid, port->id.id);
5465 
5466         nxt_unit_free(ctx, new_port);
5467 
5468         new_port = NULL;
5469 
5470         goto unlock;
5471     }
5472 
5473     nxt_queue_insert_tail(&process->ports, &new_port->link);
5474 
5475     new_port->use_count = 2;
5476     new_port->process = process;
5477     new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
5478     new_port->queue = queue;
5479     new_port->from_socket = 0;
5480     new_port->socket_rbuf = NULL;
5481 
5482     nxt_queue_init(&new_port->awaiting_req);
5483 
5484     process = NULL;
5485 
5486 unlock:
5487 
5488     pthread_mutex_unlock(&lib->mutex);
5489 
5490     if (nxt_slow_path(process != NULL)) {
5491         nxt_unit_process_release(process);
5492     }
5493 
5494     if (lib->callbacks.add_port != NULL
5495         && new_port != NULL
5496         && (port->in_fd != -1 || port->out_fd != -1))
5497     {
5498         lib->callbacks.add_port(ctx, &new_port->port);
5499     }
5500 
5501     return &new_port->port;
5502 }
5503 
5504 
5505 static void
5506 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5507 {
5508     nxt_unit_port_t       *port;
5509     nxt_unit_port_impl_t  *port_impl;
5510 
5511     pthread_mutex_lock(&lib->mutex);
5512 
5513     port = nxt_unit_remove_port_unsafe(lib, port_id);
5514 
5515     if (nxt_fast_path(port != NULL)) {
5516         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5517 
5518         nxt_queue_remove(&port_impl->link);
5519     }
5520 
5521     pthread_mutex_unlock(&lib->mutex);
5522 
5523     if (lib->callbacks.remove_port != NULL && port != NULL) {
5524         lib->callbacks.remove_port(&lib->unit, port);
5525     }
5526 
5527     if (nxt_fast_path(port != NULL)) {
5528         nxt_unit_port_release(port);
5529     }
5530 }
5531 
5532 
5533 static nxt_unit_port_t *
5534 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5535 {
5536     nxt_unit_port_t  *port;
5537 
5538     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5539     if (nxt_slow_path(port == NULL)) {
5540         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5541                        (int) port_id->pid, (int) port_id->id);
5542 
5543         return NULL;
5544     }
5545 
5546     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5547                    (int) port_id->pid, (int) port_id->id,
5548                    port->in_fd, port->out_fd, port->data);
5549 
5550     return port;
5551 }
5552 
5553 
5554 static void
5555 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5556 {
5557     nxt_unit_process_t  *process;
5558 
5559     pthread_mutex_lock(&lib->mutex);
5560 
5561     process = nxt_unit_process_find(lib, pid, 1);
5562     if (nxt_slow_path(process == NULL)) {
5563         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5564 
5565         pthread_mutex_unlock(&lib->mutex);
5566 
5567         return;
5568     }
5569 
5570     nxt_unit_remove_process(lib, process);
5571 
5572     if (lib->callbacks.remove_pid != NULL) {
5573         lib->callbacks.remove_pid(&lib->unit, pid);
5574     }
5575 }
5576 
5577 
5578 static void
5579 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5580 {
5581     nxt_queue_t           ports;
5582     nxt_unit_port_impl_t  *port;
5583 
5584     nxt_queue_init(&ports);
5585 
5586     nxt_queue_add(&ports, &process->ports);
5587 
5588     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5589 
5590         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5591 
5592     } nxt_queue_loop;
5593 
5594     pthread_mutex_unlock(&lib->mutex);
5595 
5596     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5597 
5598         nxt_queue_remove(&port->link);
5599 
5600         if (lib->callbacks.remove_port != NULL) {
5601             lib->callbacks.remove_port(&lib->unit, &port->port);
5602         }
5603 
5604         nxt_unit_port_release(&port->port);
5605 
5606     } nxt_queue_loop;
5607 
5608     nxt_unit_process_release(process);
5609 }
5610 
5611 
5612 static void
5613 nxt_unit_quit(nxt_unit_ctx_t *ctx)
5614 {
5615     nxt_port_msg_t       msg;
5616     nxt_unit_impl_t      *lib;
5617     nxt_unit_ctx_impl_t  *ctx_impl;
5618 
5619     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5620     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5621 
5622     if (!ctx_impl->online) {
5623         return;
5624     }
5625 
5626     ctx_impl->online = 0;
5627 
5628     if (lib->callbacks.quit != NULL) {
5629         lib->callbacks.quit(ctx);
5630     }
5631 
5632     if (ctx != &lib->main_ctx.ctx) {
5633         return;
5634     }
5635 
5636     memset(&msg, 0, sizeof(nxt_port_msg_t));
5637 
5638     msg.pid = lib->pid;
5639     msg.type = _NXT_PORT_MSG_QUIT;
5640 
5641     pthread_mutex_lock(&lib->mutex);
5642 
5643     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5644 
5645         if (ctx == &ctx_impl->ctx
5646             || ctx_impl->read_port == NULL
5647             || ctx_impl->read_port->out_fd == -1)
5648         {
5649             continue;
5650         }
5651 
5652         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5653                                   &msg, sizeof(msg), NULL, 0);
5654 
5655     } nxt_queue_loop;
5656 
5657     pthread_mutex_unlock(&lib->mutex);
5658 }
5659 
5660 
5661 static int
5662 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5663 {
5664     ssize_t              res;
5665     nxt_unit_impl_t      *lib;
5666     nxt_unit_ctx_impl_t  *ctx_impl;
5667 
5668     struct {
5669         nxt_port_msg_t           msg;
5670         nxt_port_msg_get_port_t  get_port;
5671     } m;
5672 
5673     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5674     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5675 
5676     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5677 
5678     m.msg.pid = lib->pid;
5679     m.msg.reply_port = ctx_impl->read_port->id.id;
5680     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5681 
5682     m.get_port.id = port_id->id;
5683     m.get_port.pid = port_id->pid;
5684 
5685     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5686                    (int) port_id->id);
5687 
5688     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
5689     if (nxt_slow_path(res != sizeof(m))) {
5690         return NXT_UNIT_ERROR;
5691     }
5692 
5693     return NXT_UNIT_OK;
5694 }
5695 
5696 
5697 static ssize_t
5698 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5699     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5700 {
5701     int                   notify;
5702     ssize_t               ret;
5703     nxt_int_t             rc;
5704     nxt_port_msg_t        msg;
5705     nxt_unit_impl_t       *lib;
5706     nxt_unit_port_impl_t  *port_impl;
5707 
5708     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5709 
5710     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5711     if (port_impl->queue != NULL && oob_size == 0
5712         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5713     {
5714         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5715         if (nxt_slow_path(rc != NXT_OK)) {
5716             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5717                            (int) port->id.pid, (int) port->id.id);
5718 
5719             return -1;
5720         }
5721 
5722         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5723                        (int) port->id.pid, (int) port->id.id,
5724                        (int) buf_size, notify);
5725 
5726         if (notify) {
5727             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5728 
5729             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5730 
5731             if (lib->callbacks.port_send == NULL) {
5732                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5733                                        sizeof(nxt_port_msg_t), NULL, 0);
5734 
5735                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5736                                (int) port->id.pid, (int) port->id.id,
5737                                (int) ret);
5738 
5739             } else {
5740                 ret = lib->callbacks.port_send(ctx, port, &msg,
5741                                                sizeof(nxt_port_msg_t), NULL, 0);
5742 
5743                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5744                                (int) port->id.pid, (int) port->id.id,
5745                                (int) ret);
5746             }
5747 
5748         }
5749 
5750         return buf_size;
5751     }
5752 
5753     if (port_impl->queue != NULL) {
5754         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5755 
5756         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5757         if (nxt_slow_path(rc != NXT_OK)) {
5758             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5759                            (int) port->id.pid, (int) port->id.id);
5760 
5761             return -1;
5762         }
5763 
5764         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5765                        (int) port->id.pid, (int) port->id.id, notify);
5766     }
5767 
5768     if (lib->callbacks.port_send != NULL) {
5769         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5770                                        oob, oob_size);
5771 
5772         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5773                        (int) port->id.pid, (int) port->id.id,
5774                        (int) ret);
5775 
5776     } else {
5777         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5778                                oob, oob_size);
5779 
5780         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5781                        (int) port->id.pid, (int) port->id.id,
5782                        (int) ret);
5783     }
5784 
5785     return ret;
5786 }
5787 
5788 
5789 static ssize_t
5790 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5791     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5792 {
5793     int            err;
5794     ssize_t        res;
5795     struct iovec   iov[1];
5796     struct msghdr  msg;
5797 
5798     iov[0].iov_base = (void *) buf;
5799     iov[0].iov_len = buf_size;
5800 
5801     msg.msg_name = NULL;
5802     msg.msg_namelen = 0;
5803     msg.msg_iov = iov;
5804     msg.msg_iovlen = 1;
5805     msg.msg_flags = 0;
5806     msg.msg_control = (void *) oob;
5807     msg.msg_controllen = oob_size;
5808 
5809 retry:
5810 
5811     res = sendmsg(fd, &msg, 0);
5812 
5813     if (nxt_slow_path(res == -1)) {
5814         err = errno;
5815 
5816         if (err == EINTR) {
5817             goto retry;
5818         }
5819 
5820         /*
5821          * FIXME: This should be "alert" after router graceful shutdown
5822          * implementation.
5823          */
5824         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5825                       fd, (int) buf_size, strerror(err), err);
5826 
5827     } else {
5828         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5829                        (int) res);
5830     }
5831 
5832     return res;
5833 }
5834 
5835 
5836 static int
5837 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5838     nxt_unit_read_buf_t *rbuf)
5839 {
5840     int                   res, read;
5841     nxt_unit_port_impl_t  *port_impl;
5842 
5843     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5844 
5845     read = 0;
5846 
5847 retry:
5848 
5849     if (port_impl->from_socket > 0) {
5850         if (port_impl->socket_rbuf != NULL
5851             && port_impl->socket_rbuf->size > 0)
5852         {
5853             port_impl->from_socket--;
5854 
5855             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5856             port_impl->socket_rbuf->size = 0;
5857 
5858             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5859                            (int) port->id.pid, (int) port->id.id,
5860                            (int) rbuf->size);
5861 
5862             return NXT_UNIT_OK;
5863         }
5864 
5865     } else {
5866         res = nxt_unit_port_queue_recv(port, rbuf);
5867 
5868         if (res == NXT_UNIT_OK) {
5869             if (nxt_unit_is_read_socket(rbuf)) {
5870                 port_impl->from_socket++;
5871 
5872                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
5873                                (int) port->id.pid, (int) port->id.id,
5874                                port_impl->from_socket);
5875 
5876                 goto retry;
5877             }
5878 
5879             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
5880                            (int) port->id.pid, (int) port->id.id,
5881                            (int) rbuf->size);
5882 
5883             return NXT_UNIT_OK;
5884         }
5885     }
5886 
5887     if (read) {
5888         return NXT_UNIT_AGAIN;
5889     }
5890 
5891     res = nxt_unit_port_recv(ctx, port, rbuf);
5892     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5893         return NXT_UNIT_ERROR;
5894     }
5895 
5896     read = 1;
5897 
5898     if (nxt_unit_is_read_queue(rbuf)) {
5899         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5900                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5901 
5902         goto retry;
5903     }
5904 
5905     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
5906                    (int) port->id.pid, (int) port->id.id,
5907                    (int) rbuf->size);
5908 
5909     if (res == NXT_UNIT_AGAIN) {
5910         return NXT_UNIT_AGAIN;
5911     }
5912 
5913     if (port_impl->from_socket > 0) {
5914         port_impl->from_socket--;
5915 
5916         return NXT_UNIT_OK;
5917     }
5918 
5919     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
5920                    (int) port->id.pid, (int) port->id.id,
5921                    (int) rbuf->size);
5922 
5923     if (port_impl->socket_rbuf == NULL) {
5924         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
5925 
5926         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
5927             return NXT_UNIT_ERROR;
5928         }
5929 
5930         port_impl->socket_rbuf->size = 0;
5931     }
5932 
5933     if (port_impl->socket_rbuf->size > 0) {
5934         nxt_unit_alert(ctx, "too many port socket messages");
5935 
5936         return NXT_UNIT_ERROR;
5937     }
5938 
5939     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
5940 
5941     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
5942 
5943     goto retry;
5944 }
5945 
5946 
5947 nxt_inline void
5948 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
5949 {
5950     memcpy(dst->buf, src->buf, src->size);
5951     dst->size = src->size;
5952     memcpy(dst->oob, src->oob, sizeof(src->oob));
5953 }
5954 
5955 
5956 static int
5957 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5958     nxt_unit_read_buf_t *rbuf)
5959 {
5960     int  res;
5961 
5962 retry:
5963 
5964     res = nxt_unit_app_queue_recv(port, rbuf);
5965 
5966     if (res == NXT_UNIT_AGAIN) {
5967         res = nxt_unit_port_recv(ctx, port, rbuf);
5968         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5969             return NXT_UNIT_ERROR;
5970         }
5971 
5972         if (nxt_unit_is_read_queue(rbuf)) {
5973             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5974                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5975 
5976             goto retry;
5977         }
5978     }
5979 
5980     return res;
5981 }
5982 
5983 
5984 static int
5985 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5986     nxt_unit_read_buf_t *rbuf)
5987 {
5988     int              fd, err;
5989     struct iovec     iov[1];
5990     struct msghdr    msg;
5991     nxt_unit_impl_t  *lib;
5992 
5993     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5994 
5995     if (lib->callbacks.port_recv != NULL) {
5996         rbuf->size = lib->callbacks.port_recv(ctx, port,
5997                                               rbuf->buf, sizeof(rbuf->buf),
5998                                               rbuf->oob, sizeof(rbuf->oob));
5999 
6000         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6001                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6002 
6003         if (nxt_slow_path(rbuf->size < 0)) {
6004             return NXT_UNIT_ERROR;
6005         }
6006 
6007         return NXT_UNIT_OK;
6008     }
6009 
6010     iov[0].iov_base = rbuf->buf;
6011     iov[0].iov_len = sizeof(rbuf->buf);
6012 
6013     msg.msg_name = NULL;
6014     msg.msg_namelen = 0;
6015     msg.msg_iov = iov;
6016     msg.msg_iovlen = 1;
6017     msg.msg_flags = 0;
6018     msg.msg_control = rbuf->oob;
6019     msg.msg_controllen = sizeof(rbuf->oob);
6020 
6021     fd = port->in_fd;
6022 
6023 retry:
6024 
6025     rbuf->size = recvmsg(fd, &msg, 0);
6026 
6027     if (nxt_slow_path(rbuf->size == -1)) {
6028         err = errno;
6029 
6030         if (err == EINTR) {
6031             goto retry;
6032         }
6033 
6034         if (err == EAGAIN) {
6035             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6036                            fd, strerror(err), err);
6037 
6038             return NXT_UNIT_AGAIN;
6039         }
6040 
6041         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6042                        fd, strerror(err), err);
6043 
6044         return NXT_UNIT_ERROR;
6045     }
6046 
6047     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6048 
6049     return NXT_UNIT_OK;
6050 }
6051 
6052 
6053 static int
6054 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6055 {
6056     nxt_unit_port_impl_t  *port_impl;
6057 
6058     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6059 
6060     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6061 
6062     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6063 }
6064 
6065 
6066 static int
6067 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6068 {
6069     uint32_t              cookie;
6070     nxt_port_msg_t        *port_msg;
6071     nxt_app_queue_t       *queue;
6072     nxt_unit_port_impl_t  *port_impl;
6073 
6074     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6075     queue = port_impl->queue;
6076 
6077 retry:
6078 
6079     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6080 
6081     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6082 
6083     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6084         port_msg = (nxt_port_msg_t *) rbuf->buf;
6085 
6086         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6087             return NXT_UNIT_OK;
6088         }
6089 
6090         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6091 
6092         goto retry;
6093     }
6094 
6095     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6096 }
6097 
6098 
6099 nxt_inline int
6100 nxt_unit_close(int fd)
6101 {
6102     int  res;
6103 
6104     res = close(fd);
6105 
6106     if (nxt_slow_path(res == -1)) {
6107         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6108                        fd, strerror(errno), errno);
6109 
6110     } else {
6111         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6112     }
6113 
6114     return res;
6115 }
6116 
6117 
6118 static int
6119 nxt_unit_fd_blocking(int fd)
6120 {
6121     int  nb;
6122 
6123     nb = 0;
6124 
6125     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6126         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6127                        fd, strerror(errno), errno);
6128 
6129         return NXT_UNIT_ERROR;
6130     }
6131 
6132     return NXT_UNIT_OK;
6133 }
6134 
6135 
6136 static nxt_int_t
6137 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6138 {
6139     nxt_unit_port_t          *port;
6140     nxt_unit_port_hash_id_t  *port_id;
6141 
6142     port = data;
6143     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6144 
6145     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6146         && port_id->pid == port->id.pid
6147         && port_id->id == port->id.id)
6148     {
6149         return NXT_OK;
6150     }
6151 
6152     return NXT_DECLINED;
6153 }
6154 
6155 
6156 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6157     NXT_LVLHSH_DEFAULT,
6158     nxt_unit_port_hash_test,
6159     nxt_unit_lvlhsh_alloc,
6160     nxt_unit_lvlhsh_free,
6161 };
6162 
6163 
6164 static inline void
6165 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6166     nxt_unit_port_hash_id_t *port_hash_id,
6167     nxt_unit_port_id_t *port_id)
6168 {
6169     port_hash_id->pid = port_id->pid;
6170     port_hash_id->id = port_id->id;
6171 
6172     if (nxt_fast_path(port_id->hash != 0)) {
6173         lhq->key_hash = port_id->hash;
6174 
6175     } else {
6176         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6177 
6178         port_id->hash = lhq->key_hash;
6179 
6180         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6181                        (int) port_id->pid, (int) port_id->id,
6182                        (int) port_id->hash);
6183     }
6184 
6185     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6186     lhq->key.start = (u_char *) port_hash_id;
6187     lhq->proto = &lvlhsh_ports_proto;
6188     lhq->pool = NULL;
6189 }
6190 
6191 
6192 static int
6193 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6194 {
6195     nxt_int_t                res;
6196     nxt_lvlhsh_query_t       lhq;
6197     nxt_unit_port_hash_id_t  port_hash_id;
6198 
6199     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6200     lhq.replace = 0;
6201     lhq.value = port;
6202 
6203     res = nxt_lvlhsh_insert(port_hash, &lhq);
6204 
6205     switch (res) {
6206 
6207     case NXT_OK:
6208         return NXT_UNIT_OK;
6209 
6210     default:
6211         return NXT_UNIT_ERROR;
6212     }
6213 }
6214 
6215 
6216 static nxt_unit_port_t *
6217 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6218     int remove)
6219 {
6220     nxt_int_t                res;
6221     nxt_lvlhsh_query_t       lhq;
6222     nxt_unit_port_hash_id_t  port_hash_id;
6223 
6224     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6225 
6226     if (remove) {
6227         res = nxt_lvlhsh_delete(port_hash, &lhq);
6228 
6229     } else {
6230         res = nxt_lvlhsh_find(port_hash, &lhq);
6231     }
6232 
6233     switch (res) {
6234 
6235     case NXT_OK:
6236         if (!remove) {
6237             nxt_unit_port_use(lhq.value);
6238         }
6239 
6240         return lhq.value;
6241 
6242     default:
6243         return NULL;
6244     }
6245 }
6246 
6247 
6248 static nxt_int_t
6249 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6250 {
6251     return NXT_OK;
6252 }
6253 
6254 
6255 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6256     NXT_LVLHSH_DEFAULT,
6257     nxt_unit_request_hash_test,
6258     nxt_unit_lvlhsh_alloc,
6259     nxt_unit_lvlhsh_free,
6260 };
6261 
6262 
6263 static int
6264 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6265     nxt_unit_request_info_t *req)
6266 {
6267     uint32_t                      *stream;
6268     nxt_int_t                     res;
6269     nxt_lvlhsh_query_t            lhq;
6270     nxt_unit_ctx_impl_t           *ctx_impl;
6271     nxt_unit_request_info_impl_t  *req_impl;
6272 
6273     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6274     if (req_impl->in_hash) {
6275         return NXT_UNIT_OK;
6276     }
6277 
6278     stream = &req_impl->stream;
6279 
6280     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6281     lhq.key.length = sizeof(*stream);
6282     lhq.key.start = (u_char *) stream;
6283     lhq.proto = &lvlhsh_requests_proto;
6284     lhq.pool = NULL;
6285     lhq.replace = 0;
6286     lhq.value = req_impl;
6287 
6288     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6289 
6290     pthread_mutex_lock(&ctx_impl->mutex);
6291 
6292     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6293 
6294     pthread_mutex_unlock(&ctx_impl->mutex);
6295 
6296     switch (res) {
6297 
6298     case NXT_OK:
6299         req_impl->in_hash = 1;
6300         return NXT_UNIT_OK;
6301 
6302     default:
6303         return NXT_UNIT_ERROR;
6304     }
6305 }
6306 
6307 
6308 static nxt_unit_request_info_t *
6309 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6310 {
6311     nxt_int_t                     res;
6312     nxt_lvlhsh_query_t            lhq;
6313     nxt_unit_ctx_impl_t           *ctx_impl;
6314     nxt_unit_request_info_impl_t  *req_impl;
6315 
6316     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6317     lhq.key.length = sizeof(stream);
6318     lhq.key.start = (u_char *) &stream;
6319     lhq.proto = &lvlhsh_requests_proto;
6320     lhq.pool = NULL;
6321 
6322     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6323 
6324     pthread_mutex_lock(&ctx_impl->mutex);
6325 
6326     if (remove) {
6327         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6328 
6329     } else {
6330         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6331     }
6332 
6333     pthread_mutex_unlock(&ctx_impl->mutex);
6334 
6335     switch (res) {
6336 
6337     case NXT_OK:
6338         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6339                                     req);
6340         if (remove) {
6341             req_impl->in_hash = 0;
6342         }
6343 
6344         return lhq.value;
6345 
6346     default:
6347         return NULL;
6348     }
6349 }
6350 
6351 
6352 void
6353 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6354 {
6355     int              log_fd, n;
6356     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6357     pid_t            pid;
6358     va_list          ap;
6359     nxt_unit_impl_t  *lib;
6360 
6361     if (nxt_fast_path(ctx != NULL)) {
6362         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6363 
6364         pid = lib->pid;
6365         log_fd = lib->log_fd;
6366 
6367     } else {
6368         pid = getpid();
6369         log_fd = STDERR_FILENO;
6370     }
6371 
6372     p = msg;
6373     end = p + sizeof(msg) - 1;
6374 
6375     p = nxt_unit_snprint_prefix(p, end, pid, level);
6376 
6377     va_start(ap, fmt);
6378     p += vsnprintf(p, end - p, fmt, ap);
6379     va_end(ap);
6380 
6381     if (nxt_slow_path(p > end)) {
6382         memcpy(end - 5, "[...]", 5);
6383         p = end;
6384     }
6385 
6386     *p++ = '\n';
6387 
6388     n = write(log_fd, msg, p - msg);
6389     if (nxt_slow_path(n < 0)) {
6390         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6391     }
6392 }
6393 
6394 
6395 void
6396 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6397 {
6398     int                           log_fd, n;
6399     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6400     pid_t                         pid;
6401     va_list                       ap;
6402     nxt_unit_impl_t               *lib;
6403     nxt_unit_request_info_impl_t  *req_impl;
6404 
6405     if (nxt_fast_path(req != NULL)) {
6406         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6407 
6408         pid = lib->pid;
6409         log_fd = lib->log_fd;
6410 
6411     } else {
6412         pid = getpid();
6413         log_fd = STDERR_FILENO;
6414     }
6415 
6416     p = msg;
6417     end = p + sizeof(msg) - 1;
6418 
6419     p = nxt_unit_snprint_prefix(p, end, pid, level);
6420 
6421     if (nxt_fast_path(req != NULL)) {
6422         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6423 
6424         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6425     }
6426 
6427     va_start(ap, fmt);
6428     p += vsnprintf(p, end - p, fmt, ap);
6429     va_end(ap);
6430 
6431     if (nxt_slow_path(p > end)) {
6432         memcpy(end - 5, "[...]", 5);
6433         p = end;
6434     }
6435 
6436     *p++ = '\n';
6437 
6438     n = write(log_fd, msg, p - msg);
6439     if (nxt_slow_path(n < 0)) {
6440         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6441     }
6442 }
6443 
6444 
6445 static const char * nxt_unit_log_levels[] = {
6446     "alert",
6447     "error",
6448     "warn",
6449     "notice",
6450     "info",
6451     "debug",
6452 };
6453 
6454 
6455 static char *
6456 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6457 {
6458     struct tm        tm;
6459     struct timespec  ts;
6460 
6461     (void) clock_gettime(CLOCK_REALTIME, &ts);
6462 
6463 #if (NXT_HAVE_LOCALTIME_R)
6464     (void) localtime_r(&ts.tv_sec, &tm);
6465 #else
6466     tm = *localtime(&ts.tv_sec);
6467 #endif
6468 
6469 #if (NXT_DEBUG)
6470     p += snprintf(p, end - p,
6471                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6472                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6473                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6474                   (int) ts.tv_nsec / 1000000);
6475 #else
6476     p += snprintf(p, end - p,
6477                   "%4d/%02d/%02d %02d:%02d:%02d ",
6478                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6479                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6480 #endif
6481 
6482     p += snprintf(p, end - p,
6483                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6484                   (int) pid,
6485                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6486 
6487     return p;
6488 }
6489 
6490 
/*
 * Allocation callback for the level hashes.
 *
 * Requests "size" bytes aligned on a "size" boundary through
 * posix_memalign().  The "data" argument is not used.  Returns the
 * block on success; on failure logs an alert and returns NULL.
 */
static void *
nxt_unit_lvlhsh_alloc(void *data, size_t size)
{
    int   rc;
    void  *block;

    rc = posix_memalign(&block, size, size);

    if (nxt_fast_path(rc == 0)) {
        nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
                       (int) size, (int) size, block);

        return block;
    }

    /* posix_memalign() reports the error code as its return value. */
    nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
                   (int) size, (int) size, strerror(rc), rc);

    return NULL;
}
6509 
6510 
/*
 * Release callback for the level hashes: hands the block to
 * nxt_unit_free() with no context.  The "data" argument is not used.
 */
static void
nxt_unit_lvlhsh_free(void *data, void *p)
{
    (void) data;

    nxt_unit_free(NULL, p);
}
6516 
6517 
6518 void *
6519 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6520 {
6521     void  *p;
6522 
6523     p = malloc(size);
6524 
6525     if (nxt_fast_path(p != NULL)) {
6526         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6527 
6528     } else {
6529         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6530                        (int) size, strerror(errno), errno);
6531     }
6532 
6533     return p;
6534 }
6535 
6536 
6537 void
6538 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6539 {
6540     nxt_unit_debug(ctx, "free(%p)", p);
6541 
6542     free(p);
6543 }
6544 
6545 
6546 static int
6547 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6548 {
6549     u_char        c1, c2;
6550     nxt_int_t     n;
6551     const u_char  *s1, *s2;
6552 
6553     s1 = p1;
6554     s2 = p2;
6555 
6556     while (length-- != 0) {
6557         c1 = *s1++;
6558         c2 = *s2++;
6559 
6560         c1 = nxt_lowcase(c1);
6561         c2 = nxt_lowcase(c2);
6562 
6563         n = c1 - c2;
6564 
6565         if (n != 0) {
6566             return n;
6567         }
6568     }
6569 
6570     return 0;
6571 }
6572