xref: /unit/src/nxt_unit.c (revision 1755:3b0331284155)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
24 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
25 #define NXT_UNIT_LOCAL_BUF_SIZE  \
26     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
28 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
29 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
30 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
31 typedef struct nxt_unit_process_s               nxt_unit_process_t;
32 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
33 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
34 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
35 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
36 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
37 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
38 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
39 
40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
42     nxt_unit_ctx_impl_t *ctx_impl, void *data);
43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50     nxt_unit_mmap_buf_t *mmap_buf);
51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56     int queue_fd);
57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
58     nxt_unit_request_info_t **preq);
59 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
60     nxt_unit_recv_msg_t *recv_msg);
61 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
62 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
63     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
64 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
65     nxt_unit_recv_msg_t *recv_msg);
66 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
67     nxt_unit_port_id_t *port_id);
68 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
69 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
72 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
73     nxt_unit_ctx_t *ctx);
74 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
75 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
76 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
77     nxt_unit_ctx_t *ctx);
78 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
79 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
80     nxt_unit_websocket_frame_impl_t *ws);
81 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
82 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
83 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
84     nxt_unit_mmap_buf_t *mmap_buf, int last);
85 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
86 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
87 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
88 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
89     nxt_unit_ctx_impl_t *ctx_impl);
90 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
91     nxt_unit_read_buf_t *rbuf);
92 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
93     nxt_unit_request_info_t *req, size_t size);
94 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
95     size_t size);
96 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
97     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
98 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
99 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
100 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
101 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
102     nxt_unit_port_t *port, int n);
103 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
104 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
105     int fd);
106 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
107     nxt_unit_port_t *port, uint32_t size,
108     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
109 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
110 
111 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
112     nxt_unit_ctx_impl_t *ctx_impl);
113 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
114 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
115 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
116 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
117 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
118     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
119     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
120 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
121     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
122 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
123 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
124     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
125 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
126 
127 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
128 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
129     pid_t pid, int remove);
130 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
131 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
132 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
133 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
134 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
135 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
136 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
137 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
138 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
140     nxt_unit_port_t *port);
141 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
142 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
143 
144 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
145     nxt_unit_port_t *port, int queue_fd);
146 
147 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
148 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
149 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
150     nxt_unit_port_t *port, void *queue);
151 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
152     nxt_queue_t *awaiting_req);
153 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
154     nxt_unit_port_id_t *port_id);
155 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
156     nxt_unit_port_id_t *port_id);
157 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
158 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
159     nxt_unit_process_t *process);
160 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
161 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
162 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
163     nxt_unit_port_t *port, const void *buf, size_t buf_size,
164     const void *oob, size_t oob_size);
165 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
166     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
167 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168     nxt_unit_read_buf_t *rbuf);
169 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
170     nxt_unit_read_buf_t *src);
171 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
172     nxt_unit_read_buf_t *rbuf);
173 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
174     nxt_unit_read_buf_t *rbuf);
175 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
176     nxt_unit_read_buf_t *rbuf);
177 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
178     nxt_unit_read_buf_t *rbuf);
179 nxt_inline int nxt_unit_close(int fd);
180 static int nxt_unit_fd_blocking(int fd);
181 
182 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
183     nxt_unit_port_t *port);
184 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
185     nxt_unit_port_id_t *port_id, int remove);
186 
187 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
188     nxt_unit_request_info_t *req);
189 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
190     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
191 
192 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
193 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
194 static void nxt_unit_lvlhsh_free(void *data, void *p);
195 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
196 
197 
198 struct nxt_unit_mmap_buf_s {
199     nxt_unit_buf_t           buf;
200 
201     nxt_unit_mmap_buf_t      *next;
202     nxt_unit_mmap_buf_t      **prev;
203 
204     nxt_port_mmap_header_t   *hdr;
205     nxt_unit_request_info_t  *req;
206     nxt_unit_ctx_impl_t      *ctx_impl;
207     char                     *free_ptr;
208     char                     *plain_ptr;
209 };
210 
211 
212 struct nxt_unit_recv_msg_s {
213     uint32_t                 stream;
214     nxt_pid_t                pid;
215     nxt_port_id_t            reply_port;
216 
217     uint8_t                  last;      /* 1 bit */
218     uint8_t                  mmap;      /* 1 bit */
219 
220     void                     *start;
221     uint32_t                 size;
222 
223     int                      fd[2];
224 
225     nxt_unit_mmap_buf_t      *incoming_buf;
226 };
227 
228 
229 typedef enum {
230     NXT_UNIT_RS_START           = 0,
231     NXT_UNIT_RS_RESPONSE_INIT,
232     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
233     NXT_UNIT_RS_RESPONSE_SENT,
234     NXT_UNIT_RS_RELEASED,
235 } nxt_unit_req_state_t;
236 
237 
238 struct nxt_unit_request_info_impl_s {
239     nxt_unit_request_info_t  req;
240 
241     uint32_t                 stream;
242 
243     nxt_unit_mmap_buf_t      *outgoing_buf;
244     nxt_unit_mmap_buf_t      *incoming_buf;
245 
246     nxt_unit_req_state_t     state;
247     uint8_t                  websocket;
248     uint8_t                  in_hash;
249 
250     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
251     nxt_queue_link_t         link;
252     /*  for nxt_unit_port_impl_t.awaiting_req */
253     nxt_queue_link_t         port_wait_link;
254 
255     char                     extra_data[];
256 };
257 
258 
259 struct nxt_unit_websocket_frame_impl_s {
260     nxt_unit_websocket_frame_t  ws;
261 
262     nxt_unit_mmap_buf_t         *buf;
263 
264     nxt_queue_link_t            link;
265 
266     nxt_unit_ctx_impl_t         *ctx_impl;
267 };
268 
269 
270 struct nxt_unit_read_buf_s {
271     nxt_queue_link_t              link;
272     nxt_unit_ctx_impl_t           *ctx_impl;
273     ssize_t                       size;
274     char                          buf[16384];
275     char                          oob[256];
276 };
277 
278 
279 struct nxt_unit_ctx_impl_s {
280     nxt_unit_ctx_t                ctx;
281 
282     nxt_atomic_t                  use_count;
283     nxt_atomic_t                  wait_items;
284 
285     pthread_mutex_t               mutex;
286 
287     nxt_unit_port_t               *read_port;
288 
289     nxt_queue_link_t              link;
290 
291     nxt_unit_mmap_buf_t           *free_buf;
292 
293     /*  of nxt_unit_request_info_impl_t */
294     nxt_queue_t                   free_req;
295 
296     /*  of nxt_unit_websocket_frame_impl_t */
297     nxt_queue_t                   free_ws;
298 
299     /*  of nxt_unit_request_info_impl_t */
300     nxt_queue_t                   active_req;
301 
302     /*  of nxt_unit_request_info_impl_t */
303     nxt_lvlhsh_t                  requests;
304 
305     /*  of nxt_unit_request_info_impl_t */
306     nxt_queue_t                   ready_req;
307 
308     /*  of nxt_unit_read_buf_t */
309     nxt_queue_t                   pending_rbuf;
310 
311     /*  of nxt_unit_read_buf_t */
312     nxt_queue_t                   free_rbuf;
313 
314     int                           online;
315     int                           ready;
316 
317     nxt_unit_mmap_buf_t           ctx_buf[2];
318     nxt_unit_read_buf_t           ctx_read_buf;
319 
320     nxt_unit_request_info_impl_t  req;
321 };
322 
323 
324 struct nxt_unit_mmap_s {
325     nxt_port_mmap_header_t   *hdr;
326     pthread_t                src_thread;
327 
328     /*  of nxt_unit_read_buf_t */
329     nxt_queue_t              awaiting_rbuf;
330 };
331 
332 
333 struct nxt_unit_mmaps_s {
334     pthread_mutex_t          mutex;
335     uint32_t                 size;
336     uint32_t                 cap;
337     nxt_atomic_t             allocated_chunks;
338     nxt_unit_mmap_t          *elts;
339 };
340 
341 
342 struct nxt_unit_impl_s {
343     nxt_unit_t               unit;
344     nxt_unit_callbacks_t     callbacks;
345 
346     nxt_atomic_t             use_count;
347 
348     uint32_t                 request_data_size;
349     uint32_t                 shm_mmap_limit;
350 
351     pthread_mutex_t          mutex;
352 
353     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
354     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
355 
356     nxt_unit_port_t          *router_port;
357     nxt_unit_port_t          *shared_port;
358 
359     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
360 
361     nxt_unit_mmaps_t         incoming;
362     nxt_unit_mmaps_t         outgoing;
363 
364     pid_t                    pid;
365     int                      log_fd;
366 
367     nxt_unit_ctx_impl_t      main_ctx;
368 };
369 
370 
371 struct nxt_unit_port_impl_s {
372     nxt_unit_port_t          port;
373 
374     nxt_atomic_t             use_count;
375 
376     /*  for nxt_unit_process_t.ports */
377     nxt_queue_link_t         link;
378     nxt_unit_process_t       *process;
379 
380     /*  of nxt_unit_request_info_impl_t */
381     nxt_queue_t              awaiting_req;
382 
383     int                      ready;
384 
385     void                     *queue;
386 
387     int                      from_socket;
388     nxt_unit_read_buf_t      *socket_rbuf;
389 };
390 
391 
392 struct nxt_unit_process_s {
393     pid_t                    pid;
394 
395     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
396 
397     nxt_unit_impl_t          *lib;
398 
399     nxt_atomic_t             use_count;
400 
401     uint32_t                 next_port_id;
402 };
403 
404 
405 /* Explicitly using 32 bit types to avoid possible alignment. */
406 typedef struct {
407     int32_t   pid;
408     uint32_t  id;
409 } nxt_unit_port_hash_id_t;
410 
411 
412 nxt_unit_ctx_t *
413 nxt_unit_init(nxt_unit_init_t *init)
414 {
415     int              rc, queue_fd;
416     void             *mem;
417     uint32_t         ready_stream, shm_limit;
418     nxt_unit_ctx_t   *ctx;
419     nxt_unit_impl_t  *lib;
420     nxt_unit_port_t  ready_port, router_port, read_port;
421 
422     lib = nxt_unit_create(init);
423     if (nxt_slow_path(lib == NULL)) {
424         return NULL;
425     }
426 
427     queue_fd = -1;
428     mem = MAP_FAILED;
429 
430     if (init->ready_port.id.pid != 0
431         && init->ready_stream != 0
432         && init->read_port.id.pid != 0)
433     {
434         ready_port = init->ready_port;
435         ready_stream = init->ready_stream;
436         router_port = init->router_port;
437         read_port = init->read_port;
438         lib->log_fd = init->log_fd;
439 
440         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
441                               ready_port.id.id);
442         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
443                               router_port.id.id);
444         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
445                               read_port.id.id);
446 
447     } else {
448         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
449                                &lib->log_fd, &ready_stream, &shm_limit);
450         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
451             goto fail;
452         }
453 
454         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
455                                 / PORT_MMAP_DATA_SIZE;
456     }
457 
458     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
459         lib->shm_mmap_limit = 1;
460     }
461 
462     lib->pid = read_port.id.pid;
463 
464     ctx = &lib->main_ctx.ctx;
465 
466     rc = nxt_unit_fd_blocking(router_port.out_fd);
467     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
468         goto fail;
469     }
470 
471     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
472     if (nxt_slow_path(lib->router_port == NULL)) {
473         nxt_unit_alert(NULL, "failed to add router_port");
474 
475         goto fail;
476     }
477 
478     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
479     if (nxt_slow_path(queue_fd == -1)) {
480         goto fail;
481     }
482 
483     mem = mmap(NULL, sizeof(nxt_port_queue_t),
484                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
485     if (nxt_slow_path(mem == MAP_FAILED)) {
486         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
487                        strerror(errno), errno);
488 
489         goto fail;
490     }
491 
492     nxt_port_queue_init(mem);
493 
494     rc = nxt_unit_fd_blocking(read_port.in_fd);
495     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
496         goto fail;
497     }
498 
499     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
500     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
501         nxt_unit_alert(NULL, "failed to add read_port");
502 
503         goto fail;
504     }
505 
506     rc = nxt_unit_fd_blocking(ready_port.out_fd);
507     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
508         goto fail;
509     }
510 
511     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
512     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
513         nxt_unit_alert(NULL, "failed to send READY message");
514 
515         goto fail;
516     }
517 
518     nxt_unit_close(ready_port.out_fd);
519     nxt_unit_close(queue_fd);
520 
521     return ctx;
522 
523 fail:
524 
525     if (mem != MAP_FAILED) {
526         munmap(mem, sizeof(nxt_port_queue_t));
527     }
528 
529     if (queue_fd != -1) {
530         nxt_unit_close(queue_fd);
531     }
532 
533     nxt_unit_ctx_release(&lib->main_ctx.ctx);
534 
535     return NULL;
536 }
537 
538 
539 static nxt_unit_impl_t *
540 nxt_unit_create(nxt_unit_init_t *init)
541 {
542     int                   rc;
543     nxt_unit_impl_t       *lib;
544     nxt_unit_callbacks_t  *cb;
545 
546     lib = nxt_unit_malloc(NULL,
547                           sizeof(nxt_unit_impl_t) + init->request_data_size);
548     if (nxt_slow_path(lib == NULL)) {
549         nxt_unit_alert(NULL, "failed to allocate unit struct");
550 
551         return NULL;
552     }
553 
554     rc = pthread_mutex_init(&lib->mutex, NULL);
555     if (nxt_slow_path(rc != 0)) {
556         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
557 
558         goto fail;
559     }
560 
561     lib->unit.data = init->data;
562     lib->callbacks = init->callbacks;
563 
564     lib->request_data_size = init->request_data_size;
565     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
566                             / PORT_MMAP_DATA_SIZE;
567 
568     lib->processes.slot = NULL;
569     lib->ports.slot = NULL;
570 
571     lib->log_fd = STDERR_FILENO;
572 
573     nxt_queue_init(&lib->contexts);
574 
575     lib->use_count = 0;
576     lib->router_port = NULL;
577     lib->shared_port = NULL;
578 
579     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
580     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
581         pthread_mutex_destroy(&lib->mutex);
582         goto fail;
583     }
584 
585     cb = &lib->callbacks;
586 
587     if (cb->request_handler == NULL) {
588         nxt_unit_alert(NULL, "request_handler is NULL");
589 
590         pthread_mutex_destroy(&lib->mutex);
591         goto fail;
592     }
593 
594     nxt_unit_mmaps_init(&lib->incoming);
595     nxt_unit_mmaps_init(&lib->outgoing);
596 
597     return lib;
598 
599 fail:
600 
601     nxt_unit_free(NULL, lib);
602 
603     return NULL;
604 }
605 
606 
607 static int
608 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
609     void *data)
610 {
611     int  rc;
612 
613     ctx_impl->ctx.data = data;
614     ctx_impl->ctx.unit = &lib->unit;
615 
616     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
617     if (nxt_slow_path(rc != 0)) {
618         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
619 
620         return NXT_UNIT_ERROR;
621     }
622 
623     nxt_unit_lib_use(lib);
624 
625     pthread_mutex_lock(&lib->mutex);
626 
627     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
628 
629     pthread_mutex_unlock(&lib->mutex);
630 
631     ctx_impl->use_count = 1;
632     ctx_impl->wait_items = 0;
633     ctx_impl->online = 1;
634     ctx_impl->ready = 0;
635 
636     nxt_queue_init(&ctx_impl->free_req);
637     nxt_queue_init(&ctx_impl->free_ws);
638     nxt_queue_init(&ctx_impl->active_req);
639     nxt_queue_init(&ctx_impl->ready_req);
640     nxt_queue_init(&ctx_impl->pending_rbuf);
641     nxt_queue_init(&ctx_impl->free_rbuf);
642 
643     ctx_impl->free_buf = NULL;
644     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
645     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
646 
647     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
648     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
649 
650     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
651 
652     ctx_impl->req.req.ctx = &ctx_impl->ctx;
653     ctx_impl->req.req.unit = &lib->unit;
654 
655     ctx_impl->read_port = NULL;
656     ctx_impl->requests.slot = 0;
657 
658     return NXT_UNIT_OK;
659 }
660 
661 
662 nxt_inline void
663 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
664 {
665     nxt_unit_ctx_impl_t  *ctx_impl;
666 
667     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
668 
669     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
670 }
671 
672 
673 nxt_inline void
674 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
675 {
676     long                 c;
677     nxt_unit_ctx_impl_t  *ctx_impl;
678 
679     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
680 
681     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
682 
683     if (c == 1) {
684         nxt_unit_ctx_free(ctx_impl);
685     }
686 }
687 
688 
689 nxt_inline void
690 nxt_unit_lib_use(nxt_unit_impl_t *lib)
691 {
692     nxt_atomic_fetch_add(&lib->use_count, 1);
693 }
694 
695 
696 nxt_inline void
697 nxt_unit_lib_release(nxt_unit_impl_t *lib)
698 {
699     long                c;
700     nxt_unit_process_t  *process;
701 
702     c = nxt_atomic_fetch_add(&lib->use_count, -1);
703 
704     if (c == 1) {
705         for ( ;; ) {
706             pthread_mutex_lock(&lib->mutex);
707 
708             process = nxt_unit_process_pop_first(lib);
709             if (process == NULL) {
710                 pthread_mutex_unlock(&lib->mutex);
711 
712                 break;
713             }
714 
715             nxt_unit_remove_process(lib, process);
716         }
717 
718         pthread_mutex_destroy(&lib->mutex);
719 
720         if (nxt_fast_path(lib->router_port != NULL)) {
721             nxt_unit_port_release(lib->router_port);
722         }
723 
724         if (nxt_fast_path(lib->shared_port != NULL)) {
725             nxt_unit_port_release(lib->shared_port);
726         }
727 
728         nxt_unit_mmaps_destroy(&lib->incoming);
729         nxt_unit_mmaps_destroy(&lib->outgoing);
730 
731         nxt_unit_free(NULL, lib);
732     }
733 }
734 
735 
736 nxt_inline void
737 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
738     nxt_unit_mmap_buf_t *mmap_buf)
739 {
740     mmap_buf->next = *head;
741 
742     if (mmap_buf->next != NULL) {
743         mmap_buf->next->prev = &mmap_buf->next;
744     }
745 
746     *head = mmap_buf;
747     mmap_buf->prev = head;
748 }
749 
750 
751 nxt_inline void
752 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
753     nxt_unit_mmap_buf_t *mmap_buf)
754 {
755     while (*prev != NULL) {
756         prev = &(*prev)->next;
757     }
758 
759     nxt_unit_mmap_buf_insert(prev, mmap_buf);
760 }
761 
762 
763 nxt_inline void
764 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
765 {
766     nxt_unit_mmap_buf_t  **prev;
767 
768     prev = mmap_buf->prev;
769 
770     if (mmap_buf->next != NULL) {
771         mmap_buf->next->prev = prev;
772     }
773 
774     if (prev != NULL) {
775         *prev = mmap_buf->next;
776     }
777 }
778 
779 
780 static int
781 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
782     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
783     uint32_t *shm_limit)
784 {
785     int       rc;
786     int       ready_fd, router_fd, read_in_fd, read_out_fd;
787     char      *unit_init, *version_end, *vars;
788     size_t    version_length;
789     int64_t   ready_pid, router_pid, read_pid;
790     uint32_t  ready_stream, router_id, ready_id, read_id;
791 
792     unit_init = getenv(NXT_UNIT_INIT_ENV);
793     if (nxt_slow_path(unit_init == NULL)) {
794         nxt_unit_alert(NULL, "%s is not in the current environment",
795                        NXT_UNIT_INIT_ENV);
796 
797         return NXT_UNIT_ERROR;
798     }
799 
800     version_end = strchr(unit_init, ';');
801     if (nxt_slow_path(version_end == NULL)) {
802         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
803                        NXT_UNIT_INIT_ENV, unit_init);
804 
805         return NXT_UNIT_ERROR;
806     }
807 
808     version_length = version_end - unit_init;
809 
810     rc = version_length != nxt_length(NXT_VERSION)
811          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
812 
813     if (nxt_slow_path(rc != 0)) {
814         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
815                        "%.*s, while the app was compiled with libunit %s",
816                        (int) version_length, unit_init, NXT_VERSION);
817 
818         return NXT_UNIT_ERROR;
819     }
820 
821     vars = version_end + 1;
822 
823     rc = sscanf(vars,
824                 "%"PRIu32";"
825                 "%"PRId64",%"PRIu32",%d;"
826                 "%"PRId64",%"PRIu32",%d;"
827                 "%"PRId64",%"PRIu32",%d,%d;"
828                 "%d,%"PRIu32,
829                 &ready_stream,
830                 &ready_pid, &ready_id, &ready_fd,
831                 &router_pid, &router_id, &router_fd,
832                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
833                 log_fd, shm_limit);
834 
835     if (nxt_slow_path(rc == EOF)) {
836         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
837                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
838 
839         return NXT_UNIT_ERROR;
840     }
841 
842     if (nxt_slow_path(rc != 13)) {
843         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
844                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
845 
846         return NXT_UNIT_ERROR;
847     }
848 
849     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
850 
851     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
852 
853     ready_port->in_fd = -1;
854     ready_port->out_fd = ready_fd;
855     ready_port->data = NULL;
856 
857     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
858 
859     router_port->in_fd = -1;
860     router_port->out_fd = router_fd;
861     router_port->data = NULL;
862 
863     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
864 
865     read_port->in_fd = read_in_fd;
866     read_port->out_fd = read_out_fd;
867     read_port->data = NULL;
868 
869     *stream = ready_stream;
870 
871     return NXT_UNIT_OK;
872 }
873 
874 
875 static int
876 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
877 {
878     ssize_t          res;
879     nxt_port_msg_t   msg;
880     nxt_unit_impl_t  *lib;
881 
882     union {
883         struct cmsghdr  cm;
884         char            space[CMSG_SPACE(sizeof(int))];
885     } cmsg;
886 
887     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
888 
889     msg.stream = stream;
890     msg.pid = lib->pid;
891     msg.reply_port = 0;
892     msg.type = _NXT_PORT_MSG_PROCESS_READY;
893     msg.last = 1;
894     msg.mmap = 0;
895     msg.nf = 0;
896     msg.mf = 0;
897     msg.tracking = 0;
898 
899     memset(&cmsg, 0, sizeof(cmsg));
900 
901     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
902     cmsg.cm.cmsg_level = SOL_SOCKET;
903     cmsg.cm.cmsg_type = SCM_RIGHTS;
904 
905     /*
906      * memcpy() is used instead of simple
907      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
908      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
909      *   dereferencing type-punned pointer will break strict-aliasing rules
910      *
911      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
912      * in the same simple assignment as in the code above.
913      */
914     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
915 
916     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
917                            &cmsg, sizeof(cmsg));
918     if (res != sizeof(msg)) {
919         return NXT_UNIT_ERROR;
920     }
921 
922     return NXT_UNIT_OK;
923 }
924 
925 
926 static int
927 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
928     nxt_unit_request_info_t **preq)
929 {
930     int                  rc;
931     pid_t                pid;
932     struct cmsghdr       *cm;
933     nxt_port_msg_t       *port_msg;
934     nxt_unit_impl_t      *lib;
935     nxt_unit_recv_msg_t  recv_msg;
936 
937     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
938 
939     recv_msg.fd[0] = -1;
940     recv_msg.fd[1] = -1;
941     port_msg = (nxt_port_msg_t *) rbuf->buf;
942     cm = (struct cmsghdr *) rbuf->oob;
943 
944     if (cm->cmsg_level == SOL_SOCKET
945         && cm->cmsg_type == SCM_RIGHTS)
946     {
947         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
948             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
949         }
950 
951         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
952             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
953         }
954     }
955 
956     recv_msg.incoming_buf = NULL;
957 
958     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
959         if (nxt_slow_path(rbuf->size == 0)) {
960             nxt_unit_debug(ctx, "read port closed");
961 
962             nxt_unit_quit(ctx);
963             rc = NXT_UNIT_OK;
964             goto done;
965         }
966 
967         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
968 
969         rc = NXT_UNIT_ERROR;
970         goto done;
971     }
972 
973     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
974                    port_msg->stream, (int) port_msg->type,
975                    recv_msg.fd[0], recv_msg.fd[1]);
976 
977     recv_msg.stream = port_msg->stream;
978     recv_msg.pid = port_msg->pid;
979     recv_msg.reply_port = port_msg->reply_port;
980     recv_msg.last = port_msg->last;
981     recv_msg.mmap = port_msg->mmap;
982 
983     recv_msg.start = port_msg + 1;
984     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
985 
986     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
987         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
988                        port_msg->stream, (int) port_msg->type);
989         rc = NXT_UNIT_ERROR;
990         goto done;
991     }
992 
993     /* Fragmentation is unsupported. */
994     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
995         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
996                        port_msg->stream, (int) port_msg->type);
997         rc = NXT_UNIT_ERROR;
998         goto done;
999     }
1000 
1001     if (port_msg->mmap) {
1002         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1003 
1004         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1005             if (rc == NXT_UNIT_AGAIN) {
1006                 recv_msg.fd[0] = -1;
1007                 recv_msg.fd[1] = -1;
1008             }
1009 
1010             goto done;
1011         }
1012     }
1013 
1014     switch (port_msg->type) {
1015 
1016     case _NXT_PORT_MSG_RPC_READY:
1017         rc = NXT_UNIT_OK;
1018         break;
1019 
1020     case _NXT_PORT_MSG_QUIT:
1021         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
1022 
1023         nxt_unit_quit(ctx);
1024         rc = NXT_UNIT_OK;
1025         break;
1026 
1027     case _NXT_PORT_MSG_NEW_PORT:
1028         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1029         break;
1030 
1031     case _NXT_PORT_MSG_PORT_ACK:
1032         rc = nxt_unit_ctx_ready(ctx);
1033         break;
1034 
1035     case _NXT_PORT_MSG_CHANGE_FILE:
1036         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1037                        port_msg->stream, recv_msg.fd[0]);
1038 
1039         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1040             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1041                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1042                            strerror(errno), errno);
1043 
1044             rc = NXT_UNIT_ERROR;
1045             goto done;
1046         }
1047 
1048         rc = NXT_UNIT_OK;
1049         break;
1050 
1051     case _NXT_PORT_MSG_MMAP:
1052         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1053             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1054                            port_msg->stream, recv_msg.fd[0]);
1055 
1056             rc = NXT_UNIT_ERROR;
1057             goto done;
1058         }
1059 
1060         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1061         break;
1062 
1063     case _NXT_PORT_MSG_REQ_HEADERS:
1064         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1065         break;
1066 
1067     case _NXT_PORT_MSG_REQ_BODY:
1068         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1069         break;
1070 
1071     case _NXT_PORT_MSG_WEBSOCKET:
1072         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1073         break;
1074 
1075     case _NXT_PORT_MSG_REMOVE_PID:
1076         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1077             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1078                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1079                            (int) sizeof(pid));
1080 
1081             rc = NXT_UNIT_ERROR;
1082             goto done;
1083         }
1084 
1085         memcpy(&pid, recv_msg.start, sizeof(pid));
1086 
1087         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1088                        port_msg->stream, (int) pid);
1089 
1090         nxt_unit_remove_pid(lib, pid);
1091 
1092         rc = NXT_UNIT_OK;
1093         break;
1094 
1095     case _NXT_PORT_MSG_SHM_ACK:
1096         rc = nxt_unit_process_shm_ack(ctx);
1097         break;
1098 
1099     default:
1100         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1101                        port_msg->stream, (int) port_msg->type);
1102 
1103         rc = NXT_UNIT_ERROR;
1104         goto done;
1105     }
1106 
1107 done:
1108 
1109     if (recv_msg.fd[0] != -1) {
1110         nxt_unit_close(recv_msg.fd[0]);
1111     }
1112 
1113     if (recv_msg.fd[1] != -1) {
1114         nxt_unit_close(recv_msg.fd[1]);
1115     }
1116 
1117     while (recv_msg.incoming_buf != NULL) {
1118         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1119     }
1120 
1121     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1122 #if (NXT_DEBUG)
1123         memset(rbuf->buf, 0xAC, rbuf->size);
1124 #endif
1125         nxt_unit_read_buf_release(ctx, rbuf);
1126     }
1127 
1128     return rc;
1129 }
1130 
1131 
1132 static int
1133 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1134 {
1135     void                     *mem;
1136     nxt_unit_impl_t          *lib;
1137     nxt_unit_port_t          new_port, *port;
1138     nxt_port_msg_new_port_t  *new_port_msg;
1139 
1140     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1141         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1142                       "invalid message size (%d)",
1143                       recv_msg->stream, (int) recv_msg->size);
1144 
1145         return NXT_UNIT_ERROR;
1146     }
1147 
1148     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1149         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1150                        recv_msg->stream, recv_msg->fd[0]);
1151 
1152         return NXT_UNIT_ERROR;
1153     }
1154 
1155     new_port_msg = recv_msg->start;
1156 
1157     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1158                    recv_msg->stream, (int) new_port_msg->pid,
1159                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1160 
1161     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1162 
1163     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1164         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1165 
1166         new_port.in_fd = recv_msg->fd[0];
1167         new_port.out_fd = -1;
1168 
1169         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1170                    MAP_SHARED, recv_msg->fd[1], 0);
1171 
1172     } else {
1173         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1174                           != NXT_UNIT_OK))
1175         {
1176             return NXT_UNIT_ERROR;
1177         }
1178 
1179         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1180                               new_port_msg->id);
1181 
1182         new_port.in_fd = -1;
1183         new_port.out_fd = recv_msg->fd[0];
1184 
1185         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1186                    MAP_SHARED, recv_msg->fd[1], 0);
1187     }
1188 
1189     if (nxt_slow_path(mem == MAP_FAILED)) {
1190         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1191                        strerror(errno), errno);
1192 
1193         return NXT_UNIT_ERROR;
1194     }
1195 
1196     new_port.data = NULL;
1197 
1198     recv_msg->fd[0] = -1;
1199 
1200     port = nxt_unit_add_port(ctx, &new_port, mem);
1201     if (nxt_slow_path(port == NULL)) {
1202         return NXT_UNIT_ERROR;
1203     }
1204 
1205     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1206         lib->shared_port = port;
1207 
1208         return nxt_unit_ctx_ready(ctx);
1209     }
1210 
1211     nxt_unit_port_release(port);
1212 
1213     return NXT_UNIT_OK;
1214 }
1215 
1216 
1217 static int
1218 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1219 {
1220     nxt_unit_impl_t      *lib;
1221     nxt_unit_ctx_impl_t  *ctx_impl;
1222 
1223     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1224     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1225 
1226     ctx_impl->ready = 1;
1227 
1228     if (lib->callbacks.ready_handler) {
1229         return lib->callbacks.ready_handler(ctx);
1230     }
1231 
1232     return NXT_UNIT_OK;
1233 }
1234 
1235 
1236 static int
1237 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1238     nxt_unit_request_info_t **preq)
1239 {
1240     int                           res;
1241     nxt_unit_impl_t               *lib;
1242     nxt_unit_port_id_t            port_id;
1243     nxt_unit_request_t            *r;
1244     nxt_unit_mmap_buf_t           *b;
1245     nxt_unit_request_info_t       *req;
1246     nxt_unit_request_info_impl_t  *req_impl;
1247 
1248     if (nxt_slow_path(recv_msg->mmap == 0)) {
1249         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1250                       recv_msg->stream);
1251 
1252         return NXT_UNIT_ERROR;
1253     }
1254 
1255     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1256         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1257                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1258                       (int) sizeof(nxt_unit_request_t));
1259 
1260         return NXT_UNIT_ERROR;
1261     }
1262 
1263     req_impl = nxt_unit_request_info_get(ctx);
1264     if (nxt_slow_path(req_impl == NULL)) {
1265         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1266                       recv_msg->stream);
1267 
1268         return NXT_UNIT_ERROR;
1269     }
1270 
1271     req = &req_impl->req;
1272 
1273     req->request = recv_msg->start;
1274 
1275     b = recv_msg->incoming_buf;
1276 
1277     req->request_buf = &b->buf;
1278     req->response = NULL;
1279     req->response_buf = NULL;
1280 
1281     r = req->request;
1282 
1283     req->content_length = r->content_length;
1284 
1285     req->content_buf = req->request_buf;
1286     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1287 
1288     req_impl->stream = recv_msg->stream;
1289 
1290     req_impl->outgoing_buf = NULL;
1291 
1292     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1293         b->req = req;
1294     }
1295 
1296     /* "Move" incoming buffer list to req_impl. */
1297     req_impl->incoming_buf = recv_msg->incoming_buf;
1298     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1299     recv_msg->incoming_buf = NULL;
1300 
1301     req->content_fd = recv_msg->fd[0];
1302     recv_msg->fd[0] = -1;
1303 
1304     req->response_max_fields = 0;
1305     req_impl->state = NXT_UNIT_RS_START;
1306     req_impl->websocket = 0;
1307     req_impl->in_hash = 0;
1308 
1309     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1310                    (int) r->method_length,
1311                    (char *) nxt_unit_sptr_get(&r->method),
1312                    (int) r->target_length,
1313                    (char *) nxt_unit_sptr_get(&r->target),
1314                    (int) r->content_length);
1315 
1316     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1317 
1318     res = nxt_unit_request_check_response_port(req, &port_id);
1319     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1320         return NXT_UNIT_ERROR;
1321     }
1322 
1323     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1324         res = nxt_unit_send_req_headers_ack(req);
1325         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1326             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1327 
1328             return NXT_UNIT_ERROR;
1329         }
1330 
1331         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1332 
1333         if (req->content_length
1334             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1335         {
1336             res = nxt_unit_request_hash_add(ctx, req);
1337             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1338                 nxt_unit_req_warn(req, "failed to add request to hash");
1339 
1340                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1341 
1342                 return NXT_UNIT_ERROR;
1343             }
1344 
1345             /*
1346              * If application have separate data handler, we may start
1347              * request processing and process data when it is arrived.
1348              */
1349             if (lib->callbacks.data_handler == NULL) {
1350                 return NXT_UNIT_OK;
1351             }
1352         }
1353 
1354         if (preq == NULL) {
1355             lib->callbacks.request_handler(req);
1356 
1357         } else {
1358             *preq = req;
1359         }
1360     }
1361 
1362     return NXT_UNIT_OK;
1363 }
1364 
1365 
1366 static int
1367 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1368 {
1369     uint64_t                 l;
1370     nxt_unit_impl_t          *lib;
1371     nxt_unit_mmap_buf_t      *b;
1372     nxt_unit_request_info_t  *req;
1373 
1374     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1375     if (req == NULL) {
1376         return NXT_UNIT_OK;
1377     }
1378 
1379     l = req->content_buf->end - req->content_buf->free;
1380 
1381     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1382         b->req = req;
1383         l += b->buf.end - b->buf.free;
1384     }
1385 
1386     if (recv_msg->incoming_buf != NULL) {
1387         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1388 
1389         while (b->next != NULL) {
1390             b = b->next;
1391         }
1392 
1393         /* "Move" incoming buffer list to req_impl. */
1394         b->next = recv_msg->incoming_buf;
1395         b->next->prev = &b->next;
1396 
1397         recv_msg->incoming_buf = NULL;
1398     }
1399 
1400     req->content_fd = recv_msg->fd[0];
1401     recv_msg->fd[0] = -1;
1402 
1403     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1404 
1405     if (lib->callbacks.data_handler != NULL) {
1406         lib->callbacks.data_handler(req);
1407 
1408         return NXT_UNIT_OK;
1409     }
1410 
1411     if (req->content_fd != -1 || l == req->content_length) {
1412         lib->callbacks.request_handler(req);
1413     }
1414 
1415     return NXT_UNIT_OK;
1416 }
1417 
1418 
1419 static int
1420 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1421     nxt_unit_port_id_t *port_id)
1422 {
1423     int                           res;
1424     nxt_unit_ctx_t                *ctx;
1425     nxt_unit_impl_t               *lib;
1426     nxt_unit_port_t               *port;
1427     nxt_unit_process_t            *process;
1428     nxt_unit_ctx_impl_t           *ctx_impl;
1429     nxt_unit_port_impl_t          *port_impl;
1430     nxt_unit_request_info_impl_t  *req_impl;
1431 
1432     ctx = req->ctx;
1433     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1434     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1435 
1436     pthread_mutex_lock(&lib->mutex);
1437 
1438     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1439     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1440 
1441     if (nxt_fast_path(port != NULL)) {
1442         req->response_port = port;
1443 
1444         if (nxt_fast_path(port_impl->ready)) {
1445             pthread_mutex_unlock(&lib->mutex);
1446 
1447             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1448                            (int) port->id.pid, (int) port->id.id);
1449 
1450             return NXT_UNIT_OK;
1451         }
1452 
1453         nxt_unit_debug(ctx, "check_response_port: "
1454                        "port{%d,%d} already requested",
1455                        (int) port->id.pid, (int) port->id.id);
1456 
1457         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1458 
1459         nxt_queue_insert_tail(&port_impl->awaiting_req,
1460                               &req_impl->port_wait_link);
1461 
1462         pthread_mutex_unlock(&lib->mutex);
1463 
1464         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1465 
1466         return NXT_UNIT_AGAIN;
1467     }
1468 
1469     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1470     if (nxt_slow_path(port_impl == NULL)) {
1471         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1472                        (int) sizeof(nxt_unit_port_impl_t));
1473 
1474         pthread_mutex_unlock(&lib->mutex);
1475 
1476         return NXT_UNIT_ERROR;
1477     }
1478 
1479     port = &port_impl->port;
1480 
1481     port->id = *port_id;
1482     port->in_fd = -1;
1483     port->out_fd = -1;
1484     port->data = NULL;
1485 
1486     res = nxt_unit_port_hash_add(&lib->ports, port);
1487     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1488         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1489                        port->id.pid, port->id.id);
1490 
1491         pthread_mutex_unlock(&lib->mutex);
1492 
1493         nxt_unit_free(ctx, port);
1494 
1495         return NXT_UNIT_ERROR;
1496     }
1497 
1498     process = nxt_unit_process_find(lib, port_id->pid, 0);
1499     if (nxt_slow_path(process == NULL)) {
1500         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1501                        port->id.pid);
1502 
1503         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1504 
1505         pthread_mutex_unlock(&lib->mutex);
1506 
1507         nxt_unit_free(ctx, port);
1508 
1509         return NXT_UNIT_ERROR;
1510     }
1511 
1512     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1513 
1514     port_impl->process = process;
1515     port_impl->queue = NULL;
1516     port_impl->from_socket = 0;
1517     port_impl->socket_rbuf = NULL;
1518 
1519     nxt_queue_init(&port_impl->awaiting_req);
1520 
1521     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1522 
1523     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1524 
1525     port_impl->use_count = 2;
1526     port_impl->ready = 0;
1527 
1528     req->response_port = port;
1529 
1530     pthread_mutex_unlock(&lib->mutex);
1531 
1532     res = nxt_unit_get_port(ctx, port_id);
1533     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1534         return NXT_UNIT_ERROR;
1535     }
1536 
1537     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1538 
1539     return NXT_UNIT_AGAIN;
1540 }
1541 
1542 
1543 static int
1544 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1545 {
1546     ssize_t                       res;
1547     nxt_port_msg_t                msg;
1548     nxt_unit_impl_t               *lib;
1549     nxt_unit_ctx_impl_t           *ctx_impl;
1550     nxt_unit_request_info_impl_t  *req_impl;
1551 
1552     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1553     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1554     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555 
1556     memset(&msg, 0, sizeof(nxt_port_msg_t));
1557 
1558     msg.stream = req_impl->stream;
1559     msg.pid = lib->pid;
1560     msg.reply_port = ctx_impl->read_port->id.id;
1561     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1562 
1563     res = nxt_unit_port_send(req->ctx, req->response_port,
1564                              &msg, sizeof(msg), NULL, 0);
1565     if (nxt_slow_path(res != sizeof(msg))) {
1566         return NXT_UNIT_ERROR;
1567     }
1568 
1569     return NXT_UNIT_OK;
1570 }
1571 
1572 
1573 static int
1574 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1575 {
1576     size_t                           hsize;
1577     nxt_unit_impl_t                  *lib;
1578     nxt_unit_mmap_buf_t              *b;
1579     nxt_unit_callbacks_t             *cb;
1580     nxt_unit_request_info_t          *req;
1581     nxt_unit_request_info_impl_t     *req_impl;
1582     nxt_unit_websocket_frame_impl_t  *ws_impl;
1583 
1584     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1585     if (nxt_slow_path(req == NULL)) {
1586         return NXT_UNIT_OK;
1587     }
1588 
1589     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1590 
1591     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1592     cb = &lib->callbacks;
1593 
1594     if (cb->websocket_handler && recv_msg->size >= 2) {
1595         ws_impl = nxt_unit_websocket_frame_get(ctx);
1596         if (nxt_slow_path(ws_impl == NULL)) {
1597             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1598                           req_impl->stream);
1599 
1600             return NXT_UNIT_ERROR;
1601         }
1602 
1603         ws_impl->ws.req = req;
1604 
1605         ws_impl->buf = NULL;
1606 
1607         if (recv_msg->mmap) {
1608             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1609                 b->req = req;
1610             }
1611 
1612             /* "Move" incoming buffer list to ws_impl. */
1613             ws_impl->buf = recv_msg->incoming_buf;
1614             ws_impl->buf->prev = &ws_impl->buf;
1615             recv_msg->incoming_buf = NULL;
1616 
1617             b = ws_impl->buf;
1618 
1619         } else {
1620             b = nxt_unit_mmap_buf_get(ctx);
1621             if (nxt_slow_path(b == NULL)) {
1622                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1623                                req_impl->stream);
1624 
1625                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1626 
1627                 return NXT_UNIT_ERROR;
1628             }
1629 
1630             b->req = req;
1631             b->buf.start = recv_msg->start;
1632             b->buf.free = b->buf.start;
1633             b->buf.end = b->buf.start + recv_msg->size;
1634 
1635             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1636         }
1637 
1638         ws_impl->ws.header = (void *) b->buf.start;
1639         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1640             ws_impl->ws.header);
1641 
1642         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1643 
1644         if (ws_impl->ws.header->mask) {
1645             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1646 
1647         } else {
1648             ws_impl->ws.mask = NULL;
1649         }
1650 
1651         b->buf.free += hsize;
1652 
1653         ws_impl->ws.content_buf = &b->buf;
1654         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1655 
1656         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1657                            "payload_len=%"PRIu64,
1658                             ws_impl->ws.header->opcode,
1659                             ws_impl->ws.payload_len);
1660 
1661         cb->websocket_handler(&ws_impl->ws);
1662     }
1663 
1664     if (recv_msg->last) {
1665         if (cb->close_handler) {
1666             nxt_unit_req_debug(req, "close_handler");
1667 
1668             cb->close_handler(req);
1669 
1670         } else {
1671             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1672         }
1673     }
1674 
1675     return NXT_UNIT_OK;
1676 }
1677 
1678 
1679 static int
1680 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1681 {
1682     nxt_unit_impl_t       *lib;
1683     nxt_unit_callbacks_t  *cb;
1684 
1685     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1686     cb = &lib->callbacks;
1687 
1688     if (cb->shm_ack_handler != NULL) {
1689         cb->shm_ack_handler(ctx);
1690     }
1691 
1692     return NXT_UNIT_OK;
1693 }
1694 
1695 
1696 static nxt_unit_request_info_impl_t *
1697 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1698 {
1699     nxt_unit_impl_t               *lib;
1700     nxt_queue_link_t              *lnk;
1701     nxt_unit_ctx_impl_t           *ctx_impl;
1702     nxt_unit_request_info_impl_t  *req_impl;
1703 
1704     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1705 
1706     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1707 
1708     pthread_mutex_lock(&ctx_impl->mutex);
1709 
1710     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1711         pthread_mutex_unlock(&ctx_impl->mutex);
1712 
1713         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1714                                         + lib->request_data_size);
1715         if (nxt_slow_path(req_impl == NULL)) {
1716             return NULL;
1717         }
1718 
1719         req_impl->req.unit = ctx->unit;
1720         req_impl->req.ctx = ctx;
1721 
1722         pthread_mutex_lock(&ctx_impl->mutex);
1723 
1724     } else {
1725         lnk = nxt_queue_first(&ctx_impl->free_req);
1726         nxt_queue_remove(lnk);
1727 
1728         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1729     }
1730 
1731     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1732 
1733     pthread_mutex_unlock(&ctx_impl->mutex);
1734 
1735     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1736 
1737     return req_impl;
1738 }
1739 
1740 
1741 static void
1742 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1743 {
1744     nxt_unit_ctx_impl_t           *ctx_impl;
1745     nxt_unit_request_info_impl_t  *req_impl;
1746 
1747     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1748     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1749 
1750     req->response = NULL;
1751     req->response_buf = NULL;
1752 
1753     if (req_impl->in_hash) {
1754         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1755     }
1756 
1757     while (req_impl->outgoing_buf != NULL) {
1758         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1759     }
1760 
1761     while (req_impl->incoming_buf != NULL) {
1762         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1763     }
1764 
1765     if (req->content_fd != -1) {
1766         nxt_unit_close(req->content_fd);
1767 
1768         req->content_fd = -1;
1769     }
1770 
1771     if (req->response_port != NULL) {
1772         nxt_unit_port_release(req->response_port);
1773 
1774         req->response_port = NULL;
1775     }
1776 
1777     req_impl->state = NXT_UNIT_RS_RELEASED;
1778 
1779     pthread_mutex_lock(&ctx_impl->mutex);
1780 
1781     nxt_queue_remove(&req_impl->link);
1782 
1783     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1784 
1785     pthread_mutex_unlock(&ctx_impl->mutex);
1786 }
1787 
1788 
1789 static void
1790 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1791 {
1792     nxt_unit_ctx_impl_t  *ctx_impl;
1793 
1794     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1795 
1796     nxt_queue_remove(&req_impl->link);
1797 
1798     if (req_impl != &ctx_impl->req) {
1799         nxt_unit_free(&ctx_impl->ctx, req_impl);
1800     }
1801 }
1802 
1803 
1804 static nxt_unit_websocket_frame_impl_t *
1805 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1806 {
1807     nxt_queue_link_t                 *lnk;
1808     nxt_unit_ctx_impl_t              *ctx_impl;
1809     nxt_unit_websocket_frame_impl_t  *ws_impl;
1810 
1811     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1812 
1813     pthread_mutex_lock(&ctx_impl->mutex);
1814 
1815     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1816         pthread_mutex_unlock(&ctx_impl->mutex);
1817 
1818         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1819         if (nxt_slow_path(ws_impl == NULL)) {
1820             return NULL;
1821         }
1822 
1823     } else {
1824         lnk = nxt_queue_first(&ctx_impl->free_ws);
1825         nxt_queue_remove(lnk);
1826 
1827         pthread_mutex_unlock(&ctx_impl->mutex);
1828 
1829         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1830     }
1831 
1832     ws_impl->ctx_impl = ctx_impl;
1833 
1834     return ws_impl;
1835 }
1836 
1837 
1838 static void
1839 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1840 {
1841     nxt_unit_websocket_frame_impl_t  *ws_impl;
1842 
1843     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1844 
1845     while (ws_impl->buf != NULL) {
1846         nxt_unit_mmap_buf_free(ws_impl->buf);
1847     }
1848 
1849     ws->req = NULL;
1850 
1851     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1852 
1853     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1854 
1855     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1856 }
1857 
1858 
1859 static void
1860 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1861     nxt_unit_websocket_frame_impl_t *ws_impl)
1862 {
1863     nxt_queue_remove(&ws_impl->link);
1864 
1865     nxt_unit_free(ctx, ws_impl);
1866 }
1867 
1868 
1869 uint16_t
1870 nxt_unit_field_hash(const char *name, size_t name_length)
1871 {
1872     u_char      ch;
1873     uint32_t    hash;
1874     const char  *p, *end;
1875 
1876     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1877     end = name + name_length;
1878 
1879     for (p = name; p < end; p++) {
1880         ch = *p;
1881         hash = (hash << 4) + hash + nxt_lowcase(ch);
1882     }
1883 
1884     hash = (hash >> 16) ^ hash;
1885 
1886     return hash;
1887 }
1888 
1889 
1890 void
1891 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1892 {
1893     char                *name;
1894     uint32_t            i, j;
1895     nxt_unit_field_t    *fields, f;
1896     nxt_unit_request_t  *r;
1897 
1898     static nxt_str_t  content_length = nxt_string("content-length");
1899     static nxt_str_t  content_type = nxt_string("content-type");
1900     static nxt_str_t  cookie = nxt_string("cookie");
1901 
1902     nxt_unit_req_debug(req, "group_dup_fields");
1903 
1904     r = req->request;
1905     fields = r->fields;
1906 
1907     for (i = 0; i < r->fields_count; i++) {
1908         name = nxt_unit_sptr_get(&fields[i].name);
1909 
1910         switch (fields[i].hash) {
1911         case NXT_UNIT_HASH_CONTENT_LENGTH:
1912             if (fields[i].name_length == content_length.length
1913                 && nxt_unit_memcasecmp(name, content_length.start,
1914                                        content_length.length) == 0)
1915             {
1916                 r->content_length_field = i;
1917             }
1918 
1919             break;
1920 
1921         case NXT_UNIT_HASH_CONTENT_TYPE:
1922             if (fields[i].name_length == content_type.length
1923                 && nxt_unit_memcasecmp(name, content_type.start,
1924                                        content_type.length) == 0)
1925             {
1926                 r->content_type_field = i;
1927             }
1928 
1929             break;
1930 
1931         case NXT_UNIT_HASH_COOKIE:
1932             if (fields[i].name_length == cookie.length
1933                 && nxt_unit_memcasecmp(name, cookie.start,
1934                                        cookie.length) == 0)
1935             {
1936                 r->cookie_field = i;
1937             }
1938 
1939             break;
1940         }
1941 
1942         for (j = i + 1; j < r->fields_count; j++) {
1943             if (fields[i].hash != fields[j].hash
1944                 || fields[i].name_length != fields[j].name_length
1945                 || nxt_unit_memcasecmp(name,
1946                                        nxt_unit_sptr_get(&fields[j].name),
1947                                        fields[j].name_length) != 0)
1948             {
1949                 continue;
1950             }
1951 
1952             f = fields[j];
1953             f.value.offset += (j - (i + 1)) * sizeof(f);
1954 
1955             while (j > i + 1) {
1956                 fields[j] = fields[j - 1];
1957                 fields[j].name.offset -= sizeof(f);
1958                 fields[j].value.offset -= sizeof(f);
1959                 j--;
1960             }
1961 
1962             fields[j] = f;
1963 
1964             /* Assign the same name pointer for further grouping simplicity. */
1965             nxt_unit_sptr_set(&fields[j].name, name);
1966 
1967             i++;
1968         }
1969     }
1970 }
1971 
1972 
1973 int
1974 nxt_unit_response_init(nxt_unit_request_info_t *req,
1975     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1976 {
1977     uint32_t                      buf_size;
1978     nxt_unit_buf_t                *buf;
1979     nxt_unit_request_info_impl_t  *req_impl;
1980 
1981     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1982 
1983     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1984         nxt_unit_req_warn(req, "init: response already sent");
1985 
1986         return NXT_UNIT_ERROR;
1987     }
1988 
1989     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1990                        (int) max_fields_count, (int) max_fields_size);
1991 
1992     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1993         nxt_unit_req_debug(req, "duplicate response init");
1994     }
1995 
1996     /*
1997      * Each field name and value 0-terminated by libunit,
1998      * this is the reason of '+ 2' below.
1999      */
2000     buf_size = sizeof(nxt_unit_response_t)
2001                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2002                + max_fields_size;
2003 
2004     if (nxt_slow_path(req->response_buf != NULL)) {
2005         buf = req->response_buf;
2006 
2007         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2008             goto init_response;
2009         }
2010 
2011         nxt_unit_buf_free(buf);
2012 
2013         req->response_buf = NULL;
2014         req->response = NULL;
2015         req->response_max_fields = 0;
2016 
2017         req_impl->state = NXT_UNIT_RS_START;
2018     }
2019 
2020     buf = nxt_unit_response_buf_alloc(req, buf_size);
2021     if (nxt_slow_path(buf == NULL)) {
2022         return NXT_UNIT_ERROR;
2023     }
2024 
2025 init_response:
2026 
2027     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2028 
2029     req->response_buf = buf;
2030 
2031     req->response = (nxt_unit_response_t *) buf->start;
2032     req->response->status = status;
2033 
2034     buf->free = buf->start + sizeof(nxt_unit_response_t)
2035                 + max_fields_count * sizeof(nxt_unit_field_t);
2036 
2037     req->response_max_fields = max_fields_count;
2038     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2039 
2040     return NXT_UNIT_OK;
2041 }
2042 
2043 
2044 int
2045 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2046     uint32_t max_fields_count, uint32_t max_fields_size)
2047 {
2048     char                          *p;
2049     uint32_t                      i, buf_size;
2050     nxt_unit_buf_t                *buf;
2051     nxt_unit_field_t              *f, *src;
2052     nxt_unit_response_t           *resp;
2053     nxt_unit_request_info_impl_t  *req_impl;
2054 
2055     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2056 
2057     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2058         nxt_unit_req_warn(req, "realloc: response not init");
2059 
2060         return NXT_UNIT_ERROR;
2061     }
2062 
2063     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2064         nxt_unit_req_warn(req, "realloc: response already sent");
2065 
2066         return NXT_UNIT_ERROR;
2067     }
2068 
2069     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2070         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2071 
2072         return NXT_UNIT_ERROR;
2073     }
2074 
2075     /*
2076      * Each field name and value 0-terminated by libunit,
2077      * this is the reason of '+ 2' below.
2078      */
2079     buf_size = sizeof(nxt_unit_response_t)
2080                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2081                + max_fields_size;
2082 
2083     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2084 
2085     buf = nxt_unit_response_buf_alloc(req, buf_size);
2086     if (nxt_slow_path(buf == NULL)) {
2087         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2088         return NXT_UNIT_ERROR;
2089     }
2090 
2091     resp = (nxt_unit_response_t *) buf->start;
2092 
2093     memset(resp, 0, sizeof(nxt_unit_response_t));
2094 
2095     resp->status = req->response->status;
2096     resp->content_length = req->response->content_length;
2097 
2098     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2099     f = resp->fields;
2100 
2101     for (i = 0; i < req->response->fields_count; i++) {
2102         src = req->response->fields + i;
2103 
2104         if (nxt_slow_path(src->skip != 0)) {
2105             continue;
2106         }
2107 
2108         if (nxt_slow_path(src->name_length + src->value_length + 2
2109                           > (uint32_t) (buf->end - p)))
2110         {
2111             nxt_unit_req_warn(req, "realloc: not enough space for field"
2112                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2113                   i, src, src->name_length, src->value_length);
2114 
2115             goto fail;
2116         }
2117 
2118         nxt_unit_sptr_set(&f->name, p);
2119         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2120         *p++ = '\0';
2121 
2122         nxt_unit_sptr_set(&f->value, p);
2123         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2124         *p++ = '\0';
2125 
2126         f->hash = src->hash;
2127         f->skip = 0;
2128         f->name_length = src->name_length;
2129         f->value_length = src->value_length;
2130 
2131         resp->fields_count++;
2132         f++;
2133     }
2134 
2135     if (req->response->piggyback_content_length > 0) {
2136         if (nxt_slow_path(req->response->piggyback_content_length
2137                           > (uint32_t) (buf->end - p)))
2138         {
2139             nxt_unit_req_warn(req, "realloc: not enought space for content"
2140                   " #%"PRIu32", %"PRIu32" required",
2141                   i, req->response->piggyback_content_length);
2142 
2143             goto fail;
2144         }
2145 
2146         resp->piggyback_content_length =
2147                                        req->response->piggyback_content_length;
2148 
2149         nxt_unit_sptr_set(&resp->piggyback_content, p);
2150         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2151                        req->response->piggyback_content_length);
2152     }
2153 
2154     buf->free = p;
2155 
2156     nxt_unit_buf_free(req->response_buf);
2157 
2158     req->response = resp;
2159     req->response_buf = buf;
2160     req->response_max_fields = max_fields_count;
2161 
2162     return NXT_UNIT_OK;
2163 
2164 fail:
2165 
2166     nxt_unit_buf_free(buf);
2167 
2168     return NXT_UNIT_ERROR;
2169 }
2170 
2171 
2172 int
2173 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2174 {
2175     nxt_unit_request_info_impl_t  *req_impl;
2176 
2177     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2178 
2179     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2180 }
2181 
2182 
2183 int
2184 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2185     const char *name, uint8_t name_length,
2186     const char *value, uint32_t value_length)
2187 {
2188     nxt_unit_buf_t                *buf;
2189     nxt_unit_field_t              *f;
2190     nxt_unit_response_t           *resp;
2191     nxt_unit_request_info_impl_t  *req_impl;
2192 
2193     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2194 
2195     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2196         nxt_unit_req_warn(req, "add_field: response not initialized or "
2197                           "already sent");
2198 
2199         return NXT_UNIT_ERROR;
2200     }
2201 
2202     resp = req->response;
2203 
2204     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2205         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2206                           (int) resp->fields_count);
2207 
2208         return NXT_UNIT_ERROR;
2209     }
2210 
2211     buf = req->response_buf;
2212 
2213     if (nxt_slow_path(name_length + value_length + 2
2214                       > (uint32_t) (buf->end - buf->free)))
2215     {
2216         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2217 
2218         return NXT_UNIT_ERROR;
2219     }
2220 
2221     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2222                        resp->fields_count,
2223                        (int) name_length, name,
2224                        (int) value_length, value);
2225 
2226     f = resp->fields + resp->fields_count;
2227 
2228     nxt_unit_sptr_set(&f->name, buf->free);
2229     buf->free = nxt_cpymem(buf->free, name, name_length);
2230     *buf->free++ = '\0';
2231 
2232     nxt_unit_sptr_set(&f->value, buf->free);
2233     buf->free = nxt_cpymem(buf->free, value, value_length);
2234     *buf->free++ = '\0';
2235 
2236     f->hash = nxt_unit_field_hash(name, name_length);
2237     f->skip = 0;
2238     f->name_length = name_length;
2239     f->value_length = value_length;
2240 
2241     resp->fields_count++;
2242 
2243     return NXT_UNIT_OK;
2244 }
2245 
2246 
2247 int
2248 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2249     const void* src, uint32_t size)
2250 {
2251     nxt_unit_buf_t                *buf;
2252     nxt_unit_response_t           *resp;
2253     nxt_unit_request_info_impl_t  *req_impl;
2254 
2255     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2256 
2257     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2258         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2259 
2260         return NXT_UNIT_ERROR;
2261     }
2262 
2263     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2264         nxt_unit_req_warn(req, "add_content: response already sent");
2265 
2266         return NXT_UNIT_ERROR;
2267     }
2268 
2269     buf = req->response_buf;
2270 
2271     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2272         nxt_unit_req_warn(req, "add_content: buffer overflow");
2273 
2274         return NXT_UNIT_ERROR;
2275     }
2276 
2277     resp = req->response;
2278 
2279     if (resp->piggyback_content_length == 0) {
2280         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2281         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2282     }
2283 
2284     resp->piggyback_content_length += size;
2285 
2286     buf->free = nxt_cpymem(buf->free, src, size);
2287 
2288     return NXT_UNIT_OK;
2289 }
2290 
2291 
2292 int
2293 nxt_unit_response_send(nxt_unit_request_info_t *req)
2294 {
2295     int                           rc;
2296     nxt_unit_mmap_buf_t           *mmap_buf;
2297     nxt_unit_request_info_impl_t  *req_impl;
2298 
2299     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2300 
2301     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2302         nxt_unit_req_warn(req, "send: response is not initialized yet");
2303 
2304         return NXT_UNIT_ERROR;
2305     }
2306 
2307     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2308         nxt_unit_req_warn(req, "send: response already sent");
2309 
2310         return NXT_UNIT_ERROR;
2311     }
2312 
2313     if (req->request->websocket_handshake && req->response->status == 101) {
2314         nxt_unit_response_upgrade(req);
2315     }
2316 
2317     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2318                        req->response->fields_count,
2319                        (int) (req->response_buf->free
2320                               - req->response_buf->start));
2321 
2322     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2323 
2324     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2325     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2326         req->response = NULL;
2327         req->response_buf = NULL;
2328         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2329 
2330         nxt_unit_mmap_buf_free(mmap_buf);
2331     }
2332 
2333     return rc;
2334 }
2335 
2336 
2337 int
2338 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2339 {
2340     nxt_unit_request_info_impl_t  *req_impl;
2341 
2342     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2343 
2344     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2345 }
2346 
2347 
2348 nxt_unit_buf_t *
2349 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2350 {
2351     int                           rc;
2352     nxt_unit_mmap_buf_t           *mmap_buf;
2353     nxt_unit_request_info_impl_t  *req_impl;
2354 
2355     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2356         nxt_unit_req_warn(req, "response_buf_alloc: "
2357                           "requested buffer (%"PRIu32") too big", size);
2358 
2359         return NULL;
2360     }
2361 
2362     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2363 
2364     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2365 
2366     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2367     if (nxt_slow_path(mmap_buf == NULL)) {
2368         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2369 
2370         return NULL;
2371     }
2372 
2373     mmap_buf->req = req;
2374 
2375     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2376 
2377     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2378                                    size, size, mmap_buf,
2379                                    NULL);
2380     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2381         nxt_unit_mmap_buf_release(mmap_buf);
2382 
2383         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2384 
2385         return NULL;
2386     }
2387 
2388     return &mmap_buf->buf;
2389 }
2390 
2391 
2392 static nxt_unit_mmap_buf_t *
2393 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2394 {
2395     nxt_unit_mmap_buf_t  *mmap_buf;
2396     nxt_unit_ctx_impl_t  *ctx_impl;
2397 
2398     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2399 
2400     pthread_mutex_lock(&ctx_impl->mutex);
2401 
2402     if (ctx_impl->free_buf == NULL) {
2403         pthread_mutex_unlock(&ctx_impl->mutex);
2404 
2405         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2406         if (nxt_slow_path(mmap_buf == NULL)) {
2407             return NULL;
2408         }
2409 
2410     } else {
2411         mmap_buf = ctx_impl->free_buf;
2412 
2413         nxt_unit_mmap_buf_unlink(mmap_buf);
2414 
2415         pthread_mutex_unlock(&ctx_impl->mutex);
2416     }
2417 
2418     mmap_buf->ctx_impl = ctx_impl;
2419 
2420     mmap_buf->hdr = NULL;
2421     mmap_buf->free_ptr = NULL;
2422 
2423     return mmap_buf;
2424 }
2425 
2426 
2427 static void
2428 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2429 {
2430     nxt_unit_mmap_buf_unlink(mmap_buf);
2431 
2432     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2433 
2434     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2435 
2436     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2437 }
2438 
2439 
2440 int
2441 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2442 {
2443     return req->request->websocket_handshake;
2444 }
2445 
2446 
2447 int
2448 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2449 {
2450     int                           rc;
2451     nxt_unit_request_info_impl_t  *req_impl;
2452 
2453     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2454 
2455     if (nxt_slow_path(req_impl->websocket != 0)) {
2456         nxt_unit_req_debug(req, "upgrade: already upgraded");
2457 
2458         return NXT_UNIT_OK;
2459     }
2460 
2461     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2462         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2463 
2464         return NXT_UNIT_ERROR;
2465     }
2466 
2467     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2468         nxt_unit_req_warn(req, "upgrade: response already sent");
2469 
2470         return NXT_UNIT_ERROR;
2471     }
2472 
2473     rc = nxt_unit_request_hash_add(req->ctx, req);
2474     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2475         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2476 
2477         return NXT_UNIT_ERROR;
2478     }
2479 
2480     req_impl->websocket = 1;
2481 
2482     req->response->status = 101;
2483 
2484     return NXT_UNIT_OK;
2485 }
2486 
2487 
2488 int
2489 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2490 {
2491     nxt_unit_request_info_impl_t  *req_impl;
2492 
2493     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2494 
2495     return req_impl->websocket;
2496 }
2497 
2498 
2499 nxt_unit_request_info_t *
2500 nxt_unit_get_request_info_from_data(void *data)
2501 {
2502     nxt_unit_request_info_impl_t  *req_impl;
2503 
2504     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2505 
2506     return &req_impl->req;
2507 }
2508 
2509 
2510 int
2511 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2512 {
2513     int                           rc;
2514     nxt_unit_mmap_buf_t           *mmap_buf;
2515     nxt_unit_request_info_t       *req;
2516     nxt_unit_request_info_impl_t  *req_impl;
2517 
2518     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2519 
2520     req = mmap_buf->req;
2521     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2522 
2523     nxt_unit_req_debug(req, "buf_send: %d bytes",
2524                        (int) (buf->free - buf->start));
2525 
2526     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2527         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2528 
2529         return NXT_UNIT_ERROR;
2530     }
2531 
2532     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2533         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2534 
2535         return NXT_UNIT_ERROR;
2536     }
2537 
2538     if (nxt_fast_path(buf->free > buf->start)) {
2539         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2540         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2541             return rc;
2542         }
2543     }
2544 
2545     nxt_unit_mmap_buf_free(mmap_buf);
2546 
2547     return NXT_UNIT_OK;
2548 }
2549 
2550 
2551 static void
2552 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2553 {
2554     int                      rc;
2555     nxt_unit_mmap_buf_t      *mmap_buf;
2556     nxt_unit_request_info_t  *req;
2557 
2558     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2559 
2560     req = mmap_buf->req;
2561 
2562     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2563     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2564         nxt_unit_mmap_buf_free(mmap_buf);
2565 
2566         nxt_unit_request_info_release(req);
2567 
2568     } else {
2569         nxt_unit_request_done(req, rc);
2570     }
2571 }
2572 
2573 
2574 static int
2575 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2576     nxt_unit_mmap_buf_t *mmap_buf, int last)
2577 {
2578     struct {
2579         nxt_port_msg_t       msg;
2580         nxt_port_mmap_msg_t  mmap_msg;
2581     } m;
2582 
2583     int                           rc;
2584     u_char                        *last_used, *first_free;
2585     ssize_t                       res;
2586     nxt_chunk_id_t                first_free_chunk;
2587     nxt_unit_buf_t                *buf;
2588     nxt_unit_impl_t               *lib;
2589     nxt_port_mmap_header_t        *hdr;
2590     nxt_unit_request_info_impl_t  *req_impl;
2591 
2592     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2593     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2594 
2595     buf = &mmap_buf->buf;
2596     hdr = mmap_buf->hdr;
2597 
2598     m.mmap_msg.size = buf->free - buf->start;
2599 
2600     m.msg.stream = req_impl->stream;
2601     m.msg.pid = lib->pid;
2602     m.msg.reply_port = 0;
2603     m.msg.type = _NXT_PORT_MSG_DATA;
2604     m.msg.last = last != 0;
2605     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2606     m.msg.nf = 0;
2607     m.msg.mf = 0;
2608     m.msg.tracking = 0;
2609 
2610     rc = NXT_UNIT_ERROR;
2611 
2612     if (m.msg.mmap) {
2613         m.mmap_msg.mmap_id = hdr->id;
2614         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2615                                                      (u_char *) buf->start);
2616 
2617         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2618                        req_impl->stream,
2619                        (int) m.mmap_msg.mmap_id,
2620                        (int) m.mmap_msg.chunk_id,
2621                        (int) m.mmap_msg.size);
2622 
2623         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2624                                  NULL, 0);
2625         if (nxt_slow_path(res != sizeof(m))) {
2626             goto free_buf;
2627         }
2628 
2629         last_used = (u_char *) buf->free - 1;
2630         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2631 
2632         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2633             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2634 
2635             buf->start = (char *) first_free;
2636             buf->free = buf->start;
2637 
2638             if (buf->end < buf->start) {
2639                 buf->end = buf->start;
2640             }
2641 
2642         } else {
2643             buf->start = NULL;
2644             buf->free = NULL;
2645             buf->end = NULL;
2646 
2647             mmap_buf->hdr = NULL;
2648         }
2649 
2650         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2651                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2652 
2653         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2654                        (int) lib->outgoing.allocated_chunks);
2655 
2656     } else {
2657         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2658                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2659         {
2660             nxt_unit_alert(req->ctx,
2661                            "#%"PRIu32": failed to send plain memory buffer"
2662                            ": no space reserved for message header",
2663                            req_impl->stream);
2664 
2665             goto free_buf;
2666         }
2667 
2668         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2669 
2670         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2671                        req_impl->stream,
2672                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2673 
2674         res = nxt_unit_port_send(req->ctx, req->response_port,
2675                                  buf->start - sizeof(m.msg),
2676                                  m.mmap_msg.size + sizeof(m.msg),
2677                                  NULL, 0);
2678         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2679             goto free_buf;
2680         }
2681     }
2682 
2683     rc = NXT_UNIT_OK;
2684 
2685 free_buf:
2686 
2687     nxt_unit_free_outgoing_buf(mmap_buf);
2688 
2689     return rc;
2690 }
2691 
2692 
2693 void
2694 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2695 {
2696     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2697 }
2698 
2699 
2700 static void
2701 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2702 {
2703     nxt_unit_free_outgoing_buf(mmap_buf);
2704 
2705     nxt_unit_mmap_buf_release(mmap_buf);
2706 }
2707 
2708 
2709 static void
2710 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2711 {
2712     if (mmap_buf->hdr != NULL) {
2713         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2714                               mmap_buf->hdr, mmap_buf->buf.start,
2715                               mmap_buf->buf.end - mmap_buf->buf.start);
2716 
2717         mmap_buf->hdr = NULL;
2718 
2719         return;
2720     }
2721 
2722     if (mmap_buf->free_ptr != NULL) {
2723         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2724 
2725         mmap_buf->free_ptr = NULL;
2726     }
2727 }
2728 
2729 
2730 static nxt_unit_read_buf_t *
2731 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2732 {
2733     nxt_unit_ctx_impl_t  *ctx_impl;
2734     nxt_unit_read_buf_t  *rbuf;
2735 
2736     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2737 
2738     pthread_mutex_lock(&ctx_impl->mutex);
2739 
2740     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2741 
2742     pthread_mutex_unlock(&ctx_impl->mutex);
2743 
2744     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2745 
2746     return rbuf;
2747 }
2748 
2749 
2750 static nxt_unit_read_buf_t *
2751 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2752 {
2753     nxt_queue_link_t     *link;
2754     nxt_unit_read_buf_t  *rbuf;
2755 
2756     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2757         link = nxt_queue_first(&ctx_impl->free_rbuf);
2758         nxt_queue_remove(link);
2759 
2760         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2761 
2762         return rbuf;
2763     }
2764 
2765     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2766 
2767     if (nxt_fast_path(rbuf != NULL)) {
2768         rbuf->ctx_impl = ctx_impl;
2769     }
2770 
2771     return rbuf;
2772 }
2773 
2774 
2775 static void
2776 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2777     nxt_unit_read_buf_t *rbuf)
2778 {
2779     nxt_unit_ctx_impl_t  *ctx_impl;
2780 
2781     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2782 
2783     pthread_mutex_lock(&ctx_impl->mutex);
2784 
2785     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2786 
2787     pthread_mutex_unlock(&ctx_impl->mutex);
2788 }
2789 
2790 
2791 nxt_unit_buf_t *
2792 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2793 {
2794     nxt_unit_mmap_buf_t  *mmap_buf;
2795 
2796     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2797 
2798     if (mmap_buf->next == NULL) {
2799         return NULL;
2800     }
2801 
2802     return &mmap_buf->next->buf;
2803 }
2804 
2805 
2806 uint32_t
2807 nxt_unit_buf_max(void)
2808 {
2809     return PORT_MMAP_DATA_SIZE;
2810 }
2811 
2812 
2813 uint32_t
2814 nxt_unit_buf_min(void)
2815 {
2816     return PORT_MMAP_CHUNK_SIZE;
2817 }
2818 
2819 
2820 int
2821 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2822     size_t size)
2823 {
2824     ssize_t  res;
2825 
2826     res = nxt_unit_response_write_nb(req, start, size, size);
2827 
2828     return res < 0 ? -res : NXT_UNIT_OK;
2829 }
2830 
2831 
2832 ssize_t
2833 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2834     size_t size, size_t min_size)
2835 {
2836     int                           rc;
2837     ssize_t                       sent;
2838     uint32_t                      part_size, min_part_size, buf_size;
2839     const char                    *part_start;
2840     nxt_unit_mmap_buf_t           mmap_buf;
2841     nxt_unit_request_info_impl_t  *req_impl;
2842     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2843 
2844     nxt_unit_req_debug(req, "write: %d", (int) size);
2845 
2846     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2847 
2848     part_start = start;
2849     sent = 0;
2850 
2851     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2852         nxt_unit_req_alert(req, "write: response not initialized yet");
2853 
2854         return -NXT_UNIT_ERROR;
2855     }
2856 
2857     /* Check if response is not send yet. */
2858     if (nxt_slow_path(req->response_buf != NULL)) {
2859         part_size = req->response_buf->end - req->response_buf->free;
2860         part_size = nxt_min(size, part_size);
2861 
2862         rc = nxt_unit_response_add_content(req, part_start, part_size);
2863         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2864             return -rc;
2865         }
2866 
2867         rc = nxt_unit_response_send(req);
2868         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2869             return -rc;
2870         }
2871 
2872         size -= part_size;
2873         part_start += part_size;
2874         sent += part_size;
2875 
2876         min_size -= nxt_min(min_size, part_size);
2877     }
2878 
2879     while (size > 0) {
2880         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2881         min_part_size = nxt_min(min_size, part_size);
2882         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2883 
2884         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2885                                        min_part_size, &mmap_buf, local_buf);
2886         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2887             return -rc;
2888         }
2889 
2890         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2891         if (nxt_slow_path(buf_size == 0)) {
2892             return sent;
2893         }
2894         part_size = nxt_min(buf_size, part_size);
2895 
2896         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2897                                        part_start, part_size);
2898 
2899         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2900         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2901             return -rc;
2902         }
2903 
2904         size -= part_size;
2905         part_start += part_size;
2906         sent += part_size;
2907 
2908         min_size -= nxt_min(min_size, part_size);
2909     }
2910 
2911     return sent;
2912 }
2913 
2914 
2915 int
2916 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2917     nxt_unit_read_info_t *read_info)
2918 {
2919     int                           rc;
2920     ssize_t                       n;
2921     uint32_t                      buf_size;
2922     nxt_unit_buf_t                *buf;
2923     nxt_unit_mmap_buf_t           mmap_buf;
2924     nxt_unit_request_info_impl_t  *req_impl;
2925     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2926 
2927     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2928 
2929     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2930         nxt_unit_req_alert(req, "write: response not initialized yet");
2931 
2932         return NXT_UNIT_ERROR;
2933     }
2934 
2935     /* Check if response is not send yet. */
2936     if (nxt_slow_path(req->response_buf != NULL)) {
2937 
2938         /* Enable content in headers buf. */
2939         rc = nxt_unit_response_add_content(req, "", 0);
2940         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2941             nxt_unit_req_error(req, "Failed to add piggyback content");
2942 
2943             return rc;
2944         }
2945 
2946         buf = req->response_buf;
2947 
2948         while (buf->end - buf->free > 0) {
2949             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2950             if (nxt_slow_path(n < 0)) {
2951                 nxt_unit_req_error(req, "Read error");
2952 
2953                 return NXT_UNIT_ERROR;
2954             }
2955 
2956             /* Manually increase sizes. */
2957             buf->free += n;
2958             req->response->piggyback_content_length += n;
2959 
2960             if (read_info->eof) {
2961                 break;
2962             }
2963         }
2964 
2965         rc = nxt_unit_response_send(req);
2966         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2967             nxt_unit_req_error(req, "Failed to send headers with content");
2968 
2969             return rc;
2970         }
2971 
2972         if (read_info->eof) {
2973             return NXT_UNIT_OK;
2974         }
2975     }
2976 
2977     while (!read_info->eof) {
2978         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2979                            read_info->buf_size);
2980 
2981         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2982 
2983         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2984                                        buf_size, buf_size,
2985                                        &mmap_buf, local_buf);
2986         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2987             return rc;
2988         }
2989 
2990         buf = &mmap_buf.buf;
2991 
2992         while (!read_info->eof && buf->end > buf->free) {
2993             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2994             if (nxt_slow_path(n < 0)) {
2995                 nxt_unit_req_error(req, "Read error");
2996 
2997                 nxt_unit_free_outgoing_buf(&mmap_buf);
2998 
2999                 return NXT_UNIT_ERROR;
3000             }
3001 
3002             buf->free += n;
3003         }
3004 
3005         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3006         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3007             nxt_unit_req_error(req, "Failed to send content");
3008 
3009             return rc;
3010         }
3011     }
3012 
3013     return NXT_UNIT_OK;
3014 }
3015 
3016 
3017 ssize_t
3018 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3019 {
3020     ssize_t  buf_res, res;
3021 
3022     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3023                                 dst, size);
3024 
3025     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3026         res = read(req->content_fd, dst, size);
3027         if (nxt_slow_path(res < 0)) {
3028             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3029                                strerror(errno), errno);
3030 
3031             return res;
3032         }
3033 
3034         if (res < (ssize_t) size) {
3035             nxt_unit_close(req->content_fd);
3036 
3037             req->content_fd = -1;
3038         }
3039 
3040         req->content_length -= res;
3041         size -= res;
3042 
3043         dst = nxt_pointer_to(dst, res);
3044 
3045     } else {
3046         res = 0;
3047     }
3048 
3049     return buf_res + res;
3050 }
3051 
3052 
3053 ssize_t
3054 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3055 {
3056     char                 *p;
3057     size_t               l_size, b_size;
3058     nxt_unit_buf_t       *b;
3059     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3060 
3061     if (req->content_length == 0) {
3062         return 0;
3063     }
3064 
3065     l_size = 0;
3066 
3067     b = req->content_buf;
3068 
3069     while (b != NULL) {
3070         b_size = b->end - b->free;
3071         p = memchr(b->free, '\n', b_size);
3072 
3073         if (p != NULL) {
3074             p++;
3075             l_size += p - b->free;
3076             break;
3077         }
3078 
3079         l_size += b_size;
3080 
3081         if (max_size <= l_size) {
3082             break;
3083         }
3084 
3085         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3086         if (mmap_buf->next == NULL
3087             && req->content_fd != -1
3088             && l_size < req->content_length)
3089         {
3090             preread_buf = nxt_unit_request_preread(req, 16384);
3091             if (nxt_slow_path(preread_buf == NULL)) {
3092                 return -1;
3093             }
3094 
3095             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3096         }
3097 
3098         b = nxt_unit_buf_next(b);
3099     }
3100 
3101     return nxt_min(max_size, l_size);
3102 }
3103 
3104 
3105 static nxt_unit_mmap_buf_t *
3106 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3107 {
3108     ssize_t              res;
3109     nxt_unit_mmap_buf_t  *mmap_buf;
3110 
3111     if (req->content_fd == -1) {
3112         nxt_unit_req_alert(req, "preread: content_fd == -1");
3113         return NULL;
3114     }
3115 
3116     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3117     if (nxt_slow_path(mmap_buf == NULL)) {
3118         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3119         return NULL;
3120     }
3121 
3122     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3123     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3124         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3125         nxt_unit_mmap_buf_release(mmap_buf);
3126         return NULL;
3127     }
3128 
3129     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3130 
3131     mmap_buf->hdr = NULL;
3132     mmap_buf->buf.start = mmap_buf->free_ptr;
3133     mmap_buf->buf.free = mmap_buf->buf.start;
3134     mmap_buf->buf.end = mmap_buf->buf.start + size;
3135 
3136     res = read(req->content_fd, mmap_buf->free_ptr, size);
3137     if (res < 0) {
3138         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3139                            strerror(errno), errno);
3140 
3141         nxt_unit_mmap_buf_free(mmap_buf);
3142 
3143         return NULL;
3144     }
3145 
3146     if (res < (ssize_t) size) {
3147         nxt_unit_close(req->content_fd);
3148 
3149         req->content_fd = -1;
3150     }
3151 
3152     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3153 
3154     mmap_buf->buf.end = mmap_buf->buf.free + res;
3155 
3156     return mmap_buf;
3157 }
3158 
3159 
3160 static ssize_t
3161 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3162 {
3163     u_char          *p;
3164     size_t          rest, copy, read;
3165     nxt_unit_buf_t  *buf, *last_buf;
3166 
3167     p = dst;
3168     rest = size;
3169 
3170     buf = *b;
3171     last_buf = buf;
3172 
3173     while (buf != NULL) {
3174         last_buf = buf;
3175 
3176         copy = buf->end - buf->free;
3177         copy = nxt_min(rest, copy);
3178 
3179         p = nxt_cpymem(p, buf->free, copy);
3180 
3181         buf->free += copy;
3182         rest -= copy;
3183 
3184         if (rest == 0) {
3185             if (buf->end == buf->free) {
3186                 buf = nxt_unit_buf_next(buf);
3187             }
3188 
3189             break;
3190         }
3191 
3192         buf = nxt_unit_buf_next(buf);
3193     }
3194 
3195     *b = last_buf;
3196 
3197     read = size - rest;
3198 
3199     *len -= read;
3200 
3201     return read;
3202 }
3203 
3204 
3205 void
3206 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3207 {
3208     uint32_t                      size;
3209     nxt_port_msg_t                msg;
3210     nxt_unit_impl_t               *lib;
3211     nxt_unit_request_info_impl_t  *req_impl;
3212 
3213     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3214 
3215     nxt_unit_req_debug(req, "done: %d", rc);
3216 
3217     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3218         goto skip_response_send;
3219     }
3220 
3221     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3222 
3223         size = nxt_length("Content-Type") + nxt_length("text/plain");
3224 
3225         rc = nxt_unit_response_init(req, 200, 1, size);
3226         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3227             goto skip_response_send;
3228         }
3229 
3230         rc = nxt_unit_response_add_field(req, "Content-Type",
3231                                    nxt_length("Content-Type"),
3232                                    "text/plain", nxt_length("text/plain"));
3233         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3234             goto skip_response_send;
3235         }
3236     }
3237 
3238     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3239 
3240         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3241 
3242         nxt_unit_buf_send_done(req->response_buf);
3243 
3244         return;
3245     }
3246 
3247 skip_response_send:
3248 
3249     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3250 
3251     msg.stream = req_impl->stream;
3252     msg.pid = lib->pid;
3253     msg.reply_port = 0;
3254     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3255                                    : _NXT_PORT_MSG_RPC_ERROR;
3256     msg.last = 1;
3257     msg.mmap = 0;
3258     msg.nf = 0;
3259     msg.mf = 0;
3260     msg.tracking = 0;
3261 
3262     (void) nxt_unit_port_send(req->ctx, req->response_port,
3263                               &msg, sizeof(msg), NULL, 0);
3264 
3265     nxt_unit_request_info_release(req);
3266 }
3267 
3268 
3269 int
3270 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3271     uint8_t last, const void *start, size_t size)
3272 {
3273     const struct iovec  iov = { (void *) start, size };
3274 
3275     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3276 }
3277 
3278 
3279 int
3280 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3281     uint8_t last, const struct iovec *iov, int iovcnt)
3282 {
3283     int                     i, rc;
3284     size_t                  l, copy;
3285     uint32_t                payload_len, buf_size, alloc_size;
3286     const uint8_t           *b;
3287     nxt_unit_buf_t          *buf;
3288     nxt_unit_mmap_buf_t     mmap_buf;
3289     nxt_websocket_header_t  *wh;
3290     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3291 
3292     payload_len = 0;
3293 
3294     for (i = 0; i < iovcnt; i++) {
3295         payload_len += iov[i].iov_len;
3296     }
3297 
3298     buf_size = 10 + payload_len;
3299     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3300 
3301     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3302                                    alloc_size, alloc_size,
3303                                    &mmap_buf, local_buf);
3304     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3305         return rc;
3306     }
3307 
3308     buf = &mmap_buf.buf;
3309 
3310     buf->start[0] = 0;
3311     buf->start[1] = 0;
3312 
3313     buf_size -= buf->end - buf->start;
3314 
3315     wh = (void *) buf->free;
3316 
3317     buf->free = nxt_websocket_frame_init(wh, payload_len);
3318     wh->fin = last;
3319     wh->opcode = opcode;
3320 
3321     for (i = 0; i < iovcnt; i++) {
3322         b = iov[i].iov_base;
3323         l = iov[i].iov_len;
3324 
3325         while (l > 0) {
3326             copy = buf->end - buf->free;
3327             copy = nxt_min(l, copy);
3328 
3329             buf->free = nxt_cpymem(buf->free, b, copy);
3330             b += copy;
3331             l -= copy;
3332 
3333             if (l > 0) {
3334                 if (nxt_fast_path(buf->free > buf->start)) {
3335                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3336 
3337                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3338                         return rc;
3339                     }
3340                 }
3341 
3342                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3343 
3344                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3345                                                alloc_size, alloc_size,
3346                                                &mmap_buf, local_buf);
3347                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3348                     return rc;
3349                 }
3350 
3351                 buf_size -= buf->end - buf->start;
3352             }
3353         }
3354     }
3355 
3356     if (buf->free > buf->start) {
3357         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3358     }
3359 
3360     return rc;
3361 }
3362 
3363 
3364 ssize_t
3365 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3366     size_t size)
3367 {
3368     ssize_t   res;
3369     uint8_t   *b;
3370     uint64_t  i, d;
3371 
3372     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3373                             dst, size);
3374 
3375     if (ws->mask == NULL) {
3376         return res;
3377     }
3378 
3379     b = dst;
3380     d = (ws->payload_len - ws->content_length - res) % 4;
3381 
3382     for (i = 0; i < (uint64_t) res; i++) {
3383         b[i] ^= ws->mask[ (i + d) % 4 ];
3384     }
3385 
3386     return res;
3387 }
3388 
3389 
3390 int
3391 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3392 {
3393     char                             *b;
3394     size_t                           size, hsize;
3395     nxt_unit_websocket_frame_impl_t  *ws_impl;
3396 
3397     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3398 
3399     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3400         return NXT_UNIT_OK;
3401     }
3402 
3403     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3404 
3405     b = nxt_unit_malloc(ws->req->ctx, size);
3406     if (nxt_slow_path(b == NULL)) {
3407         return NXT_UNIT_ERROR;
3408     }
3409 
3410     memcpy(b, ws_impl->buf->buf.start, size);
3411 
3412     hsize = nxt_websocket_frame_header_size(b);
3413 
3414     ws_impl->buf->buf.start = b;
3415     ws_impl->buf->buf.free = b + hsize;
3416     ws_impl->buf->buf.end = b + size;
3417 
3418     ws_impl->buf->free_ptr = b;
3419 
3420     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3421 
3422     if (ws_impl->ws.header->mask) {
3423         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3424 
3425     } else {
3426         ws_impl->ws.mask = NULL;
3427     }
3428 
3429     return NXT_UNIT_OK;
3430 }
3431 
3432 
3433 void
3434 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3435 {
3436     nxt_unit_websocket_frame_release(ws);
3437 }
3438 
3439 
3440 static nxt_port_mmap_header_t *
3441 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3442     nxt_chunk_id_t *c, int *n, int min_n)
3443 {
3444     int                     res, nchunks, i;
3445     uint32_t                outgoing_size;
3446     nxt_unit_mmap_t         *mm, *mm_end;
3447     nxt_unit_impl_t         *lib;
3448     nxt_port_mmap_header_t  *hdr;
3449 
3450     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3451 
3452     pthread_mutex_lock(&lib->outgoing.mutex);
3453 
3454 retry:
3455 
3456     outgoing_size = lib->outgoing.size;
3457 
3458     mm_end = lib->outgoing.elts + outgoing_size;
3459 
3460     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3461         hdr = mm->hdr;
3462 
3463         if (hdr->sent_over != 0xFFFFu
3464             && (hdr->sent_over != port->id.id
3465                 || mm->src_thread != pthread_self()))
3466         {
3467             continue;
3468         }
3469 
3470         *c = 0;
3471 
3472         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3473             nchunks = 1;
3474 
3475             while (nchunks < *n) {
3476                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3477                                                        *c + nchunks);
3478 
3479                 if (res == 0) {
3480                     if (nchunks >= min_n) {
3481                         *n = nchunks;
3482 
3483                         goto unlock;
3484                     }
3485 
3486                     for (i = 0; i < nchunks; i++) {
3487                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3488                     }
3489 
3490                     *c += nchunks + 1;
3491                     nchunks = 0;
3492                     break;
3493                 }
3494 
3495                 nchunks++;
3496             }
3497 
3498             if (nchunks >= min_n) {
3499                 *n = nchunks;
3500 
3501                 goto unlock;
3502             }
3503         }
3504 
3505         hdr->oosm = 1;
3506     }
3507 
3508     if (outgoing_size >= lib->shm_mmap_limit) {
3509         /* Cannot allocate more shared memory. */
3510         pthread_mutex_unlock(&lib->outgoing.mutex);
3511 
3512         if (min_n == 0) {
3513             *n = 0;
3514         }
3515 
3516         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3517                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3518         {
3519             /* Memory allocated by application, but not send to router. */
3520             return NULL;
3521         }
3522 
3523         /* Notify router about OOSM condition. */
3524 
3525         res = nxt_unit_send_oosm(ctx, port);
3526         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3527             return NULL;
3528         }
3529 
3530         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3531 
3532         if (min_n == 0) {
3533             return NULL;
3534         }
3535 
3536         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3537 
3538         res = nxt_unit_wait_shm_ack(ctx);
3539         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3540             return NULL;
3541         }
3542 
3543         nxt_unit_debug(ctx, "oosm: retry");
3544 
3545         pthread_mutex_lock(&lib->outgoing.mutex);
3546 
3547         goto retry;
3548     }
3549 
3550     *c = 0;
3551     hdr = nxt_unit_new_mmap(ctx, port, *n);
3552 
3553 unlock:
3554 
3555     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3556 
3557     nxt_unit_debug(ctx, "allocated_chunks %d",
3558                    (int) lib->outgoing.allocated_chunks);
3559 
3560     pthread_mutex_unlock(&lib->outgoing.mutex);
3561 
3562     return hdr;
3563 }
3564 
3565 
3566 static int
3567 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3568 {
3569     ssize_t          res;
3570     nxt_port_msg_t   msg;
3571     nxt_unit_impl_t  *lib;
3572 
3573     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3574 
3575     msg.stream = 0;
3576     msg.pid = lib->pid;
3577     msg.reply_port = 0;
3578     msg.type = _NXT_PORT_MSG_OOSM;
3579     msg.last = 0;
3580     msg.mmap = 0;
3581     msg.nf = 0;
3582     msg.mf = 0;
3583     msg.tracking = 0;
3584 
3585     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3586     if (nxt_slow_path(res != sizeof(msg))) {
3587         return NXT_UNIT_ERROR;
3588     }
3589 
3590     return NXT_UNIT_OK;
3591 }
3592 
3593 
3594 static int
3595 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3596 {
3597     int                  res;
3598     nxt_unit_ctx_impl_t  *ctx_impl;
3599     nxt_unit_read_buf_t  *rbuf;
3600 
3601     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3602 
3603     while (1) {
3604         rbuf = nxt_unit_read_buf_get(ctx);
3605         if (nxt_slow_path(rbuf == NULL)) {
3606             return NXT_UNIT_ERROR;
3607         }
3608 
3609         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3610         if (res == NXT_UNIT_ERROR) {
3611             nxt_unit_read_buf_release(ctx, rbuf);
3612 
3613             return NXT_UNIT_ERROR;
3614         }
3615 
3616         if (nxt_unit_is_shm_ack(rbuf)) {
3617             nxt_unit_read_buf_release(ctx, rbuf);
3618             break;
3619         }
3620 
3621         pthread_mutex_lock(&ctx_impl->mutex);
3622 
3623         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3624 
3625         pthread_mutex_unlock(&ctx_impl->mutex);
3626 
3627         if (nxt_unit_is_quit(rbuf)) {
3628             nxt_unit_debug(ctx, "oosm: quit received");
3629 
3630             return NXT_UNIT_ERROR;
3631         }
3632     }
3633 
3634     return NXT_UNIT_OK;
3635 }
3636 
3637 
3638 static nxt_unit_mmap_t *
3639 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3640 {
3641     uint32_t         cap, n;
3642     nxt_unit_mmap_t  *e;
3643 
3644     if (nxt_fast_path(mmaps->size > i)) {
3645         return mmaps->elts + i;
3646     }
3647 
3648     cap = mmaps->cap;
3649 
3650     if (cap == 0) {
3651         cap = i + 1;
3652     }
3653 
3654     while (i + 1 > cap) {
3655 
3656         if (cap < 16) {
3657             cap = cap * 2;
3658 
3659         } else {
3660             cap = cap + cap / 2;
3661         }
3662     }
3663 
3664     if (cap != mmaps->cap) {
3665 
3666         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3667         if (nxt_slow_path(e == NULL)) {
3668             return NULL;
3669         }
3670 
3671         mmaps->elts = e;
3672 
3673         for (n = mmaps->cap; n < cap; n++) {
3674             e = mmaps->elts + n;
3675 
3676             e->hdr = NULL;
3677             nxt_queue_init(&e->awaiting_rbuf);
3678         }
3679 
3680         mmaps->cap = cap;
3681     }
3682 
3683     if (i + 1 > mmaps->size) {
3684         mmaps->size = i + 1;
3685     }
3686 
3687     return mmaps->elts + i;
3688 }
3689 
3690 
3691 static nxt_port_mmap_header_t *
3692 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3693 {
3694     int                     i, fd, rc;
3695     void                    *mem;
3696     nxt_unit_mmap_t         *mm;
3697     nxt_unit_impl_t         *lib;
3698     nxt_port_mmap_header_t  *hdr;
3699 
3700     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3701 
3702     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3703     if (nxt_slow_path(mm == NULL)) {
3704         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3705 
3706         return NULL;
3707     }
3708 
3709     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3710     if (nxt_slow_path(fd == -1)) {
3711         goto remove_fail;
3712     }
3713 
3714     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3715     if (nxt_slow_path(mem == MAP_FAILED)) {
3716         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3717                        strerror(errno), errno);
3718 
3719         nxt_unit_close(fd);
3720 
3721         goto remove_fail;
3722     }
3723 
3724     mm->hdr = mem;
3725     hdr = mem;
3726 
3727     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3728     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3729 
3730     hdr->id = lib->outgoing.size - 1;
3731     hdr->src_pid = lib->pid;
3732     hdr->dst_pid = port->id.pid;
3733     hdr->sent_over = port->id.id;
3734     mm->src_thread = pthread_self();
3735 
3736     /* Mark first n chunk(s) as busy */
3737     for (i = 0; i < n; i++) {
3738         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3739     }
3740 
3741     /* Mark as busy chunk followed the last available chunk. */
3742     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3743     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3744 
3745     pthread_mutex_unlock(&lib->outgoing.mutex);
3746 
3747     rc = nxt_unit_send_mmap(ctx, port, fd);
3748     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3749         munmap(mem, PORT_MMAP_SIZE);
3750         hdr = NULL;
3751 
3752     } else {
3753         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3754                        hdr->id, (int) lib->pid, (int) port->id.pid);
3755     }
3756 
3757     nxt_unit_close(fd);
3758 
3759     pthread_mutex_lock(&lib->outgoing.mutex);
3760 
3761     if (nxt_fast_path(hdr != NULL)) {
3762         return hdr;
3763     }
3764 
3765 remove_fail:
3766 
3767     lib->outgoing.size--;
3768 
3769     return NULL;
3770 }
3771 
3772 
3773 static int
3774 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3775 {
3776     int              fd;
3777     nxt_unit_impl_t  *lib;
3778 
3779     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3780 
3781 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3782     char             name[64];
3783 
3784     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3785              lib->pid, (void *) pthread_self());
3786 #endif
3787 
3788 #if (NXT_HAVE_MEMFD_CREATE)
3789 
3790     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3791     if (nxt_slow_path(fd == -1)) {
3792         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3793                        strerror(errno), errno);
3794 
3795         return -1;
3796     }
3797 
3798     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3799 
3800 #elif (NXT_HAVE_SHM_OPEN_ANON)
3801 
3802     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3803     if (nxt_slow_path(fd == -1)) {
3804         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3805                        strerror(errno), errno);
3806 
3807         return -1;
3808     }
3809 
3810 #elif (NXT_HAVE_SHM_OPEN)
3811 
3812     /* Just in case. */
3813     shm_unlink(name);
3814 
3815     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3816     if (nxt_slow_path(fd == -1)) {
3817         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3818                        strerror(errno), errno);
3819 
3820         return -1;
3821     }
3822 
3823     if (nxt_slow_path(shm_unlink(name) == -1)) {
3824         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3825                        strerror(errno), errno);
3826     }
3827 
3828 #else
3829 
3830 #error No working shared memory implementation.
3831 
3832 #endif
3833 
3834     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3835         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3836                        strerror(errno), errno);
3837 
3838         nxt_unit_close(fd);
3839 
3840         return -1;
3841     }
3842 
3843     return fd;
3844 }
3845 
3846 
3847 static int
3848 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3849 {
3850     ssize_t          res;
3851     nxt_port_msg_t   msg;
3852     nxt_unit_impl_t  *lib;
3853     union {
3854         struct cmsghdr  cm;
3855         char            space[CMSG_SPACE(sizeof(int))];
3856     } cmsg;
3857 
3858     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3859 
3860     msg.stream = 0;
3861     msg.pid = lib->pid;
3862     msg.reply_port = 0;
3863     msg.type = _NXT_PORT_MSG_MMAP;
3864     msg.last = 0;
3865     msg.mmap = 0;
3866     msg.nf = 0;
3867     msg.mf = 0;
3868     msg.tracking = 0;
3869 
3870     /*
3871      * Fill all padding fields with 0.
3872      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3873      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3874      */
3875     memset(&cmsg, 0, sizeof(cmsg));
3876 
3877     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3878     cmsg.cm.cmsg_level = SOL_SOCKET;
3879     cmsg.cm.cmsg_type = SCM_RIGHTS;
3880 
3881     /*
3882      * memcpy() is used instead of simple
3883      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3884      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3885      *   dereferencing type-punned pointer will break strict-aliasing rules
3886      *
3887      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3888      * in the same simple assignment as in the code above.
3889      */
3890     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3891 
3892     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
3893                              &cmsg, sizeof(cmsg));
3894     if (nxt_slow_path(res != sizeof(msg))) {
3895         return NXT_UNIT_ERROR;
3896     }
3897 
3898     return NXT_UNIT_OK;
3899 }
3900 
3901 
3902 static int
3903 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3904     uint32_t size, uint32_t min_size,
3905     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3906 {
3907     int                     nchunks, min_nchunks;
3908     nxt_chunk_id_t          c;
3909     nxt_port_mmap_header_t  *hdr;
3910 
3911     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3912         if (local_buf != NULL) {
3913             mmap_buf->free_ptr = NULL;
3914             mmap_buf->plain_ptr = local_buf;
3915 
3916         } else {
3917             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3918                                                  size + sizeof(nxt_port_msg_t));
3919             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3920                 return NXT_UNIT_ERROR;
3921             }
3922 
3923             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3924         }
3925 
3926         mmap_buf->hdr = NULL;
3927         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3928         mmap_buf->buf.free = mmap_buf->buf.start;
3929         mmap_buf->buf.end = mmap_buf->buf.start + size;
3930 
3931         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3932                        mmap_buf->buf.start, (int) size);
3933 
3934         return NXT_UNIT_OK;
3935     }
3936 
3937     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3938     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3939 
3940     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3941     if (nxt_slow_path(hdr == NULL)) {
3942         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3943             mmap_buf->hdr = NULL;
3944             mmap_buf->buf.start = NULL;
3945             mmap_buf->buf.free = NULL;
3946             mmap_buf->buf.end = NULL;
3947             mmap_buf->free_ptr = NULL;
3948 
3949             return NXT_UNIT_OK;
3950         }
3951 
3952         return NXT_UNIT_ERROR;
3953     }
3954 
3955     mmap_buf->hdr = hdr;
3956     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3957     mmap_buf->buf.free = mmap_buf->buf.start;
3958     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3959     mmap_buf->free_ptr = NULL;
3960     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3961 
3962     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3963                   (int) hdr->id, (int) c,
3964                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3965 
3966     return NXT_UNIT_OK;
3967 }
3968 
3969 
3970 static int
3971 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3972 {
3973     int                     rc;
3974     void                    *mem;
3975     nxt_queue_t             awaiting_rbuf;
3976     struct stat             mmap_stat;
3977     nxt_unit_mmap_t         *mm;
3978     nxt_unit_impl_t         *lib;
3979     nxt_unit_ctx_impl_t     *ctx_impl;
3980     nxt_unit_read_buf_t     *rbuf;
3981     nxt_port_mmap_header_t  *hdr;
3982 
3983     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3984 
3985     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3986 
3987     if (fstat(fd, &mmap_stat) == -1) {
3988         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3989                        strerror(errno), errno);
3990 
3991         return NXT_UNIT_ERROR;
3992     }
3993 
3994     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3995                MAP_SHARED, fd, 0);
3996     if (nxt_slow_path(mem == MAP_FAILED)) {
3997         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3998                        strerror(errno), errno);
3999 
4000         return NXT_UNIT_ERROR;
4001     }
4002 
4003     hdr = mem;
4004 
4005     if (nxt_slow_path(hdr->src_pid != pid)) {
4006 
4007         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
4008                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
4009                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
4010 
4011         munmap(mem, PORT_MMAP_SIZE);
4012 
4013         return NXT_UNIT_ERROR;
4014     }
4015 
4016     nxt_queue_init(&awaiting_rbuf);
4017 
4018     pthread_mutex_lock(&lib->incoming.mutex);
4019 
4020     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4021     if (nxt_slow_path(mm == NULL)) {
4022         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4023 
4024         munmap(mem, PORT_MMAP_SIZE);
4025 
4026         rc = NXT_UNIT_ERROR;
4027 
4028     } else {
4029         mm->hdr = hdr;
4030 
4031         hdr->sent_over = 0xFFFFu;
4032 
4033         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4034         nxt_queue_init(&mm->awaiting_rbuf);
4035 
4036         rc = NXT_UNIT_OK;
4037     }
4038 
4039     pthread_mutex_unlock(&lib->incoming.mutex);
4040 
4041     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4042 
4043         ctx_impl = rbuf->ctx_impl;
4044 
4045         pthread_mutex_lock(&ctx_impl->mutex);
4046 
4047         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4048 
4049         pthread_mutex_unlock(&ctx_impl->mutex);
4050 
4051         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4052 
4053         nxt_unit_awake_ctx(ctx, ctx_impl);
4054 
4055     } nxt_queue_loop;
4056 
4057     return rc;
4058 }
4059 
4060 
4061 static void
4062 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4063 {
4064     nxt_port_msg_t  msg;
4065 
4066     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4067         return;
4068     }
4069 
4070     if (nxt_slow_path(ctx_impl->read_port == NULL
4071                       || ctx_impl->read_port->out_fd == -1))
4072     {
4073         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4074 
4075         return;
4076     }
4077 
4078     memset(&msg, 0, sizeof(nxt_port_msg_t));
4079 
4080     msg.type = _NXT_PORT_MSG_RPC_READY;
4081 
4082     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4083                               &msg, sizeof(msg), NULL, 0);
4084 }
4085 
4086 
4087 static void
4088 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4089 {
4090     pthread_mutex_init(&mmaps->mutex, NULL);
4091 
4092     mmaps->size = 0;
4093     mmaps->cap = 0;
4094     mmaps->elts = NULL;
4095     mmaps->allocated_chunks = 0;
4096 }
4097 
4098 
4099 nxt_inline void
4100 nxt_unit_process_use(nxt_unit_process_t *process)
4101 {
4102     nxt_atomic_fetch_add(&process->use_count, 1);
4103 }
4104 
4105 
4106 nxt_inline void
4107 nxt_unit_process_release(nxt_unit_process_t *process)
4108 {
4109     long c;
4110 
4111     c = nxt_atomic_fetch_add(&process->use_count, -1);
4112 
4113     if (c == 1) {
4114         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4115 
4116         nxt_unit_free(NULL, process);
4117     }
4118 }
4119 
4120 
4121 static void
4122 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4123 {
4124     nxt_unit_mmap_t  *mm, *end;
4125 
4126     if (mmaps->elts != NULL) {
4127         end = mmaps->elts + mmaps->size;
4128 
4129         for (mm = mmaps->elts; mm < end; mm++) {
4130             munmap(mm->hdr, PORT_MMAP_SIZE);
4131         }
4132 
4133         nxt_unit_free(NULL, mmaps->elts);
4134     }
4135 
4136     pthread_mutex_destroy(&mmaps->mutex);
4137 }
4138 
4139 
4140 static int
4141 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4142     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4143     nxt_unit_read_buf_t *rbuf)
4144 {
4145     int                  res, need_rbuf;
4146     nxt_unit_mmap_t      *mm;
4147     nxt_unit_ctx_impl_t  *ctx_impl;
4148 
4149     mm = nxt_unit_mmap_at(mmaps, id);
4150     if (nxt_slow_path(mm == NULL)) {
4151         nxt_unit_alert(ctx, "failed to allocate mmap");
4152 
4153         pthread_mutex_unlock(&mmaps->mutex);
4154 
4155         *hdr = NULL;
4156 
4157         return NXT_UNIT_ERROR;
4158     }
4159 
4160     *hdr = mm->hdr;
4161 
4162     if (nxt_fast_path(*hdr != NULL)) {
4163         return NXT_UNIT_OK;
4164     }
4165 
4166     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4167 
4168     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4169 
4170     pthread_mutex_unlock(&mmaps->mutex);
4171 
4172     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4173 
4174     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4175 
4176     if (need_rbuf) {
4177         res = nxt_unit_get_mmap(ctx, pid, id);
4178         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4179             return NXT_UNIT_ERROR;
4180         }
4181     }
4182 
4183     return NXT_UNIT_AGAIN;
4184 }
4185 
4186 
4187 static int
4188 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4189     nxt_unit_read_buf_t *rbuf)
4190 {
4191     int                     res;
4192     void                    *start;
4193     uint32_t                size;
4194     nxt_unit_impl_t         *lib;
4195     nxt_unit_mmaps_t        *mmaps;
4196     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4197     nxt_port_mmap_msg_t     *mmap_msg, *end;
4198     nxt_port_mmap_header_t  *hdr;
4199 
4200     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4201         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4202                       recv_msg->stream, (int) recv_msg->size);
4203 
4204         return NXT_UNIT_ERROR;
4205     }
4206 
4207     mmap_msg = recv_msg->start;
4208     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4209 
4210     incoming_tail = &recv_msg->incoming_buf;
4211 
4212     /* Allocating buffer structures. */
4213     for (; mmap_msg < end; mmap_msg++) {
4214         b = nxt_unit_mmap_buf_get(ctx);
4215         if (nxt_slow_path(b == NULL)) {
4216             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4217                           recv_msg->stream);
4218 
4219             while (recv_msg->incoming_buf != NULL) {
4220                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4221             }
4222 
4223             return NXT_UNIT_ERROR;
4224         }
4225 
4226         nxt_unit_mmap_buf_insert(incoming_tail, b);
4227         incoming_tail = &b->next;
4228     }
4229 
4230     b = recv_msg->incoming_buf;
4231     mmap_msg = recv_msg->start;
4232 
4233     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4234 
4235     mmaps = &lib->incoming;
4236 
4237     pthread_mutex_lock(&mmaps->mutex);
4238 
4239     for (; mmap_msg < end; mmap_msg++) {
4240         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4241                                        recv_msg->pid, mmap_msg->mmap_id,
4242                                        &hdr, rbuf);
4243 
4244         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4245             while (recv_msg->incoming_buf != NULL) {
4246                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4247             }
4248 
4249             return res;
4250         }
4251 
4252         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4253         size = mmap_msg->size;
4254 
4255         if (recv_msg->start == mmap_msg) {
4256             recv_msg->start = start;
4257             recv_msg->size = size;
4258         }
4259 
4260         b->buf.start = start;
4261         b->buf.free = start;
4262         b->buf.end = b->buf.start + size;
4263         b->hdr = hdr;
4264 
4265         b = b->next;
4266 
4267         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4268                        recv_msg->stream,
4269                        start, (int) size,
4270                        (int) hdr->src_pid, (int) hdr->dst_pid,
4271                        (int) hdr->id, (int) mmap_msg->chunk_id,
4272                        (int) mmap_msg->size);
4273     }
4274 
4275     pthread_mutex_unlock(&mmaps->mutex);
4276 
4277     return NXT_UNIT_OK;
4278 }
4279 
4280 
4281 static int
4282 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4283 {
4284     ssize_t              res;
4285     nxt_unit_impl_t      *lib;
4286     nxt_unit_ctx_impl_t  *ctx_impl;
4287 
4288     struct {
4289         nxt_port_msg_t           msg;
4290         nxt_port_msg_get_mmap_t  get_mmap;
4291     } m;
4292 
4293     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4294     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4295 
4296     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4297 
4298     m.msg.pid = lib->pid;
4299     m.msg.reply_port = ctx_impl->read_port->id.id;
4300     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4301 
4302     m.get_mmap.id = id;
4303 
4304     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4305 
4306     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
4307     if (nxt_slow_path(res != sizeof(m))) {
4308         return NXT_UNIT_ERROR;
4309     }
4310 
4311     return NXT_UNIT_OK;
4312 }
4313 
4314 
4315 static void
4316 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4317     void *start, uint32_t size)
4318 {
4319     int              freed_chunks;
4320     u_char           *p, *end;
4321     nxt_chunk_id_t   c;
4322     nxt_unit_impl_t  *lib;
4323 
4324     memset(start, 0xA5, size);
4325 
4326     p = start;
4327     end = p + size;
4328     c = nxt_port_mmap_chunk_id(hdr, p);
4329     freed_chunks = 0;
4330 
4331     while (p < end) {
4332         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4333 
4334         p += PORT_MMAP_CHUNK_SIZE;
4335         c++;
4336         freed_chunks++;
4337     }
4338 
4339     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4340 
4341     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4342         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4343 
4344         nxt_unit_debug(ctx, "allocated_chunks %d",
4345                        (int) lib->outgoing.allocated_chunks);
4346     }
4347 
4348     if (hdr->dst_pid == lib->pid
4349         && freed_chunks != 0
4350         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4351     {
4352         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4353     }
4354 }
4355 
4356 
4357 static int
4358 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4359 {
4360     ssize_t          res;
4361     nxt_port_msg_t   msg;
4362     nxt_unit_impl_t  *lib;
4363 
4364     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4365 
4366     msg.stream = 0;
4367     msg.pid = lib->pid;
4368     msg.reply_port = 0;
4369     msg.type = _NXT_PORT_MSG_SHM_ACK;
4370     msg.last = 0;
4371     msg.mmap = 0;
4372     msg.nf = 0;
4373     msg.mf = 0;
4374     msg.tracking = 0;
4375 
4376     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
4377     if (nxt_slow_path(res != sizeof(msg))) {
4378         return NXT_UNIT_ERROR;
4379     }
4380 
4381     return NXT_UNIT_OK;
4382 }
4383 
4384 
4385 static nxt_int_t
4386 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4387 {
4388     nxt_process_t  *process;
4389 
4390     process = data;
4391 
4392     if (lhq->key.length == sizeof(pid_t)
4393         && *(pid_t *) lhq->key.start == process->pid)
4394     {
4395         return NXT_OK;
4396     }
4397 
4398     return NXT_DECLINED;
4399 }
4400 
4401 
4402 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4403     NXT_LVLHSH_DEFAULT,
4404     nxt_unit_lvlhsh_pid_test,
4405     nxt_unit_lvlhsh_alloc,
4406     nxt_unit_lvlhsh_free,
4407 };
4408 
4409 
4410 static inline void
4411 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4412 {
4413     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4414     lhq->key.length = sizeof(*pid);
4415     lhq->key.start = (u_char *) pid;
4416     lhq->proto = &lvlhsh_processes_proto;
4417 }
4418 
4419 
4420 static nxt_unit_process_t *
4421 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4422 {
4423     nxt_unit_impl_t     *lib;
4424     nxt_unit_process_t  *process;
4425     nxt_lvlhsh_query_t  lhq;
4426 
4427     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4428 
4429     nxt_unit_process_lhq_pid(&lhq, &pid);
4430 
4431     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4432         process = lhq.value;
4433         nxt_unit_process_use(process);
4434 
4435         return process;
4436     }
4437 
4438     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4439     if (nxt_slow_path(process == NULL)) {
4440         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4441 
4442         return NULL;
4443     }
4444 
4445     process->pid = pid;
4446     process->use_count = 2;
4447     process->next_port_id = 0;
4448     process->lib = lib;
4449 
4450     nxt_queue_init(&process->ports);
4451 
4452     lhq.replace = 0;
4453     lhq.value = process;
4454 
4455     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4456 
4457     case NXT_OK:
4458         break;
4459 
4460     default:
4461         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4462 
4463         nxt_unit_free(ctx, process);
4464         process = NULL;
4465         break;
4466     }
4467 
4468     return process;
4469 }
4470 
4471 
4472 static nxt_unit_process_t *
4473 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4474 {
4475     int                 rc;
4476     nxt_lvlhsh_query_t  lhq;
4477 
4478     nxt_unit_process_lhq_pid(&lhq, &pid);
4479 
4480     if (remove) {
4481         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4482 
4483     } else {
4484         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4485     }
4486 
4487     if (rc == NXT_OK) {
4488         if (!remove) {
4489             nxt_unit_process_use(lhq.value);
4490         }
4491 
4492         return lhq.value;
4493     }
4494 
4495     return NULL;
4496 }
4497 
4498 
4499 static nxt_unit_process_t *
4500 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4501 {
4502     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4503 }
4504 
4505 
4506 int
4507 nxt_unit_run(nxt_unit_ctx_t *ctx)
4508 {
4509     int                  rc;
4510     nxt_unit_ctx_impl_t  *ctx_impl;
4511 
4512     nxt_unit_ctx_use(ctx);
4513 
4514     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4515 
4516     rc = NXT_UNIT_OK;
4517 
4518     while (nxt_fast_path(ctx_impl->online)) {
4519         rc = nxt_unit_run_once_impl(ctx);
4520 
4521         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4522             nxt_unit_quit(ctx);
4523             break;
4524         }
4525     }
4526 
4527     nxt_unit_ctx_release(ctx);
4528 
4529     return rc;
4530 }
4531 
4532 
4533 int
4534 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4535 {
4536     int  rc;
4537 
4538     nxt_unit_ctx_use(ctx);
4539 
4540     rc = nxt_unit_run_once_impl(ctx);
4541 
4542     nxt_unit_ctx_release(ctx);
4543 
4544     return rc;
4545 }
4546 
4547 
4548 static int
4549 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4550 {
4551     int                  rc;
4552     nxt_unit_read_buf_t  *rbuf;
4553 
4554     rbuf = nxt_unit_read_buf_get(ctx);
4555     if (nxt_slow_path(rbuf == NULL)) {
4556         return NXT_UNIT_ERROR;
4557     }
4558 
4559     rc = nxt_unit_read_buf(ctx, rbuf);
4560     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4561         nxt_unit_read_buf_release(ctx, rbuf);
4562 
4563         return rc;
4564     }
4565 
4566     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4567     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4568         return NXT_UNIT_ERROR;
4569     }
4570 
4571     rc = nxt_unit_process_pending_rbuf(ctx);
4572     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4573         return NXT_UNIT_ERROR;
4574     }
4575 
4576     nxt_unit_process_ready_req(ctx);
4577 
4578     return rc;
4579 }
4580 
4581 
4582 static int
4583 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4584 {
4585     int                   nevents, res, err;
4586     nxt_unit_impl_t       *lib;
4587     nxt_unit_ctx_impl_t   *ctx_impl;
4588     nxt_unit_port_impl_t  *port_impl;
4589     struct pollfd         fds[2];
4590 
4591     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4592 
4593     if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
4594         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4595     }
4596 
4597     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4598                                  port);
4599 
4600     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4601 
4602 retry:
4603 
4604     if (port_impl->from_socket == 0) {
4605         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4606         if (res == NXT_UNIT_OK) {
4607             if (nxt_unit_is_read_socket(rbuf)) {
4608                 port_impl->from_socket++;
4609 
4610                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4611                                (int) ctx_impl->read_port->id.pid,
4612                                (int) ctx_impl->read_port->id.id,
4613                                port_impl->from_socket);
4614 
4615             } else {
4616                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4617                                (int) ctx_impl->read_port->id.pid,
4618                                (int) ctx_impl->read_port->id.id,
4619                                (int) rbuf->size);
4620 
4621                 return NXT_UNIT_OK;
4622             }
4623         }
4624     }
4625 
4626     res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4627     if (res == NXT_UNIT_OK) {
4628         return NXT_UNIT_OK;
4629     }
4630 
4631     fds[0].fd = ctx_impl->read_port->in_fd;
4632     fds[0].events = POLLIN;
4633     fds[0].revents = 0;
4634 
4635     fds[1].fd = lib->shared_port->in_fd;
4636     fds[1].events = POLLIN;
4637     fds[1].revents = 0;
4638 
4639     nevents = poll(fds, 2, -1);
4640     if (nxt_slow_path(nevents == -1)) {
4641         err = errno;
4642 
4643         if (err == EINTR) {
4644             goto retry;
4645         }
4646 
4647         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4648                        fds[0].fd, fds[1].fd, strerror(err), err);
4649 
4650         rbuf->size = -1;
4651 
4652         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4653     }
4654 
4655     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4656                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4657                    fds[1].revents);
4658 
4659     if ((fds[0].revents & POLLIN) != 0) {
4660         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4661         if (res == NXT_UNIT_AGAIN) {
4662             goto retry;
4663         }
4664 
4665         return res;
4666     }
4667 
4668     if ((fds[1].revents & POLLIN) != 0) {
4669         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4670         if (res == NXT_UNIT_AGAIN) {
4671             goto retry;
4672         }
4673 
4674         return res;
4675     }
4676 
4677     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4678                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4679                    fds[1].revents);
4680 
4681     return NXT_UNIT_ERROR;
4682 }
4683 
4684 
4685 static int
4686 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4687 {
4688     int                  rc;
4689     nxt_queue_t          pending_rbuf;
4690     nxt_unit_ctx_impl_t  *ctx_impl;
4691     nxt_unit_read_buf_t  *rbuf;
4692 
4693     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4694 
4695     pthread_mutex_lock(&ctx_impl->mutex);
4696 
4697     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4698         pthread_mutex_unlock(&ctx_impl->mutex);
4699 
4700         return NXT_UNIT_OK;
4701     }
4702 
4703     nxt_queue_init(&pending_rbuf);
4704 
4705     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4706     nxt_queue_init(&ctx_impl->pending_rbuf);
4707 
4708     pthread_mutex_unlock(&ctx_impl->mutex);
4709 
4710     rc = NXT_UNIT_OK;
4711 
4712     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4713 
4714         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4715             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4716 
4717         } else {
4718             nxt_unit_read_buf_release(ctx, rbuf);
4719         }
4720 
4721     } nxt_queue_loop;
4722 
4723     return rc;
4724 }
4725 
4726 
4727 static void
4728 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4729 {
4730     int                           res;
4731     nxt_queue_t                   ready_req;
4732     nxt_unit_impl_t               *lib;
4733     nxt_unit_ctx_impl_t           *ctx_impl;
4734     nxt_unit_request_info_t       *req;
4735     nxt_unit_request_info_impl_t  *req_impl;
4736 
4737     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4738 
4739     pthread_mutex_lock(&ctx_impl->mutex);
4740 
4741     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4742         pthread_mutex_unlock(&ctx_impl->mutex);
4743 
4744         return;
4745     }
4746 
4747     nxt_queue_init(&ready_req);
4748 
4749     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4750     nxt_queue_init(&ctx_impl->ready_req);
4751 
4752     pthread_mutex_unlock(&ctx_impl->mutex);
4753 
4754     nxt_queue_each(req_impl, &ready_req,
4755                    nxt_unit_request_info_impl_t, port_wait_link)
4756     {
4757         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4758 
4759         req = &req_impl->req;
4760 
4761         res = nxt_unit_send_req_headers_ack(req);
4762         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4763             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4764 
4765             continue;
4766         }
4767 
4768         if (req->content_length
4769             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4770         {
4771             res = nxt_unit_request_hash_add(ctx, req);
4772             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4773                 nxt_unit_req_warn(req, "failed to add request to hash");
4774 
4775                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4776 
4777                 continue;
4778             }
4779 
4780             /*
4781              * If application have separate data handler, we may start
4782              * request processing and process data when it is arrived.
4783              */
4784             if (lib->callbacks.data_handler == NULL) {
4785                 continue;
4786             }
4787         }
4788 
4789         lib->callbacks.request_handler(&req_impl->req);
4790 
4791     } nxt_queue_loop;
4792 }
4793 
4794 
4795 int
4796 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4797 {
4798     int                  rc;
4799     nxt_unit_read_buf_t  *rbuf;
4800     nxt_unit_ctx_impl_t  *ctx_impl;
4801 
4802     nxt_unit_ctx_use(ctx);
4803 
4804     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4805 
4806     rc = NXT_UNIT_OK;
4807 
4808     while (nxt_fast_path(ctx_impl->online)) {
4809         rbuf = nxt_unit_read_buf_get(ctx);
4810         if (nxt_slow_path(rbuf == NULL)) {
4811             rc = NXT_UNIT_ERROR;
4812             break;
4813         }
4814 
4815     retry:
4816 
4817         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4818         if (rc == NXT_UNIT_AGAIN) {
4819             goto retry;
4820         }
4821 
4822         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4823         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4824             break;
4825         }
4826 
4827         rc = nxt_unit_process_pending_rbuf(ctx);
4828         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4829             break;
4830         }
4831 
4832         nxt_unit_process_ready_req(ctx);
4833     }
4834 
4835     nxt_unit_ctx_release(ctx);
4836 
4837     return rc;
4838 }
4839 
4840 
4841 nxt_inline int
4842 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4843 {
4844     nxt_port_msg_t  *port_msg;
4845 
4846     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4847         port_msg = (nxt_port_msg_t *) rbuf->buf;
4848 
4849         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4850     }
4851 
4852     return 0;
4853 }
4854 
4855 
4856 nxt_inline int
4857 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4858 {
4859     if (nxt_fast_path(rbuf->size == 1)) {
4860         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4861     }
4862 
4863     return 0;
4864 }
4865 
4866 
4867 nxt_inline int
4868 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4869 {
4870     nxt_port_msg_t  *port_msg;
4871 
4872     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4873         port_msg = (nxt_port_msg_t *) rbuf->buf;
4874 
4875         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4876     }
4877 
4878     return 0;
4879 }
4880 
4881 
4882 nxt_inline int
4883 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4884 {
4885     nxt_port_msg_t  *port_msg;
4886 
4887     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4888         port_msg = (nxt_port_msg_t *) rbuf->buf;
4889 
4890         return port_msg->type == _NXT_PORT_MSG_QUIT;
4891     }
4892 
4893     return 0;
4894 }
4895 
4896 
4897 int
4898 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4899 {
4900     int                  rc;
4901     nxt_unit_impl_t      *lib;
4902     nxt_unit_read_buf_t  *rbuf;
4903     nxt_unit_ctx_impl_t  *ctx_impl;
4904 
4905     nxt_unit_ctx_use(ctx);
4906 
4907     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4908     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4909 
4910     rc = NXT_UNIT_OK;
4911 
4912     while (nxt_fast_path(ctx_impl->online)) {
4913         rbuf = nxt_unit_read_buf_get(ctx);
4914         if (nxt_slow_path(rbuf == NULL)) {
4915             rc = NXT_UNIT_ERROR;
4916             break;
4917         }
4918 
4919     retry:
4920 
4921         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4922         if (rc == NXT_UNIT_AGAIN) {
4923             goto retry;
4924         }
4925 
4926         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4927             nxt_unit_read_buf_release(ctx, rbuf);
4928             break;
4929         }
4930 
4931         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4932         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4933             break;
4934         }
4935     }
4936 
4937     nxt_unit_ctx_release(ctx);
4938 
4939     return rc;
4940 }
4941 
4942 
4943 nxt_unit_request_info_t *
4944 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4945 {
4946     int                      rc;
4947     nxt_unit_impl_t          *lib;
4948     nxt_unit_read_buf_t      *rbuf;
4949     nxt_unit_ctx_impl_t      *ctx_impl;
4950     nxt_unit_request_info_t  *req;
4951 
4952     nxt_unit_ctx_use(ctx);
4953 
4954     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4955     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4956 
4957     req = NULL;
4958 
4959     if (nxt_slow_path(!ctx_impl->online)) {
4960         goto done;
4961     }
4962 
4963     rbuf = nxt_unit_read_buf_get(ctx);
4964     if (nxt_slow_path(rbuf == NULL)) {
4965         goto done;
4966     }
4967 
4968     rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4969     if (rc != NXT_UNIT_OK) {
4970         nxt_unit_read_buf_release(ctx, rbuf);
4971         goto done;
4972     }
4973 
4974     (void) nxt_unit_process_msg(ctx, rbuf, &req);
4975 
4976 done:
4977 
4978     nxt_unit_ctx_release(ctx);
4979 
4980     return req;
4981 }
4982 
4983 
4984 int
4985 nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
4986 {
4987     nxt_unit_impl_t  *lib;
4988 
4989     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4990 
4991     return (ctx == &lib->main_ctx.ctx);
4992 }
4993 
4994 
4995 int
4996 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4997 {
4998     int  rc;
4999 
5000     nxt_unit_ctx_use(ctx);
5001 
5002     rc = nxt_unit_process_port_msg_impl(ctx, port);
5003 
5004     nxt_unit_ctx_release(ctx);
5005 
5006     return rc;
5007 }
5008 
5009 
5010 static int
5011 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5012 {
5013     int                  rc;
5014     nxt_unit_impl_t      *lib;
5015     nxt_unit_read_buf_t  *rbuf;
5016     nxt_unit_ctx_impl_t  *ctx_impl;
5017 
5018     rbuf = nxt_unit_read_buf_get(ctx);
5019     if (nxt_slow_path(rbuf == NULL)) {
5020         return NXT_UNIT_ERROR;
5021     }
5022 
5023     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5024     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5025 
5026 retry:
5027 
5028     if (port == lib->shared_port) {
5029         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5030 
5031     } else {
5032         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5033     }
5034 
5035     if (rc != NXT_UNIT_OK) {
5036         nxt_unit_read_buf_release(ctx, rbuf);
5037         return rc;
5038     }
5039 
5040     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5041     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5042         return NXT_UNIT_ERROR;
5043     }
5044 
5045     rc = nxt_unit_process_pending_rbuf(ctx);
5046     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5047         return NXT_UNIT_ERROR;
5048     }
5049 
5050     nxt_unit_process_ready_req(ctx);
5051 
5052     if (ctx_impl->online) {
5053         rbuf = nxt_unit_read_buf_get(ctx);
5054         if (nxt_slow_path(rbuf == NULL)) {
5055             return NXT_UNIT_ERROR;
5056         }
5057 
5058         goto retry;
5059     }
5060 
5061     return rc;
5062 }
5063 
5064 
5065 void
5066 nxt_unit_done(nxt_unit_ctx_t *ctx)
5067 {
5068     nxt_unit_ctx_release(ctx);
5069 }
5070 
5071 
5072 nxt_unit_ctx_t *
5073 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5074 {
5075     int                   rc, queue_fd;
5076     void                  *mem;
5077     nxt_unit_impl_t       *lib;
5078     nxt_unit_port_t       *port;
5079     nxt_unit_ctx_impl_t   *new_ctx;
5080     nxt_unit_port_impl_t  *port_impl;
5081 
5082     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5083 
5084     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5085                                    + lib->request_data_size);
5086     if (nxt_slow_path(new_ctx == NULL)) {
5087         nxt_unit_alert(ctx, "failed to allocate context");
5088 
5089         return NULL;
5090     }
5091 
5092     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5093     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5094          nxt_unit_free(ctx, new_ctx);
5095 
5096          return NULL;
5097     }
5098 
5099     queue_fd = -1;
5100 
5101     port = nxt_unit_create_port(&new_ctx->ctx);
5102     if (nxt_slow_path(port == NULL)) {
5103         goto fail;
5104     }
5105 
5106     new_ctx->read_port = port;
5107 
5108     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5109     if (nxt_slow_path(queue_fd == -1)) {
5110         goto fail;
5111     }
5112 
5113     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5114                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5115     if (nxt_slow_path(mem == MAP_FAILED)) {
5116         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5117                        strerror(errno), errno);
5118 
5119         goto fail;
5120     }
5121 
5122     nxt_port_queue_init(mem);
5123 
5124     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5125     port_impl->queue = mem;
5126 
5127     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5128     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5129         goto fail;
5130     }
5131 
5132     nxt_unit_close(queue_fd);
5133 
5134     return &new_ctx->ctx;
5135 
5136 fail:
5137 
5138     if (queue_fd != -1) {
5139         nxt_unit_close(queue_fd);
5140     }
5141 
5142     nxt_unit_ctx_release(&new_ctx->ctx);
5143 
5144     return NULL;
5145 }
5146 
5147 
5148 static void
5149 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5150 {
5151     nxt_unit_impl_t                  *lib;
5152     nxt_unit_mmap_buf_t              *mmap_buf;
5153     nxt_unit_read_buf_t              *rbuf;
5154     nxt_unit_request_info_impl_t     *req_impl;
5155     nxt_unit_websocket_frame_impl_t  *ws_impl;
5156 
5157     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5158 
5159     nxt_queue_each(req_impl, &ctx_impl->active_req,
5160                    nxt_unit_request_info_impl_t, link)
5161     {
5162         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5163 
5164         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5165 
5166     } nxt_queue_loop;
5167 
5168     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5169     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5170 
5171     while (ctx_impl->free_buf != NULL) {
5172         mmap_buf = ctx_impl->free_buf;
5173         nxt_unit_mmap_buf_unlink(mmap_buf);
5174         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5175     }
5176 
5177     nxt_queue_each(req_impl, &ctx_impl->free_req,
5178                    nxt_unit_request_info_impl_t, link)
5179     {
5180         nxt_unit_request_info_free(req_impl);
5181 
5182     } nxt_queue_loop;
5183 
5184     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5185                    nxt_unit_websocket_frame_impl_t, link)
5186     {
5187         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5188 
5189     } nxt_queue_loop;
5190 
5191     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5192     {
5193         if (rbuf != &ctx_impl->ctx_read_buf) {
5194             nxt_unit_free(&ctx_impl->ctx, rbuf);
5195         }
5196     } nxt_queue_loop;
5197 
5198     pthread_mutex_destroy(&ctx_impl->mutex);
5199 
5200     pthread_mutex_lock(&lib->mutex);
5201 
5202     nxt_queue_remove(&ctx_impl->link);
5203 
5204     pthread_mutex_unlock(&lib->mutex);
5205 
5206     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5207         nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
5208         nxt_unit_port_release(ctx_impl->read_port);
5209     }
5210 
5211     if (ctx_impl != &lib->main_ctx) {
5212         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5213     }
5214 
5215     nxt_unit_lib_release(lib);
5216 }
5217 
5218 
5219 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5220 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5221 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5222 #else
5223 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5224 #endif
5225 
5226 
5227 void
5228 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5229 {
5230     nxt_unit_port_hash_id_t  port_hash_id;
5231 
5232     port_hash_id.pid = pid;
5233     port_hash_id.id = id;
5234 
5235     port_id->pid = pid;
5236     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5237     port_id->id = id;
5238 }
5239 
5240 
5241 static nxt_unit_port_t *
5242 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5243 {
5244     int                 rc, port_sockets[2];
5245     nxt_unit_impl_t     *lib;
5246     nxt_unit_port_t     new_port, *port;
5247     nxt_unit_process_t  *process;
5248 
5249     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5250 
5251     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5252     if (nxt_slow_path(rc != 0)) {
5253         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5254                       strerror(errno), errno);
5255 
5256         return NULL;
5257     }
5258 
5259     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5260                    port_sockets[0], port_sockets[1]);
5261 
5262     pthread_mutex_lock(&lib->mutex);
5263 
5264     process = nxt_unit_process_get(ctx, lib->pid);
5265     if (nxt_slow_path(process == NULL)) {
5266         pthread_mutex_unlock(&lib->mutex);
5267 
5268         nxt_unit_close(port_sockets[0]);
5269         nxt_unit_close(port_sockets[1]);
5270 
5271         return NULL;
5272     }
5273 
5274     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5275 
5276     new_port.in_fd = port_sockets[0];
5277     new_port.out_fd = port_sockets[1];
5278     new_port.data = NULL;
5279 
5280     pthread_mutex_unlock(&lib->mutex);
5281 
5282     nxt_unit_process_release(process);
5283 
5284     port = nxt_unit_add_port(ctx, &new_port, NULL);
5285     if (nxt_slow_path(port == NULL)) {
5286         nxt_unit_close(port_sockets[0]);
5287         nxt_unit_close(port_sockets[1]);
5288     }
5289 
5290     return port;
5291 }
5292 
5293 
5294 static int
5295 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5296     nxt_unit_port_t *port, int queue_fd)
5297 {
5298     ssize_t          res;
5299     nxt_unit_impl_t  *lib;
5300     int              fds[2] = { port->out_fd, queue_fd };
5301 
5302     struct {
5303         nxt_port_msg_t            msg;
5304         nxt_port_msg_new_port_t   new_port;
5305     } m;
5306 
5307     union {
5308         struct cmsghdr  cm;
5309         char            space[CMSG_SPACE(sizeof(int) * 2)];
5310     } cmsg;
5311 
5312     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5313 
5314     m.msg.stream = 0;
5315     m.msg.pid = lib->pid;
5316     m.msg.reply_port = 0;
5317     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5318     m.msg.last = 0;
5319     m.msg.mmap = 0;
5320     m.msg.nf = 0;
5321     m.msg.mf = 0;
5322     m.msg.tracking = 0;
5323 
5324     m.new_port.id = port->id.id;
5325     m.new_port.pid = port->id.pid;
5326     m.new_port.type = NXT_PROCESS_APP;
5327     m.new_port.max_size = 16 * 1024;
5328     m.new_port.max_share = 64 * 1024;
5329 
5330     memset(&cmsg, 0, sizeof(cmsg));
5331 
5332     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
5333     cmsg.cm.cmsg_level = SOL_SOCKET;
5334     cmsg.cm.cmsg_type = SCM_RIGHTS;
5335 
5336     /*
5337      * memcpy() is used instead of simple
5338      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
5339      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5340      *   dereferencing type-punned pointer will break strict-aliasing rules
5341      *
5342      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5343      * in the same simple assignment as in the code above.
5344      */
5345     memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
5346 
5347     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5348 
5349     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5350 }
5351 
5352 
5353 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5354 {
5355     nxt_unit_port_impl_t  *port_impl;
5356 
5357     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5358 
5359     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5360 }
5361 
5362 
5363 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5364 {
5365     long                  c;
5366     nxt_unit_port_impl_t  *port_impl;
5367 
5368     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5369 
5370     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5371 
5372     if (c == 1) {
5373         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5374                        (int) port->id.pid, (int) port->id.id,
5375                        port->in_fd, port->out_fd);
5376 
5377         nxt_unit_process_release(port_impl->process);
5378 
5379         if (port->in_fd != -1) {
5380             nxt_unit_close(port->in_fd);
5381 
5382             port->in_fd = -1;
5383         }
5384 
5385         if (port->out_fd != -1) {
5386             nxt_unit_close(port->out_fd);
5387 
5388             port->out_fd = -1;
5389         }
5390 
5391         if (port_impl->queue != NULL) {
5392             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5393                                      ? sizeof(nxt_app_queue_t)
5394                                      : sizeof(nxt_port_queue_t));
5395         }
5396 
5397         nxt_unit_free(NULL, port_impl);
5398     }
5399 }
5400 
5401 
5402 static nxt_unit_port_t *
5403 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5404 {
5405     int                   rc, ready;
5406     nxt_queue_t           awaiting_req;
5407     nxt_unit_impl_t       *lib;
5408     nxt_unit_port_t       *old_port;
5409     nxt_unit_process_t    *process;
5410     nxt_unit_port_impl_t  *new_port, *old_port_impl;
5411 
5412     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5413 
5414     pthread_mutex_lock(&lib->mutex);
5415 
5416     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5417 
5418     if (nxt_slow_path(old_port != NULL)) {
5419         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5420                             "in_fd %d out_fd %d queue %p",
5421                             port->id.pid, port->id.id,
5422                             port->in_fd, port->out_fd, queue);
5423 
5424         if (old_port->data == NULL) {
5425             old_port->data = port->data;
5426             port->data = NULL;
5427         }
5428 
5429         if (old_port->in_fd == -1) {
5430             old_port->in_fd = port->in_fd;
5431             port->in_fd = -1;
5432         }
5433 
5434         if (port->in_fd != -1) {
5435             nxt_unit_close(port->in_fd);
5436             port->in_fd = -1;
5437         }
5438 
5439         if (old_port->out_fd == -1) {
5440             old_port->out_fd = port->out_fd;
5441             port->out_fd = -1;
5442         }
5443 
5444         if (port->out_fd != -1) {
5445             nxt_unit_close(port->out_fd);
5446             port->out_fd = -1;
5447         }
5448 
5449         *port = *old_port;
5450 
5451         nxt_queue_init(&awaiting_req);
5452 
5453         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5454 
5455         if (old_port_impl->queue == NULL) {
5456             old_port_impl->queue = queue;
5457         }
5458 
5459         ready = (port->in_fd != -1 || port->out_fd != -1);
5460 
5461         /*
5462          * Port can be market as 'ready' only after callbacks.add_port() call.
5463          * Otherwise, request may try to use the port before callback.
5464          */
5465         if (lib->callbacks.add_port == NULL && ready) {
5466             old_port_impl->ready = ready;
5467 
5468             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5469                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5470                 nxt_queue_init(&old_port_impl->awaiting_req);
5471             }
5472         }
5473 
5474         pthread_mutex_unlock(&lib->mutex);
5475 
5476         if (lib->callbacks.add_port != NULL && ready) {
5477             lib->callbacks.add_port(ctx, old_port);
5478 
5479             pthread_mutex_lock(&lib->mutex);
5480 
5481             old_port_impl->ready = ready;
5482 
5483             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5484                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5485                 nxt_queue_init(&old_port_impl->awaiting_req);
5486             }
5487 
5488             pthread_mutex_unlock(&lib->mutex);
5489         }
5490 
5491         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5492 
5493         return old_port;
5494     }
5495 
5496     new_port = NULL;
5497     ready = 0;
5498 
5499     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5500                    port->id.pid, port->id.id,
5501                    port->in_fd, port->out_fd, queue);
5502 
5503     process = nxt_unit_process_get(ctx, port->id.pid);
5504     if (nxt_slow_path(process == NULL)) {
5505         goto unlock;
5506     }
5507 
5508     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5509         && port->id.id >= process->next_port_id)
5510     {
5511         process->next_port_id = port->id.id + 1;
5512     }
5513 
5514     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5515     if (nxt_slow_path(new_port == NULL)) {
5516         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5517                        port->id.pid, port->id.id);
5518 
5519         goto unlock;
5520     }
5521 
5522     new_port->port = *port;
5523 
5524     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5525     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5526         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5527                        port->id.pid, port->id.id);
5528 
5529         nxt_unit_free(ctx, new_port);
5530 
5531         new_port = NULL;
5532 
5533         goto unlock;
5534     }
5535 
5536     nxt_queue_insert_tail(&process->ports, &new_port->link);
5537 
5538     new_port->use_count = 2;
5539     new_port->process = process;
5540     new_port->queue = queue;
5541     new_port->from_socket = 0;
5542     new_port->socket_rbuf = NULL;
5543 
5544     nxt_queue_init(&new_port->awaiting_req);
5545 
5546     ready = (port->in_fd != -1 || port->out_fd != -1);
5547 
5548     if (lib->callbacks.add_port == NULL) {
5549         new_port->ready = ready;
5550 
5551     } else {
5552         new_port->ready = 0;
5553     }
5554 
5555     process = NULL;
5556 
5557 unlock:
5558 
5559     pthread_mutex_unlock(&lib->mutex);
5560 
5561     if (nxt_slow_path(process != NULL)) {
5562         nxt_unit_process_release(process);
5563     }
5564 
5565     if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5566         lib->callbacks.add_port(ctx, &new_port->port);
5567 
5568         nxt_queue_init(&awaiting_req);
5569 
5570         pthread_mutex_lock(&lib->mutex);
5571 
5572         new_port->ready = 1;
5573 
5574         if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5575             nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5576             nxt_queue_init(&new_port->awaiting_req);
5577         }
5578 
5579         pthread_mutex_unlock(&lib->mutex);
5580 
5581         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5582     }
5583 
5584     return (new_port == NULL) ? NULL : &new_port->port;
5585 }
5586 
5587 
5588 static void
5589 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5590 {
5591     nxt_unit_ctx_impl_t           *ctx_impl;
5592     nxt_unit_request_info_impl_t  *req_impl;
5593 
5594     nxt_queue_each(req_impl, awaiting_req,
5595                    nxt_unit_request_info_impl_t, port_wait_link)
5596     {
5597         nxt_queue_remove(&req_impl->port_wait_link);
5598 
5599         ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5600                                     ctx);
5601 
5602         pthread_mutex_lock(&ctx_impl->mutex);
5603 
5604         nxt_queue_insert_tail(&ctx_impl->ready_req,
5605                               &req_impl->port_wait_link);
5606 
5607         pthread_mutex_unlock(&ctx_impl->mutex);
5608 
5609         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5610 
5611         nxt_unit_awake_ctx(ctx, ctx_impl);
5612 
5613     } nxt_queue_loop;
5614 }
5615 
5616 
5617 static void
5618 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5619 {
5620     nxt_unit_port_t       *port;
5621     nxt_unit_port_impl_t  *port_impl;
5622 
5623     pthread_mutex_lock(&lib->mutex);
5624 
5625     port = nxt_unit_remove_port_unsafe(lib, port_id);
5626 
5627     if (nxt_fast_path(port != NULL)) {
5628         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5629 
5630         nxt_queue_remove(&port_impl->link);
5631     }
5632 
5633     pthread_mutex_unlock(&lib->mutex);
5634 
5635     if (lib->callbacks.remove_port != NULL && port != NULL) {
5636         lib->callbacks.remove_port(&lib->unit, port);
5637     }
5638 
5639     if (nxt_fast_path(port != NULL)) {
5640         nxt_unit_port_release(port);
5641     }
5642 }
5643 
5644 
5645 static nxt_unit_port_t *
5646 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5647 {
5648     nxt_unit_port_t  *port;
5649 
5650     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5651     if (nxt_slow_path(port == NULL)) {
5652         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5653                        (int) port_id->pid, (int) port_id->id);
5654 
5655         return NULL;
5656     }
5657 
5658     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5659                    (int) port_id->pid, (int) port_id->id,
5660                    port->in_fd, port->out_fd, port->data);
5661 
5662     return port;
5663 }
5664 
5665 
5666 static void
5667 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5668 {
5669     nxt_unit_process_t  *process;
5670 
5671     pthread_mutex_lock(&lib->mutex);
5672 
5673     process = nxt_unit_process_find(lib, pid, 1);
5674     if (nxt_slow_path(process == NULL)) {
5675         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5676 
5677         pthread_mutex_unlock(&lib->mutex);
5678 
5679         return;
5680     }
5681 
5682     nxt_unit_remove_process(lib, process);
5683 
5684     if (lib->callbacks.remove_pid != NULL) {
5685         lib->callbacks.remove_pid(&lib->unit, pid);
5686     }
5687 }
5688 
5689 
5690 static void
5691 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5692 {
5693     nxt_queue_t           ports;
5694     nxt_unit_port_impl_t  *port;
5695 
5696     nxt_queue_init(&ports);
5697 
5698     nxt_queue_add(&ports, &process->ports);
5699 
5700     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5701 
5702         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5703 
5704     } nxt_queue_loop;
5705 
5706     pthread_mutex_unlock(&lib->mutex);
5707 
5708     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5709 
5710         nxt_queue_remove(&port->link);
5711 
5712         if (lib->callbacks.remove_port != NULL) {
5713             lib->callbacks.remove_port(&lib->unit, &port->port);
5714         }
5715 
5716         nxt_unit_port_release(&port->port);
5717 
5718     } nxt_queue_loop;
5719 
5720     nxt_unit_process_release(process);
5721 }
5722 
5723 
5724 static void
5725 nxt_unit_quit(nxt_unit_ctx_t *ctx)
5726 {
5727     nxt_port_msg_t                msg;
5728     nxt_unit_impl_t               *lib;
5729     nxt_unit_ctx_impl_t           *ctx_impl;
5730     nxt_unit_callbacks_t          *cb;
5731     nxt_unit_request_info_t       *req;
5732     nxt_unit_request_info_impl_t  *req_impl;
5733 
5734     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5735     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5736 
5737     if (!ctx_impl->online) {
5738         return;
5739     }
5740 
5741     ctx_impl->online = 0;
5742 
5743     cb = &lib->callbacks;
5744 
5745     if (cb->quit != NULL) {
5746         cb->quit(ctx);
5747     }
5748 
5749     nxt_queue_each(req_impl, &ctx_impl->active_req,
5750                    nxt_unit_request_info_impl_t, link)
5751     {
5752         req = &req_impl->req;
5753 
5754         nxt_unit_req_warn(req, "active request on ctx quit");
5755 
5756         if (cb->close_handler) {
5757             nxt_unit_req_debug(req, "close_handler");
5758 
5759             cb->close_handler(req);
5760 
5761         } else {
5762             nxt_unit_request_done(req, NXT_UNIT_ERROR);
5763         }
5764 
5765     } nxt_queue_loop;
5766 
5767     if (ctx != &lib->main_ctx.ctx) {
5768         return;
5769     }
5770 
5771     memset(&msg, 0, sizeof(nxt_port_msg_t));
5772 
5773     msg.pid = lib->pid;
5774     msg.type = _NXT_PORT_MSG_QUIT;
5775 
5776     pthread_mutex_lock(&lib->mutex);
5777 
5778     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5779 
5780         if (ctx == &ctx_impl->ctx
5781             || ctx_impl->read_port == NULL
5782             || ctx_impl->read_port->out_fd == -1)
5783         {
5784             continue;
5785         }
5786 
5787         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5788                                   &msg, sizeof(msg), NULL, 0);
5789 
5790     } nxt_queue_loop;
5791 
5792     pthread_mutex_unlock(&lib->mutex);
5793 }
5794 
5795 
5796 static int
5797 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5798 {
5799     ssize_t              res;
5800     nxt_unit_impl_t      *lib;
5801     nxt_unit_ctx_impl_t  *ctx_impl;
5802 
5803     struct {
5804         nxt_port_msg_t           msg;
5805         nxt_port_msg_get_port_t  get_port;
5806     } m;
5807 
5808     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5809     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5810 
5811     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5812 
5813     m.msg.pid = lib->pid;
5814     m.msg.reply_port = ctx_impl->read_port->id.id;
5815     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5816 
5817     m.get_port.id = port_id->id;
5818     m.get_port.pid = port_id->pid;
5819 
5820     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5821                    (int) port_id->id);
5822 
5823     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
5824     if (nxt_slow_path(res != sizeof(m))) {
5825         return NXT_UNIT_ERROR;
5826     }
5827 
5828     return NXT_UNIT_OK;
5829 }
5830 
5831 
5832 static ssize_t
5833 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5834     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5835 {
5836     int                   notify;
5837     ssize_t               ret;
5838     nxt_int_t             rc;
5839     nxt_port_msg_t        msg;
5840     nxt_unit_impl_t       *lib;
5841     nxt_unit_port_impl_t  *port_impl;
5842 
5843     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5844 
5845     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5846     if (port_impl->queue != NULL && oob_size == 0
5847         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5848     {
5849         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5850         if (nxt_slow_path(rc != NXT_OK)) {
5851             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5852                            (int) port->id.pid, (int) port->id.id);
5853 
5854             return -1;
5855         }
5856 
5857         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5858                        (int) port->id.pid, (int) port->id.id,
5859                        (int) buf_size, notify);
5860 
5861         if (notify) {
5862             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5863 
5864             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5865 
5866             if (lib->callbacks.port_send == NULL) {
5867                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5868                                        sizeof(nxt_port_msg_t), NULL, 0);
5869 
5870                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5871                                (int) port->id.pid, (int) port->id.id,
5872                                (int) ret);
5873 
5874             } else {
5875                 ret = lib->callbacks.port_send(ctx, port, &msg,
5876                                                sizeof(nxt_port_msg_t), NULL, 0);
5877 
5878                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5879                                (int) port->id.pid, (int) port->id.id,
5880                                (int) ret);
5881             }
5882 
5883         }
5884 
5885         return buf_size;
5886     }
5887 
5888     if (port_impl->queue != NULL) {
5889         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5890 
5891         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5892         if (nxt_slow_path(rc != NXT_OK)) {
5893             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5894                            (int) port->id.pid, (int) port->id.id);
5895 
5896             return -1;
5897         }
5898 
5899         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5900                        (int) port->id.pid, (int) port->id.id, notify);
5901     }
5902 
5903     if (lib->callbacks.port_send != NULL) {
5904         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5905                                        oob, oob_size);
5906 
5907         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5908                        (int) port->id.pid, (int) port->id.id,
5909                        (int) ret);
5910 
5911     } else {
5912         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5913                                oob, oob_size);
5914 
5915         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5916                        (int) port->id.pid, (int) port->id.id,
5917                        (int) ret);
5918     }
5919 
5920     return ret;
5921 }
5922 
5923 
5924 static ssize_t
5925 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5926     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5927 {
5928     int            err;
5929     ssize_t        res;
5930     struct iovec   iov[1];
5931     struct msghdr  msg;
5932 
5933     iov[0].iov_base = (void *) buf;
5934     iov[0].iov_len = buf_size;
5935 
5936     msg.msg_name = NULL;
5937     msg.msg_namelen = 0;
5938     msg.msg_iov = iov;
5939     msg.msg_iovlen = 1;
5940     msg.msg_flags = 0;
5941     msg.msg_control = (void *) oob;
5942     msg.msg_controllen = oob_size;
5943 
5944 retry:
5945 
5946     res = sendmsg(fd, &msg, 0);
5947 
5948     if (nxt_slow_path(res == -1)) {
5949         err = errno;
5950 
5951         if (err == EINTR) {
5952             goto retry;
5953         }
5954 
5955         /*
5956          * FIXME: This should be "alert" after router graceful shutdown
5957          * implementation.
5958          */
5959         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5960                       fd, (int) buf_size, strerror(err), err);
5961 
5962     } else {
5963         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5964                        (int) res);
5965     }
5966 
5967     return res;
5968 }
5969 
5970 
5971 static int
5972 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5973     nxt_unit_read_buf_t *rbuf)
5974 {
5975     int                   res, read;
5976     nxt_unit_port_impl_t  *port_impl;
5977 
5978     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5979 
5980     read = 0;
5981 
5982 retry:
5983 
5984     if (port_impl->from_socket > 0) {
5985         if (port_impl->socket_rbuf != NULL
5986             && port_impl->socket_rbuf->size > 0)
5987         {
5988             port_impl->from_socket--;
5989 
5990             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5991             port_impl->socket_rbuf->size = 0;
5992 
5993             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5994                            (int) port->id.pid, (int) port->id.id,
5995                            (int) rbuf->size);
5996 
5997             return NXT_UNIT_OK;
5998         }
5999 
6000     } else {
6001         res = nxt_unit_port_queue_recv(port, rbuf);
6002 
6003         if (res == NXT_UNIT_OK) {
6004             if (nxt_unit_is_read_socket(rbuf)) {
6005                 port_impl->from_socket++;
6006 
6007                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
6008                                (int) port->id.pid, (int) port->id.id,
6009                                port_impl->from_socket);
6010 
6011                 goto retry;
6012             }
6013 
6014             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
6015                            (int) port->id.pid, (int) port->id.id,
6016                            (int) rbuf->size);
6017 
6018             return NXT_UNIT_OK;
6019         }
6020     }
6021 
6022     if (read) {
6023         return NXT_UNIT_AGAIN;
6024     }
6025 
6026     res = nxt_unit_port_recv(ctx, port, rbuf);
6027     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6028         return NXT_UNIT_ERROR;
6029     }
6030 
6031     read = 1;
6032 
6033     if (nxt_unit_is_read_queue(rbuf)) {
6034         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6035                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6036 
6037         goto retry;
6038     }
6039 
6040     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6041                    (int) port->id.pid, (int) port->id.id,
6042                    (int) rbuf->size);
6043 
6044     if (res == NXT_UNIT_AGAIN) {
6045         return NXT_UNIT_AGAIN;
6046     }
6047 
6048     if (port_impl->from_socket > 0) {
6049         port_impl->from_socket--;
6050 
6051         return NXT_UNIT_OK;
6052     }
6053 
6054     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6055                    (int) port->id.pid, (int) port->id.id,
6056                    (int) rbuf->size);
6057 
6058     if (port_impl->socket_rbuf == NULL) {
6059         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6060 
6061         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6062             return NXT_UNIT_ERROR;
6063         }
6064 
6065         port_impl->socket_rbuf->size = 0;
6066     }
6067 
6068     if (port_impl->socket_rbuf->size > 0) {
6069         nxt_unit_alert(ctx, "too many port socket messages");
6070 
6071         return NXT_UNIT_ERROR;
6072     }
6073 
6074     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6075 
6076     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
6077 
6078     goto retry;
6079 }
6080 
6081 
6082 nxt_inline void
6083 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6084 {
6085     memcpy(dst->buf, src->buf, src->size);
6086     dst->size = src->size;
6087     memcpy(dst->oob, src->oob, sizeof(src->oob));
6088 }
6089 
6090 
6091 static int
6092 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6093     nxt_unit_read_buf_t *rbuf)
6094 {
6095     int                   res;
6096     nxt_unit_port_impl_t  *port_impl;
6097 
6098     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6099 
6100 retry:
6101 
6102     res = nxt_unit_app_queue_recv(port, rbuf);
6103 
6104     if (res == NXT_UNIT_AGAIN) {
6105         res = nxt_unit_port_recv(ctx, port, rbuf);
6106         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6107             return NXT_UNIT_ERROR;
6108         }
6109 
6110         if (nxt_unit_is_read_queue(rbuf)) {
6111             nxt_app_queue_notification_received(port_impl->queue);
6112 
6113             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6114                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6115 
6116             goto retry;
6117         }
6118     }
6119 
6120     return res;
6121 }
6122 
6123 
6124 static int
6125 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6126     nxt_unit_read_buf_t *rbuf)
6127 {
6128     int              fd, err;
6129     struct iovec     iov[1];
6130     struct msghdr    msg;
6131     nxt_unit_impl_t  *lib;
6132 
6133     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6134 
6135     if (lib->callbacks.port_recv != NULL) {
6136         rbuf->size = lib->callbacks.port_recv(ctx, port,
6137                                               rbuf->buf, sizeof(rbuf->buf),
6138                                               rbuf->oob, sizeof(rbuf->oob));
6139 
6140         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6141                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6142 
6143         if (nxt_slow_path(rbuf->size < 0)) {
6144             return NXT_UNIT_ERROR;
6145         }
6146 
6147         return NXT_UNIT_OK;
6148     }
6149 
6150     iov[0].iov_base = rbuf->buf;
6151     iov[0].iov_len = sizeof(rbuf->buf);
6152 
6153     msg.msg_name = NULL;
6154     msg.msg_namelen = 0;
6155     msg.msg_iov = iov;
6156     msg.msg_iovlen = 1;
6157     msg.msg_flags = 0;
6158     msg.msg_control = rbuf->oob;
6159     msg.msg_controllen = sizeof(rbuf->oob);
6160 
6161     fd = port->in_fd;
6162 
6163 retry:
6164 
6165     rbuf->size = recvmsg(fd, &msg, 0);
6166 
6167     if (nxt_slow_path(rbuf->size == -1)) {
6168         err = errno;
6169 
6170         if (err == EINTR) {
6171             goto retry;
6172         }
6173 
6174         if (err == EAGAIN) {
6175             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6176                            fd, strerror(err), err);
6177 
6178             return NXT_UNIT_AGAIN;
6179         }
6180 
6181         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6182                        fd, strerror(err), err);
6183 
6184         return NXT_UNIT_ERROR;
6185     }
6186 
6187     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6188 
6189     return NXT_UNIT_OK;
6190 }
6191 
6192 
6193 static int
6194 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6195 {
6196     nxt_unit_port_impl_t  *port_impl;
6197 
6198     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6199 
6200     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6201 
6202     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6203 }
6204 
6205 
6206 static int
6207 nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6208 {
6209     uint32_t              cookie;
6210     nxt_port_msg_t        *port_msg;
6211     nxt_app_queue_t       *queue;
6212     nxt_unit_port_impl_t  *port_impl;
6213 
6214     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6215     queue = port_impl->queue;
6216 
6217 retry:
6218 
6219     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6220 
6221     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6222 
6223     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6224         port_msg = (nxt_port_msg_t *) rbuf->buf;
6225 
6226         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6227             return NXT_UNIT_OK;
6228         }
6229 
6230         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6231 
6232         goto retry;
6233     }
6234 
6235     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6236 }
6237 
6238 
6239 nxt_inline int
6240 nxt_unit_close(int fd)
6241 {
6242     int  res;
6243 
6244     res = close(fd);
6245 
6246     if (nxt_slow_path(res == -1)) {
6247         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6248                        fd, strerror(errno), errno);
6249 
6250     } else {
6251         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6252     }
6253 
6254     return res;
6255 }
6256 
6257 
6258 static int
6259 nxt_unit_fd_blocking(int fd)
6260 {
6261     int  nb;
6262 
6263     nb = 0;
6264 
6265     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6266         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6267                        fd, strerror(errno), errno);
6268 
6269         return NXT_UNIT_ERROR;
6270     }
6271 
6272     return NXT_UNIT_OK;
6273 }
6274 
6275 
6276 static nxt_int_t
6277 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6278 {
6279     nxt_unit_port_t          *port;
6280     nxt_unit_port_hash_id_t  *port_id;
6281 
6282     port = data;
6283     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6284 
6285     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6286         && port_id->pid == port->id.pid
6287         && port_id->id == port->id.id)
6288     {
6289         return NXT_OK;
6290     }
6291 
6292     return NXT_DECLINED;
6293 }
6294 
6295 
6296 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6297     NXT_LVLHSH_DEFAULT,
6298     nxt_unit_port_hash_test,
6299     nxt_unit_lvlhsh_alloc,
6300     nxt_unit_lvlhsh_free,
6301 };
6302 
6303 
6304 static inline void
6305 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6306     nxt_unit_port_hash_id_t *port_hash_id,
6307     nxt_unit_port_id_t *port_id)
6308 {
6309     port_hash_id->pid = port_id->pid;
6310     port_hash_id->id = port_id->id;
6311 
6312     if (nxt_fast_path(port_id->hash != 0)) {
6313         lhq->key_hash = port_id->hash;
6314 
6315     } else {
6316         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6317 
6318         port_id->hash = lhq->key_hash;
6319 
6320         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6321                        (int) port_id->pid, (int) port_id->id,
6322                        (int) port_id->hash);
6323     }
6324 
6325     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6326     lhq->key.start = (u_char *) port_hash_id;
6327     lhq->proto = &lvlhsh_ports_proto;
6328     lhq->pool = NULL;
6329 }
6330 
6331 
6332 static int
6333 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6334 {
6335     nxt_int_t                res;
6336     nxt_lvlhsh_query_t       lhq;
6337     nxt_unit_port_hash_id_t  port_hash_id;
6338 
6339     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6340     lhq.replace = 0;
6341     lhq.value = port;
6342 
6343     res = nxt_lvlhsh_insert(port_hash, &lhq);
6344 
6345     switch (res) {
6346 
6347     case NXT_OK:
6348         return NXT_UNIT_OK;
6349 
6350     default:
6351         return NXT_UNIT_ERROR;
6352     }
6353 }
6354 
6355 
6356 static nxt_unit_port_t *
6357 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6358     int remove)
6359 {
6360     nxt_int_t                res;
6361     nxt_lvlhsh_query_t       lhq;
6362     nxt_unit_port_hash_id_t  port_hash_id;
6363 
6364     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6365 
6366     if (remove) {
6367         res = nxt_lvlhsh_delete(port_hash, &lhq);
6368 
6369     } else {
6370         res = nxt_lvlhsh_find(port_hash, &lhq);
6371     }
6372 
6373     switch (res) {
6374 
6375     case NXT_OK:
6376         if (!remove) {
6377             nxt_unit_port_use(lhq.value);
6378         }
6379 
6380         return lhq.value;
6381 
6382     default:
6383         return NULL;
6384     }
6385 }
6386 
6387 
6388 static nxt_int_t
6389 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6390 {
6391     return NXT_OK;
6392 }
6393 
6394 
6395 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6396     NXT_LVLHSH_DEFAULT,
6397     nxt_unit_request_hash_test,
6398     nxt_unit_lvlhsh_alloc,
6399     nxt_unit_lvlhsh_free,
6400 };
6401 
6402 
6403 static int
6404 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6405     nxt_unit_request_info_t *req)
6406 {
6407     uint32_t                      *stream;
6408     nxt_int_t                     res;
6409     nxt_lvlhsh_query_t            lhq;
6410     nxt_unit_ctx_impl_t           *ctx_impl;
6411     nxt_unit_request_info_impl_t  *req_impl;
6412 
6413     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6414     if (req_impl->in_hash) {
6415         return NXT_UNIT_OK;
6416     }
6417 
6418     stream = &req_impl->stream;
6419 
6420     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6421     lhq.key.length = sizeof(*stream);
6422     lhq.key.start = (u_char *) stream;
6423     lhq.proto = &lvlhsh_requests_proto;
6424     lhq.pool = NULL;
6425     lhq.replace = 0;
6426     lhq.value = req_impl;
6427 
6428     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6429 
6430     pthread_mutex_lock(&ctx_impl->mutex);
6431 
6432     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6433 
6434     pthread_mutex_unlock(&ctx_impl->mutex);
6435 
6436     switch (res) {
6437 
6438     case NXT_OK:
6439         req_impl->in_hash = 1;
6440         return NXT_UNIT_OK;
6441 
6442     default:
6443         return NXT_UNIT_ERROR;
6444     }
6445 }
6446 
6447 
6448 static nxt_unit_request_info_t *
6449 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6450 {
6451     nxt_int_t                     res;
6452     nxt_lvlhsh_query_t            lhq;
6453     nxt_unit_ctx_impl_t           *ctx_impl;
6454     nxt_unit_request_info_impl_t  *req_impl;
6455 
6456     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6457     lhq.key.length = sizeof(stream);
6458     lhq.key.start = (u_char *) &stream;
6459     lhq.proto = &lvlhsh_requests_proto;
6460     lhq.pool = NULL;
6461 
6462     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6463 
6464     pthread_mutex_lock(&ctx_impl->mutex);
6465 
6466     if (remove) {
6467         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6468 
6469     } else {
6470         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6471     }
6472 
6473     pthread_mutex_unlock(&ctx_impl->mutex);
6474 
6475     switch (res) {
6476 
6477     case NXT_OK:
6478         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6479                                     req);
6480         if (remove) {
6481             req_impl->in_hash = 0;
6482         }
6483 
6484         return lhq.value;
6485 
6486     default:
6487         return NULL;
6488     }
6489 }
6490 
6491 
6492 void
6493 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6494 {
6495     int              log_fd, n;
6496     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6497     pid_t            pid;
6498     va_list          ap;
6499     nxt_unit_impl_t  *lib;
6500 
6501     if (nxt_fast_path(ctx != NULL)) {
6502         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6503 
6504         pid = lib->pid;
6505         log_fd = lib->log_fd;
6506 
6507     } else {
6508         pid = getpid();
6509         log_fd = STDERR_FILENO;
6510     }
6511 
6512     p = msg;
6513     end = p + sizeof(msg) - 1;
6514 
6515     p = nxt_unit_snprint_prefix(p, end, pid, level);
6516 
6517     va_start(ap, fmt);
6518     p += vsnprintf(p, end - p, fmt, ap);
6519     va_end(ap);
6520 
6521     if (nxt_slow_path(p > end)) {
6522         memcpy(end - 5, "[...]", 5);
6523         p = end;
6524     }
6525 
6526     *p++ = '\n';
6527 
6528     n = write(log_fd, msg, p - msg);
6529     if (nxt_slow_path(n < 0)) {
6530         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6531     }
6532 }
6533 
6534 
6535 void
6536 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6537 {
6538     int                           log_fd, n;
6539     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6540     pid_t                         pid;
6541     va_list                       ap;
6542     nxt_unit_impl_t               *lib;
6543     nxt_unit_request_info_impl_t  *req_impl;
6544 
6545     if (nxt_fast_path(req != NULL)) {
6546         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6547 
6548         pid = lib->pid;
6549         log_fd = lib->log_fd;
6550 
6551     } else {
6552         pid = getpid();
6553         log_fd = STDERR_FILENO;
6554     }
6555 
6556     p = msg;
6557     end = p + sizeof(msg) - 1;
6558 
6559     p = nxt_unit_snprint_prefix(p, end, pid, level);
6560 
6561     if (nxt_fast_path(req != NULL)) {
6562         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6563 
6564         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6565     }
6566 
6567     va_start(ap, fmt);
6568     p += vsnprintf(p, end - p, fmt, ap);
6569     va_end(ap);
6570 
6571     if (nxt_slow_path(p > end)) {
6572         memcpy(end - 5, "[...]", 5);
6573         p = end;
6574     }
6575 
6576     *p++ = '\n';
6577 
6578     n = write(log_fd, msg, p - msg);
6579     if (nxt_slow_path(n < 0)) {
6580         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6581     }
6582 }
6583 
6584 
6585 static const char * nxt_unit_log_levels[] = {
6586     "alert",
6587     "error",
6588     "warn",
6589     "notice",
6590     "info",
6591     "debug",
6592 };
6593 
6594 
6595 static char *
6596 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6597 {
6598     struct tm        tm;
6599     struct timespec  ts;
6600 
6601     (void) clock_gettime(CLOCK_REALTIME, &ts);
6602 
6603 #if (NXT_HAVE_LOCALTIME_R)
6604     (void) localtime_r(&ts.tv_sec, &tm);
6605 #else
6606     tm = *localtime(&ts.tv_sec);
6607 #endif
6608 
6609 #if (NXT_DEBUG)
6610     p += snprintf(p, end - p,
6611                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6612                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6613                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6614                   (int) ts.tv_nsec / 1000000);
6615 #else
6616     p += snprintf(p, end - p,
6617                   "%4d/%02d/%02d %02d:%02d:%02d ",
6618                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6619                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6620 #endif
6621 
6622     p += snprintf(p, end - p,
6623                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6624                   (int) pid,
6625                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6626 
6627     return p;
6628 }
6629 
6630 
6631 static void *
6632 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6633 {
6634     int   err;
6635     void  *p;
6636 
6637     err = posix_memalign(&p, size, size);
6638 
6639     if (nxt_fast_path(err == 0)) {
6640         nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6641                        (int) size, (int) size, p);
6642         return p;
6643     }
6644 
6645     nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6646                    (int) size, (int) size, strerror(err), err);
6647     return NULL;
6648 }
6649 
6650 
6651 static void
6652 nxt_unit_lvlhsh_free(void *data, void *p)
6653 {
6654     nxt_unit_free(NULL, p);
6655 }
6656 
6657 
6658 void *
6659 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6660 {
6661     void  *p;
6662 
6663     p = malloc(size);
6664 
6665     if (nxt_fast_path(p != NULL)) {
6666 #if (NXT_DEBUG_ALLOC)
6667         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6668 #endif
6669 
6670     } else {
6671         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6672                        (int) size, strerror(errno), errno);
6673     }
6674 
6675     return p;
6676 }
6677 
6678 
6679 void
6680 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6681 {
6682 #if (NXT_DEBUG_ALLOC)
6683     nxt_unit_debug(ctx, "free(%p)", p);
6684 #endif
6685 
6686     free(p);
6687 }
6688 
6689 
6690 static int
6691 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6692 {
6693     u_char        c1, c2;
6694     nxt_int_t     n;
6695     const u_char  *s1, *s2;
6696 
6697     s1 = p1;
6698     s2 = p2;
6699 
6700     while (length-- != 0) {
6701         c1 = *s1++;
6702         c2 = *s2++;
6703 
6704         c1 = nxt_lowcase(c1);
6705         c2 = nxt_lowcase(c2);
6706 
6707         n = c1 - c2;
6708 
6709         if (n != 0) {
6710             return n;
6711         }
6712     }
6713 
6714     return 0;
6715 }
6716