xref: /unit/src/nxt_unit.c (revision 1543:42f27153db91)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
/*
 * NXT_UNIT_LOCAL_BUF_SIZE is NXT_UNIT_MAX_PLAIN_SIZE bytes plus room for
 * one nxt_port_msg_t.
 *
 * NOTE(review): neither macro is used in this part of the file.  The names
 * suggest an upper bound for data sent without shared memory and the size
 * of a local buffer holding such data behind a message header -- confirm.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE  \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
25 
/* Short type names for the implementation structures defined below. */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
37 
/*
 * Forward declarations of the helpers used in this file; every one of them
 * is declared either static or nxt_inline.
 */

/* Library and context construction, use counting. */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
/* nxt_unit_mmap_buf_t list manipulation. */
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
/* Start-up steps called from nxt_unit_init(). */
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
    int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
/* Per-type handlers dispatched from nxt_unit_process_msg(). */
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
    nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
    nxt_unit_request_info_t *req, size_t size);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int *n, int min_n);
static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process,
    nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);

static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static int nxt_unit_remove_port(nxt_unit_impl_t *lib,
    nxt_unit_port_id_t *port_id);
static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
    nxt_unit_process_t *process);
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
160 
161 
/*
 * Buffer list node wrapping the public nxt_unit_buf_t.
 *
 * Nodes are chained through "next".  "prev" stores the address of the
 * pointer that refers to this node, i.e. the list head or the "next"
 * member of the preceding node; see nxt_unit_mmap_buf_insert().
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    /* nxt_unit_mmap_buf_unlink() skips the back-link update when NULL. */
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    /* Set to the owning request in nxt_unit_process_req_headers(). */
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_process_t       *process;
    char                     *free_ptr;
    char                     *plain_ptr;
};
176 
177 
/*
 * Decoded view of one received port message, built on the stack by
 * nxt_unit_process_msg() and handed to the per-type handlers.
 */
struct nxt_unit_recv_msg_s {
    /* Copied from the nxt_port_msg_t header at the start of the buffer. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Data following the header and its length in bytes. */
    void                     *start;
    uint32_t                 size;

    /*
     * Descriptor received as SCM_RIGHTS, or -1.  A handler that keeps the
     * descriptor resets this to -1; otherwise nxt_unit_process_msg()
     * closes it.
     */
    int                      fd;
    /* Starts as NULL; released by nxt_unit_process_msg() if still set. */
    nxt_unit_process_t       *process;

    /* Starts as NULL; buffers left here are freed by nxt_unit_process_msg(). */
    nxt_unit_mmap_buf_t      *incoming_buf;
};
194 
195 
/*
 * Values of nxt_unit_request_info_impl_t.state.
 * NOTE(review): the transitions are implemented outside this section; the
 * names suggest the enumerators follow the response life cycle -- confirm.
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
203 
204 
/*
 * Library-private request state wrapped around the public
 * nxt_unit_request_info_t.  nxt_unit_process_req_headers() fills stream,
 * process and the buffer lists from the received message.
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    /* Stream number of the message that carried the request headers. */
    uint32_t                 stream;

    /* Process reference taken over from the received message. */
    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    /* Buffer list taken over from the received message. */
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;

    /* Queue link; nxt_unit_ctx_init() puts the embedded request in free_req. */
    nxt_queue_link_t         link;

    /*
     * Flexible array member, must stay last.
     * NOTE(review): presumably request_data_size bytes of per-request
     * application data -- confirm against the allocation sites.
     */
    char                     extra_data[];
};
222 
223 
/*
 * Library-private state wrapped around the public
 * nxt_unit_websocket_frame_t.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    /* NOTE(review): presumably the link used by ctx_impl->free_ws -- confirm. */
    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};
233 
234 
/*
 * Read buffer with a singly linked list pointer.  Each context embeds one
 * (ctx_read_buf) that nxt_unit_ctx_init() installs as the initial free
 * read buffer.
 *
 * NOTE(review): presumably "buf" receives the message bytes, "oob" the
 * ancillary data and "size" the result of the receive call -- confirm.
 */
struct nxt_unit_read_buf_s {
    nxt_unit_read_buf_t           *next;
    ssize_t                       size;
    char                          buf[16384];
    char                          oob[256];
};
241 
242 
/*
 * Library-private context state wrapped around the public nxt_unit_ctx_t.
 * Initialised by nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    /*
     * Starts at 1; nxt_unit_ctx_release() calls nxt_unit_ctx_free() when
     * the last reference is dropped.
     */
    nxt_atomic_t                  use_count;

    pthread_mutex_t               mutex;

    nxt_unit_port_id_t            read_port_id;
    /* Initialised to -1. */
    int                           read_port_fd;

    /* Link in nxt_unit_impl_t.contexts. */
    nxt_queue_link_t              link;

    /* Free buffer list; initially holds ctx_buf[0] and ctx_buf[1]. */
    nxt_unit_mmap_buf_t           *free_buf;

    /* Of nxt_unit_request_info_impl_t; initially holds the embedded req. */
    nxt_queue_t                   free_req;

    /* Of nxt_unit_websocket_frame_impl_t. */
    nxt_queue_t                   free_ws;

    /* Of nxt_unit_request_info_impl_t. */
    nxt_queue_t                   active_req;

    /* Of nxt_unit_request_info_impl_t. */
    nxt_lvlhsh_t                  requests;

    /*
     * List of pending read buffers; when the list is empty,
     * pending_read_tail points at pending_read_head.
     */
    nxt_unit_read_buf_t           *pending_read_head;
    nxt_unit_read_buf_t           **pending_read_tail;
    /* Initially points at ctx_read_buf. */
    nxt_unit_read_buf_t           *free_read_buf;

    /* Storage embedded in the context and seeded into the free lists. */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    /* Ends in a flexible array member, so it has to remain the last field. */
    nxt_unit_request_info_impl_t  req;
};
278 
279 
/*
 * Library instance wrapped around the public nxt_unit_t.  Allocated and
 * initialised by nxt_unit_create(), completed by nxt_unit_init().
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    /* Copy of init->callbacks; port_recv defaults to the built-in one. */
    nxt_unit_callbacks_t     callbacks;

    /*
     * One reference is taken per initialised context;
     * nxt_unit_lib_release() frees the library when the last one is dropped.
     */
    nxt_atomic_t             use_count;

    uint32_t                 request_data_size;
    /*
     * Shared memory limit divided by PORT_MMAP_DATA_SIZE, rounded up;
     * nxt_unit_init() forces it to be at least 1.
     */
    uint32_t                 shm_mmap_limit;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       router_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    /* Taken from the pid of the read port in nxt_unit_init(). */
    pid_t                    pid;
    /* STDERR_FILENO unless overridden by init data or the environment. */
    int                      log_fd;
    /* Set to 1 by nxt_unit_create(). */
    int                      online;

    /*
     * Kept last: nxt_unit_create() allocates request_data_size extra bytes
     * right behind the structure.
     */
    nxt_unit_ctx_impl_t      main_ctx;
};
304 
305 
/* Library-private port state wrapped around the public nxt_unit_port_t. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    /* NOTE(review): presumably the link in nxt_unit_process_t.ports -- confirm. */
    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;
};
312 
313 
/* One element of nxt_unit_mmaps_t.elts: a pointer to a segment header. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};
317 
318 
/*
 * An array of nxt_unit_mmap_t with its own mutex.
 * NOTE(review): "size" and "cap" look like the used and allocated element
 * counts of "elts" -- confirm against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_atomic_t             allocated_chunks;
    nxt_unit_mmap_t          *elts;
};
326 
327 
/*
 * Per-process record: its pid, a queue of ports and two sets of memory
 * mappings, one per direction.  Reference counted through use_count.
 */
struct nxt_unit_process_s {
    pid_t                    pid;

    /* NOTE(review): presumably of nxt_unit_port_impl_t -- confirm. */
    nxt_queue_t              ports;

    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};
342 
343 
/*
 * Both members are explicitly 32 bits wide so that the structure contains
 * no alignment padding.
 * NOTE(review): presumably the structure is hashed or compared as raw
 * bytes by the port hash -- confirm.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
349 
350 
351 nxt_unit_ctx_t *
352 nxt_unit_init(nxt_unit_init_t *init)
353 {
354     int              rc;
355     uint32_t         ready_stream, shm_limit;
356     nxt_unit_ctx_t   *ctx;
357     nxt_unit_impl_t  *lib;
358     nxt_unit_port_t  ready_port, router_port, read_port;
359 
360     lib = nxt_unit_create(init);
361     if (nxt_slow_path(lib == NULL)) {
362         return NULL;
363     }
364 
365     if (init->ready_port.id.pid != 0
366         && init->ready_stream != 0
367         && init->read_port.id.pid != 0)
368     {
369         ready_port = init->ready_port;
370         ready_stream = init->ready_stream;
371         router_port = init->router_port;
372         read_port = init->read_port;
373         lib->log_fd = init->log_fd;
374 
375         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
376                               ready_port.id.id);
377         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
378                               router_port.id.id);
379         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
380                               read_port.id.id);
381 
382     } else {
383         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
384                                &lib->log_fd, &ready_stream, &shm_limit);
385         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
386             goto fail;
387         }
388 
389         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
390                                 / PORT_MMAP_DATA_SIZE;
391     }
392 
393     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
394         lib->shm_mmap_limit = 1;
395     }
396 
397     lib->pid = read_port.id.pid;
398     ctx = &lib->main_ctx.ctx;
399 
400     rc = nxt_unit_add_port(ctx, &router_port);
401     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
402         nxt_unit_alert(NULL, "failed to add router_port");
403 
404         goto fail;
405     }
406 
407     lib->router_port_id = router_port.id;
408 
409     rc = nxt_unit_add_port(ctx, &read_port);
410     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
411         nxt_unit_alert(NULL, "failed to add read_port");
412 
413         goto fail;
414     }
415 
416     lib->main_ctx.read_port_id = read_port.id;
417 
418     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
419     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
420         nxt_unit_alert(NULL, "failed to send READY message");
421 
422         goto fail;
423     }
424 
425     close(ready_port.out_fd);
426 
427     return ctx;
428 
429 fail:
430 
431     free(lib);
432 
433     return NULL;
434 }
435 
436 
437 static nxt_unit_impl_t *
438 nxt_unit_create(nxt_unit_init_t *init)
439 {
440     int                   rc;
441     nxt_unit_impl_t       *lib;
442     nxt_unit_callbacks_t  *cb;
443 
444     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
445     if (nxt_slow_path(lib == NULL)) {
446         nxt_unit_alert(NULL, "failed to allocate unit struct");
447 
448         return NULL;
449     }
450 
451     rc = pthread_mutex_init(&lib->mutex, NULL);
452     if (nxt_slow_path(rc != 0)) {
453         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
454 
455         goto fail;
456     }
457 
458     lib->unit.data = init->data;
459     lib->callbacks = init->callbacks;
460 
461     lib->request_data_size = init->request_data_size;
462     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
463                             / PORT_MMAP_DATA_SIZE;
464 
465     lib->processes.slot = NULL;
466     lib->ports.slot = NULL;
467 
468     lib->log_fd = STDERR_FILENO;
469     lib->online = 1;
470 
471     nxt_queue_init(&lib->contexts);
472 
473     lib->use_count = 0;
474 
475     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
476     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
477         goto fail;
478     }
479 
480     cb = &lib->callbacks;
481 
482     if (cb->request_handler == NULL) {
483         nxt_unit_alert(NULL, "request_handler is NULL");
484 
485         goto fail;
486     }
487 
488     if (cb->port_recv == NULL) {
489         cb->port_recv = nxt_unit_port_recv_default;
490     }
491 
492     return lib;
493 
494 fail:
495 
496     free(lib);
497 
498     return NULL;
499 }
500 
501 
502 static int
503 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
504     void *data)
505 {
506     int  rc;
507 
508     ctx_impl->ctx.data = data;
509     ctx_impl->ctx.unit = &lib->unit;
510 
511     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
512     if (nxt_slow_path(rc != 0)) {
513         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
514 
515         return NXT_UNIT_ERROR;
516     }
517 
518     nxt_unit_lib_use(lib);
519 
520     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
521 
522     ctx_impl->use_count = 1;
523 
524     nxt_queue_init(&ctx_impl->free_req);
525     nxt_queue_init(&ctx_impl->free_ws);
526     nxt_queue_init(&ctx_impl->active_req);
527 
528     ctx_impl->free_buf = NULL;
529     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
530     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
531 
532     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
533 
534     ctx_impl->pending_read_head = NULL;
535     ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
536     ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
537     ctx_impl->ctx_read_buf.next = NULL;
538 
539     ctx_impl->req.req.ctx = &ctx_impl->ctx;
540     ctx_impl->req.req.unit = &lib->unit;
541 
542     ctx_impl->read_port_fd = -1;
543     ctx_impl->requests.slot = 0;
544 
545     return NXT_UNIT_OK;
546 }
547 
548 
549 nxt_inline void
550 nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
551 {
552     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
553 }
554 
555 
556 nxt_inline void
557 nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
558 {
559     long c;
560 
561     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
562 
563     if (c == 1) {
564         nxt_unit_ctx_free(ctx_impl);
565     }
566 }
567 
568 
569 nxt_inline void
570 nxt_unit_lib_use(nxt_unit_impl_t *lib)
571 {
572     nxt_atomic_fetch_add(&lib->use_count, 1);
573 }
574 
575 
576 nxt_inline void
577 nxt_unit_lib_release(nxt_unit_impl_t *lib)
578 {
579     long                c;
580     nxt_unit_process_t  *process;
581 
582     c = nxt_atomic_fetch_add(&lib->use_count, -1);
583 
584     if (c == 1) {
585         for ( ;; ) {
586             pthread_mutex_lock(&lib->mutex);
587 
588             process = nxt_unit_process_pop_first(lib);
589             if (process == NULL) {
590                 pthread_mutex_unlock(&lib->mutex);
591 
592                 break;
593             }
594 
595             nxt_unit_remove_process(lib, process);
596         }
597 
598         pthread_mutex_destroy(&lib->mutex);
599 
600         free(lib);
601     }
602 }
603 
604 
605 nxt_inline void
606 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
607     nxt_unit_mmap_buf_t *mmap_buf)
608 {
609     mmap_buf->next = *head;
610 
611     if (mmap_buf->next != NULL) {
612         mmap_buf->next->prev = &mmap_buf->next;
613     }
614 
615     *head = mmap_buf;
616     mmap_buf->prev = head;
617 }
618 
619 
620 nxt_inline void
621 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
622     nxt_unit_mmap_buf_t *mmap_buf)
623 {
624     while (*prev != NULL) {
625         prev = &(*prev)->next;
626     }
627 
628     nxt_unit_mmap_buf_insert(prev, mmap_buf);
629 }
630 
631 
632 nxt_inline void
633 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
634 {
635     nxt_unit_mmap_buf_t  **prev;
636 
637     prev = mmap_buf->prev;
638 
639     if (mmap_buf->next != NULL) {
640         mmap_buf->next->prev = prev;
641     }
642 
643     if (prev != NULL) {
644         *prev = mmap_buf->next;
645     }
646 }
647 
648 
649 static int
650 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
651     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
652     uint32_t *shm_limit)
653 {
654     int       rc;
655     int       ready_fd, router_fd, read_fd;
656     char      *unit_init, *version_end;
657     long      version_length;
658     int64_t   ready_pid, router_pid, read_pid;
659     uint32_t  ready_stream, router_id, ready_id, read_id;
660 
661     unit_init = getenv(NXT_UNIT_INIT_ENV);
662     if (nxt_slow_path(unit_init == NULL)) {
663         nxt_unit_alert(NULL, "%s is not in the current environment",
664                        NXT_UNIT_INIT_ENV);
665 
666         return NXT_UNIT_ERROR;
667     }
668 
669     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
670 
671     version_length = nxt_length(NXT_VERSION);
672 
673     version_end = strchr(unit_init, ';');
674     if (version_end == NULL
675         || version_end - unit_init != version_length
676         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
677     {
678         nxt_unit_alert(NULL, "version check error");
679 
680         return NXT_UNIT_ERROR;
681     }
682 
683     rc = sscanf(version_end + 1,
684                 "%"PRIu32";"
685                 "%"PRId64",%"PRIu32",%d;"
686                 "%"PRId64",%"PRIu32",%d;"
687                 "%"PRId64",%"PRIu32",%d;"
688                 "%d,%"PRIu32,
689                 &ready_stream,
690                 &ready_pid, &ready_id, &ready_fd,
691                 &router_pid, &router_id, &router_fd,
692                 &read_pid, &read_id, &read_fd,
693                 log_fd, shm_limit);
694 
695     if (nxt_slow_path(rc != 12)) {
696         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
697 
698         return NXT_UNIT_ERROR;
699     }
700 
701     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
702 
703     ready_port->in_fd = -1;
704     ready_port->out_fd = ready_fd;
705     ready_port->data = NULL;
706 
707     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
708 
709     router_port->in_fd = -1;
710     router_port->out_fd = router_fd;
711     router_port->data = NULL;
712 
713     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
714 
715     read_port->in_fd = read_fd;
716     read_port->out_fd = -1;
717     read_port->data = NULL;
718 
719     *stream = ready_stream;
720 
721     return NXT_UNIT_OK;
722 }
723 
724 
725 static int
726 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
727 {
728     ssize_t          res;
729     nxt_port_msg_t   msg;
730     nxt_unit_impl_t  *lib;
731 
732     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
733 
734     msg.stream = stream;
735     msg.pid = lib->pid;
736     msg.reply_port = 0;
737     msg.type = _NXT_PORT_MSG_PROCESS_READY;
738     msg.last = 1;
739     msg.mmap = 0;
740     msg.nf = 0;
741     msg.mf = 0;
742     msg.tracking = 0;
743 
744     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
745     if (res != sizeof(msg)) {
746         return NXT_UNIT_ERROR;
747     }
748 
749     return NXT_UNIT_OK;
750 }
751 
752 
753 int
754 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
755     void *buf, size_t buf_size, void *oob, size_t oob_size)
756 {
757     int                  rc;
758     pid_t                pid;
759     struct cmsghdr       *cm;
760     nxt_port_msg_t       *port_msg;
761     nxt_unit_impl_t      *lib;
762     nxt_unit_recv_msg_t  recv_msg;
763 
764     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
765 
766     rc = NXT_UNIT_ERROR;
767     recv_msg.fd = -1;
768     recv_msg.process = NULL;
769     port_msg = buf;
770     cm = oob;
771 
772     if (oob_size >= CMSG_SPACE(sizeof(int))
773         && cm->cmsg_len == CMSG_LEN(sizeof(int))
774         && cm->cmsg_level == SOL_SOCKET
775         && cm->cmsg_type == SCM_RIGHTS)
776     {
777         memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
778     }
779 
780     recv_msg.incoming_buf = NULL;
781 
782     if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
783         nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
784         goto fail;
785     }
786 
787     recv_msg.stream = port_msg->stream;
788     recv_msg.pid = port_msg->pid;
789     recv_msg.reply_port = port_msg->reply_port;
790     recv_msg.last = port_msg->last;
791     recv_msg.mmap = port_msg->mmap;
792 
793     recv_msg.start = port_msg + 1;
794     recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
795 
796     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
797         nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
798                       port_msg->stream, (int) port_msg->type);
799         goto fail;
800     }
801 
802     if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
803         rc = NXT_UNIT_OK;
804 
805         goto fail;
806     }
807 
808     /* Fragmentation is unsupported. */
809     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
810         nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
811                       port_msg->stream, (int) port_msg->type);
812         goto fail;
813     }
814 
815     if (port_msg->mmap) {
816         if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
817             goto fail;
818         }
819     }
820 
821     switch (port_msg->type) {
822 
823     case _NXT_PORT_MSG_QUIT:
824         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
825 
826         nxt_unit_quit(ctx);
827         rc = NXT_UNIT_OK;
828         break;
829 
830     case _NXT_PORT_MSG_NEW_PORT:
831         rc = nxt_unit_process_new_port(ctx, &recv_msg);
832         break;
833 
834     case _NXT_PORT_MSG_CHANGE_FILE:
835         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
836                        port_msg->stream, recv_msg.fd);
837 
838         if (dup2(recv_msg.fd, lib->log_fd) == -1) {
839             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
840                            port_msg->stream, recv_msg.fd, lib->log_fd,
841                            strerror(errno), errno);
842 
843             goto fail;
844         }
845 
846         rc = NXT_UNIT_OK;
847         break;
848 
849     case _NXT_PORT_MSG_MMAP:
850         if (nxt_slow_path(recv_msg.fd < 0)) {
851             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
852                            port_msg->stream, recv_msg.fd);
853 
854             goto fail;
855         }
856 
857         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
858         break;
859 
860     case _NXT_PORT_MSG_REQ_HEADERS:
861         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
862         break;
863 
864     case _NXT_PORT_MSG_WEBSOCKET:
865         rc = nxt_unit_process_websocket(ctx, &recv_msg);
866         break;
867 
868     case _NXT_PORT_MSG_REMOVE_PID:
869         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
870             nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
871                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
872                           (int) sizeof(pid));
873 
874             goto fail;
875         }
876 
877         memcpy(&pid, recv_msg.start, sizeof(pid));
878 
879         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
880                        port_msg->stream, (int) pid);
881 
882         nxt_unit_remove_pid(lib, pid);
883 
884         rc = NXT_UNIT_OK;
885         break;
886 
887     case _NXT_PORT_MSG_SHM_ACK:
888         rc = nxt_unit_process_shm_ack(ctx);
889         break;
890 
891     default:
892         nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
893                        port_msg->stream, (int) port_msg->type);
894 
895         goto fail;
896     }
897 
898 fail:
899 
900     if (recv_msg.fd != -1) {
901         close(recv_msg.fd);
902     }
903 
904     while (recv_msg.incoming_buf != NULL) {
905         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
906     }
907 
908     if (recv_msg.process != NULL) {
909         nxt_unit_process_release(recv_msg.process);
910     }
911 
912     return rc;
913 }
914 
915 
916 static int
917 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
918 {
919     int                      nb;
920     nxt_unit_port_t          new_port;
921     nxt_port_msg_new_port_t  *new_port_msg;
922 
923     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
924         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
925                       "invalid message size (%d)",
926                       recv_msg->stream, (int) recv_msg->size);
927 
928         return NXT_UNIT_ERROR;
929     }
930 
931     if (nxt_slow_path(recv_msg->fd < 0)) {
932         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
933                        recv_msg->stream, recv_msg->fd);
934 
935         return NXT_UNIT_ERROR;
936     }
937 
938     new_port_msg = recv_msg->start;
939 
940     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
941                    recv_msg->stream, (int) new_port_msg->pid,
942                    (int) new_port_msg->id, recv_msg->fd);
943 
944     nb = 0;
945 
946     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
947         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
948                        "failed: %s (%d)",
949                        recv_msg->stream, recv_msg->fd, strerror(errno), errno);
950 
951         return NXT_UNIT_ERROR;
952     }
953 
954     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
955                           new_port_msg->id);
956 
957     new_port.in_fd = -1;
958     new_port.out_fd = recv_msg->fd;
959     new_port.data = NULL;
960 
961     recv_msg->fd = -1;
962 
963     return nxt_unit_add_port(ctx, &new_port);
964 }
965 
966 
967 static int
968 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
969 {
970     nxt_unit_impl_t               *lib;
971     nxt_unit_request_t            *r;
972     nxt_unit_mmap_buf_t           *b;
973     nxt_unit_request_info_t       *req;
974     nxt_unit_request_info_impl_t  *req_impl;
975 
976     if (nxt_slow_path(recv_msg->mmap == 0)) {
977         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
978                       recv_msg->stream);
979 
980         return NXT_UNIT_ERROR;
981     }
982 
983     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
984         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
985                       "%d expected", recv_msg->stream, (int) recv_msg->size,
986                       (int) sizeof(nxt_unit_request_t));
987 
988         return NXT_UNIT_ERROR;
989     }
990 
991     req_impl = nxt_unit_request_info_get(ctx);
992     if (nxt_slow_path(req_impl == NULL)) {
993         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
994                       recv_msg->stream);
995 
996         return NXT_UNIT_ERROR;
997     }
998 
999     req = &req_impl->req;
1000 
1001     nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
1002                           recv_msg->reply_port);
1003 
1004     req->request = recv_msg->start;
1005 
1006     b = recv_msg->incoming_buf;
1007 
1008     req->request_buf = &b->buf;
1009     req->response = NULL;
1010     req->response_buf = NULL;
1011 
1012     r = req->request;
1013 
1014     req->content_length = r->content_length;
1015 
1016     req->content_buf = req->request_buf;
1017     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1018 
1019     /* "Move" process reference to req_impl. */
1020     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
1021     if (nxt_slow_path(req_impl->process == NULL)) {
1022         return NXT_UNIT_ERROR;
1023     }
1024 
1025     recv_msg->process = NULL;
1026 
1027     req_impl->stream = recv_msg->stream;
1028 
1029     req_impl->outgoing_buf = NULL;
1030 
1031     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1032         b->req = req;
1033     }
1034 
1035     /* "Move" incoming buffer list to req_impl. */
1036     req_impl->incoming_buf = recv_msg->incoming_buf;
1037     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1038     recv_msg->incoming_buf = NULL;
1039 
1040     req->content_fd = recv_msg->fd;
1041     recv_msg->fd = -1;
1042 
1043     req->response_max_fields = 0;
1044     req_impl->state = NXT_UNIT_RS_START;
1045     req_impl->websocket = 0;
1046 
1047     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1048                    (int) r->method_length,
1049                    (char *) nxt_unit_sptr_get(&r->method),
1050                    (int) r->target_length,
1051                    (char *) nxt_unit_sptr_get(&r->target),
1052                    (int) r->content_length);
1053 
1054     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1055 
1056     lib->callbacks.request_handler(req);
1057 
1058     return NXT_UNIT_OK;
1059 }
1060 
1061 
1062 static int
1063 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1064 {
1065     size_t                           hsize;
1066     nxt_unit_impl_t                  *lib;
1067     nxt_unit_mmap_buf_t              *b;
1068     nxt_unit_ctx_impl_t              *ctx_impl;
1069     nxt_unit_callbacks_t             *cb;
1070     nxt_unit_request_info_t          *req;
1071     nxt_unit_request_info_impl_t     *req_impl;
1072     nxt_unit_websocket_frame_impl_t  *ws_impl;
1073 
1074     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1075 
1076     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
1077                                           recv_msg->last);
1078     if (req_impl == NULL) {
1079         return NXT_UNIT_OK;
1080     }
1081 
1082     req = &req_impl->req;
1083 
1084     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1085     cb = &lib->callbacks;
1086 
1087     if (cb->websocket_handler && recv_msg->size >= 2) {
1088         ws_impl = nxt_unit_websocket_frame_get(ctx);
1089         if (nxt_slow_path(ws_impl == NULL)) {
1090             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1091                           req_impl->stream);
1092 
1093             return NXT_UNIT_ERROR;
1094         }
1095 
1096         ws_impl->ws.req = req;
1097 
1098         ws_impl->buf = NULL;
1099 
1100         if (recv_msg->mmap) {
1101             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1102                 b->req = req;
1103             }
1104 
1105             /* "Move" incoming buffer list to ws_impl. */
1106             ws_impl->buf = recv_msg->incoming_buf;
1107             ws_impl->buf->prev = &ws_impl->buf;
1108             recv_msg->incoming_buf = NULL;
1109 
1110             b = ws_impl->buf;
1111 
1112         } else {
1113             b = nxt_unit_mmap_buf_get(ctx);
1114             if (nxt_slow_path(b == NULL)) {
1115                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1116                                req_impl->stream);
1117 
1118                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1119 
1120                 return NXT_UNIT_ERROR;
1121             }
1122 
1123             b->req = req;
1124             b->buf.start = recv_msg->start;
1125             b->buf.free = b->buf.start;
1126             b->buf.end = b->buf.start + recv_msg->size;
1127 
1128             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1129         }
1130 
1131         ws_impl->ws.header = (void *) b->buf.start;
1132         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1133             ws_impl->ws.header);
1134 
1135         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1136 
1137         if (ws_impl->ws.header->mask) {
1138             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1139 
1140         } else {
1141             ws_impl->ws.mask = NULL;
1142         }
1143 
1144         b->buf.free += hsize;
1145 
1146         ws_impl->ws.content_buf = &b->buf;
1147         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1148 
1149         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1150                            "payload_len=%"PRIu64,
1151                             ws_impl->ws.header->opcode,
1152                             ws_impl->ws.payload_len);
1153 
1154         cb->websocket_handler(&ws_impl->ws);
1155     }
1156 
1157     if (recv_msg->last) {
1158         req_impl->websocket = 0;
1159 
1160         if (cb->close_handler) {
1161             nxt_unit_req_debug(req, "close_handler");
1162 
1163             cb->close_handler(req);
1164 
1165         } else {
1166             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1167         }
1168     }
1169 
1170     return NXT_UNIT_OK;
1171 }
1172 
1173 
1174 static int
1175 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1176 {
1177     nxt_unit_impl_t       *lib;
1178     nxt_unit_callbacks_t  *cb;
1179 
1180     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1181     cb = &lib->callbacks;
1182 
1183     if (cb->shm_ack_handler != NULL) {
1184         cb->shm_ack_handler(ctx);
1185     }
1186 
1187     return NXT_UNIT_OK;
1188 }
1189 
1190 
1191 static nxt_unit_request_info_impl_t *
1192 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1193 {
1194     nxt_unit_impl_t               *lib;
1195     nxt_queue_link_t              *lnk;
1196     nxt_unit_ctx_impl_t           *ctx_impl;
1197     nxt_unit_request_info_impl_t  *req_impl;
1198 
1199     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1200 
1201     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1202 
1203     pthread_mutex_lock(&ctx_impl->mutex);
1204 
1205     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1206         pthread_mutex_unlock(&ctx_impl->mutex);
1207 
1208         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1209                           + lib->request_data_size);
1210         if (nxt_slow_path(req_impl == NULL)) {
1211             return NULL;
1212         }
1213 
1214         req_impl->req.unit = ctx->unit;
1215         req_impl->req.ctx = ctx;
1216 
1217         pthread_mutex_lock(&ctx_impl->mutex);
1218 
1219     } else {
1220         lnk = nxt_queue_first(&ctx_impl->free_req);
1221         nxt_queue_remove(lnk);
1222 
1223         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1224     }
1225 
1226     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1227 
1228     pthread_mutex_unlock(&ctx_impl->mutex);
1229 
1230     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1231 
1232     return req_impl;
1233 }
1234 
1235 
1236 static void
1237 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1238 {
1239     nxt_unit_ctx_impl_t           *ctx_impl;
1240     nxt_unit_request_info_impl_t  *req_impl;
1241 
1242     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1243     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1244 
1245     req->response = NULL;
1246     req->response_buf = NULL;
1247 
1248     if (req_impl->websocket) {
1249         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1250 
1251         req_impl->websocket = 0;
1252     }
1253 
1254     while (req_impl->outgoing_buf != NULL) {
1255         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1256     }
1257 
1258     while (req_impl->incoming_buf != NULL) {
1259         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1260     }
1261 
1262     if (req->content_fd != -1) {
1263         close(req->content_fd);
1264 
1265         req->content_fd = -1;
1266     }
1267 
1268     /*
1269      * Process release should go after buffers release to guarantee mmap
1270      * existence.
1271      */
1272     if (req_impl->process != NULL) {
1273         nxt_unit_process_release(req_impl->process);
1274 
1275         req_impl->process = NULL;
1276     }
1277 
1278     pthread_mutex_lock(&ctx_impl->mutex);
1279 
1280     nxt_queue_remove(&req_impl->link);
1281 
1282     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1283 
1284     pthread_mutex_unlock(&ctx_impl->mutex);
1285 
1286     req_impl->state = NXT_UNIT_RS_RELEASED;
1287 }
1288 
1289 
1290 static void
1291 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1292 {
1293     nxt_unit_ctx_impl_t  *ctx_impl;
1294 
1295     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1296 
1297     nxt_queue_remove(&req_impl->link);
1298 
1299     if (req_impl != &ctx_impl->req) {
1300         free(req_impl);
1301     }
1302 }
1303 
1304 
1305 static nxt_unit_websocket_frame_impl_t *
1306 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1307 {
1308     nxt_queue_link_t                 *lnk;
1309     nxt_unit_ctx_impl_t              *ctx_impl;
1310     nxt_unit_websocket_frame_impl_t  *ws_impl;
1311 
1312     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1313 
1314     pthread_mutex_lock(&ctx_impl->mutex);
1315 
1316     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1317         pthread_mutex_unlock(&ctx_impl->mutex);
1318 
1319         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1320         if (nxt_slow_path(ws_impl == NULL)) {
1321             return NULL;
1322         }
1323 
1324     } else {
1325         lnk = nxt_queue_first(&ctx_impl->free_ws);
1326         nxt_queue_remove(lnk);
1327 
1328         pthread_mutex_unlock(&ctx_impl->mutex);
1329 
1330         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1331     }
1332 
1333     ws_impl->ctx_impl = ctx_impl;
1334 
1335     return ws_impl;
1336 }
1337 
1338 
1339 static void
1340 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1341 {
1342     nxt_unit_websocket_frame_impl_t  *ws_impl;
1343 
1344     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1345 
1346     while (ws_impl->buf != NULL) {
1347         nxt_unit_mmap_buf_free(ws_impl->buf);
1348     }
1349 
1350     ws->req = NULL;
1351 
1352     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1353 
1354     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1355 
1356     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1357 }
1358 
1359 
1360 static void
1361 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1362 {
1363     nxt_queue_remove(&ws_impl->link);
1364 
1365     free(ws_impl);
1366 }
1367 
1368 
1369 uint16_t
1370 nxt_unit_field_hash(const char *name, size_t name_length)
1371 {
1372     u_char      ch;
1373     uint32_t    hash;
1374     const char  *p, *end;
1375 
1376     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1377     end = name + name_length;
1378 
1379     for (p = name; p < end; p++) {
1380         ch = *p;
1381         hash = (hash << 4) + hash + nxt_lowcase(ch);
1382     }
1383 
1384     hash = (hash >> 16) ^ hash;
1385 
1386     return hash;
1387 }
1388 
1389 
1390 void
1391 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1392 {
1393     uint32_t            i, j;
1394     nxt_unit_field_t    *fields, f;
1395     nxt_unit_request_t  *r;
1396 
1397     nxt_unit_req_debug(req, "group_dup_fields");
1398 
1399     r = req->request;
1400     fields = r->fields;
1401 
1402     for (i = 0; i < r->fields_count; i++) {
1403 
1404         switch (fields[i].hash) {
1405         case NXT_UNIT_HASH_CONTENT_LENGTH:
1406             r->content_length_field = i;
1407             break;
1408 
1409         case NXT_UNIT_HASH_CONTENT_TYPE:
1410             r->content_type_field = i;
1411             break;
1412 
1413         case NXT_UNIT_HASH_COOKIE:
1414             r->cookie_field = i;
1415             break;
1416         };
1417 
1418         for (j = i + 1; j < r->fields_count; j++) {
1419             if (fields[i].hash != fields[j].hash) {
1420                 continue;
1421             }
1422 
1423             if (j == i + 1) {
1424                 continue;
1425             }
1426 
1427             f = fields[j];
1428             f.name.offset += (j - (i + 1)) * sizeof(f);
1429             f.value.offset += (j - (i + 1)) * sizeof(f);
1430 
1431             while (j > i + 1) {
1432                 fields[j] = fields[j - 1];
1433                 fields[j].name.offset -= sizeof(f);
1434                 fields[j].value.offset -= sizeof(f);
1435                 j--;
1436             }
1437 
1438             fields[j] = f;
1439 
1440             i++;
1441         }
1442     }
1443 }
1444 
1445 
1446 int
1447 nxt_unit_response_init(nxt_unit_request_info_t *req,
1448     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1449 {
1450     uint32_t                      buf_size;
1451     nxt_unit_buf_t                *buf;
1452     nxt_unit_request_info_impl_t  *req_impl;
1453 
1454     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1455 
1456     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1457         nxt_unit_req_warn(req, "init: response already sent");
1458 
1459         return NXT_UNIT_ERROR;
1460     }
1461 
1462     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1463                        (int) max_fields_count, (int) max_fields_size);
1464 
1465     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1466         nxt_unit_req_debug(req, "duplicate response init");
1467     }
1468 
1469     /*
1470      * Each field name and value 0-terminated by libunit,
1471      * this is the reason of '+ 2' below.
1472      */
1473     buf_size = sizeof(nxt_unit_response_t)
1474                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1475                + max_fields_size;
1476 
1477     if (nxt_slow_path(req->response_buf != NULL)) {
1478         buf = req->response_buf;
1479 
1480         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1481             goto init_response;
1482         }
1483 
1484         nxt_unit_buf_free(buf);
1485 
1486         req->response_buf = NULL;
1487         req->response = NULL;
1488         req->response_max_fields = 0;
1489 
1490         req_impl->state = NXT_UNIT_RS_START;
1491     }
1492 
1493     buf = nxt_unit_response_buf_alloc(req, buf_size);
1494     if (nxt_slow_path(buf == NULL)) {
1495         return NXT_UNIT_ERROR;
1496     }
1497 
1498 init_response:
1499 
1500     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1501 
1502     req->response_buf = buf;
1503 
1504     req->response = (nxt_unit_response_t *) buf->start;
1505     req->response->status = status;
1506 
1507     buf->free = buf->start + sizeof(nxt_unit_response_t)
1508                 + max_fields_count * sizeof(nxt_unit_field_t);
1509 
1510     req->response_max_fields = max_fields_count;
1511     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1512 
1513     return NXT_UNIT_OK;
1514 }
1515 
1516 
1517 int
1518 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1519     uint32_t max_fields_count, uint32_t max_fields_size)
1520 {
1521     char                          *p;
1522     uint32_t                      i, buf_size;
1523     nxt_unit_buf_t                *buf;
1524     nxt_unit_field_t              *f, *src;
1525     nxt_unit_response_t           *resp;
1526     nxt_unit_request_info_impl_t  *req_impl;
1527 
1528     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1529 
1530     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1531         nxt_unit_req_warn(req, "realloc: response not init");
1532 
1533         return NXT_UNIT_ERROR;
1534     }
1535 
1536     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1537         nxt_unit_req_warn(req, "realloc: response already sent");
1538 
1539         return NXT_UNIT_ERROR;
1540     }
1541 
1542     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1543         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1544 
1545         return NXT_UNIT_ERROR;
1546     }
1547 
1548     /*
1549      * Each field name and value 0-terminated by libunit,
1550      * this is the reason of '+ 2' below.
1551      */
1552     buf_size = sizeof(nxt_unit_response_t)
1553                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1554                + max_fields_size;
1555 
1556     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1557 
1558     buf = nxt_unit_response_buf_alloc(req, buf_size);
1559     if (nxt_slow_path(buf == NULL)) {
1560         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1561         return NXT_UNIT_ERROR;
1562     }
1563 
1564     resp = (nxt_unit_response_t *) buf->start;
1565 
1566     memset(resp, 0, sizeof(nxt_unit_response_t));
1567 
1568     resp->status = req->response->status;
1569     resp->content_length = req->response->content_length;
1570 
1571     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1572     f = resp->fields;
1573 
1574     for (i = 0; i < req->response->fields_count; i++) {
1575         src = req->response->fields + i;
1576 
1577         if (nxt_slow_path(src->skip != 0)) {
1578             continue;
1579         }
1580 
1581         if (nxt_slow_path(src->name_length + src->value_length + 2
1582                           > (uint32_t) (buf->end - p)))
1583         {
1584             nxt_unit_req_warn(req, "realloc: not enough space for field"
1585                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1586                   i, src, src->name_length, src->value_length);
1587 
1588             goto fail;
1589         }
1590 
1591         nxt_unit_sptr_set(&f->name, p);
1592         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1593         *p++ = '\0';
1594 
1595         nxt_unit_sptr_set(&f->value, p);
1596         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1597         *p++ = '\0';
1598 
1599         f->hash = src->hash;
1600         f->skip = 0;
1601         f->name_length = src->name_length;
1602         f->value_length = src->value_length;
1603 
1604         resp->fields_count++;
1605         f++;
1606     }
1607 
1608     if (req->response->piggyback_content_length > 0) {
1609         if (nxt_slow_path(req->response->piggyback_content_length
1610                           > (uint32_t) (buf->end - p)))
1611         {
1612             nxt_unit_req_warn(req, "realloc: not enought space for content"
1613                   " #%"PRIu32", %"PRIu32" required",
1614                   i, req->response->piggyback_content_length);
1615 
1616             goto fail;
1617         }
1618 
1619         resp->piggyback_content_length =
1620                                        req->response->piggyback_content_length;
1621 
1622         nxt_unit_sptr_set(&resp->piggyback_content, p);
1623         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1624                        req->response->piggyback_content_length);
1625     }
1626 
1627     buf->free = p;
1628 
1629     nxt_unit_buf_free(req->response_buf);
1630 
1631     req->response = resp;
1632     req->response_buf = buf;
1633     req->response_max_fields = max_fields_count;
1634 
1635     return NXT_UNIT_OK;
1636 
1637 fail:
1638 
1639     nxt_unit_buf_free(buf);
1640 
1641     return NXT_UNIT_ERROR;
1642 }
1643 
1644 
1645 int
1646 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1647 {
1648     nxt_unit_request_info_impl_t  *req_impl;
1649 
1650     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1651 
1652     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1653 }
1654 
1655 
1656 int
1657 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1658     const char *name, uint8_t name_length,
1659     const char *value, uint32_t value_length)
1660 {
1661     nxt_unit_buf_t                *buf;
1662     nxt_unit_field_t              *f;
1663     nxt_unit_response_t           *resp;
1664     nxt_unit_request_info_impl_t  *req_impl;
1665 
1666     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1667 
1668     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1669         nxt_unit_req_warn(req, "add_field: response not initialized or "
1670                           "already sent");
1671 
1672         return NXT_UNIT_ERROR;
1673     }
1674 
1675     resp = req->response;
1676 
1677     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1678         nxt_unit_req_warn(req, "add_field: too many response fields");
1679 
1680         return NXT_UNIT_ERROR;
1681     }
1682 
1683     buf = req->response_buf;
1684 
1685     if (nxt_slow_path(name_length + value_length + 2
1686                       > (uint32_t) (buf->end - buf->free)))
1687     {
1688         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1689 
1690         return NXT_UNIT_ERROR;
1691     }
1692 
1693     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1694                        resp->fields_count,
1695                        (int) name_length, name,
1696                        (int) value_length, value);
1697 
1698     f = resp->fields + resp->fields_count;
1699 
1700     nxt_unit_sptr_set(&f->name, buf->free);
1701     buf->free = nxt_cpymem(buf->free, name, name_length);
1702     *buf->free++ = '\0';
1703 
1704     nxt_unit_sptr_set(&f->value, buf->free);
1705     buf->free = nxt_cpymem(buf->free, value, value_length);
1706     *buf->free++ = '\0';
1707 
1708     f->hash = nxt_unit_field_hash(name, name_length);
1709     f->skip = 0;
1710     f->name_length = name_length;
1711     f->value_length = value_length;
1712 
1713     resp->fields_count++;
1714 
1715     return NXT_UNIT_OK;
1716 }
1717 
1718 
1719 int
1720 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1721     const void* src, uint32_t size)
1722 {
1723     nxt_unit_buf_t                *buf;
1724     nxt_unit_response_t           *resp;
1725     nxt_unit_request_info_impl_t  *req_impl;
1726 
1727     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1728 
1729     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1730         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1731 
1732         return NXT_UNIT_ERROR;
1733     }
1734 
1735     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1736         nxt_unit_req_warn(req, "add_content: response already sent");
1737 
1738         return NXT_UNIT_ERROR;
1739     }
1740 
1741     buf = req->response_buf;
1742 
1743     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1744         nxt_unit_req_warn(req, "add_content: buffer overflow");
1745 
1746         return NXT_UNIT_ERROR;
1747     }
1748 
1749     resp = req->response;
1750 
1751     if (resp->piggyback_content_length == 0) {
1752         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1753         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1754     }
1755 
1756     resp->piggyback_content_length += size;
1757 
1758     buf->free = nxt_cpymem(buf->free, src, size);
1759 
1760     return NXT_UNIT_OK;
1761 }
1762 
1763 
1764 int
1765 nxt_unit_response_send(nxt_unit_request_info_t *req)
1766 {
1767     int                           rc;
1768     nxt_unit_mmap_buf_t           *mmap_buf;
1769     nxt_unit_request_info_impl_t  *req_impl;
1770 
1771     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1772 
1773     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1774         nxt_unit_req_warn(req, "send: response is not initialized yet");
1775 
1776         return NXT_UNIT_ERROR;
1777     }
1778 
1779     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1780         nxt_unit_req_warn(req, "send: response already sent");
1781 
1782         return NXT_UNIT_ERROR;
1783     }
1784 
1785     if (req->request->websocket_handshake && req->response->status == 101) {
1786         nxt_unit_response_upgrade(req);
1787     }
1788 
1789     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1790                        req->response->fields_count,
1791                        (int) (req->response_buf->free
1792                               - req->response_buf->start));
1793 
1794     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1795 
1796     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1797     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1798         req->response = NULL;
1799         req->response_buf = NULL;
1800         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1801 
1802         nxt_unit_mmap_buf_free(mmap_buf);
1803     }
1804 
1805     return rc;
1806 }
1807 
1808 
1809 int
1810 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1811 {
1812     nxt_unit_request_info_impl_t  *req_impl;
1813 
1814     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1815 
1816     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1817 }
1818 
1819 
1820 nxt_unit_buf_t *
1821 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1822 {
1823     int                           rc;
1824     nxt_unit_mmap_buf_t           *mmap_buf;
1825     nxt_unit_request_info_impl_t  *req_impl;
1826 
1827     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1828         nxt_unit_req_warn(req, "response_buf_alloc: "
1829                           "requested buffer (%"PRIu32") too big", size);
1830 
1831         return NULL;
1832     }
1833 
1834     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1835 
1836     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1837 
1838     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1839     if (nxt_slow_path(mmap_buf == NULL)) {
1840         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
1841 
1842         return NULL;
1843     }
1844 
1845     mmap_buf->req = req;
1846 
1847     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1848 
1849     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1850                                    &req->response_port, size, size, mmap_buf,
1851                                    NULL);
1852     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1853         nxt_unit_mmap_buf_release(mmap_buf);
1854 
1855         return NULL;
1856     }
1857 
1858     return &mmap_buf->buf;
1859 }
1860 
1861 
1862 static nxt_unit_process_t *
1863 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1864 {
1865     nxt_unit_impl_t  *lib;
1866 
1867     if (recv_msg->process != NULL) {
1868         return recv_msg->process;
1869     }
1870 
1871     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1872 
1873     pthread_mutex_lock(&lib->mutex);
1874 
1875     recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0);
1876 
1877     pthread_mutex_unlock(&lib->mutex);
1878 
1879     if (recv_msg->process == NULL) {
1880         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1881                       recv_msg->stream, (int) recv_msg->pid);
1882     }
1883 
1884     return recv_msg->process;
1885 }
1886 
1887 
1888 static nxt_unit_mmap_buf_t *
1889 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1890 {
1891     nxt_unit_mmap_buf_t  *mmap_buf;
1892     nxt_unit_ctx_impl_t  *ctx_impl;
1893 
1894     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1895 
1896     pthread_mutex_lock(&ctx_impl->mutex);
1897 
1898     if (ctx_impl->free_buf == NULL) {
1899         pthread_mutex_unlock(&ctx_impl->mutex);
1900 
1901         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1902         if (nxt_slow_path(mmap_buf == NULL)) {
1903             return NULL;
1904         }
1905 
1906     } else {
1907         mmap_buf = ctx_impl->free_buf;
1908 
1909         nxt_unit_mmap_buf_unlink(mmap_buf);
1910 
1911         pthread_mutex_unlock(&ctx_impl->mutex);
1912     }
1913 
1914     mmap_buf->ctx_impl = ctx_impl;
1915 
1916     mmap_buf->hdr = NULL;
1917     mmap_buf->free_ptr = NULL;
1918 
1919     return mmap_buf;
1920 }
1921 
1922 
1923 static void
1924 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1925 {
1926     nxt_unit_mmap_buf_unlink(mmap_buf);
1927 
1928     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
1929 
1930     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1931 
1932     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
1933 }
1934 
1935 
1936 int
1937 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1938 {
1939     return req->request->websocket_handshake;
1940 }
1941 
1942 
1943 int
1944 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1945 {
1946     int                           rc;
1947     nxt_unit_ctx_impl_t           *ctx_impl;
1948     nxt_unit_request_info_impl_t  *req_impl;
1949 
1950     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1951 
1952     if (nxt_slow_path(req_impl->websocket != 0)) {
1953         nxt_unit_req_debug(req, "upgrade: already upgraded");
1954 
1955         return NXT_UNIT_OK;
1956     }
1957 
1958     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1959         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1960 
1961         return NXT_UNIT_ERROR;
1962     }
1963 
1964     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1965         nxt_unit_req_warn(req, "upgrade: response already sent");
1966 
1967         return NXT_UNIT_ERROR;
1968     }
1969 
1970     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1971 
1972     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1973     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1974         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1975 
1976         return NXT_UNIT_ERROR;
1977     }
1978 
1979     req_impl->websocket = 1;
1980 
1981     req->response->status = 101;
1982 
1983     return NXT_UNIT_OK;
1984 }
1985 
1986 
1987 int
1988 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1989 {
1990     nxt_unit_request_info_impl_t  *req_impl;
1991 
1992     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1993 
1994     return req_impl->websocket;
1995 }
1996 
1997 
1998 nxt_unit_request_info_t *
1999 nxt_unit_get_request_info_from_data(void *data)
2000 {
2001     nxt_unit_request_info_impl_t  *req_impl;
2002 
2003     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2004 
2005     return &req_impl->req;
2006 }
2007 
2008 
2009 int
2010 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2011 {
2012     int                           rc;
2013     nxt_unit_mmap_buf_t           *mmap_buf;
2014     nxt_unit_request_info_t       *req;
2015     nxt_unit_request_info_impl_t  *req_impl;
2016 
2017     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2018 
2019     req = mmap_buf->req;
2020     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2021 
2022     nxt_unit_req_debug(req, "buf_send: %d bytes",
2023                        (int) (buf->free - buf->start));
2024 
2025     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2026         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2027 
2028         return NXT_UNIT_ERROR;
2029     }
2030 
2031     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2032         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2033 
2034         return NXT_UNIT_ERROR;
2035     }
2036 
2037     if (nxt_fast_path(buf->free > buf->start)) {
2038         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
2039         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2040             return rc;
2041         }
2042     }
2043 
2044     nxt_unit_mmap_buf_free(mmap_buf);
2045 
2046     return NXT_UNIT_OK;
2047 }
2048 
2049 
2050 static void
2051 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2052 {
2053     int                           rc;
2054     nxt_unit_mmap_buf_t           *mmap_buf;
2055     nxt_unit_request_info_t       *req;
2056     nxt_unit_request_info_impl_t  *req_impl;
2057 
2058     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2059 
2060     req = mmap_buf->req;
2061     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2062 
2063     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
2064     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2065         nxt_unit_mmap_buf_free(mmap_buf);
2066 
2067         nxt_unit_request_info_release(req);
2068 
2069     } else {
2070         nxt_unit_request_done(req, rc);
2071     }
2072 }
2073 
2074 
2075 static int
2076 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
2077     nxt_unit_mmap_buf_t *mmap_buf, int last)
2078 {
2079     struct {
2080         nxt_port_msg_t       msg;
2081         nxt_port_mmap_msg_t  mmap_msg;
2082     } m;
2083 
2084     int                      rc;
2085     u_char                   *last_used, *first_free;
2086     ssize_t                  res;
2087     nxt_chunk_id_t           first_free_chunk;
2088     nxt_unit_buf_t           *buf;
2089     nxt_unit_impl_t          *lib;
2090     nxt_port_mmap_header_t   *hdr;
2091 
2092     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2093 
2094     buf = &mmap_buf->buf;
2095     hdr = mmap_buf->hdr;
2096 
2097     m.mmap_msg.size = buf->free - buf->start;
2098 
2099     m.msg.stream = stream;
2100     m.msg.pid = lib->pid;
2101     m.msg.reply_port = 0;
2102     m.msg.type = _NXT_PORT_MSG_DATA;
2103     m.msg.last = last != 0;
2104     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2105     m.msg.nf = 0;
2106     m.msg.mf = 0;
2107     m.msg.tracking = 0;
2108 
2109     rc = NXT_UNIT_ERROR;
2110 
2111     if (m.msg.mmap) {
2112         m.mmap_msg.mmap_id = hdr->id;
2113         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2114                                                      (u_char *) buf->start);
2115 
2116         nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2117                        stream,
2118                        (int) m.mmap_msg.mmap_id,
2119                        (int) m.mmap_msg.chunk_id,
2120                        (int) m.mmap_msg.size);
2121 
2122         res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
2123                                  NULL, 0);
2124         if (nxt_slow_path(res != sizeof(m))) {
2125             goto free_buf;
2126         }
2127 
2128         last_used = (u_char *) buf->free - 1;
2129         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2130 
2131         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2132             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2133 
2134             buf->start = (char *) first_free;
2135             buf->free = buf->start;
2136 
2137             if (buf->end < buf->start) {
2138                 buf->end = buf->start;
2139             }
2140 
2141         } else {
2142             buf->start = NULL;
2143             buf->free = NULL;
2144             buf->end = NULL;
2145 
2146             mmap_buf->hdr = NULL;
2147         }
2148 
2149         nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
2150                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2151 
2152         nxt_unit_debug(ctx, "process %d allocated_chunks %d",
2153                        mmap_buf->process->pid,
2154                        (int) mmap_buf->process->outgoing.allocated_chunks);
2155 
2156     } else {
2157         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2158                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2159         {
2160             nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
2161                           ": no space reserved for message header", stream);
2162 
2163             goto free_buf;
2164         }
2165 
2166         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2167 
2168         nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
2169                        stream,
2170                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2171 
2172         res = nxt_unit_port_send(ctx, &mmap_buf->port_id,
2173                                  buf->start - sizeof(m.msg),
2174                                  m.mmap_msg.size + sizeof(m.msg),
2175                                  NULL, 0);
2176         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2177             goto free_buf;
2178         }
2179     }
2180 
2181     rc = NXT_UNIT_OK;
2182 
2183 free_buf:
2184 
2185     nxt_unit_free_outgoing_buf(mmap_buf);
2186 
2187     return rc;
2188 }
2189 
2190 
2191 void
2192 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2193 {
2194     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2195 }
2196 
2197 
2198 static void
2199 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2200 {
2201     nxt_unit_free_outgoing_buf(mmap_buf);
2202 
2203     nxt_unit_mmap_buf_release(mmap_buf);
2204 }
2205 
2206 
2207 static void
2208 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2209 {
2210     if (mmap_buf->hdr != NULL) {
2211         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2212                               mmap_buf->process,
2213                               mmap_buf->hdr, mmap_buf->buf.start,
2214                               mmap_buf->buf.end - mmap_buf->buf.start);
2215 
2216         mmap_buf->hdr = NULL;
2217 
2218         return;
2219     }
2220 
2221     if (mmap_buf->free_ptr != NULL) {
2222         free(mmap_buf->free_ptr);
2223 
2224         mmap_buf->free_ptr = NULL;
2225     }
2226 }
2227 
2228 
2229 static nxt_unit_read_buf_t *
2230 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2231 {
2232     nxt_unit_ctx_impl_t  *ctx_impl;
2233 
2234     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2235 
2236     pthread_mutex_lock(&ctx_impl->mutex);
2237 
2238     return nxt_unit_read_buf_get_impl(ctx_impl);
2239 }
2240 
2241 
2242 static nxt_unit_read_buf_t *
2243 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2244 {
2245     nxt_unit_read_buf_t  *rbuf;
2246 
2247     if (ctx_impl->free_read_buf != NULL) {
2248         rbuf = ctx_impl->free_read_buf;
2249         ctx_impl->free_read_buf = rbuf->next;
2250 
2251         pthread_mutex_unlock(&ctx_impl->mutex);
2252 
2253         return rbuf;
2254     }
2255 
2256     pthread_mutex_unlock(&ctx_impl->mutex);
2257 
2258     rbuf = malloc(sizeof(nxt_unit_read_buf_t));
2259 
2260     return rbuf;
2261 }
2262 
2263 
2264 static void
2265 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2266     nxt_unit_read_buf_t *rbuf)
2267 {
2268     nxt_unit_ctx_impl_t  *ctx_impl;
2269 
2270     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2271 
2272     pthread_mutex_lock(&ctx_impl->mutex);
2273 
2274     rbuf->next = ctx_impl->free_read_buf;
2275     ctx_impl->free_read_buf = rbuf;
2276 
2277     pthread_mutex_unlock(&ctx_impl->mutex);
2278 }
2279 
2280 
2281 nxt_unit_buf_t *
2282 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2283 {
2284     nxt_unit_mmap_buf_t  *mmap_buf;
2285 
2286     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2287 
2288     if (mmap_buf->next == NULL) {
2289         return NULL;
2290     }
2291 
2292     return &mmap_buf->next->buf;
2293 }
2294 
2295 
2296 uint32_t
2297 nxt_unit_buf_max(void)
2298 {
2299     return PORT_MMAP_DATA_SIZE;
2300 }
2301 
2302 
2303 uint32_t
2304 nxt_unit_buf_min(void)
2305 {
2306     return PORT_MMAP_CHUNK_SIZE;
2307 }
2308 
2309 
2310 int
2311 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2312     size_t size)
2313 {
2314     ssize_t  res;
2315 
2316     res = nxt_unit_response_write_nb(req, start, size, size);
2317 
2318     return res < 0 ? -res : NXT_UNIT_OK;
2319 }
2320 
2321 
2322 ssize_t
2323 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2324     size_t size, size_t min_size)
2325 {
2326     int                           rc;
2327     ssize_t                       sent;
2328     uint32_t                      part_size, min_part_size, buf_size;
2329     const char                    *part_start;
2330     nxt_unit_mmap_buf_t           mmap_buf;
2331     nxt_unit_request_info_impl_t  *req_impl;
2332     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2333 
2334     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2335 
2336     part_start = start;
2337     sent = 0;
2338 
2339     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2340         nxt_unit_req_warn(req, "write: response not initialized yet");
2341 
2342         return -NXT_UNIT_ERROR;
2343     }
2344 
2345     /* Check if response is not send yet. */
2346     if (nxt_slow_path(req->response_buf != NULL)) {
2347         part_size = req->response_buf->end - req->response_buf->free;
2348         part_size = nxt_min(size, part_size);
2349 
2350         rc = nxt_unit_response_add_content(req, part_start, part_size);
2351         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2352             return -rc;
2353         }
2354 
2355         rc = nxt_unit_response_send(req);
2356         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2357             return -rc;
2358         }
2359 
2360         size -= part_size;
2361         part_start += part_size;
2362         sent += part_size;
2363 
2364         min_size -= nxt_min(min_size, part_size);
2365     }
2366 
2367     while (size > 0) {
2368         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2369         min_part_size = nxt_min(min_size, part_size);
2370         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2371 
2372         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2373                                        &req->response_port, part_size,
2374                                        min_part_size, &mmap_buf, local_buf);
2375         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2376             return -rc;
2377         }
2378 
2379         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2380         if (nxt_slow_path(buf_size == 0)) {
2381             return sent;
2382         }
2383         part_size = nxt_min(buf_size, part_size);
2384 
2385         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2386                                        part_start, part_size);
2387 
2388         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2389         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2390             return -rc;
2391         }
2392 
2393         size -= part_size;
2394         part_start += part_size;
2395         sent += part_size;
2396 
2397         min_size -= nxt_min(min_size, part_size);
2398     }
2399 
2400     return sent;
2401 }
2402 
2403 
2404 int
2405 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2406     nxt_unit_read_info_t *read_info)
2407 {
2408     int                           rc;
2409     ssize_t                       n;
2410     uint32_t                      buf_size;
2411     nxt_unit_buf_t                *buf;
2412     nxt_unit_mmap_buf_t           mmap_buf;
2413     nxt_unit_request_info_impl_t  *req_impl;
2414     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2415 
2416     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2417 
2418     /* Check if response is not send yet. */
2419     if (nxt_slow_path(req->response_buf)) {
2420 
2421         /* Enable content in headers buf. */
2422         rc = nxt_unit_response_add_content(req, "", 0);
2423         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2424             nxt_unit_req_error(req, "Failed to add piggyback content");
2425 
2426             return rc;
2427         }
2428 
2429         buf = req->response_buf;
2430 
2431         while (buf->end - buf->free > 0) {
2432             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2433             if (nxt_slow_path(n < 0)) {
2434                 nxt_unit_req_error(req, "Read error");
2435 
2436                 return NXT_UNIT_ERROR;
2437             }
2438 
2439             /* Manually increase sizes. */
2440             buf->free += n;
2441             req->response->piggyback_content_length += n;
2442 
2443             if (read_info->eof) {
2444                 break;
2445             }
2446         }
2447 
2448         rc = nxt_unit_response_send(req);
2449         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2450             nxt_unit_req_error(req, "Failed to send headers with content");
2451 
2452             return rc;
2453         }
2454 
2455         if (read_info->eof) {
2456             return NXT_UNIT_OK;
2457         }
2458     }
2459 
2460     while (!read_info->eof) {
2461         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2462                            read_info->buf_size);
2463 
2464         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2465 
2466         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2467                                        &req->response_port,
2468                                        buf_size, buf_size,
2469                                        &mmap_buf, local_buf);
2470         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2471             return rc;
2472         }
2473 
2474         buf = &mmap_buf.buf;
2475 
2476         while (!read_info->eof && buf->end > buf->free) {
2477             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2478             if (nxt_slow_path(n < 0)) {
2479                 nxt_unit_req_error(req, "Read error");
2480 
2481                 nxt_unit_free_outgoing_buf(&mmap_buf);
2482 
2483                 return NXT_UNIT_ERROR;
2484             }
2485 
2486             buf->free += n;
2487         }
2488 
2489         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2490         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2491             nxt_unit_req_error(req, "Failed to send content");
2492 
2493             return rc;
2494         }
2495     }
2496 
2497     return NXT_UNIT_OK;
2498 }
2499 
2500 
2501 ssize_t
2502 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2503 {
2504     ssize_t  buf_res, res;
2505 
2506     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2507                                 dst, size);
2508 
2509     if (buf_res < (ssize_t) size && req->content_fd != -1) {
2510         res = read(req->content_fd, dst, size);
2511         if (res < 0) {
2512             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2513                                strerror(errno), errno);
2514 
2515             return res;
2516         }
2517 
2518         if (res < (ssize_t) size) {
2519             close(req->content_fd);
2520 
2521             req->content_fd = -1;
2522         }
2523 
2524         req->content_length -= res;
2525         size -= res;
2526 
2527         dst = nxt_pointer_to(dst, res);
2528 
2529     } else {
2530         res = 0;
2531     }
2532 
2533     return buf_res + res;
2534 }
2535 
2536 
2537 ssize_t
2538 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
2539 {
2540     char                 *p;
2541     size_t               l_size, b_size;
2542     nxt_unit_buf_t       *b;
2543     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
2544 
2545     if (req->content_length == 0) {
2546         return 0;
2547     }
2548 
2549     l_size = 0;
2550 
2551     b = req->content_buf;
2552 
2553     while (b != NULL) {
2554         b_size = b->end - b->free;
2555         p = memchr(b->free, '\n', b_size);
2556 
2557         if (p != NULL) {
2558             p++;
2559             l_size += p - b->free;
2560             break;
2561         }
2562 
2563         l_size += b_size;
2564 
2565         if (max_size <= l_size) {
2566             break;
2567         }
2568 
2569         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
2570         if (mmap_buf->next == NULL
2571             && req->content_fd != -1
2572             && l_size < req->content_length)
2573         {
2574             preread_buf = nxt_unit_request_preread(req, 16384);
2575             if (nxt_slow_path(preread_buf == NULL)) {
2576                 return -1;
2577             }
2578 
2579             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
2580         }
2581 
2582         b = nxt_unit_buf_next(b);
2583     }
2584 
2585     return nxt_min(max_size, l_size);
2586 }
2587 
2588 
2589 static nxt_unit_mmap_buf_t *
2590 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
2591 {
2592     ssize_t              res;
2593     nxt_unit_mmap_buf_t  *mmap_buf;
2594 
2595     if (req->content_fd == -1) {
2596         nxt_unit_req_alert(req, "preread: content_fd == -1");
2597         return NULL;
2598     }
2599 
2600     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2601     if (nxt_slow_path(mmap_buf == NULL)) {
2602         nxt_unit_req_alert(req, "preread: failed to allocate buf");
2603         return NULL;
2604     }
2605 
2606     mmap_buf->free_ptr = malloc(size);
2607     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
2608         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
2609         nxt_unit_mmap_buf_release(mmap_buf);
2610         return NULL;
2611     }
2612 
2613     mmap_buf->plain_ptr = mmap_buf->free_ptr;
2614 
2615     mmap_buf->hdr = NULL;
2616     mmap_buf->buf.start = mmap_buf->free_ptr;
2617     mmap_buf->buf.free = mmap_buf->buf.start;
2618     mmap_buf->buf.end = mmap_buf->buf.start + size;
2619     mmap_buf->process = NULL;
2620 
2621     res = read(req->content_fd, mmap_buf->free_ptr, size);
2622     if (res < 0) {
2623         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2624                            strerror(errno), errno);
2625 
2626         nxt_unit_mmap_buf_free(mmap_buf);
2627 
2628         return NULL;
2629     }
2630 
2631     if (res < (ssize_t) size) {
2632         close(req->content_fd);
2633 
2634         req->content_fd = -1;
2635     }
2636 
2637     nxt_unit_req_debug(req, "preread: read %d", (int) res);
2638 
2639     mmap_buf->buf.end = mmap_buf->buf.free + res;
2640 
2641     return mmap_buf;
2642 }
2643 
2644 
2645 static ssize_t
2646 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2647 {
2648     u_char          *p;
2649     size_t          rest, copy, read;
2650     nxt_unit_buf_t  *buf, *last_buf;
2651 
2652     p = dst;
2653     rest = size;
2654 
2655     buf = *b;
2656     last_buf = buf;
2657 
2658     while (buf != NULL) {
2659         last_buf = buf;
2660 
2661         copy = buf->end - buf->free;
2662         copy = nxt_min(rest, copy);
2663 
2664         p = nxt_cpymem(p, buf->free, copy);
2665 
2666         buf->free += copy;
2667         rest -= copy;
2668 
2669         if (rest == 0) {
2670             if (buf->end == buf->free) {
2671                 buf = nxt_unit_buf_next(buf);
2672             }
2673 
2674             break;
2675         }
2676 
2677         buf = nxt_unit_buf_next(buf);
2678     }
2679 
2680     *b = last_buf;
2681 
2682     read = size - rest;
2683 
2684     *len -= read;
2685 
2686     return read;
2687 }
2688 
2689 
2690 void
2691 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2692 {
2693     uint32_t                      size;
2694     nxt_port_msg_t                msg;
2695     nxt_unit_impl_t               *lib;
2696     nxt_unit_request_info_impl_t  *req_impl;
2697 
2698     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2699 
2700     nxt_unit_req_debug(req, "done: %d", rc);
2701 
2702     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2703         goto skip_response_send;
2704     }
2705 
2706     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2707 
2708         size = nxt_length("Content-Type") + nxt_length("text/plain");
2709 
2710         rc = nxt_unit_response_init(req, 200, 1, size);
2711         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2712             goto skip_response_send;
2713         }
2714 
2715         rc = nxt_unit_response_add_field(req, "Content-Type",
2716                                    nxt_length("Content-Type"),
2717                                    "text/plain", nxt_length("text/plain"));
2718         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2719             goto skip_response_send;
2720         }
2721     }
2722 
2723     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2724 
2725         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2726 
2727         nxt_unit_buf_send_done(req->response_buf);
2728 
2729         return;
2730     }
2731 
2732 skip_response_send:
2733 
2734     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2735 
2736     msg.stream = req_impl->stream;
2737     msg.pid = lib->pid;
2738     msg.reply_port = 0;
2739     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2740                                    : _NXT_PORT_MSG_RPC_ERROR;
2741     msg.last = 1;
2742     msg.mmap = 0;
2743     msg.nf = 0;
2744     msg.mf = 0;
2745     msg.tracking = 0;
2746 
2747     (void) nxt_unit_port_send(req->ctx, &req->response_port,
2748                               &msg, sizeof(msg), NULL, 0);
2749 
2750     nxt_unit_request_info_release(req);
2751 }
2752 
2753 
2754 int
2755 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2756     uint8_t last, const void *start, size_t size)
2757 {
2758     const struct iovec  iov = { (void *) start, size };
2759 
2760     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2761 }
2762 
2763 
2764 int
2765 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2766     uint8_t last, const struct iovec *iov, int iovcnt)
2767 {
2768     int                           i, rc;
2769     size_t                        l, copy;
2770     uint32_t                      payload_len, buf_size, alloc_size;
2771     const uint8_t                 *b;
2772     nxt_unit_buf_t                *buf;
2773     nxt_unit_mmap_buf_t           mmap_buf;
2774     nxt_websocket_header_t        *wh;
2775     nxt_unit_request_info_impl_t  *req_impl;
2776     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2777 
2778     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2779 
2780     payload_len = 0;
2781 
2782     for (i = 0; i < iovcnt; i++) {
2783         payload_len += iov[i].iov_len;
2784     }
2785 
2786     buf_size = 10 + payload_len;
2787     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2788 
2789     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2790                                    &req->response_port,
2791                                    alloc_size, alloc_size,
2792                                    &mmap_buf, local_buf);
2793     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2794         return rc;
2795     }
2796 
2797     buf = &mmap_buf.buf;
2798 
2799     buf->start[0] = 0;
2800     buf->start[1] = 0;
2801 
2802     buf_size -= buf->end - buf->start;
2803 
2804     wh = (void *) buf->free;
2805 
2806     buf->free = nxt_websocket_frame_init(wh, payload_len);
2807     wh->fin = last;
2808     wh->opcode = opcode;
2809 
2810     for (i = 0; i < iovcnt; i++) {
2811         b = iov[i].iov_base;
2812         l = iov[i].iov_len;
2813 
2814         while (l > 0) {
2815             copy = buf->end - buf->free;
2816             copy = nxt_min(l, copy);
2817 
2818             buf->free = nxt_cpymem(buf->free, b, copy);
2819             b += copy;
2820             l -= copy;
2821 
2822             if (l > 0) {
2823                 if (nxt_fast_path(buf->free > buf->start)) {
2824                     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
2825                                                 &mmap_buf, 0);
2826 
2827                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2828                         return rc;
2829                     }
2830                 }
2831 
2832                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2833 
2834                 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2835                                                &req->response_port,
2836                                                alloc_size, alloc_size,
2837                                                &mmap_buf, local_buf);
2838                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2839                     return rc;
2840                 }
2841 
2842                 buf_size -= buf->end - buf->start;
2843             }
2844         }
2845     }
2846 
2847     if (buf->free > buf->start) {
2848         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
2849                                     &mmap_buf, 0);
2850     }
2851 
2852     return rc;
2853 }
2854 
2855 
2856 ssize_t
2857 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2858     size_t size)
2859 {
2860     ssize_t   res;
2861     uint8_t   *b;
2862     uint64_t  i, d;
2863 
2864     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2865                             dst, size);
2866 
2867     if (ws->mask == NULL) {
2868         return res;
2869     }
2870 
2871     b = dst;
2872     d = (ws->payload_len - ws->content_length - res) % 4;
2873 
2874     for (i = 0; i < (uint64_t) res; i++) {
2875         b[i] ^= ws->mask[ (i + d) % 4 ];
2876     }
2877 
2878     return res;
2879 }
2880 
2881 
2882 int
2883 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2884 {
2885     char                             *b;
2886     size_t                           size;
2887     nxt_unit_websocket_frame_impl_t  *ws_impl;
2888 
2889     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2890 
2891     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
2892         return NXT_UNIT_OK;
2893     }
2894 
2895     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2896 
2897     b = malloc(size);
2898     if (nxt_slow_path(b == NULL)) {
2899         return NXT_UNIT_ERROR;
2900     }
2901 
2902     memcpy(b, ws_impl->buf->buf.start, size);
2903 
2904     ws_impl->buf->buf.start = b;
2905     ws_impl->buf->buf.free = b;
2906     ws_impl->buf->buf.end = b + size;
2907 
2908     ws_impl->buf->free_ptr = b;
2909 
2910     return NXT_UNIT_OK;
2911 }
2912 
2913 
2914 void
2915 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
2916 {
2917     nxt_unit_websocket_frame_release(ws);
2918 }
2919 
2920 
2921 static nxt_port_mmap_header_t *
2922 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2923     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
2924 {
2925     int                     res, nchunks, i;
2926     uint32_t                outgoing_size;
2927     nxt_unit_mmap_t         *mm, *mm_end;
2928     nxt_unit_impl_t         *lib;
2929     nxt_port_mmap_header_t  *hdr;
2930 
2931     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2932 
2933     pthread_mutex_lock(&process->outgoing.mutex);
2934 
2935 retry:
2936 
2937     outgoing_size = process->outgoing.size;
2938 
2939     mm_end = process->outgoing.elts + outgoing_size;
2940 
2941     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2942         hdr = mm->hdr;
2943 
2944         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
2945             continue;
2946         }
2947 
2948         *c = 0;
2949 
2950         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2951             nchunks = 1;
2952 
2953             while (nchunks < *n) {
2954                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2955                                                        *c + nchunks);
2956 
2957                 if (res == 0) {
2958                     if (nchunks >= min_n) {
2959                         *n = nchunks;
2960 
2961                         goto unlock;
2962                     }
2963 
2964                     for (i = 0; i < nchunks; i++) {
2965                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
2966                     }
2967 
2968                     *c += nchunks + 1;
2969                     nchunks = 0;
2970                     break;
2971                 }
2972 
2973                 nchunks++;
2974             }
2975 
2976             if (nchunks >= min_n) {
2977                 *n = nchunks;
2978 
2979                 goto unlock;
2980             }
2981         }
2982 
2983         hdr->oosm = 1;
2984     }
2985 
2986     if (outgoing_size >= lib->shm_mmap_limit) {
2987         /* Cannot allocate more shared memory. */
2988         pthread_mutex_unlock(&process->outgoing.mutex);
2989 
2990         if (min_n == 0) {
2991             *n = 0;
2992         }
2993 
2994         if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
2995                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
2996         {
2997             /* Memory allocated by application, but not send to router. */
2998             return NULL;
2999         }
3000 
3001         /* Notify router about OOSM condition. */
3002 
3003         res = nxt_unit_send_oosm(ctx, port_id);
3004         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3005             return NULL;
3006         }
3007 
3008         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3009 
3010         if (min_n == 0) {
3011             return NULL;
3012         }
3013 
3014         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3015 
3016         res = nxt_unit_wait_shm_ack(ctx);
3017         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3018             return NULL;
3019         }
3020 
3021         nxt_unit_debug(ctx, "oosm: retry");
3022 
3023         pthread_mutex_lock(&process->outgoing.mutex);
3024 
3025         goto retry;
3026     }
3027 
3028     *c = 0;
3029     hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
3030 
3031 unlock:
3032 
3033     nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
3034 
3035     nxt_unit_debug(ctx, "process %d allocated_chunks %d",
3036                    process->pid,
3037                    (int) process->outgoing.allocated_chunks);
3038 
3039     pthread_mutex_unlock(&process->outgoing.mutex);
3040 
3041     return hdr;
3042 }
3043 
3044 
3045 static int
3046 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
3047 {
3048     ssize_t          res;
3049     nxt_port_msg_t   msg;
3050     nxt_unit_impl_t  *lib;
3051 
3052     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3053 
3054     msg.stream = 0;
3055     msg.pid = lib->pid;
3056     msg.reply_port = 0;
3057     msg.type = _NXT_PORT_MSG_OOSM;
3058     msg.last = 0;
3059     msg.mmap = 0;
3060     msg.nf = 0;
3061     msg.mf = 0;
3062     msg.tracking = 0;
3063 
3064     res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
3065     if (nxt_slow_path(res != sizeof(msg))) {
3066         return NXT_UNIT_ERROR;
3067     }
3068 
3069     return NXT_UNIT_OK;
3070 }
3071 
3072 
3073 static int
3074 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3075 {
3076     nxt_port_msg_t       *port_msg;
3077     nxt_unit_ctx_impl_t  *ctx_impl;
3078     nxt_unit_read_buf_t  *rbuf;
3079 
3080     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3081 
3082     while (1) {
3083         rbuf = nxt_unit_read_buf_get(ctx);
3084         if (nxt_slow_path(rbuf == NULL)) {
3085             return NXT_UNIT_ERROR;
3086         }
3087 
3088         nxt_unit_read_buf(ctx, rbuf);
3089 
3090         if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
3091             nxt_unit_read_buf_release(ctx, rbuf);
3092 
3093             return NXT_UNIT_ERROR;
3094         }
3095 
3096         port_msg = (nxt_port_msg_t *) rbuf->buf;
3097 
3098         if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3099             nxt_unit_read_buf_release(ctx, rbuf);
3100 
3101             break;
3102         }
3103 
3104         pthread_mutex_lock(&ctx_impl->mutex);
3105 
3106         *ctx_impl->pending_read_tail = rbuf;
3107         ctx_impl->pending_read_tail = &rbuf->next;
3108         rbuf->next = NULL;
3109 
3110         pthread_mutex_unlock(&ctx_impl->mutex);
3111 
3112         if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3113             nxt_unit_debug(ctx, "oosm: quit received");
3114 
3115             return NXT_UNIT_ERROR;
3116         }
3117     }
3118 
3119     return NXT_UNIT_OK;
3120 }
3121 
3122 
3123 static nxt_unit_mmap_t *
3124 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3125 {
3126     uint32_t  cap;
3127 
3128     cap = mmaps->cap;
3129 
3130     if (cap == 0) {
3131         cap = i + 1;
3132     }
3133 
3134     while (i + 1 > cap) {
3135 
3136         if (cap < 16) {
3137             cap = cap * 2;
3138 
3139         } else {
3140             cap = cap + cap / 2;
3141         }
3142     }
3143 
3144     if (cap != mmaps->cap) {
3145 
3146         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
3147         if (nxt_slow_path(mmaps->elts == NULL)) {
3148             return NULL;
3149         }
3150 
3151         memset(mmaps->elts + mmaps->cap, 0,
3152                sizeof(*mmaps->elts) * (cap - mmaps->cap));
3153 
3154         mmaps->cap = cap;
3155     }
3156 
3157     if (i + 1 > mmaps->size) {
3158         mmaps->size = i + 1;
3159     }
3160 
3161     return mmaps->elts + i;
3162 }
3163 
3164 
3165 static nxt_port_mmap_header_t *
3166 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3167     nxt_unit_port_id_t *port_id, int n)
3168 {
3169     int                     i, fd, rc;
3170     void                    *mem;
3171     char                    name[64];
3172     nxt_unit_mmap_t         *mm;
3173     nxt_unit_impl_t         *lib;
3174     nxt_port_mmap_header_t  *hdr;
3175 
3176     lib = process->lib;
3177 
3178     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
3179     if (nxt_slow_path(mm == NULL)) {
3180         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
3181 
3182         return NULL;
3183     }
3184 
3185     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3186              lib->pid, (void *) pthread_self());
3187 
3188 #if (NXT_HAVE_MEMFD_CREATE)
3189 
3190     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3191     if (nxt_slow_path(fd == -1)) {
3192         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3193                        strerror(errno), errno);
3194 
3195         goto remove_fail;
3196     }
3197 
3198     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3199 
3200 #elif (NXT_HAVE_SHM_OPEN_ANON)
3201 
3202     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3203     if (nxt_slow_path(fd == -1)) {
3204         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3205                        strerror(errno), errno);
3206 
3207         goto remove_fail;
3208     }
3209 
3210 #elif (NXT_HAVE_SHM_OPEN)
3211 
3212     /* Just in case. */
3213     shm_unlink(name);
3214 
3215     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3216     if (nxt_slow_path(fd == -1)) {
3217         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3218                        strerror(errno), errno);
3219 
3220         goto remove_fail;
3221     }
3222 
3223     if (nxt_slow_path(shm_unlink(name) == -1)) {
3224         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3225                       strerror(errno), errno);
3226     }
3227 
3228 #else
3229 
3230 #error No working shared memory implementation.
3231 
3232 #endif
3233 
3234     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
3235         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3236                        strerror(errno), errno);
3237 
3238         goto remove_fail;
3239     }
3240 
3241     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3242     if (nxt_slow_path(mem == MAP_FAILED)) {
3243         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3244                        strerror(errno), errno);
3245 
3246         goto remove_fail;
3247     }
3248 
3249     mm->hdr = mem;
3250     hdr = mem;
3251 
3252     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3253     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3254 
3255     hdr->id = process->outgoing.size - 1;
3256     hdr->src_pid = lib->pid;
3257     hdr->dst_pid = process->pid;
3258     hdr->sent_over = port_id->id;
3259 
3260     /* Mark first n chunk(s) as busy */
3261     for (i = 0; i < n; i++) {
3262         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3263     }
3264 
3265     /* Mark as busy chunk followed the last available chunk. */
3266     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3267     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3268 
3269     pthread_mutex_unlock(&process->outgoing.mutex);
3270 
3271     rc = nxt_unit_send_mmap(ctx, port_id, fd);
3272     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3273         munmap(mem, PORT_MMAP_SIZE);
3274         hdr = NULL;
3275 
3276     } else {
3277         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3278                        hdr->id, (int) lib->pid, (int) process->pid);
3279     }
3280 
3281     close(fd);
3282 
3283     pthread_mutex_lock(&process->outgoing.mutex);
3284 
3285     if (nxt_fast_path(hdr != NULL)) {
3286         return hdr;
3287     }
3288 
3289 remove_fail:
3290 
3291     process->outgoing.size--;
3292 
3293     return NULL;
3294 }
3295 
3296 
3297 static int
3298 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
3299 {
3300     ssize_t          res;
3301     nxt_port_msg_t   msg;
3302     nxt_unit_impl_t  *lib;
3303     union {
3304         struct cmsghdr  cm;
3305         char            space[CMSG_SPACE(sizeof(int))];
3306     } cmsg;
3307 
3308     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3309 
3310     msg.stream = 0;
3311     msg.pid = lib->pid;
3312     msg.reply_port = 0;
3313     msg.type = _NXT_PORT_MSG_MMAP;
3314     msg.last = 0;
3315     msg.mmap = 0;
3316     msg.nf = 0;
3317     msg.mf = 0;
3318     msg.tracking = 0;
3319 
3320     /*
3321      * Fill all padding fields with 0.
3322      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3323      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3324      */
3325     memset(&cmsg, 0, sizeof(cmsg));
3326 
3327     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3328     cmsg.cm.cmsg_level = SOL_SOCKET;
3329     cmsg.cm.cmsg_type = SCM_RIGHTS;
3330 
3331     /*
3332      * memcpy() is used instead of simple
3333      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3334      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3335      *   dereferencing type-punned pointer will break strict-aliasing rules
3336      *
3337      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3338      * in the same simple assignment as in the code above.
3339      */
3340     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3341 
3342     res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg),
3343                              &cmsg, sizeof(cmsg));
3344     if (nxt_slow_path(res != sizeof(msg))) {
3345         return NXT_UNIT_ERROR;
3346     }
3347 
3348     return NXT_UNIT_OK;
3349 }
3350 
3351 
3352 static int
3353 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3354     nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
3355     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3356 {
3357     int                     nchunks, min_nchunks;
3358     nxt_chunk_id_t          c;
3359     nxt_port_mmap_header_t  *hdr;
3360 
3361     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3362         if (local_buf != NULL) {
3363             mmap_buf->free_ptr = NULL;
3364             mmap_buf->plain_ptr = local_buf;
3365 
3366         } else {
3367             mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
3368             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3369                 return NXT_UNIT_ERROR;
3370             }
3371 
3372             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3373         }
3374 
3375         mmap_buf->hdr = NULL;
3376         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3377         mmap_buf->buf.free = mmap_buf->buf.start;
3378         mmap_buf->buf.end = mmap_buf->buf.start + size;
3379         mmap_buf->port_id = *port_id;
3380         mmap_buf->process = process;
3381 
3382         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3383                        mmap_buf->buf.start, (int) size);
3384 
3385         return NXT_UNIT_OK;
3386     }
3387 
3388     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3389     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3390 
3391     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
3392     if (nxt_slow_path(hdr == NULL)) {
3393         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3394             mmap_buf->hdr = NULL;
3395             mmap_buf->buf.start = NULL;
3396             mmap_buf->buf.free = NULL;
3397             mmap_buf->buf.end = NULL;
3398             mmap_buf->free_ptr = NULL;
3399 
3400             return NXT_UNIT_OK;
3401         }
3402 
3403         return NXT_UNIT_ERROR;
3404     }
3405 
3406     mmap_buf->hdr = hdr;
3407     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3408     mmap_buf->buf.free = mmap_buf->buf.start;
3409     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3410     mmap_buf->port_id = *port_id;
3411     mmap_buf->process = process;
3412     mmap_buf->free_ptr = NULL;
3413     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3414 
3415     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3416                   (int) hdr->id, (int) c,
3417                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3418 
3419     return NXT_UNIT_OK;
3420 }
3421 
3422 
3423 static int
3424 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3425 {
3426     int                      rc;
3427     void                     *mem;
3428     struct stat              mmap_stat;
3429     nxt_unit_mmap_t          *mm;
3430     nxt_unit_impl_t          *lib;
3431     nxt_unit_process_t       *process;
3432     nxt_port_mmap_header_t   *hdr;
3433 
3434     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3435 
3436     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3437 
3438     pthread_mutex_lock(&lib->mutex);
3439 
3440     process = nxt_unit_process_find(lib, pid, 0);
3441 
3442     pthread_mutex_unlock(&lib->mutex);
3443 
3444     if (nxt_slow_path(process == NULL)) {
3445         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
3446                       (int) pid, fd);
3447 
3448         return NXT_UNIT_ERROR;
3449     }
3450 
3451     rc = NXT_UNIT_ERROR;
3452 
3453     if (fstat(fd, &mmap_stat) == -1) {
3454         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3455                       strerror(errno), errno);
3456 
3457         goto fail;
3458     }
3459 
3460     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3461                MAP_SHARED, fd, 0);
3462     if (nxt_slow_path(mem == MAP_FAILED)) {
3463         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3464                       strerror(errno), errno);
3465 
3466         goto fail;
3467     }
3468 
3469     hdr = mem;
3470 
3471     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
3472 
3473         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
3474                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3475                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3476 
3477         munmap(mem, PORT_MMAP_SIZE);
3478 
3479         goto fail;
3480     }
3481 
3482     pthread_mutex_lock(&process->incoming.mutex);
3483 
3484     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
3485     if (nxt_slow_path(mm == NULL)) {
3486         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
3487 
3488         munmap(mem, PORT_MMAP_SIZE);
3489 
3490     } else {
3491         mm->hdr = hdr;
3492 
3493         hdr->sent_over = 0xFFFFu;
3494 
3495         rc = NXT_UNIT_OK;
3496     }
3497 
3498     pthread_mutex_unlock(&process->incoming.mutex);
3499 
3500 fail:
3501 
3502     nxt_unit_process_release(process);
3503 
3504     return rc;
3505 }
3506 
3507 
3508 static void
3509 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
3510 {
3511     pthread_mutex_init(&mmaps->mutex, NULL);
3512 
3513     mmaps->size = 0;
3514     mmaps->cap = 0;
3515     mmaps->elts = NULL;
3516     mmaps->allocated_chunks = 0;
3517 }
3518 
3519 
3520 nxt_inline void
3521 nxt_unit_process_use(nxt_unit_process_t *process)
3522 {
3523     nxt_atomic_fetch_add(&process->use_count, 1);
3524 }
3525 
3526 
3527 nxt_inline void
3528 nxt_unit_process_release(nxt_unit_process_t *process)
3529 {
3530     long c;
3531 
3532     c = nxt_atomic_fetch_add(&process->use_count, -1);
3533 
3534     if (c == 1) {
3535         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
3536 
3537         nxt_unit_mmaps_destroy(&process->incoming);
3538         nxt_unit_mmaps_destroy(&process->outgoing);
3539 
3540         free(process);
3541     }
3542 }
3543 
3544 
3545 static void
3546 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
3547 {
3548     nxt_unit_mmap_t  *mm, *end;
3549 
3550     if (mmaps->elts != NULL) {
3551         end = mmaps->elts + mmaps->size;
3552 
3553         for (mm = mmaps->elts; mm < end; mm++) {
3554             munmap(mm->hdr, PORT_MMAP_SIZE);
3555         }
3556 
3557         free(mmaps->elts);
3558     }
3559 
3560     pthread_mutex_destroy(&mmaps->mutex);
3561 }
3562 
3563 
3564 static nxt_port_mmap_header_t *
3565 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3566     uint32_t id)
3567 {
3568     nxt_port_mmap_header_t  *hdr;
3569 
3570     if (nxt_fast_path(process->incoming.size > id)) {
3571         hdr = process->incoming.elts[id].hdr;
3572 
3573     } else {
3574         hdr = NULL;
3575     }
3576 
3577     return hdr;
3578 }
3579 
3580 
3581 static int
3582 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3583 {
3584     int                           rc;
3585     nxt_chunk_id_t                c;
3586     nxt_unit_process_t            *process;
3587     nxt_port_mmap_header_t        *hdr;
3588     nxt_port_mmap_tracking_msg_t  *tracking_msg;
3589 
3590     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3591         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3592                       recv_msg->stream, (int) recv_msg->size);
3593 
3594         return 0;
3595     }
3596 
3597     tracking_msg = recv_msg->start;
3598 
3599     recv_msg->start = tracking_msg + 1;
3600     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3601 
3602     process = nxt_unit_msg_get_process(ctx, recv_msg);
3603     if (nxt_slow_path(process == NULL)) {
3604         return 0;
3605     }
3606 
3607     pthread_mutex_lock(&process->incoming.mutex);
3608 
3609     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
3610     if (nxt_slow_path(hdr == NULL)) {
3611         pthread_mutex_unlock(&process->incoming.mutex);
3612 
3613         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
3614                       "invalid mmap id %d,%"PRIu32,
3615                       recv_msg->stream, (int) process->pid,
3616                       tracking_msg->mmap_id);
3617 
3618         return 0;
3619     }
3620 
3621     c = tracking_msg->tracking_id;
3622     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3623 
3624     if (rc == 0) {
3625         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3626                        recv_msg->stream);
3627 
3628         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3629     }
3630 
3631     pthread_mutex_unlock(&process->incoming.mutex);
3632 
3633     return rc;
3634 }
3635 
3636 
/*
 * Translates a message whose payload is an array of nxt_port_mmap_msg_t
 * descriptors into a chain of buffers pointing into shared memory.
 *
 * Pass 1 obtains one nxt_unit_mmap_buf_t per descriptor and links them,
 * in order, onto recv_msg->incoming_buf.  Pass 2 (under the sender's
 * incoming mutex) resolves each descriptor's mmap id and chunk id to an
 * address and fills in the matching buffer.  For the first descriptor
 * recv_msg->start and recv_msg->size are redirected to the referenced
 * chunk data.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR when the message is too short,
 * the sending process cannot be resolved, a buffer cannot be obtained or
 * a descriptor names an unknown mmap id.  Buffers already linked onto
 * recv_msg->incoming_buf are not released here on failure.
 * NOTE(review): presumably the caller releases them - confirm.
 *
 * chunk_id and size taken from the descriptors are not range-checked in
 * this function.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* Pass 1: one buffer per descriptor, appended in message order. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    /* Rewind both cursors for the second pass. */
    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    pthread_mutex_lock(&process->incoming.mutex);

    /* Pass 2: resolve each descriptor and fill the matching buffer. */
    for (; mmap_msg < end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->stream, (int) process->pid,
                          mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /*
         * True only for the first descriptor: once recv_msg->start is
         * redirected into shared memory it no longer equals mmap_msg.
         */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;
        b->process = process;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}
3723 
3724 
3725 static void
3726 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
3727     nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
3728     void *start, uint32_t size)
3729 {
3730     int              freed_chunks;
3731     u_char           *p, *end;
3732     nxt_chunk_id_t   c;
3733     nxt_unit_impl_t  *lib;
3734 
3735     memset(start, 0xA5, size);
3736 
3737     p = start;
3738     end = p + size;
3739     c = nxt_port_mmap_chunk_id(hdr, p);
3740     freed_chunks = 0;
3741 
3742     while (p < end) {
3743         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3744 
3745         p += PORT_MMAP_CHUNK_SIZE;
3746         c++;
3747         freed_chunks++;
3748     }
3749 
3750     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3751 
3752     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
3753         nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
3754                              -freed_chunks);
3755 
3756         nxt_unit_debug(ctx, "process %d allocated_chunks %d",
3757                        process->pid,
3758                        (int) process->outgoing.allocated_chunks);
3759     }
3760 
3761     if (hdr->dst_pid == lib->pid
3762         && freed_chunks != 0
3763         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
3764     {
3765         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
3766     }
3767 }
3768 
3769 
3770 static int
3771 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
3772 {
3773     ssize_t             res;
3774     nxt_port_msg_t      msg;
3775     nxt_unit_impl_t     *lib;
3776     nxt_unit_port_id_t  port_id;
3777 
3778     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3779 
3780     nxt_unit_port_id_init(&port_id, pid, 0);
3781 
3782     msg.stream = 0;
3783     msg.pid = lib->pid;
3784     msg.reply_port = 0;
3785     msg.type = _NXT_PORT_MSG_SHM_ACK;
3786     msg.last = 0;
3787     msg.mmap = 0;
3788     msg.nf = 0;
3789     msg.mf = 0;
3790     msg.tracking = 0;
3791 
3792     res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
3793     if (nxt_slow_path(res != sizeof(msg))) {
3794         return NXT_UNIT_ERROR;
3795     }
3796 
3797     return NXT_UNIT_OK;
3798 }
3799 
3800 
3801 static nxt_int_t
3802 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3803 {
3804     nxt_process_t  *process;
3805 
3806     process = data;
3807 
3808     if (lhq->key.length == sizeof(pid_t)
3809         && *(pid_t *) lhq->key.start == process->pid)
3810     {
3811         return NXT_OK;
3812     }
3813 
3814     return NXT_DECLINED;
3815 }
3816 
3817 
/*
 * lvlhsh descriptor used for every query against lib->processes:
 * default shape settings, nxt_unit_lvlhsh_pid_test() to compare keys,
 * and the stock lvlhsh allocation and free routines.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
3824 
3825 
3826 static inline void
3827 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3828 {
3829     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3830     lhq->key.length = sizeof(*pid);
3831     lhq->key.start = (u_char *) pid;
3832     lhq->proto = &lvlhsh_processes_proto;
3833 }
3834 
3835 
3836 static nxt_unit_process_t *
3837 nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
3838 {
3839     nxt_unit_process_t  *process;
3840     nxt_lvlhsh_query_t  lhq;
3841 
3842     nxt_unit_process_lhq_pid(&lhq, &pid);
3843 
3844     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3845         process = lhq.value;
3846         nxt_unit_process_use(process);
3847 
3848         return process;
3849     }
3850 
3851     process = malloc(sizeof(nxt_unit_process_t));
3852     if (nxt_slow_path(process == NULL)) {
3853         nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
3854 
3855         return NULL;
3856     }
3857 
3858     process->pid = pid;
3859     process->use_count = 1;
3860     process->next_port_id = 0;
3861     process->lib = lib;
3862 
3863     nxt_queue_init(&process->ports);
3864 
3865     nxt_unit_mmaps_init(&process->incoming);
3866     nxt_unit_mmaps_init(&process->outgoing);
3867 
3868     lhq.replace = 0;
3869     lhq.value = process;
3870 
3871     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3872 
3873     case NXT_OK:
3874         break;
3875 
3876     default:
3877         nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
3878 
3879         pthread_mutex_destroy(&process->outgoing.mutex);
3880         pthread_mutex_destroy(&process->incoming.mutex);
3881         free(process);
3882         process = NULL;
3883         break;
3884     }
3885 
3886     nxt_unit_process_use(process);
3887 
3888     return process;
3889 }
3890 
3891 
3892 static nxt_unit_process_t *
3893 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
3894 {
3895     int                 rc;
3896     nxt_unit_process_t  *process;
3897     nxt_lvlhsh_query_t  lhq;
3898 
3899     nxt_unit_process_lhq_pid(&lhq, &pid);
3900 
3901     if (remove) {
3902         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3903 
3904     } else {
3905         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3906     }
3907 
3908     if (rc == NXT_OK) {
3909         process = lhq.value;
3910 
3911         if (!remove) {
3912             nxt_unit_process_use(process);
3913         }
3914 
3915         return process;
3916     }
3917 
3918     return NULL;
3919 }
3920 
3921 
3922 static nxt_unit_process_t *
3923 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3924 {
3925     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3926 }
3927 
3928 
3929 int
3930 nxt_unit_run(nxt_unit_ctx_t *ctx)
3931 {
3932     int                  rc;
3933     nxt_unit_impl_t      *lib;
3934     nxt_unit_ctx_impl_t  *ctx_impl;
3935 
3936     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3937 
3938     nxt_unit_ctx_use(ctx_impl);
3939 
3940     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3941     rc = NXT_UNIT_OK;
3942 
3943     while (nxt_fast_path(lib->online)) {
3944         rc = nxt_unit_run_once(ctx);
3945 
3946         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3947             break;
3948         }
3949     }
3950 
3951     nxt_unit_ctx_release(ctx_impl);
3952 
3953     return rc;
3954 }
3955 
3956 
/*
 * Processes a single message for ctx.
 *
 * A message parked on the context's pending read list (see
 * nxt_unit_wait_shm_ack()) is taken first; otherwise a read buffer is
 * obtained and filled from the context's read port.  The message is
 * then passed to nxt_unit_process_msg() and the buffer is released.
 *
 * Returns the result of nxt_unit_process_msg(), or NXT_UNIT_ERROR when
 * no read buffer could be obtained or the read produced no data
 * (size <= 0).
 */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_unit_ctx_use(ctx_impl);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->pending_read_head != NULL) {
        /* Pop the oldest pending message. */
        rbuf = ctx_impl->pending_read_head;
        ctx_impl->pending_read_head = rbuf->next;

        /* The list became empty: point the tail back at the head. */
        if (ctx_impl->pending_read_tail == &rbuf->next) {
            ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
        }

        pthread_mutex_unlock(&ctx_impl->mutex);

    } else {
        /*
         * ctx_impl->mutex is still locked at this point and is not
         * unlocked anywhere in this branch.
         * NOTE(review): presumably nxt_unit_read_buf_get_impl() expects
         * the mutex held and releases it before returning - confirm.
         */
        rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
        if (nxt_slow_path(rbuf == NULL)) {

            nxt_unit_ctx_release(ctx_impl);

            return NXT_UNIT_ERROR;
        }

        nxt_unit_read_buf(ctx, rbuf);
    }

    if (nxt_fast_path(rbuf->size > 0)) {
        rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
                                  rbuf->buf, rbuf->size,
                                  rbuf->oob, sizeof(rbuf->oob));

#if (NXT_DEBUG)
        /* Poison the consumed bytes in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif

    } else {
        rc = NXT_UNIT_ERROR;
    }

    nxt_unit_read_buf_release(ctx, rbuf);

    nxt_unit_ctx_release(ctx_impl);

    return rc;
}
4011 
4012 
4013 static void
4014 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4015 {
4016     nxt_unit_impl_t      *lib;
4017     nxt_unit_ctx_impl_t  *ctx_impl;
4018 
4019     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4020 
4021     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
4022 
4023     if (ctx_impl->read_port_fd != -1) {
4024         rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
4025                                         rbuf->buf, sizeof(rbuf->buf),
4026                                         rbuf->oob, sizeof(rbuf->oob));
4027 
4028     } else {
4029         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4030 
4031         rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
4032                                               rbuf->buf, sizeof(rbuf->buf),
4033                                               rbuf->oob, sizeof(rbuf->oob));
4034     }
4035 }
4036 
4037 
4038 void
4039 nxt_unit_done(nxt_unit_ctx_t *ctx)
4040 {
4041     nxt_unit_ctx_impl_t  *ctx_impl;
4042 
4043     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4044 
4045     nxt_unit_ctx_release(ctx_impl);
4046 }
4047 
4048 
4049 nxt_unit_ctx_t *
4050 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4051 {
4052     int                  rc, fd;
4053     nxt_unit_impl_t      *lib;
4054     nxt_unit_port_id_t   new_port_id;
4055     nxt_unit_ctx_impl_t  *new_ctx;
4056 
4057     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4058 
4059     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4060     if (nxt_slow_path(new_ctx == NULL)) {
4061         nxt_unit_warn(ctx, "failed to allocate context");
4062 
4063         return NULL;
4064     }
4065 
4066     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
4067     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4068         free(new_ctx);
4069 
4070         return NULL;
4071     }
4072 
4073     rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd);
4074     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4075         nxt_unit_remove_port(lib, &new_port_id);
4076 
4077         close(fd);
4078 
4079         free(new_ctx);
4080 
4081         return NULL;
4082     }
4083 
4084     close(fd);
4085 
4086     rc = nxt_unit_ctx_init(lib, new_ctx, data);
4087     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4088         nxt_unit_remove_port(lib, &new_port_id);
4089 
4090         free(new_ctx);
4091 
4092         return NULL;
4093     }
4094 
4095     new_ctx->read_port_id = new_port_id;
4096 
4097     return &new_ctx->ctx;
4098 }
4099 
4100 
/*
 * Tears down a context: finishes requests that are still active, frees
 * the cached buffers, request and websocket frame structures, destroys
 * the context mutex, unlinks the context from its queue and finally
 * drops the context's reference on the library.
 *
 * The memory of the library's embedded main context is not freed here;
 * only separately allocated contexts are passed to free().
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    /* Requests still in flight are completed with an error status. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /*
     * The two ctx_buf entries are members of ctx_impl, so they are only
     * unlinked; the loop below frees whatever remains on free_buf.
     */
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        free(mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(ws_impl);

    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    nxt_queue_remove(&ctx_impl->link);

    /* main_ctx is embedded in lib and therefore never passed to free(). */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }

    nxt_unit_lib_release(lib);
}
4153 
4154 
4155 /*
 * NXT_UNIX_SOCKET is the socket type handed to socketpair() when a port is
 * created.  As written, the leading "0 ||" does not change the condition, so
 * SOCK_SEQPACKET is selected whenever NXT_HAVE_AF_UNIX_SOCK_SEQPACKET is
 * defined to a non-zero value, and SOCK_DGRAM is used otherwise.
 *
 * NOTE(review): the earlier wording of this comment ("SOCK_SEQPACKET is
 * disabled to test SOCK_DGRAM on all platforms") does not match the
 * condition below; confirm the intent.
 */
4156 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
4157 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
4158 #else
4159 #define NXT_UNIX_SOCKET  SOCK_DGRAM
4160 #endif
4161 
4162 
4163 void
4164 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
4165 {
4166     nxt_unit_port_hash_id_t  port_hash_id;
4167 
4168     port_hash_id.pid = pid;
4169     port_hash_id.id = id;
4170 
4171     port_id->pid = pid;
4172     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
4173     port_id->id = id;
4174 }
4175 
4176 
4177 static int
4178 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
4179 {
4180     int                 rc, port_sockets[2];
4181     nxt_unit_impl_t     *lib;
4182     nxt_unit_port_t     new_port;
4183     nxt_unit_process_t  *process;
4184 
4185     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4186 
4187     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
4188     if (nxt_slow_path(rc != 0)) {
4189         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
4190                       strerror(errno), errno);
4191 
4192         return NXT_UNIT_ERROR;
4193     }
4194 
4195     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
4196                    port_sockets[0], port_sockets[1]);
4197 
4198     pthread_mutex_lock(&lib->mutex);
4199 
4200     process = nxt_unit_process_get(lib, lib->pid);
4201     if (nxt_slow_path(process == NULL)) {
4202         pthread_mutex_unlock(&lib->mutex);
4203 
4204         close(port_sockets[0]);
4205         close(port_sockets[1]);
4206 
4207         return NXT_UNIT_ERROR;
4208     }
4209 
4210     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
4211 
4212     new_port.in_fd = port_sockets[0];
4213     new_port.out_fd = -1;
4214     new_port.data = NULL;
4215 
4216     pthread_mutex_unlock(&lib->mutex);
4217 
4218     nxt_unit_process_release(process);
4219 
4220     rc = nxt_unit_add_port(ctx, &new_port);
4221     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4222         nxt_unit_warn(ctx, "create_port: add_port() failed");
4223 
4224         close(port_sockets[0]);
4225         close(port_sockets[1]);
4226 
4227         return rc;
4228     }
4229 
4230     *port_id = new_port.id;
4231     *fd = port_sockets[1];
4232 
4233     return rc;
4234 }
4235 
4236 
4237 static int
4238 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
4239     nxt_unit_port_id_t *new_port, int fd)
4240 {
4241     ssize_t          res;
4242     nxt_unit_impl_t  *lib;
4243 
4244     struct {
4245         nxt_port_msg_t            msg;
4246         nxt_port_msg_new_port_t   new_port;
4247     } m;
4248 
4249     union {
4250         struct cmsghdr  cm;
4251         char            space[CMSG_SPACE(sizeof(int))];
4252     } cmsg;
4253 
4254     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4255 
4256     m.msg.stream = 0;
4257     m.msg.pid = lib->pid;
4258     m.msg.reply_port = 0;
4259     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
4260     m.msg.last = 0;
4261     m.msg.mmap = 0;
4262     m.msg.nf = 0;
4263     m.msg.mf = 0;
4264     m.msg.tracking = 0;
4265 
4266     m.new_port.id = new_port->id;
4267     m.new_port.pid = new_port->pid;
4268     m.new_port.type = NXT_PROCESS_APP;
4269     m.new_port.max_size = 16 * 1024;
4270     m.new_port.max_share = 64 * 1024;
4271 
4272     memset(&cmsg, 0, sizeof(cmsg));
4273 
4274     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
4275     cmsg.cm.cmsg_level = SOL_SOCKET;
4276     cmsg.cm.cmsg_type = SCM_RIGHTS;
4277 
4278     /*
4279      * memcpy() is used instead of simple
4280      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
4281      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
4282      *   dereferencing type-punned pointer will break strict-aliasing rules
4283      *
4284      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
4285      * in the same simple assignment as in the code above.
4286      */
4287     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
4288 
4289     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
4290 
4291     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
4292 }
4293 
4294 
4295 static int
4296 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4297 {
4298     int                   rc;
4299     nxt_unit_impl_t       *lib;
4300     nxt_unit_process_t    *process;
4301     nxt_unit_port_impl_t  *new_port, *old_port;
4302 
4303     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4304 
4305     pthread_mutex_lock(&lib->mutex);
4306 
4307     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
4308 
4309     if (nxt_slow_path(old_port != NULL)) {
4310         nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
4311                        port->id.pid, port->id.id,
4312                        port->in_fd, port->out_fd);
4313 
4314         if (old_port->port.data == NULL) {
4315             old_port->port.data = port->data;
4316             port->data = NULL;
4317         }
4318 
4319         if (old_port->port.in_fd == -1) {
4320             old_port->port.in_fd = port->in_fd;
4321             port->in_fd = -1;
4322         }
4323 
4324         if (port->in_fd != -1) {
4325             close(port->in_fd);
4326             port->in_fd = -1;
4327         }
4328 
4329         if (old_port->port.out_fd == -1) {
4330             old_port->port.out_fd = port->out_fd;
4331             port->out_fd = -1;
4332         }
4333 
4334         if (port->out_fd != -1) {
4335             close(port->out_fd);
4336             port->out_fd = -1;
4337         }
4338 
4339         *port = old_port->port;
4340 
4341         pthread_mutex_unlock(&lib->mutex);
4342 
4343         if (lib->callbacks.add_port != NULL
4344             && (port->in_fd != -1 || port->out_fd != -1))
4345         {
4346             lib->callbacks.add_port(ctx, &old_port->port);
4347         }
4348 
4349         return NXT_UNIT_OK;
4350     }
4351 
4352     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
4353                    port->id.pid, port->id.id,
4354                    port->in_fd, port->out_fd);
4355 
4356     process = nxt_unit_process_get(lib, port->id.pid);
4357     if (nxt_slow_path(process == NULL)) {
4358         rc = NXT_UNIT_ERROR;
4359         goto unlock;
4360     }
4361 
4362     if (port->id.id >= process->next_port_id) {
4363         process->next_port_id = port->id.id + 1;
4364     }
4365 
4366     new_port = malloc(sizeof(nxt_unit_port_impl_t));
4367     if (nxt_slow_path(new_port == NULL)) {
4368         rc = NXT_UNIT_ERROR;
4369         goto unlock;
4370     }
4371 
4372     new_port->port = *port;
4373 
4374     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
4375     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4376         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
4377                        port->id.pid, port->id.id);
4378 
4379         goto unlock;
4380     }
4381 
4382     nxt_queue_insert_tail(&process->ports, &new_port->link);
4383 
4384     rc = NXT_UNIT_OK;
4385 
4386     new_port->process = process;
4387 
4388 unlock:
4389 
4390     pthread_mutex_unlock(&lib->mutex);
4391 
4392     if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
4393         nxt_unit_process_release(process);
4394     }
4395 
4396     if (lib->callbacks.add_port != NULL
4397         && rc == NXT_UNIT_OK
4398         && (port->in_fd != -1 || port->out_fd != -1))
4399     {
4400         lib->callbacks.add_port(ctx, &new_port->port);
4401     }
4402 
4403     return rc;
4404 }
4405 
4406 
4407 static int
4408 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
4409 {
4410     int                 res;
4411     nxt_unit_port_t     *port;
4412     nxt_unit_process_t  *process;
4413 
4414     port = NULL;
4415     process = NULL;
4416 
4417     pthread_mutex_lock(&lib->mutex);
4418 
4419     res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process);
4420 
4421     pthread_mutex_unlock(&lib->mutex);
4422 
4423     if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) {
4424         lib->callbacks.remove_port(&lib->unit, port);
4425     }
4426 
4427     if (nxt_fast_path(port != NULL)) {
4428         if (port->in_fd != -1) {
4429             close(port->in_fd);
4430         }
4431 
4432         if (port->out_fd != -1) {
4433             close(port->out_fd);
4434         }
4435     }
4436 
4437     if (nxt_slow_path(process != NULL)) {
4438         nxt_unit_process_release(process);
4439     }
4440 
4441     if (nxt_fast_path(port != NULL)) {
4442         free(port);
4443     }
4444 
4445     return res;
4446 }
4447 
4448 
4449 static int
4450 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id,
4451     nxt_unit_port_t **r_port, nxt_unit_process_t **process)
4452 {
4453     nxt_unit_port_impl_t  *port;
4454 
4455     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
4456     if (nxt_slow_path(port == NULL)) {
4457         nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
4458                        (int) port_id->pid, (int) port_id->id);
4459 
4460         return NXT_UNIT_ERROR;
4461     }
4462 
4463     nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
4464                    (int) port_id->pid, (int) port_id->id,
4465                    port->port.in_fd, port->port.out_fd, port->port.data);
4466 
4467     if (port->process != NULL) {
4468         nxt_queue_remove(&port->link);
4469     }
4470 
4471     if (process != NULL) {
4472         *process = port->process;
4473     }
4474 
4475     if (r_port != NULL) {
4476         *r_port = &port->port;
4477     }
4478 
4479     return NXT_UNIT_OK;
4480 }
4481 
4482 
4483 static void
4484 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
4485 {
4486     nxt_unit_process_t  *process;
4487 
4488     pthread_mutex_lock(&lib->mutex);
4489 
4490     process = nxt_unit_process_find(lib, pid, 1);
4491     if (nxt_slow_path(process == NULL)) {
4492         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
4493 
4494         pthread_mutex_unlock(&lib->mutex);
4495 
4496         return;
4497     }
4498 
4499     nxt_unit_remove_process(lib, process);
4500 
4501     if (lib->callbacks.remove_pid != NULL) {
4502         lib->callbacks.remove_pid(&lib->unit, pid);
4503     }
4504 }
4505 
4506 
4507 static void
4508 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
4509 {
4510     nxt_queue_t           ports;
4511     nxt_unit_port_impl_t  *port;
4512 
4513     nxt_queue_init(&ports);
4514 
4515     nxt_queue_add(&ports, &process->ports);
4516 
4517     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
4518 
4519         nxt_unit_process_release(process);
4520 
4521         /* To avoid unlink port. */
4522         port->process = NULL;
4523 
4524         nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL);
4525 
4526     } nxt_queue_loop;
4527 
4528     pthread_mutex_unlock(&lib->mutex);
4529 
4530     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
4531 
4532         nxt_queue_remove(&port->link);
4533 
4534         if (lib->callbacks.remove_port != NULL) {
4535             lib->callbacks.remove_port(&lib->unit, &port->port);
4536         }
4537 
4538         if (port->port.in_fd != -1) {
4539             close(port->port.in_fd);
4540         }
4541 
4542         if (port->port.out_fd != -1) {
4543             close(port->port.out_fd);
4544         }
4545 
4546         free(port);
4547 
4548     } nxt_queue_loop;
4549 
4550     nxt_unit_process_release(process);
4551 }
4552 
4553 
4554 static void
4555 nxt_unit_quit(nxt_unit_ctx_t *ctx)
4556 {
4557     nxt_unit_impl_t  *lib;
4558 
4559     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4560 
4561     lib->online = 0;
4562 
4563     if (lib->callbacks.quit != NULL) {
4564         lib->callbacks.quit(ctx);
4565     }
4566 }
4567 
4568 
4569 static ssize_t
4570 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4571     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4572 {
4573     int                   fd;
4574     nxt_unit_impl_t       *lib;
4575     nxt_unit_port_impl_t  *port;
4576 
4577     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4578 
4579     pthread_mutex_lock(&lib->mutex);
4580 
4581     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
4582 
4583     if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) {
4584         fd = port->port.out_fd;
4585 
4586         pthread_mutex_unlock(&lib->mutex);
4587 
4588     } else {
4589         pthread_mutex_unlock(&lib->mutex);
4590 
4591         nxt_unit_alert(ctx, "port_send: port %d,%d not found",
4592                        (int) port_id->pid, (int) port_id->id);
4593 
4594         return -NXT_UNIT_ERROR;
4595     }
4596 
4597     nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
4598                    (int) port_id->pid, (int) port_id->id, fd);
4599 
4600     if (lib->callbacks.port_send == NULL) {
4601         return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size);
4602 
4603     } else {
4604         return lib->callbacks.port_send(ctx, port_id, buf, buf_size,
4605                                         oob, oob_size);
4606     }
4607 }
4608 
4609 
4610 static ssize_t
4611 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
4612     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4613 {
4614     ssize_t        res;
4615     struct iovec   iov[1];
4616     struct msghdr  msg;
4617 
4618     iov[0].iov_base = (void *) buf;
4619     iov[0].iov_len = buf_size;
4620 
4621     msg.msg_name = NULL;
4622     msg.msg_namelen = 0;
4623     msg.msg_iov = iov;
4624     msg.msg_iovlen = 1;
4625     msg.msg_flags = 0;
4626     msg.msg_control = (void *) oob;
4627     msg.msg_controllen = oob_size;
4628 
4629 retry:
4630 
4631     res = sendmsg(fd, &msg, 0);
4632 
4633     if (nxt_slow_path(res == -1)) {
4634         if (errno == EINTR) {
4635             goto retry;
4636         }
4637 
4638         /*
4639          * FIXME: This should be "alert" after router graceful shutdown
4640          * implementation.
4641          */
4642         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
4643                       fd, (int) buf_size, strerror(errno), errno);
4644 
4645     } else {
4646         nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
4647                        (int) res);
4648     }
4649 
4650     return res;
4651 }
4652 
4653 
4654 static ssize_t
4655 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4656     void *buf, size_t buf_size, void *oob, size_t oob_size)
4657 {
4658     int                   fd;
4659     nxt_unit_impl_t       *lib;
4660     nxt_unit_ctx_impl_t   *ctx_impl;
4661     nxt_unit_port_impl_t  *port;
4662 
4663     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4664 
4665     pthread_mutex_lock(&lib->mutex);
4666 
4667     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
4668 
4669     if (nxt_fast_path(port != NULL)) {
4670         fd = port->port.in_fd;
4671 
4672     } else {
4673         nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
4674                        (int) port_id->pid, (int) port_id->id);
4675         fd = -1;
4676     }
4677 
4678     pthread_mutex_unlock(&lib->mutex);
4679 
4680     if (nxt_slow_path(fd == -1)) {
4681         return -1;
4682     }
4683 
4684     nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
4685                    (int) port_id->pid, (int) port_id->id, fd);
4686 
4687     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4688 
4689     if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
4690         ctx_impl->read_port_fd = fd;
4691     }
4692 
4693     return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
4694 }
4695 
4696 
4697 ssize_t
4698 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
4699     void *oob, size_t oob_size)
4700 {
4701     ssize_t        res;
4702     struct iovec   iov[1];
4703     struct msghdr  msg;
4704 
4705     iov[0].iov_base = buf;
4706     iov[0].iov_len = buf_size;
4707 
4708     msg.msg_name = NULL;
4709     msg.msg_namelen = 0;
4710     msg.msg_iov = iov;
4711     msg.msg_iovlen = 1;
4712     msg.msg_flags = 0;
4713     msg.msg_control = oob;
4714     msg.msg_controllen = oob_size;
4715 
4716 retry:
4717 
4718     res = recvmsg(fd, &msg, 0);
4719 
4720     if (nxt_slow_path(res == -1)) {
4721         if (errno == EINTR) {
4722             goto retry;
4723         }
4724 
4725         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
4726                        fd, strerror(errno), errno);
4727 
4728     } else {
4729         nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
4730     }
4731 
4732     return res;
4733 }
4734 
4735 
4736 static nxt_int_t
4737 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4738 {
4739     nxt_unit_port_t          *port;
4740     nxt_unit_port_hash_id_t  *port_id;
4741 
4742     port = data;
4743     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
4744 
4745     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
4746         && port_id->pid == port->id.pid
4747         && port_id->id == port->id.id)
4748     {
4749         return NXT_OK;
4750     }
4751 
4752     return NXT_DECLINED;
4753 }
4754 
4755 
/*
 * Level hash prototype used for the port table (installed into queries by
 * nxt_unit_port_hash_lhq()).  Entries are compared with
 * nxt_unit_port_hash_test(); nxt_lvlhsh_alloc()/nxt_lvlhsh_free() are the
 * memory routines.
 *
 * NOTE(review): the initializer is positional; the field order is assumed to
 * be shape, test, alloc, free -- confirm against nxt_lvlhsh_proto_t.
 */
4756 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
4757     NXT_LVLHSH_DEFAULT,
4758     nxt_unit_port_hash_test,
4759     nxt_lvlhsh_alloc,
4760     nxt_lvlhsh_free,
4761 };
4762 
4763 
4764 static inline void
4765 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
4766     nxt_unit_port_hash_id_t *port_hash_id,
4767     nxt_unit_port_id_t *port_id)
4768 {
4769     port_hash_id->pid = port_id->pid;
4770     port_hash_id->id = port_id->id;
4771 
4772     if (nxt_fast_path(port_id->hash != 0)) {
4773         lhq->key_hash = port_id->hash;
4774 
4775     } else {
4776         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
4777 
4778         port_id->hash = lhq->key_hash;
4779 
4780         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
4781                        (int) port_id->pid, (int) port_id->id,
4782                        (int) port_id->hash);
4783     }
4784 
4785     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
4786     lhq->key.start = (u_char *) port_hash_id;
4787     lhq->proto = &lvlhsh_ports_proto;
4788     lhq->pool = NULL;
4789 }
4790 
4791 
4792 static int
4793 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
4794 {
4795     nxt_int_t                res;
4796     nxt_lvlhsh_query_t       lhq;
4797     nxt_unit_port_hash_id_t  port_hash_id;
4798 
4799     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
4800     lhq.replace = 0;
4801     lhq.value = port;
4802 
4803     res = nxt_lvlhsh_insert(port_hash, &lhq);
4804 
4805     switch (res) {
4806 
4807     case NXT_OK:
4808         return NXT_UNIT_OK;
4809 
4810     default:
4811         return NXT_UNIT_ERROR;
4812     }
4813 }
4814 
4815 
4816 static nxt_unit_port_impl_t *
4817 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
4818     int remove)
4819 {
4820     nxt_int_t                res;
4821     nxt_lvlhsh_query_t       lhq;
4822     nxt_unit_port_hash_id_t  port_hash_id;
4823 
4824     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
4825 
4826     if (remove) {
4827         res = nxt_lvlhsh_delete(port_hash, &lhq);
4828 
4829     } else {
4830         res = nxt_lvlhsh_find(port_hash, &lhq);
4831     }
4832 
4833     switch (res) {
4834 
4835     case NXT_OK:
4836         return lhq.value;
4837 
4838     default:
4839         return NULL;
4840     }
4841 }
4842 
4843 
4844 static nxt_int_t
4845 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4846 {
4847     return NXT_OK;
4848 }
4849 
4850 
/*
 * Level hash prototype used for the request table (installed into queries
 * by nxt_unit_request_hash_add() and nxt_unit_request_hash_find()).  Entries
 * are compared with nxt_unit_request_hash_test();
 * nxt_lvlhsh_alloc()/nxt_lvlhsh_free() are the memory routines.
 *
 * NOTE(review): the initializer is positional; the field order is assumed to
 * be shape, test, alloc, free -- confirm against nxt_lvlhsh_proto_t.
 */
4851 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
4852     NXT_LVLHSH_DEFAULT,
4853     nxt_unit_request_hash_test,
4854     nxt_lvlhsh_alloc,
4855     nxt_lvlhsh_free,
4856 };
4857 
4858 
4859 static int
4860 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4861     nxt_unit_request_info_impl_t *req_impl)
4862 {
4863     uint32_t            *stream;
4864     nxt_int_t           res;
4865     nxt_lvlhsh_query_t  lhq;
4866 
4867     stream = &req_impl->stream;
4868 
4869     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4870     lhq.key.length = sizeof(*stream);
4871     lhq.key.start = (u_char *) stream;
4872     lhq.proto = &lvlhsh_requests_proto;
4873     lhq.pool = NULL;
4874     lhq.replace = 0;
4875     lhq.value = req_impl;
4876 
4877     res = nxt_lvlhsh_insert(request_hash, &lhq);
4878 
4879     switch (res) {
4880 
4881     case NXT_OK:
4882         return NXT_UNIT_OK;
4883 
4884     default:
4885         return NXT_UNIT_ERROR;
4886     }
4887 }
4888 
4889 
4890 static nxt_unit_request_info_impl_t *
4891 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4892     int remove)
4893 {
4894     nxt_int_t           res;
4895     nxt_lvlhsh_query_t  lhq;
4896 
4897     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4898     lhq.key.length = sizeof(stream);
4899     lhq.key.start = (u_char *) &stream;
4900     lhq.proto = &lvlhsh_requests_proto;
4901     lhq.pool = NULL;
4902 
4903     if (remove) {
4904         res = nxt_lvlhsh_delete(request_hash, &lhq);
4905 
4906     } else {
4907         res = nxt_lvlhsh_find(request_hash, &lhq);
4908     }
4909 
4910     switch (res) {
4911 
4912     case NXT_OK:
4913         return lhq.value;
4914 
4915     default:
4916         return NULL;
4917     }
4918 }
4919 
4920 
4921 void
4922 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4923 {
4924     int              log_fd, n;
4925     char             msg[NXT_MAX_ERROR_STR], *p, *end;
4926     pid_t            pid;
4927     va_list          ap;
4928     nxt_unit_impl_t  *lib;
4929 
4930     if (nxt_fast_path(ctx != NULL)) {
4931         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4932 
4933         pid = lib->pid;
4934         log_fd = lib->log_fd;
4935 
4936     } else {
4937         pid = getpid();
4938         log_fd = STDERR_FILENO;
4939     }
4940 
4941     p = msg;
4942     end = p + sizeof(msg) - 1;
4943 
4944     p = nxt_unit_snprint_prefix(p, end, pid, level);
4945 
4946     va_start(ap, fmt);
4947     p += vsnprintf(p, end - p, fmt, ap);
4948     va_end(ap);
4949 
4950     if (nxt_slow_path(p > end)) {
4951         memcpy(end - 5, "[...]", 5);
4952         p = end;
4953     }
4954 
4955     *p++ = '\n';
4956 
4957     n = write(log_fd, msg, p - msg);
4958     if (nxt_slow_path(n < 0)) {
4959         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4960     }
4961 }
4962 
4963 
4964 void
4965 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
4966 {
4967     int                           log_fd, n;
4968     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
4969     pid_t                         pid;
4970     va_list                       ap;
4971     nxt_unit_impl_t               *lib;
4972     nxt_unit_request_info_impl_t  *req_impl;
4973 
4974     if (nxt_fast_path(req != NULL)) {
4975         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
4976 
4977         pid = lib->pid;
4978         log_fd = lib->log_fd;
4979 
4980     } else {
4981         pid = getpid();
4982         log_fd = STDERR_FILENO;
4983     }
4984 
4985     p = msg;
4986     end = p + sizeof(msg) - 1;
4987 
4988     p = nxt_unit_snprint_prefix(p, end, pid, level);
4989 
4990     if (nxt_fast_path(req != NULL)) {
4991         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4992 
4993         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4994     }
4995 
4996     va_start(ap, fmt);
4997     p += vsnprintf(p, end - p, fmt, ap);
4998     va_end(ap);
4999 
5000     if (nxt_slow_path(p > end)) {
5001         memcpy(end - 5, "[...]", 5);
5002         p = end;
5003     }
5004 
5005     *p++ = '\n';
5006 
5007     n = write(log_fd, msg, p - msg);
5008     if (nxt_slow_path(n < 0)) {
5009         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
5010     }
5011 }
5012 
5013 
/*
 * Textual names of the log levels.  nxt_unit_snprint_prefix() indexes this
 * table directly with its level argument, so the order of the entries must
 * follow the numeric values of the level constants.
 */
5014 static const char * nxt_unit_log_levels[] = {
5015     "alert",
5016     "error",
5017     "warn",
5018     "notice",
5019     "info",
5020     "debug",
5021 };
5022 
5023 
5024 static char *
5025 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
5026 {
5027     struct tm        tm;
5028     struct timespec  ts;
5029 
5030     (void) clock_gettime(CLOCK_REALTIME, &ts);
5031 
5032 #if (NXT_HAVE_LOCALTIME_R)
5033     (void) localtime_r(&ts.tv_sec, &tm);
5034 #else
5035     tm = *localtime(&ts.tv_sec);
5036 #endif
5037 
5038 #if (NXT_DEBUG)
5039     p += snprintf(p, end - p,
5040                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
5041                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
5042                   tm.tm_hour, tm.tm_min, tm.tm_sec,
5043                   (int) ts.tv_nsec / 1000000);
5044 #else
5045     p += snprintf(p, end - p,
5046                   "%4d/%02d/%02d %02d:%02d:%02d ",
5047                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
5048                   tm.tm_hour, tm.tm_min, tm.tm_sec);
5049 #endif
5050 
5051     p += snprintf(p, end - p,
5052                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
5053                   (int) pid,
5054                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
5055 
5056     return p;
5057 }
5058 
5059 
5060 /* The functions required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */
5061 
5062 void *
5063 nxt_memalign(size_t alignment, size_t size)
5064 {
5065     void        *p;
5066     nxt_err_t   err;
5067 
5068     err = posix_memalign(&p, alignment, size);
5069 
5070     if (nxt_fast_path(err == 0)) {
5071         return p;
5072     }
5073 
5074     return NULL;
5075 }
5076 
5077 #if (NXT_DEBUG)
5078 
/*
 * Out-of-line nxt_free(): simply forwards to free().  This definition is
 * compiled only when NXT_DEBUG is set (see the enclosing #if).
 */
5079 void
5080 nxt_free(void *p)
5081 {
5082     free(p);
5083 }
5084 
5085 #endif
5086