xref: /unit/src/nxt_unit.c (revision 1544:05af370e63b7)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
/*
 * NXT_UNIT_LOCAL_BUF_SIZE is NXT_UNIT_MAX_PLAIN_SIZE payload bytes plus room
 * for one nxt_port_msg_t header.
 *
 * NOTE(review): NXT_UNIT_MAX_PLAIN_SIZE looks like the largest payload that
 * is sent inline rather than through shared memory -- confirm at its users.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE                                               \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
25 
/* Short names for the implementation structures defined later in this file. */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
37 
38 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
39 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
40     nxt_unit_ctx_impl_t *ctx_impl, void *data);
41 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
42 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
43 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
44 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
45 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
46     nxt_unit_mmap_buf_t *mmap_buf);
47 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
48     nxt_unit_mmap_buf_t *mmap_buf);
49 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
50 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
51     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
52     int *log_fd, uint32_t *stream, uint32_t *shm_limit);
53 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
54 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
55     nxt_unit_recv_msg_t *recv_msg);
56 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
57     nxt_unit_recv_msg_t *recv_msg);
58 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
59     nxt_unit_recv_msg_t *recv_msg);
60 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
61 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
62     nxt_unit_ctx_t *ctx);
63 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
64 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
65 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
66     nxt_unit_ctx_t *ctx);
67 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
68 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
69 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
70     nxt_unit_recv_msg_t *recv_msg);
71 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
72 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
73 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
74     nxt_unit_mmap_buf_t *mmap_buf, int last);
75 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
76 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
77 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
78 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
79     nxt_unit_ctx_impl_t *ctx_impl);
80 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
81     nxt_unit_read_buf_t *rbuf);
82 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
83     nxt_unit_request_info_t *req, size_t size);
84 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
85     size_t size);
86 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
87     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
88 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
89 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
90 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
91 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
92     nxt_unit_port_t *port, int n);
93 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
94     int fd);
95 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
96     nxt_unit_port_t *port, uint32_t size,
97     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
98 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
99 
100 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
101 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
102 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
103 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
104 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
105     nxt_unit_process_t *process, uint32_t id);
106 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
107     nxt_unit_recv_msg_t *recv_msg);
108 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
109     nxt_unit_recv_msg_t *recv_msg);
110 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
111     nxt_unit_process_t *process,
112     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
113 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
114 
115 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
116     pid_t pid);
117 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
118     pid_t pid, int remove);
119 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
120 static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
121     nxt_unit_read_buf_t *rbuf);
122 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
123 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
124 
125 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
126     nxt_unit_port_t *port);
127 
128 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
129 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
130 nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port);
131 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
132     nxt_unit_port_t *port);
133 static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
134     nxt_unit_port_id_t *port_id);
135 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
136     nxt_unit_port_id_t *port_id);
137 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
138 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
139     nxt_unit_process_t *process);
140 static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
141 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
142     nxt_unit_port_t *port, const void *buf, size_t buf_size,
143     const void *oob, size_t oob_size);
144 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
145     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
146 static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port, void *buf, size_t buf_size,
148     void *oob, size_t oob_size);
149 
150 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
151     nxt_unit_port_t *port);
152 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
153     nxt_unit_port_id_t *port_id, int remove);
154 
155 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
156     nxt_unit_request_info_impl_t *req_impl);
157 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
158     nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
159 
160 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
161 
162 
163 struct nxt_unit_mmap_buf_s {
164     nxt_unit_buf_t           buf;
165 
166     nxt_unit_mmap_buf_t      *next;
167     nxt_unit_mmap_buf_t      **prev;
168 
169     nxt_port_mmap_header_t   *hdr;
170     nxt_unit_request_info_t  *req;
171     nxt_unit_ctx_impl_t      *ctx_impl;
172     nxt_unit_process_t       *process;
173     char                     *free_ptr;
174     char                     *plain_ptr;
175 };
176 
177 
178 struct nxt_unit_recv_msg_s {
179     uint32_t                 stream;
180     nxt_pid_t                pid;
181     nxt_port_id_t            reply_port;
182 
183     uint8_t                  last;      /* 1 bit */
184     uint8_t                  mmap;      /* 1 bit */
185 
186     void                     *start;
187     uint32_t                 size;
188 
189     int                      fd;
190     nxt_unit_process_t       *process;
191 
192     nxt_unit_mmap_buf_t      *incoming_buf;
193 };
194 
195 
/*
 * Request processing states.
 * NOTE(review): the names suggest a request advances through these in
 * declaration order -- confirm against the code that updates "state".
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
203 
204 
205 struct nxt_unit_request_info_impl_s {
206     nxt_unit_request_info_t  req;
207 
208     uint32_t                 stream;
209 
210     nxt_unit_process_t       *process;
211 
212     nxt_unit_mmap_buf_t      *outgoing_buf;
213     nxt_unit_mmap_buf_t      *incoming_buf;
214 
215     nxt_unit_req_state_t     state;
216     uint8_t                  websocket;
217 
218     nxt_queue_link_t         link;
219 
220     char                     extra_data[];
221 };
222 
223 
224 struct nxt_unit_websocket_frame_impl_s {
225     nxt_unit_websocket_frame_t  ws;
226 
227     nxt_unit_mmap_buf_t         *buf;
228 
229     nxt_queue_link_t            link;
230 
231     nxt_unit_ctx_impl_t         *ctx_impl;
232 };
233 
234 
235 struct nxt_unit_read_buf_s {
236     nxt_unit_read_buf_t           *next;
237     ssize_t                       size;
238     char                          buf[16384];
239     char                          oob[256];
240 };
241 
242 
243 struct nxt_unit_ctx_impl_s {
244     nxt_unit_ctx_t                ctx;
245 
246     nxt_atomic_t                  use_count;
247 
248     pthread_mutex_t               mutex;
249 
250     nxt_unit_port_t               *read_port;
251 
252     nxt_queue_link_t              link;
253 
254     nxt_unit_mmap_buf_t           *free_buf;
255 
256     /*  of nxt_unit_request_info_impl_t */
257     nxt_queue_t                   free_req;
258 
259     /*  of nxt_unit_websocket_frame_impl_t */
260     nxt_queue_t                   free_ws;
261 
262     /*  of nxt_unit_request_info_impl_t */
263     nxt_queue_t                   active_req;
264 
265     /*  of nxt_unit_request_info_impl_t */
266     nxt_lvlhsh_t                  requests;
267 
268     nxt_unit_read_buf_t           *pending_read_head;
269     nxt_unit_read_buf_t           **pending_read_tail;
270     nxt_unit_read_buf_t           *free_read_buf;
271 
272     nxt_unit_mmap_buf_t           ctx_buf[2];
273     nxt_unit_read_buf_t           ctx_read_buf;
274 
275     nxt_unit_request_info_impl_t  req;
276 };
277 
278 
279 struct nxt_unit_impl_s {
280     nxt_unit_t               unit;
281     nxt_unit_callbacks_t     callbacks;
282 
283     nxt_atomic_t             use_count;
284 
285     uint32_t                 request_data_size;
286     uint32_t                 shm_mmap_limit;
287 
288     pthread_mutex_t          mutex;
289 
290     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
291     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
292 
293     nxt_unit_port_t          *router_port;
294 
295     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
296 
297     pid_t                    pid;
298     int                      log_fd;
299     int                      online;
300 
301     nxt_unit_ctx_impl_t      main_ctx;
302 };
303 
304 
305 struct nxt_unit_port_impl_s {
306     nxt_unit_port_t          port;
307 
308     nxt_atomic_t             use_count;
309 
310     nxt_queue_link_t         link;
311     nxt_unit_process_t       *process;
312 };
313 
314 
315 struct nxt_unit_mmap_s {
316     nxt_port_mmap_header_t   *hdr;
317 };
318 
319 
320 struct nxt_unit_mmaps_s {
321     pthread_mutex_t          mutex;
322     uint32_t                 size;
323     uint32_t                 cap;
324     nxt_atomic_t             allocated_chunks;
325     nxt_unit_mmap_t          *elts;
326 };
327 
328 
329 struct nxt_unit_process_s {
330     pid_t                    pid;
331 
332     nxt_queue_t              ports;
333 
334     nxt_unit_mmaps_t         incoming;
335     nxt_unit_mmaps_t         outgoing;
336 
337     nxt_unit_impl_t          *lib;
338 
339     nxt_atomic_t             use_count;
340 
341     uint32_t                 next_port_id;
342 };
343 
344 
/*
 * Port identifier in the form used for hashing.  Both members are
 * explicitly 32 bits wide so that no alignment padding can appear between
 * or after them.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
350 
351 
352 nxt_unit_ctx_t *
353 nxt_unit_init(nxt_unit_init_t *init)
354 {
355     int              rc;
356     uint32_t         ready_stream, shm_limit;
357     nxt_unit_ctx_t   *ctx;
358     nxt_unit_impl_t  *lib;
359     nxt_unit_port_t  ready_port, router_port, read_port;
360 
361     lib = nxt_unit_create(init);
362     if (nxt_slow_path(lib == NULL)) {
363         return NULL;
364     }
365 
366     if (init->ready_port.id.pid != 0
367         && init->ready_stream != 0
368         && init->read_port.id.pid != 0)
369     {
370         ready_port = init->ready_port;
371         ready_stream = init->ready_stream;
372         router_port = init->router_port;
373         read_port = init->read_port;
374         lib->log_fd = init->log_fd;
375 
376         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
377                               ready_port.id.id);
378         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
379                               router_port.id.id);
380         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
381                               read_port.id.id);
382 
383     } else {
384         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
385                                &lib->log_fd, &ready_stream, &shm_limit);
386         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
387             goto fail;
388         }
389 
390         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
391                                 / PORT_MMAP_DATA_SIZE;
392     }
393 
394     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
395         lib->shm_mmap_limit = 1;
396     }
397 
398     lib->pid = read_port.id.pid;
399 
400     ctx = &lib->main_ctx.ctx;
401 
402     lib->router_port = nxt_unit_add_port(ctx, &router_port);
403     if (nxt_slow_path(lib->router_port == NULL)) {
404         nxt_unit_alert(NULL, "failed to add router_port");
405 
406         goto fail;
407     }
408 
409     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
410     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
411         nxt_unit_alert(NULL, "failed to add read_port");
412 
413         goto fail;
414     }
415 
416     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
417     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
418         nxt_unit_alert(NULL, "failed to send READY message");
419 
420         goto fail;
421     }
422 
423     close(ready_port.out_fd);
424 
425     return ctx;
426 
427 fail:
428 
429     nxt_unit_ctx_release(&lib->main_ctx);
430 
431     return NULL;
432 }
433 
434 
435 static nxt_unit_impl_t *
436 nxt_unit_create(nxt_unit_init_t *init)
437 {
438     int                   rc;
439     nxt_unit_impl_t       *lib;
440     nxt_unit_callbacks_t  *cb;
441 
442     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
443     if (nxt_slow_path(lib == NULL)) {
444         nxt_unit_alert(NULL, "failed to allocate unit struct");
445 
446         return NULL;
447     }
448 
449     rc = pthread_mutex_init(&lib->mutex, NULL);
450     if (nxt_slow_path(rc != 0)) {
451         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
452 
453         goto fail;
454     }
455 
456     lib->unit.data = init->data;
457     lib->callbacks = init->callbacks;
458 
459     lib->request_data_size = init->request_data_size;
460     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
461                             / PORT_MMAP_DATA_SIZE;
462 
463     lib->processes.slot = NULL;
464     lib->ports.slot = NULL;
465 
466     lib->log_fd = STDERR_FILENO;
467     lib->online = 1;
468 
469     nxt_queue_init(&lib->contexts);
470 
471     lib->use_count = 0;
472     lib->router_port = NULL;
473 
474     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
475     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
476         goto fail;
477     }
478 
479     cb = &lib->callbacks;
480 
481     if (cb->request_handler == NULL) {
482         nxt_unit_alert(NULL, "request_handler is NULL");
483 
484         goto fail;
485     }
486 
487     return lib;
488 
489 fail:
490 
491     free(lib);
492 
493     return NULL;
494 }
495 
496 
497 static int
498 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
499     void *data)
500 {
501     int  rc;
502 
503     ctx_impl->ctx.data = data;
504     ctx_impl->ctx.unit = &lib->unit;
505 
506     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
507     if (nxt_slow_path(rc != 0)) {
508         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
509 
510         return NXT_UNIT_ERROR;
511     }
512 
513     nxt_unit_lib_use(lib);
514 
515     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
516 
517     ctx_impl->use_count = 1;
518 
519     nxt_queue_init(&ctx_impl->free_req);
520     nxt_queue_init(&ctx_impl->free_ws);
521     nxt_queue_init(&ctx_impl->active_req);
522 
523     ctx_impl->free_buf = NULL;
524     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
525     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
526 
527     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
528 
529     ctx_impl->pending_read_head = NULL;
530     ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
531     ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
532     ctx_impl->ctx_read_buf.next = NULL;
533 
534     ctx_impl->req.req.ctx = &ctx_impl->ctx;
535     ctx_impl->req.req.unit = &lib->unit;
536 
537     ctx_impl->read_port = NULL;
538     ctx_impl->requests.slot = 0;
539 
540     return NXT_UNIT_OK;
541 }
542 
543 
544 nxt_inline void
545 nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
546 {
547     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
548 }
549 
550 
551 nxt_inline void
552 nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
553 {
554     long c;
555 
556     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
557 
558     if (c == 1) {
559         nxt_unit_ctx_free(ctx_impl);
560     }
561 }
562 
563 
564 nxt_inline void
565 nxt_unit_lib_use(nxt_unit_impl_t *lib)
566 {
567     nxt_atomic_fetch_add(&lib->use_count, 1);
568 }
569 
570 
571 nxt_inline void
572 nxt_unit_lib_release(nxt_unit_impl_t *lib)
573 {
574     long                c;
575     nxt_unit_process_t  *process;
576 
577     c = nxt_atomic_fetch_add(&lib->use_count, -1);
578 
579     if (c == 1) {
580         for ( ;; ) {
581             pthread_mutex_lock(&lib->mutex);
582 
583             process = nxt_unit_process_pop_first(lib);
584             if (process == NULL) {
585                 pthread_mutex_unlock(&lib->mutex);
586 
587                 break;
588             }
589 
590             nxt_unit_remove_process(lib, process);
591         }
592 
593         pthread_mutex_destroy(&lib->mutex);
594 
595         if (nxt_fast_path(lib->router_port != NULL)) {
596             nxt_unit_port_release(lib->router_port);
597         }
598 
599         free(lib);
600     }
601 }
602 
603 
604 nxt_inline void
605 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
606     nxt_unit_mmap_buf_t *mmap_buf)
607 {
608     mmap_buf->next = *head;
609 
610     if (mmap_buf->next != NULL) {
611         mmap_buf->next->prev = &mmap_buf->next;
612     }
613 
614     *head = mmap_buf;
615     mmap_buf->prev = head;
616 }
617 
618 
619 nxt_inline void
620 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
621     nxt_unit_mmap_buf_t *mmap_buf)
622 {
623     while (*prev != NULL) {
624         prev = &(*prev)->next;
625     }
626 
627     nxt_unit_mmap_buf_insert(prev, mmap_buf);
628 }
629 
630 
631 nxt_inline void
632 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
633 {
634     nxt_unit_mmap_buf_t  **prev;
635 
636     prev = mmap_buf->prev;
637 
638     if (mmap_buf->next != NULL) {
639         mmap_buf->next->prev = prev;
640     }
641 
642     if (prev != NULL) {
643         *prev = mmap_buf->next;
644     }
645 }
646 
647 
648 static int
649 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
650     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
651     uint32_t *shm_limit)
652 {
653     int       rc;
654     int       ready_fd, router_fd, read_fd;
655     char      *unit_init, *version_end;
656     long      version_length;
657     int64_t   ready_pid, router_pid, read_pid;
658     uint32_t  ready_stream, router_id, ready_id, read_id;
659 
660     unit_init = getenv(NXT_UNIT_INIT_ENV);
661     if (nxt_slow_path(unit_init == NULL)) {
662         nxt_unit_alert(NULL, "%s is not in the current environment",
663                        NXT_UNIT_INIT_ENV);
664 
665         return NXT_UNIT_ERROR;
666     }
667 
668     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
669 
670     version_length = nxt_length(NXT_VERSION);
671 
672     version_end = strchr(unit_init, ';');
673     if (version_end == NULL
674         || version_end - unit_init != version_length
675         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
676     {
677         nxt_unit_alert(NULL, "version check error");
678 
679         return NXT_UNIT_ERROR;
680     }
681 
682     rc = sscanf(version_end + 1,
683                 "%"PRIu32";"
684                 "%"PRId64",%"PRIu32",%d;"
685                 "%"PRId64",%"PRIu32",%d;"
686                 "%"PRId64",%"PRIu32",%d;"
687                 "%d,%"PRIu32,
688                 &ready_stream,
689                 &ready_pid, &ready_id, &ready_fd,
690                 &router_pid, &router_id, &router_fd,
691                 &read_pid, &read_id, &read_fd,
692                 log_fd, shm_limit);
693 
694     if (nxt_slow_path(rc != 12)) {
695         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
696 
697         return NXT_UNIT_ERROR;
698     }
699 
700     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
701 
702     ready_port->in_fd = -1;
703     ready_port->out_fd = ready_fd;
704     ready_port->data = NULL;
705 
706     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
707 
708     router_port->in_fd = -1;
709     router_port->out_fd = router_fd;
710     router_port->data = NULL;
711 
712     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
713 
714     read_port->in_fd = read_fd;
715     read_port->out_fd = -1;
716     read_port->data = NULL;
717 
718     *stream = ready_stream;
719 
720     return NXT_UNIT_OK;
721 }
722 
723 
724 static int
725 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
726 {
727     ssize_t          res;
728     nxt_port_msg_t   msg;
729     nxt_unit_impl_t  *lib;
730 
731     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
732 
733     msg.stream = stream;
734     msg.pid = lib->pid;
735     msg.reply_port = 0;
736     msg.type = _NXT_PORT_MSG_PROCESS_READY;
737     msg.last = 1;
738     msg.mmap = 0;
739     msg.nf = 0;
740     msg.mf = 0;
741     msg.tracking = 0;
742 
743     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
744     if (res != sizeof(msg)) {
745         return NXT_UNIT_ERROR;
746     }
747 
748     return NXT_UNIT_OK;
749 }
750 
751 
752 int
753 nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
754     void *buf, size_t buf_size, void *oob, size_t oob_size)
755 {
756     int                  rc;
757     pid_t                pid;
758     struct cmsghdr       *cm;
759     nxt_port_msg_t       *port_msg;
760     nxt_unit_impl_t      *lib;
761     nxt_unit_recv_msg_t  recv_msg;
762 
763     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
764 
765     rc = NXT_UNIT_ERROR;
766     recv_msg.fd = -1;
767     recv_msg.process = NULL;
768     port_msg = buf;
769     cm = oob;
770 
771     if (oob_size >= CMSG_SPACE(sizeof(int))
772         && cm->cmsg_len == CMSG_LEN(sizeof(int))
773         && cm->cmsg_level == SOL_SOCKET
774         && cm->cmsg_type == SCM_RIGHTS)
775     {
776         memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
777     }
778 
779     recv_msg.incoming_buf = NULL;
780 
781     if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
782         nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
783         goto fail;
784     }
785 
786     recv_msg.stream = port_msg->stream;
787     recv_msg.pid = port_msg->pid;
788     recv_msg.reply_port = port_msg->reply_port;
789     recv_msg.last = port_msg->last;
790     recv_msg.mmap = port_msg->mmap;
791 
792     recv_msg.start = port_msg + 1;
793     recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
794 
795     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
796         nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
797                       port_msg->stream, (int) port_msg->type);
798         goto fail;
799     }
800 
801     if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
802         rc = NXT_UNIT_OK;
803 
804         goto fail;
805     }
806 
807     /* Fragmentation is unsupported. */
808     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
809         nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
810                       port_msg->stream, (int) port_msg->type);
811         goto fail;
812     }
813 
814     if (port_msg->mmap) {
815         if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
816             goto fail;
817         }
818     }
819 
820     switch (port_msg->type) {
821 
822     case _NXT_PORT_MSG_QUIT:
823         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
824 
825         nxt_unit_quit(ctx);
826         rc = NXT_UNIT_OK;
827         break;
828 
829     case _NXT_PORT_MSG_NEW_PORT:
830         rc = nxt_unit_process_new_port(ctx, &recv_msg);
831         break;
832 
833     case _NXT_PORT_MSG_CHANGE_FILE:
834         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
835                        port_msg->stream, recv_msg.fd);
836 
837         if (dup2(recv_msg.fd, lib->log_fd) == -1) {
838             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
839                            port_msg->stream, recv_msg.fd, lib->log_fd,
840                            strerror(errno), errno);
841 
842             goto fail;
843         }
844 
845         rc = NXT_UNIT_OK;
846         break;
847 
848     case _NXT_PORT_MSG_MMAP:
849         if (nxt_slow_path(recv_msg.fd < 0)) {
850             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
851                            port_msg->stream, recv_msg.fd);
852 
853             goto fail;
854         }
855 
856         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
857         break;
858 
859     case _NXT_PORT_MSG_REQ_HEADERS:
860         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
861         break;
862 
863     case _NXT_PORT_MSG_WEBSOCKET:
864         rc = nxt_unit_process_websocket(ctx, &recv_msg);
865         break;
866 
867     case _NXT_PORT_MSG_REMOVE_PID:
868         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
869             nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
870                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
871                           (int) sizeof(pid));
872 
873             goto fail;
874         }
875 
876         memcpy(&pid, recv_msg.start, sizeof(pid));
877 
878         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
879                        port_msg->stream, (int) pid);
880 
881         nxt_unit_remove_pid(lib, pid);
882 
883         rc = NXT_UNIT_OK;
884         break;
885 
886     case _NXT_PORT_MSG_SHM_ACK:
887         rc = nxt_unit_process_shm_ack(ctx);
888         break;
889 
890     default:
891         nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
892                        port_msg->stream, (int) port_msg->type);
893 
894         goto fail;
895     }
896 
897 fail:
898 
899     if (recv_msg.fd != -1) {
900         close(recv_msg.fd);
901     }
902 
903     while (recv_msg.incoming_buf != NULL) {
904         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
905     }
906 
907     if (recv_msg.process != NULL) {
908         nxt_unit_process_release(recv_msg.process);
909     }
910 
911     return rc;
912 }
913 
914 
915 static int
916 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
917 {
918     int                      nb;
919     nxt_unit_port_t          new_port, *port;
920     nxt_port_msg_new_port_t  *new_port_msg;
921 
922     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
923         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
924                       "invalid message size (%d)",
925                       recv_msg->stream, (int) recv_msg->size);
926 
927         return NXT_UNIT_ERROR;
928     }
929 
930     if (nxt_slow_path(recv_msg->fd < 0)) {
931         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
932                        recv_msg->stream, recv_msg->fd);
933 
934         return NXT_UNIT_ERROR;
935     }
936 
937     new_port_msg = recv_msg->start;
938 
939     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
940                    recv_msg->stream, (int) new_port_msg->pid,
941                    (int) new_port_msg->id, recv_msg->fd);
942 
943     nb = 0;
944 
945     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
946         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
947                        "failed: %s (%d)",
948                        recv_msg->stream, recv_msg->fd, strerror(errno), errno);
949 
950         return NXT_UNIT_ERROR;
951     }
952 
953     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
954                           new_port_msg->id);
955 
956     new_port.in_fd = -1;
957     new_port.out_fd = recv_msg->fd;
958     new_port.data = NULL;
959 
960     recv_msg->fd = -1;
961 
962     port = nxt_unit_add_port(ctx, &new_port);
963     if (nxt_slow_path(port == NULL)) {
964         return NXT_UNIT_ERROR;
965     }
966 
967     nxt_unit_port_release(port);
968 
969     return NXT_UNIT_OK;
970 }
971 
972 
973 static int
974 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
975 {
976     nxt_unit_impl_t               *lib;
977     nxt_unit_port_t               *port;
978     nxt_unit_port_id_t            port_id;
979     nxt_unit_request_t            *r;
980     nxt_unit_mmap_buf_t           *b;
981     nxt_unit_request_info_t       *req;
982     nxt_unit_request_info_impl_t  *req_impl;
983 
984     if (nxt_slow_path(recv_msg->mmap == 0)) {
985         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
986                       recv_msg->stream);
987 
988         return NXT_UNIT_ERROR;
989     }
990 
991     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
992         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
993                       "%d expected", recv_msg->stream, (int) recv_msg->size,
994                       (int) sizeof(nxt_unit_request_t));
995 
996         return NXT_UNIT_ERROR;
997     }
998 
999     req_impl = nxt_unit_request_info_get(ctx);
1000     if (nxt_slow_path(req_impl == NULL)) {
1001         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1002                       recv_msg->stream);
1003 
1004         return NXT_UNIT_ERROR;
1005     }
1006 
1007     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1008 
1009     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1010 
1011     pthread_mutex_lock(&lib->mutex);
1012 
1013     port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
1014 
1015     pthread_mutex_unlock(&lib->mutex);
1016 
1017     if (nxt_slow_path(port == NULL)) {
1018         nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
1019                        recv_msg->stream,
1020                        (int) recv_msg->pid, (int) recv_msg->reply_port);
1021 
1022         return NXT_UNIT_ERROR;
1023     }
1024 
1025     req = &req_impl->req;
1026 
1027     req->response_port = port;
1028 
1029     req->request = recv_msg->start;
1030 
1031     b = recv_msg->incoming_buf;
1032 
1033     req->request_buf = &b->buf;
1034     req->response = NULL;
1035     req->response_buf = NULL;
1036 
1037     r = req->request;
1038 
1039     req->content_length = r->content_length;
1040 
1041     req->content_buf = req->request_buf;
1042     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1043 
1044     /* "Move" process reference to req_impl. */
1045     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
1046     if (nxt_slow_path(req_impl->process == NULL)) {
1047         return NXT_UNIT_ERROR;
1048     }
1049 
1050     recv_msg->process = NULL;
1051 
1052     req_impl->stream = recv_msg->stream;
1053 
1054     req_impl->outgoing_buf = NULL;
1055 
1056     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1057         b->req = req;
1058     }
1059 
1060     /* "Move" incoming buffer list to req_impl. */
1061     req_impl->incoming_buf = recv_msg->incoming_buf;
1062     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1063     recv_msg->incoming_buf = NULL;
1064 
1065     req->content_fd = recv_msg->fd;
1066     recv_msg->fd = -1;
1067 
1068     req->response_max_fields = 0;
1069     req_impl->state = NXT_UNIT_RS_START;
1070     req_impl->websocket = 0;
1071 
1072     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1073                    (int) r->method_length,
1074                    (char *) nxt_unit_sptr_get(&r->method),
1075                    (int) r->target_length,
1076                    (char *) nxt_unit_sptr_get(&r->target),
1077                    (int) r->content_length);
1078 
1079     lib->callbacks.request_handler(req);
1080 
1081     return NXT_UNIT_OK;
1082 }
1083 
1084 
1085 static int
1086 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1087 {
1088     size_t                           hsize;
1089     nxt_unit_impl_t                  *lib;
1090     nxt_unit_mmap_buf_t              *b;
1091     nxt_unit_ctx_impl_t              *ctx_impl;
1092     nxt_unit_callbacks_t             *cb;
1093     nxt_unit_request_info_t          *req;
1094     nxt_unit_request_info_impl_t     *req_impl;
1095     nxt_unit_websocket_frame_impl_t  *ws_impl;
1096 
1097     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1098 
1099     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
1100                                           recv_msg->last);
1101     if (req_impl == NULL) {
1102         return NXT_UNIT_OK;
1103     }
1104 
1105     req = &req_impl->req;
1106 
1107     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1108     cb = &lib->callbacks;
1109 
1110     if (cb->websocket_handler && recv_msg->size >= 2) {
1111         ws_impl = nxt_unit_websocket_frame_get(ctx);
1112         if (nxt_slow_path(ws_impl == NULL)) {
1113             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1114                           req_impl->stream);
1115 
1116             return NXT_UNIT_ERROR;
1117         }
1118 
1119         ws_impl->ws.req = req;
1120 
1121         ws_impl->buf = NULL;
1122 
1123         if (recv_msg->mmap) {
1124             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1125                 b->req = req;
1126             }
1127 
1128             /* "Move" incoming buffer list to ws_impl. */
1129             ws_impl->buf = recv_msg->incoming_buf;
1130             ws_impl->buf->prev = &ws_impl->buf;
1131             recv_msg->incoming_buf = NULL;
1132 
1133             b = ws_impl->buf;
1134 
1135         } else {
1136             b = nxt_unit_mmap_buf_get(ctx);
1137             if (nxt_slow_path(b == NULL)) {
1138                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1139                                req_impl->stream);
1140 
1141                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1142 
1143                 return NXT_UNIT_ERROR;
1144             }
1145 
1146             b->req = req;
1147             b->buf.start = recv_msg->start;
1148             b->buf.free = b->buf.start;
1149             b->buf.end = b->buf.start + recv_msg->size;
1150 
1151             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1152         }
1153 
1154         ws_impl->ws.header = (void *) b->buf.start;
1155         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1156             ws_impl->ws.header);
1157 
1158         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1159 
1160         if (ws_impl->ws.header->mask) {
1161             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1162 
1163         } else {
1164             ws_impl->ws.mask = NULL;
1165         }
1166 
1167         b->buf.free += hsize;
1168 
1169         ws_impl->ws.content_buf = &b->buf;
1170         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1171 
1172         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1173                            "payload_len=%"PRIu64,
1174                             ws_impl->ws.header->opcode,
1175                             ws_impl->ws.payload_len);
1176 
1177         cb->websocket_handler(&ws_impl->ws);
1178     }
1179 
1180     if (recv_msg->last) {
1181         req_impl->websocket = 0;
1182 
1183         if (cb->close_handler) {
1184             nxt_unit_req_debug(req, "close_handler");
1185 
1186             cb->close_handler(req);
1187 
1188         } else {
1189             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1190         }
1191     }
1192 
1193     return NXT_UNIT_OK;
1194 }
1195 
1196 
1197 static int
1198 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1199 {
1200     nxt_unit_impl_t       *lib;
1201     nxt_unit_callbacks_t  *cb;
1202 
1203     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1204     cb = &lib->callbacks;
1205 
1206     if (cb->shm_ack_handler != NULL) {
1207         cb->shm_ack_handler(ctx);
1208     }
1209 
1210     return NXT_UNIT_OK;
1211 }
1212 
1213 
1214 static nxt_unit_request_info_impl_t *
1215 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1216 {
1217     nxt_unit_impl_t               *lib;
1218     nxt_queue_link_t              *lnk;
1219     nxt_unit_ctx_impl_t           *ctx_impl;
1220     nxt_unit_request_info_impl_t  *req_impl;
1221 
1222     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1223 
1224     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1225 
1226     pthread_mutex_lock(&ctx_impl->mutex);
1227 
1228     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1229         pthread_mutex_unlock(&ctx_impl->mutex);
1230 
1231         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1232                           + lib->request_data_size);
1233         if (nxt_slow_path(req_impl == NULL)) {
1234             return NULL;
1235         }
1236 
1237         req_impl->req.unit = ctx->unit;
1238         req_impl->req.ctx = ctx;
1239 
1240         pthread_mutex_lock(&ctx_impl->mutex);
1241 
1242     } else {
1243         lnk = nxt_queue_first(&ctx_impl->free_req);
1244         nxt_queue_remove(lnk);
1245 
1246         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1247     }
1248 
1249     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1250 
1251     pthread_mutex_unlock(&ctx_impl->mutex);
1252 
1253     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1254 
1255     return req_impl;
1256 }
1257 
1258 
1259 static void
1260 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1261 {
1262     nxt_unit_ctx_impl_t           *ctx_impl;
1263     nxt_unit_request_info_impl_t  *req_impl;
1264 
1265     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1266     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1267 
1268     req->response = NULL;
1269     req->response_buf = NULL;
1270 
1271     if (req_impl->websocket) {
1272         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1273 
1274         req_impl->websocket = 0;
1275     }
1276 
1277     while (req_impl->outgoing_buf != NULL) {
1278         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1279     }
1280 
1281     while (req_impl->incoming_buf != NULL) {
1282         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1283     }
1284 
1285     if (req->content_fd != -1) {
1286         close(req->content_fd);
1287 
1288         req->content_fd = -1;
1289     }
1290 
1291     /*
1292      * Process release should go after buffers release to guarantee mmap
1293      * existence.
1294      */
1295     if (req_impl->process != NULL) {
1296         nxt_unit_process_release(req_impl->process);
1297 
1298         req_impl->process = NULL;
1299     }
1300 
1301     if (req->response_port != NULL) {
1302         nxt_unit_port_release(req->response_port);
1303 
1304         req->response_port = NULL;
1305     }
1306 
1307     pthread_mutex_lock(&ctx_impl->mutex);
1308 
1309     nxt_queue_remove(&req_impl->link);
1310 
1311     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1312 
1313     pthread_mutex_unlock(&ctx_impl->mutex);
1314 
1315     req_impl->state = NXT_UNIT_RS_RELEASED;
1316 }
1317 
1318 
1319 static void
1320 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1321 {
1322     nxt_unit_ctx_impl_t  *ctx_impl;
1323 
1324     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1325 
1326     nxt_queue_remove(&req_impl->link);
1327 
1328     if (req_impl != &ctx_impl->req) {
1329         free(req_impl);
1330     }
1331 }
1332 
1333 
1334 static nxt_unit_websocket_frame_impl_t *
1335 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1336 {
1337     nxt_queue_link_t                 *lnk;
1338     nxt_unit_ctx_impl_t              *ctx_impl;
1339     nxt_unit_websocket_frame_impl_t  *ws_impl;
1340 
1341     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1342 
1343     pthread_mutex_lock(&ctx_impl->mutex);
1344 
1345     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1346         pthread_mutex_unlock(&ctx_impl->mutex);
1347 
1348         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1349         if (nxt_slow_path(ws_impl == NULL)) {
1350             return NULL;
1351         }
1352 
1353     } else {
1354         lnk = nxt_queue_first(&ctx_impl->free_ws);
1355         nxt_queue_remove(lnk);
1356 
1357         pthread_mutex_unlock(&ctx_impl->mutex);
1358 
1359         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1360     }
1361 
1362     ws_impl->ctx_impl = ctx_impl;
1363 
1364     return ws_impl;
1365 }
1366 
1367 
1368 static void
1369 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1370 {
1371     nxt_unit_websocket_frame_impl_t  *ws_impl;
1372 
1373     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1374 
1375     while (ws_impl->buf != NULL) {
1376         nxt_unit_mmap_buf_free(ws_impl->buf);
1377     }
1378 
1379     ws->req = NULL;
1380 
1381     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1382 
1383     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1384 
1385     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1386 }
1387 
1388 
1389 static void
1390 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1391 {
1392     nxt_queue_remove(&ws_impl->link);
1393 
1394     free(ws_impl);
1395 }
1396 
1397 
1398 uint16_t
1399 nxt_unit_field_hash(const char *name, size_t name_length)
1400 {
1401     u_char      ch;
1402     uint32_t    hash;
1403     const char  *p, *end;
1404 
1405     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1406     end = name + name_length;
1407 
1408     for (p = name; p < end; p++) {
1409         ch = *p;
1410         hash = (hash << 4) + hash + nxt_lowcase(ch);
1411     }
1412 
1413     hash = (hash >> 16) ^ hash;
1414 
1415     return hash;
1416 }
1417 
1418 
1419 void
1420 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1421 {
1422     uint32_t            i, j;
1423     nxt_unit_field_t    *fields, f;
1424     nxt_unit_request_t  *r;
1425 
1426     nxt_unit_req_debug(req, "group_dup_fields");
1427 
1428     r = req->request;
1429     fields = r->fields;
1430 
1431     for (i = 0; i < r->fields_count; i++) {
1432 
1433         switch (fields[i].hash) {
1434         case NXT_UNIT_HASH_CONTENT_LENGTH:
1435             r->content_length_field = i;
1436             break;
1437 
1438         case NXT_UNIT_HASH_CONTENT_TYPE:
1439             r->content_type_field = i;
1440             break;
1441 
1442         case NXT_UNIT_HASH_COOKIE:
1443             r->cookie_field = i;
1444             break;
1445         };
1446 
1447         for (j = i + 1; j < r->fields_count; j++) {
1448             if (fields[i].hash != fields[j].hash) {
1449                 continue;
1450             }
1451 
1452             if (j == i + 1) {
1453                 continue;
1454             }
1455 
1456             f = fields[j];
1457             f.name.offset += (j - (i + 1)) * sizeof(f);
1458             f.value.offset += (j - (i + 1)) * sizeof(f);
1459 
1460             while (j > i + 1) {
1461                 fields[j] = fields[j - 1];
1462                 fields[j].name.offset -= sizeof(f);
1463                 fields[j].value.offset -= sizeof(f);
1464                 j--;
1465             }
1466 
1467             fields[j] = f;
1468 
1469             i++;
1470         }
1471     }
1472 }
1473 
1474 
1475 int
1476 nxt_unit_response_init(nxt_unit_request_info_t *req,
1477     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1478 {
1479     uint32_t                      buf_size;
1480     nxt_unit_buf_t                *buf;
1481     nxt_unit_request_info_impl_t  *req_impl;
1482 
1483     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1484 
1485     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1486         nxt_unit_req_warn(req, "init: response already sent");
1487 
1488         return NXT_UNIT_ERROR;
1489     }
1490 
1491     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1492                        (int) max_fields_count, (int) max_fields_size);
1493 
1494     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1495         nxt_unit_req_debug(req, "duplicate response init");
1496     }
1497 
1498     /*
1499      * Each field name and value 0-terminated by libunit,
1500      * this is the reason of '+ 2' below.
1501      */
1502     buf_size = sizeof(nxt_unit_response_t)
1503                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1504                + max_fields_size;
1505 
1506     if (nxt_slow_path(req->response_buf != NULL)) {
1507         buf = req->response_buf;
1508 
1509         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1510             goto init_response;
1511         }
1512 
1513         nxt_unit_buf_free(buf);
1514 
1515         req->response_buf = NULL;
1516         req->response = NULL;
1517         req->response_max_fields = 0;
1518 
1519         req_impl->state = NXT_UNIT_RS_START;
1520     }
1521 
1522     buf = nxt_unit_response_buf_alloc(req, buf_size);
1523     if (nxt_slow_path(buf == NULL)) {
1524         return NXT_UNIT_ERROR;
1525     }
1526 
1527 init_response:
1528 
1529     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1530 
1531     req->response_buf = buf;
1532 
1533     req->response = (nxt_unit_response_t *) buf->start;
1534     req->response->status = status;
1535 
1536     buf->free = buf->start + sizeof(nxt_unit_response_t)
1537                 + max_fields_count * sizeof(nxt_unit_field_t);
1538 
1539     req->response_max_fields = max_fields_count;
1540     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1541 
1542     return NXT_UNIT_OK;
1543 }
1544 
1545 
1546 int
1547 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1548     uint32_t max_fields_count, uint32_t max_fields_size)
1549 {
1550     char                          *p;
1551     uint32_t                      i, buf_size;
1552     nxt_unit_buf_t                *buf;
1553     nxt_unit_field_t              *f, *src;
1554     nxt_unit_response_t           *resp;
1555     nxt_unit_request_info_impl_t  *req_impl;
1556 
1557     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1558 
1559     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1560         nxt_unit_req_warn(req, "realloc: response not init");
1561 
1562         return NXT_UNIT_ERROR;
1563     }
1564 
1565     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1566         nxt_unit_req_warn(req, "realloc: response already sent");
1567 
1568         return NXT_UNIT_ERROR;
1569     }
1570 
1571     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1572         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1573 
1574         return NXT_UNIT_ERROR;
1575     }
1576 
1577     /*
1578      * Each field name and value 0-terminated by libunit,
1579      * this is the reason of '+ 2' below.
1580      */
1581     buf_size = sizeof(nxt_unit_response_t)
1582                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1583                + max_fields_size;
1584 
1585     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1586 
1587     buf = nxt_unit_response_buf_alloc(req, buf_size);
1588     if (nxt_slow_path(buf == NULL)) {
1589         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1590         return NXT_UNIT_ERROR;
1591     }
1592 
1593     resp = (nxt_unit_response_t *) buf->start;
1594 
1595     memset(resp, 0, sizeof(nxt_unit_response_t));
1596 
1597     resp->status = req->response->status;
1598     resp->content_length = req->response->content_length;
1599 
1600     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1601     f = resp->fields;
1602 
1603     for (i = 0; i < req->response->fields_count; i++) {
1604         src = req->response->fields + i;
1605 
1606         if (nxt_slow_path(src->skip != 0)) {
1607             continue;
1608         }
1609 
1610         if (nxt_slow_path(src->name_length + src->value_length + 2
1611                           > (uint32_t) (buf->end - p)))
1612         {
1613             nxt_unit_req_warn(req, "realloc: not enough space for field"
1614                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1615                   i, src, src->name_length, src->value_length);
1616 
1617             goto fail;
1618         }
1619 
1620         nxt_unit_sptr_set(&f->name, p);
1621         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1622         *p++ = '\0';
1623 
1624         nxt_unit_sptr_set(&f->value, p);
1625         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1626         *p++ = '\0';
1627 
1628         f->hash = src->hash;
1629         f->skip = 0;
1630         f->name_length = src->name_length;
1631         f->value_length = src->value_length;
1632 
1633         resp->fields_count++;
1634         f++;
1635     }
1636 
1637     if (req->response->piggyback_content_length > 0) {
1638         if (nxt_slow_path(req->response->piggyback_content_length
1639                           > (uint32_t) (buf->end - p)))
1640         {
1641             nxt_unit_req_warn(req, "realloc: not enought space for content"
1642                   " #%"PRIu32", %"PRIu32" required",
1643                   i, req->response->piggyback_content_length);
1644 
1645             goto fail;
1646         }
1647 
1648         resp->piggyback_content_length =
1649                                        req->response->piggyback_content_length;
1650 
1651         nxt_unit_sptr_set(&resp->piggyback_content, p);
1652         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1653                        req->response->piggyback_content_length);
1654     }
1655 
1656     buf->free = p;
1657 
1658     nxt_unit_buf_free(req->response_buf);
1659 
1660     req->response = resp;
1661     req->response_buf = buf;
1662     req->response_max_fields = max_fields_count;
1663 
1664     return NXT_UNIT_OK;
1665 
1666 fail:
1667 
1668     nxt_unit_buf_free(buf);
1669 
1670     return NXT_UNIT_ERROR;
1671 }
1672 
1673 
1674 int
1675 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1676 {
1677     nxt_unit_request_info_impl_t  *req_impl;
1678 
1679     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1680 
1681     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1682 }
1683 
1684 
1685 int
1686 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1687     const char *name, uint8_t name_length,
1688     const char *value, uint32_t value_length)
1689 {
1690     nxt_unit_buf_t                *buf;
1691     nxt_unit_field_t              *f;
1692     nxt_unit_response_t           *resp;
1693     nxt_unit_request_info_impl_t  *req_impl;
1694 
1695     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1696 
1697     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1698         nxt_unit_req_warn(req, "add_field: response not initialized or "
1699                           "already sent");
1700 
1701         return NXT_UNIT_ERROR;
1702     }
1703 
1704     resp = req->response;
1705 
1706     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1707         nxt_unit_req_warn(req, "add_field: too many response fields");
1708 
1709         return NXT_UNIT_ERROR;
1710     }
1711 
1712     buf = req->response_buf;
1713 
1714     if (nxt_slow_path(name_length + value_length + 2
1715                       > (uint32_t) (buf->end - buf->free)))
1716     {
1717         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1718 
1719         return NXT_UNIT_ERROR;
1720     }
1721 
1722     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1723                        resp->fields_count,
1724                        (int) name_length, name,
1725                        (int) value_length, value);
1726 
1727     f = resp->fields + resp->fields_count;
1728 
1729     nxt_unit_sptr_set(&f->name, buf->free);
1730     buf->free = nxt_cpymem(buf->free, name, name_length);
1731     *buf->free++ = '\0';
1732 
1733     nxt_unit_sptr_set(&f->value, buf->free);
1734     buf->free = nxt_cpymem(buf->free, value, value_length);
1735     *buf->free++ = '\0';
1736 
1737     f->hash = nxt_unit_field_hash(name, name_length);
1738     f->skip = 0;
1739     f->name_length = name_length;
1740     f->value_length = value_length;
1741 
1742     resp->fields_count++;
1743 
1744     return NXT_UNIT_OK;
1745 }
1746 
1747 
1748 int
1749 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1750     const void* src, uint32_t size)
1751 {
1752     nxt_unit_buf_t                *buf;
1753     nxt_unit_response_t           *resp;
1754     nxt_unit_request_info_impl_t  *req_impl;
1755 
1756     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1757 
1758     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1759         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1760 
1761         return NXT_UNIT_ERROR;
1762     }
1763 
1764     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1765         nxt_unit_req_warn(req, "add_content: response already sent");
1766 
1767         return NXT_UNIT_ERROR;
1768     }
1769 
1770     buf = req->response_buf;
1771 
1772     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1773         nxt_unit_req_warn(req, "add_content: buffer overflow");
1774 
1775         return NXT_UNIT_ERROR;
1776     }
1777 
1778     resp = req->response;
1779 
1780     if (resp->piggyback_content_length == 0) {
1781         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1782         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1783     }
1784 
1785     resp->piggyback_content_length += size;
1786 
1787     buf->free = nxt_cpymem(buf->free, src, size);
1788 
1789     return NXT_UNIT_OK;
1790 }
1791 
1792 
1793 int
1794 nxt_unit_response_send(nxt_unit_request_info_t *req)
1795 {
1796     int                           rc;
1797     nxt_unit_mmap_buf_t           *mmap_buf;
1798     nxt_unit_request_info_impl_t  *req_impl;
1799 
1800     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1801 
1802     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1803         nxt_unit_req_warn(req, "send: response is not initialized yet");
1804 
1805         return NXT_UNIT_ERROR;
1806     }
1807 
1808     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1809         nxt_unit_req_warn(req, "send: response already sent");
1810 
1811         return NXT_UNIT_ERROR;
1812     }
1813 
1814     if (req->request->websocket_handshake && req->response->status == 101) {
1815         nxt_unit_response_upgrade(req);
1816     }
1817 
1818     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1819                        req->response->fields_count,
1820                        (int) (req->response_buf->free
1821                               - req->response_buf->start));
1822 
1823     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1824 
1825     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
1826     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1827         req->response = NULL;
1828         req->response_buf = NULL;
1829         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1830 
1831         nxt_unit_mmap_buf_free(mmap_buf);
1832     }
1833 
1834     return rc;
1835 }
1836 
1837 
1838 int
1839 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1840 {
1841     nxt_unit_request_info_impl_t  *req_impl;
1842 
1843     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1844 
1845     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1846 }
1847 
1848 
1849 nxt_unit_buf_t *
1850 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1851 {
1852     int                           rc;
1853     nxt_unit_mmap_buf_t           *mmap_buf;
1854     nxt_unit_request_info_impl_t  *req_impl;
1855 
1856     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1857         nxt_unit_req_warn(req, "response_buf_alloc: "
1858                           "requested buffer (%"PRIu32") too big", size);
1859 
1860         return NULL;
1861     }
1862 
1863     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1864 
1865     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1866 
1867     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1868     if (nxt_slow_path(mmap_buf == NULL)) {
1869         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
1870 
1871         return NULL;
1872     }
1873 
1874     mmap_buf->req = req;
1875 
1876     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1877 
1878     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
1879                                    size, size, mmap_buf,
1880                                    NULL);
1881     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1882         nxt_unit_mmap_buf_release(mmap_buf);
1883 
1884         return NULL;
1885     }
1886 
1887     return &mmap_buf->buf;
1888 }
1889 
1890 
1891 static nxt_unit_process_t *
1892 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1893 {
1894     nxt_unit_impl_t  *lib;
1895 
1896     if (recv_msg->process != NULL) {
1897         return recv_msg->process;
1898     }
1899 
1900     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1901 
1902     pthread_mutex_lock(&lib->mutex);
1903 
1904     recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0);
1905 
1906     pthread_mutex_unlock(&lib->mutex);
1907 
1908     if (recv_msg->process == NULL) {
1909         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1910                       recv_msg->stream, (int) recv_msg->pid);
1911     }
1912 
1913     return recv_msg->process;
1914 }
1915 
1916 
1917 static nxt_unit_mmap_buf_t *
1918 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1919 {
1920     nxt_unit_mmap_buf_t  *mmap_buf;
1921     nxt_unit_ctx_impl_t  *ctx_impl;
1922 
1923     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1924 
1925     pthread_mutex_lock(&ctx_impl->mutex);
1926 
1927     if (ctx_impl->free_buf == NULL) {
1928         pthread_mutex_unlock(&ctx_impl->mutex);
1929 
1930         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1931         if (nxt_slow_path(mmap_buf == NULL)) {
1932             return NULL;
1933         }
1934 
1935     } else {
1936         mmap_buf = ctx_impl->free_buf;
1937 
1938         nxt_unit_mmap_buf_unlink(mmap_buf);
1939 
1940         pthread_mutex_unlock(&ctx_impl->mutex);
1941     }
1942 
1943     mmap_buf->ctx_impl = ctx_impl;
1944 
1945     mmap_buf->hdr = NULL;
1946     mmap_buf->free_ptr = NULL;
1947 
1948     return mmap_buf;
1949 }
1950 
1951 
1952 static void
1953 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1954 {
1955     nxt_unit_mmap_buf_unlink(mmap_buf);
1956 
1957     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
1958 
1959     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1960 
1961     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
1962 }
1963 
1964 
1965 int
1966 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1967 {
1968     return req->request->websocket_handshake;
1969 }
1970 
1971 
1972 int
1973 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1974 {
1975     int                           rc;
1976     nxt_unit_ctx_impl_t           *ctx_impl;
1977     nxt_unit_request_info_impl_t  *req_impl;
1978 
1979     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1980 
1981     if (nxt_slow_path(req_impl->websocket != 0)) {
1982         nxt_unit_req_debug(req, "upgrade: already upgraded");
1983 
1984         return NXT_UNIT_OK;
1985     }
1986 
1987     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1988         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1989 
1990         return NXT_UNIT_ERROR;
1991     }
1992 
1993     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1994         nxt_unit_req_warn(req, "upgrade: response already sent");
1995 
1996         return NXT_UNIT_ERROR;
1997     }
1998 
1999     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
2000 
2001     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
2002     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2003         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2004 
2005         return NXT_UNIT_ERROR;
2006     }
2007 
2008     req_impl->websocket = 1;
2009 
2010     req->response->status = 101;
2011 
2012     return NXT_UNIT_OK;
2013 }
2014 
2015 
2016 int
2017 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2018 {
2019     nxt_unit_request_info_impl_t  *req_impl;
2020 
2021     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2022 
2023     return req_impl->websocket;
2024 }
2025 
2026 
2027 nxt_unit_request_info_t *
2028 nxt_unit_get_request_info_from_data(void *data)
2029 {
2030     nxt_unit_request_info_impl_t  *req_impl;
2031 
2032     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2033 
2034     return &req_impl->req;
2035 }
2036 
2037 
2038 int
2039 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2040 {
2041     int                           rc;
2042     nxt_unit_mmap_buf_t           *mmap_buf;
2043     nxt_unit_request_info_t       *req;
2044     nxt_unit_request_info_impl_t  *req_impl;
2045 
2046     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2047 
2048     req = mmap_buf->req;
2049     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2050 
2051     nxt_unit_req_debug(req, "buf_send: %d bytes",
2052                        (int) (buf->free - buf->start));
2053 
2054     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2055         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2056 
2057         return NXT_UNIT_ERROR;
2058     }
2059 
2060     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2061         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2062 
2063         return NXT_UNIT_ERROR;
2064     }
2065 
2066     if (nxt_fast_path(buf->free > buf->start)) {
2067         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2068         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2069             return rc;
2070         }
2071     }
2072 
2073     nxt_unit_mmap_buf_free(mmap_buf);
2074 
2075     return NXT_UNIT_OK;
2076 }
2077 
2078 
2079 static void
2080 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2081 {
2082     int                      rc;
2083     nxt_unit_mmap_buf_t      *mmap_buf;
2084     nxt_unit_request_info_t  *req;
2085 
2086     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2087 
2088     req = mmap_buf->req;
2089 
2090     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2091     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2092         nxt_unit_mmap_buf_free(mmap_buf);
2093 
2094         nxt_unit_request_info_release(req);
2095 
2096     } else {
2097         nxt_unit_request_done(req, rc);
2098     }
2099 }
2100 
2101 
2102 static int
2103 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2104     nxt_unit_mmap_buf_t *mmap_buf, int last)
2105 {
2106     struct {
2107         nxt_port_msg_t       msg;
2108         nxt_port_mmap_msg_t  mmap_msg;
2109     } m;
2110 
2111     int                           rc;
2112     u_char                        *last_used, *first_free;
2113     ssize_t                       res;
2114     nxt_chunk_id_t                first_free_chunk;
2115     nxt_unit_buf_t                *buf;
2116     nxt_unit_impl_t               *lib;
2117     nxt_port_mmap_header_t        *hdr;
2118     nxt_unit_request_info_impl_t  *req_impl;
2119 
2120     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2121     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2122 
2123     buf = &mmap_buf->buf;
2124     hdr = mmap_buf->hdr;
2125 
2126     m.mmap_msg.size = buf->free - buf->start;
2127 
2128     m.msg.stream = req_impl->stream;
2129     m.msg.pid = lib->pid;
2130     m.msg.reply_port = 0;
2131     m.msg.type = _NXT_PORT_MSG_DATA;
2132     m.msg.last = last != 0;
2133     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2134     m.msg.nf = 0;
2135     m.msg.mf = 0;
2136     m.msg.tracking = 0;
2137 
2138     rc = NXT_UNIT_ERROR;
2139 
2140     if (m.msg.mmap) {
2141         m.mmap_msg.mmap_id = hdr->id;
2142         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2143                                                      (u_char *) buf->start);
2144 
2145         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2146                        req_impl->stream,
2147                        (int) m.mmap_msg.mmap_id,
2148                        (int) m.mmap_msg.chunk_id,
2149                        (int) m.mmap_msg.size);
2150 
2151         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2152                                  NULL, 0);
2153         if (nxt_slow_path(res != sizeof(m))) {
2154             goto free_buf;
2155         }
2156 
2157         last_used = (u_char *) buf->free - 1;
2158         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2159 
2160         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2161             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2162 
2163             buf->start = (char *) first_free;
2164             buf->free = buf->start;
2165 
2166             if (buf->end < buf->start) {
2167                 buf->end = buf->start;
2168             }
2169 
2170         } else {
2171             buf->start = NULL;
2172             buf->free = NULL;
2173             buf->end = NULL;
2174 
2175             mmap_buf->hdr = NULL;
2176         }
2177 
2178         nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
2179                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2180 
2181         nxt_unit_debug(req->ctx, "process %d allocated_chunks %d",
2182                        mmap_buf->process->pid,
2183                        (int) mmap_buf->process->outgoing.allocated_chunks);
2184 
2185     } else {
2186         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2187                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2188         {
2189             nxt_unit_alert(req->ctx,
2190                            "#%"PRIu32": failed to send plain memory buffer"
2191                            ": no space reserved for message header",
2192                            req_impl->stream);
2193 
2194             goto free_buf;
2195         }
2196 
2197         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2198 
2199         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2200                        req_impl->stream,
2201                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2202 
2203         res = nxt_unit_port_send(req->ctx, req->response_port,
2204                                  buf->start - sizeof(m.msg),
2205                                  m.mmap_msg.size + sizeof(m.msg),
2206                                  NULL, 0);
2207         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2208             goto free_buf;
2209         }
2210     }
2211 
2212     rc = NXT_UNIT_OK;
2213 
2214 free_buf:
2215 
2216     nxt_unit_free_outgoing_buf(mmap_buf);
2217 
2218     return rc;
2219 }
2220 
2221 
2222 void
2223 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2224 {
2225     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2226 }
2227 
2228 
/*
 * Frees the memory attached to the buffer (via
 * nxt_unit_free_outgoing_buf()) and then releases the buffer descriptor
 * itself (via nxt_unit_mmap_buf_release()).
 */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}
2236 
2237 
2238 static void
2239 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2240 {
2241     if (mmap_buf->hdr != NULL) {
2242         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2243                               mmap_buf->process,
2244                               mmap_buf->hdr, mmap_buf->buf.start,
2245                               mmap_buf->buf.end - mmap_buf->buf.start);
2246 
2247         mmap_buf->hdr = NULL;
2248 
2249         return;
2250     }
2251 
2252     if (mmap_buf->free_ptr != NULL) {
2253         free(mmap_buf->free_ptr);
2254 
2255         mmap_buf->free_ptr = NULL;
2256     }
2257 }
2258 
2259 
2260 static nxt_unit_read_buf_t *
2261 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2262 {
2263     nxt_unit_ctx_impl_t  *ctx_impl;
2264 
2265     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2266 
2267     pthread_mutex_lock(&ctx_impl->mutex);
2268 
2269     return nxt_unit_read_buf_get_impl(ctx_impl);
2270 }
2271 
2272 
2273 static nxt_unit_read_buf_t *
2274 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2275 {
2276     nxt_unit_read_buf_t  *rbuf;
2277 
2278     if (ctx_impl->free_read_buf != NULL) {
2279         rbuf = ctx_impl->free_read_buf;
2280         ctx_impl->free_read_buf = rbuf->next;
2281 
2282         pthread_mutex_unlock(&ctx_impl->mutex);
2283 
2284         return rbuf;
2285     }
2286 
2287     pthread_mutex_unlock(&ctx_impl->mutex);
2288 
2289     rbuf = malloc(sizeof(nxt_unit_read_buf_t));
2290 
2291     return rbuf;
2292 }
2293 
2294 
2295 static void
2296 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2297     nxt_unit_read_buf_t *rbuf)
2298 {
2299     nxt_unit_ctx_impl_t  *ctx_impl;
2300 
2301     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2302 
2303     pthread_mutex_lock(&ctx_impl->mutex);
2304 
2305     rbuf->next = ctx_impl->free_read_buf;
2306     ctx_impl->free_read_buf = rbuf;
2307 
2308     pthread_mutex_unlock(&ctx_impl->mutex);
2309 }
2310 
2311 
2312 nxt_unit_buf_t *
2313 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2314 {
2315     nxt_unit_mmap_buf_t  *mmap_buf;
2316 
2317     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2318 
2319     if (mmap_buf->next == NULL) {
2320         return NULL;
2321     }
2322 
2323     return &mmap_buf->next->buf;
2324 }
2325 
2326 
2327 uint32_t
2328 nxt_unit_buf_max(void)
2329 {
2330     return PORT_MMAP_DATA_SIZE;
2331 }
2332 
2333 
2334 uint32_t
2335 nxt_unit_buf_min(void)
2336 {
2337     return PORT_MMAP_CHUNK_SIZE;
2338 }
2339 
2340 
2341 int
2342 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2343     size_t size)
2344 {
2345     ssize_t  res;
2346 
2347     res = nxt_unit_response_write_nb(req, start, size, size);
2348 
2349     return res < 0 ? -res : NXT_UNIT_OK;
2350 }
2351 
2352 
2353 ssize_t
2354 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2355     size_t size, size_t min_size)
2356 {
2357     int                           rc;
2358     ssize_t                       sent;
2359     uint32_t                      part_size, min_part_size, buf_size;
2360     const char                    *part_start;
2361     nxt_unit_mmap_buf_t           mmap_buf;
2362     nxt_unit_request_info_impl_t  *req_impl;
2363     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2364 
2365     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2366 
2367     part_start = start;
2368     sent = 0;
2369 
2370     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2371         nxt_unit_req_alert(req, "write: response not initialized yet");
2372 
2373         return -NXT_UNIT_ERROR;
2374     }
2375 
2376     /* Check if response is not send yet. */
2377     if (nxt_slow_path(req->response_buf != NULL)) {
2378         part_size = req->response_buf->end - req->response_buf->free;
2379         part_size = nxt_min(size, part_size);
2380 
2381         rc = nxt_unit_response_add_content(req, part_start, part_size);
2382         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2383             return -rc;
2384         }
2385 
2386         rc = nxt_unit_response_send(req);
2387         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2388             return -rc;
2389         }
2390 
2391         size -= part_size;
2392         part_start += part_size;
2393         sent += part_size;
2394 
2395         min_size -= nxt_min(min_size, part_size);
2396     }
2397 
2398     while (size > 0) {
2399         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2400         min_part_size = nxt_min(min_size, part_size);
2401         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2402 
2403         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2404                                        min_part_size, &mmap_buf, local_buf);
2405         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2406             return -rc;
2407         }
2408 
2409         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2410         if (nxt_slow_path(buf_size == 0)) {
2411             return sent;
2412         }
2413         part_size = nxt_min(buf_size, part_size);
2414 
2415         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2416                                        part_start, part_size);
2417 
2418         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2419         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2420             return -rc;
2421         }
2422 
2423         size -= part_size;
2424         part_start += part_size;
2425         sent += part_size;
2426 
2427         min_size -= nxt_min(min_size, part_size);
2428     }
2429 
2430     return sent;
2431 }
2432 
2433 
2434 int
2435 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2436     nxt_unit_read_info_t *read_info)
2437 {
2438     int                           rc;
2439     ssize_t                       n;
2440     uint32_t                      buf_size;
2441     nxt_unit_buf_t                *buf;
2442     nxt_unit_mmap_buf_t           mmap_buf;
2443     nxt_unit_request_info_impl_t  *req_impl;
2444     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2445 
2446     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2447 
2448     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2449         nxt_unit_req_alert(req, "write: response not initialized yet");
2450 
2451         return NXT_UNIT_ERROR;
2452     }
2453 
2454     /* Check if response is not send yet. */
2455     if (nxt_slow_path(req->response_buf != NULL)) {
2456 
2457         /* Enable content in headers buf. */
2458         rc = nxt_unit_response_add_content(req, "", 0);
2459         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2460             nxt_unit_req_error(req, "Failed to add piggyback content");
2461 
2462             return rc;
2463         }
2464 
2465         buf = req->response_buf;
2466 
2467         while (buf->end - buf->free > 0) {
2468             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2469             if (nxt_slow_path(n < 0)) {
2470                 nxt_unit_req_error(req, "Read error");
2471 
2472                 return NXT_UNIT_ERROR;
2473             }
2474 
2475             /* Manually increase sizes. */
2476             buf->free += n;
2477             req->response->piggyback_content_length += n;
2478 
2479             if (read_info->eof) {
2480                 break;
2481             }
2482         }
2483 
2484         rc = nxt_unit_response_send(req);
2485         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2486             nxt_unit_req_error(req, "Failed to send headers with content");
2487 
2488             return rc;
2489         }
2490 
2491         if (read_info->eof) {
2492             return NXT_UNIT_OK;
2493         }
2494     }
2495 
2496     while (!read_info->eof) {
2497         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2498                            read_info->buf_size);
2499 
2500         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2501 
2502         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2503                                        buf_size, buf_size,
2504                                        &mmap_buf, local_buf);
2505         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2506             return rc;
2507         }
2508 
2509         buf = &mmap_buf.buf;
2510 
2511         while (!read_info->eof && buf->end > buf->free) {
2512             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2513             if (nxt_slow_path(n < 0)) {
2514                 nxt_unit_req_error(req, "Read error");
2515 
2516                 nxt_unit_free_outgoing_buf(&mmap_buf);
2517 
2518                 return NXT_UNIT_ERROR;
2519             }
2520 
2521             buf->free += n;
2522         }
2523 
2524         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2525         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2526             nxt_unit_req_error(req, "Failed to send content");
2527 
2528             return rc;
2529         }
2530     }
2531 
2532     return NXT_UNIT_OK;
2533 }
2534 
2535 
2536 ssize_t
2537 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2538 {
2539     ssize_t  buf_res, res;
2540 
2541     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2542                                 dst, size);
2543 
2544     if (buf_res < (ssize_t) size && req->content_fd != -1) {
2545         res = read(req->content_fd, dst, size);
2546         if (res < 0) {
2547             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2548                                strerror(errno), errno);
2549 
2550             return res;
2551         }
2552 
2553         if (res < (ssize_t) size) {
2554             close(req->content_fd);
2555 
2556             req->content_fd = -1;
2557         }
2558 
2559         req->content_length -= res;
2560         size -= res;
2561 
2562         dst = nxt_pointer_to(dst, res);
2563 
2564     } else {
2565         res = 0;
2566     }
2567 
2568     return buf_res + res;
2569 }
2570 
2571 
2572 ssize_t
2573 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
2574 {
2575     char                 *p;
2576     size_t               l_size, b_size;
2577     nxt_unit_buf_t       *b;
2578     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
2579 
2580     if (req->content_length == 0) {
2581         return 0;
2582     }
2583 
2584     l_size = 0;
2585 
2586     b = req->content_buf;
2587 
2588     while (b != NULL) {
2589         b_size = b->end - b->free;
2590         p = memchr(b->free, '\n', b_size);
2591 
2592         if (p != NULL) {
2593             p++;
2594             l_size += p - b->free;
2595             break;
2596         }
2597 
2598         l_size += b_size;
2599 
2600         if (max_size <= l_size) {
2601             break;
2602         }
2603 
2604         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
2605         if (mmap_buf->next == NULL
2606             && req->content_fd != -1
2607             && l_size < req->content_length)
2608         {
2609             preread_buf = nxt_unit_request_preread(req, 16384);
2610             if (nxt_slow_path(preread_buf == NULL)) {
2611                 return -1;
2612             }
2613 
2614             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
2615         }
2616 
2617         b = nxt_unit_buf_next(b);
2618     }
2619 
2620     return nxt_min(max_size, l_size);
2621 }
2622 
2623 
2624 static nxt_unit_mmap_buf_t *
2625 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
2626 {
2627     ssize_t              res;
2628     nxt_unit_mmap_buf_t  *mmap_buf;
2629 
2630     if (req->content_fd == -1) {
2631         nxt_unit_req_alert(req, "preread: content_fd == -1");
2632         return NULL;
2633     }
2634 
2635     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2636     if (nxt_slow_path(mmap_buf == NULL)) {
2637         nxt_unit_req_alert(req, "preread: failed to allocate buf");
2638         return NULL;
2639     }
2640 
2641     mmap_buf->free_ptr = malloc(size);
2642     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
2643         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
2644         nxt_unit_mmap_buf_release(mmap_buf);
2645         return NULL;
2646     }
2647 
2648     mmap_buf->plain_ptr = mmap_buf->free_ptr;
2649 
2650     mmap_buf->hdr = NULL;
2651     mmap_buf->buf.start = mmap_buf->free_ptr;
2652     mmap_buf->buf.free = mmap_buf->buf.start;
2653     mmap_buf->buf.end = mmap_buf->buf.start + size;
2654     mmap_buf->process = NULL;
2655 
2656     res = read(req->content_fd, mmap_buf->free_ptr, size);
2657     if (res < 0) {
2658         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2659                            strerror(errno), errno);
2660 
2661         nxt_unit_mmap_buf_free(mmap_buf);
2662 
2663         return NULL;
2664     }
2665 
2666     if (res < (ssize_t) size) {
2667         close(req->content_fd);
2668 
2669         req->content_fd = -1;
2670     }
2671 
2672     nxt_unit_req_debug(req, "preread: read %d", (int) res);
2673 
2674     mmap_buf->buf.end = mmap_buf->buf.free + res;
2675 
2676     return mmap_buf;
2677 }
2678 
2679 
2680 static ssize_t
2681 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2682 {
2683     u_char          *p;
2684     size_t          rest, copy, read;
2685     nxt_unit_buf_t  *buf, *last_buf;
2686 
2687     p = dst;
2688     rest = size;
2689 
2690     buf = *b;
2691     last_buf = buf;
2692 
2693     while (buf != NULL) {
2694         last_buf = buf;
2695 
2696         copy = buf->end - buf->free;
2697         copy = nxt_min(rest, copy);
2698 
2699         p = nxt_cpymem(p, buf->free, copy);
2700 
2701         buf->free += copy;
2702         rest -= copy;
2703 
2704         if (rest == 0) {
2705             if (buf->end == buf->free) {
2706                 buf = nxt_unit_buf_next(buf);
2707             }
2708 
2709             break;
2710         }
2711 
2712         buf = nxt_unit_buf_next(buf);
2713     }
2714 
2715     *b = last_buf;
2716 
2717     read = size - rest;
2718 
2719     *len -= read;
2720 
2721     return read;
2722 }
2723 
2724 
2725 void
2726 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2727 {
2728     uint32_t                      size;
2729     nxt_port_msg_t                msg;
2730     nxt_unit_impl_t               *lib;
2731     nxt_unit_request_info_impl_t  *req_impl;
2732 
2733     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2734 
2735     nxt_unit_req_debug(req, "done: %d", rc);
2736 
2737     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2738         goto skip_response_send;
2739     }
2740 
2741     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2742 
2743         size = nxt_length("Content-Type") + nxt_length("text/plain");
2744 
2745         rc = nxt_unit_response_init(req, 200, 1, size);
2746         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2747             goto skip_response_send;
2748         }
2749 
2750         rc = nxt_unit_response_add_field(req, "Content-Type",
2751                                    nxt_length("Content-Type"),
2752                                    "text/plain", nxt_length("text/plain"));
2753         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2754             goto skip_response_send;
2755         }
2756     }
2757 
2758     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2759 
2760         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2761 
2762         nxt_unit_buf_send_done(req->response_buf);
2763 
2764         return;
2765     }
2766 
2767 skip_response_send:
2768 
2769     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2770 
2771     msg.stream = req_impl->stream;
2772     msg.pid = lib->pid;
2773     msg.reply_port = 0;
2774     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2775                                    : _NXT_PORT_MSG_RPC_ERROR;
2776     msg.last = 1;
2777     msg.mmap = 0;
2778     msg.nf = 0;
2779     msg.mf = 0;
2780     msg.tracking = 0;
2781 
2782     (void) nxt_unit_port_send(req->ctx, req->response_port,
2783                               &msg, sizeof(msg), NULL, 0);
2784 
2785     nxt_unit_request_info_release(req);
2786 }
2787 
2788 
2789 int
2790 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2791     uint8_t last, const void *start, size_t size)
2792 {
2793     const struct iovec  iov = { (void *) start, size };
2794 
2795     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2796 }
2797 
2798 
2799 int
2800 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2801     uint8_t last, const struct iovec *iov, int iovcnt)
2802 {
2803     int                     i, rc;
2804     size_t                  l, copy;
2805     uint32_t                payload_len, buf_size, alloc_size;
2806     const uint8_t           *b;
2807     nxt_unit_buf_t          *buf;
2808     nxt_unit_mmap_buf_t     mmap_buf;
2809     nxt_websocket_header_t  *wh;
2810     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2811 
2812     payload_len = 0;
2813 
2814     for (i = 0; i < iovcnt; i++) {
2815         payload_len += iov[i].iov_len;
2816     }
2817 
2818     buf_size = 10 + payload_len;
2819     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2820 
2821     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2822                                    alloc_size, alloc_size,
2823                                    &mmap_buf, local_buf);
2824     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2825         return rc;
2826     }
2827 
2828     buf = &mmap_buf.buf;
2829 
2830     buf->start[0] = 0;
2831     buf->start[1] = 0;
2832 
2833     buf_size -= buf->end - buf->start;
2834 
2835     wh = (void *) buf->free;
2836 
2837     buf->free = nxt_websocket_frame_init(wh, payload_len);
2838     wh->fin = last;
2839     wh->opcode = opcode;
2840 
2841     for (i = 0; i < iovcnt; i++) {
2842         b = iov[i].iov_base;
2843         l = iov[i].iov_len;
2844 
2845         while (l > 0) {
2846             copy = buf->end - buf->free;
2847             copy = nxt_min(l, copy);
2848 
2849             buf->free = nxt_cpymem(buf->free, b, copy);
2850             b += copy;
2851             l -= copy;
2852 
2853             if (l > 0) {
2854                 if (nxt_fast_path(buf->free > buf->start)) {
2855                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2856 
2857                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2858                         return rc;
2859                     }
2860                 }
2861 
2862                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2863 
2864                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2865                                                alloc_size, alloc_size,
2866                                                &mmap_buf, local_buf);
2867                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2868                     return rc;
2869                 }
2870 
2871                 buf_size -= buf->end - buf->start;
2872             }
2873         }
2874     }
2875 
2876     if (buf->free > buf->start) {
2877         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2878     }
2879 
2880     return rc;
2881 }
2882 
2883 
2884 ssize_t
2885 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2886     size_t size)
2887 {
2888     ssize_t   res;
2889     uint8_t   *b;
2890     uint64_t  i, d;
2891 
2892     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2893                             dst, size);
2894 
2895     if (ws->mask == NULL) {
2896         return res;
2897     }
2898 
2899     b = dst;
2900     d = (ws->payload_len - ws->content_length - res) % 4;
2901 
2902     for (i = 0; i < (uint64_t) res; i++) {
2903         b[i] ^= ws->mask[ (i + d) % 4 ];
2904     }
2905 
2906     return res;
2907 }
2908 
2909 
2910 int
2911 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2912 {
2913     char                             *b;
2914     size_t                           size;
2915     nxt_unit_websocket_frame_impl_t  *ws_impl;
2916 
2917     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2918 
2919     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
2920         return NXT_UNIT_OK;
2921     }
2922 
2923     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2924 
2925     b = malloc(size);
2926     if (nxt_slow_path(b == NULL)) {
2927         return NXT_UNIT_ERROR;
2928     }
2929 
2930     memcpy(b, ws_impl->buf->buf.start, size);
2931 
2932     ws_impl->buf->buf.start = b;
2933     ws_impl->buf->buf.free = b;
2934     ws_impl->buf->buf.end = b + size;
2935 
2936     ws_impl->buf->free_ptr = b;
2937 
2938     return NXT_UNIT_OK;
2939 }
2940 
2941 
/*
 * Finishes processing of a WebSocket frame by passing it to
 * nxt_unit_websocket_frame_release().
 */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}
2947 
2948 
2949 static nxt_port_mmap_header_t *
2950 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
2951     nxt_chunk_id_t *c, int *n, int min_n)
2952 {
2953     int                     res, nchunks, i;
2954     uint32_t                outgoing_size;
2955     nxt_unit_mmap_t         *mm, *mm_end;
2956     nxt_unit_impl_t         *lib;
2957     nxt_unit_process_t      *process;
2958     nxt_port_mmap_header_t  *hdr;
2959 
2960     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2961 
2962     process = nxt_unit_port_process(port);
2963     if (nxt_slow_path(process == NULL)) {
2964         nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed",
2965                        (int) port->id.pid, (int) port->id.id);
2966 
2967         return NULL;
2968     }
2969 
2970     pthread_mutex_lock(&process->outgoing.mutex);
2971 
2972 retry:
2973 
2974     outgoing_size = process->outgoing.size;
2975 
2976     mm_end = process->outgoing.elts + outgoing_size;
2977 
2978     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2979         hdr = mm->hdr;
2980 
2981         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
2982             continue;
2983         }
2984 
2985         *c = 0;
2986 
2987         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2988             nchunks = 1;
2989 
2990             while (nchunks < *n) {
2991                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2992                                                        *c + nchunks);
2993 
2994                 if (res == 0) {
2995                     if (nchunks >= min_n) {
2996                         *n = nchunks;
2997 
2998                         goto unlock;
2999                     }
3000 
3001                     for (i = 0; i < nchunks; i++) {
3002                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3003                     }
3004 
3005                     *c += nchunks + 1;
3006                     nchunks = 0;
3007                     break;
3008                 }
3009 
3010                 nchunks++;
3011             }
3012 
3013             if (nchunks >= min_n) {
3014                 *n = nchunks;
3015 
3016                 goto unlock;
3017             }
3018         }
3019 
3020         hdr->oosm = 1;
3021     }
3022 
3023     if (outgoing_size >= lib->shm_mmap_limit) {
3024         /* Cannot allocate more shared memory. */
3025         pthread_mutex_unlock(&process->outgoing.mutex);
3026 
3027         if (min_n == 0) {
3028             *n = 0;
3029         }
3030 
3031         if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
3032                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3033         {
3034             /* Memory allocated by application, but not send to router. */
3035             return NULL;
3036         }
3037 
3038         /* Notify router about OOSM condition. */
3039 
3040         res = nxt_unit_send_oosm(ctx, port);
3041         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3042             return NULL;
3043         }
3044 
3045         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3046 
3047         if (min_n == 0) {
3048             return NULL;
3049         }
3050 
3051         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3052 
3053         res = nxt_unit_wait_shm_ack(ctx);
3054         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3055             return NULL;
3056         }
3057 
3058         nxt_unit_debug(ctx, "oosm: retry");
3059 
3060         pthread_mutex_lock(&process->outgoing.mutex);
3061 
3062         goto retry;
3063     }
3064 
3065     *c = 0;
3066     hdr = nxt_unit_new_mmap(ctx, port, *n);
3067 
3068 unlock:
3069 
3070     nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
3071 
3072     nxt_unit_debug(ctx, "process %d allocated_chunks %d",
3073                    process->pid,
3074                    (int) process->outgoing.allocated_chunks);
3075 
3076     pthread_mutex_unlock(&process->outgoing.mutex);
3077 
3078     return hdr;
3079 }
3080 
3081 
3082 static int
3083 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3084 {
3085     ssize_t          res;
3086     nxt_port_msg_t   msg;
3087     nxt_unit_impl_t  *lib;
3088 
3089     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3090 
3091     msg.stream = 0;
3092     msg.pid = lib->pid;
3093     msg.reply_port = 0;
3094     msg.type = _NXT_PORT_MSG_OOSM;
3095     msg.last = 0;
3096     msg.mmap = 0;
3097     msg.nf = 0;
3098     msg.mf = 0;
3099     msg.tracking = 0;
3100 
3101     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3102     if (nxt_slow_path(res != sizeof(msg))) {
3103         return NXT_UNIT_ERROR;
3104     }
3105 
3106     return NXT_UNIT_OK;
3107 }
3108 
3109 
3110 static int
3111 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3112 {
3113     nxt_port_msg_t       *port_msg;
3114     nxt_unit_ctx_impl_t  *ctx_impl;
3115     nxt_unit_read_buf_t  *rbuf;
3116 
3117     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3118 
3119     while (1) {
3120         rbuf = nxt_unit_read_buf_get(ctx);
3121         if (nxt_slow_path(rbuf == NULL)) {
3122             return NXT_UNIT_ERROR;
3123         }
3124 
3125         nxt_unit_read_buf(ctx, rbuf);
3126 
3127         if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
3128             nxt_unit_read_buf_release(ctx, rbuf);
3129 
3130             return NXT_UNIT_ERROR;
3131         }
3132 
3133         port_msg = (nxt_port_msg_t *) rbuf->buf;
3134 
3135         if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3136             nxt_unit_read_buf_release(ctx, rbuf);
3137 
3138             break;
3139         }
3140 
3141         pthread_mutex_lock(&ctx_impl->mutex);
3142 
3143         *ctx_impl->pending_read_tail = rbuf;
3144         ctx_impl->pending_read_tail = &rbuf->next;
3145         rbuf->next = NULL;
3146 
3147         pthread_mutex_unlock(&ctx_impl->mutex);
3148 
3149         if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3150             nxt_unit_debug(ctx, "oosm: quit received");
3151 
3152             return NXT_UNIT_ERROR;
3153         }
3154     }
3155 
3156     return NXT_UNIT_OK;
3157 }
3158 
3159 
3160 static nxt_unit_mmap_t *
3161 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3162 {
3163     uint32_t  cap;
3164 
3165     cap = mmaps->cap;
3166 
3167     if (cap == 0) {
3168         cap = i + 1;
3169     }
3170 
3171     while (i + 1 > cap) {
3172 
3173         if (cap < 16) {
3174             cap = cap * 2;
3175 
3176         } else {
3177             cap = cap + cap / 2;
3178         }
3179     }
3180 
3181     if (cap != mmaps->cap) {
3182 
3183         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
3184         if (nxt_slow_path(mmaps->elts == NULL)) {
3185             return NULL;
3186         }
3187 
3188         memset(mmaps->elts + mmaps->cap, 0,
3189                sizeof(*mmaps->elts) * (cap - mmaps->cap));
3190 
3191         mmaps->cap = cap;
3192     }
3193 
3194     if (i + 1 > mmaps->size) {
3195         mmaps->size = i + 1;
3196     }
3197 
3198     return mmaps->elts + i;
3199 }
3200 
3201 
3202 static nxt_port_mmap_header_t *
3203 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3204 {
3205     int                     i, fd, rc;
3206     void                    *mem;
3207     char                    name[64];
3208     nxt_unit_mmap_t         *mm;
3209     nxt_unit_impl_t         *lib;
3210     nxt_unit_process_t      *process;
3211     nxt_port_mmap_header_t  *hdr;
3212 
3213     process = nxt_unit_port_process(port);
3214     if (nxt_slow_path(process == NULL)) {
3215         nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed",
3216                        (int) port->id.pid, (int) port->id.id);
3217 
3218         return NULL;
3219     }
3220 
3221     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3222 
3223     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
3224     if (nxt_slow_path(mm == NULL)) {
3225         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3226 
3227         return NULL;
3228     }
3229 
3230     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3231              lib->pid, (void *) pthread_self());
3232 
3233 #if (NXT_HAVE_MEMFD_CREATE)
3234 
3235     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3236     if (nxt_slow_path(fd == -1)) {
3237         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3238                        strerror(errno), errno);
3239 
3240         goto remove_fail;
3241     }
3242 
3243     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3244 
3245 #elif (NXT_HAVE_SHM_OPEN_ANON)
3246 
3247     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3248     if (nxt_slow_path(fd == -1)) {
3249         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3250                        strerror(errno), errno);
3251 
3252         goto remove_fail;
3253     }
3254 
3255 #elif (NXT_HAVE_SHM_OPEN)
3256 
3257     /* Just in case. */
3258     shm_unlink(name);
3259 
3260     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3261     if (nxt_slow_path(fd == -1)) {
3262         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3263                        strerror(errno), errno);
3264 
3265         goto remove_fail;
3266     }
3267 
3268     if (nxt_slow_path(shm_unlink(name) == -1)) {
3269         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3270                       strerror(errno), errno);
3271     }
3272 
3273 #else
3274 
3275 #error No working shared memory implementation.
3276 
3277 #endif
3278 
3279     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
3280         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3281                        strerror(errno), errno);
3282 
3283         goto remove_fail;
3284     }
3285 
3286     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3287     if (nxt_slow_path(mem == MAP_FAILED)) {
3288         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3289                        strerror(errno), errno);
3290 
3291         goto remove_fail;
3292     }
3293 
3294     mm->hdr = mem;
3295     hdr = mem;
3296 
3297     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3298     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3299 
3300     hdr->id = process->outgoing.size - 1;
3301     hdr->src_pid = lib->pid;
3302     hdr->dst_pid = process->pid;
3303     hdr->sent_over = port->id.id;
3304 
3305     /* Mark first n chunk(s) as busy */
3306     for (i = 0; i < n; i++) {
3307         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3308     }
3309 
3310     /* Mark as busy chunk followed the last available chunk. */
3311     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3312     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3313 
3314     pthread_mutex_unlock(&process->outgoing.mutex);
3315 
3316     rc = nxt_unit_send_mmap(ctx, port, fd);
3317     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3318         munmap(mem, PORT_MMAP_SIZE);
3319         hdr = NULL;
3320 
3321     } else {
3322         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3323                        hdr->id, (int) lib->pid, (int) process->pid);
3324     }
3325 
3326     close(fd);
3327 
3328     pthread_mutex_lock(&process->outgoing.mutex);
3329 
3330     if (nxt_fast_path(hdr != NULL)) {
3331         return hdr;
3332     }
3333 
3334 remove_fail:
3335 
3336     process->outgoing.size--;
3337 
3338     return NULL;
3339 }
3340 
3341 
3342 static int
3343 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3344 {
3345     ssize_t          res;
3346     nxt_port_msg_t   msg;
3347     nxt_unit_impl_t  *lib;
3348     union {
3349         struct cmsghdr  cm;
3350         char            space[CMSG_SPACE(sizeof(int))];
3351     } cmsg;
3352 
3353     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3354 
3355     msg.stream = 0;
3356     msg.pid = lib->pid;
3357     msg.reply_port = 0;
3358     msg.type = _NXT_PORT_MSG_MMAP;
3359     msg.last = 0;
3360     msg.mmap = 0;
3361     msg.nf = 0;
3362     msg.mf = 0;
3363     msg.tracking = 0;
3364 
3365     /*
3366      * Fill all padding fields with 0.
3367      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3368      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3369      */
3370     memset(&cmsg, 0, sizeof(cmsg));
3371 
3372     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3373     cmsg.cm.cmsg_level = SOL_SOCKET;
3374     cmsg.cm.cmsg_type = SCM_RIGHTS;
3375 
3376     /*
3377      * memcpy() is used instead of simple
3378      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3379      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3380      *   dereferencing type-punned pointer will break strict-aliasing rules
3381      *
3382      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3383      * in the same simple assignment as in the code above.
3384      */
3385     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3386 
3387     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
3388                              &cmsg, sizeof(cmsg));
3389     if (nxt_slow_path(res != sizeof(msg))) {
3390         return NXT_UNIT_ERROR;
3391     }
3392 
3393     return NXT_UNIT_OK;
3394 }
3395 
3396 
3397 static int
3398 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3399     uint32_t size, uint32_t min_size,
3400     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3401 {
3402     int                     nchunks, min_nchunks;
3403     nxt_chunk_id_t          c;
3404     nxt_port_mmap_header_t  *hdr;
3405 
3406     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3407         if (local_buf != NULL) {
3408             mmap_buf->free_ptr = NULL;
3409             mmap_buf->plain_ptr = local_buf;
3410 
3411         } else {
3412             mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
3413             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3414                 return NXT_UNIT_ERROR;
3415             }
3416 
3417             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3418         }
3419 
3420         mmap_buf->hdr = NULL;
3421         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3422         mmap_buf->buf.free = mmap_buf->buf.start;
3423         mmap_buf->buf.end = mmap_buf->buf.start + size;
3424         mmap_buf->process = nxt_unit_port_process(port);
3425 
3426         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3427                        mmap_buf->buf.start, (int) size);
3428 
3429         return NXT_UNIT_OK;
3430     }
3431 
3432     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3433     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3434 
3435     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3436     if (nxt_slow_path(hdr == NULL)) {
3437         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3438             mmap_buf->hdr = NULL;
3439             mmap_buf->buf.start = NULL;
3440             mmap_buf->buf.free = NULL;
3441             mmap_buf->buf.end = NULL;
3442             mmap_buf->free_ptr = NULL;
3443 
3444             return NXT_UNIT_OK;
3445         }
3446 
3447         return NXT_UNIT_ERROR;
3448     }
3449 
3450     mmap_buf->hdr = hdr;
3451     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3452     mmap_buf->buf.free = mmap_buf->buf.start;
3453     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3454     mmap_buf->process = nxt_unit_port_process(port);
3455     mmap_buf->free_ptr = NULL;
3456     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3457 
3458     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3459                   (int) hdr->id, (int) c,
3460                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3461 
3462     return NXT_UNIT_OK;
3463 }
3464 
3465 
3466 static int
3467 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3468 {
3469     int                      rc;
3470     void                     *mem;
3471     struct stat              mmap_stat;
3472     nxt_unit_mmap_t          *mm;
3473     nxt_unit_impl_t          *lib;
3474     nxt_unit_process_t       *process;
3475     nxt_port_mmap_header_t   *hdr;
3476 
3477     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3478 
3479     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3480 
3481     pthread_mutex_lock(&lib->mutex);
3482 
3483     process = nxt_unit_process_find(lib, pid, 0);
3484 
3485     pthread_mutex_unlock(&lib->mutex);
3486 
3487     if (nxt_slow_path(process == NULL)) {
3488         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
3489                       (int) pid, fd);
3490 
3491         return NXT_UNIT_ERROR;
3492     }
3493 
3494     rc = NXT_UNIT_ERROR;
3495 
3496     if (fstat(fd, &mmap_stat) == -1) {
3497         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3498                       strerror(errno), errno);
3499 
3500         goto fail;
3501     }
3502 
3503     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3504                MAP_SHARED, fd, 0);
3505     if (nxt_slow_path(mem == MAP_FAILED)) {
3506         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3507                       strerror(errno), errno);
3508 
3509         goto fail;
3510     }
3511 
3512     hdr = mem;
3513 
3514     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
3515 
3516         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
3517                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3518                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3519 
3520         munmap(mem, PORT_MMAP_SIZE);
3521 
3522         goto fail;
3523     }
3524 
3525     pthread_mutex_lock(&process->incoming.mutex);
3526 
3527     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
3528     if (nxt_slow_path(mm == NULL)) {
3529         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
3530 
3531         munmap(mem, PORT_MMAP_SIZE);
3532 
3533     } else {
3534         mm->hdr = hdr;
3535 
3536         hdr->sent_over = 0xFFFFu;
3537 
3538         rc = NXT_UNIT_OK;
3539     }
3540 
3541     pthread_mutex_unlock(&process->incoming.mutex);
3542 
3543 fail:
3544 
3545     nxt_unit_process_release(process);
3546 
3547     return rc;
3548 }
3549 
3550 
3551 static void
3552 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
3553 {
3554     pthread_mutex_init(&mmaps->mutex, NULL);
3555 
3556     mmaps->size = 0;
3557     mmaps->cap = 0;
3558     mmaps->elts = NULL;
3559     mmaps->allocated_chunks = 0;
3560 }
3561 
3562 
3563 nxt_inline void
3564 nxt_unit_process_use(nxt_unit_process_t *process)
3565 {
3566     nxt_atomic_fetch_add(&process->use_count, 1);
3567 }
3568 
3569 
3570 nxt_inline void
3571 nxt_unit_process_release(nxt_unit_process_t *process)
3572 {
3573     long c;
3574 
3575     c = nxt_atomic_fetch_add(&process->use_count, -1);
3576 
3577     if (c == 1) {
3578         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
3579 
3580         nxt_unit_mmaps_destroy(&process->incoming);
3581         nxt_unit_mmaps_destroy(&process->outgoing);
3582 
3583         free(process);
3584     }
3585 }
3586 
3587 
3588 static void
3589 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
3590 {
3591     nxt_unit_mmap_t  *mm, *end;
3592 
3593     if (mmaps->elts != NULL) {
3594         end = mmaps->elts + mmaps->size;
3595 
3596         for (mm = mmaps->elts; mm < end; mm++) {
3597             munmap(mm->hdr, PORT_MMAP_SIZE);
3598         }
3599 
3600         free(mmaps->elts);
3601     }
3602 
3603     pthread_mutex_destroy(&mmaps->mutex);
3604 }
3605 
3606 
3607 static nxt_port_mmap_header_t *
3608 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3609     uint32_t id)
3610 {
3611     nxt_port_mmap_header_t  *hdr;
3612 
3613     if (nxt_fast_path(process->incoming.size > id)) {
3614         hdr = process->incoming.elts[id].hdr;
3615 
3616     } else {
3617         hdr = NULL;
3618     }
3619 
3620     return hdr;
3621 }
3622 
3623 
3624 static int
3625 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3626 {
3627     int                           rc;
3628     nxt_chunk_id_t                c;
3629     nxt_unit_process_t            *process;
3630     nxt_port_mmap_header_t        *hdr;
3631     nxt_port_mmap_tracking_msg_t  *tracking_msg;
3632 
3633     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3634         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3635                       recv_msg->stream, (int) recv_msg->size);
3636 
3637         return 0;
3638     }
3639 
3640     tracking_msg = recv_msg->start;
3641 
3642     recv_msg->start = tracking_msg + 1;
3643     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3644 
3645     process = nxt_unit_msg_get_process(ctx, recv_msg);
3646     if (nxt_slow_path(process == NULL)) {
3647         return 0;
3648     }
3649 
3650     pthread_mutex_lock(&process->incoming.mutex);
3651 
3652     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
3653     if (nxt_slow_path(hdr == NULL)) {
3654         pthread_mutex_unlock(&process->incoming.mutex);
3655 
3656         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
3657                       "invalid mmap id %d,%"PRIu32,
3658                       recv_msg->stream, (int) process->pid,
3659                       tracking_msg->mmap_id);
3660 
3661         return 0;
3662     }
3663 
3664     c = tracking_msg->tracking_id;
3665     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3666 
3667     if (rc == 0) {
3668         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3669                        recv_msg->stream);
3670 
3671         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3672     }
3673 
3674     pthread_mutex_unlock(&process->incoming.mutex);
3675 
3676     return rc;
3677 }
3678 
3679 
3680 static int
3681 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3682 {
3683     void                    *start;
3684     uint32_t                size;
3685     nxt_unit_process_t      *process;
3686     nxt_unit_mmap_buf_t     *b, **incoming_tail;
3687     nxt_port_mmap_msg_t     *mmap_msg, *end;
3688     nxt_port_mmap_header_t  *hdr;
3689 
3690     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
3691         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
3692                       recv_msg->stream, (int) recv_msg->size);
3693 
3694         return NXT_UNIT_ERROR;
3695     }
3696 
3697     process = nxt_unit_msg_get_process(ctx, recv_msg);
3698     if (nxt_slow_path(process == NULL)) {
3699         return NXT_UNIT_ERROR;
3700     }
3701 
3702     mmap_msg = recv_msg->start;
3703     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3704 
3705     incoming_tail = &recv_msg->incoming_buf;
3706 
3707     for (; mmap_msg < end; mmap_msg++) {
3708         b = nxt_unit_mmap_buf_get(ctx);
3709         if (nxt_slow_path(b == NULL)) {
3710             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3711                           recv_msg->stream);
3712 
3713             return NXT_UNIT_ERROR;
3714         }
3715 
3716         nxt_unit_mmap_buf_insert(incoming_tail, b);
3717         incoming_tail = &b->next;
3718     }
3719 
3720     b = recv_msg->incoming_buf;
3721     mmap_msg = recv_msg->start;
3722 
3723     pthread_mutex_lock(&process->incoming.mutex);
3724 
3725     for (; mmap_msg < end; mmap_msg++) {
3726         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
3727         if (nxt_slow_path(hdr == NULL)) {
3728             pthread_mutex_unlock(&process->incoming.mutex);
3729 
3730             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
3731                           "invalid mmap id %d,%"PRIu32,
3732                           recv_msg->stream, (int) process->pid,
3733                           mmap_msg->mmap_id);
3734 
3735             return NXT_UNIT_ERROR;
3736         }
3737 
3738         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3739         size = mmap_msg->size;
3740 
3741         if (recv_msg->start == mmap_msg) {
3742             recv_msg->start = start;
3743             recv_msg->size = size;
3744         }
3745 
3746         b->buf.start = start;
3747         b->buf.free = start;
3748         b->buf.end = b->buf.start + size;
3749         b->hdr = hdr;
3750         b->process = process;
3751 
3752         b = b->next;
3753 
3754         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3755                        recv_msg->stream,
3756                        start, (int) size,
3757                        (int) hdr->src_pid, (int) hdr->dst_pid,
3758                        (int) hdr->id, (int) mmap_msg->chunk_id,
3759                        (int) mmap_msg->size);
3760     }
3761 
3762     pthread_mutex_unlock(&process->incoming.mutex);
3763 
3764     return NXT_UNIT_OK;
3765 }
3766 
3767 
3768 static void
3769 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
3770     nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
3771     void *start, uint32_t size)
3772 {
3773     int              freed_chunks;
3774     u_char           *p, *end;
3775     nxt_chunk_id_t   c;
3776     nxt_unit_impl_t  *lib;
3777 
3778     memset(start, 0xA5, size);
3779 
3780     p = start;
3781     end = p + size;
3782     c = nxt_port_mmap_chunk_id(hdr, p);
3783     freed_chunks = 0;
3784 
3785     while (p < end) {
3786         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3787 
3788         p += PORT_MMAP_CHUNK_SIZE;
3789         c++;
3790         freed_chunks++;
3791     }
3792 
3793     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3794 
3795     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
3796         nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
3797                              -freed_chunks);
3798 
3799         nxt_unit_debug(ctx, "process %d allocated_chunks %d",
3800                        process->pid,
3801                        (int) process->outgoing.allocated_chunks);
3802     }
3803 
3804     if (hdr->dst_pid == lib->pid
3805         && freed_chunks != 0
3806         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
3807     {
3808         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
3809     }
3810 }
3811 
3812 
3813 static int
3814 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
3815 {
3816     ssize_t          res;
3817     nxt_port_msg_t   msg;
3818     nxt_unit_impl_t  *lib;
3819 
3820     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3821 
3822     msg.stream = 0;
3823     msg.pid = lib->pid;
3824     msg.reply_port = 0;
3825     msg.type = _NXT_PORT_MSG_SHM_ACK;
3826     msg.last = 0;
3827     msg.mmap = 0;
3828     msg.nf = 0;
3829     msg.mf = 0;
3830     msg.tracking = 0;
3831 
3832     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
3833     if (nxt_slow_path(res != sizeof(msg))) {
3834         return NXT_UNIT_ERROR;
3835     }
3836 
3837     return NXT_UNIT_OK;
3838 }
3839 
3840 
3841 static nxt_int_t
3842 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3843 {
3844     nxt_process_t  *process;
3845 
3846     process = data;
3847 
3848     if (lhq->key.length == sizeof(pid_t)
3849         && *(pid_t *) lhq->key.start == process->pid)
3850     {
3851         return NXT_OK;
3852     }
3853 
3854     return NXT_DECLINED;
3855 }
3856 
3857 
/*
 * Level hash prototype for the lib->processes table: entries are matched
 * by nxt_unit_lvlhsh_pid_test(), with nxt_lvlhsh_alloc()/nxt_lvlhsh_free()
 * passed as the allocation callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
3864 
3865 
3866 static inline void
3867 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3868 {
3869     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3870     lhq->key.length = sizeof(*pid);
3871     lhq->key.start = (u_char *) pid;
3872     lhq->proto = &lvlhsh_processes_proto;
3873 }
3874 
3875 
3876 static nxt_unit_process_t *
3877 nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
3878 {
3879     nxt_unit_process_t  *process;
3880     nxt_lvlhsh_query_t  lhq;
3881 
3882     nxt_unit_process_lhq_pid(&lhq, &pid);
3883 
3884     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3885         process = lhq.value;
3886         nxt_unit_process_use(process);
3887 
3888         return process;
3889     }
3890 
3891     process = malloc(sizeof(nxt_unit_process_t));
3892     if (nxt_slow_path(process == NULL)) {
3893         nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
3894 
3895         return NULL;
3896     }
3897 
3898     process->pid = pid;
3899     process->use_count = 1;
3900     process->next_port_id = 0;
3901     process->lib = lib;
3902 
3903     nxt_queue_init(&process->ports);
3904 
3905     nxt_unit_mmaps_init(&process->incoming);
3906     nxt_unit_mmaps_init(&process->outgoing);
3907 
3908     lhq.replace = 0;
3909     lhq.value = process;
3910 
3911     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3912 
3913     case NXT_OK:
3914         break;
3915 
3916     default:
3917         nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
3918 
3919         pthread_mutex_destroy(&process->outgoing.mutex);
3920         pthread_mutex_destroy(&process->incoming.mutex);
3921         free(process);
3922         process = NULL;
3923         break;
3924     }
3925 
3926     nxt_unit_process_use(process);
3927 
3928     return process;
3929 }
3930 
3931 
3932 static nxt_unit_process_t *
3933 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
3934 {
3935     int                 rc;
3936     nxt_lvlhsh_query_t  lhq;
3937 
3938     nxt_unit_process_lhq_pid(&lhq, &pid);
3939 
3940     if (remove) {
3941         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3942 
3943     } else {
3944         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3945     }
3946 
3947     if (rc == NXT_OK) {
3948         if (!remove) {
3949             nxt_unit_process_use(lhq.value);
3950         }
3951 
3952         return lhq.value;
3953     }
3954 
3955     return NULL;
3956 }
3957 
3958 
3959 static nxt_unit_process_t *
3960 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3961 {
3962     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3963 }
3964 
3965 
3966 int
3967 nxt_unit_run(nxt_unit_ctx_t *ctx)
3968 {
3969     int                  rc;
3970     nxt_unit_impl_t      *lib;
3971     nxt_unit_ctx_impl_t  *ctx_impl;
3972 
3973     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3974 
3975     nxt_unit_ctx_use(ctx_impl);
3976 
3977     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3978     rc = NXT_UNIT_OK;
3979 
3980     while (nxt_fast_path(lib->online)) {
3981         rc = nxt_unit_run_once(ctx);
3982 
3983         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3984             break;
3985         }
3986     }
3987 
3988     nxt_unit_ctx_release(ctx_impl);
3989 
3990     return rc;
3991 }
3992 
3993 
3994 int
3995 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
3996 {
3997     int                  rc;
3998     nxt_unit_ctx_impl_t  *ctx_impl;
3999     nxt_unit_read_buf_t  *rbuf;
4000 
4001     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4002 
4003     nxt_unit_ctx_use(ctx_impl);
4004 
4005     pthread_mutex_lock(&ctx_impl->mutex);
4006 
4007     if (ctx_impl->pending_read_head != NULL) {
4008         rbuf = ctx_impl->pending_read_head;
4009         ctx_impl->pending_read_head = rbuf->next;
4010 
4011         if (ctx_impl->pending_read_tail == &rbuf->next) {
4012             ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
4013         }
4014 
4015         pthread_mutex_unlock(&ctx_impl->mutex);
4016 
4017     } else {
4018         rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
4019         if (nxt_slow_path(rbuf == NULL)) {
4020 
4021             nxt_unit_ctx_release(ctx_impl);
4022 
4023             return NXT_UNIT_ERROR;
4024         }
4025 
4026         nxt_unit_read_buf(ctx, rbuf);
4027     }
4028 
4029     if (nxt_fast_path(rbuf->size > 0)) {
4030         rc = nxt_unit_process_msg(ctx,
4031                                   rbuf->buf, rbuf->size,
4032                                   rbuf->oob, sizeof(rbuf->oob));
4033 
4034 #if (NXT_DEBUG)
4035         memset(rbuf->buf, 0xAC, rbuf->size);
4036 #endif
4037 
4038     } else {
4039         rc = NXT_UNIT_ERROR;
4040     }
4041 
4042     nxt_unit_read_buf_release(ctx, rbuf);
4043 
4044     nxt_unit_ctx_release(ctx_impl);
4045 
4046     return rc;
4047 }
4048 
4049 
4050 static void
4051 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4052 {
4053     nxt_unit_ctx_impl_t  *ctx_impl;
4054 
4055     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4056 
4057     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
4058 
4059     rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
4060                                     rbuf->buf, sizeof(rbuf->buf),
4061                                     rbuf->oob, sizeof(rbuf->oob));
4062 }
4063 
4064 
4065 void
4066 nxt_unit_done(nxt_unit_ctx_t *ctx)
4067 {
4068     nxt_unit_ctx_impl_t  *ctx_impl;
4069 
4070     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4071 
4072     nxt_unit_ctx_release(ctx_impl);
4073 }
4074 
4075 
4076 nxt_unit_ctx_t *
4077 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4078 {
4079     int                  rc;
4080     nxt_unit_impl_t      *lib;
4081     nxt_unit_port_t      *port;
4082     nxt_unit_ctx_impl_t  *new_ctx;
4083 
4084     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4085 
4086     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4087     if (nxt_slow_path(new_ctx == NULL)) {
4088         nxt_unit_alert(ctx, "failed to allocate context");
4089 
4090         return NULL;
4091     }
4092 
4093     port = nxt_unit_create_port(ctx);
4094     if (nxt_slow_path(port == NULL)) {
4095         free(new_ctx);
4096 
4097         return NULL;
4098     }
4099 
4100     rc = nxt_unit_send_port(ctx, lib->router_port, port);
4101     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4102         goto fail;
4103     }
4104 
4105     rc = nxt_unit_ctx_init(lib, new_ctx, data);
4106     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4107         goto fail;
4108     }
4109 
4110     new_ctx->read_port = port;
4111 
4112     return &new_ctx->ctx;
4113 
4114 fail:
4115 
4116     nxt_unit_remove_port(lib, &port->id);
4117     nxt_unit_port_release(port);
4118 
4119     free(new_ctx);
4120 
4121     return NULL;
4122 }
4123 
4124 
/*
 * Tears down a context: completes any still-active requests with
 * NXT_UNIT_ERROR, frees the cached mmap buffers, request objects and
 * websocket frame objects, destroys the context mutex, unlinks the
 * context from the queue it is linked into, and releases the references
 * held on the read port and on the library.
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    /* Requests that were never finished are reported and failed. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /*
     * ctx_buf[] entries are members of ctx_impl and are only unlinked;
     * the buffers on the free_buf list are unlinked and free()d one by one.
     */
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        free(mmap_buf);
    }

    /* Release the cached (free) request objects. */
    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    /* Release the cached (free) websocket frame objects. */
    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    /*
     * NOTE(review): the context is unlinked here without taking lib->mutex;
     * confirm that the queue it belongs to cannot be accessed concurrently.
     */
    nxt_queue_remove(&ctx_impl->link);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_port_release(ctx_impl->read_port);
    }

    /* The main context is a member of the library object; never free() it. */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }

    nxt_unit_lib_release(lib);
}
4181 
4182 
/*
 * Socket type used for port socket pairs: SOCK_SEQPACKET where the
 * platform provides it, SOCK_DGRAM otherwise.  The leading "0 ||" does
 * not change the condition; to test SOCK_DGRAM on all platforms the
 * condition has to be made false.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif
4189 
4190 
4191 void
4192 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
4193 {
4194     nxt_unit_port_hash_id_t  port_hash_id;
4195 
4196     port_hash_id.pid = pid;
4197     port_hash_id.id = id;
4198 
4199     port_id->pid = pid;
4200     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
4201     port_id->id = id;
4202 }
4203 
4204 
4205 static nxt_unit_port_t *
4206 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
4207 {
4208     int                 rc, port_sockets[2];
4209     nxt_unit_impl_t     *lib;
4210     nxt_unit_port_t     new_port, *port;
4211     nxt_unit_process_t  *process;
4212 
4213     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4214 
4215     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
4216     if (nxt_slow_path(rc != 0)) {
4217         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
4218                       strerror(errno), errno);
4219 
4220         return NULL;
4221     }
4222 
4223     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
4224                    port_sockets[0], port_sockets[1]);
4225 
4226     pthread_mutex_lock(&lib->mutex);
4227 
4228     process = nxt_unit_process_get(lib, lib->pid);
4229     if (nxt_slow_path(process == NULL)) {
4230         pthread_mutex_unlock(&lib->mutex);
4231 
4232         close(port_sockets[0]);
4233         close(port_sockets[1]);
4234 
4235         return NULL;
4236     }
4237 
4238     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
4239 
4240     new_port.in_fd = port_sockets[0];
4241     new_port.out_fd = port_sockets[1];
4242     new_port.data = NULL;
4243 
4244     pthread_mutex_unlock(&lib->mutex);
4245 
4246     nxt_unit_process_release(process);
4247 
4248     port = nxt_unit_add_port(ctx, &new_port);
4249     if (nxt_slow_path(port == NULL)) {
4250         nxt_unit_alert(ctx, "create_port: add_port() failed");
4251 
4252         close(port_sockets[0]);
4253         close(port_sockets[1]);
4254     }
4255 
4256     return port;
4257 }
4258 
4259 
4260 static int
4261 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
4262     nxt_unit_port_t *port)
4263 {
4264     ssize_t          res;
4265     nxt_unit_impl_t  *lib;
4266 
4267     struct {
4268         nxt_port_msg_t            msg;
4269         nxt_port_msg_new_port_t   new_port;
4270     } m;
4271 
4272     union {
4273         struct cmsghdr  cm;
4274         char            space[CMSG_SPACE(sizeof(int))];
4275     } cmsg;
4276 
4277     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4278 
4279     m.msg.stream = 0;
4280     m.msg.pid = lib->pid;
4281     m.msg.reply_port = 0;
4282     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
4283     m.msg.last = 0;
4284     m.msg.mmap = 0;
4285     m.msg.nf = 0;
4286     m.msg.mf = 0;
4287     m.msg.tracking = 0;
4288 
4289     m.new_port.id = port->id.id;
4290     m.new_port.pid = port->id.pid;
4291     m.new_port.type = NXT_PROCESS_APP;
4292     m.new_port.max_size = 16 * 1024;
4293     m.new_port.max_share = 64 * 1024;
4294 
4295     memset(&cmsg, 0, sizeof(cmsg));
4296 
4297     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
4298     cmsg.cm.cmsg_level = SOL_SOCKET;
4299     cmsg.cm.cmsg_type = SCM_RIGHTS;
4300 
4301     /*
4302      * memcpy() is used instead of simple
4303      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
4304      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
4305      *   dereferencing type-punned pointer will break strict-aliasing rules
4306      *
4307      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
4308      * in the same simple assignment as in the code above.
4309      */
4310     memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
4311 
4312     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
4313 
4314     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
4315 }
4316 
4317 
4318 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
4319 {
4320     nxt_unit_port_impl_t  *port_impl;
4321 
4322     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4323 
4324     nxt_atomic_fetch_add(&port_impl->use_count, 1);
4325 }
4326 
4327 
4328 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
4329 {
4330     long                  c;
4331     nxt_unit_port_impl_t  *port_impl;
4332 
4333     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4334 
4335     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
4336 
4337     if (c == 1) {
4338         nxt_unit_debug(NULL, "destroy port %d,%d",
4339                        (int) port->id.pid, (int) port->id.id);
4340 
4341         nxt_unit_process_release(port_impl->process);
4342 
4343         if (port->in_fd != -1) {
4344             close(port->in_fd);
4345 
4346             port->in_fd = -1;
4347         }
4348 
4349         if (port->out_fd != -1) {
4350             close(port->out_fd);
4351 
4352             port->out_fd = -1;
4353         }
4354 
4355         free(port_impl);
4356     }
4357 }
4358 
4359 
4360 nxt_inline nxt_unit_process_t *
4361 nxt_unit_port_process(nxt_unit_port_t *port)
4362 {
4363     nxt_unit_port_impl_t  *port_impl;
4364 
4365     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4366 
4367     return port_impl->process;
4368 }
4369 
4370 
4371 static nxt_unit_port_t *
4372 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4373 {
4374     int                   rc;
4375     nxt_unit_impl_t       *lib;
4376     nxt_unit_port_t       *old_port;
4377     nxt_unit_process_t    *process;
4378     nxt_unit_port_impl_t  *new_port;
4379 
4380     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4381 
4382     pthread_mutex_lock(&lib->mutex);
4383 
4384     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
4385 
4386     if (nxt_slow_path(old_port != NULL)) {
4387         nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
4388                        port->id.pid, port->id.id,
4389                        port->in_fd, port->out_fd);
4390 
4391         if (old_port->data == NULL) {
4392             old_port->data = port->data;
4393             port->data = NULL;
4394         }
4395 
4396         if (old_port->in_fd == -1) {
4397             old_port->in_fd = port->in_fd;
4398             port->in_fd = -1;
4399         }
4400 
4401         if (port->in_fd != -1) {
4402             close(port->in_fd);
4403             port->in_fd = -1;
4404         }
4405 
4406         if (old_port->out_fd == -1) {
4407             old_port->out_fd = port->out_fd;
4408             port->out_fd = -1;
4409         }
4410 
4411         if (port->out_fd != -1) {
4412             close(port->out_fd);
4413             port->out_fd = -1;
4414         }
4415 
4416         *port = *old_port;
4417 
4418         pthread_mutex_unlock(&lib->mutex);
4419 
4420         if (lib->callbacks.add_port != NULL
4421             && (port->in_fd != -1 || port->out_fd != -1))
4422         {
4423             lib->callbacks.add_port(ctx, old_port);
4424         }
4425 
4426         return old_port;
4427     }
4428 
4429     new_port = NULL;
4430 
4431     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
4432                    port->id.pid, port->id.id,
4433                    port->in_fd, port->out_fd);
4434 
4435     process = nxt_unit_process_get(lib, port->id.pid);
4436     if (nxt_slow_path(process == NULL)) {
4437         goto unlock;
4438     }
4439 
4440     if (port->id.id >= process->next_port_id) {
4441         process->next_port_id = port->id.id + 1;
4442     }
4443 
4444     new_port = malloc(sizeof(nxt_unit_port_impl_t));
4445     if (nxt_slow_path(new_port == NULL)) {
4446         goto unlock;
4447     }
4448 
4449     new_port->port = *port;
4450 
4451     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
4452     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4453         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
4454                        port->id.pid, port->id.id);
4455 
4456         free(new_port);
4457 
4458         new_port = NULL;
4459 
4460         goto unlock;
4461     }
4462 
4463     nxt_queue_insert_tail(&process->ports, &new_port->link);
4464 
4465     new_port->use_count = 2;
4466     new_port->process = process;
4467 
4468     process = NULL;
4469 
4470 unlock:
4471 
4472     pthread_mutex_unlock(&lib->mutex);
4473 
4474     if (nxt_slow_path(process != NULL)) {
4475         nxt_unit_process_release(process);
4476     }
4477 
4478     if (lib->callbacks.add_port != NULL
4479         && new_port != NULL
4480         && (port->in_fd != -1 || port->out_fd != -1))
4481     {
4482         lib->callbacks.add_port(ctx, &new_port->port);
4483     }
4484 
4485     return &new_port->port;
4486 }
4487 
4488 
4489 static void
4490 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
4491 {
4492     nxt_unit_port_t       *port;
4493     nxt_unit_port_impl_t  *port_impl;
4494 
4495     pthread_mutex_lock(&lib->mutex);
4496 
4497     port = nxt_unit_remove_port_unsafe(lib, port_id);
4498 
4499     if (nxt_fast_path(port != NULL)) {
4500         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4501 
4502         nxt_queue_remove(&port_impl->link);
4503     }
4504 
4505     pthread_mutex_unlock(&lib->mutex);
4506 
4507     if (lib->callbacks.remove_port != NULL && port != NULL) {
4508         lib->callbacks.remove_port(&lib->unit, port);
4509     }
4510 
4511     if (nxt_fast_path(port != NULL)) {
4512         nxt_unit_port_release(port);
4513     }
4514 }
4515 
4516 
4517 static nxt_unit_port_t *
4518 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
4519 {
4520     nxt_unit_port_t  *port;
4521 
4522     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
4523     if (nxt_slow_path(port == NULL)) {
4524         nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
4525                        (int) port_id->pid, (int) port_id->id);
4526 
4527         return NULL;
4528     }
4529 
4530     nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
4531                    (int) port_id->pid, (int) port_id->id,
4532                    port->in_fd, port->out_fd, port->data);
4533 
4534     return port;
4535 }
4536 
4537 
/*
 * Removes the process entry for "pid" together with all of its ports and
 * then invokes the remove_pid callback, if one is set.  Nothing is done
 * (beyond a debug message) when the process is not known.
 */
static void
nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
    nxt_unit_process_t  *process;

    pthread_mutex_lock(&lib->mutex);

    /* The last argument asks the lookup to remove the entry as well. */
    process = nxt_unit_process_find(lib, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /*
     * lib->mutex is still held at this point; nxt_unit_remove_process()
     * unlocks it, so there is no unlock on this path here.
     */
    nxt_unit_remove_process(lib, process);

    if (lib->callbacks.remove_pid != NULL) {
        lib->callbacks.remove_pid(&lib->unit, pid);
    }
}
4560 
4561 
/*
 * Removes all ports of "process" and drops one reference on the process.
 *
 * Must be called with lib->mutex locked; the mutex is unlocked inside
 * this function and is NOT held on return.
 */
static void
nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_port_impl_t  *port;

    nxt_queue_init(&ports);

    /*
     * Collect the process' ports in a local queue (presumably
     * nxt_queue_add() splices process->ports onto "ports" -- confirm in
     * nxt_queue.h) so they can be processed after the mutex is dropped.
     */
    nxt_queue_add(&ports, &process->ports);

    /* Phase 1, under the mutex: delete every port from the port hash. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_remove_port_unsafe(lib, &port->port.id);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /*
     * Phase 2, without the mutex: unlink each port, notify the
     * remove_port callback and drop one reference on the port.
     */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        if (lib->callbacks.remove_port != NULL) {
            lib->callbacks.remove_port(&lib->unit, &port->port);
        }

        nxt_unit_port_release(&port->port);

    } nxt_queue_loop;

    nxt_unit_process_release(process);
}
4594 
4595 
4596 static void
4597 nxt_unit_quit(nxt_unit_ctx_t *ctx)
4598 {
4599     nxt_unit_impl_t  *lib;
4600 
4601     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4602 
4603     lib->online = 0;
4604 
4605     if (lib->callbacks.quit != NULL) {
4606         lib->callbacks.quit(ctx);
4607     }
4608 }
4609 
4610 
4611 static ssize_t
4612 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
4613     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4614 {
4615     nxt_unit_impl_t  *lib;
4616 
4617     nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
4618                    (int) port->id.pid, (int) port->id.id, port->out_fd);
4619 
4620     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4621 
4622     if (lib->callbacks.port_send != NULL) {
4623         return lib->callbacks.port_send(ctx, port, buf, buf_size,
4624                                         oob, oob_size);
4625     }
4626 
4627     return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
4628                             oob, oob_size);
4629 }
4630 
4631 
4632 static ssize_t
4633 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
4634     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4635 {
4636     ssize_t        res;
4637     struct iovec   iov[1];
4638     struct msghdr  msg;
4639 
4640     iov[0].iov_base = (void *) buf;
4641     iov[0].iov_len = buf_size;
4642 
4643     msg.msg_name = NULL;
4644     msg.msg_namelen = 0;
4645     msg.msg_iov = iov;
4646     msg.msg_iovlen = 1;
4647     msg.msg_flags = 0;
4648     msg.msg_control = (void *) oob;
4649     msg.msg_controllen = oob_size;
4650 
4651 retry:
4652 
4653     res = sendmsg(fd, &msg, 0);
4654 
4655     if (nxt_slow_path(res == -1)) {
4656         if (errno == EINTR) {
4657             goto retry;
4658         }
4659 
4660         /*
4661          * FIXME: This should be "alert" after router graceful shutdown
4662          * implementation.
4663          */
4664         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
4665                       fd, (int) buf_size, strerror(errno), errno);
4666 
4667     } else {
4668         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
4669                        (int) res);
4670     }
4671 
4672     return res;
4673 }
4674 
4675 
4676 static ssize_t
4677 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
4678     void *buf, size_t buf_size, void *oob, size_t oob_size)
4679 {
4680     int              fd;
4681     ssize_t          res;
4682     struct iovec     iov[1];
4683     struct msghdr    msg;
4684     nxt_unit_impl_t  *lib;
4685 
4686     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4687 
4688     if (lib->callbacks.port_recv != NULL) {
4689         return lib->callbacks.port_recv(ctx, port,
4690                                         buf, buf_size, oob, oob_size);
4691     }
4692 
4693     iov[0].iov_base = buf;
4694     iov[0].iov_len = buf_size;
4695 
4696     msg.msg_name = NULL;
4697     msg.msg_namelen = 0;
4698     msg.msg_iov = iov;
4699     msg.msg_iovlen = 1;
4700     msg.msg_flags = 0;
4701     msg.msg_control = oob;
4702     msg.msg_controllen = oob_size;
4703 
4704     fd = port->in_fd;
4705 
4706 retry:
4707 
4708     res = recvmsg(fd, &msg, 0);
4709 
4710     if (nxt_slow_path(res == -1)) {
4711         if (errno == EINTR) {
4712             goto retry;
4713         }
4714 
4715         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
4716                        fd, strerror(errno), errno);
4717 
4718     } else {
4719         nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
4720     }
4721 
4722     return res;
4723 }
4724 
4725 
4726 static nxt_int_t
4727 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4728 {
4729     nxt_unit_port_t          *port;
4730     nxt_unit_port_hash_id_t  *port_id;
4731 
4732     port = data;
4733     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
4734 
4735     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
4736         && port_id->pid == port->id.pid
4737         && port_id->id == port->id.id)
4738     {
4739         return NXT_OK;
4740     }
4741 
4742     return NXT_DECLINED;
4743 }
4744 
4745 
/*
 * Level hash prototype of the port hash: default shape, keys compared
 * with nxt_unit_port_hash_test(), memory managed by nxt_lvlhsh_alloc()
 * and nxt_lvlhsh_free().
 */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
4752 
4753 
4754 static inline void
4755 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
4756     nxt_unit_port_hash_id_t *port_hash_id,
4757     nxt_unit_port_id_t *port_id)
4758 {
4759     port_hash_id->pid = port_id->pid;
4760     port_hash_id->id = port_id->id;
4761 
4762     if (nxt_fast_path(port_id->hash != 0)) {
4763         lhq->key_hash = port_id->hash;
4764 
4765     } else {
4766         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
4767 
4768         port_id->hash = lhq->key_hash;
4769 
4770         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
4771                        (int) port_id->pid, (int) port_id->id,
4772                        (int) port_id->hash);
4773     }
4774 
4775     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
4776     lhq->key.start = (u_char *) port_hash_id;
4777     lhq->proto = &lvlhsh_ports_proto;
4778     lhq->pool = NULL;
4779 }
4780 
4781 
4782 static int
4783 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
4784 {
4785     nxt_int_t                res;
4786     nxt_lvlhsh_query_t       lhq;
4787     nxt_unit_port_hash_id_t  port_hash_id;
4788 
4789     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
4790     lhq.replace = 0;
4791     lhq.value = port;
4792 
4793     res = nxt_lvlhsh_insert(port_hash, &lhq);
4794 
4795     switch (res) {
4796 
4797     case NXT_OK:
4798         return NXT_UNIT_OK;
4799 
4800     default:
4801         return NXT_UNIT_ERROR;
4802     }
4803 }
4804 
4805 
4806 static nxt_unit_port_t *
4807 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
4808     int remove)
4809 {
4810     nxt_int_t                res;
4811     nxt_lvlhsh_query_t       lhq;
4812     nxt_unit_port_hash_id_t  port_hash_id;
4813 
4814     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
4815 
4816     if (remove) {
4817         res = nxt_lvlhsh_delete(port_hash, &lhq);
4818 
4819     } else {
4820         res = nxt_lvlhsh_find(port_hash, &lhq);
4821     }
4822 
4823     switch (res) {
4824 
4825     case NXT_OK:
4826         if (!remove) {
4827             nxt_unit_port_use(lhq.value);
4828         }
4829 
4830         return lhq.value;
4831 
4832     default:
4833         return NULL;
4834     }
4835 }
4836 
4837 
/*
 * Key comparison callback of the request hash.  It accepts every
 * candidate unconditionally: the stored stream id is never compared
 * with the query key, so a lookup is decided by the key hash alone.
 * NOTE(review): this is only correct if nxt_murmur_hash2() cannot
 * collide for the 4-byte stream ids used as keys -- confirm.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}
4843 
4844 
/*
 * Level hash prototype of the request hash: default shape, keys tested
 * with nxt_unit_request_hash_test(), memory managed by
 * nxt_lvlhsh_alloc() and nxt_lvlhsh_free().
 */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
4851 
4852 
4853 static int
4854 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4855     nxt_unit_request_info_impl_t *req_impl)
4856 {
4857     uint32_t            *stream;
4858     nxt_int_t           res;
4859     nxt_lvlhsh_query_t  lhq;
4860 
4861     stream = &req_impl->stream;
4862 
4863     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4864     lhq.key.length = sizeof(*stream);
4865     lhq.key.start = (u_char *) stream;
4866     lhq.proto = &lvlhsh_requests_proto;
4867     lhq.pool = NULL;
4868     lhq.replace = 0;
4869     lhq.value = req_impl;
4870 
4871     res = nxt_lvlhsh_insert(request_hash, &lhq);
4872 
4873     switch (res) {
4874 
4875     case NXT_OK:
4876         return NXT_UNIT_OK;
4877 
4878     default:
4879         return NXT_UNIT_ERROR;
4880     }
4881 }
4882 
4883 
4884 static nxt_unit_request_info_impl_t *
4885 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4886     int remove)
4887 {
4888     nxt_int_t           res;
4889     nxt_lvlhsh_query_t  lhq;
4890 
4891     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4892     lhq.key.length = sizeof(stream);
4893     lhq.key.start = (u_char *) &stream;
4894     lhq.proto = &lvlhsh_requests_proto;
4895     lhq.pool = NULL;
4896 
4897     if (remove) {
4898         res = nxt_lvlhsh_delete(request_hash, &lhq);
4899 
4900     } else {
4901         res = nxt_lvlhsh_find(request_hash, &lhq);
4902     }
4903 
4904     switch (res) {
4905 
4906     case NXT_OK:
4907         return lhq.value;
4908 
4909     default:
4910         return NULL;
4911     }
4912 }
4913 
4914 
4915 void
4916 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4917 {
4918     int              log_fd, n;
4919     char             msg[NXT_MAX_ERROR_STR], *p, *end;
4920     pid_t            pid;
4921     va_list          ap;
4922     nxt_unit_impl_t  *lib;
4923 
4924     if (nxt_fast_path(ctx != NULL)) {
4925         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4926 
4927         pid = lib->pid;
4928         log_fd = lib->log_fd;
4929 
4930     } else {
4931         pid = getpid();
4932         log_fd = STDERR_FILENO;
4933     }
4934 
4935     p = msg;
4936     end = p + sizeof(msg) - 1;
4937 
4938     p = nxt_unit_snprint_prefix(p, end, pid, level);
4939 
4940     va_start(ap, fmt);
4941     p += vsnprintf(p, end - p, fmt, ap);
4942     va_end(ap);
4943 
4944     if (nxt_slow_path(p > end)) {
4945         memcpy(end - 5, "[...]", 5);
4946         p = end;
4947     }
4948 
4949     *p++ = '\n';
4950 
4951     n = write(log_fd, msg, p - msg);
4952     if (nxt_slow_path(n < 0)) {
4953         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4954     }
4955 }
4956 
4957 
4958 void
4959 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
4960 {
4961     int                           log_fd, n;
4962     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
4963     pid_t                         pid;
4964     va_list                       ap;
4965     nxt_unit_impl_t               *lib;
4966     nxt_unit_request_info_impl_t  *req_impl;
4967 
4968     if (nxt_fast_path(req != NULL)) {
4969         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
4970 
4971         pid = lib->pid;
4972         log_fd = lib->log_fd;
4973 
4974     } else {
4975         pid = getpid();
4976         log_fd = STDERR_FILENO;
4977     }
4978 
4979     p = msg;
4980     end = p + sizeof(msg) - 1;
4981 
4982     p = nxt_unit_snprint_prefix(p, end, pid, level);
4983 
4984     if (nxt_fast_path(req != NULL)) {
4985         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4986 
4987         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4988     }
4989 
4990     va_start(ap, fmt);
4991     p += vsnprintf(p, end - p, fmt, ap);
4992     va_end(ap);
4993 
4994     if (nxt_slow_path(p > end)) {
4995         memcpy(end - 5, "[...]", 5);
4996         p = end;
4997     }
4998 
4999     *p++ = '\n';
5000 
5001     n = write(log_fd, msg, p - msg);
5002     if (nxt_slow_path(n < 0)) {
5003         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
5004     }
5005 }
5006 
5007 
/*
 * Log level names, indexed by the numeric log level that is passed to
 * nxt_unit_snprint_prefix(): 0 is "alert", 5 is "debug".
 */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};
5016 
5017 
5018 static char *
5019 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
5020 {
5021     struct tm        tm;
5022     struct timespec  ts;
5023 
5024     (void) clock_gettime(CLOCK_REALTIME, &ts);
5025 
5026 #if (NXT_HAVE_LOCALTIME_R)
5027     (void) localtime_r(&ts.tv_sec, &tm);
5028 #else
5029     tm = *localtime(&ts.tv_sec);
5030 #endif
5031 
5032 #if (NXT_DEBUG)
5033     p += snprintf(p, end - p,
5034                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
5035                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
5036                   tm.tm_hour, tm.tm_min, tm.tm_sec,
5037                   (int) ts.tv_nsec / 1000000);
5038 #else
5039     p += snprintf(p, end - p,
5040                   "%4d/%02d/%02d %02d:%02d:%02d ",
5041                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
5042                   tm.tm_hour, tm.tm_min, tm.tm_sec);
5043 #endif
5044 
5045     p += snprintf(p, end - p,
5046                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
5047                   (int) pid,
5048                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
5049 
5050     return p;
5051 }
5052 
5053 
/*
 * Aligned allocator required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free().
 * Returns memory obtained from posix_memalign(), or NULL if it fails.
 */

void *
nxt_memalign(size_t alignment, size_t size)
{
    void  *p;

    if (nxt_slow_path(posix_memalign(&p, alignment, size) != 0)) {
        return NULL;
    }

    return p;
}
5070 
#if (NXT_DEBUG)

/*
 * Debug builds only: nxt_free() as a real function that forwards to
 * free().  (Presumably non-debug builds map nxt_free to free() with a
 * macro elsewhere -- confirm in the nxt headers.)
 */
void
nxt_free(void *p)
{
    free(p);
}

#endif
5080