xref: /unit/src/nxt_unit.c (revision 1712:bbd7893e9ce1)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
24 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
25 #define NXT_UNIT_LOCAL_BUF_SIZE  \
26     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
28 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
29 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
30 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
31 typedef struct nxt_unit_process_s               nxt_unit_process_t;
32 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
33 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
34 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
35 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
36 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
37 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
38 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
39 
40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
42     nxt_unit_ctx_impl_t *ctx_impl, void *data);
43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50     nxt_unit_mmap_buf_t *mmap_buf);
51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56     int queue_fd);
57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
58 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
59     nxt_unit_recv_msg_t *recv_msg);
60 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
61 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
62     nxt_unit_recv_msg_t *recv_msg);
63 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
64     nxt_unit_recv_msg_t *recv_msg);
65 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
66     nxt_unit_port_id_t *port_id);
67 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
68 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg);
70 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
71 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
72     nxt_unit_ctx_t *ctx);
73 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
74 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
75 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
76     nxt_unit_ctx_t *ctx);
77 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
78 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
79     nxt_unit_websocket_frame_impl_t *ws);
80 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
81 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
82 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
83     nxt_unit_mmap_buf_t *mmap_buf, int last);
84 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
85 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
86 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
88     nxt_unit_ctx_impl_t *ctx_impl);
89 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
90     nxt_unit_read_buf_t *rbuf);
91 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
92     nxt_unit_request_info_t *req, size_t size);
93 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
94     size_t size);
95 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
96     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
97 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
98 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
99 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
100 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
101     nxt_unit_port_t *port, int n);
102 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
103 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
104     int fd);
105 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
106     nxt_unit_port_t *port, uint32_t size,
107     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
108 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
109 
110 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
111     nxt_unit_ctx_impl_t *ctx_impl);
112 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
113 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
114 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
115 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
116 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
117     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
118     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
119 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
120     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
121 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
122 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
123     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
124 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
125 
126 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
127 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
128     pid_t pid, int remove);
129 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
130 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
131 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
132 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
133 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
134 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
135 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
136 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
137 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
138 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
139     nxt_unit_port_t *port);
140 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
141 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
142 
143 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
144     nxt_unit_port_t *port, int queue_fd);
145 
146 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
147 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
148 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
149     nxt_unit_port_t *port, void *queue);
150 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
151     nxt_queue_t *awaiting_req);
152 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
153     nxt_unit_port_id_t *port_id);
154 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
155     nxt_unit_port_id_t *port_id);
156 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
157 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
158     nxt_unit_process_t *process);
159 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
160 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
161 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
162     nxt_unit_port_t *port, const void *buf, size_t buf_size,
163     const void *oob, size_t oob_size);
164 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
165     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
166 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
167     nxt_unit_read_buf_t *rbuf);
168 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
169     nxt_unit_read_buf_t *src);
170 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
171     nxt_unit_read_buf_t *rbuf);
172 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
173     nxt_unit_read_buf_t *rbuf);
174 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
177     nxt_unit_read_buf_t *rbuf);
178 nxt_inline int nxt_unit_close(int fd);
179 static int nxt_unit_fd_blocking(int fd);
180 
181 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
182     nxt_unit_port_t *port);
183 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
184     nxt_unit_port_id_t *port_id, int remove);
185 
186 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
187     nxt_unit_request_info_t *req);
188 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
189     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
190 
191 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
192 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
193 static void nxt_unit_lvlhsh_free(void *data, void *p);
194 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
195 
196 
197 struct nxt_unit_mmap_buf_s {
198     nxt_unit_buf_t           buf;
199 
200     nxt_unit_mmap_buf_t      *next;
201     nxt_unit_mmap_buf_t      **prev;
202 
203     nxt_port_mmap_header_t   *hdr;
204     nxt_unit_request_info_t  *req;
205     nxt_unit_ctx_impl_t      *ctx_impl;
206     char                     *free_ptr;
207     char                     *plain_ptr;
208 };
209 
210 
211 struct nxt_unit_recv_msg_s {
212     uint32_t                 stream;
213     nxt_pid_t                pid;
214     nxt_port_id_t            reply_port;
215 
216     uint8_t                  last;      /* 1 bit */
217     uint8_t                  mmap;      /* 1 bit */
218 
219     void                     *start;
220     uint32_t                 size;
221 
222     int                      fd[2];
223 
224     nxt_unit_mmap_buf_t      *incoming_buf;
225 };
226 
227 
228 typedef enum {
229     NXT_UNIT_RS_START           = 0,
230     NXT_UNIT_RS_RESPONSE_INIT,
231     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
232     NXT_UNIT_RS_RESPONSE_SENT,
233     NXT_UNIT_RS_RELEASED,
234 } nxt_unit_req_state_t;
235 
236 
237 struct nxt_unit_request_info_impl_s {
238     nxt_unit_request_info_t  req;
239 
240     uint32_t                 stream;
241 
242     nxt_unit_mmap_buf_t      *outgoing_buf;
243     nxt_unit_mmap_buf_t      *incoming_buf;
244 
245     nxt_unit_req_state_t     state;
246     uint8_t                  websocket;
247     uint8_t                  in_hash;
248 
249     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
250     nxt_queue_link_t         link;
251     /*  for nxt_unit_port_impl_t.awaiting_req */
252     nxt_queue_link_t         port_wait_link;
253 
254     char                     extra_data[];
255 };
256 
257 
258 struct nxt_unit_websocket_frame_impl_s {
259     nxt_unit_websocket_frame_t  ws;
260 
261     nxt_unit_mmap_buf_t         *buf;
262 
263     nxt_queue_link_t            link;
264 
265     nxt_unit_ctx_impl_t         *ctx_impl;
266 };
267 
268 
269 struct nxt_unit_read_buf_s {
270     nxt_queue_link_t              link;
271     nxt_unit_ctx_impl_t           *ctx_impl;
272     ssize_t                       size;
273     char                          buf[16384];
274     char                          oob[256];
275 };
276 
277 
278 struct nxt_unit_ctx_impl_s {
279     nxt_unit_ctx_t                ctx;
280 
281     nxt_atomic_t                  use_count;
282     nxt_atomic_t                  wait_items;
283 
284     pthread_mutex_t               mutex;
285 
286     nxt_unit_port_t               *read_port;
287 
288     nxt_queue_link_t              link;
289 
290     nxt_unit_mmap_buf_t           *free_buf;
291 
292     /*  of nxt_unit_request_info_impl_t */
293     nxt_queue_t                   free_req;
294 
295     /*  of nxt_unit_websocket_frame_impl_t */
296     nxt_queue_t                   free_ws;
297 
298     /*  of nxt_unit_request_info_impl_t */
299     nxt_queue_t                   active_req;
300 
301     /*  of nxt_unit_request_info_impl_t */
302     nxt_lvlhsh_t                  requests;
303 
304     /*  of nxt_unit_request_info_impl_t */
305     nxt_queue_t                   ready_req;
306 
307     /*  of nxt_unit_read_buf_t */
308     nxt_queue_t                   pending_rbuf;
309 
310     /*  of nxt_unit_read_buf_t */
311     nxt_queue_t                   free_rbuf;
312 
313     int                           online;
314     int                           ready;
315 
316     nxt_unit_mmap_buf_t           ctx_buf[2];
317     nxt_unit_read_buf_t           ctx_read_buf;
318 
319     nxt_unit_request_info_impl_t  req;
320 };
321 
322 
323 struct nxt_unit_mmap_s {
324     nxt_port_mmap_header_t   *hdr;
325     pthread_t                src_thread;
326 
327     /*  of nxt_unit_read_buf_t */
328     nxt_queue_t              awaiting_rbuf;
329 };
330 
331 
332 struct nxt_unit_mmaps_s {
333     pthread_mutex_t          mutex;
334     uint32_t                 size;
335     uint32_t                 cap;
336     nxt_atomic_t             allocated_chunks;
337     nxt_unit_mmap_t          *elts;
338 };
339 
340 
341 struct nxt_unit_impl_s {
342     nxt_unit_t               unit;
343     nxt_unit_callbacks_t     callbacks;
344 
345     nxt_atomic_t             use_count;
346 
347     uint32_t                 request_data_size;
348     uint32_t                 shm_mmap_limit;
349 
350     pthread_mutex_t          mutex;
351 
352     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
353     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
354 
355     nxt_unit_port_t          *router_port;
356     nxt_unit_port_t          *shared_port;
357 
358     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
359 
360     nxt_unit_mmaps_t         incoming;
361     nxt_unit_mmaps_t         outgoing;
362 
363     pid_t                    pid;
364     int                      log_fd;
365 
366     nxt_unit_ctx_impl_t      main_ctx;
367 };
368 
369 
370 struct nxt_unit_port_impl_s {
371     nxt_unit_port_t          port;
372 
373     nxt_atomic_t             use_count;
374 
375     /*  for nxt_unit_process_t.ports */
376     nxt_queue_link_t         link;
377     nxt_unit_process_t       *process;
378 
379     /*  of nxt_unit_request_info_impl_t */
380     nxt_queue_t              awaiting_req;
381 
382     int                      ready;
383 
384     void                     *queue;
385 
386     int                      from_socket;
387     nxt_unit_read_buf_t      *socket_rbuf;
388 };
389 
390 
391 struct nxt_unit_process_s {
392     pid_t                    pid;
393 
394     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
395 
396     nxt_unit_impl_t          *lib;
397 
398     nxt_atomic_t             use_count;
399 
400     uint32_t                 next_port_id;
401 };
402 
403 
404 /* Explicitly using 32 bit types to avoid possible alignment. */
405 typedef struct {
406     int32_t   pid;
407     uint32_t  id;
408 } nxt_unit_port_hash_id_t;
409 
410 
411 nxt_unit_ctx_t *
412 nxt_unit_init(nxt_unit_init_t *init)
413 {
414     int              rc, queue_fd;
415     void             *mem;
416     uint32_t         ready_stream, shm_limit;
417     nxt_unit_ctx_t   *ctx;
418     nxt_unit_impl_t  *lib;
419     nxt_unit_port_t  ready_port, router_port, read_port;
420 
421     lib = nxt_unit_create(init);
422     if (nxt_slow_path(lib == NULL)) {
423         return NULL;
424     }
425 
426     queue_fd = -1;
427     mem = MAP_FAILED;
428 
429     if (init->ready_port.id.pid != 0
430         && init->ready_stream != 0
431         && init->read_port.id.pid != 0)
432     {
433         ready_port = init->ready_port;
434         ready_stream = init->ready_stream;
435         router_port = init->router_port;
436         read_port = init->read_port;
437         lib->log_fd = init->log_fd;
438 
439         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
440                               ready_port.id.id);
441         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
442                               router_port.id.id);
443         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
444                               read_port.id.id);
445 
446     } else {
447         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
448                                &lib->log_fd, &ready_stream, &shm_limit);
449         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
450             goto fail;
451         }
452 
453         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
454                                 / PORT_MMAP_DATA_SIZE;
455     }
456 
457     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
458         lib->shm_mmap_limit = 1;
459     }
460 
461     lib->pid = read_port.id.pid;
462 
463     ctx = &lib->main_ctx.ctx;
464 
465     rc = nxt_unit_fd_blocking(router_port.out_fd);
466     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
467         goto fail;
468     }
469 
470     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
471     if (nxt_slow_path(lib->router_port == NULL)) {
472         nxt_unit_alert(NULL, "failed to add router_port");
473 
474         goto fail;
475     }
476 
477     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
478     if (nxt_slow_path(queue_fd == -1)) {
479         goto fail;
480     }
481 
482     mem = mmap(NULL, sizeof(nxt_port_queue_t),
483                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
484     if (nxt_slow_path(mem == MAP_FAILED)) {
485         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
486                        strerror(errno), errno);
487 
488         goto fail;
489     }
490 
491     nxt_port_queue_init(mem);
492 
493     rc = nxt_unit_fd_blocking(read_port.in_fd);
494     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
495         goto fail;
496     }
497 
498     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
499     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
500         nxt_unit_alert(NULL, "failed to add read_port");
501 
502         goto fail;
503     }
504 
505     rc = nxt_unit_fd_blocking(ready_port.out_fd);
506     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
507         goto fail;
508     }
509 
510     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
511     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
512         nxt_unit_alert(NULL, "failed to send READY message");
513 
514         goto fail;
515     }
516 
517     nxt_unit_close(ready_port.out_fd);
518     nxt_unit_close(queue_fd);
519 
520     return ctx;
521 
522 fail:
523 
524     if (mem != MAP_FAILED) {
525         munmap(mem, sizeof(nxt_port_queue_t));
526     }
527 
528     if (queue_fd != -1) {
529         nxt_unit_close(queue_fd);
530     }
531 
532     nxt_unit_ctx_release(&lib->main_ctx.ctx);
533 
534     return NULL;
535 }
536 
537 
538 static nxt_unit_impl_t *
539 nxt_unit_create(nxt_unit_init_t *init)
540 {
541     int                   rc;
542     nxt_unit_impl_t       *lib;
543     nxt_unit_callbacks_t  *cb;
544 
545     lib = nxt_unit_malloc(NULL,
546                           sizeof(nxt_unit_impl_t) + init->request_data_size);
547     if (nxt_slow_path(lib == NULL)) {
548         nxt_unit_alert(NULL, "failed to allocate unit struct");
549 
550         return NULL;
551     }
552 
553     rc = pthread_mutex_init(&lib->mutex, NULL);
554     if (nxt_slow_path(rc != 0)) {
555         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
556 
557         goto fail;
558     }
559 
560     lib->unit.data = init->data;
561     lib->callbacks = init->callbacks;
562 
563     lib->request_data_size = init->request_data_size;
564     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
565                             / PORT_MMAP_DATA_SIZE;
566 
567     lib->processes.slot = NULL;
568     lib->ports.slot = NULL;
569 
570     lib->log_fd = STDERR_FILENO;
571 
572     nxt_queue_init(&lib->contexts);
573 
574     lib->use_count = 0;
575     lib->router_port = NULL;
576     lib->shared_port = NULL;
577 
578     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
579     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
580         pthread_mutex_destroy(&lib->mutex);
581         goto fail;
582     }
583 
584     cb = &lib->callbacks;
585 
586     if (cb->request_handler == NULL) {
587         nxt_unit_alert(NULL, "request_handler is NULL");
588 
589         pthread_mutex_destroy(&lib->mutex);
590         goto fail;
591     }
592 
593     nxt_unit_mmaps_init(&lib->incoming);
594     nxt_unit_mmaps_init(&lib->outgoing);
595 
596     return lib;
597 
598 fail:
599 
600     nxt_unit_free(NULL, lib);
601 
602     return NULL;
603 }
604 
605 
606 static int
607 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
608     void *data)
609 {
610     int  rc;
611 
612     ctx_impl->ctx.data = data;
613     ctx_impl->ctx.unit = &lib->unit;
614 
615     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
616     if (nxt_slow_path(rc != 0)) {
617         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
618 
619         return NXT_UNIT_ERROR;
620     }
621 
622     nxt_unit_lib_use(lib);
623 
624     pthread_mutex_lock(&lib->mutex);
625 
626     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
627 
628     pthread_mutex_unlock(&lib->mutex);
629 
630     ctx_impl->use_count = 1;
631     ctx_impl->wait_items = 0;
632     ctx_impl->online = 1;
633     ctx_impl->ready = 0;
634 
635     nxt_queue_init(&ctx_impl->free_req);
636     nxt_queue_init(&ctx_impl->free_ws);
637     nxt_queue_init(&ctx_impl->active_req);
638     nxt_queue_init(&ctx_impl->ready_req);
639     nxt_queue_init(&ctx_impl->pending_rbuf);
640     nxt_queue_init(&ctx_impl->free_rbuf);
641 
642     ctx_impl->free_buf = NULL;
643     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
644     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
645 
646     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
647     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
648 
649     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
650 
651     ctx_impl->req.req.ctx = &ctx_impl->ctx;
652     ctx_impl->req.req.unit = &lib->unit;
653 
654     ctx_impl->read_port = NULL;
655     ctx_impl->requests.slot = 0;
656 
657     return NXT_UNIT_OK;
658 }
659 
660 
661 nxt_inline void
662 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
663 {
664     nxt_unit_ctx_impl_t  *ctx_impl;
665 
666     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
667 
668     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
669 }
670 
671 
672 nxt_inline void
673 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
674 {
675     long                 c;
676     nxt_unit_ctx_impl_t  *ctx_impl;
677 
678     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
679 
680     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
681 
682     if (c == 1) {
683         nxt_unit_ctx_free(ctx_impl);
684     }
685 }
686 
687 
688 nxt_inline void
689 nxt_unit_lib_use(nxt_unit_impl_t *lib)
690 {
691     nxt_atomic_fetch_add(&lib->use_count, 1);
692 }
693 
694 
695 nxt_inline void
696 nxt_unit_lib_release(nxt_unit_impl_t *lib)
697 {
698     long                c;
699     nxt_unit_process_t  *process;
700 
701     c = nxt_atomic_fetch_add(&lib->use_count, -1);
702 
703     if (c == 1) {
704         for ( ;; ) {
705             pthread_mutex_lock(&lib->mutex);
706 
707             process = nxt_unit_process_pop_first(lib);
708             if (process == NULL) {
709                 pthread_mutex_unlock(&lib->mutex);
710 
711                 break;
712             }
713 
714             nxt_unit_remove_process(lib, process);
715         }
716 
717         pthread_mutex_destroy(&lib->mutex);
718 
719         if (nxt_fast_path(lib->router_port != NULL)) {
720             nxt_unit_port_release(lib->router_port);
721         }
722 
723         if (nxt_fast_path(lib->shared_port != NULL)) {
724             nxt_unit_port_release(lib->shared_port);
725         }
726 
727         nxt_unit_mmaps_destroy(&lib->incoming);
728         nxt_unit_mmaps_destroy(&lib->outgoing);
729 
730         nxt_unit_free(NULL, lib);
731     }
732 }
733 
734 
735 nxt_inline void
736 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
737     nxt_unit_mmap_buf_t *mmap_buf)
738 {
739     mmap_buf->next = *head;
740 
741     if (mmap_buf->next != NULL) {
742         mmap_buf->next->prev = &mmap_buf->next;
743     }
744 
745     *head = mmap_buf;
746     mmap_buf->prev = head;
747 }
748 
749 
750 nxt_inline void
751 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
752     nxt_unit_mmap_buf_t *mmap_buf)
753 {
754     while (*prev != NULL) {
755         prev = &(*prev)->next;
756     }
757 
758     nxt_unit_mmap_buf_insert(prev, mmap_buf);
759 }
760 
761 
762 nxt_inline void
763 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
764 {
765     nxt_unit_mmap_buf_t  **prev;
766 
767     prev = mmap_buf->prev;
768 
769     if (mmap_buf->next != NULL) {
770         mmap_buf->next->prev = prev;
771     }
772 
773     if (prev != NULL) {
774         *prev = mmap_buf->next;
775     }
776 }
777 
778 
779 static int
780 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
781     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
782     uint32_t *shm_limit)
783 {
784     int       rc;
785     int       ready_fd, router_fd, read_in_fd, read_out_fd;
786     char      *unit_init, *version_end;
787     long      version_length;
788     int64_t   ready_pid, router_pid, read_pid;
789     uint32_t  ready_stream, router_id, ready_id, read_id;
790 
791     unit_init = getenv(NXT_UNIT_INIT_ENV);
792     if (nxt_slow_path(unit_init == NULL)) {
793         nxt_unit_alert(NULL, "%s is not in the current environment",
794                        NXT_UNIT_INIT_ENV);
795 
796         return NXT_UNIT_ERROR;
797     }
798 
799     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
800 
801     version_length = nxt_length(NXT_VERSION);
802 
803     version_end = strchr(unit_init, ';');
804     if (version_end == NULL
805         || version_end - unit_init != version_length
806         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
807     {
808         nxt_unit_alert(NULL, "version check error");
809 
810         return NXT_UNIT_ERROR;
811     }
812 
813     rc = sscanf(version_end + 1,
814                 "%"PRIu32";"
815                 "%"PRId64",%"PRIu32",%d;"
816                 "%"PRId64",%"PRIu32",%d;"
817                 "%"PRId64",%"PRIu32",%d,%d;"
818                 "%d,%"PRIu32,
819                 &ready_stream,
820                 &ready_pid, &ready_id, &ready_fd,
821                 &router_pid, &router_id, &router_fd,
822                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
823                 log_fd, shm_limit);
824 
825     if (nxt_slow_path(rc != 13)) {
826         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
827 
828         return NXT_UNIT_ERROR;
829     }
830 
831     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
832 
833     ready_port->in_fd = -1;
834     ready_port->out_fd = ready_fd;
835     ready_port->data = NULL;
836 
837     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
838 
839     router_port->in_fd = -1;
840     router_port->out_fd = router_fd;
841     router_port->data = NULL;
842 
843     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
844 
845     read_port->in_fd = read_in_fd;
846     read_port->out_fd = read_out_fd;
847     read_port->data = NULL;
848 
849     *stream = ready_stream;
850 
851     return NXT_UNIT_OK;
852 }
853 
854 
855 static int
856 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
857 {
858     ssize_t          res;
859     nxt_port_msg_t   msg;
860     nxt_unit_impl_t  *lib;
861 
862     union {
863         struct cmsghdr  cm;
864         char            space[CMSG_SPACE(sizeof(int))];
865     } cmsg;
866 
867     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
868 
869     msg.stream = stream;
870     msg.pid = lib->pid;
871     msg.reply_port = 0;
872     msg.type = _NXT_PORT_MSG_PROCESS_READY;
873     msg.last = 1;
874     msg.mmap = 0;
875     msg.nf = 0;
876     msg.mf = 0;
877     msg.tracking = 0;
878 
879     memset(&cmsg, 0, sizeof(cmsg));
880 
881     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
882     cmsg.cm.cmsg_level = SOL_SOCKET;
883     cmsg.cm.cmsg_type = SCM_RIGHTS;
884 
885     /*
886      * memcpy() is used instead of simple
887      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
888      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
889      *   dereferencing type-punned pointer will break strict-aliasing rules
890      *
891      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
892      * in the same simple assignment as in the code above.
893      */
894     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
895 
896     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
897                            &cmsg, sizeof(cmsg));
898     if (res != sizeof(msg)) {
899         return NXT_UNIT_ERROR;
900     }
901 
902     return NXT_UNIT_OK;
903 }
904 
905 
906 static int
907 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
908 {
909     int                  rc;
910     pid_t                pid;
911     struct cmsghdr       *cm;
912     nxt_port_msg_t       *port_msg;
913     nxt_unit_impl_t      *lib;
914     nxt_unit_recv_msg_t  recv_msg;
915 
916     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
917 
918     recv_msg.fd[0] = -1;
919     recv_msg.fd[1] = -1;
920     port_msg = (nxt_port_msg_t *) rbuf->buf;
921     cm = (struct cmsghdr *) rbuf->oob;
922 
923     if (cm->cmsg_level == SOL_SOCKET
924         && cm->cmsg_type == SCM_RIGHTS)
925     {
926         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
927             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
928         }
929 
930         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
931             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
932         }
933     }
934 
935     recv_msg.incoming_buf = NULL;
936 
937     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
938         if (nxt_slow_path(rbuf->size == 0)) {
939             nxt_unit_debug(ctx, "read port closed");
940 
941             nxt_unit_quit(ctx);
942             rc = NXT_UNIT_OK;
943             goto done;
944         }
945 
946         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
947 
948         rc = NXT_UNIT_ERROR;
949         goto done;
950     }
951 
952     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
953                    port_msg->stream, (int) port_msg->type,
954                    recv_msg.fd[0], recv_msg.fd[1]);
955 
956     recv_msg.stream = port_msg->stream;
957     recv_msg.pid = port_msg->pid;
958     recv_msg.reply_port = port_msg->reply_port;
959     recv_msg.last = port_msg->last;
960     recv_msg.mmap = port_msg->mmap;
961 
962     recv_msg.start = port_msg + 1;
963     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
964 
965     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
966         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
967                        port_msg->stream, (int) port_msg->type);
968         rc = NXT_UNIT_ERROR;
969         goto done;
970     }
971 
972     /* Fragmentation is unsupported. */
973     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
974         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
975                        port_msg->stream, (int) port_msg->type);
976         rc = NXT_UNIT_ERROR;
977         goto done;
978     }
979 
980     if (port_msg->mmap) {
981         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
982 
983         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
984             if (rc == NXT_UNIT_AGAIN) {
985                 recv_msg.fd[0] = -1;
986                 recv_msg.fd[1] = -1;
987             }
988 
989             goto done;
990         }
991     }
992 
993     switch (port_msg->type) {
994 
995     case _NXT_PORT_MSG_RPC_READY:
996         rc = NXT_UNIT_OK;
997         break;
998 
999     case _NXT_PORT_MSG_QUIT:
1000         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
1001 
1002         nxt_unit_quit(ctx);
1003         rc = NXT_UNIT_OK;
1004         break;
1005 
1006     case _NXT_PORT_MSG_NEW_PORT:
1007         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1008         break;
1009 
1010     case _NXT_PORT_MSG_PORT_ACK:
1011         rc = nxt_unit_ctx_ready(ctx);
1012         break;
1013 
1014     case _NXT_PORT_MSG_CHANGE_FILE:
1015         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1016                        port_msg->stream, recv_msg.fd[0]);
1017 
1018         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1019             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1020                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1021                            strerror(errno), errno);
1022 
1023             rc = NXT_UNIT_ERROR;
1024             goto done;
1025         }
1026 
1027         rc = NXT_UNIT_OK;
1028         break;
1029 
1030     case _NXT_PORT_MSG_MMAP:
1031         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1032             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1033                            port_msg->stream, recv_msg.fd[0]);
1034 
1035             rc = NXT_UNIT_ERROR;
1036             goto done;
1037         }
1038 
1039         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1040         break;
1041 
1042     case _NXT_PORT_MSG_REQ_HEADERS:
1043         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
1044         break;
1045 
1046     case _NXT_PORT_MSG_REQ_BODY:
1047         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1048         break;
1049 
1050     case _NXT_PORT_MSG_WEBSOCKET:
1051         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1052         break;
1053 
1054     case _NXT_PORT_MSG_REMOVE_PID:
1055         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1056             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1057                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1058                            (int) sizeof(pid));
1059 
1060             rc = NXT_UNIT_ERROR;
1061             goto done;
1062         }
1063 
1064         memcpy(&pid, recv_msg.start, sizeof(pid));
1065 
1066         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1067                        port_msg->stream, (int) pid);
1068 
1069         nxt_unit_remove_pid(lib, pid);
1070 
1071         rc = NXT_UNIT_OK;
1072         break;
1073 
1074     case _NXT_PORT_MSG_SHM_ACK:
1075         rc = nxt_unit_process_shm_ack(ctx);
1076         break;
1077 
1078     default:
1079         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1080                        port_msg->stream, (int) port_msg->type);
1081 
1082         rc = NXT_UNIT_ERROR;
1083         goto done;
1084     }
1085 
1086 done:
1087 
1088     if (recv_msg.fd[0] != -1) {
1089         nxt_unit_close(recv_msg.fd[0]);
1090     }
1091 
1092     if (recv_msg.fd[1] != -1) {
1093         nxt_unit_close(recv_msg.fd[1]);
1094     }
1095 
1096     while (recv_msg.incoming_buf != NULL) {
1097         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1098     }
1099 
1100     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1101 #if (NXT_DEBUG)
1102         memset(rbuf->buf, 0xAC, rbuf->size);
1103 #endif
1104         nxt_unit_read_buf_release(ctx, rbuf);
1105     }
1106 
1107     return rc;
1108 }
1109 
1110 
1111 static int
1112 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1113 {
1114     void                     *mem;
1115     nxt_unit_impl_t          *lib;
1116     nxt_unit_port_t          new_port, *port;
1117     nxt_port_msg_new_port_t  *new_port_msg;
1118 
1119     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1120         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1121                       "invalid message size (%d)",
1122                       recv_msg->stream, (int) recv_msg->size);
1123 
1124         return NXT_UNIT_ERROR;
1125     }
1126 
1127     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1128         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1129                        recv_msg->stream, recv_msg->fd[0]);
1130 
1131         return NXT_UNIT_ERROR;
1132     }
1133 
1134     new_port_msg = recv_msg->start;
1135 
1136     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1137                    recv_msg->stream, (int) new_port_msg->pid,
1138                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1139 
1140     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1141 
1142     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1143         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1144 
1145         new_port.in_fd = recv_msg->fd[0];
1146         new_port.out_fd = -1;
1147 
1148         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1149                    MAP_SHARED, recv_msg->fd[1], 0);
1150 
1151     } else {
1152         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1153                           != NXT_UNIT_OK))
1154         {
1155             return NXT_UNIT_ERROR;
1156         }
1157 
1158         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1159                               new_port_msg->id);
1160 
1161         new_port.in_fd = -1;
1162         new_port.out_fd = recv_msg->fd[0];
1163 
1164         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1165                    MAP_SHARED, recv_msg->fd[1], 0);
1166     }
1167 
1168     if (nxt_slow_path(mem == MAP_FAILED)) {
1169         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1170                        strerror(errno), errno);
1171 
1172         return NXT_UNIT_ERROR;
1173     }
1174 
1175     new_port.data = NULL;
1176 
1177     recv_msg->fd[0] = -1;
1178 
1179     port = nxt_unit_add_port(ctx, &new_port, mem);
1180     if (nxt_slow_path(port == NULL)) {
1181         return NXT_UNIT_ERROR;
1182     }
1183 
1184     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1185         lib->shared_port = port;
1186 
1187         return nxt_unit_ctx_ready(ctx);
1188     }
1189 
1190     nxt_unit_port_release(port);
1191 
1192     return NXT_UNIT_OK;
1193 }
1194 
1195 
1196 static int
1197 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1198 {
1199     nxt_unit_impl_t      *lib;
1200     nxt_unit_ctx_impl_t  *ctx_impl;
1201 
1202     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1203     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1204 
1205     ctx_impl->ready = 1;
1206 
1207     if (lib->callbacks.ready_handler) {
1208         return lib->callbacks.ready_handler(ctx);
1209     }
1210 
1211     return NXT_UNIT_OK;
1212 }
1213 
1214 
1215 static int
1216 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1217 {
1218     int                           res;
1219     nxt_unit_impl_t               *lib;
1220     nxt_unit_port_id_t            port_id;
1221     nxt_unit_request_t            *r;
1222     nxt_unit_mmap_buf_t           *b;
1223     nxt_unit_request_info_t       *req;
1224     nxt_unit_request_info_impl_t  *req_impl;
1225 
1226     if (nxt_slow_path(recv_msg->mmap == 0)) {
1227         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1228                       recv_msg->stream);
1229 
1230         return NXT_UNIT_ERROR;
1231     }
1232 
1233     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1234         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1235                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1236                       (int) sizeof(nxt_unit_request_t));
1237 
1238         return NXT_UNIT_ERROR;
1239     }
1240 
1241     req_impl = nxt_unit_request_info_get(ctx);
1242     if (nxt_slow_path(req_impl == NULL)) {
1243         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1244                       recv_msg->stream);
1245 
1246         return NXT_UNIT_ERROR;
1247     }
1248 
1249     req = &req_impl->req;
1250 
1251     req->request = recv_msg->start;
1252 
1253     b = recv_msg->incoming_buf;
1254 
1255     req->request_buf = &b->buf;
1256     req->response = NULL;
1257     req->response_buf = NULL;
1258 
1259     r = req->request;
1260 
1261     req->content_length = r->content_length;
1262 
1263     req->content_buf = req->request_buf;
1264     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1265 
1266     req_impl->stream = recv_msg->stream;
1267 
1268     req_impl->outgoing_buf = NULL;
1269 
1270     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1271         b->req = req;
1272     }
1273 
1274     /* "Move" incoming buffer list to req_impl. */
1275     req_impl->incoming_buf = recv_msg->incoming_buf;
1276     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1277     recv_msg->incoming_buf = NULL;
1278 
1279     req->content_fd = recv_msg->fd[0];
1280     recv_msg->fd[0] = -1;
1281 
1282     req->response_max_fields = 0;
1283     req_impl->state = NXT_UNIT_RS_START;
1284     req_impl->websocket = 0;
1285     req_impl->in_hash = 0;
1286 
1287     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1288                    (int) r->method_length,
1289                    (char *) nxt_unit_sptr_get(&r->method),
1290                    (int) r->target_length,
1291                    (char *) nxt_unit_sptr_get(&r->target),
1292                    (int) r->content_length);
1293 
1294     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1295 
1296     res = nxt_unit_request_check_response_port(req, &port_id);
1297     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1298         return NXT_UNIT_ERROR;
1299     }
1300 
1301     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1302         res = nxt_unit_send_req_headers_ack(req);
1303         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1304             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1305 
1306             return NXT_UNIT_ERROR;
1307         }
1308 
1309         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1310 
1311         if (req->content_length
1312             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1313         {
1314             res = nxt_unit_request_hash_add(ctx, req);
1315             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1316                 nxt_unit_req_warn(req, "failed to add request to hash");
1317 
1318                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1319 
1320                 return NXT_UNIT_ERROR;
1321             }
1322 
1323             /*
1324              * If application have separate data handler, we may start
1325              * request processing and process data when it is arrived.
1326              */
1327             if (lib->callbacks.data_handler == NULL) {
1328                 return NXT_UNIT_OK;
1329             }
1330         }
1331 
1332         lib->callbacks.request_handler(req);
1333     }
1334 
1335     return NXT_UNIT_OK;
1336 }
1337 
1338 
1339 static int
1340 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1341 {
1342     uint64_t                 l;
1343     nxt_unit_impl_t          *lib;
1344     nxt_unit_mmap_buf_t      *b;
1345     nxt_unit_request_info_t  *req;
1346 
1347     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1348     if (req == NULL) {
1349         return NXT_UNIT_OK;
1350     }
1351 
1352     l = req->content_buf->end - req->content_buf->free;
1353 
1354     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1355         b->req = req;
1356         l += b->buf.end - b->buf.free;
1357     }
1358 
1359     if (recv_msg->incoming_buf != NULL) {
1360         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1361 
1362         while (b->next != NULL) {
1363             b = b->next;
1364         }
1365 
1366         /* "Move" incoming buffer list to req_impl. */
1367         b->next = recv_msg->incoming_buf;
1368         b->next->prev = &b->next;
1369 
1370         recv_msg->incoming_buf = NULL;
1371     }
1372 
1373     req->content_fd = recv_msg->fd[0];
1374     recv_msg->fd[0] = -1;
1375 
1376     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1377 
1378     if (lib->callbacks.data_handler != NULL) {
1379         lib->callbacks.data_handler(req);
1380 
1381         return NXT_UNIT_OK;
1382     }
1383 
1384     if (req->content_fd != -1 || l == req->content_length) {
1385         lib->callbacks.request_handler(req);
1386     }
1387 
1388     return NXT_UNIT_OK;
1389 }
1390 
1391 
1392 static int
1393 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1394     nxt_unit_port_id_t *port_id)
1395 {
1396     int                           res;
1397     nxt_unit_ctx_t                *ctx;
1398     nxt_unit_impl_t               *lib;
1399     nxt_unit_port_t               *port;
1400     nxt_unit_process_t            *process;
1401     nxt_unit_ctx_impl_t           *ctx_impl;
1402     nxt_unit_port_impl_t          *port_impl;
1403     nxt_unit_request_info_impl_t  *req_impl;
1404 
1405     ctx = req->ctx;
1406     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1407     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1408 
1409     pthread_mutex_lock(&lib->mutex);
1410 
1411     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1412     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1413 
1414     if (nxt_fast_path(port != NULL)) {
1415         req->response_port = port;
1416 
1417         if (nxt_fast_path(port_impl->ready)) {
1418             pthread_mutex_unlock(&lib->mutex);
1419 
1420             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1421                            (int) port->id.pid, (int) port->id.id);
1422 
1423             return NXT_UNIT_OK;
1424         }
1425 
1426         nxt_unit_debug(ctx, "check_response_port: "
1427                        "port{%d,%d} already requested",
1428                        (int) port->id.pid, (int) port->id.id);
1429 
1430         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1431 
1432         nxt_queue_insert_tail(&port_impl->awaiting_req,
1433                               &req_impl->port_wait_link);
1434 
1435         pthread_mutex_unlock(&lib->mutex);
1436 
1437         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1438 
1439         return NXT_UNIT_AGAIN;
1440     }
1441 
1442     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1443     if (nxt_slow_path(port_impl == NULL)) {
1444         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1445                        (int) sizeof(nxt_unit_port_impl_t));
1446 
1447         pthread_mutex_unlock(&lib->mutex);
1448 
1449         return NXT_UNIT_ERROR;
1450     }
1451 
1452     port = &port_impl->port;
1453 
1454     port->id = *port_id;
1455     port->in_fd = -1;
1456     port->out_fd = -1;
1457     port->data = NULL;
1458 
1459     res = nxt_unit_port_hash_add(&lib->ports, port);
1460     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1461         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1462                        port->id.pid, port->id.id);
1463 
1464         pthread_mutex_unlock(&lib->mutex);
1465 
1466         nxt_unit_free(ctx, port);
1467 
1468         return NXT_UNIT_ERROR;
1469     }
1470 
1471     process = nxt_unit_process_find(lib, port_id->pid, 0);
1472     if (nxt_slow_path(process == NULL)) {
1473         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1474                        port->id.pid);
1475 
1476         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1477 
1478         pthread_mutex_unlock(&lib->mutex);
1479 
1480         nxt_unit_free(ctx, port);
1481 
1482         return NXT_UNIT_ERROR;
1483     }
1484 
1485     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1486 
1487     port_impl->process = process;
1488     port_impl->queue = NULL;
1489     port_impl->from_socket = 0;
1490     port_impl->socket_rbuf = NULL;
1491 
1492     nxt_queue_init(&port_impl->awaiting_req);
1493 
1494     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1495 
1496     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1497 
1498     port_impl->use_count = 2;
1499     port_impl->ready = 0;
1500 
1501     req->response_port = port;
1502 
1503     pthread_mutex_unlock(&lib->mutex);
1504 
1505     res = nxt_unit_get_port(ctx, port_id);
1506     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1507         return NXT_UNIT_ERROR;
1508     }
1509 
1510     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1511 
1512     return NXT_UNIT_AGAIN;
1513 }
1514 
1515 
1516 static int
1517 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1518 {
1519     ssize_t                       res;
1520     nxt_port_msg_t                msg;
1521     nxt_unit_impl_t               *lib;
1522     nxt_unit_ctx_impl_t           *ctx_impl;
1523     nxt_unit_request_info_impl_t  *req_impl;
1524 
1525     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1526     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1527     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1528 
1529     memset(&msg, 0, sizeof(nxt_port_msg_t));
1530 
1531     msg.stream = req_impl->stream;
1532     msg.pid = lib->pid;
1533     msg.reply_port = ctx_impl->read_port->id.id;
1534     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1535 
1536     res = nxt_unit_port_send(req->ctx, req->response_port,
1537                              &msg, sizeof(msg), NULL, 0);
1538     if (nxt_slow_path(res != sizeof(msg))) {
1539         return NXT_UNIT_ERROR;
1540     }
1541 
1542     return NXT_UNIT_OK;
1543 }
1544 
1545 
1546 static int
1547 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1548 {
1549     size_t                           hsize;
1550     nxt_unit_impl_t                  *lib;
1551     nxt_unit_mmap_buf_t              *b;
1552     nxt_unit_callbacks_t             *cb;
1553     nxt_unit_request_info_t          *req;
1554     nxt_unit_request_info_impl_t     *req_impl;
1555     nxt_unit_websocket_frame_impl_t  *ws_impl;
1556 
1557     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1558     if (nxt_slow_path(req == NULL)) {
1559         return NXT_UNIT_OK;
1560     }
1561 
1562     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1563 
1564     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1565     cb = &lib->callbacks;
1566 
1567     if (cb->websocket_handler && recv_msg->size >= 2) {
1568         ws_impl = nxt_unit_websocket_frame_get(ctx);
1569         if (nxt_slow_path(ws_impl == NULL)) {
1570             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1571                           req_impl->stream);
1572 
1573             return NXT_UNIT_ERROR;
1574         }
1575 
1576         ws_impl->ws.req = req;
1577 
1578         ws_impl->buf = NULL;
1579 
1580         if (recv_msg->mmap) {
1581             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1582                 b->req = req;
1583             }
1584 
1585             /* "Move" incoming buffer list to ws_impl. */
1586             ws_impl->buf = recv_msg->incoming_buf;
1587             ws_impl->buf->prev = &ws_impl->buf;
1588             recv_msg->incoming_buf = NULL;
1589 
1590             b = ws_impl->buf;
1591 
1592         } else {
1593             b = nxt_unit_mmap_buf_get(ctx);
1594             if (nxt_slow_path(b == NULL)) {
1595                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1596                                req_impl->stream);
1597 
1598                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1599 
1600                 return NXT_UNIT_ERROR;
1601             }
1602 
1603             b->req = req;
1604             b->buf.start = recv_msg->start;
1605             b->buf.free = b->buf.start;
1606             b->buf.end = b->buf.start + recv_msg->size;
1607 
1608             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1609         }
1610 
1611         ws_impl->ws.header = (void *) b->buf.start;
1612         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1613             ws_impl->ws.header);
1614 
1615         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1616 
1617         if (ws_impl->ws.header->mask) {
1618             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1619 
1620         } else {
1621             ws_impl->ws.mask = NULL;
1622         }
1623 
1624         b->buf.free += hsize;
1625 
1626         ws_impl->ws.content_buf = &b->buf;
1627         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1628 
1629         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1630                            "payload_len=%"PRIu64,
1631                             ws_impl->ws.header->opcode,
1632                             ws_impl->ws.payload_len);
1633 
1634         cb->websocket_handler(&ws_impl->ws);
1635     }
1636 
1637     if (recv_msg->last) {
1638         req_impl->websocket = 0;
1639 
1640         if (cb->close_handler) {
1641             nxt_unit_req_debug(req, "close_handler");
1642 
1643             cb->close_handler(req);
1644 
1645         } else {
1646             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1647         }
1648     }
1649 
1650     return NXT_UNIT_OK;
1651 }
1652 
1653 
1654 static int
1655 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1656 {
1657     nxt_unit_impl_t       *lib;
1658     nxt_unit_callbacks_t  *cb;
1659 
1660     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1661     cb = &lib->callbacks;
1662 
1663     if (cb->shm_ack_handler != NULL) {
1664         cb->shm_ack_handler(ctx);
1665     }
1666 
1667     return NXT_UNIT_OK;
1668 }
1669 
1670 
1671 static nxt_unit_request_info_impl_t *
1672 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1673 {
1674     nxt_unit_impl_t               *lib;
1675     nxt_queue_link_t              *lnk;
1676     nxt_unit_ctx_impl_t           *ctx_impl;
1677     nxt_unit_request_info_impl_t  *req_impl;
1678 
1679     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1680 
1681     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1682 
1683     pthread_mutex_lock(&ctx_impl->mutex);
1684 
1685     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1686         pthread_mutex_unlock(&ctx_impl->mutex);
1687 
1688         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1689                                         + lib->request_data_size);
1690         if (nxt_slow_path(req_impl == NULL)) {
1691             return NULL;
1692         }
1693 
1694         req_impl->req.unit = ctx->unit;
1695         req_impl->req.ctx = ctx;
1696 
1697         pthread_mutex_lock(&ctx_impl->mutex);
1698 
1699     } else {
1700         lnk = nxt_queue_first(&ctx_impl->free_req);
1701         nxt_queue_remove(lnk);
1702 
1703         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1704     }
1705 
1706     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1707 
1708     pthread_mutex_unlock(&ctx_impl->mutex);
1709 
1710     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1711 
1712     return req_impl;
1713 }
1714 
1715 
1716 static void
1717 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1718 {
1719     nxt_unit_ctx_impl_t           *ctx_impl;
1720     nxt_unit_request_info_impl_t  *req_impl;
1721 
1722     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1723     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1724 
1725     req->response = NULL;
1726     req->response_buf = NULL;
1727 
1728     if (req_impl->in_hash) {
1729         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1730     }
1731 
1732     req_impl->websocket = 0;
1733 
1734     while (req_impl->outgoing_buf != NULL) {
1735         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1736     }
1737 
1738     while (req_impl->incoming_buf != NULL) {
1739         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1740     }
1741 
1742     if (req->content_fd != -1) {
1743         nxt_unit_close(req->content_fd);
1744 
1745         req->content_fd = -1;
1746     }
1747 
1748     if (req->response_port != NULL) {
1749         nxt_unit_port_release(req->response_port);
1750 
1751         req->response_port = NULL;
1752     }
1753 
1754     req_impl->state = NXT_UNIT_RS_RELEASED;
1755 
1756     pthread_mutex_lock(&ctx_impl->mutex);
1757 
1758     nxt_queue_remove(&req_impl->link);
1759 
1760     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1761 
1762     pthread_mutex_unlock(&ctx_impl->mutex);
1763 }
1764 
1765 
1766 static void
1767 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1768 {
1769     nxt_unit_ctx_impl_t  *ctx_impl;
1770 
1771     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1772 
1773     nxt_queue_remove(&req_impl->link);
1774 
1775     if (req_impl != &ctx_impl->req) {
1776         nxt_unit_free(&ctx_impl->ctx, req_impl);
1777     }
1778 }
1779 
1780 
1781 static nxt_unit_websocket_frame_impl_t *
1782 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1783 {
1784     nxt_queue_link_t                 *lnk;
1785     nxt_unit_ctx_impl_t              *ctx_impl;
1786     nxt_unit_websocket_frame_impl_t  *ws_impl;
1787 
1788     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1789 
1790     pthread_mutex_lock(&ctx_impl->mutex);
1791 
1792     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1793         pthread_mutex_unlock(&ctx_impl->mutex);
1794 
1795         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1796         if (nxt_slow_path(ws_impl == NULL)) {
1797             return NULL;
1798         }
1799 
1800     } else {
1801         lnk = nxt_queue_first(&ctx_impl->free_ws);
1802         nxt_queue_remove(lnk);
1803 
1804         pthread_mutex_unlock(&ctx_impl->mutex);
1805 
1806         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1807     }
1808 
1809     ws_impl->ctx_impl = ctx_impl;
1810 
1811     return ws_impl;
1812 }
1813 
1814 
1815 static void
1816 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1817 {
1818     nxt_unit_websocket_frame_impl_t  *ws_impl;
1819 
1820     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1821 
1822     while (ws_impl->buf != NULL) {
1823         nxt_unit_mmap_buf_free(ws_impl->buf);
1824     }
1825 
1826     ws->req = NULL;
1827 
1828     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1829 
1830     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1831 
1832     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1833 }
1834 
1835 
1836 static void
1837 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1838     nxt_unit_websocket_frame_impl_t *ws_impl)
1839 {
1840     nxt_queue_remove(&ws_impl->link);
1841 
1842     nxt_unit_free(ctx, ws_impl);
1843 }
1844 
1845 
1846 uint16_t
1847 nxt_unit_field_hash(const char *name, size_t name_length)
1848 {
1849     u_char      ch;
1850     uint32_t    hash;
1851     const char  *p, *end;
1852 
1853     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1854     end = name + name_length;
1855 
1856     for (p = name; p < end; p++) {
1857         ch = *p;
1858         hash = (hash << 4) + hash + nxt_lowcase(ch);
1859     }
1860 
1861     hash = (hash >> 16) ^ hash;
1862 
1863     return hash;
1864 }
1865 
1866 
1867 void
1868 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1869 {
1870     char                *name;
1871     uint32_t            i, j;
1872     nxt_unit_field_t    *fields, f;
1873     nxt_unit_request_t  *r;
1874 
1875     static nxt_str_t  content_length = nxt_string("content-length");
1876     static nxt_str_t  content_type = nxt_string("content-type");
1877     static nxt_str_t  cookie = nxt_string("cookie");
1878 
1879     nxt_unit_req_debug(req, "group_dup_fields");
1880 
1881     r = req->request;
1882     fields = r->fields;
1883 
1884     for (i = 0; i < r->fields_count; i++) {
1885         name = nxt_unit_sptr_get(&fields[i].name);
1886 
1887         switch (fields[i].hash) {
1888         case NXT_UNIT_HASH_CONTENT_LENGTH:
1889             if (fields[i].name_length == content_length.length
1890                 && nxt_unit_memcasecmp(name, content_length.start,
1891                                        content_length.length) == 0)
1892             {
1893                 r->content_length_field = i;
1894             }
1895 
1896             break;
1897 
1898         case NXT_UNIT_HASH_CONTENT_TYPE:
1899             if (fields[i].name_length == content_type.length
1900                 && nxt_unit_memcasecmp(name, content_type.start,
1901                                        content_type.length) == 0)
1902             {
1903                 r->content_type_field = i;
1904             }
1905 
1906             break;
1907 
1908         case NXT_UNIT_HASH_COOKIE:
1909             if (fields[i].name_length == cookie.length
1910                 && nxt_unit_memcasecmp(name, cookie.start,
1911                                        cookie.length) == 0)
1912             {
1913                 r->cookie_field = i;
1914             }
1915 
1916             break;
1917         }
1918 
1919         for (j = i + 1; j < r->fields_count; j++) {
1920             if (fields[i].hash != fields[j].hash
1921                 || fields[i].name_length != fields[j].name_length
1922                 || nxt_unit_memcasecmp(name,
1923                                        nxt_unit_sptr_get(&fields[j].name),
1924                                        fields[j].name_length) != 0)
1925             {
1926                 continue;
1927             }
1928 
1929             f = fields[j];
1930             f.value.offset += (j - (i + 1)) * sizeof(f);
1931 
1932             while (j > i + 1) {
1933                 fields[j] = fields[j - 1];
1934                 fields[j].name.offset -= sizeof(f);
1935                 fields[j].value.offset -= sizeof(f);
1936                 j--;
1937             }
1938 
1939             fields[j] = f;
1940 
1941             /* Assign the same name pointer for further grouping simplicity. */
1942             nxt_unit_sptr_set(&fields[j].name, name);
1943 
1944             i++;
1945         }
1946     }
1947 }
1948 
1949 
1950 int
1951 nxt_unit_response_init(nxt_unit_request_info_t *req,
1952     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1953 {
1954     uint32_t                      buf_size;
1955     nxt_unit_buf_t                *buf;
1956     nxt_unit_request_info_impl_t  *req_impl;
1957 
1958     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1959 
1960     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1961         nxt_unit_req_warn(req, "init: response already sent");
1962 
1963         return NXT_UNIT_ERROR;
1964     }
1965 
1966     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1967                        (int) max_fields_count, (int) max_fields_size);
1968 
1969     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1970         nxt_unit_req_debug(req, "duplicate response init");
1971     }
1972 
1973     /*
1974      * Each field name and value 0-terminated by libunit,
1975      * this is the reason of '+ 2' below.
1976      */
1977     buf_size = sizeof(nxt_unit_response_t)
1978                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1979                + max_fields_size;
1980 
1981     if (nxt_slow_path(req->response_buf != NULL)) {
1982         buf = req->response_buf;
1983 
1984         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1985             goto init_response;
1986         }
1987 
1988         nxt_unit_buf_free(buf);
1989 
1990         req->response_buf = NULL;
1991         req->response = NULL;
1992         req->response_max_fields = 0;
1993 
1994         req_impl->state = NXT_UNIT_RS_START;
1995     }
1996 
1997     buf = nxt_unit_response_buf_alloc(req, buf_size);
1998     if (nxt_slow_path(buf == NULL)) {
1999         return NXT_UNIT_ERROR;
2000     }
2001 
2002 init_response:
2003 
2004     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2005 
2006     req->response_buf = buf;
2007 
2008     req->response = (nxt_unit_response_t *) buf->start;
2009     req->response->status = status;
2010 
2011     buf->free = buf->start + sizeof(nxt_unit_response_t)
2012                 + max_fields_count * sizeof(nxt_unit_field_t);
2013 
2014     req->response_max_fields = max_fields_count;
2015     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2016 
2017     return NXT_UNIT_OK;
2018 }
2019 
2020 
2021 int
2022 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2023     uint32_t max_fields_count, uint32_t max_fields_size)
2024 {
2025     char                          *p;
2026     uint32_t                      i, buf_size;
2027     nxt_unit_buf_t                *buf;
2028     nxt_unit_field_t              *f, *src;
2029     nxt_unit_response_t           *resp;
2030     nxt_unit_request_info_impl_t  *req_impl;
2031 
2032     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2033 
2034     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2035         nxt_unit_req_warn(req, "realloc: response not init");
2036 
2037         return NXT_UNIT_ERROR;
2038     }
2039 
2040     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2041         nxt_unit_req_warn(req, "realloc: response already sent");
2042 
2043         return NXT_UNIT_ERROR;
2044     }
2045 
2046     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2047         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2048 
2049         return NXT_UNIT_ERROR;
2050     }
2051 
2052     /*
2053      * Each field name and value 0-terminated by libunit,
2054      * this is the reason of '+ 2' below.
2055      */
2056     buf_size = sizeof(nxt_unit_response_t)
2057                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2058                + max_fields_size;
2059 
2060     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2061 
2062     buf = nxt_unit_response_buf_alloc(req, buf_size);
2063     if (nxt_slow_path(buf == NULL)) {
2064         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2065         return NXT_UNIT_ERROR;
2066     }
2067 
2068     resp = (nxt_unit_response_t *) buf->start;
2069 
2070     memset(resp, 0, sizeof(nxt_unit_response_t));
2071 
2072     resp->status = req->response->status;
2073     resp->content_length = req->response->content_length;
2074 
2075     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2076     f = resp->fields;
2077 
2078     for (i = 0; i < req->response->fields_count; i++) {
2079         src = req->response->fields + i;
2080 
2081         if (nxt_slow_path(src->skip != 0)) {
2082             continue;
2083         }
2084 
2085         if (nxt_slow_path(src->name_length + src->value_length + 2
2086                           > (uint32_t) (buf->end - p)))
2087         {
2088             nxt_unit_req_warn(req, "realloc: not enough space for field"
2089                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2090                   i, src, src->name_length, src->value_length);
2091 
2092             goto fail;
2093         }
2094 
2095         nxt_unit_sptr_set(&f->name, p);
2096         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2097         *p++ = '\0';
2098 
2099         nxt_unit_sptr_set(&f->value, p);
2100         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2101         *p++ = '\0';
2102 
2103         f->hash = src->hash;
2104         f->skip = 0;
2105         f->name_length = src->name_length;
2106         f->value_length = src->value_length;
2107 
2108         resp->fields_count++;
2109         f++;
2110     }
2111 
2112     if (req->response->piggyback_content_length > 0) {
2113         if (nxt_slow_path(req->response->piggyback_content_length
2114                           > (uint32_t) (buf->end - p)))
2115         {
2116             nxt_unit_req_warn(req, "realloc: not enought space for content"
2117                   " #%"PRIu32", %"PRIu32" required",
2118                   i, req->response->piggyback_content_length);
2119 
2120             goto fail;
2121         }
2122 
2123         resp->piggyback_content_length =
2124                                        req->response->piggyback_content_length;
2125 
2126         nxt_unit_sptr_set(&resp->piggyback_content, p);
2127         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2128                        req->response->piggyback_content_length);
2129     }
2130 
2131     buf->free = p;
2132 
2133     nxt_unit_buf_free(req->response_buf);
2134 
2135     req->response = resp;
2136     req->response_buf = buf;
2137     req->response_max_fields = max_fields_count;
2138 
2139     return NXT_UNIT_OK;
2140 
2141 fail:
2142 
2143     nxt_unit_buf_free(buf);
2144 
2145     return NXT_UNIT_ERROR;
2146 }
2147 
2148 
2149 int
2150 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2151 {
2152     nxt_unit_request_info_impl_t  *req_impl;
2153 
2154     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2155 
2156     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2157 }
2158 
2159 
2160 int
2161 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2162     const char *name, uint8_t name_length,
2163     const char *value, uint32_t value_length)
2164 {
2165     nxt_unit_buf_t                *buf;
2166     nxt_unit_field_t              *f;
2167     nxt_unit_response_t           *resp;
2168     nxt_unit_request_info_impl_t  *req_impl;
2169 
2170     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2171 
2172     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2173         nxt_unit_req_warn(req, "add_field: response not initialized or "
2174                           "already sent");
2175 
2176         return NXT_UNIT_ERROR;
2177     }
2178 
2179     resp = req->response;
2180 
2181     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2182         nxt_unit_req_warn(req, "add_field: too many response fields");
2183 
2184         return NXT_UNIT_ERROR;
2185     }
2186 
2187     buf = req->response_buf;
2188 
2189     if (nxt_slow_path(name_length + value_length + 2
2190                       > (uint32_t) (buf->end - buf->free)))
2191     {
2192         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2193 
2194         return NXT_UNIT_ERROR;
2195     }
2196 
2197     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2198                        resp->fields_count,
2199                        (int) name_length, name,
2200                        (int) value_length, value);
2201 
2202     f = resp->fields + resp->fields_count;
2203 
2204     nxt_unit_sptr_set(&f->name, buf->free);
2205     buf->free = nxt_cpymem(buf->free, name, name_length);
2206     *buf->free++ = '\0';
2207 
2208     nxt_unit_sptr_set(&f->value, buf->free);
2209     buf->free = nxt_cpymem(buf->free, value, value_length);
2210     *buf->free++ = '\0';
2211 
2212     f->hash = nxt_unit_field_hash(name, name_length);
2213     f->skip = 0;
2214     f->name_length = name_length;
2215     f->value_length = value_length;
2216 
2217     resp->fields_count++;
2218 
2219     return NXT_UNIT_OK;
2220 }
2221 
2222 
2223 int
2224 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2225     const void* src, uint32_t size)
2226 {
2227     nxt_unit_buf_t                *buf;
2228     nxt_unit_response_t           *resp;
2229     nxt_unit_request_info_impl_t  *req_impl;
2230 
2231     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2232 
2233     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2234         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2235 
2236         return NXT_UNIT_ERROR;
2237     }
2238 
2239     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2240         nxt_unit_req_warn(req, "add_content: response already sent");
2241 
2242         return NXT_UNIT_ERROR;
2243     }
2244 
2245     buf = req->response_buf;
2246 
2247     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2248         nxt_unit_req_warn(req, "add_content: buffer overflow");
2249 
2250         return NXT_UNIT_ERROR;
2251     }
2252 
2253     resp = req->response;
2254 
2255     if (resp->piggyback_content_length == 0) {
2256         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2257         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2258     }
2259 
2260     resp->piggyback_content_length += size;
2261 
2262     buf->free = nxt_cpymem(buf->free, src, size);
2263 
2264     return NXT_UNIT_OK;
2265 }
2266 
2267 
2268 int
2269 nxt_unit_response_send(nxt_unit_request_info_t *req)
2270 {
2271     int                           rc;
2272     nxt_unit_mmap_buf_t           *mmap_buf;
2273     nxt_unit_request_info_impl_t  *req_impl;
2274 
2275     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2276 
2277     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2278         nxt_unit_req_warn(req, "send: response is not initialized yet");
2279 
2280         return NXT_UNIT_ERROR;
2281     }
2282 
2283     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2284         nxt_unit_req_warn(req, "send: response already sent");
2285 
2286         return NXT_UNIT_ERROR;
2287     }
2288 
2289     if (req->request->websocket_handshake && req->response->status == 101) {
2290         nxt_unit_response_upgrade(req);
2291     }
2292 
2293     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2294                        req->response->fields_count,
2295                        (int) (req->response_buf->free
2296                               - req->response_buf->start));
2297 
2298     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2299 
2300     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2301     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2302         req->response = NULL;
2303         req->response_buf = NULL;
2304         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2305 
2306         nxt_unit_mmap_buf_free(mmap_buf);
2307     }
2308 
2309     return rc;
2310 }
2311 
2312 
2313 int
2314 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2315 {
2316     nxt_unit_request_info_impl_t  *req_impl;
2317 
2318     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2319 
2320     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2321 }
2322 
2323 
2324 nxt_unit_buf_t *
2325 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2326 {
2327     int                           rc;
2328     nxt_unit_mmap_buf_t           *mmap_buf;
2329     nxt_unit_request_info_impl_t  *req_impl;
2330 
2331     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2332         nxt_unit_req_warn(req, "response_buf_alloc: "
2333                           "requested buffer (%"PRIu32") too big", size);
2334 
2335         return NULL;
2336     }
2337 
2338     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2339 
2340     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2341 
2342     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2343     if (nxt_slow_path(mmap_buf == NULL)) {
2344         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2345 
2346         return NULL;
2347     }
2348 
2349     mmap_buf->req = req;
2350 
2351     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2352 
2353     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2354                                    size, size, mmap_buf,
2355                                    NULL);
2356     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2357         nxt_unit_mmap_buf_release(mmap_buf);
2358 
2359         return NULL;
2360     }
2361 
2362     return &mmap_buf->buf;
2363 }
2364 
2365 
2366 static nxt_unit_mmap_buf_t *
2367 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2368 {
2369     nxt_unit_mmap_buf_t  *mmap_buf;
2370     nxt_unit_ctx_impl_t  *ctx_impl;
2371 
2372     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2373 
2374     pthread_mutex_lock(&ctx_impl->mutex);
2375 
2376     if (ctx_impl->free_buf == NULL) {
2377         pthread_mutex_unlock(&ctx_impl->mutex);
2378 
2379         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2380         if (nxt_slow_path(mmap_buf == NULL)) {
2381             return NULL;
2382         }
2383 
2384     } else {
2385         mmap_buf = ctx_impl->free_buf;
2386 
2387         nxt_unit_mmap_buf_unlink(mmap_buf);
2388 
2389         pthread_mutex_unlock(&ctx_impl->mutex);
2390     }
2391 
2392     mmap_buf->ctx_impl = ctx_impl;
2393 
2394     mmap_buf->hdr = NULL;
2395     mmap_buf->free_ptr = NULL;
2396 
2397     return mmap_buf;
2398 }
2399 
2400 
2401 static void
2402 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2403 {
2404     nxt_unit_mmap_buf_unlink(mmap_buf);
2405 
2406     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2407 
2408     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2409 
2410     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2411 }
2412 
2413 
2414 int
2415 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2416 {
2417     return req->request->websocket_handshake;
2418 }
2419 
2420 
2421 int
2422 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2423 {
2424     int                           rc;
2425     nxt_unit_request_info_impl_t  *req_impl;
2426 
2427     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2428 
2429     if (nxt_slow_path(req_impl->websocket != 0)) {
2430         nxt_unit_req_debug(req, "upgrade: already upgraded");
2431 
2432         return NXT_UNIT_OK;
2433     }
2434 
2435     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2436         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2437 
2438         return NXT_UNIT_ERROR;
2439     }
2440 
2441     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2442         nxt_unit_req_warn(req, "upgrade: response already sent");
2443 
2444         return NXT_UNIT_ERROR;
2445     }
2446 
2447     rc = nxt_unit_request_hash_add(req->ctx, req);
2448     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2449         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2450 
2451         return NXT_UNIT_ERROR;
2452     }
2453 
2454     req_impl->websocket = 1;
2455 
2456     req->response->status = 101;
2457 
2458     return NXT_UNIT_OK;
2459 }
2460 
2461 
2462 int
2463 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2464 {
2465     nxt_unit_request_info_impl_t  *req_impl;
2466 
2467     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2468 
2469     return req_impl->websocket;
2470 }
2471 
2472 
2473 nxt_unit_request_info_t *
2474 nxt_unit_get_request_info_from_data(void *data)
2475 {
2476     nxt_unit_request_info_impl_t  *req_impl;
2477 
2478     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2479 
2480     return &req_impl->req;
2481 }
2482 
2483 
2484 int
2485 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2486 {
2487     int                           rc;
2488     nxt_unit_mmap_buf_t           *mmap_buf;
2489     nxt_unit_request_info_t       *req;
2490     nxt_unit_request_info_impl_t  *req_impl;
2491 
2492     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2493 
2494     req = mmap_buf->req;
2495     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2496 
2497     nxt_unit_req_debug(req, "buf_send: %d bytes",
2498                        (int) (buf->free - buf->start));
2499 
2500     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2501         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2502 
2503         return NXT_UNIT_ERROR;
2504     }
2505 
2506     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2507         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2508 
2509         return NXT_UNIT_ERROR;
2510     }
2511 
2512     if (nxt_fast_path(buf->free > buf->start)) {
2513         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2514         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2515             return rc;
2516         }
2517     }
2518 
2519     nxt_unit_mmap_buf_free(mmap_buf);
2520 
2521     return NXT_UNIT_OK;
2522 }
2523 
2524 
2525 static void
2526 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2527 {
2528     int                      rc;
2529     nxt_unit_mmap_buf_t      *mmap_buf;
2530     nxt_unit_request_info_t  *req;
2531 
2532     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2533 
2534     req = mmap_buf->req;
2535 
2536     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2537     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2538         nxt_unit_mmap_buf_free(mmap_buf);
2539 
2540         nxt_unit_request_info_release(req);
2541 
2542     } else {
2543         nxt_unit_request_done(req, rc);
2544     }
2545 }
2546 
2547 
2548 static int
2549 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2550     nxt_unit_mmap_buf_t *mmap_buf, int last)
2551 {
2552     struct {
2553         nxt_port_msg_t       msg;
2554         nxt_port_mmap_msg_t  mmap_msg;
2555     } m;
2556 
2557     int                           rc;
2558     u_char                        *last_used, *first_free;
2559     ssize_t                       res;
2560     nxt_chunk_id_t                first_free_chunk;
2561     nxt_unit_buf_t                *buf;
2562     nxt_unit_impl_t               *lib;
2563     nxt_port_mmap_header_t        *hdr;
2564     nxt_unit_request_info_impl_t  *req_impl;
2565 
2566     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2567     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2568 
2569     buf = &mmap_buf->buf;
2570     hdr = mmap_buf->hdr;
2571 
2572     m.mmap_msg.size = buf->free - buf->start;
2573 
2574     m.msg.stream = req_impl->stream;
2575     m.msg.pid = lib->pid;
2576     m.msg.reply_port = 0;
2577     m.msg.type = _NXT_PORT_MSG_DATA;
2578     m.msg.last = last != 0;
2579     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2580     m.msg.nf = 0;
2581     m.msg.mf = 0;
2582     m.msg.tracking = 0;
2583 
2584     rc = NXT_UNIT_ERROR;
2585 
2586     if (m.msg.mmap) {
2587         m.mmap_msg.mmap_id = hdr->id;
2588         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2589                                                      (u_char *) buf->start);
2590 
2591         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2592                        req_impl->stream,
2593                        (int) m.mmap_msg.mmap_id,
2594                        (int) m.mmap_msg.chunk_id,
2595                        (int) m.mmap_msg.size);
2596 
2597         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2598                                  NULL, 0);
2599         if (nxt_slow_path(res != sizeof(m))) {
2600             goto free_buf;
2601         }
2602 
2603         last_used = (u_char *) buf->free - 1;
2604         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2605 
2606         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2607             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2608 
2609             buf->start = (char *) first_free;
2610             buf->free = buf->start;
2611 
2612             if (buf->end < buf->start) {
2613                 buf->end = buf->start;
2614             }
2615 
2616         } else {
2617             buf->start = NULL;
2618             buf->free = NULL;
2619             buf->end = NULL;
2620 
2621             mmap_buf->hdr = NULL;
2622         }
2623 
2624         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2625                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2626 
2627         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2628                        (int) lib->outgoing.allocated_chunks);
2629 
2630     } else {
2631         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2632                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2633         {
2634             nxt_unit_alert(req->ctx,
2635                            "#%"PRIu32": failed to send plain memory buffer"
2636                            ": no space reserved for message header",
2637                            req_impl->stream);
2638 
2639             goto free_buf;
2640         }
2641 
2642         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2643 
2644         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2645                        req_impl->stream,
2646                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2647 
2648         res = nxt_unit_port_send(req->ctx, req->response_port,
2649                                  buf->start - sizeof(m.msg),
2650                                  m.mmap_msg.size + sizeof(m.msg),
2651                                  NULL, 0);
2652         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2653             goto free_buf;
2654         }
2655     }
2656 
2657     rc = NXT_UNIT_OK;
2658 
2659 free_buf:
2660 
2661     nxt_unit_free_outgoing_buf(mmap_buf);
2662 
2663     return rc;
2664 }
2665 
2666 
2667 void
2668 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2669 {
2670     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2671 }
2672 
2673 
2674 static void
2675 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2676 {
2677     nxt_unit_free_outgoing_buf(mmap_buf);
2678 
2679     nxt_unit_mmap_buf_release(mmap_buf);
2680 }
2681 
2682 
2683 static void
2684 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2685 {
2686     if (mmap_buf->hdr != NULL) {
2687         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2688                               mmap_buf->hdr, mmap_buf->buf.start,
2689                               mmap_buf->buf.end - mmap_buf->buf.start);
2690 
2691         mmap_buf->hdr = NULL;
2692 
2693         return;
2694     }
2695 
2696     if (mmap_buf->free_ptr != NULL) {
2697         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2698 
2699         mmap_buf->free_ptr = NULL;
2700     }
2701 }
2702 
2703 
2704 static nxt_unit_read_buf_t *
2705 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2706 {
2707     nxt_unit_ctx_impl_t  *ctx_impl;
2708     nxt_unit_read_buf_t  *rbuf;
2709 
2710     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2711 
2712     pthread_mutex_lock(&ctx_impl->mutex);
2713 
2714     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2715 
2716     pthread_mutex_unlock(&ctx_impl->mutex);
2717 
2718     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2719 
2720     return rbuf;
2721 }
2722 
2723 
2724 static nxt_unit_read_buf_t *
2725 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2726 {
2727     nxt_queue_link_t     *link;
2728     nxt_unit_read_buf_t  *rbuf;
2729 
2730     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2731         link = nxt_queue_first(&ctx_impl->free_rbuf);
2732         nxt_queue_remove(link);
2733 
2734         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2735 
2736         return rbuf;
2737     }
2738 
2739     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2740 
2741     if (nxt_fast_path(rbuf != NULL)) {
2742         rbuf->ctx_impl = ctx_impl;
2743     }
2744 
2745     return rbuf;
2746 }
2747 
2748 
2749 static void
2750 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2751     nxt_unit_read_buf_t *rbuf)
2752 {
2753     nxt_unit_ctx_impl_t  *ctx_impl;
2754 
2755     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2756 
2757     pthread_mutex_lock(&ctx_impl->mutex);
2758 
2759     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2760 
2761     pthread_mutex_unlock(&ctx_impl->mutex);
2762 }
2763 
2764 
2765 nxt_unit_buf_t *
2766 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2767 {
2768     nxt_unit_mmap_buf_t  *mmap_buf;
2769 
2770     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2771 
2772     if (mmap_buf->next == NULL) {
2773         return NULL;
2774     }
2775 
2776     return &mmap_buf->next->buf;
2777 }
2778 
2779 
2780 uint32_t
2781 nxt_unit_buf_max(void)
2782 {
2783     return PORT_MMAP_DATA_SIZE;
2784 }
2785 
2786 
2787 uint32_t
2788 nxt_unit_buf_min(void)
2789 {
2790     return PORT_MMAP_CHUNK_SIZE;
2791 }
2792 
2793 
2794 int
2795 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2796     size_t size)
2797 {
2798     ssize_t  res;
2799 
2800     res = nxt_unit_response_write_nb(req, start, size, size);
2801 
2802     return res < 0 ? -res : NXT_UNIT_OK;
2803 }
2804 
2805 
2806 ssize_t
2807 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2808     size_t size, size_t min_size)
2809 {
2810     int                           rc;
2811     ssize_t                       sent;
2812     uint32_t                      part_size, min_part_size, buf_size;
2813     const char                    *part_start;
2814     nxt_unit_mmap_buf_t           mmap_buf;
2815     nxt_unit_request_info_impl_t  *req_impl;
2816     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2817 
2818     nxt_unit_req_debug(req, "write: %d", (int) size);
2819 
2820     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2821 
2822     part_start = start;
2823     sent = 0;
2824 
2825     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2826         nxt_unit_req_alert(req, "write: response not initialized yet");
2827 
2828         return -NXT_UNIT_ERROR;
2829     }
2830 
2831     /* Check if response is not send yet. */
2832     if (nxt_slow_path(req->response_buf != NULL)) {
2833         part_size = req->response_buf->end - req->response_buf->free;
2834         part_size = nxt_min(size, part_size);
2835 
2836         rc = nxt_unit_response_add_content(req, part_start, part_size);
2837         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2838             return -rc;
2839         }
2840 
2841         rc = nxt_unit_response_send(req);
2842         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2843             return -rc;
2844         }
2845 
2846         size -= part_size;
2847         part_start += part_size;
2848         sent += part_size;
2849 
2850         min_size -= nxt_min(min_size, part_size);
2851     }
2852 
2853     while (size > 0) {
2854         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2855         min_part_size = nxt_min(min_size, part_size);
2856         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2857 
2858         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2859                                        min_part_size, &mmap_buf, local_buf);
2860         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2861             return -rc;
2862         }
2863 
2864         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2865         if (nxt_slow_path(buf_size == 0)) {
2866             return sent;
2867         }
2868         part_size = nxt_min(buf_size, part_size);
2869 
2870         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2871                                        part_start, part_size);
2872 
2873         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2874         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2875             return -rc;
2876         }
2877 
2878         size -= part_size;
2879         part_start += part_size;
2880         sent += part_size;
2881 
2882         min_size -= nxt_min(min_size, part_size);
2883     }
2884 
2885     return sent;
2886 }
2887 
2888 
2889 int
2890 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2891     nxt_unit_read_info_t *read_info)
2892 {
2893     int                           rc;
2894     ssize_t                       n;
2895     uint32_t                      buf_size;
2896     nxt_unit_buf_t                *buf;
2897     nxt_unit_mmap_buf_t           mmap_buf;
2898     nxt_unit_request_info_impl_t  *req_impl;
2899     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2900 
2901     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2902 
2903     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2904         nxt_unit_req_alert(req, "write: response not initialized yet");
2905 
2906         return NXT_UNIT_ERROR;
2907     }
2908 
2909     /* Check if response is not send yet. */
2910     if (nxt_slow_path(req->response_buf != NULL)) {
2911 
2912         /* Enable content in headers buf. */
2913         rc = nxt_unit_response_add_content(req, "", 0);
2914         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2915             nxt_unit_req_error(req, "Failed to add piggyback content");
2916 
2917             return rc;
2918         }
2919 
2920         buf = req->response_buf;
2921 
2922         while (buf->end - buf->free > 0) {
2923             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2924             if (nxt_slow_path(n < 0)) {
2925                 nxt_unit_req_error(req, "Read error");
2926 
2927                 return NXT_UNIT_ERROR;
2928             }
2929 
2930             /* Manually increase sizes. */
2931             buf->free += n;
2932             req->response->piggyback_content_length += n;
2933 
2934             if (read_info->eof) {
2935                 break;
2936             }
2937         }
2938 
2939         rc = nxt_unit_response_send(req);
2940         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2941             nxt_unit_req_error(req, "Failed to send headers with content");
2942 
2943             return rc;
2944         }
2945 
2946         if (read_info->eof) {
2947             return NXT_UNIT_OK;
2948         }
2949     }
2950 
2951     while (!read_info->eof) {
2952         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2953                            read_info->buf_size);
2954 
2955         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2956 
2957         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2958                                        buf_size, buf_size,
2959                                        &mmap_buf, local_buf);
2960         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2961             return rc;
2962         }
2963 
2964         buf = &mmap_buf.buf;
2965 
2966         while (!read_info->eof && buf->end > buf->free) {
2967             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2968             if (nxt_slow_path(n < 0)) {
2969                 nxt_unit_req_error(req, "Read error");
2970 
2971                 nxt_unit_free_outgoing_buf(&mmap_buf);
2972 
2973                 return NXT_UNIT_ERROR;
2974             }
2975 
2976             buf->free += n;
2977         }
2978 
2979         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2980         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2981             nxt_unit_req_error(req, "Failed to send content");
2982 
2983             return rc;
2984         }
2985     }
2986 
2987     return NXT_UNIT_OK;
2988 }
2989 
2990 
2991 ssize_t
2992 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2993 {
2994     ssize_t  buf_res, res;
2995 
2996     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2997                                 dst, size);
2998 
2999     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3000         res = read(req->content_fd, dst, size);
3001         if (nxt_slow_path(res < 0)) {
3002             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3003                                strerror(errno), errno);
3004 
3005             return res;
3006         }
3007 
3008         if (res < (ssize_t) size) {
3009             nxt_unit_close(req->content_fd);
3010 
3011             req->content_fd = -1;
3012         }
3013 
3014         req->content_length -= res;
3015         size -= res;
3016 
3017         dst = nxt_pointer_to(dst, res);
3018 
3019     } else {
3020         res = 0;
3021     }
3022 
3023     return buf_res + res;
3024 }
3025 
3026 
3027 ssize_t
3028 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3029 {
3030     char                 *p;
3031     size_t               l_size, b_size;
3032     nxt_unit_buf_t       *b;
3033     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3034 
3035     if (req->content_length == 0) {
3036         return 0;
3037     }
3038 
3039     l_size = 0;
3040 
3041     b = req->content_buf;
3042 
3043     while (b != NULL) {
3044         b_size = b->end - b->free;
3045         p = memchr(b->free, '\n', b_size);
3046 
3047         if (p != NULL) {
3048             p++;
3049             l_size += p - b->free;
3050             break;
3051         }
3052 
3053         l_size += b_size;
3054 
3055         if (max_size <= l_size) {
3056             break;
3057         }
3058 
3059         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3060         if (mmap_buf->next == NULL
3061             && req->content_fd != -1
3062             && l_size < req->content_length)
3063         {
3064             preread_buf = nxt_unit_request_preread(req, 16384);
3065             if (nxt_slow_path(preread_buf == NULL)) {
3066                 return -1;
3067             }
3068 
3069             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3070         }
3071 
3072         b = nxt_unit_buf_next(b);
3073     }
3074 
3075     return nxt_min(max_size, l_size);
3076 }
3077 
3078 
3079 static nxt_unit_mmap_buf_t *
3080 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3081 {
3082     ssize_t              res;
3083     nxt_unit_mmap_buf_t  *mmap_buf;
3084 
3085     if (req->content_fd == -1) {
3086         nxt_unit_req_alert(req, "preread: content_fd == -1");
3087         return NULL;
3088     }
3089 
3090     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3091     if (nxt_slow_path(mmap_buf == NULL)) {
3092         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3093         return NULL;
3094     }
3095 
3096     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3097     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3098         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3099         nxt_unit_mmap_buf_release(mmap_buf);
3100         return NULL;
3101     }
3102 
3103     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3104 
3105     mmap_buf->hdr = NULL;
3106     mmap_buf->buf.start = mmap_buf->free_ptr;
3107     mmap_buf->buf.free = mmap_buf->buf.start;
3108     mmap_buf->buf.end = mmap_buf->buf.start + size;
3109 
3110     res = read(req->content_fd, mmap_buf->free_ptr, size);
3111     if (res < 0) {
3112         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3113                            strerror(errno), errno);
3114 
3115         nxt_unit_mmap_buf_free(mmap_buf);
3116 
3117         return NULL;
3118     }
3119 
3120     if (res < (ssize_t) size) {
3121         nxt_unit_close(req->content_fd);
3122 
3123         req->content_fd = -1;
3124     }
3125 
3126     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3127 
3128     mmap_buf->buf.end = mmap_buf->buf.free + res;
3129 
3130     return mmap_buf;
3131 }
3132 
3133 
3134 static ssize_t
3135 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3136 {
3137     u_char          *p;
3138     size_t          rest, copy, read;
3139     nxt_unit_buf_t  *buf, *last_buf;
3140 
3141     p = dst;
3142     rest = size;
3143 
3144     buf = *b;
3145     last_buf = buf;
3146 
3147     while (buf != NULL) {
3148         last_buf = buf;
3149 
3150         copy = buf->end - buf->free;
3151         copy = nxt_min(rest, copy);
3152 
3153         p = nxt_cpymem(p, buf->free, copy);
3154 
3155         buf->free += copy;
3156         rest -= copy;
3157 
3158         if (rest == 0) {
3159             if (buf->end == buf->free) {
3160                 buf = nxt_unit_buf_next(buf);
3161             }
3162 
3163             break;
3164         }
3165 
3166         buf = nxt_unit_buf_next(buf);
3167     }
3168 
3169     *b = last_buf;
3170 
3171     read = size - rest;
3172 
3173     *len -= read;
3174 
3175     return read;
3176 }
3177 
3178 
3179 void
3180 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3181 {
3182     uint32_t                      size;
3183     nxt_port_msg_t                msg;
3184     nxt_unit_impl_t               *lib;
3185     nxt_unit_request_info_impl_t  *req_impl;
3186 
3187     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3188 
3189     nxt_unit_req_debug(req, "done: %d", rc);
3190 
3191     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3192         goto skip_response_send;
3193     }
3194 
3195     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3196 
3197         size = nxt_length("Content-Type") + nxt_length("text/plain");
3198 
3199         rc = nxt_unit_response_init(req, 200, 1, size);
3200         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3201             goto skip_response_send;
3202         }
3203 
3204         rc = nxt_unit_response_add_field(req, "Content-Type",
3205                                    nxt_length("Content-Type"),
3206                                    "text/plain", nxt_length("text/plain"));
3207         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3208             goto skip_response_send;
3209         }
3210     }
3211 
3212     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3213 
3214         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3215 
3216         nxt_unit_buf_send_done(req->response_buf);
3217 
3218         return;
3219     }
3220 
3221 skip_response_send:
3222 
3223     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3224 
3225     msg.stream = req_impl->stream;
3226     msg.pid = lib->pid;
3227     msg.reply_port = 0;
3228     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3229                                    : _NXT_PORT_MSG_RPC_ERROR;
3230     msg.last = 1;
3231     msg.mmap = 0;
3232     msg.nf = 0;
3233     msg.mf = 0;
3234     msg.tracking = 0;
3235 
3236     (void) nxt_unit_port_send(req->ctx, req->response_port,
3237                               &msg, sizeof(msg), NULL, 0);
3238 
3239     nxt_unit_request_info_release(req);
3240 }
3241 
3242 
3243 int
3244 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3245     uint8_t last, const void *start, size_t size)
3246 {
3247     const struct iovec  iov = { (void *) start, size };
3248 
3249     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3250 }
3251 
3252 
3253 int
3254 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3255     uint8_t last, const struct iovec *iov, int iovcnt)
3256 {
3257     int                     i, rc;
3258     size_t                  l, copy;
3259     uint32_t                payload_len, buf_size, alloc_size;
3260     const uint8_t           *b;
3261     nxt_unit_buf_t          *buf;
3262     nxt_unit_mmap_buf_t     mmap_buf;
3263     nxt_websocket_header_t  *wh;
3264     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3265 
3266     payload_len = 0;
3267 
3268     for (i = 0; i < iovcnt; i++) {
3269         payload_len += iov[i].iov_len;
3270     }
3271 
3272     buf_size = 10 + payload_len;
3273     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3274 
3275     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3276                                    alloc_size, alloc_size,
3277                                    &mmap_buf, local_buf);
3278     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3279         return rc;
3280     }
3281 
3282     buf = &mmap_buf.buf;
3283 
3284     buf->start[0] = 0;
3285     buf->start[1] = 0;
3286 
3287     buf_size -= buf->end - buf->start;
3288 
3289     wh = (void *) buf->free;
3290 
3291     buf->free = nxt_websocket_frame_init(wh, payload_len);
3292     wh->fin = last;
3293     wh->opcode = opcode;
3294 
3295     for (i = 0; i < iovcnt; i++) {
3296         b = iov[i].iov_base;
3297         l = iov[i].iov_len;
3298 
3299         while (l > 0) {
3300             copy = buf->end - buf->free;
3301             copy = nxt_min(l, copy);
3302 
3303             buf->free = nxt_cpymem(buf->free, b, copy);
3304             b += copy;
3305             l -= copy;
3306 
3307             if (l > 0) {
3308                 if (nxt_fast_path(buf->free > buf->start)) {
3309                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3310 
3311                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3312                         return rc;
3313                     }
3314                 }
3315 
3316                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3317 
3318                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3319                                                alloc_size, alloc_size,
3320                                                &mmap_buf, local_buf);
3321                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3322                     return rc;
3323                 }
3324 
3325                 buf_size -= buf->end - buf->start;
3326             }
3327         }
3328     }
3329 
3330     if (buf->free > buf->start) {
3331         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3332     }
3333 
3334     return rc;
3335 }
3336 
3337 
3338 ssize_t
3339 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3340     size_t size)
3341 {
3342     ssize_t   res;
3343     uint8_t   *b;
3344     uint64_t  i, d;
3345 
3346     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3347                             dst, size);
3348 
3349     if (ws->mask == NULL) {
3350         return res;
3351     }
3352 
3353     b = dst;
3354     d = (ws->payload_len - ws->content_length - res) % 4;
3355 
3356     for (i = 0; i < (uint64_t) res; i++) {
3357         b[i] ^= ws->mask[ (i + d) % 4 ];
3358     }
3359 
3360     return res;
3361 }
3362 
3363 
3364 int
3365 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3366 {
3367     char                             *b;
3368     size_t                           size, hsize;
3369     nxt_unit_websocket_frame_impl_t  *ws_impl;
3370 
3371     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3372 
3373     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3374         return NXT_UNIT_OK;
3375     }
3376 
3377     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3378 
3379     b = nxt_unit_malloc(ws->req->ctx, size);
3380     if (nxt_slow_path(b == NULL)) {
3381         return NXT_UNIT_ERROR;
3382     }
3383 
3384     memcpy(b, ws_impl->buf->buf.start, size);
3385 
3386     hsize = nxt_websocket_frame_header_size(b);
3387 
3388     ws_impl->buf->buf.start = b;
3389     ws_impl->buf->buf.free = b + hsize;
3390     ws_impl->buf->buf.end = b + size;
3391 
3392     ws_impl->buf->free_ptr = b;
3393 
3394     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3395 
3396     if (ws_impl->ws.header->mask) {
3397         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3398 
3399     } else {
3400         ws_impl->ws.mask = NULL;
3401     }
3402 
3403     return NXT_UNIT_OK;
3404 }
3405 
3406 
3407 void
3408 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3409 {
3410     nxt_unit_websocket_frame_release(ws);
3411 }
3412 
3413 
3414 static nxt_port_mmap_header_t *
3415 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3416     nxt_chunk_id_t *c, int *n, int min_n)
3417 {
3418     int                     res, nchunks, i;
3419     uint32_t                outgoing_size;
3420     nxt_unit_mmap_t         *mm, *mm_end;
3421     nxt_unit_impl_t         *lib;
3422     nxt_port_mmap_header_t  *hdr;
3423 
3424     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3425 
3426     pthread_mutex_lock(&lib->outgoing.mutex);
3427 
3428 retry:
3429 
3430     outgoing_size = lib->outgoing.size;
3431 
3432     mm_end = lib->outgoing.elts + outgoing_size;
3433 
3434     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3435         hdr = mm->hdr;
3436 
3437         if (hdr->sent_over != 0xFFFFu
3438             && (hdr->sent_over != port->id.id
3439                 || mm->src_thread != pthread_self()))
3440         {
3441             continue;
3442         }
3443 
3444         *c = 0;
3445 
3446         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3447             nchunks = 1;
3448 
3449             while (nchunks < *n) {
3450                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3451                                                        *c + nchunks);
3452 
3453                 if (res == 0) {
3454                     if (nchunks >= min_n) {
3455                         *n = nchunks;
3456 
3457                         goto unlock;
3458                     }
3459 
3460                     for (i = 0; i < nchunks; i++) {
3461                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3462                     }
3463 
3464                     *c += nchunks + 1;
3465                     nchunks = 0;
3466                     break;
3467                 }
3468 
3469                 nchunks++;
3470             }
3471 
3472             if (nchunks >= min_n) {
3473                 *n = nchunks;
3474 
3475                 goto unlock;
3476             }
3477         }
3478 
3479         hdr->oosm = 1;
3480     }
3481 
3482     if (outgoing_size >= lib->shm_mmap_limit) {
3483         /* Cannot allocate more shared memory. */
3484         pthread_mutex_unlock(&lib->outgoing.mutex);
3485 
3486         if (min_n == 0) {
3487             *n = 0;
3488         }
3489 
3490         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3491                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3492         {
3493             /* Memory allocated by application, but not send to router. */
3494             return NULL;
3495         }
3496 
3497         /* Notify router about OOSM condition. */
3498 
3499         res = nxt_unit_send_oosm(ctx, port);
3500         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3501             return NULL;
3502         }
3503 
3504         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3505 
3506         if (min_n == 0) {
3507             return NULL;
3508         }
3509 
3510         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3511 
3512         res = nxt_unit_wait_shm_ack(ctx);
3513         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3514             return NULL;
3515         }
3516 
3517         nxt_unit_debug(ctx, "oosm: retry");
3518 
3519         pthread_mutex_lock(&lib->outgoing.mutex);
3520 
3521         goto retry;
3522     }
3523 
3524     *c = 0;
3525     hdr = nxt_unit_new_mmap(ctx, port, *n);
3526 
3527 unlock:
3528 
3529     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3530 
3531     nxt_unit_debug(ctx, "allocated_chunks %d",
3532                    (int) lib->outgoing.allocated_chunks);
3533 
3534     pthread_mutex_unlock(&lib->outgoing.mutex);
3535 
3536     return hdr;
3537 }
3538 
3539 
3540 static int
3541 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3542 {
3543     ssize_t          res;
3544     nxt_port_msg_t   msg;
3545     nxt_unit_impl_t  *lib;
3546 
3547     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3548 
3549     msg.stream = 0;
3550     msg.pid = lib->pid;
3551     msg.reply_port = 0;
3552     msg.type = _NXT_PORT_MSG_OOSM;
3553     msg.last = 0;
3554     msg.mmap = 0;
3555     msg.nf = 0;
3556     msg.mf = 0;
3557     msg.tracking = 0;
3558 
3559     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3560     if (nxt_slow_path(res != sizeof(msg))) {
3561         return NXT_UNIT_ERROR;
3562     }
3563 
3564     return NXT_UNIT_OK;
3565 }
3566 
3567 
3568 static int
3569 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3570 {
3571     int                  res;
3572     nxt_unit_ctx_impl_t  *ctx_impl;
3573     nxt_unit_read_buf_t  *rbuf;
3574 
3575     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3576 
3577     while (1) {
3578         rbuf = nxt_unit_read_buf_get(ctx);
3579         if (nxt_slow_path(rbuf == NULL)) {
3580             return NXT_UNIT_ERROR;
3581         }
3582 
3583         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3584         if (res == NXT_UNIT_ERROR) {
3585             nxt_unit_read_buf_release(ctx, rbuf);
3586 
3587             return NXT_UNIT_ERROR;
3588         }
3589 
3590         if (nxt_unit_is_shm_ack(rbuf)) {
3591             nxt_unit_read_buf_release(ctx, rbuf);
3592             break;
3593         }
3594 
3595         pthread_mutex_lock(&ctx_impl->mutex);
3596 
3597         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3598 
3599         pthread_mutex_unlock(&ctx_impl->mutex);
3600 
3601         if (nxt_unit_is_quit(rbuf)) {
3602             nxt_unit_debug(ctx, "oosm: quit received");
3603 
3604             return NXT_UNIT_ERROR;
3605         }
3606     }
3607 
3608     return NXT_UNIT_OK;
3609 }
3610 
3611 
3612 static nxt_unit_mmap_t *
3613 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3614 {
3615     uint32_t         cap, n;
3616     nxt_unit_mmap_t  *e;
3617 
3618     if (nxt_fast_path(mmaps->size > i)) {
3619         return mmaps->elts + i;
3620     }
3621 
3622     cap = mmaps->cap;
3623 
3624     if (cap == 0) {
3625         cap = i + 1;
3626     }
3627 
3628     while (i + 1 > cap) {
3629 
3630         if (cap < 16) {
3631             cap = cap * 2;
3632 
3633         } else {
3634             cap = cap + cap / 2;
3635         }
3636     }
3637 
3638     if (cap != mmaps->cap) {
3639 
3640         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3641         if (nxt_slow_path(e == NULL)) {
3642             return NULL;
3643         }
3644 
3645         mmaps->elts = e;
3646 
3647         for (n = mmaps->cap; n < cap; n++) {
3648             e = mmaps->elts + n;
3649 
3650             e->hdr = NULL;
3651             nxt_queue_init(&e->awaiting_rbuf);
3652         }
3653 
3654         mmaps->cap = cap;
3655     }
3656 
3657     if (i + 1 > mmaps->size) {
3658         mmaps->size = i + 1;
3659     }
3660 
3661     return mmaps->elts + i;
3662 }
3663 
3664 
3665 static nxt_port_mmap_header_t *
3666 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3667 {
3668     int                     i, fd, rc;
3669     void                    *mem;
3670     nxt_unit_mmap_t         *mm;
3671     nxt_unit_impl_t         *lib;
3672     nxt_port_mmap_header_t  *hdr;
3673 
3674     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3675 
3676     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3677     if (nxt_slow_path(mm == NULL)) {
3678         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3679 
3680         return NULL;
3681     }
3682 
3683     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3684     if (nxt_slow_path(fd == -1)) {
3685         goto remove_fail;
3686     }
3687 
3688     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3689     if (nxt_slow_path(mem == MAP_FAILED)) {
3690         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3691                        strerror(errno), errno);
3692 
3693         nxt_unit_close(fd);
3694 
3695         goto remove_fail;
3696     }
3697 
3698     mm->hdr = mem;
3699     hdr = mem;
3700 
3701     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3702     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3703 
3704     hdr->id = lib->outgoing.size - 1;
3705     hdr->src_pid = lib->pid;
3706     hdr->dst_pid = port->id.pid;
3707     hdr->sent_over = port->id.id;
3708     mm->src_thread = pthread_self();
3709 
3710     /* Mark first n chunk(s) as busy */
3711     for (i = 0; i < n; i++) {
3712         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3713     }
3714 
3715     /* Mark as busy chunk followed the last available chunk. */
3716     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3717     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3718 
3719     pthread_mutex_unlock(&lib->outgoing.mutex);
3720 
3721     rc = nxt_unit_send_mmap(ctx, port, fd);
3722     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3723         munmap(mem, PORT_MMAP_SIZE);
3724         hdr = NULL;
3725 
3726     } else {
3727         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3728                        hdr->id, (int) lib->pid, (int) port->id.pid);
3729     }
3730 
3731     nxt_unit_close(fd);
3732 
3733     pthread_mutex_lock(&lib->outgoing.mutex);
3734 
3735     if (nxt_fast_path(hdr != NULL)) {
3736         return hdr;
3737     }
3738 
3739 remove_fail:
3740 
3741     lib->outgoing.size--;
3742 
3743     return NULL;
3744 }
3745 
3746 
3747 static int
3748 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3749 {
3750     int              fd;
3751     nxt_unit_impl_t  *lib;
3752 
3753     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3754 
3755 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3756     char             name[64];
3757 
3758     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3759              lib->pid, (void *) pthread_self());
3760 #endif
3761 
3762 #if (NXT_HAVE_MEMFD_CREATE)
3763 
3764     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3765     if (nxt_slow_path(fd == -1)) {
3766         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3767                        strerror(errno), errno);
3768 
3769         return -1;
3770     }
3771 
3772     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3773 
3774 #elif (NXT_HAVE_SHM_OPEN_ANON)
3775 
3776     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3777     if (nxt_slow_path(fd == -1)) {
3778         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3779                        strerror(errno), errno);
3780 
3781         return -1;
3782     }
3783 
3784 #elif (NXT_HAVE_SHM_OPEN)
3785 
3786     /* Just in case. */
3787     shm_unlink(name);
3788 
3789     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3790     if (nxt_slow_path(fd == -1)) {
3791         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3792                        strerror(errno), errno);
3793 
3794         return -1;
3795     }
3796 
3797     if (nxt_slow_path(shm_unlink(name) == -1)) {
3798         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3799                        strerror(errno), errno);
3800     }
3801 
3802 #else
3803 
3804 #error No working shared memory implementation.
3805 
3806 #endif
3807 
3808     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3809         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3810                        strerror(errno), errno);
3811 
3812         nxt_unit_close(fd);
3813 
3814         return -1;
3815     }
3816 
3817     return fd;
3818 }
3819 
3820 
3821 static int
3822 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3823 {
3824     ssize_t          res;
3825     nxt_port_msg_t   msg;
3826     nxt_unit_impl_t  *lib;
3827     union {
3828         struct cmsghdr  cm;
3829         char            space[CMSG_SPACE(sizeof(int))];
3830     } cmsg;
3831 
3832     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3833 
3834     msg.stream = 0;
3835     msg.pid = lib->pid;
3836     msg.reply_port = 0;
3837     msg.type = _NXT_PORT_MSG_MMAP;
3838     msg.last = 0;
3839     msg.mmap = 0;
3840     msg.nf = 0;
3841     msg.mf = 0;
3842     msg.tracking = 0;
3843 
3844     /*
3845      * Fill all padding fields with 0.
3846      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3847      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3848      */
3849     memset(&cmsg, 0, sizeof(cmsg));
3850 
3851     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3852     cmsg.cm.cmsg_level = SOL_SOCKET;
3853     cmsg.cm.cmsg_type = SCM_RIGHTS;
3854 
3855     /*
3856      * memcpy() is used instead of simple
3857      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3858      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3859      *   dereferencing type-punned pointer will break strict-aliasing rules
3860      *
3861      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3862      * in the same simple assignment as in the code above.
3863      */
3864     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3865 
3866     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
3867                              &cmsg, sizeof(cmsg));
3868     if (nxt_slow_path(res != sizeof(msg))) {
3869         return NXT_UNIT_ERROR;
3870     }
3871 
3872     return NXT_UNIT_OK;
3873 }
3874 
3875 
3876 static int
3877 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3878     uint32_t size, uint32_t min_size,
3879     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3880 {
3881     int                     nchunks, min_nchunks;
3882     nxt_chunk_id_t          c;
3883     nxt_port_mmap_header_t  *hdr;
3884 
3885     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3886         if (local_buf != NULL) {
3887             mmap_buf->free_ptr = NULL;
3888             mmap_buf->plain_ptr = local_buf;
3889 
3890         } else {
3891             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3892                                                  size + sizeof(nxt_port_msg_t));
3893             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3894                 return NXT_UNIT_ERROR;
3895             }
3896 
3897             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3898         }
3899 
3900         mmap_buf->hdr = NULL;
3901         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3902         mmap_buf->buf.free = mmap_buf->buf.start;
3903         mmap_buf->buf.end = mmap_buf->buf.start + size;
3904 
3905         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3906                        mmap_buf->buf.start, (int) size);
3907 
3908         return NXT_UNIT_OK;
3909     }
3910 
3911     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3912     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3913 
3914     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3915     if (nxt_slow_path(hdr == NULL)) {
3916         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3917             mmap_buf->hdr = NULL;
3918             mmap_buf->buf.start = NULL;
3919             mmap_buf->buf.free = NULL;
3920             mmap_buf->buf.end = NULL;
3921             mmap_buf->free_ptr = NULL;
3922 
3923             return NXT_UNIT_OK;
3924         }
3925 
3926         return NXT_UNIT_ERROR;
3927     }
3928 
3929     mmap_buf->hdr = hdr;
3930     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3931     mmap_buf->buf.free = mmap_buf->buf.start;
3932     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3933     mmap_buf->free_ptr = NULL;
3934     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3935 
3936     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3937                   (int) hdr->id, (int) c,
3938                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3939 
3940     return NXT_UNIT_OK;
3941 }
3942 
3943 
3944 static int
3945 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3946 {
3947     int                     rc;
3948     void                    *mem;
3949     nxt_queue_t             awaiting_rbuf;
3950     struct stat             mmap_stat;
3951     nxt_unit_mmap_t         *mm;
3952     nxt_unit_impl_t         *lib;
3953     nxt_unit_ctx_impl_t     *ctx_impl;
3954     nxt_unit_read_buf_t     *rbuf;
3955     nxt_port_mmap_header_t  *hdr;
3956 
3957     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3958 
3959     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3960 
3961     if (fstat(fd, &mmap_stat) == -1) {
3962         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3963                        strerror(errno), errno);
3964 
3965         return NXT_UNIT_ERROR;
3966     }
3967 
3968     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3969                MAP_SHARED, fd, 0);
3970     if (nxt_slow_path(mem == MAP_FAILED)) {
3971         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3972                        strerror(errno), errno);
3973 
3974         return NXT_UNIT_ERROR;
3975     }
3976 
3977     hdr = mem;
3978 
3979     if (nxt_slow_path(hdr->src_pid != pid)) {
3980 
3981         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
3982                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3983                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3984 
3985         munmap(mem, PORT_MMAP_SIZE);
3986 
3987         return NXT_UNIT_ERROR;
3988     }
3989 
3990     nxt_queue_init(&awaiting_rbuf);
3991 
3992     pthread_mutex_lock(&lib->incoming.mutex);
3993 
3994     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
3995     if (nxt_slow_path(mm == NULL)) {
3996         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
3997 
3998         munmap(mem, PORT_MMAP_SIZE);
3999 
4000         rc = NXT_UNIT_ERROR;
4001 
4002     } else {
4003         mm->hdr = hdr;
4004 
4005         hdr->sent_over = 0xFFFFu;
4006 
4007         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4008         nxt_queue_init(&mm->awaiting_rbuf);
4009 
4010         rc = NXT_UNIT_OK;
4011     }
4012 
4013     pthread_mutex_unlock(&lib->incoming.mutex);
4014 
4015     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4016 
4017         ctx_impl = rbuf->ctx_impl;
4018 
4019         pthread_mutex_lock(&ctx_impl->mutex);
4020 
4021         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4022 
4023         pthread_mutex_unlock(&ctx_impl->mutex);
4024 
4025         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4026 
4027         nxt_unit_awake_ctx(ctx, ctx_impl);
4028 
4029     } nxt_queue_loop;
4030 
4031     return rc;
4032 }
4033 
4034 
4035 static void
4036 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4037 {
4038     nxt_port_msg_t  msg;
4039 
4040     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4041         return;
4042     }
4043 
4044     if (nxt_slow_path(ctx_impl->read_port == NULL
4045                       || ctx_impl->read_port->out_fd == -1))
4046     {
4047         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4048 
4049         return;
4050     }
4051 
4052     memset(&msg, 0, sizeof(nxt_port_msg_t));
4053 
4054     msg.type = _NXT_PORT_MSG_RPC_READY;
4055 
4056     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4057                               &msg, sizeof(msg), NULL, 0);
4058 }
4059 
4060 
4061 static void
4062 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4063 {
4064     pthread_mutex_init(&mmaps->mutex, NULL);
4065 
4066     mmaps->size = 0;
4067     mmaps->cap = 0;
4068     mmaps->elts = NULL;
4069     mmaps->allocated_chunks = 0;
4070 }
4071 
4072 
4073 nxt_inline void
4074 nxt_unit_process_use(nxt_unit_process_t *process)
4075 {
4076     nxt_atomic_fetch_add(&process->use_count, 1);
4077 }
4078 
4079 
4080 nxt_inline void
4081 nxt_unit_process_release(nxt_unit_process_t *process)
4082 {
4083     long c;
4084 
4085     c = nxt_atomic_fetch_add(&process->use_count, -1);
4086 
4087     if (c == 1) {
4088         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4089 
4090         nxt_unit_free(NULL, process);
4091     }
4092 }
4093 
4094 
4095 static void
4096 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4097 {
4098     nxt_unit_mmap_t  *mm, *end;
4099 
4100     if (mmaps->elts != NULL) {
4101         end = mmaps->elts + mmaps->size;
4102 
4103         for (mm = mmaps->elts; mm < end; mm++) {
4104             munmap(mm->hdr, PORT_MMAP_SIZE);
4105         }
4106 
4107         nxt_unit_free(NULL, mmaps->elts);
4108     }
4109 
4110     pthread_mutex_destroy(&mmaps->mutex);
4111 }
4112 
4113 
4114 static int
4115 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4116     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4117     nxt_unit_read_buf_t *rbuf)
4118 {
4119     int                  res, need_rbuf;
4120     nxt_unit_mmap_t      *mm;
4121     nxt_unit_ctx_impl_t  *ctx_impl;
4122 
4123     mm = nxt_unit_mmap_at(mmaps, id);
4124     if (nxt_slow_path(mm == NULL)) {
4125         nxt_unit_alert(ctx, "failed to allocate mmap");
4126 
4127         pthread_mutex_unlock(&mmaps->mutex);
4128 
4129         *hdr = NULL;
4130 
4131         return NXT_UNIT_ERROR;
4132     }
4133 
4134     *hdr = mm->hdr;
4135 
4136     if (nxt_fast_path(*hdr != NULL)) {
4137         return NXT_UNIT_OK;
4138     }
4139 
4140     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4141 
4142     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4143 
4144     pthread_mutex_unlock(&mmaps->mutex);
4145 
4146     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4147 
4148     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4149 
4150     if (need_rbuf) {
4151         res = nxt_unit_get_mmap(ctx, pid, id);
4152         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4153             return NXT_UNIT_ERROR;
4154         }
4155     }
4156 
4157     return NXT_UNIT_AGAIN;
4158 }
4159 
4160 
4161 static int
4162 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4163     nxt_unit_read_buf_t *rbuf)
4164 {
4165     int                     res;
4166     void                    *start;
4167     uint32_t                size;
4168     nxt_unit_impl_t         *lib;
4169     nxt_unit_mmaps_t        *mmaps;
4170     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4171     nxt_port_mmap_msg_t     *mmap_msg, *end;
4172     nxt_port_mmap_header_t  *hdr;
4173 
4174     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4175         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4176                       recv_msg->stream, (int) recv_msg->size);
4177 
4178         return NXT_UNIT_ERROR;
4179     }
4180 
4181     mmap_msg = recv_msg->start;
4182     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4183 
4184     incoming_tail = &recv_msg->incoming_buf;
4185 
4186     /* Allocating buffer structures. */
4187     for (; mmap_msg < end; mmap_msg++) {
4188         b = nxt_unit_mmap_buf_get(ctx);
4189         if (nxt_slow_path(b == NULL)) {
4190             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4191                           recv_msg->stream);
4192 
4193             while (recv_msg->incoming_buf != NULL) {
4194                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4195             }
4196 
4197             return NXT_UNIT_ERROR;
4198         }
4199 
4200         nxt_unit_mmap_buf_insert(incoming_tail, b);
4201         incoming_tail = &b->next;
4202     }
4203 
4204     b = recv_msg->incoming_buf;
4205     mmap_msg = recv_msg->start;
4206 
4207     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4208 
4209     mmaps = &lib->incoming;
4210 
4211     pthread_mutex_lock(&mmaps->mutex);
4212 
4213     for (; mmap_msg < end; mmap_msg++) {
4214         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4215                                        recv_msg->pid, mmap_msg->mmap_id,
4216                                        &hdr, rbuf);
4217 
4218         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4219             while (recv_msg->incoming_buf != NULL) {
4220                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4221             }
4222 
4223             return res;
4224         }
4225 
4226         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4227         size = mmap_msg->size;
4228 
4229         if (recv_msg->start == mmap_msg) {
4230             recv_msg->start = start;
4231             recv_msg->size = size;
4232         }
4233 
4234         b->buf.start = start;
4235         b->buf.free = start;
4236         b->buf.end = b->buf.start + size;
4237         b->hdr = hdr;
4238 
4239         b = b->next;
4240 
4241         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4242                        recv_msg->stream,
4243                        start, (int) size,
4244                        (int) hdr->src_pid, (int) hdr->dst_pid,
4245                        (int) hdr->id, (int) mmap_msg->chunk_id,
4246                        (int) mmap_msg->size);
4247     }
4248 
4249     pthread_mutex_unlock(&mmaps->mutex);
4250 
4251     return NXT_UNIT_OK;
4252 }
4253 
4254 
4255 static int
4256 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4257 {
4258     ssize_t              res;
4259     nxt_unit_impl_t      *lib;
4260     nxt_unit_ctx_impl_t  *ctx_impl;
4261 
4262     struct {
4263         nxt_port_msg_t           msg;
4264         nxt_port_msg_get_mmap_t  get_mmap;
4265     } m;
4266 
4267     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4268     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4269 
4270     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4271 
4272     m.msg.pid = lib->pid;
4273     m.msg.reply_port = ctx_impl->read_port->id.id;
4274     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4275 
4276     m.get_mmap.id = id;
4277 
4278     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4279 
4280     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
4281     if (nxt_slow_path(res != sizeof(m))) {
4282         return NXT_UNIT_ERROR;
4283     }
4284 
4285     return NXT_UNIT_OK;
4286 }
4287 
4288 
4289 static void
4290 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4291     void *start, uint32_t size)
4292 {
4293     int              freed_chunks;
4294     u_char           *p, *end;
4295     nxt_chunk_id_t   c;
4296     nxt_unit_impl_t  *lib;
4297 
4298     memset(start, 0xA5, size);
4299 
4300     p = start;
4301     end = p + size;
4302     c = nxt_port_mmap_chunk_id(hdr, p);
4303     freed_chunks = 0;
4304 
4305     while (p < end) {
4306         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4307 
4308         p += PORT_MMAP_CHUNK_SIZE;
4309         c++;
4310         freed_chunks++;
4311     }
4312 
4313     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4314 
4315     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4316         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4317 
4318         nxt_unit_debug(ctx, "allocated_chunks %d",
4319                        (int) lib->outgoing.allocated_chunks);
4320     }
4321 
4322     if (hdr->dst_pid == lib->pid
4323         && freed_chunks != 0
4324         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4325     {
4326         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4327     }
4328 }
4329 
4330 
4331 static int
4332 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4333 {
4334     ssize_t          res;
4335     nxt_port_msg_t   msg;
4336     nxt_unit_impl_t  *lib;
4337 
4338     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4339 
4340     msg.stream = 0;
4341     msg.pid = lib->pid;
4342     msg.reply_port = 0;
4343     msg.type = _NXT_PORT_MSG_SHM_ACK;
4344     msg.last = 0;
4345     msg.mmap = 0;
4346     msg.nf = 0;
4347     msg.mf = 0;
4348     msg.tracking = 0;
4349 
4350     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
4351     if (nxt_slow_path(res != sizeof(msg))) {
4352         return NXT_UNIT_ERROR;
4353     }
4354 
4355     return NXT_UNIT_OK;
4356 }
4357 
4358 
4359 static nxt_int_t
4360 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4361 {
4362     nxt_process_t  *process;
4363 
4364     process = data;
4365 
4366     if (lhq->key.length == sizeof(pid_t)
4367         && *(pid_t *) lhq->key.start == process->pid)
4368     {
4369         return NXT_OK;
4370     }
4371 
4372     return NXT_DECLINED;
4373 }
4374 
4375 
4376 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4377     NXT_LVLHSH_DEFAULT,
4378     nxt_unit_lvlhsh_pid_test,
4379     nxt_unit_lvlhsh_alloc,
4380     nxt_unit_lvlhsh_free,
4381 };
4382 
4383 
4384 static inline void
4385 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4386 {
4387     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4388     lhq->key.length = sizeof(*pid);
4389     lhq->key.start = (u_char *) pid;
4390     lhq->proto = &lvlhsh_processes_proto;
4391 }
4392 
4393 
4394 static nxt_unit_process_t *
4395 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4396 {
4397     nxt_unit_impl_t     *lib;
4398     nxt_unit_process_t  *process;
4399     nxt_lvlhsh_query_t  lhq;
4400 
4401     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4402 
4403     nxt_unit_process_lhq_pid(&lhq, &pid);
4404 
4405     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4406         process = lhq.value;
4407         nxt_unit_process_use(process);
4408 
4409         return process;
4410     }
4411 
4412     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4413     if (nxt_slow_path(process == NULL)) {
4414         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4415 
4416         return NULL;
4417     }
4418 
4419     process->pid = pid;
4420     process->use_count = 2;
4421     process->next_port_id = 0;
4422     process->lib = lib;
4423 
4424     nxt_queue_init(&process->ports);
4425 
4426     lhq.replace = 0;
4427     lhq.value = process;
4428 
4429     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4430 
4431     case NXT_OK:
4432         break;
4433 
4434     default:
4435         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4436 
4437         nxt_unit_free(ctx, process);
4438         process = NULL;
4439         break;
4440     }
4441 
4442     return process;
4443 }
4444 
4445 
4446 static nxt_unit_process_t *
4447 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4448 {
4449     int                 rc;
4450     nxt_lvlhsh_query_t  lhq;
4451 
4452     nxt_unit_process_lhq_pid(&lhq, &pid);
4453 
4454     if (remove) {
4455         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4456 
4457     } else {
4458         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4459     }
4460 
4461     if (rc == NXT_OK) {
4462         if (!remove) {
4463             nxt_unit_process_use(lhq.value);
4464         }
4465 
4466         return lhq.value;
4467     }
4468 
4469     return NULL;
4470 }
4471 
4472 
4473 static nxt_unit_process_t *
4474 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4475 {
4476     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4477 }
4478 
4479 
4480 int
4481 nxt_unit_run(nxt_unit_ctx_t *ctx)
4482 {
4483     int                  rc;
4484     nxt_unit_ctx_impl_t  *ctx_impl;
4485 
4486     nxt_unit_ctx_use(ctx);
4487 
4488     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4489 
4490     rc = NXT_UNIT_OK;
4491 
4492     while (nxt_fast_path(ctx_impl->online)) {
4493         rc = nxt_unit_run_once_impl(ctx);
4494 
4495         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4496             nxt_unit_quit(ctx);
4497             break;
4498         }
4499     }
4500 
4501     nxt_unit_ctx_release(ctx);
4502 
4503     return rc;
4504 }
4505 
4506 
4507 int
4508 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4509 {
4510     int  rc;
4511 
4512     nxt_unit_ctx_use(ctx);
4513 
4514     rc = nxt_unit_run_once_impl(ctx);
4515 
4516     nxt_unit_ctx_release(ctx);
4517 
4518     return rc;
4519 }
4520 
4521 
4522 static int
4523 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4524 {
4525     int                  rc;
4526     nxt_unit_read_buf_t  *rbuf;
4527 
4528     rbuf = nxt_unit_read_buf_get(ctx);
4529     if (nxt_slow_path(rbuf == NULL)) {
4530         return NXT_UNIT_ERROR;
4531     }
4532 
4533     rc = nxt_unit_read_buf(ctx, rbuf);
4534     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4535         nxt_unit_read_buf_release(ctx, rbuf);
4536 
4537         return rc;
4538     }
4539 
4540     rc = nxt_unit_process_msg(ctx, rbuf);
4541     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4542         return NXT_UNIT_ERROR;
4543     }
4544 
4545     rc = nxt_unit_process_pending_rbuf(ctx);
4546     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4547         return NXT_UNIT_ERROR;
4548     }
4549 
4550     nxt_unit_process_ready_req(ctx);
4551 
4552     return rc;
4553 }
4554 
4555 
4556 static int
4557 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4558 {
4559     int                   nevents, res, err;
4560     nxt_unit_impl_t       *lib;
4561     nxt_unit_ctx_impl_t   *ctx_impl;
4562     nxt_unit_port_impl_t  *port_impl;
4563     struct pollfd         fds[2];
4564 
4565     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4566 
4567     if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4568         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4569     }
4570 
4571     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4572                                  port);
4573 
4574     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4575 
4576 retry:
4577 
4578     if (port_impl->from_socket == 0) {
4579         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4580         if (res == NXT_UNIT_OK) {
4581             if (nxt_unit_is_read_socket(rbuf)) {
4582                 port_impl->from_socket++;
4583 
4584                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4585                                (int) ctx_impl->read_port->id.pid,
4586                                (int) ctx_impl->read_port->id.id,
4587                                port_impl->from_socket);
4588 
4589             } else {
4590                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4591                                (int) ctx_impl->read_port->id.pid,
4592                                (int) ctx_impl->read_port->id.id,
4593                                (int) rbuf->size);
4594 
4595                 return NXT_UNIT_OK;
4596             }
4597         }
4598     }
4599 
4600     res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4601     if (res == NXT_UNIT_OK) {
4602         return NXT_UNIT_OK;
4603     }
4604 
4605     fds[0].fd = ctx_impl->read_port->in_fd;
4606     fds[0].events = POLLIN;
4607     fds[0].revents = 0;
4608 
4609     fds[1].fd = lib->shared_port->in_fd;
4610     fds[1].events = POLLIN;
4611     fds[1].revents = 0;
4612 
4613     nevents = poll(fds, 2, -1);
4614     if (nxt_slow_path(nevents == -1)) {
4615         err = errno;
4616 
4617         if (err == EINTR) {
4618             goto retry;
4619         }
4620 
4621         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4622                        fds[0].fd, fds[1].fd, strerror(err), err);
4623 
4624         rbuf->size = -1;
4625 
4626         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4627     }
4628 
4629     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4630                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4631                    fds[1].revents);
4632 
4633     if ((fds[0].revents & POLLIN) != 0) {
4634         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4635         if (res == NXT_UNIT_AGAIN) {
4636             goto retry;
4637         }
4638 
4639         return res;
4640     }
4641 
4642     if ((fds[1].revents & POLLIN) != 0) {
4643         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4644         if (res == NXT_UNIT_AGAIN) {
4645             goto retry;
4646         }
4647 
4648         return res;
4649     }
4650 
4651     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4652                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4653                    fds[1].revents);
4654 
4655     return NXT_UNIT_ERROR;
4656 }
4657 
4658 
4659 static int
4660 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4661 {
4662     int                  rc;
4663     nxt_queue_t          pending_rbuf;
4664     nxt_unit_ctx_impl_t  *ctx_impl;
4665     nxt_unit_read_buf_t  *rbuf;
4666 
4667     nxt_queue_init(&pending_rbuf);
4668 
4669     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4670 
4671     pthread_mutex_lock(&ctx_impl->mutex);
4672 
4673     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4674         pthread_mutex_unlock(&ctx_impl->mutex);
4675 
4676         return NXT_UNIT_OK;
4677     }
4678 
4679     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4680     nxt_queue_init(&ctx_impl->pending_rbuf);
4681 
4682     pthread_mutex_unlock(&ctx_impl->mutex);
4683 
4684     rc = NXT_UNIT_OK;
4685 
4686     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4687 
4688         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4689             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
4690 
4691         } else {
4692             nxt_unit_read_buf_release(ctx, rbuf);
4693         }
4694 
4695     } nxt_queue_loop;
4696 
4697     return rc;
4698 }
4699 
4700 
4701 static void
4702 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4703 {
4704     int                           res;
4705     nxt_queue_t                   ready_req;
4706     nxt_unit_impl_t               *lib;
4707     nxt_unit_ctx_impl_t           *ctx_impl;
4708     nxt_unit_request_info_t       *req;
4709     nxt_unit_request_info_impl_t  *req_impl;
4710 
4711     nxt_queue_init(&ready_req);
4712 
4713     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4714 
4715     pthread_mutex_lock(&ctx_impl->mutex);
4716 
4717     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4718         pthread_mutex_unlock(&ctx_impl->mutex);
4719 
4720         return;
4721     }
4722 
4723     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4724     nxt_queue_init(&ctx_impl->ready_req);
4725 
4726     pthread_mutex_unlock(&ctx_impl->mutex);
4727 
4728     nxt_queue_each(req_impl, &ready_req,
4729                    nxt_unit_request_info_impl_t, port_wait_link)
4730     {
4731         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4732 
4733         req = &req_impl->req;
4734 
4735         res = nxt_unit_send_req_headers_ack(req);
4736         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4737             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4738 
4739             continue;
4740         }
4741 
4742         if (req->content_length
4743             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4744         {
4745             res = nxt_unit_request_hash_add(ctx, req);
4746             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4747                 nxt_unit_req_warn(req, "failed to add request to hash");
4748 
4749                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4750 
4751                 continue;
4752             }
4753 
4754             /*
4755              * If application have separate data handler, we may start
4756              * request processing and process data when it is arrived.
4757              */
4758             if (lib->callbacks.data_handler == NULL) {
4759                 continue;
4760             }
4761         }
4762 
4763         lib->callbacks.request_handler(&req_impl->req);
4764 
4765     } nxt_queue_loop;
4766 }
4767 
4768 
4769 int
4770 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4771 {
4772     int                  rc;
4773     nxt_unit_read_buf_t  *rbuf;
4774     nxt_unit_ctx_impl_t  *ctx_impl;
4775 
4776     nxt_unit_ctx_use(ctx);
4777 
4778     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4779 
4780     rc = NXT_UNIT_OK;
4781 
4782     while (nxt_fast_path(ctx_impl->online)) {
4783         rbuf = nxt_unit_read_buf_get(ctx);
4784         if (nxt_slow_path(rbuf == NULL)) {
4785             rc = NXT_UNIT_ERROR;
4786             break;
4787         }
4788 
4789     retry:
4790 
4791         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4792         if (rc == NXT_UNIT_AGAIN) {
4793             goto retry;
4794         }
4795 
4796         rc = nxt_unit_process_msg(ctx, rbuf);
4797         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4798             break;
4799         }
4800 
4801         rc = nxt_unit_process_pending_rbuf(ctx);
4802         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4803             break;
4804         }
4805 
4806         nxt_unit_process_ready_req(ctx);
4807     }
4808 
4809     nxt_unit_ctx_release(ctx);
4810 
4811     return rc;
4812 }
4813 
4814 
4815 nxt_inline int
4816 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4817 {
4818     nxt_port_msg_t  *port_msg;
4819 
4820     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4821         port_msg = (nxt_port_msg_t *) rbuf->buf;
4822 
4823         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4824     }
4825 
4826     return 0;
4827 }
4828 
4829 
4830 nxt_inline int
4831 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4832 {
4833     if (nxt_fast_path(rbuf->size == 1)) {
4834         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4835     }
4836 
4837     return 0;
4838 }
4839 
4840 
4841 nxt_inline int
4842 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4843 {
4844     nxt_port_msg_t  *port_msg;
4845 
4846     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4847         port_msg = (nxt_port_msg_t *) rbuf->buf;
4848 
4849         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4850     }
4851 
4852     return 0;
4853 }
4854 
4855 
4856 nxt_inline int
4857 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4858 {
4859     nxt_port_msg_t  *port_msg;
4860 
4861     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4862         port_msg = (nxt_port_msg_t *) rbuf->buf;
4863 
4864         return port_msg->type == _NXT_PORT_MSG_QUIT;
4865     }
4866 
4867     return 0;
4868 }
4869 
4870 
4871 int
4872 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4873 {
4874     int                  rc;
4875     nxt_unit_impl_t      *lib;
4876     nxt_unit_read_buf_t  *rbuf;
4877     nxt_unit_ctx_impl_t  *ctx_impl;
4878 
4879     nxt_unit_ctx_use(ctx);
4880 
4881     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4882     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4883 
4884     rc = NXT_UNIT_OK;
4885 
4886     while (nxt_fast_path(ctx_impl->online)) {
4887         rbuf = nxt_unit_read_buf_get(ctx);
4888         if (nxt_slow_path(rbuf == NULL)) {
4889             rc = NXT_UNIT_ERROR;
4890             break;
4891         }
4892 
4893     retry:
4894 
4895         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4896         if (rc == NXT_UNIT_AGAIN) {
4897             goto retry;
4898         }
4899 
4900         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4901             nxt_unit_read_buf_release(ctx, rbuf);
4902             break;
4903         }
4904 
4905         rc = nxt_unit_process_msg(ctx, rbuf);
4906         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4907             break;
4908         }
4909 
4910         rc = nxt_unit_process_pending_rbuf(ctx);
4911         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4912             break;
4913         }
4914 
4915         nxt_unit_process_ready_req(ctx);
4916     }
4917 
4918     nxt_unit_ctx_release(ctx);
4919 
4920     return rc;
4921 }
4922 
4923 
4924 int
4925 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4926 {
4927     nxt_unit_impl_t  *lib;
4928 
4929     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4930 
4931     return (ctx == &lib->main_ctx.ctx);
4932 }
4933 
4934 
4935 int
4936 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4937 {
4938     int  rc;
4939 
4940     nxt_unit_ctx_use(ctx);
4941 
4942     rc = nxt_unit_process_port_msg_impl(ctx, port);
4943 
4944     nxt_unit_ctx_release(ctx);
4945 
4946     return rc;
4947 }
4948 
4949 
4950 static int
4951 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4952 {
4953     int                  rc;
4954     nxt_unit_impl_t      *lib;
4955     nxt_unit_read_buf_t  *rbuf;
4956     nxt_unit_ctx_impl_t  *ctx_impl;
4957 
4958     rbuf = nxt_unit_read_buf_get(ctx);
4959     if (nxt_slow_path(rbuf == NULL)) {
4960         return NXT_UNIT_ERROR;
4961     }
4962 
4963     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4964     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4965 
4966 retry:
4967 
4968     if (port == lib->shared_port) {
4969         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
4970 
4971     } else {
4972         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
4973     }
4974 
4975     if (rc != NXT_UNIT_OK) {
4976         nxt_unit_read_buf_release(ctx, rbuf);
4977         return rc;
4978     }
4979 
4980     rc = nxt_unit_process_msg(ctx, rbuf);
4981     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4982         return NXT_UNIT_ERROR;
4983     }
4984 
4985     rc = nxt_unit_process_pending_rbuf(ctx);
4986     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4987         return NXT_UNIT_ERROR;
4988     }
4989 
4990     nxt_unit_process_ready_req(ctx);
4991 
4992     rbuf = nxt_unit_read_buf_get(ctx);
4993     if (nxt_slow_path(rbuf == NULL)) {
4994         return NXT_UNIT_ERROR;
4995     }
4996 
4997     if (ctx_impl->online) {
4998         goto retry;
4999     }
5000 
5001     return rc;
5002 }
5003 
5004 
5005 void
5006 nxt_unit_done(nxt_unit_ctx_t *ctx)
5007 {
5008     nxt_unit_ctx_release(ctx);
5009 }
5010 
5011 
5012 nxt_unit_ctx_t *
5013 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5014 {
5015     int                   rc, queue_fd;
5016     void                  *mem;
5017     nxt_unit_impl_t       *lib;
5018     nxt_unit_port_t       *port;
5019     nxt_unit_ctx_impl_t   *new_ctx;
5020     nxt_unit_port_impl_t  *port_impl;
5021 
5022     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5023 
5024     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5025                                    + lib->request_data_size);
5026     if (nxt_slow_path(new_ctx == NULL)) {
5027         nxt_unit_alert(ctx, "failed to allocate context");
5028 
5029         return NULL;
5030     }
5031 
5032     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5033     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5034          nxt_unit_free(ctx, new_ctx);
5035 
5036          return NULL;
5037     }
5038 
5039     queue_fd = -1;
5040 
5041     port = nxt_unit_create_port(&new_ctx->ctx);
5042     if (nxt_slow_path(port == NULL)) {
5043         goto fail;
5044     }
5045 
5046     new_ctx->read_port = port;
5047 
5048     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5049     if (nxt_slow_path(queue_fd == -1)) {
5050         goto fail;
5051     }
5052 
5053     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5054                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5055     if (nxt_slow_path(mem == MAP_FAILED)) {
5056         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5057                        strerror(errno), errno);
5058 
5059         goto fail;
5060     }
5061 
5062     nxt_port_queue_init(mem);
5063 
5064     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5065     port_impl->queue = mem;
5066 
5067     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5068     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5069         goto fail;
5070     }
5071 
5072     nxt_unit_close(queue_fd);
5073 
5074     return &new_ctx->ctx;
5075 
5076 fail:
5077 
5078     if (queue_fd != -1) {
5079         nxt_unit_close(queue_fd);
5080     }
5081 
5082     nxt_unit_ctx_release(&new_ctx->ctx);
5083 
5084     return NULL;
5085 }
5086 
5087 
5088 static void
5089 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5090 {
5091     nxt_unit_impl_t                  *lib;
5092     nxt_unit_mmap_buf_t              *mmap_buf;
5093     nxt_unit_read_buf_t              *rbuf;
5094     nxt_unit_request_info_impl_t     *req_impl;
5095     nxt_unit_websocket_frame_impl_t  *ws_impl;
5096 
5097     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5098 
5099     nxt_queue_each(req_impl, &ctx_impl->active_req,
5100                    nxt_unit_request_info_impl_t, link)
5101     {
5102         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5103 
5104         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5105 
5106     } nxt_queue_loop;
5107 
5108     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5109     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5110 
5111     while (ctx_impl->free_buf != NULL) {
5112         mmap_buf = ctx_impl->free_buf;
5113         nxt_unit_mmap_buf_unlink(mmap_buf);
5114         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5115     }
5116 
5117     nxt_queue_each(req_impl, &ctx_impl->free_req,
5118                    nxt_unit_request_info_impl_t, link)
5119     {
5120         nxt_unit_request_info_free(req_impl);
5121 
5122     } nxt_queue_loop;
5123 
5124     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5125                    nxt_unit_websocket_frame_impl_t, link)
5126     {
5127         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5128 
5129     } nxt_queue_loop;
5130 
5131     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5132     {
5133         if (rbuf != &ctx_impl->ctx_read_buf) {
5134             nxt_unit_free(&ctx_impl->ctx, rbuf);
5135         }
5136     } nxt_queue_loop;
5137 
5138     pthread_mutex_destroy(&ctx_impl->mutex);
5139 
5140     pthread_mutex_lock(&lib->mutex);
5141 
5142     nxt_queue_remove(&ctx_impl->link);
5143 
5144     pthread_mutex_unlock(&lib->mutex);
5145 
5146     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5147         nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5148         nxt_unit_port_release(ctx_impl->read_port);
5149     }
5150 
5151     if (ctx_impl != &lib->main_ctx) {
5152         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5153     }
5154 
5155     nxt_unit_lib_release(lib);
5156 }
5157 
5158 
5159 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5160 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5161 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5162 #else
5163 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5164 #endif
5165 
5166 
5167 void
5168 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5169 {
5170     nxt_unit_port_hash_id_t  port_hash_id;
5171 
5172     port_hash_id.pid = pid;
5173     port_hash_id.id = id;
5174 
5175     port_id->pid = pid;
5176     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5177     port_id->id = id;
5178 }
5179 
5180 
5181 static nxt_unit_port_t *
5182 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5183 {
5184     int                 rc, port_sockets[2];
5185     nxt_unit_impl_t     *lib;
5186     nxt_unit_port_t     new_port, *port;
5187     nxt_unit_process_t  *process;
5188 
5189     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5190 
5191     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5192     if (nxt_slow_path(rc != 0)) {
5193         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5194                       strerror(errno), errno);
5195 
5196         return NULL;
5197     }
5198 
5199     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5200                    port_sockets[0], port_sockets[1]);
5201 
5202     pthread_mutex_lock(&lib->mutex);
5203 
5204     process = nxt_unit_process_get(ctx, lib->pid);
5205     if (nxt_slow_path(process == NULL)) {
5206         pthread_mutex_unlock(&lib->mutex);
5207 
5208         nxt_unit_close(port_sockets[0]);
5209         nxt_unit_close(port_sockets[1]);
5210 
5211         return NULL;
5212     }
5213 
5214     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5215 
5216     new_port.in_fd = port_sockets[0];
5217     new_port.out_fd = port_sockets[1];
5218     new_port.data = NULL;
5219 
5220     pthread_mutex_unlock(&lib->mutex);
5221 
5222     nxt_unit_process_release(process);
5223 
5224     port = nxt_unit_add_port(ctx, &new_port, NULL);
5225     if (nxt_slow_path(port == NULL)) {
5226         nxt_unit_close(port_sockets[0]);
5227         nxt_unit_close(port_sockets[1]);
5228     }
5229 
5230     return port;
5231 }
5232 
5233 
5234 static int
5235 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5236     nxt_unit_port_t *port, int queue_fd)
5237 {
5238     ssize_t          res;
5239     nxt_unit_impl_t  *lib;
5240     int              fds[2] = { port->out_fd, queue_fd };
5241 
5242     struct {
5243         nxt_port_msg_t            msg;
5244         nxt_port_msg_new_port_t   new_port;
5245     } m;
5246 
5247     union {
5248         struct cmsghdr  cm;
5249         char            space[CMSG_SPACE(sizeof(int) * 2)];
5250     } cmsg;
5251 
5252     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5253 
5254     m.msg.stream = 0;
5255     m.msg.pid = lib->pid;
5256     m.msg.reply_port = 0;
5257     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5258     m.msg.last = 0;
5259     m.msg.mmap = 0;
5260     m.msg.nf = 0;
5261     m.msg.mf = 0;
5262     m.msg.tracking = 0;
5263 
5264     m.new_port.id = port->id.id;
5265     m.new_port.pid = port->id.pid;
5266     m.new_port.type = NXT_PROCESS_APP;
5267     m.new_port.max_size = 16 * 1024;
5268     m.new_port.max_share = 64 * 1024;
5269 
5270     memset(&cmsg, 0, sizeof(cmsg));
5271 
5272     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
5273     cmsg.cm.cmsg_level = SOL_SOCKET;
5274     cmsg.cm.cmsg_type = SCM_RIGHTS;
5275 
5276     /*
5277      * memcpy() is used instead of simple
5278      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
5279      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5280      *   dereferencing type-punned pointer will break strict-aliasing rules
5281      *
5282      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5283      * in the same simple assignment as in the code above.
5284      */
5285     memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
5286 
5287     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5288 
5289     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5290 }
5291 
5292 
5293 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5294 {
5295     nxt_unit_port_impl_t  *port_impl;
5296 
5297     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5298 
5299     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5300 }
5301 
5302 
5303 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5304 {
5305     long                  c;
5306     nxt_unit_port_impl_t  *port_impl;
5307 
5308     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5309 
5310     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5311 
5312     if (c == 1) {
5313         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5314                        (int) port->id.pid, (int) port->id.id,
5315                        port->in_fd, port->out_fd);
5316 
5317         nxt_unit_process_release(port_impl->process);
5318 
5319         if (port->in_fd != -1) {
5320             nxt_unit_close(port->in_fd);
5321 
5322             port->in_fd = -1;
5323         }
5324 
5325         if (port->out_fd != -1) {
5326             nxt_unit_close(port->out_fd);
5327 
5328             port->out_fd = -1;
5329         }
5330 
5331         if (port_impl->queue != NULL) {
5332             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5333                                      ? sizeof(nxt_app_queue_t)
5334                                      : sizeof(nxt_port_queue_t));
5335         }
5336 
5337         nxt_unit_free(NULL, port_impl);
5338     }
5339 }
5340 
5341 
5342 static nxt_unit_port_t *
5343 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5344 {
5345     int                   rc, ready;
5346     nxt_queue_t           awaiting_req;
5347     nxt_unit_impl_t       *lib;
5348     nxt_unit_port_t       *old_port;
5349     nxt_unit_process_t    *process;
5350     nxt_unit_port_impl_t  *new_port, *old_port_impl;
5351 
5352     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5353 
5354     pthread_mutex_lock(&lib->mutex);
5355 
5356     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5357 
5358     if (nxt_slow_path(old_port != NULL)) {
5359         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5360                             "in_fd %d out_fd %d queue %p",
5361                             port->id.pid, port->id.id,
5362                             port->in_fd, port->out_fd, queue);
5363 
5364         if (old_port->data == NULL) {
5365             old_port->data = port->data;
5366             port->data = NULL;
5367         }
5368 
5369         if (old_port->in_fd == -1) {
5370             old_port->in_fd = port->in_fd;
5371             port->in_fd = -1;
5372         }
5373 
5374         if (port->in_fd != -1) {
5375             nxt_unit_close(port->in_fd);
5376             port->in_fd = -1;
5377         }
5378 
5379         if (old_port->out_fd == -1) {
5380             old_port->out_fd = port->out_fd;
5381             port->out_fd = -1;
5382         }
5383 
5384         if (port->out_fd != -1) {
5385             nxt_unit_close(port->out_fd);
5386             port->out_fd = -1;
5387         }
5388 
5389         *port = *old_port;
5390 
5391         nxt_queue_init(&awaiting_req);
5392 
5393         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5394 
5395         if (old_port_impl->queue == NULL) {
5396             old_port_impl->queue = queue;
5397         }
5398 
5399         ready = (port->in_fd != -1 || port->out_fd != -1);
5400 
5401         /*
5402          * Port can be market as 'ready' only after callbacks.add_port() call.
5403          * Otherwise, request may try to use the port before callback.
5404          */
5405         if (lib->callbacks.add_port == NULL && ready) {
5406             old_port_impl->ready = ready;
5407 
5408             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5409                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5410                 nxt_queue_init(&old_port_impl->awaiting_req);
5411             }
5412         }
5413 
5414         pthread_mutex_unlock(&lib->mutex);
5415 
5416         if (lib->callbacks.add_port != NULL && ready) {
5417             lib->callbacks.add_port(ctx, old_port);
5418 
5419             pthread_mutex_lock(&lib->mutex);
5420 
5421             old_port_impl->ready = ready;
5422 
5423             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5424                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5425                 nxt_queue_init(&old_port_impl->awaiting_req);
5426             }
5427 
5428             pthread_mutex_unlock(&lib->mutex);
5429         }
5430 
5431         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5432 
5433         return old_port;
5434     }
5435 
5436     new_port = NULL;
5437     ready = 0;
5438 
5439     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5440                    port->id.pid, port->id.id,
5441                    port->in_fd, port->out_fd, queue);
5442 
5443     process = nxt_unit_process_get(ctx, port->id.pid);
5444     if (nxt_slow_path(process == NULL)) {
5445         goto unlock;
5446     }
5447 
5448     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5449         && port->id.id >= process->next_port_id)
5450     {
5451         process->next_port_id = port->id.id + 1;
5452     }
5453 
5454     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5455     if (nxt_slow_path(new_port == NULL)) {
5456         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5457                        port->id.pid, port->id.id);
5458 
5459         goto unlock;
5460     }
5461 
5462     new_port->port = *port;
5463 
5464     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5465     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5466         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5467                        port->id.pid, port->id.id);
5468 
5469         nxt_unit_free(ctx, new_port);
5470 
5471         new_port = NULL;
5472 
5473         goto unlock;
5474     }
5475 
5476     nxt_queue_insert_tail(&process->ports, &new_port->link);
5477 
5478     new_port->use_count = 2;
5479     new_port->process = process;
5480     new_port->queue = queue;
5481     new_port->from_socket = 0;
5482     new_port->socket_rbuf = NULL;
5483 
5484     nxt_queue_init(&new_port->awaiting_req);
5485 
5486     ready = (port->in_fd != -1 || port->out_fd != -1);
5487 
5488     if (lib->callbacks.add_port == NULL) {
5489         new_port->ready = ready;
5490 
5491     } else {
5492         new_port->ready = 0;
5493     }
5494 
5495     process = NULL;
5496 
5497 unlock:
5498 
5499     pthread_mutex_unlock(&lib->mutex);
5500 
5501     if (nxt_slow_path(process != NULL)) {
5502         nxt_unit_process_release(process);
5503     }
5504 
5505     if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5506         lib->callbacks.add_port(ctx, &new_port->port);
5507 
5508         nxt_queue_init(&awaiting_req);
5509 
5510         pthread_mutex_lock(&lib->mutex);
5511 
5512         new_port->ready = 1;
5513 
5514         if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5515             nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5516             nxt_queue_init(&new_port->awaiting_req);
5517         }
5518 
5519         pthread_mutex_unlock(&lib->mutex);
5520 
5521         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5522     }
5523 
5524     return (new_port == NULL) ? NULL : &new_port->port;
5525 }
5526 
5527 
5528 static void
5529 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5530 {
5531     nxt_unit_ctx_impl_t           *ctx_impl;
5532     nxt_unit_request_info_impl_t  *req_impl;
5533 
5534     nxt_queue_each(req_impl, awaiting_req,
5535                    nxt_unit_request_info_impl_t, port_wait_link)
5536     {
5537         nxt_queue_remove(&req_impl->port_wait_link);
5538 
5539         ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5540                                     ctx);
5541 
5542         pthread_mutex_lock(&ctx_impl->mutex);
5543 
5544         nxt_queue_insert_tail(&ctx_impl->ready_req,
5545                               &req_impl->port_wait_link);
5546 
5547         pthread_mutex_unlock(&ctx_impl->mutex);
5548 
5549         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5550 
5551         nxt_unit_awake_ctx(ctx, ctx_impl);
5552 
5553     } nxt_queue_loop;
5554 }
5555 
5556 
5557 static void
5558 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5559 {
5560     nxt_unit_port_t       *port;
5561     nxt_unit_port_impl_t  *port_impl;
5562 
5563     pthread_mutex_lock(&lib->mutex);
5564 
5565     port = nxt_unit_remove_port_unsafe(lib, port_id);
5566 
5567     if (nxt_fast_path(port != NULL)) {
5568         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5569 
5570         nxt_queue_remove(&port_impl->link);
5571     }
5572 
5573     pthread_mutex_unlock(&lib->mutex);
5574 
5575     if (lib->callbacks.remove_port != NULL && port != NULL) {
5576         lib->callbacks.remove_port(&lib->unit, port);
5577     }
5578 
5579     if (nxt_fast_path(port != NULL)) {
5580         nxt_unit_port_release(port);
5581     }
5582 }
5583 
5584 
5585 static nxt_unit_port_t *
5586 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5587 {
5588     nxt_unit_port_t  *port;
5589 
5590     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5591     if (nxt_slow_path(port == NULL)) {
5592         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5593                        (int) port_id->pid, (int) port_id->id);
5594 
5595         return NULL;
5596     }
5597 
5598     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5599                    (int) port_id->pid, (int) port_id->id,
5600                    port->in_fd, port->out_fd, port->data);
5601 
5602     return port;
5603 }
5604 
5605 
5606 static void
5607 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5608 {
5609     nxt_unit_process_t  *process;
5610 
5611     pthread_mutex_lock(&lib->mutex);
5612 
5613     process = nxt_unit_process_find(lib, pid, 1);
5614     if (nxt_slow_path(process == NULL)) {
5615         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5616 
5617         pthread_mutex_unlock(&lib->mutex);
5618 
5619         return;
5620     }
5621 
5622     nxt_unit_remove_process(lib, process);
5623 
5624     if (lib->callbacks.remove_pid != NULL) {
5625         lib->callbacks.remove_pid(&lib->unit, pid);
5626     }
5627 }
5628 
5629 
5630 static void
5631 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5632 {
5633     nxt_queue_t           ports;
5634     nxt_unit_port_impl_t  *port;
5635 
5636     nxt_queue_init(&ports);
5637 
5638     nxt_queue_add(&ports, &process->ports);
5639 
5640     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5641 
5642         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5643 
5644     } nxt_queue_loop;
5645 
5646     pthread_mutex_unlock(&lib->mutex);
5647 
5648     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5649 
5650         nxt_queue_remove(&port->link);
5651 
5652         if (lib->callbacks.remove_port != NULL) {
5653             lib->callbacks.remove_port(&lib->unit, &port->port);
5654         }
5655 
5656         nxt_unit_port_release(&port->port);
5657 
5658     } nxt_queue_loop;
5659 
5660     nxt_unit_process_release(process);
5661 }
5662 
5663 
5664 static void
5665 nxt_unit_quit(nxt_unit_ctx_t *ctx)
5666 {
5667     nxt_port_msg_t       msg;
5668     nxt_unit_impl_t      *lib;
5669     nxt_unit_ctx_impl_t  *ctx_impl;
5670 
5671     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5672     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5673 
5674     if (!ctx_impl->online) {
5675         return;
5676     }
5677 
5678     ctx_impl->online = 0;
5679 
5680     if (lib->callbacks.quit != NULL) {
5681         lib->callbacks.quit(ctx);
5682     }
5683 
5684     if (ctx != &lib->main_ctx.ctx) {
5685         return;
5686     }
5687 
5688     memset(&msg, 0, sizeof(nxt_port_msg_t));
5689 
5690     msg.pid = lib->pid;
5691     msg.type = _NXT_PORT_MSG_QUIT;
5692 
5693     pthread_mutex_lock(&lib->mutex);
5694 
5695     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5696 
5697         if (ctx == &ctx_impl->ctx
5698             || ctx_impl->read_port == NULL
5699             || ctx_impl->read_port->out_fd == -1)
5700         {
5701             continue;
5702         }
5703 
5704         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5705                                   &msg, sizeof(msg), NULL, 0);
5706 
5707     } nxt_queue_loop;
5708 
5709     pthread_mutex_unlock(&lib->mutex);
5710 }
5711 
5712 
5713 static int
5714 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5715 {
5716     ssize_t              res;
5717     nxt_unit_impl_t      *lib;
5718     nxt_unit_ctx_impl_t  *ctx_impl;
5719 
5720     struct {
5721         nxt_port_msg_t           msg;
5722         nxt_port_msg_get_port_t  get_port;
5723     } m;
5724 
5725     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5726     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5727 
5728     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5729 
5730     m.msg.pid = lib->pid;
5731     m.msg.reply_port = ctx_impl->read_port->id.id;
5732     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5733 
5734     m.get_port.id = port_id->id;
5735     m.get_port.pid = port_id->pid;
5736 
5737     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5738                    (int) port_id->id);
5739 
5740     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
5741     if (nxt_slow_path(res != sizeof(m))) {
5742         return NXT_UNIT_ERROR;
5743     }
5744 
5745     return NXT_UNIT_OK;
5746 }
5747 
5748 
5749 static ssize_t
5750 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5751     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5752 {
5753     int                   notify;
5754     ssize_t               ret;
5755     nxt_int_t             rc;
5756     nxt_port_msg_t        msg;
5757     nxt_unit_impl_t       *lib;
5758     nxt_unit_port_impl_t  *port_impl;
5759 
5760     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5761 
5762     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5763     if (port_impl->queue != NULL && oob_size == 0
5764         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5765     {
5766         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5767         if (nxt_slow_path(rc != NXT_OK)) {
5768             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5769                            (int) port->id.pid, (int) port->id.id);
5770 
5771             return -1;
5772         }
5773 
5774         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5775                        (int) port->id.pid, (int) port->id.id,
5776                        (int) buf_size, notify);
5777 
5778         if (notify) {
5779             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5780 
5781             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5782 
5783             if (lib->callbacks.port_send == NULL) {
5784                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5785                                        sizeof(nxt_port_msg_t), NULL, 0);
5786 
5787                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5788                                (int) port->id.pid, (int) port->id.id,
5789                                (int) ret);
5790 
5791             } else {
5792                 ret = lib->callbacks.port_send(ctx, port, &msg,
5793                                                sizeof(nxt_port_msg_t), NULL, 0);
5794 
5795                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5796                                (int) port->id.pid, (int) port->id.id,
5797                                (int) ret);
5798             }
5799 
5800         }
5801 
5802         return buf_size;
5803     }
5804 
5805     if (port_impl->queue != NULL) {
5806         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5807 
5808         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5809         if (nxt_slow_path(rc != NXT_OK)) {
5810             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5811                            (int) port->id.pid, (int) port->id.id);
5812 
5813             return -1;
5814         }
5815 
5816         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5817                        (int) port->id.pid, (int) port->id.id, notify);
5818     }
5819 
5820     if (lib->callbacks.port_send != NULL) {
5821         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5822                                        oob, oob_size);
5823 
5824         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5825                        (int) port->id.pid, (int) port->id.id,
5826                        (int) ret);
5827 
5828     } else {
5829         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5830                                oob, oob_size);
5831 
5832         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5833                        (int) port->id.pid, (int) port->id.id,
5834                        (int) ret);
5835     }
5836 
5837     return ret;
5838 }
5839 
5840 
5841 static ssize_t
5842 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5843     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5844 {
5845     int            err;
5846     ssize_t        res;
5847     struct iovec   iov[1];
5848     struct msghdr  msg;
5849 
5850     iov[0].iov_base = (void *) buf;
5851     iov[0].iov_len = buf_size;
5852 
5853     msg.msg_name = NULL;
5854     msg.msg_namelen = 0;
5855     msg.msg_iov = iov;
5856     msg.msg_iovlen = 1;
5857     msg.msg_flags = 0;
5858     msg.msg_control = (void *) oob;
5859     msg.msg_controllen = oob_size;
5860 
5861 retry:
5862 
5863     res = sendmsg(fd, &msg, 0);
5864 
5865     if (nxt_slow_path(res == -1)) {
5866         err = errno;
5867 
5868         if (err == EINTR) {
5869             goto retry;
5870         }
5871 
5872         /*
5873          * FIXME: This should be "alert" after router graceful shutdown
5874          * implementation.
5875          */
5876         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5877                       fd, (int) buf_size, strerror(err), err);
5878 
5879     } else {
5880         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5881                        (int) res);
5882     }
5883 
5884     return res;
5885 }
5886 
5887 
5888 static int
5889 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5890     nxt_unit_read_buf_t *rbuf)
5891 {
5892     int                   res, read;
5893     nxt_unit_port_impl_t  *port_impl;
5894 
5895     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5896 
5897     read = 0;
5898 
5899 retry:
5900 
5901     if (port_impl->from_socket > 0) {
5902         if (port_impl->socket_rbuf != NULL
5903             && port_impl->socket_rbuf->size > 0)
5904         {
5905             port_impl->from_socket--;
5906 
5907             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5908             port_impl->socket_rbuf->size = 0;
5909 
5910             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5911                            (int) port->id.pid, (int) port->id.id,
5912                            (int) rbuf->size);
5913 
5914             return NXT_UNIT_OK;
5915         }
5916 
5917     } else {
5918         res = nxt_unit_port_queue_recv(port, rbuf);
5919 
5920         if (res == NXT_UNIT_OK) {
5921             if (nxt_unit_is_read_socket(rbuf)) {
5922                 port_impl->from_socket++;
5923 
5924                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
5925                                (int) port->id.pid, (int) port->id.id,
5926                                port_impl->from_socket);
5927 
5928                 goto retry;
5929             }
5930 
5931             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
5932                            (int) port->id.pid, (int) port->id.id,
5933                            (int) rbuf->size);
5934 
5935             return NXT_UNIT_OK;
5936         }
5937     }
5938 
5939     if (read) {
5940         return NXT_UNIT_AGAIN;
5941     }
5942 
5943     res = nxt_unit_port_recv(ctx, port, rbuf);
5944     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5945         return NXT_UNIT_ERROR;
5946     }
5947 
5948     read = 1;
5949 
5950     if (nxt_unit_is_read_queue(rbuf)) {
5951         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5952                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5953 
5954         goto retry;
5955     }
5956 
5957     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
5958                    (int) port->id.pid, (int) port->id.id,
5959                    (int) rbuf->size);
5960 
5961     if (res == NXT_UNIT_AGAIN) {
5962         return NXT_UNIT_AGAIN;
5963     }
5964 
5965     if (port_impl->from_socket > 0) {
5966         port_impl->from_socket--;
5967 
5968         return NXT_UNIT_OK;
5969     }
5970 
5971     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
5972                    (int) port->id.pid, (int) port->id.id,
5973                    (int) rbuf->size);
5974 
5975     if (port_impl->socket_rbuf == NULL) {
5976         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
5977 
5978         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
5979             return NXT_UNIT_ERROR;
5980         }
5981 
5982         port_impl->socket_rbuf->size = 0;
5983     }
5984 
5985     if (port_impl->socket_rbuf->size > 0) {
5986         nxt_unit_alert(ctx, "too many port socket messages");
5987 
5988         return NXT_UNIT_ERROR;
5989     }
5990 
5991     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
5992 
5993     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
5994 
5995     goto retry;
5996 }
5997 
5998 
5999 nxt_inline void
6000 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6001 {
6002     memcpy(dst->buf, src->buf, src->size);
6003     dst->size = src->size;
6004     memcpy(dst->oob, src->oob, sizeof(src->oob));
6005 }
6006 
6007 
6008 static int
6009 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6010     nxt_unit_read_buf_t *rbuf)
6011 {
6012     int  res;
6013 
6014 retry:
6015 
6016     res = nxt_unit_app_queue_recv(port, rbuf);
6017 
6018     if (res == NXT_UNIT_AGAIN) {
6019         res = nxt_unit_port_recv(ctx, port, rbuf);
6020         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6021             return NXT_UNIT_ERROR;
6022         }
6023 
6024         if (nxt_unit_is_read_queue(rbuf)) {
6025             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6026                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6027 
6028             goto retry;
6029         }
6030     }
6031 
6032     return res;
6033 }
6034 
6035 
6036 static int
6037 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6038     nxt_unit_read_buf_t *rbuf)
6039 {
6040     int              fd, err;
6041     struct iovec     iov[1];
6042     struct msghdr    msg;
6043     nxt_unit_impl_t  *lib;
6044 
6045     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6046 
6047     if (lib->callbacks.port_recv != NULL) {
6048         rbuf->size = lib->callbacks.port_recv(ctx, port,
6049                                               rbuf->buf, sizeof(rbuf->buf),
6050                                               rbuf->oob, sizeof(rbuf->oob));
6051 
6052         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6053                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6054 
6055         if (nxt_slow_path(rbuf->size < 0)) {
6056             return NXT_UNIT_ERROR;
6057         }
6058 
6059         return NXT_UNIT_OK;
6060     }
6061 
6062     iov[0].iov_base = rbuf->buf;
6063     iov[0].iov_len = sizeof(rbuf->buf);
6064 
6065     msg.msg_name = NULL;
6066     msg.msg_namelen = 0;
6067     msg.msg_iov = iov;
6068     msg.msg_iovlen = 1;
6069     msg.msg_flags = 0;
6070     msg.msg_control = rbuf->oob;
6071     msg.msg_controllen = sizeof(rbuf->oob);
6072 
6073     fd = port->in_fd;
6074 
6075 retry:
6076 
6077     rbuf->size = recvmsg(fd, &msg, 0);
6078 
6079     if (nxt_slow_path(rbuf->size == -1)) {
6080         err = errno;
6081 
6082         if (err == EINTR) {
6083             goto retry;
6084         }
6085 
6086         if (err == EAGAIN) {
6087             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6088                            fd, strerror(err), err);
6089 
6090             return NXT_UNIT_AGAIN;
6091         }
6092 
6093         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6094                        fd, strerror(err), err);
6095 
6096         return NXT_UNIT_ERROR;
6097     }
6098 
6099     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6100 
6101     return NXT_UNIT_OK;
6102 }
6103 
6104 
6105 static int
6106 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6107 {
6108     nxt_unit_port_impl_t  *port_impl;
6109 
6110     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6111 
6112     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6113 
6114     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6115 }
6116 
6117 
6118 static int
6119 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6120 {
6121     uint32_t              cookie;
6122     nxt_port_msg_t        *port_msg;
6123     nxt_app_queue_t       *queue;
6124     nxt_unit_port_impl_t  *port_impl;
6125 
6126     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6127     queue = port_impl->queue;
6128 
6129 retry:
6130 
6131     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6132 
6133     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6134 
6135     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6136         port_msg = (nxt_port_msg_t *) rbuf->buf;
6137 
6138         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6139             return NXT_UNIT_OK;
6140         }
6141 
6142         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6143 
6144         goto retry;
6145     }
6146 
6147     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6148 }
6149 
6150 
6151 nxt_inline int
6152 nxt_unit_close(int fd)
6153 {
6154     int  res;
6155 
6156     res = close(fd);
6157 
6158     if (nxt_slow_path(res == -1)) {
6159         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6160                        fd, strerror(errno), errno);
6161 
6162     } else {
6163         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6164     }
6165 
6166     return res;
6167 }
6168 
6169 
6170 static int
6171 nxt_unit_fd_blocking(int fd)
6172 {
6173     int  nb;
6174 
6175     nb = 0;
6176 
6177     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6178         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6179                        fd, strerror(errno), errno);
6180 
6181         return NXT_UNIT_ERROR;
6182     }
6183 
6184     return NXT_UNIT_OK;
6185 }
6186 
6187 
6188 static nxt_int_t
6189 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6190 {
6191     nxt_unit_port_t          *port;
6192     nxt_unit_port_hash_id_t  *port_id;
6193 
6194     port = data;
6195     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6196 
6197     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6198         && port_id->pid == port->id.pid
6199         && port_id->id == port->id.id)
6200     {
6201         return NXT_OK;
6202     }
6203 
6204     return NXT_DECLINED;
6205 }
6206 
6207 
6208 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6209     NXT_LVLHSH_DEFAULT,
6210     nxt_unit_port_hash_test,
6211     nxt_unit_lvlhsh_alloc,
6212     nxt_unit_lvlhsh_free,
6213 };
6214 
6215 
6216 static inline void
6217 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6218     nxt_unit_port_hash_id_t *port_hash_id,
6219     nxt_unit_port_id_t *port_id)
6220 {
6221     port_hash_id->pid = port_id->pid;
6222     port_hash_id->id = port_id->id;
6223 
6224     if (nxt_fast_path(port_id->hash != 0)) {
6225         lhq->key_hash = port_id->hash;
6226 
6227     } else {
6228         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6229 
6230         port_id->hash = lhq->key_hash;
6231 
6232         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6233                        (int) port_id->pid, (int) port_id->id,
6234                        (int) port_id->hash);
6235     }
6236 
6237     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6238     lhq->key.start = (u_char *) port_hash_id;
6239     lhq->proto = &lvlhsh_ports_proto;
6240     lhq->pool = NULL;
6241 }
6242 
6243 
6244 static int
6245 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6246 {
6247     nxt_int_t                res;
6248     nxt_lvlhsh_query_t       lhq;
6249     nxt_unit_port_hash_id_t  port_hash_id;
6250 
6251     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6252     lhq.replace = 0;
6253     lhq.value = port;
6254 
6255     res = nxt_lvlhsh_insert(port_hash, &lhq);
6256 
6257     switch (res) {
6258 
6259     case NXT_OK:
6260         return NXT_UNIT_OK;
6261 
6262     default:
6263         return NXT_UNIT_ERROR;
6264     }
6265 }
6266 
6267 
6268 static nxt_unit_port_t *
6269 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6270     int remove)
6271 {
6272     nxt_int_t                res;
6273     nxt_lvlhsh_query_t       lhq;
6274     nxt_unit_port_hash_id_t  port_hash_id;
6275 
6276     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6277 
6278     if (remove) {
6279         res = nxt_lvlhsh_delete(port_hash, &lhq);
6280 
6281     } else {
6282         res = nxt_lvlhsh_find(port_hash, &lhq);
6283     }
6284 
6285     switch (res) {
6286 
6287     case NXT_OK:
6288         if (!remove) {
6289             nxt_unit_port_use(lhq.value);
6290         }
6291 
6292         return lhq.value;
6293 
6294     default:
6295         return NULL;
6296     }
6297 }
6298 
6299 
6300 static nxt_int_t
6301 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6302 {
6303     return NXT_OK;
6304 }
6305 
6306 
6307 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6308     NXT_LVLHSH_DEFAULT,
6309     nxt_unit_request_hash_test,
6310     nxt_unit_lvlhsh_alloc,
6311     nxt_unit_lvlhsh_free,
6312 };
6313 
6314 
6315 static int
6316 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6317     nxt_unit_request_info_t *req)
6318 {
6319     uint32_t                      *stream;
6320     nxt_int_t                     res;
6321     nxt_lvlhsh_query_t            lhq;
6322     nxt_unit_ctx_impl_t           *ctx_impl;
6323     nxt_unit_request_info_impl_t  *req_impl;
6324 
6325     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6326     if (req_impl->in_hash) {
6327         return NXT_UNIT_OK;
6328     }
6329 
6330     stream = &req_impl->stream;
6331 
6332     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6333     lhq.key.length = sizeof(*stream);
6334     lhq.key.start = (u_char *) stream;
6335     lhq.proto = &lvlhsh_requests_proto;
6336     lhq.pool = NULL;
6337     lhq.replace = 0;
6338     lhq.value = req_impl;
6339 
6340     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6341 
6342     pthread_mutex_lock(&ctx_impl->mutex);
6343 
6344     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6345 
6346     pthread_mutex_unlock(&ctx_impl->mutex);
6347 
6348     switch (res) {
6349 
6350     case NXT_OK:
6351         req_impl->in_hash = 1;
6352         return NXT_UNIT_OK;
6353 
6354     default:
6355         return NXT_UNIT_ERROR;
6356     }
6357 }
6358 
6359 
6360 static nxt_unit_request_info_t *
6361 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6362 {
6363     nxt_int_t                     res;
6364     nxt_lvlhsh_query_t            lhq;
6365     nxt_unit_ctx_impl_t           *ctx_impl;
6366     nxt_unit_request_info_impl_t  *req_impl;
6367 
6368     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6369     lhq.key.length = sizeof(stream);
6370     lhq.key.start = (u_char *) &stream;
6371     lhq.proto = &lvlhsh_requests_proto;
6372     lhq.pool = NULL;
6373 
6374     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6375 
6376     pthread_mutex_lock(&ctx_impl->mutex);
6377 
6378     if (remove) {
6379         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6380 
6381     } else {
6382         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6383     }
6384 
6385     pthread_mutex_unlock(&ctx_impl->mutex);
6386 
6387     switch (res) {
6388 
6389     case NXT_OK:
6390         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6391                                     req);
6392         if (remove) {
6393             req_impl->in_hash = 0;
6394         }
6395 
6396         return lhq.value;
6397 
6398     default:
6399         return NULL;
6400     }
6401 }
6402 
6403 
6404 void
6405 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6406 {
6407     int              log_fd, n;
6408     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6409     pid_t            pid;
6410     va_list          ap;
6411     nxt_unit_impl_t  *lib;
6412 
6413     if (nxt_fast_path(ctx != NULL)) {
6414         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6415 
6416         pid = lib->pid;
6417         log_fd = lib->log_fd;
6418 
6419     } else {
6420         pid = getpid();
6421         log_fd = STDERR_FILENO;
6422     }
6423 
6424     p = msg;
6425     end = p + sizeof(msg) - 1;
6426 
6427     p = nxt_unit_snprint_prefix(p, end, pid, level);
6428 
6429     va_start(ap, fmt);
6430     p += vsnprintf(p, end - p, fmt, ap);
6431     va_end(ap);
6432 
6433     if (nxt_slow_path(p > end)) {
6434         memcpy(end - 5, "[...]", 5);
6435         p = end;
6436     }
6437 
6438     *p++ = '\n';
6439 
6440     n = write(log_fd, msg, p - msg);
6441     if (nxt_slow_path(n < 0)) {
6442         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6443     }
6444 }
6445 
6446 
6447 void
6448 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6449 {
6450     int                           log_fd, n;
6451     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6452     pid_t                         pid;
6453     va_list                       ap;
6454     nxt_unit_impl_t               *lib;
6455     nxt_unit_request_info_impl_t  *req_impl;
6456 
6457     if (nxt_fast_path(req != NULL)) {
6458         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6459 
6460         pid = lib->pid;
6461         log_fd = lib->log_fd;
6462 
6463     } else {
6464         pid = getpid();
6465         log_fd = STDERR_FILENO;
6466     }
6467 
6468     p = msg;
6469     end = p + sizeof(msg) - 1;
6470 
6471     p = nxt_unit_snprint_prefix(p, end, pid, level);
6472 
6473     if (nxt_fast_path(req != NULL)) {
6474         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6475 
6476         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6477     }
6478 
6479     va_start(ap, fmt);
6480     p += vsnprintf(p, end - p, fmt, ap);
6481     va_end(ap);
6482 
6483     if (nxt_slow_path(p > end)) {
6484         memcpy(end - 5, "[...]", 5);
6485         p = end;
6486     }
6487 
6488     *p++ = '\n';
6489 
6490     n = write(log_fd, msg, p - msg);
6491     if (nxt_slow_path(n < 0)) {
6492         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6493     }
6494 }
6495 
6496 
6497 static const char * nxt_unit_log_levels[] = {
6498     "alert",
6499     "error",
6500     "warn",
6501     "notice",
6502     "info",
6503     "debug",
6504 };
6505 
6506 
6507 static char *
6508 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6509 {
6510     struct tm        tm;
6511     struct timespec  ts;
6512 
6513     (void) clock_gettime(CLOCK_REALTIME, &ts);
6514 
6515 #if (NXT_HAVE_LOCALTIME_R)
6516     (void) localtime_r(&ts.tv_sec, &tm);
6517 #else
6518     tm = *localtime(&ts.tv_sec);
6519 #endif
6520 
6521 #if (NXT_DEBUG)
6522     p += snprintf(p, end - p,
6523                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6524                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6525                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6526                   (int) ts.tv_nsec / 1000000);
6527 #else
6528     p += snprintf(p, end - p,
6529                   "%4d/%02d/%02d %02d:%02d:%02d ",
6530                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6531                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6532 #endif
6533 
6534     p += snprintf(p, end - p,
6535                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6536                   (int) pid,
6537                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6538 
6539     return p;
6540 }
6541 
6542 
6543 static void *
6544 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6545 {
6546     int   err;
6547     void  *p;
6548 
6549     err = posix_memalign(&p, size, size);
6550 
6551     if (nxt_fast_path(err == 0)) {
6552         nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6553                        (int) size, (int) size, p);
6554         return p;
6555     }
6556 
6557     nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6558                    (int) size, (int) size, strerror(err), err);
6559     return NULL;
6560 }
6561 
6562 
6563 static void
6564 nxt_unit_lvlhsh_free(void *data, void *p)
6565 {
6566     nxt_unit_free(NULL, p);
6567 }
6568 
6569 
6570 void *
6571 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6572 {
6573     void  *p;
6574 
6575     p = malloc(size);
6576 
6577     if (nxt_fast_path(p != NULL)) {
6578 #if (NXT_DEBUG_ALLOC)
6579         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6580 #endif
6581 
6582     } else {
6583         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6584                        (int) size, strerror(errno), errno);
6585     }
6586 
6587     return p;
6588 }
6589 
6590 
6591 void
6592 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6593 {
6594 #if (NXT_DEBUG_ALLOC)
6595     nxt_unit_debug(ctx, "free(%p)", p);
6596 #endif
6597 
6598     free(p);
6599 }
6600 
6601 
6602 static int
6603 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6604 {
6605     u_char        c1, c2;
6606     nxt_int_t     n;
6607     const u_char  *s1, *s2;
6608 
6609     s1 = p1;
6610     s2 = p2;
6611 
6612     while (length-- != 0) {
6613         c1 = *s1++;
6614         c2 = *s2++;
6615 
6616         c1 = nxt_lowcase(c1);
6617         c2 = nxt_lowcase(c2);
6618 
6619         n = c1 - c2;
6620 
6621         if (n != 0) {
6622             return n;
6623         }
6624     }
6625 
6626     return 0;
6627 }
6628