xref: /unit/src/nxt_unit.c (revision 1714:8e02af45485f)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
24 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
25 #define NXT_UNIT_LOCAL_BUF_SIZE  \
26     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
28 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
29 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
30 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
31 typedef struct nxt_unit_process_s               nxt_unit_process_t;
32 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
33 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
34 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
35 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
36 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
37 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
38 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
39 
40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
42     nxt_unit_ctx_impl_t *ctx_impl, void *data);
43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50     nxt_unit_mmap_buf_t *mmap_buf);
51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56     int queue_fd);
57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
58     nxt_unit_request_info_t **preq);
59 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
60     nxt_unit_recv_msg_t *recv_msg);
61 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
62 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
63     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
65     nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
67     nxt_unit_port_id_t *port_id);
68 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
69 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
72 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
73     nxt_unit_ctx_t *ctx);
74 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
75 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
76 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
77     nxt_unit_ctx_t *ctx);
78 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
79 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
80     nxt_unit_websocket_frame_impl_t *ws);
81 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
82 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
83 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
84     nxt_unit_mmap_buf_t *mmap_buf, int last);
85 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
86 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
88 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
89     nxt_unit_ctx_impl_t *ctx_impl);
90 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
91     nxt_unit_read_buf_t *rbuf);
92 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
93     nxt_unit_request_info_t *req, size_t size);
94 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
95     size_t size);
96 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
97     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
98 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
99 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
100 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
101 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
102     nxt_unit_port_t *port, int n);
103 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
104 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
105     int fd);
106 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
107     nxt_unit_port_t *port, uint32_t size,
108     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
109 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
110 
111 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
112     nxt_unit_ctx_impl_t *ctx_impl);
113 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
114 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
115 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
116 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
117 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
118     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
119     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
120 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
121     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
122 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
123 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
124     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
125 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
126 
127 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
128 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
129     pid_t pid, int remove);
130 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
131 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
132 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
133 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
134 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
135 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
136 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
137 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
138 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
140     nxt_unit_port_t *port);
141 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
142 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
143 
144 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
145     nxt_unit_port_t *port, int queue_fd);
146 
147 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
148 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
149 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
150     nxt_unit_port_t *port, void *queue);
151 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
152     nxt_queue_t *awaiting_req);
153 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
154     nxt_unit_port_id_t *port_id);
155 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
156     nxt_unit_port_id_t *port_id);
157 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
158 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
159     nxt_unit_process_t *process);
160 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
161 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
162 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
163     nxt_unit_port_t *port, const void *buf, size_t buf_size,
164     const void *oob, size_t oob_size);
165 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
166     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
167 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168     nxt_unit_read_buf_t *rbuf);
169 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
170     nxt_unit_read_buf_t *src);
171 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
172     nxt_unit_read_buf_t *rbuf);
173 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174     nxt_unit_read_buf_t *rbuf);
175 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
176     nxt_unit_read_buf_t *rbuf);
177 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
178     nxt_unit_read_buf_t *rbuf);
179 nxt_inline int nxt_unit_close(int fd);
180 static int nxt_unit_fd_blocking(int fd);
181 
182 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
183     nxt_unit_port_t *port);
184 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
185     nxt_unit_port_id_t *port_id, int remove);
186 
187 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
188     nxt_unit_request_info_t *req);
189 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
190     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
191 
192 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
193 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
194 static void nxt_unit_lvlhsh_free(void *data, void *p);
195 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
196 
197 
198 struct nxt_unit_mmap_buf_s {
199     nxt_unit_buf_t           buf;
200 
201     nxt_unit_mmap_buf_t      *next;
202     nxt_unit_mmap_buf_t      **prev;
203 
204     nxt_port_mmap_header_t   *hdr;
205     nxt_unit_request_info_t  *req;
206     nxt_unit_ctx_impl_t      *ctx_impl;
207     char                     *free_ptr;
208     char                     *plain_ptr;
209 };
210 
211 
212 struct nxt_unit_recv_msg_s {
213     uint32_t                 stream;
214     nxt_pid_t                pid;
215     nxt_port_id_t            reply_port;
216 
217     uint8_t                  last;      /* 1 bit */
218     uint8_t                  mmap;      /* 1 bit */
219 
220     void                     *start;
221     uint32_t                 size;
222 
223     int                      fd[2];
224 
225     nxt_unit_mmap_buf_t      *incoming_buf;
226 };
227 
228 
229 typedef enum {
230     NXT_UNIT_RS_START           = 0,
231     NXT_UNIT_RS_RESPONSE_INIT,
232     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
233     NXT_UNIT_RS_RESPONSE_SENT,
234     NXT_UNIT_RS_RELEASED,
235 } nxt_unit_req_state_t;
236 
237 
238 struct nxt_unit_request_info_impl_s {
239     nxt_unit_request_info_t  req;
240 
241     uint32_t                 stream;
242 
243     nxt_unit_mmap_buf_t      *outgoing_buf;
244     nxt_unit_mmap_buf_t      *incoming_buf;
245 
246     nxt_unit_req_state_t     state;
247     uint8_t                  websocket;
248     uint8_t                  in_hash;
249 
250     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
251     nxt_queue_link_t         link;
252     /*  for nxt_unit_port_impl_t.awaiting_req */
253     nxt_queue_link_t         port_wait_link;
254 
255     char                     extra_data[];
256 };
257 
258 
259 struct nxt_unit_websocket_frame_impl_s {
260     nxt_unit_websocket_frame_t  ws;
261 
262     nxt_unit_mmap_buf_t         *buf;
263 
264     nxt_queue_link_t            link;
265 
266     nxt_unit_ctx_impl_t         *ctx_impl;
267 };
268 
269 
270 struct nxt_unit_read_buf_s {
271     nxt_queue_link_t              link;
272     nxt_unit_ctx_impl_t           *ctx_impl;
273     ssize_t                       size;
274     char                          buf[16384];
275     char                          oob[256];
276 };
277 
278 
279 struct nxt_unit_ctx_impl_s {
280     nxt_unit_ctx_t                ctx;
281 
282     nxt_atomic_t                  use_count;
283     nxt_atomic_t                  wait_items;
284 
285     pthread_mutex_t               mutex;
286 
287     nxt_unit_port_t               *read_port;
288 
289     nxt_queue_link_t              link;
290 
291     nxt_unit_mmap_buf_t           *free_buf;
292 
293     /*  of nxt_unit_request_info_impl_t */
294     nxt_queue_t                   free_req;
295 
296     /*  of nxt_unit_websocket_frame_impl_t */
297     nxt_queue_t                   free_ws;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_queue_t                   active_req;
301 
302     /*  of nxt_unit_request_info_impl_t */
303     nxt_lvlhsh_t                  requests;
304 
305     /*  of nxt_unit_request_info_impl_t */
306     nxt_queue_t                   ready_req;
307 
308     /*  of nxt_unit_read_buf_t */
309     nxt_queue_t                   pending_rbuf;
310 
311     /*  of nxt_unit_read_buf_t */
312     nxt_queue_t                   free_rbuf;
313 
314     int                           online;
315     int                           ready;
316 
317     nxt_unit_mmap_buf_t           ctx_buf[2];
318     nxt_unit_read_buf_t           ctx_read_buf;
319 
320     nxt_unit_request_info_impl_t  req;
321 };
322 
323 
324 struct nxt_unit_mmap_s {
325     nxt_port_mmap_header_t   *hdr;
326     pthread_t                src_thread;
327 
328     /*  of nxt_unit_read_buf_t */
329     nxt_queue_t              awaiting_rbuf;
330 };
331 
332 
333 struct nxt_unit_mmaps_s {
334     pthread_mutex_t          mutex;
335     uint32_t                 size;
336     uint32_t                 cap;
337     nxt_atomic_t             allocated_chunks;
338     nxt_unit_mmap_t          *elts;
339 };
340 
341 
342 struct nxt_unit_impl_s {
343     nxt_unit_t               unit;
344     nxt_unit_callbacks_t     callbacks;
345 
346     nxt_atomic_t             use_count;
347 
348     uint32_t                 request_data_size;
349     uint32_t                 shm_mmap_limit;
350 
351     pthread_mutex_t          mutex;
352 
353     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
354     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
355 
356     nxt_unit_port_t          *router_port;
357     nxt_unit_port_t          *shared_port;
358 
359     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
360 
361     nxt_unit_mmaps_t         incoming;
362     nxt_unit_mmaps_t         outgoing;
363 
364     pid_t                    pid;
365     int                      log_fd;
366 
367     nxt_unit_ctx_impl_t      main_ctx;
368 };
369 
370 
371 struct nxt_unit_port_impl_s {
372     nxt_unit_port_t          port;
373 
374     nxt_atomic_t             use_count;
375 
376     /*  for nxt_unit_process_t.ports */
377     nxt_queue_link_t         link;
378     nxt_unit_process_t       *process;
379 
380     /*  of nxt_unit_request_info_impl_t */
381     nxt_queue_t              awaiting_req;
382 
383     int                      ready;
384 
385     void                     *queue;
386 
387     int                      from_socket;
388     nxt_unit_read_buf_t      *socket_rbuf;
389 };
390 
391 
392 struct nxt_unit_process_s {
393     pid_t                    pid;
394 
395     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
396 
397     nxt_unit_impl_t          *lib;
398 
399     nxt_atomic_t             use_count;
400 
401     uint32_t                 next_port_id;
402 };
403 
404 
405 /* Explicitly using 32 bit types to avoid possible alignment. */
406 typedef struct {
407     int32_t   pid;
408     uint32_t  id;
409 } nxt_unit_port_hash_id_t;
410 
411 
412 nxt_unit_ctx_t *
413 nxt_unit_init(nxt_unit_init_t *init)
414 {
415     int              rc, queue_fd;
416     void             *mem;
417     uint32_t         ready_stream, shm_limit;
418     nxt_unit_ctx_t   *ctx;
419     nxt_unit_impl_t  *lib;
420     nxt_unit_port_t  ready_port, router_port, read_port;
421 
422     lib = nxt_unit_create(init);
423     if (nxt_slow_path(lib == NULL)) {
424         return NULL;
425     }
426 
427     queue_fd = -1;
428     mem = MAP_FAILED;
429 
430     if (init->ready_port.id.pid != 0
431         && init->ready_stream != 0
432         && init->read_port.id.pid != 0)
433     {
434         ready_port = init->ready_port;
435         ready_stream = init->ready_stream;
436         router_port = init->router_port;
437         read_port = init->read_port;
438         lib->log_fd = init->log_fd;
439 
440         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
441                               ready_port.id.id);
442         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
443                               router_port.id.id);
444         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
445                               read_port.id.id);
446 
447     } else {
448         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
449                                &lib->log_fd, &ready_stream, &shm_limit);
450         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
451             goto fail;
452         }
453 
454         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
455                                 / PORT_MMAP_DATA_SIZE;
456     }
457 
458     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
459         lib->shm_mmap_limit = 1;
460     }
461 
462     lib->pid = read_port.id.pid;
463 
464     ctx = &lib->main_ctx.ctx;
465 
466     rc = nxt_unit_fd_blocking(router_port.out_fd);
467     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
468         goto fail;
469     }
470 
471     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
472     if (nxt_slow_path(lib->router_port == NULL)) {
473         nxt_unit_alert(NULL, "failed to add router_port");
474 
475         goto fail;
476     }
477 
478     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
479     if (nxt_slow_path(queue_fd == -1)) {
480         goto fail;
481     }
482 
483     mem = mmap(NULL, sizeof(nxt_port_queue_t),
484                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
485     if (nxt_slow_path(mem == MAP_FAILED)) {
486         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
487                        strerror(errno), errno);
488 
489         goto fail;
490     }
491 
492     nxt_port_queue_init(mem);
493 
494     rc = nxt_unit_fd_blocking(read_port.in_fd);
495     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
496         goto fail;
497     }
498 
499     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
500     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
501         nxt_unit_alert(NULL, "failed to add read_port");
502 
503         goto fail;
504     }
505 
506     rc = nxt_unit_fd_blocking(ready_port.out_fd);
507     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
508         goto fail;
509     }
510 
511     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
512     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
513         nxt_unit_alert(NULL, "failed to send READY message");
514 
515         goto fail;
516     }
517 
518     nxt_unit_close(ready_port.out_fd);
519     nxt_unit_close(queue_fd);
520 
521     return ctx;
522 
523 fail:
524 
525     if (mem != MAP_FAILED) {
526         munmap(mem, sizeof(nxt_port_queue_t));
527     }
528 
529     if (queue_fd != -1) {
530         nxt_unit_close(queue_fd);
531     }
532 
533     nxt_unit_ctx_release(&lib->main_ctx.ctx);
534 
535     return NULL;
536 }
537 
538 
539 static nxt_unit_impl_t *
540 nxt_unit_create(nxt_unit_init_t *init)
541 {
542     int                   rc;
543     nxt_unit_impl_t       *lib;
544     nxt_unit_callbacks_t  *cb;
545 
546     lib = nxt_unit_malloc(NULL,
547                           sizeof(nxt_unit_impl_t) + init->request_data_size);
548     if (nxt_slow_path(lib == NULL)) {
549         nxt_unit_alert(NULL, "failed to allocate unit struct");
550 
551         return NULL;
552     }
553 
554     rc = pthread_mutex_init(&lib->mutex, NULL);
555     if (nxt_slow_path(rc != 0)) {
556         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
557 
558         goto fail;
559     }
560 
561     lib->unit.data = init->data;
562     lib->callbacks = init->callbacks;
563 
564     lib->request_data_size = init->request_data_size;
565     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
566                             / PORT_MMAP_DATA_SIZE;
567 
568     lib->processes.slot = NULL;
569     lib->ports.slot = NULL;
570 
571     lib->log_fd = STDERR_FILENO;
572 
573     nxt_queue_init(&lib->contexts);
574 
575     lib->use_count = 0;
576     lib->router_port = NULL;
577     lib->shared_port = NULL;
578 
579     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
580     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
581         pthread_mutex_destroy(&lib->mutex);
582         goto fail;
583     }
584 
585     cb = &lib->callbacks;
586 
587     if (cb->request_handler == NULL) {
588         nxt_unit_alert(NULL, "request_handler is NULL");
589 
590         pthread_mutex_destroy(&lib->mutex);
591         goto fail;
592     }
593 
594     nxt_unit_mmaps_init(&lib->incoming);
595     nxt_unit_mmaps_init(&lib->outgoing);
596 
597     return lib;
598 
599 fail:
600 
601     nxt_unit_free(NULL, lib);
602 
603     return NULL;
604 }
605 
606 
607 static int
608 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
609     void *data)
610 {
611     int  rc;
612 
613     ctx_impl->ctx.data = data;
614     ctx_impl->ctx.unit = &lib->unit;
615 
616     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
617     if (nxt_slow_path(rc != 0)) {
618         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
619 
620         return NXT_UNIT_ERROR;
621     }
622 
623     nxt_unit_lib_use(lib);
624 
625     pthread_mutex_lock(&lib->mutex);
626 
627     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
628 
629     pthread_mutex_unlock(&lib->mutex);
630 
631     ctx_impl->use_count = 1;
632     ctx_impl->wait_items = 0;
633     ctx_impl->online = 1;
634     ctx_impl->ready = 0;
635 
636     nxt_queue_init(&ctx_impl->free_req);
637     nxt_queue_init(&ctx_impl->free_ws);
638     nxt_queue_init(&ctx_impl->active_req);
639     nxt_queue_init(&ctx_impl->ready_req);
640     nxt_queue_init(&ctx_impl->pending_rbuf);
641     nxt_queue_init(&ctx_impl->free_rbuf);
642 
643     ctx_impl->free_buf = NULL;
644     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
645     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
646 
647     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
648     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
649 
650     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
651 
652     ctx_impl->req.req.ctx = &ctx_impl->ctx;
653     ctx_impl->req.req.unit = &lib->unit;
654 
655     ctx_impl->read_port = NULL;
656     ctx_impl->requests.slot = 0;
657 
658     return NXT_UNIT_OK;
659 }
660 
661 
662 nxt_inline void
663 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
664 {
665     nxt_unit_ctx_impl_t  *ctx_impl;
666 
667     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
668 
669     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
670 }
671 
672 
673 nxt_inline void
674 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
675 {
676     long                 c;
677     nxt_unit_ctx_impl_t  *ctx_impl;
678 
679     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
680 
681     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
682 
683     if (c == 1) {
684         nxt_unit_ctx_free(ctx_impl);
685     }
686 }
687 
688 
689 nxt_inline void
690 nxt_unit_lib_use(nxt_unit_impl_t *lib)
691 {
692     nxt_atomic_fetch_add(&lib->use_count, 1);
693 }
694 
695 
696 nxt_inline void
697 nxt_unit_lib_release(nxt_unit_impl_t *lib)
698 {
699     long                c;
700     nxt_unit_process_t  *process;
701 
702     c = nxt_atomic_fetch_add(&lib->use_count, -1);
703 
704     if (c == 1) {
705         for ( ;; ) {
706             pthread_mutex_lock(&lib->mutex);
707 
708             process = nxt_unit_process_pop_first(lib);
709             if (process == NULL) {
710                 pthread_mutex_unlock(&lib->mutex);
711 
712                 break;
713             }
714 
715             nxt_unit_remove_process(lib, process);
716         }
717 
718         pthread_mutex_destroy(&lib->mutex);
719 
720         if (nxt_fast_path(lib->router_port != NULL)) {
721             nxt_unit_port_release(lib->router_port);
722         }
723 
724         if (nxt_fast_path(lib->shared_port != NULL)) {
725             nxt_unit_port_release(lib->shared_port);
726         }
727 
728         nxt_unit_mmaps_destroy(&lib->incoming);
729         nxt_unit_mmaps_destroy(&lib->outgoing);
730 
731         nxt_unit_free(NULL, lib);
732     }
733 }
734 
735 
736 nxt_inline void
737 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
738     nxt_unit_mmap_buf_t *mmap_buf)
739 {
740     mmap_buf->next = *head;
741 
742     if (mmap_buf->next != NULL) {
743         mmap_buf->next->prev = &mmap_buf->next;
744     }
745 
746     *head = mmap_buf;
747     mmap_buf->prev = head;
748 }
749 
750 
751 nxt_inline void
752 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
753     nxt_unit_mmap_buf_t *mmap_buf)
754 {
755     while (*prev != NULL) {
756         prev = &(*prev)->next;
757     }
758 
759     nxt_unit_mmap_buf_insert(prev, mmap_buf);
760 }
761 
762 
763 nxt_inline void
764 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
765 {
766     nxt_unit_mmap_buf_t  **prev;
767 
768     prev = mmap_buf->prev;
769 
770     if (mmap_buf->next != NULL) {
771         mmap_buf->next->prev = prev;
772     }
773 
774     if (prev != NULL) {
775         *prev = mmap_buf->next;
776     }
777 }
778 
779 
780 static int
781 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
782     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
783     uint32_t *shm_limit)
784 {
785     int       rc;
786     int       ready_fd, router_fd, read_in_fd, read_out_fd;
787     char      *unit_init, *version_end;
788     long      version_length;
789     int64_t   ready_pid, router_pid, read_pid;
790     uint32_t  ready_stream, router_id, ready_id, read_id;
791 
792     unit_init = getenv(NXT_UNIT_INIT_ENV);
793     if (nxt_slow_path(unit_init == NULL)) {
794         nxt_unit_alert(NULL, "%s is not in the current environment",
795                        NXT_UNIT_INIT_ENV);
796 
797         return NXT_UNIT_ERROR;
798     }
799 
800     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
801 
802     version_length = nxt_length(NXT_VERSION);
803 
804     version_end = strchr(unit_init, ';');
805     if (version_end == NULL
806         || version_end - unit_init != version_length
807         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
808     {
809         nxt_unit_alert(NULL, "version check error");
810 
811         return NXT_UNIT_ERROR;
812     }
813 
814     rc = sscanf(version_end + 1,
815                 "%"PRIu32";"
816                 "%"PRId64",%"PRIu32",%d;"
817                 "%"PRId64",%"PRIu32",%d;"
818                 "%"PRId64",%"PRIu32",%d,%d;"
819                 "%d,%"PRIu32,
820                 &ready_stream,
821                 &ready_pid, &ready_id, &ready_fd,
822                 &router_pid, &router_id, &router_fd,
823                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
824                 log_fd, shm_limit);
825 
826     if (nxt_slow_path(rc != 13)) {
827         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
828 
829         return NXT_UNIT_ERROR;
830     }
831 
832     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
833 
834     ready_port->in_fd = -1;
835     ready_port->out_fd = ready_fd;
836     ready_port->data = NULL;
837 
838     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
839 
840     router_port->in_fd = -1;
841     router_port->out_fd = router_fd;
842     router_port->data = NULL;
843 
844     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
845 
846     read_port->in_fd = read_in_fd;
847     read_port->out_fd = read_out_fd;
848     read_port->data = NULL;
849 
850     *stream = ready_stream;
851 
852     return NXT_UNIT_OK;
853 }
854 
855 
856 static int
857 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
858 {
859     ssize_t          res;
860     nxt_port_msg_t   msg;
861     nxt_unit_impl_t  *lib;
862 
863     union {
864         struct cmsghdr  cm;
865         char            space[CMSG_SPACE(sizeof(int))];
866     } cmsg;
867 
868     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
869 
870     msg.stream = stream;
871     msg.pid = lib->pid;
872     msg.reply_port = 0;
873     msg.type = _NXT_PORT_MSG_PROCESS_READY;
874     msg.last = 1;
875     msg.mmap = 0;
876     msg.nf = 0;
877     msg.mf = 0;
878     msg.tracking = 0;
879 
880     memset(&cmsg, 0, sizeof(cmsg));
881 
882     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
883     cmsg.cm.cmsg_level = SOL_SOCKET;
884     cmsg.cm.cmsg_type = SCM_RIGHTS;
885 
886     /*
887      * memcpy() is used instead of simple
888      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
889      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
890      *   dereferencing type-punned pointer will break strict-aliasing rules
891      *
892      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
893      * in the same simple assignment as in the code above.
894      */
895     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
896 
897     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
898                            &cmsg, sizeof(cmsg));
899     if (res != sizeof(msg)) {
900         return NXT_UNIT_ERROR;
901     }
902 
903     return NXT_UNIT_OK;
904 }
905 
906 
907 static int
908 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
909     nxt_unit_request_info_t **preq)
910 {
911     int                  rc;
912     pid_t                pid;
913     struct cmsghdr       *cm;
914     nxt_port_msg_t       *port_msg;
915     nxt_unit_impl_t      *lib;
916     nxt_unit_recv_msg_t  recv_msg;
917 
918     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
919 
920     recv_msg.fd[0] = -1;
921     recv_msg.fd[1] = -1;
922     port_msg = (nxt_port_msg_t *) rbuf->buf;
923     cm = (struct cmsghdr *) rbuf->oob;
924 
925     if (cm->cmsg_level == SOL_SOCKET
926         && cm->cmsg_type == SCM_RIGHTS)
927     {
928         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
929             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
930         }
931 
932         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
933             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
934         }
935     }
936 
937     recv_msg.incoming_buf = NULL;
938 
939     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
940         if (nxt_slow_path(rbuf->size == 0)) {
941             nxt_unit_debug(ctx, "read port closed");
942 
943             nxt_unit_quit(ctx);
944             rc = NXT_UNIT_OK;
945             goto done;
946         }
947 
948         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
949 
950         rc = NXT_UNIT_ERROR;
951         goto done;
952     }
953 
954     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
955                    port_msg->stream, (int) port_msg->type,
956                    recv_msg.fd[0], recv_msg.fd[1]);
957 
958     recv_msg.stream = port_msg->stream;
959     recv_msg.pid = port_msg->pid;
960     recv_msg.reply_port = port_msg->reply_port;
961     recv_msg.last = port_msg->last;
962     recv_msg.mmap = port_msg->mmap;
963 
964     recv_msg.start = port_msg + 1;
965     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
966 
967     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
968         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
969                        port_msg->stream, (int) port_msg->type);
970         rc = NXT_UNIT_ERROR;
971         goto done;
972     }
973 
974     /* Fragmentation is unsupported. */
975     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
976         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
977                        port_msg->stream, (int) port_msg->type);
978         rc = NXT_UNIT_ERROR;
979         goto done;
980     }
981 
982     if (port_msg->mmap) {
983         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
984 
985         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
986             if (rc == NXT_UNIT_AGAIN) {
987                 recv_msg.fd[0] = -1;
988                 recv_msg.fd[1] = -1;
989             }
990 
991             goto done;
992         }
993     }
994 
995     switch (port_msg->type) {
996 
997     case _NXT_PORT_MSG_RPC_READY:
998         rc = NXT_UNIT_OK;
999         break;
1000 
1001     case _NXT_PORT_MSG_QUIT:
1002         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
1003 
1004         nxt_unit_quit(ctx);
1005         rc = NXT_UNIT_OK;
1006         break;
1007 
1008     case _NXT_PORT_MSG_NEW_PORT:
1009         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1010         break;
1011 
1012     case _NXT_PORT_MSG_PORT_ACK:
1013         rc = nxt_unit_ctx_ready(ctx);
1014         break;
1015 
1016     case _NXT_PORT_MSG_CHANGE_FILE:
1017         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1018                        port_msg->stream, recv_msg.fd[0]);
1019 
1020         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1021             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1022                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1023                            strerror(errno), errno);
1024 
1025             rc = NXT_UNIT_ERROR;
1026             goto done;
1027         }
1028 
1029         rc = NXT_UNIT_OK;
1030         break;
1031 
1032     case _NXT_PORT_MSG_MMAP:
1033         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1034             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1035                            port_msg->stream, recv_msg.fd[0]);
1036 
1037             rc = NXT_UNIT_ERROR;
1038             goto done;
1039         }
1040 
1041         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1042         break;
1043 
1044     case _NXT_PORT_MSG_REQ_HEADERS:
1045         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1046         break;
1047 
1048     case _NXT_PORT_MSG_REQ_BODY:
1049         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1050         break;
1051 
1052     case _NXT_PORT_MSG_WEBSOCKET:
1053         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1054         break;
1055 
1056     case _NXT_PORT_MSG_REMOVE_PID:
1057         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1058             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1059                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1060                            (int) sizeof(pid));
1061 
1062             rc = NXT_UNIT_ERROR;
1063             goto done;
1064         }
1065 
1066         memcpy(&pid, recv_msg.start, sizeof(pid));
1067 
1068         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1069                        port_msg->stream, (int) pid);
1070 
1071         nxt_unit_remove_pid(lib, pid);
1072 
1073         rc = NXT_UNIT_OK;
1074         break;
1075 
1076     case _NXT_PORT_MSG_SHM_ACK:
1077         rc = nxt_unit_process_shm_ack(ctx);
1078         break;
1079 
1080     default:
1081         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1082                        port_msg->stream, (int) port_msg->type);
1083 
1084         rc = NXT_UNIT_ERROR;
1085         goto done;
1086     }
1087 
1088 done:
1089 
1090     if (recv_msg.fd[0] != -1) {
1091         nxt_unit_close(recv_msg.fd[0]);
1092     }
1093 
1094     if (recv_msg.fd[1] != -1) {
1095         nxt_unit_close(recv_msg.fd[1]);
1096     }
1097 
1098     while (recv_msg.incoming_buf != NULL) {
1099         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1100     }
1101 
1102     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1103 #if (NXT_DEBUG)
1104         memset(rbuf->buf, 0xAC, rbuf->size);
1105 #endif
1106         nxt_unit_read_buf_release(ctx, rbuf);
1107     }
1108 
1109     return rc;
1110 }
1111 
1112 
1113 static int
1114 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1115 {
1116     void                     *mem;
1117     nxt_unit_impl_t          *lib;
1118     nxt_unit_port_t          new_port, *port;
1119     nxt_port_msg_new_port_t  *new_port_msg;
1120 
1121     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1122         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1123                       "invalid message size (%d)",
1124                       recv_msg->stream, (int) recv_msg->size);
1125 
1126         return NXT_UNIT_ERROR;
1127     }
1128 
1129     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1130         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1131                        recv_msg->stream, recv_msg->fd[0]);
1132 
1133         return NXT_UNIT_ERROR;
1134     }
1135 
1136     new_port_msg = recv_msg->start;
1137 
1138     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1139                    recv_msg->stream, (int) new_port_msg->pid,
1140                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1141 
1142     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1143 
1144     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1145         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1146 
1147         new_port.in_fd = recv_msg->fd[0];
1148         new_port.out_fd = -1;
1149 
1150         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1151                    MAP_SHARED, recv_msg->fd[1], 0);
1152 
1153     } else {
1154         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1155                           != NXT_UNIT_OK))
1156         {
1157             return NXT_UNIT_ERROR;
1158         }
1159 
1160         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1161                               new_port_msg->id);
1162 
1163         new_port.in_fd = -1;
1164         new_port.out_fd = recv_msg->fd[0];
1165 
1166         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1167                    MAP_SHARED, recv_msg->fd[1], 0);
1168     }
1169 
1170     if (nxt_slow_path(mem == MAP_FAILED)) {
1171         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1172                        strerror(errno), errno);
1173 
1174         return NXT_UNIT_ERROR;
1175     }
1176 
1177     new_port.data = NULL;
1178 
1179     recv_msg->fd[0] = -1;
1180 
1181     port = nxt_unit_add_port(ctx, &new_port, mem);
1182     if (nxt_slow_path(port == NULL)) {
1183         return NXT_UNIT_ERROR;
1184     }
1185 
1186     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1187         lib->shared_port = port;
1188 
1189         return nxt_unit_ctx_ready(ctx);
1190     }
1191 
1192     nxt_unit_port_release(port);
1193 
1194     return NXT_UNIT_OK;
1195 }
1196 
1197 
1198 static int
1199 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1200 {
1201     nxt_unit_impl_t      *lib;
1202     nxt_unit_ctx_impl_t  *ctx_impl;
1203 
1204     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1205     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1206 
1207     ctx_impl->ready = 1;
1208 
1209     if (lib->callbacks.ready_handler) {
1210         return lib->callbacks.ready_handler(ctx);
1211     }
1212 
1213     return NXT_UNIT_OK;
1214 }
1215 
1216 
1217 static int
1218 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1219     nxt_unit_request_info_t **preq)
1220 {
1221     int                           res;
1222     nxt_unit_impl_t               *lib;
1223     nxt_unit_port_id_t            port_id;
1224     nxt_unit_request_t            *r;
1225     nxt_unit_mmap_buf_t           *b;
1226     nxt_unit_request_info_t       *req;
1227     nxt_unit_request_info_impl_t  *req_impl;
1228 
1229     if (nxt_slow_path(recv_msg->mmap == 0)) {
1230         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1231                       recv_msg->stream);
1232 
1233         return NXT_UNIT_ERROR;
1234     }
1235 
1236     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1237         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1238                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1239                       (int) sizeof(nxt_unit_request_t));
1240 
1241         return NXT_UNIT_ERROR;
1242     }
1243 
1244     req_impl = nxt_unit_request_info_get(ctx);
1245     if (nxt_slow_path(req_impl == NULL)) {
1246         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1247                       recv_msg->stream);
1248 
1249         return NXT_UNIT_ERROR;
1250     }
1251 
1252     req = &req_impl->req;
1253 
1254     req->request = recv_msg->start;
1255 
1256     b = recv_msg->incoming_buf;
1257 
1258     req->request_buf = &b->buf;
1259     req->response = NULL;
1260     req->response_buf = NULL;
1261 
1262     r = req->request;
1263 
1264     req->content_length = r->content_length;
1265 
1266     req->content_buf = req->request_buf;
1267     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1268 
1269     req_impl->stream = recv_msg->stream;
1270 
1271     req_impl->outgoing_buf = NULL;
1272 
1273     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1274         b->req = req;
1275     }
1276 
1277     /* "Move" incoming buffer list to req_impl. */
1278     req_impl->incoming_buf = recv_msg->incoming_buf;
1279     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1280     recv_msg->incoming_buf = NULL;
1281 
1282     req->content_fd = recv_msg->fd[0];
1283     recv_msg->fd[0] = -1;
1284 
1285     req->response_max_fields = 0;
1286     req_impl->state = NXT_UNIT_RS_START;
1287     req_impl->websocket = 0;
1288     req_impl->in_hash = 0;
1289 
1290     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1291                    (int) r->method_length,
1292                    (char *) nxt_unit_sptr_get(&r->method),
1293                    (int) r->target_length,
1294                    (char *) nxt_unit_sptr_get(&r->target),
1295                    (int) r->content_length);
1296 
1297     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1298 
1299     res = nxt_unit_request_check_response_port(req, &port_id);
1300     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1301         return NXT_UNIT_ERROR;
1302     }
1303 
1304     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1305         res = nxt_unit_send_req_headers_ack(req);
1306         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1307             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1308 
1309             return NXT_UNIT_ERROR;
1310         }
1311 
1312         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1313 
1314         if (req->content_length
1315             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1316         {
1317             res = nxt_unit_request_hash_add(ctx, req);
1318             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1319                 nxt_unit_req_warn(req, "failed to add request to hash");
1320 
1321                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1322 
1323                 return NXT_UNIT_ERROR;
1324             }
1325 
1326             /*
1327              * If application have separate data handler, we may start
1328              * request processing and process data when it is arrived.
1329              */
1330             if (lib->callbacks.data_handler == NULL) {
1331                 return NXT_UNIT_OK;
1332             }
1333         }
1334 
1335         if (preq == NULL) {
1336             lib->callbacks.request_handler(req);
1337 
1338         } else {
1339             *preq = req;
1340         }
1341     }
1342 
1343     return NXT_UNIT_OK;
1344 }
1345 
1346 
1347 static int
1348 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1349 {
1350     uint64_t                 l;
1351     nxt_unit_impl_t          *lib;
1352     nxt_unit_mmap_buf_t      *b;
1353     nxt_unit_request_info_t  *req;
1354 
1355     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1356     if (req == NULL) {
1357         return NXT_UNIT_OK;
1358     }
1359 
1360     l = req->content_buf->end - req->content_buf->free;
1361 
1362     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1363         b->req = req;
1364         l += b->buf.end - b->buf.free;
1365     }
1366 
1367     if (recv_msg->incoming_buf != NULL) {
1368         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1369 
1370         while (b->next != NULL) {
1371             b = b->next;
1372         }
1373 
1374         /* "Move" incoming buffer list to req_impl. */
1375         b->next = recv_msg->incoming_buf;
1376         b->next->prev = &b->next;
1377 
1378         recv_msg->incoming_buf = NULL;
1379     }
1380 
1381     req->content_fd = recv_msg->fd[0];
1382     recv_msg->fd[0] = -1;
1383 
1384     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1385 
1386     if (lib->callbacks.data_handler != NULL) {
1387         lib->callbacks.data_handler(req);
1388 
1389         return NXT_UNIT_OK;
1390     }
1391 
1392     if (req->content_fd != -1 || l == req->content_length) {
1393         lib->callbacks.request_handler(req);
1394     }
1395 
1396     return NXT_UNIT_OK;
1397 }
1398 
1399 
1400 static int
1401 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1402     nxt_unit_port_id_t *port_id)
1403 {
1404     int                           res;
1405     nxt_unit_ctx_t                *ctx;
1406     nxt_unit_impl_t               *lib;
1407     nxt_unit_port_t               *port;
1408     nxt_unit_process_t            *process;
1409     nxt_unit_ctx_impl_t           *ctx_impl;
1410     nxt_unit_port_impl_t          *port_impl;
1411     nxt_unit_request_info_impl_t  *req_impl;
1412 
1413     ctx = req->ctx;
1414     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1415     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1416 
1417     pthread_mutex_lock(&lib->mutex);
1418 
1419     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1420     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1421 
1422     if (nxt_fast_path(port != NULL)) {
1423         req->response_port = port;
1424 
1425         if (nxt_fast_path(port_impl->ready)) {
1426             pthread_mutex_unlock(&lib->mutex);
1427 
1428             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1429                            (int) port->id.pid, (int) port->id.id);
1430 
1431             return NXT_UNIT_OK;
1432         }
1433 
1434         nxt_unit_debug(ctx, "check_response_port: "
1435                        "port{%d,%d} already requested",
1436                        (int) port->id.pid, (int) port->id.id);
1437 
1438         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1439 
1440         nxt_queue_insert_tail(&port_impl->awaiting_req,
1441                               &req_impl->port_wait_link);
1442 
1443         pthread_mutex_unlock(&lib->mutex);
1444 
1445         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1446 
1447         return NXT_UNIT_AGAIN;
1448     }
1449 
1450     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1451     if (nxt_slow_path(port_impl == NULL)) {
1452         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1453                        (int) sizeof(nxt_unit_port_impl_t));
1454 
1455         pthread_mutex_unlock(&lib->mutex);
1456 
1457         return NXT_UNIT_ERROR;
1458     }
1459 
1460     port = &port_impl->port;
1461 
1462     port->id = *port_id;
1463     port->in_fd = -1;
1464     port->out_fd = -1;
1465     port->data = NULL;
1466 
1467     res = nxt_unit_port_hash_add(&lib->ports, port);
1468     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1469         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1470                        port->id.pid, port->id.id);
1471 
1472         pthread_mutex_unlock(&lib->mutex);
1473 
1474         nxt_unit_free(ctx, port);
1475 
1476         return NXT_UNIT_ERROR;
1477     }
1478 
1479     process = nxt_unit_process_find(lib, port_id->pid, 0);
1480     if (nxt_slow_path(process == NULL)) {
1481         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1482                        port->id.pid);
1483 
1484         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1485 
1486         pthread_mutex_unlock(&lib->mutex);
1487 
1488         nxt_unit_free(ctx, port);
1489 
1490         return NXT_UNIT_ERROR;
1491     }
1492 
1493     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1494 
1495     port_impl->process = process;
1496     port_impl->queue = NULL;
1497     port_impl->from_socket = 0;
1498     port_impl->socket_rbuf = NULL;
1499 
1500     nxt_queue_init(&port_impl->awaiting_req);
1501 
1502     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1503 
1504     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1505 
1506     port_impl->use_count = 2;
1507     port_impl->ready = 0;
1508 
1509     req->response_port = port;
1510 
1511     pthread_mutex_unlock(&lib->mutex);
1512 
1513     res = nxt_unit_get_port(ctx, port_id);
1514     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1515         return NXT_UNIT_ERROR;
1516     }
1517 
1518     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1519 
1520     return NXT_UNIT_AGAIN;
1521 }
1522 
1523 
1524 static int
1525 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1526 {
1527     ssize_t                       res;
1528     nxt_port_msg_t                msg;
1529     nxt_unit_impl_t               *lib;
1530     nxt_unit_ctx_impl_t           *ctx_impl;
1531     nxt_unit_request_info_impl_t  *req_impl;
1532 
1533     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1534     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1535     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1536 
1537     memset(&msg, 0, sizeof(nxt_port_msg_t));
1538 
1539     msg.stream = req_impl->stream;
1540     msg.pid = lib->pid;
1541     msg.reply_port = ctx_impl->read_port->id.id;
1542     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1543 
1544     res = nxt_unit_port_send(req->ctx, req->response_port,
1545                              &msg, sizeof(msg), NULL, 0);
1546     if (nxt_slow_path(res != sizeof(msg))) {
1547         return NXT_UNIT_ERROR;
1548     }
1549 
1550     return NXT_UNIT_OK;
1551 }
1552 
1553 
1554 static int
1555 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1556 {
1557     size_t                           hsize;
1558     nxt_unit_impl_t                  *lib;
1559     nxt_unit_mmap_buf_t              *b;
1560     nxt_unit_callbacks_t             *cb;
1561     nxt_unit_request_info_t          *req;
1562     nxt_unit_request_info_impl_t     *req_impl;
1563     nxt_unit_websocket_frame_impl_t  *ws_impl;
1564 
1565     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1566     if (nxt_slow_path(req == NULL)) {
1567         return NXT_UNIT_OK;
1568     }
1569 
1570     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1571 
1572     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1573     cb = &lib->callbacks;
1574 
1575     if (cb->websocket_handler && recv_msg->size >= 2) {
1576         ws_impl = nxt_unit_websocket_frame_get(ctx);
1577         if (nxt_slow_path(ws_impl == NULL)) {
1578             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1579                           req_impl->stream);
1580 
1581             return NXT_UNIT_ERROR;
1582         }
1583 
1584         ws_impl->ws.req = req;
1585 
1586         ws_impl->buf = NULL;
1587 
1588         if (recv_msg->mmap) {
1589             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1590                 b->req = req;
1591             }
1592 
1593             /* "Move" incoming buffer list to ws_impl. */
1594             ws_impl->buf = recv_msg->incoming_buf;
1595             ws_impl->buf->prev = &ws_impl->buf;
1596             recv_msg->incoming_buf = NULL;
1597 
1598             b = ws_impl->buf;
1599 
1600         } else {
1601             b = nxt_unit_mmap_buf_get(ctx);
1602             if (nxt_slow_path(b == NULL)) {
1603                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1604                                req_impl->stream);
1605 
1606                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1607 
1608                 return NXT_UNIT_ERROR;
1609             }
1610 
1611             b->req = req;
1612             b->buf.start = recv_msg->start;
1613             b->buf.free = b->buf.start;
1614             b->buf.end = b->buf.start + recv_msg->size;
1615 
1616             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1617         }
1618 
1619         ws_impl->ws.header = (void *) b->buf.start;
1620         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1621             ws_impl->ws.header);
1622 
1623         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1624 
1625         if (ws_impl->ws.header->mask) {
1626             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1627 
1628         } else {
1629             ws_impl->ws.mask = NULL;
1630         }
1631 
1632         b->buf.free += hsize;
1633 
1634         ws_impl->ws.content_buf = &b->buf;
1635         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1636 
1637         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1638                            "payload_len=%"PRIu64,
1639                             ws_impl->ws.header->opcode,
1640                             ws_impl->ws.payload_len);
1641 
1642         cb->websocket_handler(&ws_impl->ws);
1643     }
1644 
1645     if (recv_msg->last) {
1646         req_impl->websocket = 0;
1647 
1648         if (cb->close_handler) {
1649             nxt_unit_req_debug(req, "close_handler");
1650 
1651             cb->close_handler(req);
1652 
1653         } else {
1654             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1655         }
1656     }
1657 
1658     return NXT_UNIT_OK;
1659 }
1660 
1661 
1662 static int
1663 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1664 {
1665     nxt_unit_impl_t       *lib;
1666     nxt_unit_callbacks_t  *cb;
1667 
1668     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1669     cb = &lib->callbacks;
1670 
1671     if (cb->shm_ack_handler != NULL) {
1672         cb->shm_ack_handler(ctx);
1673     }
1674 
1675     return NXT_UNIT_OK;
1676 }
1677 
1678 
1679 static nxt_unit_request_info_impl_t *
1680 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1681 {
1682     nxt_unit_impl_t               *lib;
1683     nxt_queue_link_t              *lnk;
1684     nxt_unit_ctx_impl_t           *ctx_impl;
1685     nxt_unit_request_info_impl_t  *req_impl;
1686 
1687     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1688 
1689     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1690 
1691     pthread_mutex_lock(&ctx_impl->mutex);
1692 
1693     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1694         pthread_mutex_unlock(&ctx_impl->mutex);
1695 
1696         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1697                                         + lib->request_data_size);
1698         if (nxt_slow_path(req_impl == NULL)) {
1699             return NULL;
1700         }
1701 
1702         req_impl->req.unit = ctx->unit;
1703         req_impl->req.ctx = ctx;
1704 
1705         pthread_mutex_lock(&ctx_impl->mutex);
1706 
1707     } else {
1708         lnk = nxt_queue_first(&ctx_impl->free_req);
1709         nxt_queue_remove(lnk);
1710 
1711         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1712     }
1713 
1714     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1715 
1716     pthread_mutex_unlock(&ctx_impl->mutex);
1717 
1718     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1719 
1720     return req_impl;
1721 }
1722 
1723 
1724 static void
1725 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1726 {
1727     nxt_unit_ctx_impl_t           *ctx_impl;
1728     nxt_unit_request_info_impl_t  *req_impl;
1729 
1730     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1731     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1732 
1733     req->response = NULL;
1734     req->response_buf = NULL;
1735 
1736     if (req_impl->in_hash) {
1737         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1738     }
1739 
1740     req_impl->websocket = 0;
1741 
1742     while (req_impl->outgoing_buf != NULL) {
1743         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1744     }
1745 
1746     while (req_impl->incoming_buf != NULL) {
1747         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1748     }
1749 
1750     if (req->content_fd != -1) {
1751         nxt_unit_close(req->content_fd);
1752 
1753         req->content_fd = -1;
1754     }
1755 
1756     if (req->response_port != NULL) {
1757         nxt_unit_port_release(req->response_port);
1758 
1759         req->response_port = NULL;
1760     }
1761 
1762     req_impl->state = NXT_UNIT_RS_RELEASED;
1763 
1764     pthread_mutex_lock(&ctx_impl->mutex);
1765 
1766     nxt_queue_remove(&req_impl->link);
1767 
1768     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1769 
1770     pthread_mutex_unlock(&ctx_impl->mutex);
1771 }
1772 
1773 
1774 static void
1775 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1776 {
1777     nxt_unit_ctx_impl_t  *ctx_impl;
1778 
1779     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1780 
1781     nxt_queue_remove(&req_impl->link);
1782 
1783     if (req_impl != &ctx_impl->req) {
1784         nxt_unit_free(&ctx_impl->ctx, req_impl);
1785     }
1786 }
1787 
1788 
1789 static nxt_unit_websocket_frame_impl_t *
1790 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1791 {
1792     nxt_queue_link_t                 *lnk;
1793     nxt_unit_ctx_impl_t              *ctx_impl;
1794     nxt_unit_websocket_frame_impl_t  *ws_impl;
1795 
1796     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1797 
1798     pthread_mutex_lock(&ctx_impl->mutex);
1799 
1800     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1801         pthread_mutex_unlock(&ctx_impl->mutex);
1802 
1803         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1804         if (nxt_slow_path(ws_impl == NULL)) {
1805             return NULL;
1806         }
1807 
1808     } else {
1809         lnk = nxt_queue_first(&ctx_impl->free_ws);
1810         nxt_queue_remove(lnk);
1811 
1812         pthread_mutex_unlock(&ctx_impl->mutex);
1813 
1814         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1815     }
1816 
1817     ws_impl->ctx_impl = ctx_impl;
1818 
1819     return ws_impl;
1820 }
1821 
1822 
1823 static void
1824 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1825 {
1826     nxt_unit_websocket_frame_impl_t  *ws_impl;
1827 
1828     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1829 
1830     while (ws_impl->buf != NULL) {
1831         nxt_unit_mmap_buf_free(ws_impl->buf);
1832     }
1833 
1834     ws->req = NULL;
1835 
1836     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1837 
1838     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1839 
1840     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1841 }
1842 
1843 
1844 static void
1845 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1846     nxt_unit_websocket_frame_impl_t *ws_impl)
1847 {
1848     nxt_queue_remove(&ws_impl->link);
1849 
1850     nxt_unit_free(ctx, ws_impl);
1851 }
1852 
1853 
1854 uint16_t
1855 nxt_unit_field_hash(const char *name, size_t name_length)
1856 {
1857     u_char      ch;
1858     uint32_t    hash;
1859     const char  *p, *end;
1860 
1861     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1862     end = name + name_length;
1863 
1864     for (p = name; p < end; p++) {
1865         ch = *p;
1866         hash = (hash << 4) + hash + nxt_lowcase(ch);
1867     }
1868 
1869     hash = (hash >> 16) ^ hash;
1870 
1871     return hash;
1872 }
1873 
1874 
1875 void
1876 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1877 {
1878     char                *name;
1879     uint32_t            i, j;
1880     nxt_unit_field_t    *fields, f;
1881     nxt_unit_request_t  *r;
1882 
1883     static nxt_str_t  content_length = nxt_string("content-length");
1884     static nxt_str_t  content_type = nxt_string("content-type");
1885     static nxt_str_t  cookie = nxt_string("cookie");
1886 
1887     nxt_unit_req_debug(req, "group_dup_fields");
1888 
1889     r = req->request;
1890     fields = r->fields;
1891 
1892     for (i = 0; i < r->fields_count; i++) {
1893         name = nxt_unit_sptr_get(&fields[i].name);
1894 
1895         switch (fields[i].hash) {
1896         case NXT_UNIT_HASH_CONTENT_LENGTH:
1897             if (fields[i].name_length == content_length.length
1898                 && nxt_unit_memcasecmp(name, content_length.start,
1899                                        content_length.length) == 0)
1900             {
1901                 r->content_length_field = i;
1902             }
1903 
1904             break;
1905 
1906         case NXT_UNIT_HASH_CONTENT_TYPE:
1907             if (fields[i].name_length == content_type.length
1908                 && nxt_unit_memcasecmp(name, content_type.start,
1909                                        content_type.length) == 0)
1910             {
1911                 r->content_type_field = i;
1912             }
1913 
1914             break;
1915 
1916         case NXT_UNIT_HASH_COOKIE:
1917             if (fields[i].name_length == cookie.length
1918                 && nxt_unit_memcasecmp(name, cookie.start,
1919                                        cookie.length) == 0)
1920             {
1921                 r->cookie_field = i;
1922             }
1923 
1924             break;
1925         }
1926 
1927         for (j = i + 1; j < r->fields_count; j++) {
1928             if (fields[i].hash != fields[j].hash
1929                 || fields[i].name_length != fields[j].name_length
1930                 || nxt_unit_memcasecmp(name,
1931                                        nxt_unit_sptr_get(&fields[j].name),
1932                                        fields[j].name_length) != 0)
1933             {
1934                 continue;
1935             }
1936 
1937             f = fields[j];
1938             f.value.offset += (j - (i + 1)) * sizeof(f);
1939 
1940             while (j > i + 1) {
1941                 fields[j] = fields[j - 1];
1942                 fields[j].name.offset -= sizeof(f);
1943                 fields[j].value.offset -= sizeof(f);
1944                 j--;
1945             }
1946 
1947             fields[j] = f;
1948 
1949             /* Assign the same name pointer for further grouping simplicity. */
1950             nxt_unit_sptr_set(&fields[j].name, name);
1951 
1952             i++;
1953         }
1954     }
1955 }
1956 
1957 
1958 int
1959 nxt_unit_response_init(nxt_unit_request_info_t *req,
1960     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1961 {
1962     uint32_t                      buf_size;
1963     nxt_unit_buf_t                *buf;
1964     nxt_unit_request_info_impl_t  *req_impl;
1965 
1966     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1967 
1968     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1969         nxt_unit_req_warn(req, "init: response already sent");
1970 
1971         return NXT_UNIT_ERROR;
1972     }
1973 
1974     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1975                        (int) max_fields_count, (int) max_fields_size);
1976 
1977     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1978         nxt_unit_req_debug(req, "duplicate response init");
1979     }
1980 
1981     /*
1982      * Each field name and value 0-terminated by libunit,
1983      * this is the reason of '+ 2' below.
1984      */
1985     buf_size = sizeof(nxt_unit_response_t)
1986                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1987                + max_fields_size;
1988 
1989     if (nxt_slow_path(req->response_buf != NULL)) {
1990         buf = req->response_buf;
1991 
1992         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1993             goto init_response;
1994         }
1995 
1996         nxt_unit_buf_free(buf);
1997 
1998         req->response_buf = NULL;
1999         req->response = NULL;
2000         req->response_max_fields = 0;
2001 
2002         req_impl->state = NXT_UNIT_RS_START;
2003     }
2004 
2005     buf = nxt_unit_response_buf_alloc(req, buf_size);
2006     if (nxt_slow_path(buf == NULL)) {
2007         return NXT_UNIT_ERROR;
2008     }
2009 
2010 init_response:
2011 
2012     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2013 
2014     req->response_buf = buf;
2015 
2016     req->response = (nxt_unit_response_t *) buf->start;
2017     req->response->status = status;
2018 
2019     buf->free = buf->start + sizeof(nxt_unit_response_t)
2020                 + max_fields_count * sizeof(nxt_unit_field_t);
2021 
2022     req->response_max_fields = max_fields_count;
2023     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2024 
2025     return NXT_UNIT_OK;
2026 }
2027 
2028 
2029 int
2030 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2031     uint32_t max_fields_count, uint32_t max_fields_size)
2032 {
2033     char                          *p;
2034     uint32_t                      i, buf_size;
2035     nxt_unit_buf_t                *buf;
2036     nxt_unit_field_t              *f, *src;
2037     nxt_unit_response_t           *resp;
2038     nxt_unit_request_info_impl_t  *req_impl;
2039 
2040     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2041 
2042     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2043         nxt_unit_req_warn(req, "realloc: response not init");
2044 
2045         return NXT_UNIT_ERROR;
2046     }
2047 
2048     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2049         nxt_unit_req_warn(req, "realloc: response already sent");
2050 
2051         return NXT_UNIT_ERROR;
2052     }
2053 
2054     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2055         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2056 
2057         return NXT_UNIT_ERROR;
2058     }
2059 
2060     /*
2061      * Each field name and value 0-terminated by libunit,
2062      * this is the reason of '+ 2' below.
2063      */
2064     buf_size = sizeof(nxt_unit_response_t)
2065                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2066                + max_fields_size;
2067 
2068     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2069 
2070     buf = nxt_unit_response_buf_alloc(req, buf_size);
2071     if (nxt_slow_path(buf == NULL)) {
2072         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2073         return NXT_UNIT_ERROR;
2074     }
2075 
2076     resp = (nxt_unit_response_t *) buf->start;
2077 
2078     memset(resp, 0, sizeof(nxt_unit_response_t));
2079 
2080     resp->status = req->response->status;
2081     resp->content_length = req->response->content_length;
2082 
2083     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2084     f = resp->fields;
2085 
2086     for (i = 0; i < req->response->fields_count; i++) {
2087         src = req->response->fields + i;
2088 
2089         if (nxt_slow_path(src->skip != 0)) {
2090             continue;
2091         }
2092 
2093         if (nxt_slow_path(src->name_length + src->value_length + 2
2094                           > (uint32_t) (buf->end - p)))
2095         {
2096             nxt_unit_req_warn(req, "realloc: not enough space for field"
2097                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2098                   i, src, src->name_length, src->value_length);
2099 
2100             goto fail;
2101         }
2102 
2103         nxt_unit_sptr_set(&f->name, p);
2104         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2105         *p++ = '\0';
2106 
2107         nxt_unit_sptr_set(&f->value, p);
2108         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2109         *p++ = '\0';
2110 
2111         f->hash = src->hash;
2112         f->skip = 0;
2113         f->name_length = src->name_length;
2114         f->value_length = src->value_length;
2115 
2116         resp->fields_count++;
2117         f++;
2118     }
2119 
2120     if (req->response->piggyback_content_length > 0) {
2121         if (nxt_slow_path(req->response->piggyback_content_length
2122                           > (uint32_t) (buf->end - p)))
2123         {
2124             nxt_unit_req_warn(req, "realloc: not enought space for content"
2125                   " #%"PRIu32", %"PRIu32" required",
2126                   i, req->response->piggyback_content_length);
2127 
2128             goto fail;
2129         }
2130 
2131         resp->piggyback_content_length =
2132                                        req->response->piggyback_content_length;
2133 
2134         nxt_unit_sptr_set(&resp->piggyback_content, p);
2135         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2136                        req->response->piggyback_content_length);
2137     }
2138 
2139     buf->free = p;
2140 
2141     nxt_unit_buf_free(req->response_buf);
2142 
2143     req->response = resp;
2144     req->response_buf = buf;
2145     req->response_max_fields = max_fields_count;
2146 
2147     return NXT_UNIT_OK;
2148 
2149 fail:
2150 
2151     nxt_unit_buf_free(buf);
2152 
2153     return NXT_UNIT_ERROR;
2154 }
2155 
2156 
2157 int
2158 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2159 {
2160     nxt_unit_request_info_impl_t  *req_impl;
2161 
2162     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2163 
2164     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2165 }
2166 
2167 
2168 int
2169 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2170     const char *name, uint8_t name_length,
2171     const char *value, uint32_t value_length)
2172 {
2173     nxt_unit_buf_t                *buf;
2174     nxt_unit_field_t              *f;
2175     nxt_unit_response_t           *resp;
2176     nxt_unit_request_info_impl_t  *req_impl;
2177 
2178     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2179 
2180     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2181         nxt_unit_req_warn(req, "add_field: response not initialized or "
2182                           "already sent");
2183 
2184         return NXT_UNIT_ERROR;
2185     }
2186 
2187     resp = req->response;
2188 
2189     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2190         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2191                           (int) resp->fields_count);
2192 
2193         return NXT_UNIT_ERROR;
2194     }
2195 
2196     buf = req->response_buf;
2197 
2198     if (nxt_slow_path(name_length + value_length + 2
2199                       > (uint32_t) (buf->end - buf->free)))
2200     {
2201         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2202 
2203         return NXT_UNIT_ERROR;
2204     }
2205 
2206     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2207                        resp->fields_count,
2208                        (int) name_length, name,
2209                        (int) value_length, value);
2210 
2211     f = resp->fields + resp->fields_count;
2212 
2213     nxt_unit_sptr_set(&f->name, buf->free);
2214     buf->free = nxt_cpymem(buf->free, name, name_length);
2215     *buf->free++ = '\0';
2216 
2217     nxt_unit_sptr_set(&f->value, buf->free);
2218     buf->free = nxt_cpymem(buf->free, value, value_length);
2219     *buf->free++ = '\0';
2220 
2221     f->hash = nxt_unit_field_hash(name, name_length);
2222     f->skip = 0;
2223     f->name_length = name_length;
2224     f->value_length = value_length;
2225 
2226     resp->fields_count++;
2227 
2228     return NXT_UNIT_OK;
2229 }
2230 
2231 
2232 int
2233 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2234     const void* src, uint32_t size)
2235 {
2236     nxt_unit_buf_t                *buf;
2237     nxt_unit_response_t           *resp;
2238     nxt_unit_request_info_impl_t  *req_impl;
2239 
2240     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2241 
2242     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2243         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2244 
2245         return NXT_UNIT_ERROR;
2246     }
2247 
2248     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2249         nxt_unit_req_warn(req, "add_content: response already sent");
2250 
2251         return NXT_UNIT_ERROR;
2252     }
2253 
2254     buf = req->response_buf;
2255 
2256     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2257         nxt_unit_req_warn(req, "add_content: buffer overflow");
2258 
2259         return NXT_UNIT_ERROR;
2260     }
2261 
2262     resp = req->response;
2263 
2264     if (resp->piggyback_content_length == 0) {
2265         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2266         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2267     }
2268 
2269     resp->piggyback_content_length += size;
2270 
2271     buf->free = nxt_cpymem(buf->free, src, size);
2272 
2273     return NXT_UNIT_OK;
2274 }
2275 
2276 
2277 int
2278 nxt_unit_response_send(nxt_unit_request_info_t *req)
2279 {
2280     int                           rc;
2281     nxt_unit_mmap_buf_t           *mmap_buf;
2282     nxt_unit_request_info_impl_t  *req_impl;
2283 
2284     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2285 
2286     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2287         nxt_unit_req_warn(req, "send: response is not initialized yet");
2288 
2289         return NXT_UNIT_ERROR;
2290     }
2291 
2292     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2293         nxt_unit_req_warn(req, "send: response already sent");
2294 
2295         return NXT_UNIT_ERROR;
2296     }
2297 
2298     if (req->request->websocket_handshake && req->response->status == 101) {
2299         nxt_unit_response_upgrade(req);
2300     }
2301 
2302     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2303                        req->response->fields_count,
2304                        (int) (req->response_buf->free
2305                               - req->response_buf->start));
2306 
2307     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2308 
2309     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2310     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2311         req->response = NULL;
2312         req->response_buf = NULL;
2313         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2314 
2315         nxt_unit_mmap_buf_free(mmap_buf);
2316     }
2317 
2318     return rc;
2319 }
2320 
2321 
2322 int
2323 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2324 {
2325     nxt_unit_request_info_impl_t  *req_impl;
2326 
2327     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2328 
2329     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2330 }
2331 
2332 
2333 nxt_unit_buf_t *
2334 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2335 {
2336     int                           rc;
2337     nxt_unit_mmap_buf_t           *mmap_buf;
2338     nxt_unit_request_info_impl_t  *req_impl;
2339 
2340     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2341         nxt_unit_req_warn(req, "response_buf_alloc: "
2342                           "requested buffer (%"PRIu32") too big", size);
2343 
2344         return NULL;
2345     }
2346 
2347     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2348 
2349     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2350 
2351     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2352     if (nxt_slow_path(mmap_buf == NULL)) {
2353         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2354 
2355         return NULL;
2356     }
2357 
2358     mmap_buf->req = req;
2359 
2360     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2361 
2362     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2363                                    size, size, mmap_buf,
2364                                    NULL);
2365     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2366         nxt_unit_mmap_buf_release(mmap_buf);
2367 
2368         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2369 
2370         return NULL;
2371     }
2372 
2373     return &mmap_buf->buf;
2374 }
2375 
2376 
2377 static nxt_unit_mmap_buf_t *
2378 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2379 {
2380     nxt_unit_mmap_buf_t  *mmap_buf;
2381     nxt_unit_ctx_impl_t  *ctx_impl;
2382 
2383     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2384 
2385     pthread_mutex_lock(&ctx_impl->mutex);
2386 
2387     if (ctx_impl->free_buf == NULL) {
2388         pthread_mutex_unlock(&ctx_impl->mutex);
2389 
2390         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2391         if (nxt_slow_path(mmap_buf == NULL)) {
2392             return NULL;
2393         }
2394 
2395     } else {
2396         mmap_buf = ctx_impl->free_buf;
2397 
2398         nxt_unit_mmap_buf_unlink(mmap_buf);
2399 
2400         pthread_mutex_unlock(&ctx_impl->mutex);
2401     }
2402 
2403     mmap_buf->ctx_impl = ctx_impl;
2404 
2405     mmap_buf->hdr = NULL;
2406     mmap_buf->free_ptr = NULL;
2407 
2408     return mmap_buf;
2409 }
2410 
2411 
2412 static void
2413 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2414 {
2415     nxt_unit_mmap_buf_unlink(mmap_buf);
2416 
2417     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2418 
2419     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2420 
2421     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2422 }
2423 
2424 
2425 int
2426 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2427 {
2428     return req->request->websocket_handshake;
2429 }
2430 
2431 
2432 int
2433 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2434 {
2435     int                           rc;
2436     nxt_unit_request_info_impl_t  *req_impl;
2437 
2438     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2439 
2440     if (nxt_slow_path(req_impl->websocket != 0)) {
2441         nxt_unit_req_debug(req, "upgrade: already upgraded");
2442 
2443         return NXT_UNIT_OK;
2444     }
2445 
2446     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2447         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2448 
2449         return NXT_UNIT_ERROR;
2450     }
2451 
2452     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2453         nxt_unit_req_warn(req, "upgrade: response already sent");
2454 
2455         return NXT_UNIT_ERROR;
2456     }
2457 
2458     rc = nxt_unit_request_hash_add(req->ctx, req);
2459     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2460         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2461 
2462         return NXT_UNIT_ERROR;
2463     }
2464 
2465     req_impl->websocket = 1;
2466 
2467     req->response->status = 101;
2468 
2469     return NXT_UNIT_OK;
2470 }
2471 
2472 
2473 int
2474 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2475 {
2476     nxt_unit_request_info_impl_t  *req_impl;
2477 
2478     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2479 
2480     return req_impl->websocket;
2481 }
2482 
2483 
2484 nxt_unit_request_info_t *
2485 nxt_unit_get_request_info_from_data(void *data)
2486 {
2487     nxt_unit_request_info_impl_t  *req_impl;
2488 
2489     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2490 
2491     return &req_impl->req;
2492 }
2493 
2494 
2495 int
2496 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2497 {
2498     int                           rc;
2499     nxt_unit_mmap_buf_t           *mmap_buf;
2500     nxt_unit_request_info_t       *req;
2501     nxt_unit_request_info_impl_t  *req_impl;
2502 
2503     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2504 
2505     req = mmap_buf->req;
2506     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2507 
2508     nxt_unit_req_debug(req, "buf_send: %d bytes",
2509                        (int) (buf->free - buf->start));
2510 
2511     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2512         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2513 
2514         return NXT_UNIT_ERROR;
2515     }
2516 
2517     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2518         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2519 
2520         return NXT_UNIT_ERROR;
2521     }
2522 
2523     if (nxt_fast_path(buf->free > buf->start)) {
2524         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2525         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2526             return rc;
2527         }
2528     }
2529 
2530     nxt_unit_mmap_buf_free(mmap_buf);
2531 
2532     return NXT_UNIT_OK;
2533 }
2534 
2535 
2536 static void
2537 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2538 {
2539     int                      rc;
2540     nxt_unit_mmap_buf_t      *mmap_buf;
2541     nxt_unit_request_info_t  *req;
2542 
2543     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2544 
2545     req = mmap_buf->req;
2546 
2547     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2548     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2549         nxt_unit_mmap_buf_free(mmap_buf);
2550 
2551         nxt_unit_request_info_release(req);
2552 
2553     } else {
2554         nxt_unit_request_done(req, rc);
2555     }
2556 }
2557 
2558 
2559 static int
2560 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2561     nxt_unit_mmap_buf_t *mmap_buf, int last)
2562 {
2563     struct {
2564         nxt_port_msg_t       msg;
2565         nxt_port_mmap_msg_t  mmap_msg;
2566     } m;
2567 
2568     int                           rc;
2569     u_char                        *last_used, *first_free;
2570     ssize_t                       res;
2571     nxt_chunk_id_t                first_free_chunk;
2572     nxt_unit_buf_t                *buf;
2573     nxt_unit_impl_t               *lib;
2574     nxt_port_mmap_header_t        *hdr;
2575     nxt_unit_request_info_impl_t  *req_impl;
2576 
2577     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2578     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2579 
2580     buf = &mmap_buf->buf;
2581     hdr = mmap_buf->hdr;
2582 
2583     m.mmap_msg.size = buf->free - buf->start;
2584 
2585     m.msg.stream = req_impl->stream;
2586     m.msg.pid = lib->pid;
2587     m.msg.reply_port = 0;
2588     m.msg.type = _NXT_PORT_MSG_DATA;
2589     m.msg.last = last != 0;
2590     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2591     m.msg.nf = 0;
2592     m.msg.mf = 0;
2593     m.msg.tracking = 0;
2594 
2595     rc = NXT_UNIT_ERROR;
2596 
2597     if (m.msg.mmap) {
2598         m.mmap_msg.mmap_id = hdr->id;
2599         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2600                                                      (u_char *) buf->start);
2601 
2602         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2603                        req_impl->stream,
2604                        (int) m.mmap_msg.mmap_id,
2605                        (int) m.mmap_msg.chunk_id,
2606                        (int) m.mmap_msg.size);
2607 
2608         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2609                                  NULL, 0);
2610         if (nxt_slow_path(res != sizeof(m))) {
2611             goto free_buf;
2612         }
2613 
2614         last_used = (u_char *) buf->free - 1;
2615         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2616 
2617         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2618             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2619 
2620             buf->start = (char *) first_free;
2621             buf->free = buf->start;
2622 
2623             if (buf->end < buf->start) {
2624                 buf->end = buf->start;
2625             }
2626 
2627         } else {
2628             buf->start = NULL;
2629             buf->free = NULL;
2630             buf->end = NULL;
2631 
2632             mmap_buf->hdr = NULL;
2633         }
2634 
2635         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2636                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2637 
2638         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2639                        (int) lib->outgoing.allocated_chunks);
2640 
2641     } else {
2642         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2643                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2644         {
2645             nxt_unit_alert(req->ctx,
2646                            "#%"PRIu32": failed to send plain memory buffer"
2647                            ": no space reserved for message header",
2648                            req_impl->stream);
2649 
2650             goto free_buf;
2651         }
2652 
2653         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2654 
2655         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2656                        req_impl->stream,
2657                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2658 
2659         res = nxt_unit_port_send(req->ctx, req->response_port,
2660                                  buf->start - sizeof(m.msg),
2661                                  m.mmap_msg.size + sizeof(m.msg),
2662                                  NULL, 0);
2663         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2664             goto free_buf;
2665         }
2666     }
2667 
2668     rc = NXT_UNIT_OK;
2669 
2670 free_buf:
2671 
2672     nxt_unit_free_outgoing_buf(mmap_buf);
2673 
2674     return rc;
2675 }
2676 
2677 
2678 void
2679 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2680 {
2681     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2682 }
2683 
2684 
2685 static void
2686 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2687 {
2688     nxt_unit_free_outgoing_buf(mmap_buf);
2689 
2690     nxt_unit_mmap_buf_release(mmap_buf);
2691 }
2692 
2693 
2694 static void
2695 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2696 {
2697     if (mmap_buf->hdr != NULL) {
2698         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2699                               mmap_buf->hdr, mmap_buf->buf.start,
2700                               mmap_buf->buf.end - mmap_buf->buf.start);
2701 
2702         mmap_buf->hdr = NULL;
2703 
2704         return;
2705     }
2706 
2707     if (mmap_buf->free_ptr != NULL) {
2708         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2709 
2710         mmap_buf->free_ptr = NULL;
2711     }
2712 }
2713 
2714 
2715 static nxt_unit_read_buf_t *
2716 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2717 {
2718     nxt_unit_ctx_impl_t  *ctx_impl;
2719     nxt_unit_read_buf_t  *rbuf;
2720 
2721     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2722 
2723     pthread_mutex_lock(&ctx_impl->mutex);
2724 
2725     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2726 
2727     pthread_mutex_unlock(&ctx_impl->mutex);
2728 
2729     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2730 
2731     return rbuf;
2732 }
2733 
2734 
2735 static nxt_unit_read_buf_t *
2736 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2737 {
2738     nxt_queue_link_t     *link;
2739     nxt_unit_read_buf_t  *rbuf;
2740 
2741     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2742         link = nxt_queue_first(&ctx_impl->free_rbuf);
2743         nxt_queue_remove(link);
2744 
2745         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2746 
2747         return rbuf;
2748     }
2749 
2750     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2751 
2752     if (nxt_fast_path(rbuf != NULL)) {
2753         rbuf->ctx_impl = ctx_impl;
2754     }
2755 
2756     return rbuf;
2757 }
2758 
2759 
2760 static void
2761 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2762     nxt_unit_read_buf_t *rbuf)
2763 {
2764     nxt_unit_ctx_impl_t  *ctx_impl;
2765 
2766     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2767 
2768     pthread_mutex_lock(&ctx_impl->mutex);
2769 
2770     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2771 
2772     pthread_mutex_unlock(&ctx_impl->mutex);
2773 }
2774 
2775 
2776 nxt_unit_buf_t *
2777 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2778 {
2779     nxt_unit_mmap_buf_t  *mmap_buf;
2780 
2781     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2782 
2783     if (mmap_buf->next == NULL) {
2784         return NULL;
2785     }
2786 
2787     return &mmap_buf->next->buf;
2788 }
2789 
2790 
2791 uint32_t
2792 nxt_unit_buf_max(void)
2793 {
2794     return PORT_MMAP_DATA_SIZE;
2795 }
2796 
2797 
2798 uint32_t
2799 nxt_unit_buf_min(void)
2800 {
2801     return PORT_MMAP_CHUNK_SIZE;
2802 }
2803 
2804 
2805 int
2806 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2807     size_t size)
2808 {
2809     ssize_t  res;
2810 
2811     res = nxt_unit_response_write_nb(req, start, size, size);
2812 
2813     return res < 0 ? -res : NXT_UNIT_OK;
2814 }
2815 
2816 
2817 ssize_t
2818 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2819     size_t size, size_t min_size)
2820 {
2821     int                           rc;
2822     ssize_t                       sent;
2823     uint32_t                      part_size, min_part_size, buf_size;
2824     const char                    *part_start;
2825     nxt_unit_mmap_buf_t           mmap_buf;
2826     nxt_unit_request_info_impl_t  *req_impl;
2827     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2828 
2829     nxt_unit_req_debug(req, "write: %d", (int) size);
2830 
2831     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2832 
2833     part_start = start;
2834     sent = 0;
2835 
2836     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2837         nxt_unit_req_alert(req, "write: response not initialized yet");
2838 
2839         return -NXT_UNIT_ERROR;
2840     }
2841 
2842     /* Check if response is not send yet. */
2843     if (nxt_slow_path(req->response_buf != NULL)) {
2844         part_size = req->response_buf->end - req->response_buf->free;
2845         part_size = nxt_min(size, part_size);
2846 
2847         rc = nxt_unit_response_add_content(req, part_start, part_size);
2848         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2849             return -rc;
2850         }
2851 
2852         rc = nxt_unit_response_send(req);
2853         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2854             return -rc;
2855         }
2856 
2857         size -= part_size;
2858         part_start += part_size;
2859         sent += part_size;
2860 
2861         min_size -= nxt_min(min_size, part_size);
2862     }
2863 
2864     while (size > 0) {
2865         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2866         min_part_size = nxt_min(min_size, part_size);
2867         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2868 
2869         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2870                                        min_part_size, &mmap_buf, local_buf);
2871         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2872             return -rc;
2873         }
2874 
2875         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2876         if (nxt_slow_path(buf_size == 0)) {
2877             return sent;
2878         }
2879         part_size = nxt_min(buf_size, part_size);
2880 
2881         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2882                                        part_start, part_size);
2883 
2884         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2885         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2886             return -rc;
2887         }
2888 
2889         size -= part_size;
2890         part_start += part_size;
2891         sent += part_size;
2892 
2893         min_size -= nxt_min(min_size, part_size);
2894     }
2895 
2896     return sent;
2897 }
2898 
2899 
2900 int
2901 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2902     nxt_unit_read_info_t *read_info)
2903 {
2904     int                           rc;
2905     ssize_t                       n;
2906     uint32_t                      buf_size;
2907     nxt_unit_buf_t                *buf;
2908     nxt_unit_mmap_buf_t           mmap_buf;
2909     nxt_unit_request_info_impl_t  *req_impl;
2910     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2911 
2912     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2913 
2914     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2915         nxt_unit_req_alert(req, "write: response not initialized yet");
2916 
2917         return NXT_UNIT_ERROR;
2918     }
2919 
2920     /* Check if response is not send yet. */
2921     if (nxt_slow_path(req->response_buf != NULL)) {
2922 
2923         /* Enable content in headers buf. */
2924         rc = nxt_unit_response_add_content(req, "", 0);
2925         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2926             nxt_unit_req_error(req, "Failed to add piggyback content");
2927 
2928             return rc;
2929         }
2930 
2931         buf = req->response_buf;
2932 
2933         while (buf->end - buf->free > 0) {
2934             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2935             if (nxt_slow_path(n < 0)) {
2936                 nxt_unit_req_error(req, "Read error");
2937 
2938                 return NXT_UNIT_ERROR;
2939             }
2940 
2941             /* Manually increase sizes. */
2942             buf->free += n;
2943             req->response->piggyback_content_length += n;
2944 
2945             if (read_info->eof) {
2946                 break;
2947             }
2948         }
2949 
2950         rc = nxt_unit_response_send(req);
2951         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2952             nxt_unit_req_error(req, "Failed to send headers with content");
2953 
2954             return rc;
2955         }
2956 
2957         if (read_info->eof) {
2958             return NXT_UNIT_OK;
2959         }
2960     }
2961 
2962     while (!read_info->eof) {
2963         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2964                            read_info->buf_size);
2965 
2966         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2967 
2968         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2969                                        buf_size, buf_size,
2970                                        &mmap_buf, local_buf);
2971         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2972             return rc;
2973         }
2974 
2975         buf = &mmap_buf.buf;
2976 
2977         while (!read_info->eof && buf->end > buf->free) {
2978             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2979             if (nxt_slow_path(n < 0)) {
2980                 nxt_unit_req_error(req, "Read error");
2981 
2982                 nxt_unit_free_outgoing_buf(&mmap_buf);
2983 
2984                 return NXT_UNIT_ERROR;
2985             }
2986 
2987             buf->free += n;
2988         }
2989 
2990         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2991         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2992             nxt_unit_req_error(req, "Failed to send content");
2993 
2994             return rc;
2995         }
2996     }
2997 
2998     return NXT_UNIT_OK;
2999 }
3000 
3001 
3002 ssize_t
3003 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3004 {
3005     ssize_t  buf_res, res;
3006 
3007     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3008                                 dst, size);
3009 
3010     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3011         res = read(req->content_fd, dst, size);
3012         if (nxt_slow_path(res < 0)) {
3013             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3014                                strerror(errno), errno);
3015 
3016             return res;
3017         }
3018 
3019         if (res < (ssize_t) size) {
3020             nxt_unit_close(req->content_fd);
3021 
3022             req->content_fd = -1;
3023         }
3024 
3025         req->content_length -= res;
3026         size -= res;
3027 
3028         dst = nxt_pointer_to(dst, res);
3029 
3030     } else {
3031         res = 0;
3032     }
3033 
3034     return buf_res + res;
3035 }
3036 
3037 
3038 ssize_t
3039 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3040 {
3041     char                 *p;
3042     size_t               l_size, b_size;
3043     nxt_unit_buf_t       *b;
3044     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3045 
3046     if (req->content_length == 0) {
3047         return 0;
3048     }
3049 
3050     l_size = 0;
3051 
3052     b = req->content_buf;
3053 
3054     while (b != NULL) {
3055         b_size = b->end - b->free;
3056         p = memchr(b->free, '\n', b_size);
3057 
3058         if (p != NULL) {
3059             p++;
3060             l_size += p - b->free;
3061             break;
3062         }
3063 
3064         l_size += b_size;
3065 
3066         if (max_size <= l_size) {
3067             break;
3068         }
3069 
3070         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3071         if (mmap_buf->next == NULL
3072             && req->content_fd != -1
3073             && l_size < req->content_length)
3074         {
3075             preread_buf = nxt_unit_request_preread(req, 16384);
3076             if (nxt_slow_path(preread_buf == NULL)) {
3077                 return -1;
3078             }
3079 
3080             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3081         }
3082 
3083         b = nxt_unit_buf_next(b);
3084     }
3085 
3086     return nxt_min(max_size, l_size);
3087 }
3088 
3089 
3090 static nxt_unit_mmap_buf_t *
3091 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3092 {
3093     ssize_t              res;
3094     nxt_unit_mmap_buf_t  *mmap_buf;
3095 
3096     if (req->content_fd == -1) {
3097         nxt_unit_req_alert(req, "preread: content_fd == -1");
3098         return NULL;
3099     }
3100 
3101     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3102     if (nxt_slow_path(mmap_buf == NULL)) {
3103         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3104         return NULL;
3105     }
3106 
3107     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3108     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3109         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3110         nxt_unit_mmap_buf_release(mmap_buf);
3111         return NULL;
3112     }
3113 
3114     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3115 
3116     mmap_buf->hdr = NULL;
3117     mmap_buf->buf.start = mmap_buf->free_ptr;
3118     mmap_buf->buf.free = mmap_buf->buf.start;
3119     mmap_buf->buf.end = mmap_buf->buf.start + size;
3120 
3121     res = read(req->content_fd, mmap_buf->free_ptr, size);
3122     if (res < 0) {
3123         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3124                            strerror(errno), errno);
3125 
3126         nxt_unit_mmap_buf_free(mmap_buf);
3127 
3128         return NULL;
3129     }
3130 
3131     if (res < (ssize_t) size) {
3132         nxt_unit_close(req->content_fd);
3133 
3134         req->content_fd = -1;
3135     }
3136 
3137     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3138 
3139     mmap_buf->buf.end = mmap_buf->buf.free + res;
3140 
3141     return mmap_buf;
3142 }
3143 
3144 
3145 static ssize_t
3146 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3147 {
3148     u_char          *p;
3149     size_t          rest, copy, read;
3150     nxt_unit_buf_t  *buf, *last_buf;
3151 
3152     p = dst;
3153     rest = size;
3154 
3155     buf = *b;
3156     last_buf = buf;
3157 
3158     while (buf != NULL) {
3159         last_buf = buf;
3160 
3161         copy = buf->end - buf->free;
3162         copy = nxt_min(rest, copy);
3163 
3164         p = nxt_cpymem(p, buf->free, copy);
3165 
3166         buf->free += copy;
3167         rest -= copy;
3168 
3169         if (rest == 0) {
3170             if (buf->end == buf->free) {
3171                 buf = nxt_unit_buf_next(buf);
3172             }
3173 
3174             break;
3175         }
3176 
3177         buf = nxt_unit_buf_next(buf);
3178     }
3179 
3180     *b = last_buf;
3181 
3182     read = size - rest;
3183 
3184     *len -= read;
3185 
3186     return read;
3187 }
3188 
3189 
3190 void
3191 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3192 {
3193     uint32_t                      size;
3194     nxt_port_msg_t                msg;
3195     nxt_unit_impl_t               *lib;
3196     nxt_unit_request_info_impl_t  *req_impl;
3197 
3198     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3199 
3200     nxt_unit_req_debug(req, "done: %d", rc);
3201 
3202     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3203         goto skip_response_send;
3204     }
3205 
3206     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3207 
3208         size = nxt_length("Content-Type") + nxt_length("text/plain");
3209 
3210         rc = nxt_unit_response_init(req, 200, 1, size);
3211         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3212             goto skip_response_send;
3213         }
3214 
3215         rc = nxt_unit_response_add_field(req, "Content-Type",
3216                                    nxt_length("Content-Type"),
3217                                    "text/plain", nxt_length("text/plain"));
3218         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3219             goto skip_response_send;
3220         }
3221     }
3222 
3223     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3224 
3225         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3226 
3227         nxt_unit_buf_send_done(req->response_buf);
3228 
3229         return;
3230     }
3231 
3232 skip_response_send:
3233 
3234     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3235 
3236     msg.stream = req_impl->stream;
3237     msg.pid = lib->pid;
3238     msg.reply_port = 0;
3239     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3240                                    : _NXT_PORT_MSG_RPC_ERROR;
3241     msg.last = 1;
3242     msg.mmap = 0;
3243     msg.nf = 0;
3244     msg.mf = 0;
3245     msg.tracking = 0;
3246 
3247     (void) nxt_unit_port_send(req->ctx, req->response_port,
3248                               &msg, sizeof(msg), NULL, 0);
3249 
3250     nxt_unit_request_info_release(req);
3251 }
3252 
3253 
3254 int
3255 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3256     uint8_t last, const void *start, size_t size)
3257 {
3258     const struct iovec  iov = { (void *) start, size };
3259 
3260     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3261 }
3262 
3263 
3264 int
3265 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3266     uint8_t last, const struct iovec *iov, int iovcnt)
3267 {
3268     int                     i, rc;
3269     size_t                  l, copy;
3270     uint32_t                payload_len, buf_size, alloc_size;
3271     const uint8_t           *b;
3272     nxt_unit_buf_t          *buf;
3273     nxt_unit_mmap_buf_t     mmap_buf;
3274     nxt_websocket_header_t  *wh;
3275     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3276 
3277     payload_len = 0;
3278 
3279     for (i = 0; i < iovcnt; i++) {
3280         payload_len += iov[i].iov_len;
3281     }
3282 
3283     buf_size = 10 + payload_len;
3284     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3285 
3286     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3287                                    alloc_size, alloc_size,
3288                                    &mmap_buf, local_buf);
3289     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3290         return rc;
3291     }
3292 
3293     buf = &mmap_buf.buf;
3294 
3295     buf->start[0] = 0;
3296     buf->start[1] = 0;
3297 
3298     buf_size -= buf->end - buf->start;
3299 
3300     wh = (void *) buf->free;
3301 
3302     buf->free = nxt_websocket_frame_init(wh, payload_len);
3303     wh->fin = last;
3304     wh->opcode = opcode;
3305 
3306     for (i = 0; i < iovcnt; i++) {
3307         b = iov[i].iov_base;
3308         l = iov[i].iov_len;
3309 
3310         while (l > 0) {
3311             copy = buf->end - buf->free;
3312             copy = nxt_min(l, copy);
3313 
3314             buf->free = nxt_cpymem(buf->free, b, copy);
3315             b += copy;
3316             l -= copy;
3317 
3318             if (l > 0) {
3319                 if (nxt_fast_path(buf->free > buf->start)) {
3320                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3321 
3322                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3323                         return rc;
3324                     }
3325                 }
3326 
3327                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3328 
3329                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3330                                                alloc_size, alloc_size,
3331                                                &mmap_buf, local_buf);
3332                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3333                     return rc;
3334                 }
3335 
3336                 buf_size -= buf->end - buf->start;
3337             }
3338         }
3339     }
3340 
3341     if (buf->free > buf->start) {
3342         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3343     }
3344 
3345     return rc;
3346 }
3347 
3348 
3349 ssize_t
3350 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3351     size_t size)
3352 {
3353     ssize_t   res;
3354     uint8_t   *b;
3355     uint64_t  i, d;
3356 
3357     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3358                             dst, size);
3359 
3360     if (ws->mask == NULL) {
3361         return res;
3362     }
3363 
3364     b = dst;
3365     d = (ws->payload_len - ws->content_length - res) % 4;
3366 
3367     for (i = 0; i < (uint64_t) res; i++) {
3368         b[i] ^= ws->mask[ (i + d) % 4 ];
3369     }
3370 
3371     return res;
3372 }
3373 
3374 
3375 int
3376 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3377 {
3378     char                             *b;
3379     size_t                           size, hsize;
3380     nxt_unit_websocket_frame_impl_t  *ws_impl;
3381 
3382     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3383 
3384     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3385         return NXT_UNIT_OK;
3386     }
3387 
3388     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3389 
3390     b = nxt_unit_malloc(ws->req->ctx, size);
3391     if (nxt_slow_path(b == NULL)) {
3392         return NXT_UNIT_ERROR;
3393     }
3394 
3395     memcpy(b, ws_impl->buf->buf.start, size);
3396 
3397     hsize = nxt_websocket_frame_header_size(b);
3398 
3399     ws_impl->buf->buf.start = b;
3400     ws_impl->buf->buf.free = b + hsize;
3401     ws_impl->buf->buf.end = b + size;
3402 
3403     ws_impl->buf->free_ptr = b;
3404 
3405     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3406 
3407     if (ws_impl->ws.header->mask) {
3408         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3409 
3410     } else {
3411         ws_impl->ws.mask = NULL;
3412     }
3413 
3414     return NXT_UNIT_OK;
3415 }
3416 
3417 
3418 void
3419 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3420 {
3421     nxt_unit_websocket_frame_release(ws);
3422 }
3423 
3424 
3425 static nxt_port_mmap_header_t *
3426 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3427     nxt_chunk_id_t *c, int *n, int min_n)
3428 {
3429     int                     res, nchunks, i;
3430     uint32_t                outgoing_size;
3431     nxt_unit_mmap_t         *mm, *mm_end;
3432     nxt_unit_impl_t         *lib;
3433     nxt_port_mmap_header_t  *hdr;
3434 
3435     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3436 
3437     pthread_mutex_lock(&lib->outgoing.mutex);
3438 
3439 retry:
3440 
3441     outgoing_size = lib->outgoing.size;
3442 
3443     mm_end = lib->outgoing.elts + outgoing_size;
3444 
3445     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3446         hdr = mm->hdr;
3447 
3448         if (hdr->sent_over != 0xFFFFu
3449             && (hdr->sent_over != port->id.id
3450                 || mm->src_thread != pthread_self()))
3451         {
3452             continue;
3453         }
3454 
3455         *c = 0;
3456 
3457         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3458             nchunks = 1;
3459 
3460             while (nchunks < *n) {
3461                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3462                                                        *c + nchunks);
3463 
3464                 if (res == 0) {
3465                     if (nchunks >= min_n) {
3466                         *n = nchunks;
3467 
3468                         goto unlock;
3469                     }
3470 
3471                     for (i = 0; i < nchunks; i++) {
3472                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3473                     }
3474 
3475                     *c += nchunks + 1;
3476                     nchunks = 0;
3477                     break;
3478                 }
3479 
3480                 nchunks++;
3481             }
3482 
3483             if (nchunks >= min_n) {
3484                 *n = nchunks;
3485 
3486                 goto unlock;
3487             }
3488         }
3489 
3490         hdr->oosm = 1;
3491     }
3492 
3493     if (outgoing_size >= lib->shm_mmap_limit) {
3494         /* Cannot allocate more shared memory. */
3495         pthread_mutex_unlock(&lib->outgoing.mutex);
3496 
3497         if (min_n == 0) {
3498             *n = 0;
3499         }
3500 
3501         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3502                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3503         {
3504             /* Memory allocated by application, but not send to router. */
3505             return NULL;
3506         }
3507 
3508         /* Notify router about OOSM condition. */
3509 
3510         res = nxt_unit_send_oosm(ctx, port);
3511         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3512             return NULL;
3513         }
3514 
3515         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3516 
3517         if (min_n == 0) {
3518             return NULL;
3519         }
3520 
3521         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3522 
3523         res = nxt_unit_wait_shm_ack(ctx);
3524         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3525             return NULL;
3526         }
3527 
3528         nxt_unit_debug(ctx, "oosm: retry");
3529 
3530         pthread_mutex_lock(&lib->outgoing.mutex);
3531 
3532         goto retry;
3533     }
3534 
3535     *c = 0;
3536     hdr = nxt_unit_new_mmap(ctx, port, *n);
3537 
3538 unlock:
3539 
3540     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3541 
3542     nxt_unit_debug(ctx, "allocated_chunks %d",
3543                    (int) lib->outgoing.allocated_chunks);
3544 
3545     pthread_mutex_unlock(&lib->outgoing.mutex);
3546 
3547     return hdr;
3548 }
3549 
3550 
3551 static int
3552 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3553 {
3554     ssize_t          res;
3555     nxt_port_msg_t   msg;
3556     nxt_unit_impl_t  *lib;
3557 
3558     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3559 
3560     msg.stream = 0;
3561     msg.pid = lib->pid;
3562     msg.reply_port = 0;
3563     msg.type = _NXT_PORT_MSG_OOSM;
3564     msg.last = 0;
3565     msg.mmap = 0;
3566     msg.nf = 0;
3567     msg.mf = 0;
3568     msg.tracking = 0;
3569 
3570     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3571     if (nxt_slow_path(res != sizeof(msg))) {
3572         return NXT_UNIT_ERROR;
3573     }
3574 
3575     return NXT_UNIT_OK;
3576 }
3577 
3578 
3579 static int
3580 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3581 {
3582     int                  res;
3583     nxt_unit_ctx_impl_t  *ctx_impl;
3584     nxt_unit_read_buf_t  *rbuf;
3585 
3586     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3587 
3588     while (1) {
3589         rbuf = nxt_unit_read_buf_get(ctx);
3590         if (nxt_slow_path(rbuf == NULL)) {
3591             return NXT_UNIT_ERROR;
3592         }
3593 
3594         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3595         if (res == NXT_UNIT_ERROR) {
3596             nxt_unit_read_buf_release(ctx, rbuf);
3597 
3598             return NXT_UNIT_ERROR;
3599         }
3600 
3601         if (nxt_unit_is_shm_ack(rbuf)) {
3602             nxt_unit_read_buf_release(ctx, rbuf);
3603             break;
3604         }
3605 
3606         pthread_mutex_lock(&ctx_impl->mutex);
3607 
3608         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3609 
3610         pthread_mutex_unlock(&ctx_impl->mutex);
3611 
3612         if (nxt_unit_is_quit(rbuf)) {
3613             nxt_unit_debug(ctx, "oosm: quit received");
3614 
3615             return NXT_UNIT_ERROR;
3616         }
3617     }
3618 
3619     return NXT_UNIT_OK;
3620 }
3621 
3622 
3623 static nxt_unit_mmap_t *
3624 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3625 {
3626     uint32_t         cap, n;
3627     nxt_unit_mmap_t  *e;
3628 
3629     if (nxt_fast_path(mmaps->size > i)) {
3630         return mmaps->elts + i;
3631     }
3632 
3633     cap = mmaps->cap;
3634 
3635     if (cap == 0) {
3636         cap = i + 1;
3637     }
3638 
3639     while (i + 1 > cap) {
3640 
3641         if (cap < 16) {
3642             cap = cap * 2;
3643 
3644         } else {
3645             cap = cap + cap / 2;
3646         }
3647     }
3648 
3649     if (cap != mmaps->cap) {
3650 
3651         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3652         if (nxt_slow_path(e == NULL)) {
3653             return NULL;
3654         }
3655 
3656         mmaps->elts = e;
3657 
3658         for (n = mmaps->cap; n < cap; n++) {
3659             e = mmaps->elts + n;
3660 
3661             e->hdr = NULL;
3662             nxt_queue_init(&e->awaiting_rbuf);
3663         }
3664 
3665         mmaps->cap = cap;
3666     }
3667 
3668     if (i + 1 > mmaps->size) {
3669         mmaps->size = i + 1;
3670     }
3671 
3672     return mmaps->elts + i;
3673 }
3674 
3675 
3676 static nxt_port_mmap_header_t *
3677 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3678 {
3679     int                     i, fd, rc;
3680     void                    *mem;
3681     nxt_unit_mmap_t         *mm;
3682     nxt_unit_impl_t         *lib;
3683     nxt_port_mmap_header_t  *hdr;
3684 
3685     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3686 
3687     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3688     if (nxt_slow_path(mm == NULL)) {
3689         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3690 
3691         return NULL;
3692     }
3693 
3694     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3695     if (nxt_slow_path(fd == -1)) {
3696         goto remove_fail;
3697     }
3698 
3699     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3700     if (nxt_slow_path(mem == MAP_FAILED)) {
3701         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3702                        strerror(errno), errno);
3703 
3704         nxt_unit_close(fd);
3705 
3706         goto remove_fail;
3707     }
3708 
3709     mm->hdr = mem;
3710     hdr = mem;
3711 
3712     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3713     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3714 
3715     hdr->id = lib->outgoing.size - 1;
3716     hdr->src_pid = lib->pid;
3717     hdr->dst_pid = port->id.pid;
3718     hdr->sent_over = port->id.id;
3719     mm->src_thread = pthread_self();
3720 
3721     /* Mark first n chunk(s) as busy */
3722     for (i = 0; i < n; i++) {
3723         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3724     }
3725 
3726     /* Mark as busy chunk followed the last available chunk. */
3727     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3728     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3729 
3730     pthread_mutex_unlock(&lib->outgoing.mutex);
3731 
3732     rc = nxt_unit_send_mmap(ctx, port, fd);
3733     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3734         munmap(mem, PORT_MMAP_SIZE);
3735         hdr = NULL;
3736 
3737     } else {
3738         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3739                        hdr->id, (int) lib->pid, (int) port->id.pid);
3740     }
3741 
3742     nxt_unit_close(fd);
3743 
3744     pthread_mutex_lock(&lib->outgoing.mutex);
3745 
3746     if (nxt_fast_path(hdr != NULL)) {
3747         return hdr;
3748     }
3749 
3750 remove_fail:
3751 
3752     lib->outgoing.size--;
3753 
3754     return NULL;
3755 }
3756 
3757 
3758 static int
3759 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3760 {
3761     int              fd;
3762     nxt_unit_impl_t  *lib;
3763 
3764     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3765 
3766 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3767     char             name[64];
3768 
3769     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3770              lib->pid, (void *) pthread_self());
3771 #endif
3772 
3773 #if (NXT_HAVE_MEMFD_CREATE)
3774 
3775     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3776     if (nxt_slow_path(fd == -1)) {
3777         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3778                        strerror(errno), errno);
3779 
3780         return -1;
3781     }
3782 
3783     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3784 
3785 #elif (NXT_HAVE_SHM_OPEN_ANON)
3786 
3787     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3788     if (nxt_slow_path(fd == -1)) {
3789         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3790                        strerror(errno), errno);
3791 
3792         return -1;
3793     }
3794 
3795 #elif (NXT_HAVE_SHM_OPEN)
3796 
3797     /* Just in case. */
3798     shm_unlink(name);
3799 
3800     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3801     if (nxt_slow_path(fd == -1)) {
3802         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3803                        strerror(errno), errno);
3804 
3805         return -1;
3806     }
3807 
3808     if (nxt_slow_path(shm_unlink(name) == -1)) {
3809         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3810                        strerror(errno), errno);
3811     }
3812 
3813 #else
3814 
3815 #error No working shared memory implementation.
3816 
3817 #endif
3818 
3819     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3820         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3821                        strerror(errno), errno);
3822 
3823         nxt_unit_close(fd);
3824 
3825         return -1;
3826     }
3827 
3828     return fd;
3829 }
3830 
3831 
3832 static int
3833 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3834 {
3835     ssize_t          res;
3836     nxt_port_msg_t   msg;
3837     nxt_unit_impl_t  *lib;
3838     union {
3839         struct cmsghdr  cm;
3840         char            space[CMSG_SPACE(sizeof(int))];
3841     } cmsg;
3842 
3843     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3844 
3845     msg.stream = 0;
3846     msg.pid = lib->pid;
3847     msg.reply_port = 0;
3848     msg.type = _NXT_PORT_MSG_MMAP;
3849     msg.last = 0;
3850     msg.mmap = 0;
3851     msg.nf = 0;
3852     msg.mf = 0;
3853     msg.tracking = 0;
3854 
3855     /*
3856      * Fill all padding fields with 0.
3857      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3858      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3859      */
3860     memset(&cmsg, 0, sizeof(cmsg));
3861 
3862     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3863     cmsg.cm.cmsg_level = SOL_SOCKET;
3864     cmsg.cm.cmsg_type = SCM_RIGHTS;
3865 
3866     /*
3867      * memcpy() is used instead of simple
3868      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3869      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3870      *   dereferencing type-punned pointer will break strict-aliasing rules
3871      *
3872      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3873      * in the same simple assignment as in the code above.
3874      */
3875     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3876 
3877     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
3878                              &cmsg, sizeof(cmsg));
3879     if (nxt_slow_path(res != sizeof(msg))) {
3880         return NXT_UNIT_ERROR;
3881     }
3882 
3883     return NXT_UNIT_OK;
3884 }
3885 
3886 
3887 static int
3888 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3889     uint32_t size, uint32_t min_size,
3890     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3891 {
3892     int                     nchunks, min_nchunks;
3893     nxt_chunk_id_t          c;
3894     nxt_port_mmap_header_t  *hdr;
3895 
3896     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3897         if (local_buf != NULL) {
3898             mmap_buf->free_ptr = NULL;
3899             mmap_buf->plain_ptr = local_buf;
3900 
3901         } else {
3902             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3903                                                  size + sizeof(nxt_port_msg_t));
3904             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3905                 return NXT_UNIT_ERROR;
3906             }
3907 
3908             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3909         }
3910 
3911         mmap_buf->hdr = NULL;
3912         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3913         mmap_buf->buf.free = mmap_buf->buf.start;
3914         mmap_buf->buf.end = mmap_buf->buf.start + size;
3915 
3916         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3917                        mmap_buf->buf.start, (int) size);
3918 
3919         return NXT_UNIT_OK;
3920     }
3921 
3922     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3923     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3924 
3925     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3926     if (nxt_slow_path(hdr == NULL)) {
3927         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3928             mmap_buf->hdr = NULL;
3929             mmap_buf->buf.start = NULL;
3930             mmap_buf->buf.free = NULL;
3931             mmap_buf->buf.end = NULL;
3932             mmap_buf->free_ptr = NULL;
3933 
3934             return NXT_UNIT_OK;
3935         }
3936 
3937         return NXT_UNIT_ERROR;
3938     }
3939 
3940     mmap_buf->hdr = hdr;
3941     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3942     mmap_buf->buf.free = mmap_buf->buf.start;
3943     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3944     mmap_buf->free_ptr = NULL;
3945     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3946 
3947     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3948                   (int) hdr->id, (int) c,
3949                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3950 
3951     return NXT_UNIT_OK;
3952 }
3953 
3954 
3955 static int
3956 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3957 {
3958     int                     rc;
3959     void                    *mem;
3960     nxt_queue_t             awaiting_rbuf;
3961     struct stat             mmap_stat;
3962     nxt_unit_mmap_t         *mm;
3963     nxt_unit_impl_t         *lib;
3964     nxt_unit_ctx_impl_t     *ctx_impl;
3965     nxt_unit_read_buf_t     *rbuf;
3966     nxt_port_mmap_header_t  *hdr;
3967 
3968     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3969 
3970     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3971 
3972     if (fstat(fd, &mmap_stat) == -1) {
3973         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3974                        strerror(errno), errno);
3975 
3976         return NXT_UNIT_ERROR;
3977     }
3978 
3979     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3980                MAP_SHARED, fd, 0);
3981     if (nxt_slow_path(mem == MAP_FAILED)) {
3982         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3983                        strerror(errno), errno);
3984 
3985         return NXT_UNIT_ERROR;
3986     }
3987 
3988     hdr = mem;
3989 
3990     if (nxt_slow_path(hdr->src_pid != pid)) {
3991 
3992         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
3993                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3994                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3995 
3996         munmap(mem, PORT_MMAP_SIZE);
3997 
3998         return NXT_UNIT_ERROR;
3999     }
4000 
4001     nxt_queue_init(&awaiting_rbuf);
4002 
4003     pthread_mutex_lock(&lib->incoming.mutex);
4004 
4005     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4006     if (nxt_slow_path(mm == NULL)) {
4007         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4008 
4009         munmap(mem, PORT_MMAP_SIZE);
4010 
4011         rc = NXT_UNIT_ERROR;
4012 
4013     } else {
4014         mm->hdr = hdr;
4015 
4016         hdr->sent_over = 0xFFFFu;
4017 
4018         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4019         nxt_queue_init(&mm->awaiting_rbuf);
4020 
4021         rc = NXT_UNIT_OK;
4022     }
4023 
4024     pthread_mutex_unlock(&lib->incoming.mutex);
4025 
4026     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4027 
4028         ctx_impl = rbuf->ctx_impl;
4029 
4030         pthread_mutex_lock(&ctx_impl->mutex);
4031 
4032         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4033 
4034         pthread_mutex_unlock(&ctx_impl->mutex);
4035 
4036         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4037 
4038         nxt_unit_awake_ctx(ctx, ctx_impl);
4039 
4040     } nxt_queue_loop;
4041 
4042     return rc;
4043 }
4044 
4045 
4046 static void
4047 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4048 {
4049     nxt_port_msg_t  msg;
4050 
4051     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4052         return;
4053     }
4054 
4055     if (nxt_slow_path(ctx_impl->read_port == NULL
4056                       || ctx_impl->read_port->out_fd == -1))
4057     {
4058         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4059 
4060         return;
4061     }
4062 
4063     memset(&msg, 0, sizeof(nxt_port_msg_t));
4064 
4065     msg.type = _NXT_PORT_MSG_RPC_READY;
4066 
4067     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4068                               &msg, sizeof(msg), NULL, 0);
4069 }
4070 
4071 
4072 static void
4073 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4074 {
4075     pthread_mutex_init(&mmaps->mutex, NULL);
4076 
4077     mmaps->size = 0;
4078     mmaps->cap = 0;
4079     mmaps->elts = NULL;
4080     mmaps->allocated_chunks = 0;
4081 }
4082 
4083 
4084 nxt_inline void
4085 nxt_unit_process_use(nxt_unit_process_t *process)
4086 {
4087     nxt_atomic_fetch_add(&process->use_count, 1);
4088 }
4089 
4090 
4091 nxt_inline void
4092 nxt_unit_process_release(nxt_unit_process_t *process)
4093 {
4094     long c;
4095 
4096     c = nxt_atomic_fetch_add(&process->use_count, -1);
4097 
4098     if (c == 1) {
4099         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4100 
4101         nxt_unit_free(NULL, process);
4102     }
4103 }
4104 
4105 
4106 static void
4107 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4108 {
4109     nxt_unit_mmap_t  *mm, *end;
4110 
4111     if (mmaps->elts != NULL) {
4112         end = mmaps->elts + mmaps->size;
4113 
4114         for (mm = mmaps->elts; mm < end; mm++) {
4115             munmap(mm->hdr, PORT_MMAP_SIZE);
4116         }
4117 
4118         nxt_unit_free(NULL, mmaps->elts);
4119     }
4120 
4121     pthread_mutex_destroy(&mmaps->mutex);
4122 }
4123 
4124 
4125 static int
4126 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4127     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4128     nxt_unit_read_buf_t *rbuf)
4129 {
4130     int                  res, need_rbuf;
4131     nxt_unit_mmap_t      *mm;
4132     nxt_unit_ctx_impl_t  *ctx_impl;
4133 
4134     mm = nxt_unit_mmap_at(mmaps, id);
4135     if (nxt_slow_path(mm == NULL)) {
4136         nxt_unit_alert(ctx, "failed to allocate mmap");
4137 
4138         pthread_mutex_unlock(&mmaps->mutex);
4139 
4140         *hdr = NULL;
4141 
4142         return NXT_UNIT_ERROR;
4143     }
4144 
4145     *hdr = mm->hdr;
4146 
4147     if (nxt_fast_path(*hdr != NULL)) {
4148         return NXT_UNIT_OK;
4149     }
4150 
4151     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4152 
4153     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4154 
4155     pthread_mutex_unlock(&mmaps->mutex);
4156 
4157     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4158 
4159     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4160 
4161     if (need_rbuf) {
4162         res = nxt_unit_get_mmap(ctx, pid, id);
4163         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4164             return NXT_UNIT_ERROR;
4165         }
4166     }
4167 
4168     return NXT_UNIT_AGAIN;
4169 }
4170 
4171 
4172 static int
4173 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4174     nxt_unit_read_buf_t *rbuf)
4175 {
4176     int                     res;
4177     void                    *start;
4178     uint32_t                size;
4179     nxt_unit_impl_t         *lib;
4180     nxt_unit_mmaps_t        *mmaps;
4181     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4182     nxt_port_mmap_msg_t     *mmap_msg, *end;
4183     nxt_port_mmap_header_t  *hdr;
4184 
4185     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4186         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4187                       recv_msg->stream, (int) recv_msg->size);
4188 
4189         return NXT_UNIT_ERROR;
4190     }
4191 
4192     mmap_msg = recv_msg->start;
4193     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4194 
4195     incoming_tail = &recv_msg->incoming_buf;
4196 
4197     /* Allocating buffer structures. */
4198     for (; mmap_msg < end; mmap_msg++) {
4199         b = nxt_unit_mmap_buf_get(ctx);
4200         if (nxt_slow_path(b == NULL)) {
4201             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4202                           recv_msg->stream);
4203 
4204             while (recv_msg->incoming_buf != NULL) {
4205                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4206             }
4207 
4208             return NXT_UNIT_ERROR;
4209         }
4210 
4211         nxt_unit_mmap_buf_insert(incoming_tail, b);
4212         incoming_tail = &b->next;
4213     }
4214 
4215     b = recv_msg->incoming_buf;
4216     mmap_msg = recv_msg->start;
4217 
4218     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4219 
4220     mmaps = &lib->incoming;
4221 
4222     pthread_mutex_lock(&mmaps->mutex);
4223 
4224     for (; mmap_msg < end; mmap_msg++) {
4225         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4226                                        recv_msg->pid, mmap_msg->mmap_id,
4227                                        &hdr, rbuf);
4228 
4229         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4230             while (recv_msg->incoming_buf != NULL) {
4231                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4232             }
4233 
4234             return res;
4235         }
4236 
4237         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4238         size = mmap_msg->size;
4239 
4240         if (recv_msg->start == mmap_msg) {
4241             recv_msg->start = start;
4242             recv_msg->size = size;
4243         }
4244 
4245         b->buf.start = start;
4246         b->buf.free = start;
4247         b->buf.end = b->buf.start + size;
4248         b->hdr = hdr;
4249 
4250         b = b->next;
4251 
4252         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4253                        recv_msg->stream,
4254                        start, (int) size,
4255                        (int) hdr->src_pid, (int) hdr->dst_pid,
4256                        (int) hdr->id, (int) mmap_msg->chunk_id,
4257                        (int) mmap_msg->size);
4258     }
4259 
4260     pthread_mutex_unlock(&mmaps->mutex);
4261 
4262     return NXT_UNIT_OK;
4263 }
4264 
4265 
4266 static int
4267 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4268 {
4269     ssize_t              res;
4270     nxt_unit_impl_t      *lib;
4271     nxt_unit_ctx_impl_t  *ctx_impl;
4272 
4273     struct {
4274         nxt_port_msg_t           msg;
4275         nxt_port_msg_get_mmap_t  get_mmap;
4276     } m;
4277 
4278     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4279     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4280 
4281     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4282 
4283     m.msg.pid = lib->pid;
4284     m.msg.reply_port = ctx_impl->read_port->id.id;
4285     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4286 
4287     m.get_mmap.id = id;
4288 
4289     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4290 
4291     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
4292     if (nxt_slow_path(res != sizeof(m))) {
4293         return NXT_UNIT_ERROR;
4294     }
4295 
4296     return NXT_UNIT_OK;
4297 }
4298 
4299 
4300 static void
4301 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4302     void *start, uint32_t size)
4303 {
4304     int              freed_chunks;
4305     u_char           *p, *end;
4306     nxt_chunk_id_t   c;
4307     nxt_unit_impl_t  *lib;
4308 
4309     memset(start, 0xA5, size);
4310 
4311     p = start;
4312     end = p + size;
4313     c = nxt_port_mmap_chunk_id(hdr, p);
4314     freed_chunks = 0;
4315 
4316     while (p < end) {
4317         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4318 
4319         p += PORT_MMAP_CHUNK_SIZE;
4320         c++;
4321         freed_chunks++;
4322     }
4323 
4324     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4325 
4326     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4327         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4328 
4329         nxt_unit_debug(ctx, "allocated_chunks %d",
4330                        (int) lib->outgoing.allocated_chunks);
4331     }
4332 
4333     if (hdr->dst_pid == lib->pid
4334         && freed_chunks != 0
4335         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4336     {
4337         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4338     }
4339 }
4340 
4341 
4342 static int
4343 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4344 {
4345     ssize_t          res;
4346     nxt_port_msg_t   msg;
4347     nxt_unit_impl_t  *lib;
4348 
4349     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4350 
4351     msg.stream = 0;
4352     msg.pid = lib->pid;
4353     msg.reply_port = 0;
4354     msg.type = _NXT_PORT_MSG_SHM_ACK;
4355     msg.last = 0;
4356     msg.mmap = 0;
4357     msg.nf = 0;
4358     msg.mf = 0;
4359     msg.tracking = 0;
4360 
4361     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
4362     if (nxt_slow_path(res != sizeof(msg))) {
4363         return NXT_UNIT_ERROR;
4364     }
4365 
4366     return NXT_UNIT_OK;
4367 }
4368 
4369 
4370 static nxt_int_t
4371 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4372 {
4373     nxt_process_t  *process;
4374 
4375     process = data;
4376 
4377     if (lhq->key.length == sizeof(pid_t)
4378         && *(pid_t *) lhq->key.start == process->pid)
4379     {
4380         return NXT_OK;
4381     }
4382 
4383     return NXT_DECLINED;
4384 }
4385 
4386 
4387 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4388     NXT_LVLHSH_DEFAULT,
4389     nxt_unit_lvlhsh_pid_test,
4390     nxt_unit_lvlhsh_alloc,
4391     nxt_unit_lvlhsh_free,
4392 };
4393 
4394 
4395 static inline void
4396 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4397 {
4398     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4399     lhq->key.length = sizeof(*pid);
4400     lhq->key.start = (u_char *) pid;
4401     lhq->proto = &lvlhsh_processes_proto;
4402 }
4403 
4404 
4405 static nxt_unit_process_t *
4406 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4407 {
4408     nxt_unit_impl_t     *lib;
4409     nxt_unit_process_t  *process;
4410     nxt_lvlhsh_query_t  lhq;
4411 
4412     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4413 
4414     nxt_unit_process_lhq_pid(&lhq, &pid);
4415 
4416     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4417         process = lhq.value;
4418         nxt_unit_process_use(process);
4419 
4420         return process;
4421     }
4422 
4423     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4424     if (nxt_slow_path(process == NULL)) {
4425         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4426 
4427         return NULL;
4428     }
4429 
4430     process->pid = pid;
4431     process->use_count = 2;
4432     process->next_port_id = 0;
4433     process->lib = lib;
4434 
4435     nxt_queue_init(&process->ports);
4436 
4437     lhq.replace = 0;
4438     lhq.value = process;
4439 
4440     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4441 
4442     case NXT_OK:
4443         break;
4444 
4445     default:
4446         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4447 
4448         nxt_unit_free(ctx, process);
4449         process = NULL;
4450         break;
4451     }
4452 
4453     return process;
4454 }
4455 
4456 
4457 static nxt_unit_process_t *
4458 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4459 {
4460     int                 rc;
4461     nxt_lvlhsh_query_t  lhq;
4462 
4463     nxt_unit_process_lhq_pid(&lhq, &pid);
4464 
4465     if (remove) {
4466         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4467 
4468     } else {
4469         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4470     }
4471 
4472     if (rc == NXT_OK) {
4473         if (!remove) {
4474             nxt_unit_process_use(lhq.value);
4475         }
4476 
4477         return lhq.value;
4478     }
4479 
4480     return NULL;
4481 }
4482 
4483 
4484 static nxt_unit_process_t *
4485 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4486 {
4487     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4488 }
4489 
4490 
4491 int
4492 nxt_unit_run(nxt_unit_ctx_t *ctx)
4493 {
4494     int                  rc;
4495     nxt_unit_ctx_impl_t  *ctx_impl;
4496 
4497     nxt_unit_ctx_use(ctx);
4498 
4499     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4500 
4501     rc = NXT_UNIT_OK;
4502 
4503     while (nxt_fast_path(ctx_impl->online)) {
4504         rc = nxt_unit_run_once_impl(ctx);
4505 
4506         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4507             nxt_unit_quit(ctx);
4508             break;
4509         }
4510     }
4511 
4512     nxt_unit_ctx_release(ctx);
4513 
4514     return rc;
4515 }
4516 
4517 
4518 int
4519 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4520 {
4521     int  rc;
4522 
4523     nxt_unit_ctx_use(ctx);
4524 
4525     rc = nxt_unit_run_once_impl(ctx);
4526 
4527     nxt_unit_ctx_release(ctx);
4528 
4529     return rc;
4530 }
4531 
4532 
4533 static int
4534 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4535 {
4536     int                  rc;
4537     nxt_unit_read_buf_t  *rbuf;
4538 
4539     rbuf = nxt_unit_read_buf_get(ctx);
4540     if (nxt_slow_path(rbuf == NULL)) {
4541         return NXT_UNIT_ERROR;
4542     }
4543 
4544     rc = nxt_unit_read_buf(ctx, rbuf);
4545     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4546         nxt_unit_read_buf_release(ctx, rbuf);
4547 
4548         return rc;
4549     }
4550 
4551     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4552     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4553         return NXT_UNIT_ERROR;
4554     }
4555 
4556     rc = nxt_unit_process_pending_rbuf(ctx);
4557     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4558         return NXT_UNIT_ERROR;
4559     }
4560 
4561     nxt_unit_process_ready_req(ctx);
4562 
4563     return rc;
4564 }
4565 
4566 
4567 static int
4568 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4569 {
4570     int                   nevents, res, err;
4571     nxt_unit_impl_t       *lib;
4572     nxt_unit_ctx_impl_t   *ctx_impl;
4573     nxt_unit_port_impl_t  *port_impl;
4574     struct pollfd         fds[2];
4575 
4576     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4577 
4578     if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4579         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4580     }
4581 
4582     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4583                                  port);
4584 
4585     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4586 
4587 retry:
4588 
4589     if (port_impl->from_socket == 0) {
4590         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4591         if (res == NXT_UNIT_OK) {
4592             if (nxt_unit_is_read_socket(rbuf)) {
4593                 port_impl->from_socket++;
4594 
4595                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4596                                (int) ctx_impl->read_port->id.pid,
4597                                (int) ctx_impl->read_port->id.id,
4598                                port_impl->from_socket);
4599 
4600             } else {
4601                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4602                                (int) ctx_impl->read_port->id.pid,
4603                                (int) ctx_impl->read_port->id.id,
4604                                (int) rbuf->size);
4605 
4606                 return NXT_UNIT_OK;
4607             }
4608         }
4609     }
4610 
4611     res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4612     if (res == NXT_UNIT_OK) {
4613         return NXT_UNIT_OK;
4614     }
4615 
4616     fds[0].fd = ctx_impl->read_port->in_fd;
4617     fds[0].events = POLLIN;
4618     fds[0].revents = 0;
4619 
4620     fds[1].fd = lib->shared_port->in_fd;
4621     fds[1].events = POLLIN;
4622     fds[1].revents = 0;
4623 
4624     nevents = poll(fds, 2, -1);
4625     if (nxt_slow_path(nevents == -1)) {
4626         err = errno;
4627 
4628         if (err == EINTR) {
4629             goto retry;
4630         }
4631 
4632         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4633                        fds[0].fd, fds[1].fd, strerror(err), err);
4634 
4635         rbuf->size = -1;
4636 
4637         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4638     }
4639 
4640     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4641                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4642                    fds[1].revents);
4643 
4644     if ((fds[0].revents & POLLIN) != 0) {
4645         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4646         if (res == NXT_UNIT_AGAIN) {
4647             goto retry;
4648         }
4649 
4650         return res;
4651     }
4652 
4653     if ((fds[1].revents & POLLIN) != 0) {
4654         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4655         if (res == NXT_UNIT_AGAIN) {
4656             goto retry;
4657         }
4658 
4659         return res;
4660     }
4661 
4662     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4663                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4664                    fds[1].revents);
4665 
4666     return NXT_UNIT_ERROR;
4667 }
4668 
4669 
4670 static int
4671 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4672 {
4673     int                  rc;
4674     nxt_queue_t          pending_rbuf;
4675     nxt_unit_ctx_impl_t  *ctx_impl;
4676     nxt_unit_read_buf_t  *rbuf;
4677 
4678     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4679 
4680     pthread_mutex_lock(&ctx_impl->mutex);
4681 
4682     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4683         pthread_mutex_unlock(&ctx_impl->mutex);
4684 
4685         return NXT_UNIT_OK;
4686     }
4687 
4688     nxt_queue_init(&pending_rbuf);
4689 
4690     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4691     nxt_queue_init(&ctx_impl->pending_rbuf);
4692 
4693     pthread_mutex_unlock(&ctx_impl->mutex);
4694 
4695     rc = NXT_UNIT_OK;
4696 
4697     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4698 
4699         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4700             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4701 
4702         } else {
4703             nxt_unit_read_buf_release(ctx, rbuf);
4704         }
4705 
4706     } nxt_queue_loop;
4707 
4708     return rc;
4709 }
4710 
4711 
4712 static void
4713 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4714 {
4715     int                           res;
4716     nxt_queue_t                   ready_req;
4717     nxt_unit_impl_t               *lib;
4718     nxt_unit_ctx_impl_t           *ctx_impl;
4719     nxt_unit_request_info_t       *req;
4720     nxt_unit_request_info_impl_t  *req_impl;
4721 
4722     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4723 
4724     pthread_mutex_lock(&ctx_impl->mutex);
4725 
4726     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4727         pthread_mutex_unlock(&ctx_impl->mutex);
4728 
4729         return;
4730     }
4731 
4732     nxt_queue_init(&ready_req);
4733 
4734     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4735     nxt_queue_init(&ctx_impl->ready_req);
4736 
4737     pthread_mutex_unlock(&ctx_impl->mutex);
4738 
4739     nxt_queue_each(req_impl, &ready_req,
4740                    nxt_unit_request_info_impl_t, port_wait_link)
4741     {
4742         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4743 
4744         req = &req_impl->req;
4745 
4746         res = nxt_unit_send_req_headers_ack(req);
4747         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4748             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4749 
4750             continue;
4751         }
4752 
4753         if (req->content_length
4754             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4755         {
4756             res = nxt_unit_request_hash_add(ctx, req);
4757             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4758                 nxt_unit_req_warn(req, "failed to add request to hash");
4759 
4760                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4761 
4762                 continue;
4763             }
4764 
4765             /*
4766              * If application have separate data handler, we may start
4767              * request processing and process data when it is arrived.
4768              */
4769             if (lib->callbacks.data_handler == NULL) {
4770                 continue;
4771             }
4772         }
4773 
4774         lib->callbacks.request_handler(&req_impl->req);
4775 
4776     } nxt_queue_loop;
4777 }
4778 
4779 
4780 int
4781 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4782 {
4783     int                  rc;
4784     nxt_unit_read_buf_t  *rbuf;
4785     nxt_unit_ctx_impl_t  *ctx_impl;
4786 
4787     nxt_unit_ctx_use(ctx);
4788 
4789     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4790 
4791     rc = NXT_UNIT_OK;
4792 
4793     while (nxt_fast_path(ctx_impl->online)) {
4794         rbuf = nxt_unit_read_buf_get(ctx);
4795         if (nxt_slow_path(rbuf == NULL)) {
4796             rc = NXT_UNIT_ERROR;
4797             break;
4798         }
4799 
4800     retry:
4801 
4802         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4803         if (rc == NXT_UNIT_AGAIN) {
4804             goto retry;
4805         }
4806 
4807         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4808         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4809             break;
4810         }
4811 
4812         rc = nxt_unit_process_pending_rbuf(ctx);
4813         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4814             break;
4815         }
4816 
4817         nxt_unit_process_ready_req(ctx);
4818     }
4819 
4820     nxt_unit_ctx_release(ctx);
4821 
4822     return rc;
4823 }
4824 
4825 
4826 nxt_inline int
4827 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4828 {
4829     nxt_port_msg_t  *port_msg;
4830 
4831     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4832         port_msg = (nxt_port_msg_t *) rbuf->buf;
4833 
4834         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4835     }
4836 
4837     return 0;
4838 }
4839 
4840 
4841 nxt_inline int
4842 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4843 {
4844     if (nxt_fast_path(rbuf->size == 1)) {
4845         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4846     }
4847 
4848     return 0;
4849 }
4850 
4851 
4852 nxt_inline int
4853 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4854 {
4855     nxt_port_msg_t  *port_msg;
4856 
4857     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4858         port_msg = (nxt_port_msg_t *) rbuf->buf;
4859 
4860         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4861     }
4862 
4863     return 0;
4864 }
4865 
4866 
4867 nxt_inline int
4868 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4869 {
4870     nxt_port_msg_t  *port_msg;
4871 
4872     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4873         port_msg = (nxt_port_msg_t *) rbuf->buf;
4874 
4875         return port_msg->type == _NXT_PORT_MSG_QUIT;
4876     }
4877 
4878     return 0;
4879 }
4880 
4881 
4882 int
4883 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4884 {
4885     int                  rc;
4886     nxt_unit_impl_t      *lib;
4887     nxt_unit_read_buf_t  *rbuf;
4888     nxt_unit_ctx_impl_t  *ctx_impl;
4889 
4890     nxt_unit_ctx_use(ctx);
4891 
4892     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4893     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4894 
4895     rc = NXT_UNIT_OK;
4896 
4897     while (nxt_fast_path(ctx_impl->online)) {
4898         rbuf = nxt_unit_read_buf_get(ctx);
4899         if (nxt_slow_path(rbuf == NULL)) {
4900             rc = NXT_UNIT_ERROR;
4901             break;
4902         }
4903 
4904     retry:
4905 
4906         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4907         if (rc == NXT_UNIT_AGAIN) {
4908             goto retry;
4909         }
4910 
4911         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4912             nxt_unit_read_buf_release(ctx, rbuf);
4913             break;
4914         }
4915 
4916         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4917         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4918             break;
4919         }
4920     }
4921 
4922     nxt_unit_ctx_release(ctx);
4923 
4924     return rc;
4925 }
4926 
4927 
4928 nxt_unit_request_info_t *
4929 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4930 {
4931     int                      rc;
4932     nxt_unit_impl_t          *lib;
4933     nxt_unit_read_buf_t      *rbuf;
4934     nxt_unit_ctx_impl_t      *ctx_impl;
4935     nxt_unit_request_info_t  *req;
4936 
4937     nxt_unit_ctx_use(ctx);
4938 
4939     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4940     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4941 
4942     req = NULL;
4943 
4944     if (nxt_slow_path(!ctx_impl->online)) {
4945         goto done;
4946     }
4947 
4948     rbuf = nxt_unit_read_buf_get(ctx);
4949     if (nxt_slow_path(rbuf == NULL)) {
4950         goto done;
4951     }
4952 
4953     rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4954     if (rc != NXT_UNIT_OK) {
4955         goto done;
4956     }
4957 
4958     (void) nxt_unit_process_msg(ctx, rbuf, &req);
4959 
4960 done:
4961 
4962     nxt_unit_ctx_release(ctx);
4963 
4964     return req;
4965 }
4966 
4967 
4968 int
4969 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4970 {
4971     nxt_unit_impl_t  *lib;
4972 
4973     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4974 
4975     return (ctx == &lib->main_ctx.ctx);
4976 }
4977 
4978 
4979 int
4980 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4981 {
4982     int  rc;
4983 
4984     nxt_unit_ctx_use(ctx);
4985 
4986     rc = nxt_unit_process_port_msg_impl(ctx, port);
4987 
4988     nxt_unit_ctx_release(ctx);
4989 
4990     return rc;
4991 }
4992 
4993 
4994 static int
4995 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4996 {
4997     int                  rc;
4998     nxt_unit_impl_t      *lib;
4999     nxt_unit_read_buf_t  *rbuf;
5000     nxt_unit_ctx_impl_t  *ctx_impl;
5001 
5002     rbuf = nxt_unit_read_buf_get(ctx);
5003     if (nxt_slow_path(rbuf == NULL)) {
5004         return NXT_UNIT_ERROR;
5005     }
5006 
5007     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5008     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5009 
5010 retry:
5011 
5012     if (port == lib->shared_port) {
5013         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5014 
5015     } else {
5016         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5017     }
5018 
5019     if (rc != NXT_UNIT_OK) {
5020         nxt_unit_read_buf_release(ctx, rbuf);
5021         return rc;
5022     }
5023 
5024     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5025     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5026         return NXT_UNIT_ERROR;
5027     }
5028 
5029     rc = nxt_unit_process_pending_rbuf(ctx);
5030     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5031         return NXT_UNIT_ERROR;
5032     }
5033 
5034     nxt_unit_process_ready_req(ctx);
5035 
5036     rbuf = nxt_unit_read_buf_get(ctx);
5037     if (nxt_slow_path(rbuf == NULL)) {
5038         return NXT_UNIT_ERROR;
5039     }
5040 
5041     if (ctx_impl->online) {
5042         goto retry;
5043     }
5044 
5045     return rc;
5046 }
5047 
5048 
5049 void
5050 nxt_unit_done(nxt_unit_ctx_t *ctx)
5051 {
5052     nxt_unit_ctx_release(ctx);
5053 }
5054 
5055 
5056 nxt_unit_ctx_t *
5057 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5058 {
5059     int                   rc, queue_fd;
5060     void                  *mem;
5061     nxt_unit_impl_t       *lib;
5062     nxt_unit_port_t       *port;
5063     nxt_unit_ctx_impl_t   *new_ctx;
5064     nxt_unit_port_impl_t  *port_impl;
5065 
5066     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5067 
5068     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5069                                    + lib->request_data_size);
5070     if (nxt_slow_path(new_ctx == NULL)) {
5071         nxt_unit_alert(ctx, "failed to allocate context");
5072 
5073         return NULL;
5074     }
5075 
5076     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5077     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5078          nxt_unit_free(ctx, new_ctx);
5079 
5080          return NULL;
5081     }
5082 
5083     queue_fd = -1;
5084 
5085     port = nxt_unit_create_port(&new_ctx->ctx);
5086     if (nxt_slow_path(port == NULL)) {
5087         goto fail;
5088     }
5089 
5090     new_ctx->read_port = port;
5091 
5092     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5093     if (nxt_slow_path(queue_fd == -1)) {
5094         goto fail;
5095     }
5096 
5097     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5098                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5099     if (nxt_slow_path(mem == MAP_FAILED)) {
5100         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5101                        strerror(errno), errno);
5102 
5103         goto fail;
5104     }
5105 
5106     nxt_port_queue_init(mem);
5107 
5108     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5109     port_impl->queue = mem;
5110 
5111     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5112     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5113         goto fail;
5114     }
5115 
5116     nxt_unit_close(queue_fd);
5117 
5118     return &new_ctx->ctx;
5119 
5120 fail:
5121 
5122     if (queue_fd != -1) {
5123         nxt_unit_close(queue_fd);
5124     }
5125 
5126     nxt_unit_ctx_release(&new_ctx->ctx);
5127 
5128     return NULL;
5129 }
5130 
5131 
5132 static void
5133 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5134 {
5135     nxt_unit_impl_t                  *lib;
5136     nxt_unit_mmap_buf_t              *mmap_buf;
5137     nxt_unit_read_buf_t              *rbuf;
5138     nxt_unit_request_info_impl_t     *req_impl;
5139     nxt_unit_websocket_frame_impl_t  *ws_impl;
5140 
5141     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5142 
5143     nxt_queue_each(req_impl, &ctx_impl->active_req,
5144                    nxt_unit_request_info_impl_t, link)
5145     {
5146         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5147 
5148         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5149 
5150     } nxt_queue_loop;
5151 
5152     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5153     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5154 
5155     while (ctx_impl->free_buf != NULL) {
5156         mmap_buf = ctx_impl->free_buf;
5157         nxt_unit_mmap_buf_unlink(mmap_buf);
5158         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5159     }
5160 
5161     nxt_queue_each(req_impl, &ctx_impl->free_req,
5162                    nxt_unit_request_info_impl_t, link)
5163     {
5164         nxt_unit_request_info_free(req_impl);
5165 
5166     } nxt_queue_loop;
5167 
5168     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5169                    nxt_unit_websocket_frame_impl_t, link)
5170     {
5171         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5172 
5173     } nxt_queue_loop;
5174 
5175     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5176     {
5177         if (rbuf != &ctx_impl->ctx_read_buf) {
5178             nxt_unit_free(&ctx_impl->ctx, rbuf);
5179         }
5180     } nxt_queue_loop;
5181 
5182     pthread_mutex_destroy(&ctx_impl->mutex);
5183 
5184     pthread_mutex_lock(&lib->mutex);
5185 
5186     nxt_queue_remove(&ctx_impl->link);
5187 
5188     pthread_mutex_unlock(&lib->mutex);
5189 
5190     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5191         nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5192         nxt_unit_port_release(ctx_impl->read_port);
5193     }
5194 
5195     if (ctx_impl != &lib->main_ctx) {
5196         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5197     }
5198 
5199     nxt_unit_lib_release(lib);
5200 }
5201 
5202 
5203 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5204 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5205 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5206 #else
5207 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5208 #endif
5209 
5210 
5211 void
5212 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5213 {
5214     nxt_unit_port_hash_id_t  port_hash_id;
5215 
5216     port_hash_id.pid = pid;
5217     port_hash_id.id = id;
5218 
5219     port_id->pid = pid;
5220     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5221     port_id->id = id;
5222 }
5223 
5224 
5225 static nxt_unit_port_t *
5226 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5227 {
5228     int                 rc, port_sockets[2];
5229     nxt_unit_impl_t     *lib;
5230     nxt_unit_port_t     new_port, *port;
5231     nxt_unit_process_t  *process;
5232 
5233     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5234 
5235     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5236     if (nxt_slow_path(rc != 0)) {
5237         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5238                       strerror(errno), errno);
5239 
5240         return NULL;
5241     }
5242 
5243     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5244                    port_sockets[0], port_sockets[1]);
5245 
5246     pthread_mutex_lock(&lib->mutex);
5247 
5248     process = nxt_unit_process_get(ctx, lib->pid);
5249     if (nxt_slow_path(process == NULL)) {
5250         pthread_mutex_unlock(&lib->mutex);
5251 
5252         nxt_unit_close(port_sockets[0]);
5253         nxt_unit_close(port_sockets[1]);
5254 
5255         return NULL;
5256     }
5257 
5258     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5259 
5260     new_port.in_fd = port_sockets[0];
5261     new_port.out_fd = port_sockets[1];
5262     new_port.data = NULL;
5263 
5264     pthread_mutex_unlock(&lib->mutex);
5265 
5266     nxt_unit_process_release(process);
5267 
5268     port = nxt_unit_add_port(ctx, &new_port, NULL);
5269     if (nxt_slow_path(port == NULL)) {
5270         nxt_unit_close(port_sockets[0]);
5271         nxt_unit_close(port_sockets[1]);
5272     }
5273 
5274     return port;
5275 }
5276 
5277 
5278 static int
5279 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5280     nxt_unit_port_t *port, int queue_fd)
5281 {
5282     ssize_t          res;
5283     nxt_unit_impl_t  *lib;
5284     int              fds[2] = { port->out_fd, queue_fd };
5285 
5286     struct {
5287         nxt_port_msg_t            msg;
5288         nxt_port_msg_new_port_t   new_port;
5289     } m;
5290 
5291     union {
5292         struct cmsghdr  cm;
5293         char            space[CMSG_SPACE(sizeof(int) * 2)];
5294     } cmsg;
5295 
5296     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5297 
5298     m.msg.stream = 0;
5299     m.msg.pid = lib->pid;
5300     m.msg.reply_port = 0;
5301     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5302     m.msg.last = 0;
5303     m.msg.mmap = 0;
5304     m.msg.nf = 0;
5305     m.msg.mf = 0;
5306     m.msg.tracking = 0;
5307 
5308     m.new_port.id = port->id.id;
5309     m.new_port.pid = port->id.pid;
5310     m.new_port.type = NXT_PROCESS_APP;
5311     m.new_port.max_size = 16 * 1024;
5312     m.new_port.max_share = 64 * 1024;
5313 
5314     memset(&cmsg, 0, sizeof(cmsg));
5315 
5316     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
5317     cmsg.cm.cmsg_level = SOL_SOCKET;
5318     cmsg.cm.cmsg_type = SCM_RIGHTS;
5319 
5320     /*
5321      * memcpy() is used instead of simple
5322      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
5323      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5324      *   dereferencing type-punned pointer will break strict-aliasing rules
5325      *
5326      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5327      * in the same simple assignment as in the code above.
5328      */
5329     memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
5330 
5331     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5332 
5333     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5334 }
5335 
5336 
5337 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5338 {
5339     nxt_unit_port_impl_t  *port_impl;
5340 
5341     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5342 
5343     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5344 }
5345 
5346 
5347 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5348 {
5349     long                  c;
5350     nxt_unit_port_impl_t  *port_impl;
5351 
5352     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5353 
5354     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5355 
5356     if (c == 1) {
5357         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5358                        (int) port->id.pid, (int) port->id.id,
5359                        port->in_fd, port->out_fd);
5360 
5361         nxt_unit_process_release(port_impl->process);
5362 
5363         if (port->in_fd != -1) {
5364             nxt_unit_close(port->in_fd);
5365 
5366             port->in_fd = -1;
5367         }
5368 
5369         if (port->out_fd != -1) {
5370             nxt_unit_close(port->out_fd);
5371 
5372             port->out_fd = -1;
5373         }
5374 
5375         if (port_impl->queue != NULL) {
5376             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5377                                      ? sizeof(nxt_app_queue_t)
5378                                      : sizeof(nxt_port_queue_t));
5379         }
5380 
5381         nxt_unit_free(NULL, port_impl);
5382     }
5383 }
5384 
5385 
5386 static nxt_unit_port_t *
5387 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5388 {
5389     int                   rc, ready;
5390     nxt_queue_t           awaiting_req;
5391     nxt_unit_impl_t       *lib;
5392     nxt_unit_port_t       *old_port;
5393     nxt_unit_process_t    *process;
5394     nxt_unit_port_impl_t  *new_port, *old_port_impl;
5395 
5396     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5397 
5398     pthread_mutex_lock(&lib->mutex);
5399 
5400     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5401 
5402     if (nxt_slow_path(old_port != NULL)) {
5403         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5404                             "in_fd %d out_fd %d queue %p",
5405                             port->id.pid, port->id.id,
5406                             port->in_fd, port->out_fd, queue);
5407 
5408         if (old_port->data == NULL) {
5409             old_port->data = port->data;
5410             port->data = NULL;
5411         }
5412 
5413         if (old_port->in_fd == -1) {
5414             old_port->in_fd = port->in_fd;
5415             port->in_fd = -1;
5416         }
5417 
5418         if (port->in_fd != -1) {
5419             nxt_unit_close(port->in_fd);
5420             port->in_fd = -1;
5421         }
5422 
5423         if (old_port->out_fd == -1) {
5424             old_port->out_fd = port->out_fd;
5425             port->out_fd = -1;
5426         }
5427 
5428         if (port->out_fd != -1) {
5429             nxt_unit_close(port->out_fd);
5430             port->out_fd = -1;
5431         }
5432 
5433         *port = *old_port;
5434 
5435         nxt_queue_init(&awaiting_req);
5436 
5437         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5438 
5439         if (old_port_impl->queue == NULL) {
5440             old_port_impl->queue = queue;
5441         }
5442 
5443         ready = (port->in_fd != -1 || port->out_fd != -1);
5444 
5445         /*
5446          * Port can be market as 'ready' only after callbacks.add_port() call.
5447          * Otherwise, request may try to use the port before callback.
5448          */
5449         if (lib->callbacks.add_port == NULL && ready) {
5450             old_port_impl->ready = ready;
5451 
5452             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5453                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5454                 nxt_queue_init(&old_port_impl->awaiting_req);
5455             }
5456         }
5457 
5458         pthread_mutex_unlock(&lib->mutex);
5459 
5460         if (lib->callbacks.add_port != NULL && ready) {
5461             lib->callbacks.add_port(ctx, old_port);
5462 
5463             pthread_mutex_lock(&lib->mutex);
5464 
5465             old_port_impl->ready = ready;
5466 
5467             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5468                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5469                 nxt_queue_init(&old_port_impl->awaiting_req);
5470             }
5471 
5472             pthread_mutex_unlock(&lib->mutex);
5473         }
5474 
5475         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5476 
5477         return old_port;
5478     }
5479 
5480     new_port = NULL;
5481     ready = 0;
5482 
5483     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5484                    port->id.pid, port->id.id,
5485                    port->in_fd, port->out_fd, queue);
5486 
5487     process = nxt_unit_process_get(ctx, port->id.pid);
5488     if (nxt_slow_path(process == NULL)) {
5489         goto unlock;
5490     }
5491 
5492     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5493         && port->id.id >= process->next_port_id)
5494     {
5495         process->next_port_id = port->id.id + 1;
5496     }
5497 
5498     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5499     if (nxt_slow_path(new_port == NULL)) {
5500         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5501                        port->id.pid, port->id.id);
5502 
5503         goto unlock;
5504     }
5505 
5506     new_port->port = *port;
5507 
5508     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5509     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5510         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5511                        port->id.pid, port->id.id);
5512 
5513         nxt_unit_free(ctx, new_port);
5514 
5515         new_port = NULL;
5516 
5517         goto unlock;
5518     }
5519 
5520     nxt_queue_insert_tail(&process->ports, &new_port->link);
5521 
5522     new_port->use_count = 2;
5523     new_port->process = process;
5524     new_port->queue = queue;
5525     new_port->from_socket = 0;
5526     new_port->socket_rbuf = NULL;
5527 
5528     nxt_queue_init(&new_port->awaiting_req);
5529 
5530     ready = (port->in_fd != -1 || port->out_fd != -1);
5531 
5532     if (lib->callbacks.add_port == NULL) {
5533         new_port->ready = ready;
5534 
5535     } else {
5536         new_port->ready = 0;
5537     }
5538 
5539     process = NULL;
5540 
5541 unlock:
5542 
5543     pthread_mutex_unlock(&lib->mutex);
5544 
5545     if (nxt_slow_path(process != NULL)) {
5546         nxt_unit_process_release(process);
5547     }
5548 
5549     if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5550         lib->callbacks.add_port(ctx, &new_port->port);
5551 
5552         nxt_queue_init(&awaiting_req);
5553 
5554         pthread_mutex_lock(&lib->mutex);
5555 
5556         new_port->ready = 1;
5557 
5558         if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5559             nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5560             nxt_queue_init(&new_port->awaiting_req);
5561         }
5562 
5563         pthread_mutex_unlock(&lib->mutex);
5564 
5565         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5566     }
5567 
5568     return (new_port == NULL) ? NULL : &new_port->port;
5569 }
5570 
5571 
5572 static void
5573 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5574 {
5575     nxt_unit_ctx_impl_t           *ctx_impl;
5576     nxt_unit_request_info_impl_t  *req_impl;
5577 
5578     nxt_queue_each(req_impl, awaiting_req,
5579                    nxt_unit_request_info_impl_t, port_wait_link)
5580     {
5581         nxt_queue_remove(&req_impl->port_wait_link);
5582 
5583         ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5584                                     ctx);
5585 
5586         pthread_mutex_lock(&ctx_impl->mutex);
5587 
5588         nxt_queue_insert_tail(&ctx_impl->ready_req,
5589                               &req_impl->port_wait_link);
5590 
5591         pthread_mutex_unlock(&ctx_impl->mutex);
5592 
5593         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5594 
5595         nxt_unit_awake_ctx(ctx, ctx_impl);
5596 
5597     } nxt_queue_loop;
5598 }
5599 
5600 
5601 static void
5602 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5603 {
5604     nxt_unit_port_t       *port;
5605     nxt_unit_port_impl_t  *port_impl;
5606 
5607     pthread_mutex_lock(&lib->mutex);
5608 
5609     port = nxt_unit_remove_port_unsafe(lib, port_id);
5610 
5611     if (nxt_fast_path(port != NULL)) {
5612         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5613 
5614         nxt_queue_remove(&port_impl->link);
5615     }
5616 
5617     pthread_mutex_unlock(&lib->mutex);
5618 
5619     if (lib->callbacks.remove_port != NULL && port != NULL) {
5620         lib->callbacks.remove_port(&lib->unit, port);
5621     }
5622 
5623     if (nxt_fast_path(port != NULL)) {
5624         nxt_unit_port_release(port);
5625     }
5626 }
5627 
5628 
5629 static nxt_unit_port_t *
5630 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5631 {
5632     nxt_unit_port_t  *port;
5633 
5634     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5635     if (nxt_slow_path(port == NULL)) {
5636         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5637                        (int) port_id->pid, (int) port_id->id);
5638 
5639         return NULL;
5640     }
5641 
5642     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5643                    (int) port_id->pid, (int) port_id->id,
5644                    port->in_fd, port->out_fd, port->data);
5645 
5646     return port;
5647 }
5648 
5649 
5650 static void
5651 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5652 {
5653     nxt_unit_process_t  *process;
5654 
5655     pthread_mutex_lock(&lib->mutex);
5656 
5657     process = nxt_unit_process_find(lib, pid, 1);
5658     if (nxt_slow_path(process == NULL)) {
5659         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5660 
5661         pthread_mutex_unlock(&lib->mutex);
5662 
5663         return;
5664     }
5665 
5666     nxt_unit_remove_process(lib, process);
5667 
5668     if (lib->callbacks.remove_pid != NULL) {
5669         lib->callbacks.remove_pid(&lib->unit, pid);
5670     }
5671 }
5672 
5673 
5674 static void
5675 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5676 {
5677     nxt_queue_t           ports;
5678     nxt_unit_port_impl_t  *port;
5679 
5680     nxt_queue_init(&ports);
5681 
5682     nxt_queue_add(&ports, &process->ports);
5683 
5684     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5685 
5686         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5687 
5688     } nxt_queue_loop;
5689 
5690     pthread_mutex_unlock(&lib->mutex);
5691 
5692     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5693 
5694         nxt_queue_remove(&port->link);
5695 
5696         if (lib->callbacks.remove_port != NULL) {
5697             lib->callbacks.remove_port(&lib->unit, &port->port);
5698         }
5699 
5700         nxt_unit_port_release(&port->port);
5701 
5702     } nxt_queue_loop;
5703 
5704     nxt_unit_process_release(process);
5705 }
5706 
5707 
5708 static void
5709 nxt_unit_quit(nxt_unit_ctx_t *ctx)
5710 {
5711     nxt_port_msg_t       msg;
5712     nxt_unit_impl_t      *lib;
5713     nxt_unit_ctx_impl_t  *ctx_impl;
5714 
5715     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5716     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5717 
5718     if (!ctx_impl->online) {
5719         return;
5720     }
5721 
5722     ctx_impl->online = 0;
5723 
5724     if (lib->callbacks.quit != NULL) {
5725         lib->callbacks.quit(ctx);
5726     }
5727 
5728     if (ctx != &lib->main_ctx.ctx) {
5729         return;
5730     }
5731 
5732     memset(&msg, 0, sizeof(nxt_port_msg_t));
5733 
5734     msg.pid = lib->pid;
5735     msg.type = _NXT_PORT_MSG_QUIT;
5736 
5737     pthread_mutex_lock(&lib->mutex);
5738 
5739     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5740 
5741         if (ctx == &ctx_impl->ctx
5742             || ctx_impl->read_port == NULL
5743             || ctx_impl->read_port->out_fd == -1)
5744         {
5745             continue;
5746         }
5747 
5748         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5749                                   &msg, sizeof(msg), NULL, 0);
5750 
5751     } nxt_queue_loop;
5752 
5753     pthread_mutex_unlock(&lib->mutex);
5754 }
5755 
5756 
5757 static int
5758 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5759 {
5760     ssize_t              res;
5761     nxt_unit_impl_t      *lib;
5762     nxt_unit_ctx_impl_t  *ctx_impl;
5763 
5764     struct {
5765         nxt_port_msg_t           msg;
5766         nxt_port_msg_get_port_t  get_port;
5767     } m;
5768 
5769     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5770     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5771 
5772     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5773 
5774     m.msg.pid = lib->pid;
5775     m.msg.reply_port = ctx_impl->read_port->id.id;
5776     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5777 
5778     m.get_port.id = port_id->id;
5779     m.get_port.pid = port_id->pid;
5780 
5781     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5782                    (int) port_id->id);
5783 
5784     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
5785     if (nxt_slow_path(res != sizeof(m))) {
5786         return NXT_UNIT_ERROR;
5787     }
5788 
5789     return NXT_UNIT_OK;
5790 }
5791 
5792 
5793 static ssize_t
5794 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5795     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5796 {
5797     int                   notify;
5798     ssize_t               ret;
5799     nxt_int_t             rc;
5800     nxt_port_msg_t        msg;
5801     nxt_unit_impl_t       *lib;
5802     nxt_unit_port_impl_t  *port_impl;
5803 
5804     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5805 
5806     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5807     if (port_impl->queue != NULL && oob_size == 0
5808         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5809     {
5810         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5811         if (nxt_slow_path(rc != NXT_OK)) {
5812             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5813                            (int) port->id.pid, (int) port->id.id);
5814 
5815             return -1;
5816         }
5817 
5818         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5819                        (int) port->id.pid, (int) port->id.id,
5820                        (int) buf_size, notify);
5821 
5822         if (notify) {
5823             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5824 
5825             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5826 
5827             if (lib->callbacks.port_send == NULL) {
5828                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5829                                        sizeof(nxt_port_msg_t), NULL, 0);
5830 
5831                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5832                                (int) port->id.pid, (int) port->id.id,
5833                                (int) ret);
5834 
5835             } else {
5836                 ret = lib->callbacks.port_send(ctx, port, &msg,
5837                                                sizeof(nxt_port_msg_t), NULL, 0);
5838 
5839                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5840                                (int) port->id.pid, (int) port->id.id,
5841                                (int) ret);
5842             }
5843 
5844         }
5845 
5846         return buf_size;
5847     }
5848 
5849     if (port_impl->queue != NULL) {
5850         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5851 
5852         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5853         if (nxt_slow_path(rc != NXT_OK)) {
5854             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5855                            (int) port->id.pid, (int) port->id.id);
5856 
5857             return -1;
5858         }
5859 
5860         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5861                        (int) port->id.pid, (int) port->id.id, notify);
5862     }
5863 
5864     if (lib->callbacks.port_send != NULL) {
5865         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5866                                        oob, oob_size);
5867 
5868         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5869                        (int) port->id.pid, (int) port->id.id,
5870                        (int) ret);
5871 
5872     } else {
5873         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5874                                oob, oob_size);
5875 
5876         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5877                        (int) port->id.pid, (int) port->id.id,
5878                        (int) ret);
5879     }
5880 
5881     return ret;
5882 }
5883 
5884 
5885 static ssize_t
5886 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5887     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5888 {
5889     int            err;
5890     ssize_t        res;
5891     struct iovec   iov[1];
5892     struct msghdr  msg;
5893 
5894     iov[0].iov_base = (void *) buf;
5895     iov[0].iov_len = buf_size;
5896 
5897     msg.msg_name = NULL;
5898     msg.msg_namelen = 0;
5899     msg.msg_iov = iov;
5900     msg.msg_iovlen = 1;
5901     msg.msg_flags = 0;
5902     msg.msg_control = (void *) oob;
5903     msg.msg_controllen = oob_size;
5904 
5905 retry:
5906 
5907     res = sendmsg(fd, &msg, 0);
5908 
5909     if (nxt_slow_path(res == -1)) {
5910         err = errno;
5911 
5912         if (err == EINTR) {
5913             goto retry;
5914         }
5915 
5916         /*
5917          * FIXME: This should be "alert" after router graceful shutdown
5918          * implementation.
5919          */
5920         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5921                       fd, (int) buf_size, strerror(err), err);
5922 
5923     } else {
5924         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5925                        (int) res);
5926     }
5927 
5928     return res;
5929 }
5930 
5931 
5932 static int
5933 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5934     nxt_unit_read_buf_t *rbuf)
5935 {
5936     int                   res, read;
5937     nxt_unit_port_impl_t  *port_impl;
5938 
5939     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5940 
5941     read = 0;
5942 
5943 retry:
5944 
5945     if (port_impl->from_socket > 0) {
5946         if (port_impl->socket_rbuf != NULL
5947             && port_impl->socket_rbuf->size > 0)
5948         {
5949             port_impl->from_socket--;
5950 
5951             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5952             port_impl->socket_rbuf->size = 0;
5953 
5954             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5955                            (int) port->id.pid, (int) port->id.id,
5956                            (int) rbuf->size);
5957 
5958             return NXT_UNIT_OK;
5959         }
5960 
5961     } else {
5962         res = nxt_unit_port_queue_recv(port, rbuf);
5963 
5964         if (res == NXT_UNIT_OK) {
5965             if (nxt_unit_is_read_socket(rbuf)) {
5966                 port_impl->from_socket++;
5967 
5968                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
5969                                (int) port->id.pid, (int) port->id.id,
5970                                port_impl->from_socket);
5971 
5972                 goto retry;
5973             }
5974 
5975             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
5976                            (int) port->id.pid, (int) port->id.id,
5977                            (int) rbuf->size);
5978 
5979             return NXT_UNIT_OK;
5980         }
5981     }
5982 
5983     if (read) {
5984         return NXT_UNIT_AGAIN;
5985     }
5986 
5987     res = nxt_unit_port_recv(ctx, port, rbuf);
5988     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5989         return NXT_UNIT_ERROR;
5990     }
5991 
5992     read = 1;
5993 
5994     if (nxt_unit_is_read_queue(rbuf)) {
5995         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5996                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5997 
5998         goto retry;
5999     }
6000 
6001     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6002                    (int) port->id.pid, (int) port->id.id,
6003                    (int) rbuf->size);
6004 
6005     if (res == NXT_UNIT_AGAIN) {
6006         return NXT_UNIT_AGAIN;
6007     }
6008 
6009     if (port_impl->from_socket > 0) {
6010         port_impl->from_socket--;
6011 
6012         return NXT_UNIT_OK;
6013     }
6014 
6015     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6016                    (int) port->id.pid, (int) port->id.id,
6017                    (int) rbuf->size);
6018 
6019     if (port_impl->socket_rbuf == NULL) {
6020         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6021 
6022         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6023             return NXT_UNIT_ERROR;
6024         }
6025 
6026         port_impl->socket_rbuf->size = 0;
6027     }
6028 
6029     if (port_impl->socket_rbuf->size > 0) {
6030         nxt_unit_alert(ctx, "too many port socket messages");
6031 
6032         return NXT_UNIT_ERROR;
6033     }
6034 
6035     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6036 
6037     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
6038 
6039     goto retry;
6040 }
6041 
6042 
6043 nxt_inline void
6044 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6045 {
6046     memcpy(dst->buf, src->buf, src->size);
6047     dst->size = src->size;
6048     memcpy(dst->oob, src->oob, sizeof(src->oob));
6049 }
6050 
6051 
6052 static int
6053 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6054     nxt_unit_read_buf_t *rbuf)
6055 {
6056     int  res;
6057 
6058 retry:
6059 
6060     res = nxt_unit_app_queue_recv(port, rbuf);
6061 
6062     if (res == NXT_UNIT_AGAIN) {
6063         res = nxt_unit_port_recv(ctx, port, rbuf);
6064         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6065             return NXT_UNIT_ERROR;
6066         }
6067 
6068         if (nxt_unit_is_read_queue(rbuf)) {
6069             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6070                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6071 
6072             goto retry;
6073         }
6074     }
6075 
6076     return res;
6077 }
6078 
6079 
6080 static int
6081 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6082     nxt_unit_read_buf_t *rbuf)
6083 {
6084     int              fd, err;
6085     struct iovec     iov[1];
6086     struct msghdr    msg;
6087     nxt_unit_impl_t  *lib;
6088 
6089     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6090 
6091     if (lib->callbacks.port_recv != NULL) {
6092         rbuf->size = lib->callbacks.port_recv(ctx, port,
6093                                               rbuf->buf, sizeof(rbuf->buf),
6094                                               rbuf->oob, sizeof(rbuf->oob));
6095 
6096         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6097                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6098 
6099         if (nxt_slow_path(rbuf->size < 0)) {
6100             return NXT_UNIT_ERROR;
6101         }
6102 
6103         return NXT_UNIT_OK;
6104     }
6105 
6106     iov[0].iov_base = rbuf->buf;
6107     iov[0].iov_len = sizeof(rbuf->buf);
6108 
6109     msg.msg_name = NULL;
6110     msg.msg_namelen = 0;
6111     msg.msg_iov = iov;
6112     msg.msg_iovlen = 1;
6113     msg.msg_flags = 0;
6114     msg.msg_control = rbuf->oob;
6115     msg.msg_controllen = sizeof(rbuf->oob);
6116 
6117     fd = port->in_fd;
6118 
6119 retry:
6120 
6121     rbuf->size = recvmsg(fd, &msg, 0);
6122 
6123     if (nxt_slow_path(rbuf->size == -1)) {
6124         err = errno;
6125 
6126         if (err == EINTR) {
6127             goto retry;
6128         }
6129 
6130         if (err == EAGAIN) {
6131             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6132                            fd, strerror(err), err);
6133 
6134             return NXT_UNIT_AGAIN;
6135         }
6136 
6137         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6138                        fd, strerror(err), err);
6139 
6140         return NXT_UNIT_ERROR;
6141     }
6142 
6143     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6144 
6145     return NXT_UNIT_OK;
6146 }
6147 
6148 
6149 static int
6150 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6151 {
6152     nxt_unit_port_impl_t  *port_impl;
6153 
6154     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6155 
6156     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6157 
6158     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6159 }
6160 
6161 
6162 static int
6163 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6164 {
6165     uint32_t              cookie;
6166     nxt_port_msg_t        *port_msg;
6167     nxt_app_queue_t       *queue;
6168     nxt_unit_port_impl_t  *port_impl;
6169 
6170     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6171     queue = port_impl->queue;
6172 
6173 retry:
6174 
6175     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6176 
6177     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6178 
6179     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6180         port_msg = (nxt_port_msg_t *) rbuf->buf;
6181 
6182         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6183             return NXT_UNIT_OK;
6184         }
6185 
6186         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6187 
6188         goto retry;
6189     }
6190 
6191     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6192 }
6193 
6194 
6195 nxt_inline int
6196 nxt_unit_close(int fd)
6197 {
6198     int  res;
6199 
6200     res = close(fd);
6201 
6202     if (nxt_slow_path(res == -1)) {
6203         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6204                        fd, strerror(errno), errno);
6205 
6206     } else {
6207         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6208     }
6209 
6210     return res;
6211 }
6212 
6213 
6214 static int
6215 nxt_unit_fd_blocking(int fd)
6216 {
6217     int  nb;
6218 
6219     nb = 0;
6220 
6221     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6222         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6223                        fd, strerror(errno), errno);
6224 
6225         return NXT_UNIT_ERROR;
6226     }
6227 
6228     return NXT_UNIT_OK;
6229 }
6230 
6231 
6232 static nxt_int_t
6233 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6234 {
6235     nxt_unit_port_t          *port;
6236     nxt_unit_port_hash_id_t  *port_id;
6237 
6238     port = data;
6239     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6240 
6241     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6242         && port_id->pid == port->id.pid
6243         && port_id->id == port->id.id)
6244     {
6245         return NXT_OK;
6246     }
6247 
6248     return NXT_DECLINED;
6249 }
6250 
6251 
6252 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6253     NXT_LVLHSH_DEFAULT,
6254     nxt_unit_port_hash_test,
6255     nxt_unit_lvlhsh_alloc,
6256     nxt_unit_lvlhsh_free,
6257 };
6258 
6259 
6260 static inline void
6261 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6262     nxt_unit_port_hash_id_t *port_hash_id,
6263     nxt_unit_port_id_t *port_id)
6264 {
6265     port_hash_id->pid = port_id->pid;
6266     port_hash_id->id = port_id->id;
6267 
6268     if (nxt_fast_path(port_id->hash != 0)) {
6269         lhq->key_hash = port_id->hash;
6270 
6271     } else {
6272         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6273 
6274         port_id->hash = lhq->key_hash;
6275 
6276         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6277                        (int) port_id->pid, (int) port_id->id,
6278                        (int) port_id->hash);
6279     }
6280 
6281     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6282     lhq->key.start = (u_char *) port_hash_id;
6283     lhq->proto = &lvlhsh_ports_proto;
6284     lhq->pool = NULL;
6285 }
6286 
6287 
6288 static int
6289 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6290 {
6291     nxt_int_t                res;
6292     nxt_lvlhsh_query_t       lhq;
6293     nxt_unit_port_hash_id_t  port_hash_id;
6294 
6295     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6296     lhq.replace = 0;
6297     lhq.value = port;
6298 
6299     res = nxt_lvlhsh_insert(port_hash, &lhq);
6300 
6301     switch (res) {
6302 
6303     case NXT_OK:
6304         return NXT_UNIT_OK;
6305 
6306     default:
6307         return NXT_UNIT_ERROR;
6308     }
6309 }
6310 
6311 
6312 static nxt_unit_port_t *
6313 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6314     int remove)
6315 {
6316     nxt_int_t                res;
6317     nxt_lvlhsh_query_t       lhq;
6318     nxt_unit_port_hash_id_t  port_hash_id;
6319 
6320     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6321 
6322     if (remove) {
6323         res = nxt_lvlhsh_delete(port_hash, &lhq);
6324 
6325     } else {
6326         res = nxt_lvlhsh_find(port_hash, &lhq);
6327     }
6328 
6329     switch (res) {
6330 
6331     case NXT_OK:
6332         if (!remove) {
6333             nxt_unit_port_use(lhq.value);
6334         }
6335 
6336         return lhq.value;
6337 
6338     default:
6339         return NULL;
6340     }
6341 }
6342 
6343 
6344 static nxt_int_t
6345 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6346 {
6347     return NXT_OK;
6348 }
6349 
6350 
6351 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6352     NXT_LVLHSH_DEFAULT,
6353     nxt_unit_request_hash_test,
6354     nxt_unit_lvlhsh_alloc,
6355     nxt_unit_lvlhsh_free,
6356 };
6357 
6358 
6359 static int
6360 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6361     nxt_unit_request_info_t *req)
6362 {
6363     uint32_t                      *stream;
6364     nxt_int_t                     res;
6365     nxt_lvlhsh_query_t            lhq;
6366     nxt_unit_ctx_impl_t           *ctx_impl;
6367     nxt_unit_request_info_impl_t  *req_impl;
6368 
6369     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6370     if (req_impl->in_hash) {
6371         return NXT_UNIT_OK;
6372     }
6373 
6374     stream = &req_impl->stream;
6375 
6376     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6377     lhq.key.length = sizeof(*stream);
6378     lhq.key.start = (u_char *) stream;
6379     lhq.proto = &lvlhsh_requests_proto;
6380     lhq.pool = NULL;
6381     lhq.replace = 0;
6382     lhq.value = req_impl;
6383 
6384     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6385 
6386     pthread_mutex_lock(&ctx_impl->mutex);
6387 
6388     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6389 
6390     pthread_mutex_unlock(&ctx_impl->mutex);
6391 
6392     switch (res) {
6393 
6394     case NXT_OK:
6395         req_impl->in_hash = 1;
6396         return NXT_UNIT_OK;
6397 
6398     default:
6399         return NXT_UNIT_ERROR;
6400     }
6401 }
6402 
6403 
6404 static nxt_unit_request_info_t *
6405 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6406 {
6407     nxt_int_t                     res;
6408     nxt_lvlhsh_query_t            lhq;
6409     nxt_unit_ctx_impl_t           *ctx_impl;
6410     nxt_unit_request_info_impl_t  *req_impl;
6411 
6412     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6413     lhq.key.length = sizeof(stream);
6414     lhq.key.start = (u_char *) &stream;
6415     lhq.proto = &lvlhsh_requests_proto;
6416     lhq.pool = NULL;
6417 
6418     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6419 
6420     pthread_mutex_lock(&ctx_impl->mutex);
6421 
6422     if (remove) {
6423         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6424 
6425     } else {
6426         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6427     }
6428 
6429     pthread_mutex_unlock(&ctx_impl->mutex);
6430 
6431     switch (res) {
6432 
6433     case NXT_OK:
6434         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6435                                     req);
6436         if (remove) {
6437             req_impl->in_hash = 0;
6438         }
6439 
6440         return lhq.value;
6441 
6442     default:
6443         return NULL;
6444     }
6445 }
6446 
6447 
6448 void
6449 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6450 {
6451     int              log_fd, n;
6452     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6453     pid_t            pid;
6454     va_list          ap;
6455     nxt_unit_impl_t  *lib;
6456 
6457     if (nxt_fast_path(ctx != NULL)) {
6458         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6459 
6460         pid = lib->pid;
6461         log_fd = lib->log_fd;
6462 
6463     } else {
6464         pid = getpid();
6465         log_fd = STDERR_FILENO;
6466     }
6467 
6468     p = msg;
6469     end = p + sizeof(msg) - 1;
6470 
6471     p = nxt_unit_snprint_prefix(p, end, pid, level);
6472 
6473     va_start(ap, fmt);
6474     p += vsnprintf(p, end - p, fmt, ap);
6475     va_end(ap);
6476 
6477     if (nxt_slow_path(p > end)) {
6478         memcpy(end - 5, "[...]", 5);
6479         p = end;
6480     }
6481 
6482     *p++ = '\n';
6483 
6484     n = write(log_fd, msg, p - msg);
6485     if (nxt_slow_path(n < 0)) {
6486         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6487     }
6488 }
6489 
6490 
6491 void
6492 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6493 {
6494     int                           log_fd, n;
6495     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6496     pid_t                         pid;
6497     va_list                       ap;
6498     nxt_unit_impl_t               *lib;
6499     nxt_unit_request_info_impl_t  *req_impl;
6500 
6501     if (nxt_fast_path(req != NULL)) {
6502         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6503 
6504         pid = lib->pid;
6505         log_fd = lib->log_fd;
6506 
6507     } else {
6508         pid = getpid();
6509         log_fd = STDERR_FILENO;
6510     }
6511 
6512     p = msg;
6513     end = p + sizeof(msg) - 1;
6514 
6515     p = nxt_unit_snprint_prefix(p, end, pid, level);
6516 
6517     if (nxt_fast_path(req != NULL)) {
6518         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6519 
6520         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6521     }
6522 
6523     va_start(ap, fmt);
6524     p += vsnprintf(p, end - p, fmt, ap);
6525     va_end(ap);
6526 
6527     if (nxt_slow_path(p > end)) {
6528         memcpy(end - 5, "[...]", 5);
6529         p = end;
6530     }
6531 
6532     *p++ = '\n';
6533 
6534     n = write(log_fd, msg, p - msg);
6535     if (nxt_slow_path(n < 0)) {
6536         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6537     }
6538 }
6539 
6540 
6541 static const char * nxt_unit_log_levels[] = {
6542     "alert",
6543     "error",
6544     "warn",
6545     "notice",
6546     "info",
6547     "debug",
6548 };
6549 
6550 
6551 static char *
6552 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6553 {
6554     struct tm        tm;
6555     struct timespec  ts;
6556 
6557     (void) clock_gettime(CLOCK_REALTIME, &ts);
6558 
6559 #if (NXT_HAVE_LOCALTIME_R)
6560     (void) localtime_r(&ts.tv_sec, &tm);
6561 #else
6562     tm = *localtime(&ts.tv_sec);
6563 #endif
6564 
6565 #if (NXT_DEBUG)
6566     p += snprintf(p, end - p,
6567                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6568                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6569                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6570                   (int) ts.tv_nsec / 1000000);
6571 #else
6572     p += snprintf(p, end - p,
6573                   "%4d/%02d/%02d %02d:%02d:%02d ",
6574                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6575                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6576 #endif
6577 
6578     p += snprintf(p, end - p,
6579                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6580                   (int) pid,
6581                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6582 
6583     return p;
6584 }
6585 
6586 
6587 static void *
6588 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6589 {
6590     int   err;
6591     void  *p;
6592 
6593     err = posix_memalign(&p, size, size);
6594 
6595     if (nxt_fast_path(err == 0)) {
6596         nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6597                        (int) size, (int) size, p);
6598         return p;
6599     }
6600 
6601     nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6602                    (int) size, (int) size, strerror(err), err);
6603     return NULL;
6604 }
6605 
6606 
6607 static void
6608 nxt_unit_lvlhsh_free(void *data, void *p)
6609 {
6610     nxt_unit_free(NULL, p);
6611 }
6612 
6613 
6614 void *
6615 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6616 {
6617     void  *p;
6618 
6619     p = malloc(size);
6620 
6621     if (nxt_fast_path(p != NULL)) {
6622 #if (NXT_DEBUG_ALLOC)
6623         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6624 #endif
6625 
6626     } else {
6627         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6628                        (int) size, strerror(errno), errno);
6629     }
6630 
6631     return p;
6632 }
6633 
6634 
6635 void
6636 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6637 {
6638 #if (NXT_DEBUG_ALLOC)
6639     nxt_unit_debug(ctx, "free(%p)", p);
6640 #endif
6641 
6642     free(p);
6643 }
6644 
6645 
6646 static int
6647 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6648 {
6649     u_char        c1, c2;
6650     nxt_int_t     n;
6651     const u_char  *s1, *s2;
6652 
6653     s1 = p1;
6654     s2 = p2;
6655 
6656     while (length-- != 0) {
6657         c1 = *s1++;
6658         c2 = *s2++;
6659 
6660         c1 = nxt_lowcase(c1);
6661         c2 = nxt_lowcase(c2);
6662 
6663         n = c1 - c2;
6664 
6665         if (n != 0) {
6666             return n;
6667         }
6668     }
6669 
6670     return 0;
6671 }
6672