xref: /unit/src/nxt_unit.c (revision 1437:1990c369a0b9)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
/*
 * NXT_UNIT_LOCAL_BUF_SIZE is NXT_UNIT_MAX_PLAIN_SIZE payload bytes plus
 * room for one nxt_port_msg_t header.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE  \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
25 
/* Short names for the implementation structures defined later in this file. */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
37 
/*
 * Forward declarations of the helpers private to this file
 * (declared either static or nxt_inline).
 */

/* Library and context setup. */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
/* nxt_unit_mmap_buf_t list manipulation. */
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream);
/* Per-message-type handlers called from nxt_unit_process_msg(). */
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
/* Request info, websocket frame and buffer object management. */
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
    nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
    nxt_unit_request_info_t *req, size_t size);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
/* Shared memory segment handling. */
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int *n, int min_n);
static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process,
    nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);

/* Process and port bookkeeping. */
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process);

/* Defaults installed for the port_send/port_recv callbacks when unset. */
static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

/* Hash table helpers for ports and requests. */
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
150 
151 
/*
 * Buffer descriptor wrapping the public nxt_unit_buf_t.
 *
 * Descriptors are chained through next/prev; prev holds the address of the
 * pointer that references this node (a list head or the previous node's
 * next field), as maintained by nxt_unit_mmap_buf_insert() and
 * nxt_unit_mmap_buf_unlink().
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;       /* public view handed to callers */

    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    nxt_unit_request_info_t  *req;      /* request the buffer is attached to */
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_process_t       *process;
    char                     *free_ptr;
    char                     *plain_ptr;
};
166 
167 
/*
 * Decoded form of one received port message, filled in by
 * nxt_unit_process_msg() and passed to the per-type handlers.
 */
struct nxt_unit_recv_msg_s {
    /* Copied from the nxt_port_msg_t header. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload; initially the bytes that follow the header. */
    void                     *start;
    uint32_t                 size;

    /* Descriptor received as SCM_RIGHTS ancillary data, or -1. */
    int                      fd;
    /* Released by nxt_unit_process_msg() if still set when it returns. */
    nxt_unit_process_t       *process;

    /* Freed by nxt_unit_process_msg() unless a handler takes the list. */
    nxt_unit_mmap_buf_t      *incoming_buf;
};
184 
185 
/*
 * Request processing state kept in nxt_unit_request_info_impl_t.state.
 * A freshly received request starts in NXT_UNIT_RS_START.
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
193 
194 
/*
 * Private wrapper around the public nxt_unit_request_info_t.
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;       /* public part handed to handlers */

    uint32_t                 stream;    /* stream id of the request message */

    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;

    nxt_queue_link_t         link;      /* linkage in a context's queues */

    /*
     * Flexible array member; presumably sized by the library's
     * request_data_size -- TODO confirm against the allocation sites.
     */
    char                     extra_data[];
};
212 
213 
/*
 * Private wrapper around the public nxt_unit_websocket_frame_t.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;     /* public part */

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;   /* presumably for the free_ws queue */

    nxt_unit_ctx_impl_t         *ctx_impl;
};
223 
224 
/*
 * Storage for one message read from a port: data bytes plus ancillary
 * (out-of-band) bytes.  Chained through next on a context's
 * pending/free read buffer lists.
 */
struct nxt_unit_read_buf_s {
    nxt_unit_read_buf_t           *next;
    ssize_t                       size;
    char                          buf[16384];
    char                          oob[256];
};
231 
232 
/*
 * Private state behind the public nxt_unit_ctx_t; initialized by
 * nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    pthread_mutex_t               mutex;

    nxt_unit_port_id_t            read_port_id;
    int                           read_port_fd;     /* -1 after init */

    nxt_queue_link_t              link;     /* linkage in lib->contexts */

    /* List of free buffer descriptors; seeded with ctx_buf[]. */
    nxt_unit_mmap_buf_t           *free_buf;

    /*  of nxt_unit_request_info_impl_t; seeded with the embedded req */
    nxt_queue_t                   free_req;

    /*  of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /*  of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* pending_read_tail points at the pointer to append the next entry to. */
    nxt_unit_read_buf_t           *pending_read_head;
    nxt_unit_read_buf_t           **pending_read_tail;
    nxt_unit_read_buf_t           *free_read_buf;   /* seeded with ctx_read_buf */

    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    /*
     * Embedded request info.  It ends in a flexible array member, so it
     * has to remain the last field of this structure.
     */
    nxt_unit_request_info_impl_t  req;
};
266 
267 
/*
 * Library instance behind the public nxt_unit_t.  Allocated by
 * nxt_unit_create() with request_data_size extra bytes after the structure.
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    nxt_unit_callbacks_t     callbacks;     /* NULL entries get defaults */

    uint32_t                 request_data_size;
    uint32_t                 shm_mmap_limit;    /* at least 1 after init */

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    pid_t                    pid;           /* pid of the read port id */
    int                      log_fd;        /* STDERR_FILENO by default */
    int                      online;

    /* Main context; kept last because it ends in a flexible array member. */
    nxt_unit_ctx_impl_t      main_ctx;
};
290 
291 
/*
 * Private wrapper around the public nxt_unit_port_t.
 */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    nxt_queue_link_t         link;      /* presumably in process->ports */
    nxt_unit_process_t       *process;
};
298 
299 
/* One element of nxt_unit_mmaps_t.elts: a pointer to a segment header. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};
303 
304 
/*
 * Array of nxt_unit_mmap_t entries with its own mutex.
 * NOTE(review): size/cap look like used/allocated element counts -- confirm
 * against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_atomic_t             allocated_chunks;
    nxt_unit_mmap_t          *elts;
};
312 
313 
/*
 * Per-process record: its ports and the shared memory segments exchanged
 * with it in each direction.
 */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;

    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    /* Reference counter; adjusted through nxt_unit_process_use(). */
    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};
328 
329 
/*
 * Port identifier in the form used by the port hash helpers.
 * Explicitly using 32-bit types to avoid possible alignment padding.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
335 
336 
337 nxt_unit_ctx_t *
338 nxt_unit_init(nxt_unit_init_t *init)
339 {
340     int              rc;
341     uint32_t         ready_stream, shm_limit;
342     nxt_unit_ctx_t   *ctx;
343     nxt_unit_impl_t  *lib;
344     nxt_unit_port_t  ready_port, read_port;
345 
346     lib = nxt_unit_create(init);
347     if (nxt_slow_path(lib == NULL)) {
348         return NULL;
349     }
350 
351     if (init->ready_port.id.pid != 0
352         && init->ready_stream != 0
353         && init->read_port.id.pid != 0)
354     {
355         ready_port = init->ready_port;
356         ready_stream = init->ready_stream;
357         read_port = init->read_port;
358         lib->log_fd = init->log_fd;
359 
360         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
361                               ready_port.id.id);
362         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
363                               read_port.id.id);
364 
365     } else {
366         rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
367                                &ready_stream, &shm_limit);
368         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
369             goto fail;
370         }
371 
372         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
373                                 / PORT_MMAP_DATA_SIZE;
374     }
375 
376     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
377         lib->shm_mmap_limit = 1;
378     }
379 
380     lib->pid = read_port.id.pid;
381     ctx = &lib->main_ctx.ctx;
382 
383     rc = lib->callbacks.add_port(ctx, &ready_port);
384     if (rc != NXT_UNIT_OK) {
385         nxt_unit_alert(NULL, "failed to add ready_port");
386 
387         goto fail;
388     }
389 
390     rc = lib->callbacks.add_port(ctx, &read_port);
391     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
392         nxt_unit_alert(NULL, "failed to add read_port");
393 
394         goto fail;
395     }
396 
397     lib->main_ctx.read_port_id = read_port.id;
398     lib->ready_port_id = ready_port.id;
399 
400     rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
401     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
402         nxt_unit_alert(NULL, "failed to send READY message");
403 
404         goto fail;
405     }
406 
407     return ctx;
408 
409 fail:
410 
411     free(lib);
412 
413     return NULL;
414 }
415 
416 
417 static nxt_unit_impl_t *
418 nxt_unit_create(nxt_unit_init_t *init)
419 {
420     int                   rc;
421     nxt_unit_impl_t       *lib;
422     nxt_unit_callbacks_t  *cb;
423 
424     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
425     if (nxt_slow_path(lib == NULL)) {
426         nxt_unit_alert(NULL, "failed to allocate unit struct");
427 
428         return NULL;
429     }
430 
431     rc = pthread_mutex_init(&lib->mutex, NULL);
432     if (nxt_slow_path(rc != 0)) {
433         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
434 
435         goto fail;
436     }
437 
438     lib->unit.data = init->data;
439     lib->callbacks = init->callbacks;
440 
441     lib->request_data_size = init->request_data_size;
442     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
443                             / PORT_MMAP_DATA_SIZE;
444 
445     lib->processes.slot = NULL;
446     lib->ports.slot = NULL;
447 
448     lib->log_fd = STDERR_FILENO;
449     lib->online = 1;
450 
451     nxt_queue_init(&lib->contexts);
452 
453     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
454     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
455         goto fail;
456     }
457 
458     cb = &lib->callbacks;
459 
460     if (cb->request_handler == NULL) {
461         nxt_unit_alert(NULL, "request_handler is NULL");
462 
463         goto fail;
464     }
465 
466     if (cb->add_port == NULL) {
467         cb->add_port = nxt_unit_add_port;
468     }
469 
470     if (cb->remove_port == NULL) {
471         cb->remove_port = nxt_unit_remove_port;
472     }
473 
474     if (cb->remove_pid == NULL) {
475         cb->remove_pid = nxt_unit_remove_pid;
476     }
477 
478     if (cb->quit == NULL) {
479         cb->quit = nxt_unit_quit;
480     }
481 
482     if (cb->port_send == NULL) {
483         cb->port_send = nxt_unit_port_send_default;
484     }
485 
486     if (cb->port_recv == NULL) {
487         cb->port_recv = nxt_unit_port_recv_default;
488     }
489 
490     return lib;
491 
492 fail:
493 
494     free(lib);
495 
496     return NULL;
497 }
498 
499 
500 static int
501 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
502     void *data)
503 {
504     int  rc;
505 
506     ctx_impl->ctx.data = data;
507     ctx_impl->ctx.unit = &lib->unit;
508 
509     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
510 
511     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
512     if (nxt_slow_path(rc != 0)) {
513         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
514 
515         return NXT_UNIT_ERROR;
516     }
517 
518     nxt_queue_init(&ctx_impl->free_req);
519     nxt_queue_init(&ctx_impl->free_ws);
520     nxt_queue_init(&ctx_impl->active_req);
521 
522     ctx_impl->free_buf = NULL;
523     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
524     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
525 
526     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
527 
528     ctx_impl->pending_read_head = NULL;
529     ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
530     ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
531     ctx_impl->ctx_read_buf.next = NULL;
532 
533     ctx_impl->req.req.ctx = &ctx_impl->ctx;
534     ctx_impl->req.req.unit = &lib->unit;
535 
536     ctx_impl->read_port_fd = -1;
537     ctx_impl->requests.slot = 0;
538 
539     return NXT_UNIT_OK;
540 }
541 
542 
543 nxt_inline void
544 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
545     nxt_unit_mmap_buf_t *mmap_buf)
546 {
547     mmap_buf->next = *head;
548 
549     if (mmap_buf->next != NULL) {
550         mmap_buf->next->prev = &mmap_buf->next;
551     }
552 
553     *head = mmap_buf;
554     mmap_buf->prev = head;
555 }
556 
557 
558 nxt_inline void
559 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
560     nxt_unit_mmap_buf_t *mmap_buf)
561 {
562     while (*prev != NULL) {
563         prev = &(*prev)->next;
564     }
565 
566     nxt_unit_mmap_buf_insert(prev, mmap_buf);
567 }
568 
569 
570 nxt_inline void
571 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
572 {
573     nxt_unit_mmap_buf_t  **prev;
574 
575     prev = mmap_buf->prev;
576 
577     if (mmap_buf->next != NULL) {
578         mmap_buf->next->prev = prev;
579     }
580 
581     if (prev != NULL) {
582         *prev = mmap_buf->next;
583     }
584 }
585 
586 
587 static int
588 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
589     int *log_fd, uint32_t *stream, uint32_t *shm_limit)
590 {
591     int       rc;
592     int       ready_fd, read_fd;
593     char      *unit_init, *version_end;
594     long      version_length;
595     int64_t   ready_pid, read_pid;
596     uint32_t  ready_stream, ready_id, read_id;
597 
598     unit_init = getenv(NXT_UNIT_INIT_ENV);
599     if (nxt_slow_path(unit_init == NULL)) {
600         nxt_unit_alert(NULL, "%s is not in the current environment",
601                        NXT_UNIT_INIT_ENV);
602 
603         return NXT_UNIT_ERROR;
604     }
605 
606     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
607 
608     version_length = nxt_length(NXT_VERSION);
609 
610     version_end = strchr(unit_init, ';');
611     if (version_end == NULL
612         || version_end - unit_init != version_length
613         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
614     {
615         nxt_unit_alert(NULL, "version check error");
616 
617         return NXT_UNIT_ERROR;
618     }
619 
620     rc = sscanf(version_end + 1,
621                 "%"PRIu32";"
622                 "%"PRId64",%"PRIu32",%d;"
623                 "%"PRId64",%"PRIu32",%d;"
624                 "%d,%"PRIu32,
625                 &ready_stream,
626                 &ready_pid, &ready_id, &ready_fd,
627                 &read_pid, &read_id, &read_fd,
628                 log_fd, shm_limit);
629 
630     if (nxt_slow_path(rc != 9)) {
631         nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
632 
633         return NXT_UNIT_ERROR;
634     }
635 
636     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
637 
638     ready_port->in_fd = -1;
639     ready_port->out_fd = ready_fd;
640     ready_port->data = NULL;
641 
642     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
643 
644     read_port->in_fd = read_fd;
645     read_port->out_fd = -1;
646     read_port->data = NULL;
647 
648     *stream = ready_stream;
649 
650     return NXT_UNIT_OK;
651 }
652 
653 
654 static int
655 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
656     uint32_t stream)
657 {
658     ssize_t          res;
659     nxt_port_msg_t   msg;
660     nxt_unit_impl_t  *lib;
661 
662     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
663 
664     msg.stream = stream;
665     msg.pid = lib->pid;
666     msg.reply_port = 0;
667     msg.type = _NXT_PORT_MSG_PROCESS_READY;
668     msg.last = 1;
669     msg.mmap = 0;
670     msg.nf = 0;
671     msg.mf = 0;
672     msg.tracking = 0;
673 
674     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
675     if (res != sizeof(msg)) {
676         return NXT_UNIT_ERROR;
677     }
678 
679     return NXT_UNIT_OK;
680 }
681 
682 
683 int
684 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
685     void *buf, size_t buf_size, void *oob, size_t oob_size)
686 {
687     int                   rc;
688     pid_t                 pid;
689     struct cmsghdr        *cm;
690     nxt_port_msg_t        *port_msg;
691     nxt_unit_impl_t       *lib;
692     nxt_unit_recv_msg_t   recv_msg;
693     nxt_unit_callbacks_t  *cb;
694 
695     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
696 
697     rc = NXT_UNIT_ERROR;
698     recv_msg.fd = -1;
699     recv_msg.process = NULL;
700     port_msg = buf;
701     cm = oob;
702 
703     if (oob_size >= CMSG_SPACE(sizeof(int))
704         && cm->cmsg_len == CMSG_LEN(sizeof(int))
705         && cm->cmsg_level == SOL_SOCKET
706         && cm->cmsg_type == SCM_RIGHTS)
707     {
708         memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
709     }
710 
711     recv_msg.incoming_buf = NULL;
712 
713     if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
714         nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
715         goto fail;
716     }
717 
718     recv_msg.stream = port_msg->stream;
719     recv_msg.pid = port_msg->pid;
720     recv_msg.reply_port = port_msg->reply_port;
721     recv_msg.last = port_msg->last;
722     recv_msg.mmap = port_msg->mmap;
723 
724     recv_msg.start = port_msg + 1;
725     recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
726 
727     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
728         nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
729                       port_msg->stream, (int) port_msg->type);
730         goto fail;
731     }
732 
733     if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
734         rc = NXT_UNIT_OK;
735 
736         goto fail;
737     }
738 
739     /* Fragmentation is unsupported. */
740     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
741         nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
742                       port_msg->stream, (int) port_msg->type);
743         goto fail;
744     }
745 
746     if (port_msg->mmap) {
747         if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
748             goto fail;
749         }
750     }
751 
752     cb = &lib->callbacks;
753 
754     switch (port_msg->type) {
755 
756     case _NXT_PORT_MSG_QUIT:
757         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
758 
759         cb->quit(ctx);
760         rc = NXT_UNIT_OK;
761         break;
762 
763     case _NXT_PORT_MSG_NEW_PORT:
764         rc = nxt_unit_process_new_port(ctx, &recv_msg);
765         break;
766 
767     case _NXT_PORT_MSG_CHANGE_FILE:
768         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
769                        port_msg->stream, recv_msg.fd);
770 
771         if (dup2(recv_msg.fd, lib->log_fd) == -1) {
772             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
773                            port_msg->stream, recv_msg.fd, lib->log_fd,
774                            strerror(errno), errno);
775 
776             goto fail;
777         }
778 
779         rc = NXT_UNIT_OK;
780         break;
781 
782     case _NXT_PORT_MSG_MMAP:
783         if (nxt_slow_path(recv_msg.fd < 0)) {
784             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
785                            port_msg->stream, recv_msg.fd);
786 
787             goto fail;
788         }
789 
790         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
791         break;
792 
793     case _NXT_PORT_MSG_REQ_HEADERS:
794         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
795         break;
796 
797     case _NXT_PORT_MSG_WEBSOCKET:
798         rc = nxt_unit_process_websocket(ctx, &recv_msg);
799         break;
800 
801     case _NXT_PORT_MSG_REMOVE_PID:
802         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
803             nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
804                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
805                           (int) sizeof(pid));
806 
807             goto fail;
808         }
809 
810         memcpy(&pid, recv_msg.start, sizeof(pid));
811 
812         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
813                        port_msg->stream, (int) pid);
814 
815         cb->remove_pid(ctx, pid);
816 
817         rc = NXT_UNIT_OK;
818         break;
819 
820     case _NXT_PORT_MSG_SHM_ACK:
821         rc = nxt_unit_process_shm_ack(ctx);
822         break;
823 
824     default:
825         nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
826                        port_msg->stream, (int) port_msg->type);
827 
828         goto fail;
829     }
830 
831 fail:
832 
833     if (recv_msg.fd != -1) {
834         close(recv_msg.fd);
835     }
836 
837     while (recv_msg.incoming_buf != NULL) {
838         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
839     }
840 
841     if (recv_msg.process != NULL) {
842         nxt_unit_process_use(ctx, recv_msg.process, -1);
843     }
844 
845     return rc;
846 }
847 
848 
849 static int
850 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
851 {
852     int                      nb;
853     nxt_unit_impl_t          *lib;
854     nxt_unit_port_t          new_port;
855     nxt_port_msg_new_port_t  *new_port_msg;
856 
857     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
858         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
859                       "invalid message size (%d)",
860                       recv_msg->stream, (int) recv_msg->size);
861 
862         return NXT_UNIT_ERROR;
863     }
864 
865     if (nxt_slow_path(recv_msg->fd < 0)) {
866         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
867                        recv_msg->stream, recv_msg->fd);
868 
869         return NXT_UNIT_ERROR;
870     }
871 
872     new_port_msg = recv_msg->start;
873 
874     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
875                    recv_msg->stream, (int) new_port_msg->pid,
876                    (int) new_port_msg->id, recv_msg->fd);
877 
878     nb = 0;
879 
880     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
881         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
882                        "failed: %s (%d)",
883                        recv_msg->stream, recv_msg->fd, strerror(errno), errno);
884 
885         return NXT_UNIT_ERROR;
886     }
887 
888     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
889                           new_port_msg->id);
890 
891     new_port.in_fd = -1;
892     new_port.out_fd = recv_msg->fd;
893     new_port.data = NULL;
894 
895     recv_msg->fd = -1;
896 
897     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
898 
899     return lib->callbacks.add_port(ctx, &new_port);
900 }
901 
902 
903 static int
904 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
905 {
906     nxt_unit_impl_t               *lib;
907     nxt_unit_request_t            *r;
908     nxt_unit_mmap_buf_t           *b;
909     nxt_unit_request_info_t       *req;
910     nxt_unit_request_info_impl_t  *req_impl;
911 
912     if (nxt_slow_path(recv_msg->mmap == 0)) {
913         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
914                       recv_msg->stream);
915 
916         return NXT_UNIT_ERROR;
917     }
918 
919     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
920         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
921                       "%d expected", recv_msg->stream, (int) recv_msg->size,
922                       (int) sizeof(nxt_unit_request_t));
923 
924         return NXT_UNIT_ERROR;
925     }
926 
927     req_impl = nxt_unit_request_info_get(ctx);
928     if (nxt_slow_path(req_impl == NULL)) {
929         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
930                       recv_msg->stream);
931 
932         return NXT_UNIT_ERROR;
933     }
934 
935     req = &req_impl->req;
936 
937     nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
938                           recv_msg->reply_port);
939 
940     req->request = recv_msg->start;
941 
942     b = recv_msg->incoming_buf;
943 
944     req->request_buf = &b->buf;
945     req->response = NULL;
946     req->response_buf = NULL;
947 
948     r = req->request;
949 
950     req->content_length = r->content_length;
951 
952     req->content_buf = req->request_buf;
953     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
954 
955     /* "Move" process reference to req_impl. */
956     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
957     if (nxt_slow_path(req_impl->process == NULL)) {
958         return NXT_UNIT_ERROR;
959     }
960 
961     recv_msg->process = NULL;
962 
963     req_impl->stream = recv_msg->stream;
964 
965     req_impl->outgoing_buf = NULL;
966 
967     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
968         b->req = req;
969     }
970 
971     /* "Move" incoming buffer list to req_impl. */
972     req_impl->incoming_buf = recv_msg->incoming_buf;
973     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
974     recv_msg->incoming_buf = NULL;
975 
976     req->content_fd = recv_msg->fd;
977     recv_msg->fd = -1;
978 
979     req->response_max_fields = 0;
980     req_impl->state = NXT_UNIT_RS_START;
981     req_impl->websocket = 0;
982 
983     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
984                    (int) r->method_length,
985                    (char *) nxt_unit_sptr_get(&r->method),
986                    (int) r->target_length,
987                    (char *) nxt_unit_sptr_get(&r->target),
988                    (int) r->content_length);
989 
990     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
991 
992     lib->callbacks.request_handler(req);
993 
994     return NXT_UNIT_OK;
995 }
996 
997 
998 static int
999 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1000 {
1001     size_t                           hsize;
1002     nxt_unit_impl_t                  *lib;
1003     nxt_unit_mmap_buf_t              *b;
1004     nxt_unit_ctx_impl_t              *ctx_impl;
1005     nxt_unit_callbacks_t             *cb;
1006     nxt_unit_request_info_t          *req;
1007     nxt_unit_request_info_impl_t     *req_impl;
1008     nxt_unit_websocket_frame_impl_t  *ws_impl;
1009 
1010     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1011 
1012     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
1013                                           recv_msg->last);
1014     if (req_impl == NULL) {
1015         return NXT_UNIT_OK;
1016     }
1017 
1018     req = &req_impl->req;
1019 
1020     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1021     cb = &lib->callbacks;
1022 
1023     if (cb->websocket_handler && recv_msg->size >= 2) {
1024         ws_impl = nxt_unit_websocket_frame_get(ctx);
1025         if (nxt_slow_path(ws_impl == NULL)) {
1026             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1027                           req_impl->stream);
1028 
1029             return NXT_UNIT_ERROR;
1030         }
1031 
1032         ws_impl->ws.req = req;
1033 
1034         ws_impl->buf = NULL;
1035 
1036         if (recv_msg->mmap) {
1037             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1038                 b->req = req;
1039             }
1040 
1041             /* "Move" incoming buffer list to ws_impl. */
1042             ws_impl->buf = recv_msg->incoming_buf;
1043             ws_impl->buf->prev = &ws_impl->buf;
1044             recv_msg->incoming_buf = NULL;
1045 
1046             b = ws_impl->buf;
1047 
1048         } else {
1049             b = nxt_unit_mmap_buf_get(ctx);
1050             if (nxt_slow_path(b == NULL)) {
1051                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1052                                req_impl->stream);
1053 
1054                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1055 
1056                 return NXT_UNIT_ERROR;
1057             }
1058 
1059             b->req = req;
1060             b->buf.start = recv_msg->start;
1061             b->buf.free = b->buf.start;
1062             b->buf.end = b->buf.start + recv_msg->size;
1063 
1064             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1065         }
1066 
1067         ws_impl->ws.header = (void *) b->buf.start;
1068         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1069             ws_impl->ws.header);
1070 
1071         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1072 
1073         if (ws_impl->ws.header->mask) {
1074             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1075 
1076         } else {
1077             ws_impl->ws.mask = NULL;
1078         }
1079 
1080         b->buf.free += hsize;
1081 
1082         ws_impl->ws.content_buf = &b->buf;
1083         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1084 
1085         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1086                            "payload_len=%"PRIu64,
1087                             ws_impl->ws.header->opcode,
1088                             ws_impl->ws.payload_len);
1089 
1090         cb->websocket_handler(&ws_impl->ws);
1091     }
1092 
1093     if (recv_msg->last) {
1094         req_impl->websocket = 0;
1095 
1096         if (cb->close_handler) {
1097             nxt_unit_req_debug(req, "close_handler");
1098 
1099             cb->close_handler(req);
1100 
1101         } else {
1102             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1103         }
1104     }
1105 
1106     return NXT_UNIT_OK;
1107 }
1108 
1109 
1110 static int
1111 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1112 {
1113     nxt_unit_impl_t       *lib;
1114     nxt_unit_callbacks_t  *cb;
1115 
1116     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1117     cb = &lib->callbacks;
1118 
1119     if (cb->shm_ack_handler != NULL) {
1120         cb->shm_ack_handler(ctx);
1121     }
1122 
1123     return NXT_UNIT_OK;
1124 }
1125 
1126 
1127 static nxt_unit_request_info_impl_t *
1128 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1129 {
1130     nxt_unit_impl_t               *lib;
1131     nxt_queue_link_t              *lnk;
1132     nxt_unit_ctx_impl_t           *ctx_impl;
1133     nxt_unit_request_info_impl_t  *req_impl;
1134 
1135     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1136 
1137     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1138 
1139     pthread_mutex_lock(&ctx_impl->mutex);
1140 
1141     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1142         pthread_mutex_unlock(&ctx_impl->mutex);
1143 
1144         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1145                           + lib->request_data_size);
1146         if (nxt_slow_path(req_impl == NULL)) {
1147             return NULL;
1148         }
1149 
1150         req_impl->req.unit = ctx->unit;
1151         req_impl->req.ctx = ctx;
1152 
1153         pthread_mutex_lock(&ctx_impl->mutex);
1154 
1155     } else {
1156         lnk = nxt_queue_first(&ctx_impl->free_req);
1157         nxt_queue_remove(lnk);
1158 
1159         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1160     }
1161 
1162     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1163 
1164     pthread_mutex_unlock(&ctx_impl->mutex);
1165 
1166     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1167 
1168     return req_impl;
1169 }
1170 
1171 
1172 static void
1173 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1174 {
1175     nxt_unit_ctx_impl_t           *ctx_impl;
1176     nxt_unit_request_info_impl_t  *req_impl;
1177 
1178     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1179     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1180 
1181     req->response = NULL;
1182     req->response_buf = NULL;
1183 
1184     if (req_impl->websocket) {
1185         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1186 
1187         req_impl->websocket = 0;
1188     }
1189 
1190     while (req_impl->outgoing_buf != NULL) {
1191         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1192     }
1193 
1194     while (req_impl->incoming_buf != NULL) {
1195         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1196     }
1197 
1198     if (req->content_fd != -1) {
1199         close(req->content_fd);
1200 
1201         req->content_fd = -1;
1202     }
1203 
1204     /*
1205      * Process release should go after buffers release to guarantee mmap
1206      * existence.
1207      */
1208     if (req_impl->process != NULL) {
1209         nxt_unit_process_use(req->ctx, req_impl->process, -1);
1210 
1211         req_impl->process = NULL;
1212     }
1213 
1214     pthread_mutex_lock(&ctx_impl->mutex);
1215 
1216     nxt_queue_remove(&req_impl->link);
1217 
1218     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1219 
1220     pthread_mutex_unlock(&ctx_impl->mutex);
1221 
1222     req_impl->state = NXT_UNIT_RS_RELEASED;
1223 }
1224 
1225 
1226 static void
1227 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1228 {
1229     nxt_unit_ctx_impl_t  *ctx_impl;
1230 
1231     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1232 
1233     nxt_queue_remove(&req_impl->link);
1234 
1235     if (req_impl != &ctx_impl->req) {
1236         free(req_impl);
1237     }
1238 }
1239 
1240 
1241 static nxt_unit_websocket_frame_impl_t *
1242 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1243 {
1244     nxt_queue_link_t                 *lnk;
1245     nxt_unit_ctx_impl_t              *ctx_impl;
1246     nxt_unit_websocket_frame_impl_t  *ws_impl;
1247 
1248     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1249 
1250     pthread_mutex_lock(&ctx_impl->mutex);
1251 
1252     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1253         pthread_mutex_unlock(&ctx_impl->mutex);
1254 
1255         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1256         if (nxt_slow_path(ws_impl == NULL)) {
1257             return NULL;
1258         }
1259 
1260     } else {
1261         lnk = nxt_queue_first(&ctx_impl->free_ws);
1262         nxt_queue_remove(lnk);
1263 
1264         pthread_mutex_unlock(&ctx_impl->mutex);
1265 
1266         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1267     }
1268 
1269     ws_impl->ctx_impl = ctx_impl;
1270 
1271     return ws_impl;
1272 }
1273 
1274 
1275 static void
1276 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1277 {
1278     nxt_unit_websocket_frame_impl_t  *ws_impl;
1279 
1280     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1281 
1282     while (ws_impl->buf != NULL) {
1283         nxt_unit_mmap_buf_free(ws_impl->buf);
1284     }
1285 
1286     ws->req = NULL;
1287 
1288     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1289 
1290     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1291 
1292     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1293 }
1294 
1295 
1296 static void
1297 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1298 {
1299     nxt_queue_remove(&ws_impl->link);
1300 
1301     free(ws_impl);
1302 }
1303 
1304 
1305 uint16_t
1306 nxt_unit_field_hash(const char *name, size_t name_length)
1307 {
1308     u_char      ch;
1309     uint32_t    hash;
1310     const char  *p, *end;
1311 
1312     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1313     end = name + name_length;
1314 
1315     for (p = name; p < end; p++) {
1316         ch = *p;
1317         hash = (hash << 4) + hash + nxt_lowcase(ch);
1318     }
1319 
1320     hash = (hash >> 16) ^ hash;
1321 
1322     return hash;
1323 }
1324 
1325 
1326 void
1327 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1328 {
1329     uint32_t            i, j;
1330     nxt_unit_field_t    *fields, f;
1331     nxt_unit_request_t  *r;
1332 
1333     nxt_unit_req_debug(req, "group_dup_fields");
1334 
1335     r = req->request;
1336     fields = r->fields;
1337 
1338     for (i = 0; i < r->fields_count; i++) {
1339 
1340         switch (fields[i].hash) {
1341         case NXT_UNIT_HASH_CONTENT_LENGTH:
1342             r->content_length_field = i;
1343             break;
1344 
1345         case NXT_UNIT_HASH_CONTENT_TYPE:
1346             r->content_type_field = i;
1347             break;
1348 
1349         case NXT_UNIT_HASH_COOKIE:
1350             r->cookie_field = i;
1351             break;
1352         };
1353 
1354         for (j = i + 1; j < r->fields_count; j++) {
1355             if (fields[i].hash != fields[j].hash) {
1356                 continue;
1357             }
1358 
1359             if (j == i + 1) {
1360                 continue;
1361             }
1362 
1363             f = fields[j];
1364             f.name.offset += (j - (i + 1)) * sizeof(f);
1365             f.value.offset += (j - (i + 1)) * sizeof(f);
1366 
1367             while (j > i + 1) {
1368                 fields[j] = fields[j - 1];
1369                 fields[j].name.offset -= sizeof(f);
1370                 fields[j].value.offset -= sizeof(f);
1371                 j--;
1372             }
1373 
1374             fields[j] = f;
1375 
1376             i++;
1377         }
1378     }
1379 }
1380 
1381 
1382 int
1383 nxt_unit_response_init(nxt_unit_request_info_t *req,
1384     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1385 {
1386     uint32_t                      buf_size;
1387     nxt_unit_buf_t                *buf;
1388     nxt_unit_request_info_impl_t  *req_impl;
1389 
1390     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1391 
1392     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1393         nxt_unit_req_warn(req, "init: response already sent");
1394 
1395         return NXT_UNIT_ERROR;
1396     }
1397 
1398     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1399                        (int) max_fields_count, (int) max_fields_size);
1400 
1401     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1402         nxt_unit_req_debug(req, "duplicate response init");
1403     }
1404 
1405     /*
1406      * Each field name and value 0-terminated by libunit,
1407      * this is the reason of '+ 2' below.
1408      */
1409     buf_size = sizeof(nxt_unit_response_t)
1410                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1411                + max_fields_size;
1412 
1413     if (nxt_slow_path(req->response_buf != NULL)) {
1414         buf = req->response_buf;
1415 
1416         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1417             goto init_response;
1418         }
1419 
1420         nxt_unit_buf_free(buf);
1421 
1422         req->response_buf = NULL;
1423         req->response = NULL;
1424         req->response_max_fields = 0;
1425 
1426         req_impl->state = NXT_UNIT_RS_START;
1427     }
1428 
1429     buf = nxt_unit_response_buf_alloc(req, buf_size);
1430     if (nxt_slow_path(buf == NULL)) {
1431         return NXT_UNIT_ERROR;
1432     }
1433 
1434 init_response:
1435 
1436     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1437 
1438     req->response_buf = buf;
1439 
1440     req->response = (nxt_unit_response_t *) buf->start;
1441     req->response->status = status;
1442 
1443     buf->free = buf->start + sizeof(nxt_unit_response_t)
1444                 + max_fields_count * sizeof(nxt_unit_field_t);
1445 
1446     req->response_max_fields = max_fields_count;
1447     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1448 
1449     return NXT_UNIT_OK;
1450 }
1451 
1452 
1453 int
1454 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1455     uint32_t max_fields_count, uint32_t max_fields_size)
1456 {
1457     char                          *p;
1458     uint32_t                      i, buf_size;
1459     nxt_unit_buf_t                *buf;
1460     nxt_unit_field_t              *f, *src;
1461     nxt_unit_response_t           *resp;
1462     nxt_unit_request_info_impl_t  *req_impl;
1463 
1464     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1465 
1466     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1467         nxt_unit_req_warn(req, "realloc: response not init");
1468 
1469         return NXT_UNIT_ERROR;
1470     }
1471 
1472     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1473         nxt_unit_req_warn(req, "realloc: response already sent");
1474 
1475         return NXT_UNIT_ERROR;
1476     }
1477 
1478     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1479         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1480 
1481         return NXT_UNIT_ERROR;
1482     }
1483 
1484     /*
1485      * Each field name and value 0-terminated by libunit,
1486      * this is the reason of '+ 2' below.
1487      */
1488     buf_size = sizeof(nxt_unit_response_t)
1489                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
1490                + max_fields_size;
1491 
1492     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1493 
1494     buf = nxt_unit_response_buf_alloc(req, buf_size);
1495     if (nxt_slow_path(buf == NULL)) {
1496         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1497         return NXT_UNIT_ERROR;
1498     }
1499 
1500     resp = (nxt_unit_response_t *) buf->start;
1501 
1502     memset(resp, 0, sizeof(nxt_unit_response_t));
1503 
1504     resp->status = req->response->status;
1505     resp->content_length = req->response->content_length;
1506 
1507     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1508     f = resp->fields;
1509 
1510     for (i = 0; i < req->response->fields_count; i++) {
1511         src = req->response->fields + i;
1512 
1513         if (nxt_slow_path(src->skip != 0)) {
1514             continue;
1515         }
1516 
1517         if (nxt_slow_path(src->name_length + src->value_length + 2
1518                           > (uint32_t) (buf->end - p)))
1519         {
1520             nxt_unit_req_warn(req, "realloc: not enough space for field"
1521                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1522                   i, src, src->name_length, src->value_length);
1523 
1524             goto fail;
1525         }
1526 
1527         nxt_unit_sptr_set(&f->name, p);
1528         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1529         *p++ = '\0';
1530 
1531         nxt_unit_sptr_set(&f->value, p);
1532         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1533         *p++ = '\0';
1534 
1535         f->hash = src->hash;
1536         f->skip = 0;
1537         f->name_length = src->name_length;
1538         f->value_length = src->value_length;
1539 
1540         resp->fields_count++;
1541         f++;
1542     }
1543 
1544     if (req->response->piggyback_content_length > 0) {
1545         if (nxt_slow_path(req->response->piggyback_content_length
1546                           > (uint32_t) (buf->end - p)))
1547         {
1548             nxt_unit_req_warn(req, "realloc: not enought space for content"
1549                   " #%"PRIu32", %"PRIu32" required",
1550                   i, req->response->piggyback_content_length);
1551 
1552             goto fail;
1553         }
1554 
1555         resp->piggyback_content_length =
1556                                        req->response->piggyback_content_length;
1557 
1558         nxt_unit_sptr_set(&resp->piggyback_content, p);
1559         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1560                        req->response->piggyback_content_length);
1561     }
1562 
1563     buf->free = p;
1564 
1565     nxt_unit_buf_free(req->response_buf);
1566 
1567     req->response = resp;
1568     req->response_buf = buf;
1569     req->response_max_fields = max_fields_count;
1570 
1571     return NXT_UNIT_OK;
1572 
1573 fail:
1574 
1575     nxt_unit_buf_free(buf);
1576 
1577     return NXT_UNIT_ERROR;
1578 }
1579 
1580 
1581 int
1582 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1583 {
1584     nxt_unit_request_info_impl_t  *req_impl;
1585 
1586     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1587 
1588     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1589 }
1590 
1591 
1592 int
1593 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1594     const char *name, uint8_t name_length,
1595     const char *value, uint32_t value_length)
1596 {
1597     nxt_unit_buf_t                *buf;
1598     nxt_unit_field_t              *f;
1599     nxt_unit_response_t           *resp;
1600     nxt_unit_request_info_impl_t  *req_impl;
1601 
1602     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1603 
1604     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1605         nxt_unit_req_warn(req, "add_field: response not initialized or "
1606                           "already sent");
1607 
1608         return NXT_UNIT_ERROR;
1609     }
1610 
1611     resp = req->response;
1612 
1613     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1614         nxt_unit_req_warn(req, "add_field: too many response fields");
1615 
1616         return NXT_UNIT_ERROR;
1617     }
1618 
1619     buf = req->response_buf;
1620 
1621     if (nxt_slow_path(name_length + value_length + 2
1622                       > (uint32_t) (buf->end - buf->free)))
1623     {
1624         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1625 
1626         return NXT_UNIT_ERROR;
1627     }
1628 
1629     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1630                        resp->fields_count,
1631                        (int) name_length, name,
1632                        (int) value_length, value);
1633 
1634     f = resp->fields + resp->fields_count;
1635 
1636     nxt_unit_sptr_set(&f->name, buf->free);
1637     buf->free = nxt_cpymem(buf->free, name, name_length);
1638     *buf->free++ = '\0';
1639 
1640     nxt_unit_sptr_set(&f->value, buf->free);
1641     buf->free = nxt_cpymem(buf->free, value, value_length);
1642     *buf->free++ = '\0';
1643 
1644     f->hash = nxt_unit_field_hash(name, name_length);
1645     f->skip = 0;
1646     f->name_length = name_length;
1647     f->value_length = value_length;
1648 
1649     resp->fields_count++;
1650 
1651     return NXT_UNIT_OK;
1652 }
1653 
1654 
1655 int
1656 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1657     const void* src, uint32_t size)
1658 {
1659     nxt_unit_buf_t                *buf;
1660     nxt_unit_response_t           *resp;
1661     nxt_unit_request_info_impl_t  *req_impl;
1662 
1663     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1664 
1665     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1666         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1667 
1668         return NXT_UNIT_ERROR;
1669     }
1670 
1671     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1672         nxt_unit_req_warn(req, "add_content: response already sent");
1673 
1674         return NXT_UNIT_ERROR;
1675     }
1676 
1677     buf = req->response_buf;
1678 
1679     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1680         nxt_unit_req_warn(req, "add_content: buffer overflow");
1681 
1682         return NXT_UNIT_ERROR;
1683     }
1684 
1685     resp = req->response;
1686 
1687     if (resp->piggyback_content_length == 0) {
1688         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1689         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1690     }
1691 
1692     resp->piggyback_content_length += size;
1693 
1694     buf->free = nxt_cpymem(buf->free, src, size);
1695 
1696     return NXT_UNIT_OK;
1697 }
1698 
1699 
1700 int
1701 nxt_unit_response_send(nxt_unit_request_info_t *req)
1702 {
1703     int                           rc;
1704     nxt_unit_mmap_buf_t           *mmap_buf;
1705     nxt_unit_request_info_impl_t  *req_impl;
1706 
1707     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1708 
1709     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1710         nxt_unit_req_warn(req, "send: response is not initialized yet");
1711 
1712         return NXT_UNIT_ERROR;
1713     }
1714 
1715     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1716         nxt_unit_req_warn(req, "send: response already sent");
1717 
1718         return NXT_UNIT_ERROR;
1719     }
1720 
1721     if (req->request->websocket_handshake && req->response->status == 101) {
1722         nxt_unit_response_upgrade(req);
1723     }
1724 
1725     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1726                        req->response->fields_count,
1727                        (int) (req->response_buf->free
1728                               - req->response_buf->start));
1729 
1730     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1731 
1732     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1733     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1734         req->response = NULL;
1735         req->response_buf = NULL;
1736         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1737 
1738         nxt_unit_mmap_buf_free(mmap_buf);
1739     }
1740 
1741     return rc;
1742 }
1743 
1744 
1745 int
1746 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1747 {
1748     nxt_unit_request_info_impl_t  *req_impl;
1749 
1750     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1751 
1752     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1753 }
1754 
1755 
1756 nxt_unit_buf_t *
1757 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1758 {
1759     int                           rc;
1760     nxt_unit_mmap_buf_t           *mmap_buf;
1761     nxt_unit_request_info_impl_t  *req_impl;
1762 
1763     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1764         nxt_unit_req_warn(req, "response_buf_alloc: "
1765                           "requested buffer (%"PRIu32") too big", size);
1766 
1767         return NULL;
1768     }
1769 
1770     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1771 
1772     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1773 
1774     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1775     if (nxt_slow_path(mmap_buf == NULL)) {
1776         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
1777 
1778         return NULL;
1779     }
1780 
1781     mmap_buf->req = req;
1782 
1783     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1784 
1785     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1786                                    &req->response_port, size, size, mmap_buf,
1787                                    NULL);
1788     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1789         nxt_unit_mmap_buf_release(mmap_buf);
1790 
1791         return NULL;
1792     }
1793 
1794     return &mmap_buf->buf;
1795 }
1796 
1797 
1798 static nxt_unit_process_t *
1799 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1800 {
1801     nxt_unit_impl_t  *lib;
1802 
1803     if (recv_msg->process != NULL) {
1804         return recv_msg->process;
1805     }
1806 
1807     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1808 
1809     pthread_mutex_lock(&lib->mutex);
1810 
1811     recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1812 
1813     pthread_mutex_unlock(&lib->mutex);
1814 
1815     if (recv_msg->process == NULL) {
1816         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1817                       recv_msg->stream, (int) recv_msg->pid);
1818     }
1819 
1820     return recv_msg->process;
1821 }
1822 
1823 
1824 static nxt_unit_mmap_buf_t *
1825 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1826 {
1827     nxt_unit_mmap_buf_t  *mmap_buf;
1828     nxt_unit_ctx_impl_t  *ctx_impl;
1829 
1830     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1831 
1832     pthread_mutex_lock(&ctx_impl->mutex);
1833 
1834     if (ctx_impl->free_buf == NULL) {
1835         pthread_mutex_unlock(&ctx_impl->mutex);
1836 
1837         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1838         if (nxt_slow_path(mmap_buf == NULL)) {
1839             return NULL;
1840         }
1841 
1842     } else {
1843         mmap_buf = ctx_impl->free_buf;
1844 
1845         nxt_unit_mmap_buf_unlink(mmap_buf);
1846 
1847         pthread_mutex_unlock(&ctx_impl->mutex);
1848     }
1849 
1850     mmap_buf->ctx_impl = ctx_impl;
1851 
1852     mmap_buf->hdr = NULL;
1853     mmap_buf->free_ptr = NULL;
1854 
1855     return mmap_buf;
1856 }
1857 
1858 
1859 static void
1860 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1861 {
1862     nxt_unit_mmap_buf_unlink(mmap_buf);
1863 
1864     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
1865 
1866     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1867 
1868     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
1869 }
1870 
1871 
/* A string described by its length and a pointer to its first character. */
typedef struct {
    size_t      len;    /* string length, as produced by nxt_length() */
    const char  *str;   /* string data */
} nxt_unit_str_t;


/*
 * Static initializer for nxt_unit_str_t; the member order above must match
 * the order of the values listed here.
 */
#define nxt_unit_str(s)  { nxt_length(s), s }
1879 
1880 
1881 int
1882 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1883 {
1884     return req->request->websocket_handshake;
1885 }
1886 
1887 
1888 int
1889 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1890 {
1891     int                           rc;
1892     nxt_unit_ctx_impl_t           *ctx_impl;
1893     nxt_unit_request_info_impl_t  *req_impl;
1894 
1895     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1896 
1897     if (nxt_slow_path(req_impl->websocket != 0)) {
1898         nxt_unit_req_debug(req, "upgrade: already upgraded");
1899 
1900         return NXT_UNIT_OK;
1901     }
1902 
1903     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1904         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1905 
1906         return NXT_UNIT_ERROR;
1907     }
1908 
1909     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1910         nxt_unit_req_warn(req, "upgrade: response already sent");
1911 
1912         return NXT_UNIT_ERROR;
1913     }
1914 
1915     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1916 
1917     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1918     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1919         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1920 
1921         return NXT_UNIT_ERROR;
1922     }
1923 
1924     req_impl->websocket = 1;
1925 
1926     req->response->status = 101;
1927 
1928     return NXT_UNIT_OK;
1929 }
1930 
1931 
1932 int
1933 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1934 {
1935     nxt_unit_request_info_impl_t  *req_impl;
1936 
1937     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1938 
1939     return req_impl->websocket;
1940 }
1941 
1942 
1943 nxt_unit_request_info_t *
1944 nxt_unit_get_request_info_from_data(void *data)
1945 {
1946     nxt_unit_request_info_impl_t  *req_impl;
1947 
1948     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1949 
1950     return &req_impl->req;
1951 }
1952 
1953 
1954 int
1955 nxt_unit_buf_send(nxt_unit_buf_t *buf)
1956 {
1957     int                           rc;
1958     nxt_unit_mmap_buf_t           *mmap_buf;
1959     nxt_unit_request_info_t       *req;
1960     nxt_unit_request_info_impl_t  *req_impl;
1961 
1962     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1963 
1964     req = mmap_buf->req;
1965     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1966 
1967     nxt_unit_req_debug(req, "buf_send: %d bytes",
1968                        (int) (buf->free - buf->start));
1969 
1970     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1971         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
1972 
1973         return NXT_UNIT_ERROR;
1974     }
1975 
1976     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1977         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1978 
1979         return NXT_UNIT_ERROR;
1980     }
1981 
1982     if (nxt_fast_path(buf->free > buf->start)) {
1983         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1984         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1985             return rc;
1986         }
1987     }
1988 
1989     nxt_unit_mmap_buf_free(mmap_buf);
1990 
1991     return NXT_UNIT_OK;
1992 }
1993 
1994 
1995 static void
1996 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
1997 {
1998     int                           rc;
1999     nxt_unit_mmap_buf_t           *mmap_buf;
2000     nxt_unit_request_info_t       *req;
2001     nxt_unit_request_info_impl_t  *req_impl;
2002 
2003     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2004 
2005     req = mmap_buf->req;
2006     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2007 
2008     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
2009     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2010         nxt_unit_mmap_buf_free(mmap_buf);
2011 
2012         nxt_unit_request_info_release(req);
2013 
2014     } else {
2015         nxt_unit_request_done(req, rc);
2016     }
2017 }
2018 
2019 
/*
 * Sends the filled part of an outgoing buffer (buf->start .. buf->free)
 * to mmap_buf->port_id as a _NXT_PORT_MSG_DATA message of the given stream.
 *
 * If the buffer is backed by a shared memory segment (hdr != NULL) and is
 * not empty, only a descriptor (mmap id, chunk id, size) is sent over the
 * port.  Otherwise the message header is copied into the bytes right before
 * buf->start and header plus payload are sent as one plain message.
 *
 * "last" sets the "last" flag of the message.  Whatever still belongs to
 * the buffer is released with nxt_unit_free_outgoing_buf() before return,
 * on success and on failure alike.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR otherwise.
 */
2020 static int
2021 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
2022     nxt_unit_mmap_buf_t *mmap_buf, int last)
2023 {
2024     struct {
2025         nxt_port_msg_t       msg;
2026         nxt_port_mmap_msg_t  mmap_msg;
2027     } m;
2028 
2029     int                      rc;
2030     u_char                   *last_used, *first_free;
2031     ssize_t                  res;
2032     nxt_chunk_id_t           first_free_chunk;
2033     nxt_unit_buf_t           *buf;
2034     nxt_unit_impl_t          *lib;
2035     nxt_port_mmap_header_t   *hdr;
2036 
2037     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2038 
2039     buf = &mmap_buf->buf;
2040     hdr = mmap_buf->hdr;
2041 
2042     m.mmap_msg.size = buf->free - buf->start;
2043 
2044     m.msg.stream = stream;
2045     m.msg.pid = lib->pid;
2046     m.msg.reply_port = 0;
2047     m.msg.type = _NXT_PORT_MSG_DATA;
2048     m.msg.last = last != 0;
         /* The shared memory path is used only for non-empty mmap buffers. */
2049     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2050     m.msg.nf = 0;
2051     m.msg.mf = 0;
2052     m.msg.tracking = 0;
2053 
         /* Every early exit below jumps to free_buf with this value. */
2054     rc = NXT_UNIT_ERROR;
2055 
2056     if (m.msg.mmap) {
2057         m.mmap_msg.mmap_id = hdr->id;
2058         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2059                                                      (u_char *) buf->start);
2060 
2061         nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2062                        stream,
2063                        (int) m.mmap_msg.mmap_id,
2064                        (int) m.mmap_msg.chunk_id,
2065                        (int) m.mmap_msg.size);
2066 
             /* Only the descriptor is written to the port. */
2067         res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
2068                                        NULL, 0);
2069         if (nxt_slow_path(res != sizeof(m))) {
2070             goto free_buf;
2071         }
2072 
             /* First chunk after the one holding the last written byte. */
2073         last_used = (u_char *) buf->free - 1;
2074         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2075 
2076         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
                 /*
                  * At least one whole chunk is unused: shrink the buffer to
                  * that tail so that the release at free_buf covers only the
                  * chunks that were not part of the message.
                  */
2077             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2078 
2079             buf->start = (char *) first_free;
2080             buf->free = buf->start;
2081 
2082             if (buf->end < buf->start) {
2083                 buf->end = buf->start;
2084             }
2085 
2086         } else {
                 /*
                  * No unused chunk remains; with hdr cleared
                  * nxt_unit_free_outgoing_buf() skips the mmap release.
                  */
2087             buf->start = NULL;
2088             buf->free = NULL;
2089             buf->end = NULL;
2090 
2091             mmap_buf->hdr = NULL;
2092         }
2093 
             /*
              * The delta is negative: the counter drops by the number of
              * chunks covered by the message just sent.
              */
2094         nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
2095                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2096 
2097         nxt_unit_debug(ctx, "process %d allocated_chunks %d",
2098                        mmap_buf->process->pid,
2099                        (int) mmap_buf->process->outgoing.allocated_chunks);
2100 
2101     } else {
             /*
              * Plain path: the header is written right before buf->start, so
              * at least sizeof(m.msg) bytes must lie between plain_ptr and
              * buf->start.
              */
2102         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2103                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2104         {
2105             nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
2106                           ": no space reserved for message header", stream);
2107 
2108             goto free_buf;
2109         }
2110 
2111         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2112 
2113         nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
2114                        stream,
2115                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2116 
2117         res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
2118                                        buf->start - sizeof(m.msg),
2119                                        m.mmap_msg.size + sizeof(m.msg),
2120                                        NULL, 0);
2121         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2122             goto free_buf;
2123         }
2124     }
2125 
2126     rc = NXT_UNIT_OK;
2127 
2128 free_buf:
2129 
2130     nxt_unit_free_outgoing_buf(mmap_buf);
2131 
2132     return rc;
2133 }
2134 
2135 
2136 void
2137 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2138 {
2139     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2140 }
2141 
2142 
/*
 * Releases the memory backing the buffer (shared memory chunks or heap
 * memory, see nxt_unit_free_outgoing_buf()) and then hands the buffer
 * object itself to nxt_unit_mmap_buf_release().
 */
2143 static void
2144 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2145 {
2146     nxt_unit_free_outgoing_buf(mmap_buf);
2147 
2148     nxt_unit_mmap_buf_release(mmap_buf);
2149 }
2150 
2151 
2152 static void
2153 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2154 {
2155     if (mmap_buf->hdr != NULL) {
2156         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2157                               mmap_buf->process,
2158                               mmap_buf->hdr, mmap_buf->buf.start,
2159                               mmap_buf->buf.end - mmap_buf->buf.start);
2160 
2161         mmap_buf->hdr = NULL;
2162 
2163         return;
2164     }
2165 
2166     if (mmap_buf->free_ptr != NULL) {
2167         free(mmap_buf->free_ptr);
2168 
2169         mmap_buf->free_ptr = NULL;
2170     }
2171 }
2172 
2173 
/*
 * Returns a read buffer for the context, or NULL if none could be
 * allocated.
 *
 * The context mutex is locked here and unlocked inside
 * nxt_unit_read_buf_get_impl(), so it is no longer held on return.
 */
2174 static nxt_unit_read_buf_t *
2175 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2176 {
2177     nxt_unit_ctx_impl_t  *ctx_impl;
2178 
2179     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2180 
2181     pthread_mutex_lock(&ctx_impl->mutex);
2182 
2183     return nxt_unit_read_buf_get_impl(ctx_impl);
2184 }
2185 
2186 
2187 static nxt_unit_read_buf_t *
2188 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2189 {
2190     nxt_unit_read_buf_t  *rbuf;
2191 
2192     if (ctx_impl->free_read_buf != NULL) {
2193         rbuf = ctx_impl->free_read_buf;
2194         ctx_impl->free_read_buf = rbuf->next;
2195 
2196         pthread_mutex_unlock(&ctx_impl->mutex);
2197 
2198         return rbuf;
2199     }
2200 
2201     pthread_mutex_unlock(&ctx_impl->mutex);
2202 
2203     rbuf = malloc(sizeof(nxt_unit_read_buf_t));
2204 
2205     return rbuf;
2206 }
2207 
2208 
2209 static void
2210 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2211     nxt_unit_read_buf_t *rbuf)
2212 {
2213     nxt_unit_ctx_impl_t  *ctx_impl;
2214 
2215     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2216 
2217     pthread_mutex_lock(&ctx_impl->mutex);
2218 
2219     rbuf->next = ctx_impl->free_read_buf;
2220     ctx_impl->free_read_buf = rbuf;
2221 
2222     pthread_mutex_unlock(&ctx_impl->mutex);
2223 }
2224 
2225 
2226 nxt_unit_buf_t *
2227 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2228 {
2229     nxt_unit_mmap_buf_t  *mmap_buf;
2230 
2231     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2232 
2233     if (mmap_buf->next == NULL) {
2234         return NULL;
2235     }
2236 
2237     return &mmap_buf->next->buf;
2238 }
2239 
2240 
/*
 * Returns PORT_MMAP_DATA_SIZE, the value this file uses to cap the size
 * of a single outgoing buffer request.
 */
2241 uint32_t
2242 nxt_unit_buf_max(void)
2243 {
2244     return PORT_MMAP_DATA_SIZE;
2245 }
2246 
2247 
/*
 * Returns PORT_MMAP_CHUNK_SIZE, the size of one shared memory chunk.
 */
2248 uint32_t
2249 nxt_unit_buf_min(void)
2250 {
2251     return PORT_MMAP_CHUNK_SIZE;
2252 }
2253 
2254 
2255 int
2256 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2257     size_t size)
2258 {
2259     ssize_t  res;
2260 
2261     res = nxt_unit_response_write_nb(req, start, size, size);
2262 
2263     return res < 0 ? -res : NXT_UNIT_OK;
2264 }
2265 
2266 
2267 ssize_t
2268 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2269     size_t size, size_t min_size)
2270 {
2271     int                           rc;
2272     ssize_t                       sent;
2273     uint32_t                      part_size, min_part_size, buf_size;
2274     const char                    *part_start;
2275     nxt_unit_mmap_buf_t           mmap_buf;
2276     nxt_unit_request_info_impl_t  *req_impl;
2277     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2278 
2279     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2280 
2281     part_start = start;
2282     sent = 0;
2283 
2284     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2285         nxt_unit_req_warn(req, "write: response not initialized yet");
2286 
2287         return -NXT_UNIT_ERROR;
2288     }
2289 
2290     /* Check if response is not send yet. */
2291     if (nxt_slow_path(req->response_buf != NULL)) {
2292         part_size = req->response_buf->end - req->response_buf->free;
2293         part_size = nxt_min(size, part_size);
2294 
2295         rc = nxt_unit_response_add_content(req, part_start, part_size);
2296         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2297             return -rc;
2298         }
2299 
2300         rc = nxt_unit_response_send(req);
2301         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2302             return -rc;
2303         }
2304 
2305         size -= part_size;
2306         part_start += part_size;
2307         sent += part_size;
2308 
2309         min_size -= nxt_min(min_size, part_size);
2310     }
2311 
2312     while (size > 0) {
2313         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2314         min_part_size = nxt_min(min_size, part_size);
2315         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2316 
2317         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2318                                        &req->response_port, part_size,
2319                                        min_part_size, &mmap_buf, local_buf);
2320         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2321             return -rc;
2322         }
2323 
2324         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2325         if (nxt_slow_path(buf_size == 0)) {
2326             return sent;
2327         }
2328         part_size = nxt_min(buf_size, part_size);
2329 
2330         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2331                                        part_start, part_size);
2332 
2333         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2334         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2335             return -rc;
2336         }
2337 
2338         size -= part_size;
2339         part_start += part_size;
2340         sent += part_size;
2341 
2342         min_size -= nxt_min(min_size, part_size);
2343     }
2344 
2345     return sent;
2346 }
2347 
2348 
/*
 * Streams response body data produced by the read_info->read() callback.
 *
 * If the response headers have not been sent yet (req->response_buf is
 * set), the free space of the headers buffer is filled with content first
 * and the response is sent.  After that, until read_info->eof is set, an
 * outgoing buffer of up to min(read_info->buf_size, PORT_MMAP_DATA_SIZE)
 * bytes is obtained, filled through the callback and sent.
 *
 * Returns NXT_UNIT_OK on success, NXT_UNIT_ERROR if the callback returns a
 * negative value, or the error code of the failed helper call.
 *
 * NOTE(review): both fill loops keep calling read() while space remains
 * and eof is not set; a callback returning 0 without setting eof would
 * make them spin - confirm the callback contract.
 */
2349 int
2350 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2351     nxt_unit_read_info_t *read_info)
2352 {
2353     int                           rc;
2354     ssize_t                       n;
2355     uint32_t                      buf_size;
2356     nxt_unit_buf_t                *buf;
2357     nxt_unit_mmap_buf_t           mmap_buf;
2358     nxt_unit_request_info_impl_t  *req_impl;
2359     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2360 
2361     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2362 
2363     /* Check whether the response has not been sent yet. */
2364     if (nxt_slow_path(req->response_buf)) {
2365 
2366         /* Enable content in headers buf. */
2367         rc = nxt_unit_response_add_content(req, "", 0);
2368         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2369             nxt_unit_req_error(req, "Failed to add piggyback content");
2370 
2371             return rc;
2372         }
2373 
2374         buf = req->response_buf;
2375 
             /* Fill the rest of the headers buffer directly from the callback. */
2376         while (buf->end - buf->free > 0) {
2377             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2378             if (nxt_slow_path(n < 0)) {
2379                 nxt_unit_req_error(req, "Read error");
2380 
2381                 return NXT_UNIT_ERROR;
2382             }
2383 
2384             /* Manually increase sizes. */
2385             buf->free += n;
2386             req->response->piggyback_content_length += n;
2387 
2388             if (read_info->eof) {
2389                 break;
2390             }
2391         }
2392 
2393         rc = nxt_unit_response_send(req);
2394         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2395             nxt_unit_req_error(req, "Failed to send headers with content");
2396 
2397             return rc;
2398         }
2399 
             /* Everything fitted into the headers buffer. */
2400         if (read_info->eof) {
2401             return NXT_UNIT_OK;
2402         }
2403     }
2404 
2405     while (!read_info->eof) {
2406         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2407                            read_info->buf_size);
2408 
2409         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
2410 
             /* The minimum equals the requested size here. */
2411         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2412                                        &req->response_port,
2413                                        buf_size, buf_size,
2414                                        &mmap_buf, local_buf);
2415         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2416             return rc;
2417         }
2418 
2419         buf = &mmap_buf.buf;
2420 
2421         while (!read_info->eof && buf->end > buf->free) {
2422             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2423             if (nxt_slow_path(n < 0)) {
2424                 nxt_unit_req_error(req, "Read error");
2425 
                     /* The buffer was never sent, so release it here. */
2426                 nxt_unit_free_outgoing_buf(&mmap_buf);
2427 
2428                 return NXT_UNIT_ERROR;
2429             }
2430 
2431             buf->free += n;
2432         }
2433 
             /* The send releases the buffer storage on success and failure. */
2434         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2435         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2436             nxt_unit_req_error(req, "Failed to send content");
2437 
2438             return rc;
2439         }
2440     }
2441 
2442     return NXT_UNIT_OK;
2443 }
2444 
2445 
2446 ssize_t
2447 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2448 {
2449     ssize_t  buf_res, res;
2450 
2451     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2452                                 dst, size);
2453 
2454     if (buf_res < (ssize_t) size && req->content_fd != -1) {
2455         res = read(req->content_fd, dst, size);
2456         if (res < 0) {
2457             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2458                                strerror(errno), errno);
2459 
2460             return res;
2461         }
2462 
2463         if (res < (ssize_t) size) {
2464             close(req->content_fd);
2465 
2466             req->content_fd = -1;
2467         }
2468 
2469         req->content_length -= res;
2470         size -= res;
2471 
2472         dst = nxt_pointer_to(dst, res);
2473 
2474     } else {
2475         res = 0;
2476     }
2477 
2478     return buf_res + res;
2479 }
2480 
2481 
2482 ssize_t
2483 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
2484 {
2485     char                 *p;
2486     size_t               l_size, b_size;
2487     nxt_unit_buf_t       *b;
2488     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
2489 
2490     if (req->content_length == 0) {
2491         return 0;
2492     }
2493 
2494     l_size = 0;
2495 
2496     b = req->content_buf;
2497 
2498     while (b != NULL) {
2499         b_size = b->end - b->free;
2500         p = memchr(b->free, '\n', b_size);
2501 
2502         if (p != NULL) {
2503             p++;
2504             l_size += p - b->free;
2505             break;
2506         }
2507 
2508         l_size += b_size;
2509 
2510         if (max_size <= l_size) {
2511             break;
2512         }
2513 
2514         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
2515         if (mmap_buf->next == NULL
2516             && req->content_fd != -1
2517             && l_size < req->content_length)
2518         {
2519             preread_buf = nxt_unit_request_preread(req, 16384);
2520             if (nxt_slow_path(preread_buf == NULL)) {
2521                 return -1;
2522             }
2523 
2524             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
2525         }
2526 
2527         b = nxt_unit_buf_next(b);
2528     }
2529 
2530     return nxt_min(max_size, l_size);
2531 }
2532 
2533 
2534 static nxt_unit_mmap_buf_t *
2535 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
2536 {
2537     ssize_t              res;
2538     nxt_unit_mmap_buf_t  *mmap_buf;
2539 
2540     if (req->content_fd == -1) {
2541         nxt_unit_req_alert(req, "preread: content_fd == -1");
2542         return NULL;
2543     }
2544 
2545     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2546     if (nxt_slow_path(mmap_buf == NULL)) {
2547         nxt_unit_req_alert(req, "preread: failed to allocate buf");
2548         return NULL;
2549     }
2550 
2551     mmap_buf->free_ptr = malloc(size);
2552     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
2553         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
2554         nxt_unit_mmap_buf_release(mmap_buf);
2555         return NULL;
2556     }
2557 
2558     mmap_buf->plain_ptr = mmap_buf->free_ptr;
2559 
2560     mmap_buf->hdr = NULL;
2561     mmap_buf->buf.start = mmap_buf->free_ptr;
2562     mmap_buf->buf.free = mmap_buf->buf.start;
2563     mmap_buf->buf.end = mmap_buf->buf.start + size;
2564     mmap_buf->process = NULL;
2565 
2566     res = read(req->content_fd, mmap_buf->free_ptr, size);
2567     if (res < 0) {
2568         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2569                            strerror(errno), errno);
2570 
2571         nxt_unit_mmap_buf_free(mmap_buf);
2572 
2573         return NULL;
2574     }
2575 
2576     if (res < (ssize_t) size) {
2577         close(req->content_fd);
2578 
2579         req->content_fd = -1;
2580     }
2581 
2582     nxt_unit_req_debug(req, "preread: read %d", (int) res);
2583 
2584     mmap_buf->buf.end = mmap_buf->buf.free + res;
2585 
2586     return mmap_buf;
2587 }
2588 
2589 
2590 static ssize_t
2591 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2592 {
2593     u_char          *p;
2594     size_t          rest, copy, read;
2595     nxt_unit_buf_t  *buf, *last_buf;
2596 
2597     p = dst;
2598     rest = size;
2599 
2600     buf = *b;
2601     last_buf = buf;
2602 
2603     while (buf != NULL) {
2604         last_buf = buf;
2605 
2606         copy = buf->end - buf->free;
2607         copy = nxt_min(rest, copy);
2608 
2609         p = nxt_cpymem(p, buf->free, copy);
2610 
2611         buf->free += copy;
2612         rest -= copy;
2613 
2614         if (rest == 0) {
2615             if (buf->end == buf->free) {
2616                 buf = nxt_unit_buf_next(buf);
2617             }
2618 
2619             break;
2620         }
2621 
2622         buf = nxt_unit_buf_next(buf);
2623     }
2624 
2625     *b = last_buf;
2626 
2627     read = size - rest;
2628 
2629     *len -= read;
2630 
2631     return read;
2632 }
2633 
2634 
2635 void
2636 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2637 {
2638     ssize_t                       res;
2639     uint32_t                      size;
2640     nxt_port_msg_t                msg;
2641     nxt_unit_impl_t               *lib;
2642     nxt_unit_request_info_impl_t  *req_impl;
2643 
2644     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2645 
2646     nxt_unit_req_debug(req, "done: %d", rc);
2647 
2648     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2649         goto skip_response_send;
2650     }
2651 
2652     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2653 
2654         size = nxt_length("Content-Type") + nxt_length("text/plain");
2655 
2656         rc = nxt_unit_response_init(req, 200, 1, size);
2657         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2658             goto skip_response_send;
2659         }
2660 
2661         rc = nxt_unit_response_add_field(req, "Content-Type",
2662                                    nxt_length("Content-Type"),
2663                                    "text/plain", nxt_length("text/plain"));
2664         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2665             goto skip_response_send;
2666         }
2667     }
2668 
2669     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2670 
2671         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2672 
2673         nxt_unit_buf_send_done(req->response_buf);
2674 
2675         return;
2676     }
2677 
2678 skip_response_send:
2679 
2680     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2681 
2682     msg.stream = req_impl->stream;
2683     msg.pid = lib->pid;
2684     msg.reply_port = 0;
2685     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2686                                    : _NXT_PORT_MSG_RPC_ERROR;
2687     msg.last = 1;
2688     msg.mmap = 0;
2689     msg.nf = 0;
2690     msg.mf = 0;
2691     msg.tracking = 0;
2692 
2693     res = lib->callbacks.port_send(req->ctx, &req->response_port,
2694                                    &msg, sizeof(msg), NULL, 0);
2695     if (nxt_slow_path(res != sizeof(msg))) {
2696         nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2697                            strerror(errno), errno);
2698     }
2699 
2700     nxt_unit_request_info_release(req);
2701 }
2702 
2703 
2704 int
2705 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2706     uint8_t last, const void *start, size_t size)
2707 {
2708     const struct iovec  iov = { (void *) start, size };
2709 
2710     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2711 }
2712 
2713 
2714 int
2715 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2716     uint8_t last, const struct iovec *iov, int iovcnt)
2717 {
2718     int                           i, rc;
2719     size_t                        l, copy;
2720     uint32_t                      payload_len, buf_size, alloc_size;
2721     const uint8_t                 *b;
2722     nxt_unit_buf_t                *buf;
2723     nxt_unit_mmap_buf_t           mmap_buf;
2724     nxt_websocket_header_t        *wh;
2725     nxt_unit_request_info_impl_t  *req_impl;
2726     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2727 
2728     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2729 
2730     payload_len = 0;
2731 
2732     for (i = 0; i < iovcnt; i++) {
2733         payload_len += iov[i].iov_len;
2734     }
2735 
2736     buf_size = 10 + payload_len;
2737     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2738 
2739     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2740                                    &req->response_port,
2741                                    alloc_size, alloc_size,
2742                                    &mmap_buf, local_buf);
2743     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2744         return rc;
2745     }
2746 
2747     buf = &mmap_buf.buf;
2748 
2749     buf->start[0] = 0;
2750     buf->start[1] = 0;
2751 
2752     buf_size -= buf->end - buf->start;
2753 
2754     wh = (void *) buf->free;
2755 
2756     buf->free = nxt_websocket_frame_init(wh, payload_len);
2757     wh->fin = last;
2758     wh->opcode = opcode;
2759 
2760     for (i = 0; i < iovcnt; i++) {
2761         b = iov[i].iov_base;
2762         l = iov[i].iov_len;
2763 
2764         while (l > 0) {
2765             copy = buf->end - buf->free;
2766             copy = nxt_min(l, copy);
2767 
2768             buf->free = nxt_cpymem(buf->free, b, copy);
2769             b += copy;
2770             l -= copy;
2771 
2772             if (l > 0) {
2773                 if (nxt_fast_path(buf->free > buf->start)) {
2774                     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
2775                                                 &mmap_buf, 0);
2776 
2777                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2778                         return rc;
2779                     }
2780                 }
2781 
2782                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
2783 
2784                 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2785                                                &req->response_port,
2786                                                alloc_size, alloc_size,
2787                                                &mmap_buf, local_buf);
2788                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2789                     return rc;
2790                 }
2791 
2792                 buf_size -= buf->end - buf->start;
2793             }
2794         }
2795     }
2796 
2797     if (buf->free > buf->start) {
2798         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
2799                                     &mmap_buf, 0);
2800     }
2801 
2802     return rc;
2803 }
2804 
2805 
2806 ssize_t
2807 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2808     size_t size)
2809 {
2810     ssize_t   res;
2811     uint8_t   *b;
2812     uint64_t  i, d;
2813 
2814     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2815                             dst, size);
2816 
2817     if (ws->mask == NULL) {
2818         return res;
2819     }
2820 
2821     b = dst;
2822     d = (ws->payload_len - ws->content_length - res) % 4;
2823 
2824     for (i = 0; i < (uint64_t) res; i++) {
2825         b[i] ^= ws->mask[ (i + d) % 4 ];
2826     }
2827 
2828     return res;
2829 }
2830 
2831 
2832 int
2833 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2834 {
2835     char                             *b;
2836     size_t                           size;
2837     nxt_unit_websocket_frame_impl_t  *ws_impl;
2838 
2839     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2840 
2841     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
2842         return NXT_UNIT_OK;
2843     }
2844 
2845     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2846 
2847     b = malloc(size);
2848     if (nxt_slow_path(b == NULL)) {
2849         return NXT_UNIT_ERROR;
2850     }
2851 
2852     memcpy(b, ws_impl->buf->buf.start, size);
2853 
2854     ws_impl->buf->buf.start = b;
2855     ws_impl->buf->buf.free = b;
2856     ws_impl->buf->buf.end = b + size;
2857 
2858     ws_impl->buf->free_ptr = b;
2859 
2860     return NXT_UNIT_OK;
2861 }
2862 
2863 
2864 void
2865 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
2866 {
2867     nxt_unit_websocket_frame_release(ws);
2868 }
2869 
2870 
2871 static nxt_port_mmap_header_t *
2872 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2873     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
2874 {
2875     int                     res, nchunks, i;
2876     uint32_t                outgoing_size;
2877     nxt_unit_mmap_t         *mm, *mm_end;
2878     nxt_unit_impl_t         *lib;
2879     nxt_port_mmap_header_t  *hdr;
2880 
2881     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2882 
2883     pthread_mutex_lock(&process->outgoing.mutex);
2884 
2885 retry:
2886 
2887     outgoing_size = process->outgoing.size;
2888 
2889     mm_end = process->outgoing.elts + outgoing_size;
2890 
2891     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2892         hdr = mm->hdr;
2893 
2894         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
2895             continue;
2896         }
2897 
2898         *c = 0;
2899 
2900         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2901             nchunks = 1;
2902 
2903             while (nchunks < *n) {
2904                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2905                                                        *c + nchunks);
2906 
2907                 if (res == 0) {
2908                     if (nchunks >= min_n) {
2909                         *n = nchunks;
2910 
2911                         goto unlock;
2912                     }
2913 
2914                     for (i = 0; i < nchunks; i++) {
2915                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
2916                     }
2917 
2918                     *c += nchunks + 1;
2919                     nchunks = 0;
2920                     break;
2921                 }
2922 
2923                 nchunks++;
2924             }
2925 
2926             if (nchunks >= min_n) {
2927                 *n = nchunks;
2928 
2929                 goto unlock;
2930             }
2931         }
2932 
2933         hdr->oosm = 1;
2934     }
2935 
2936     if (outgoing_size >= lib->shm_mmap_limit) {
2937         /* Cannot allocate more shared memory. */
2938         pthread_mutex_unlock(&process->outgoing.mutex);
2939 
2940         if (min_n == 0) {
2941             *n = 0;
2942         }
2943 
2944         if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
2945                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
2946         {
2947             /* Memory allocated by application, but not send to router. */
2948             return NULL;
2949         }
2950 
2951         /* Notify router about OOSM condition. */
2952 
2953         res = nxt_unit_send_oosm(ctx, port_id);
2954         if (nxt_slow_path(res != NXT_UNIT_OK)) {
2955             return NULL;
2956         }
2957 
2958         /* Return if caller can handle OOSM condition. Non-blocking mode. */
2959 
2960         if (min_n == 0) {
2961             return NULL;
2962         }
2963 
2964         nxt_unit_debug(ctx, "oosm: waiting for ACK");
2965 
2966         res = nxt_unit_wait_shm_ack(ctx);
2967         if (nxt_slow_path(res != NXT_UNIT_OK)) {
2968             return NULL;
2969         }
2970 
2971         nxt_unit_debug(ctx, "oosm: retry");
2972 
2973         pthread_mutex_lock(&process->outgoing.mutex);
2974 
2975         goto retry;
2976     }
2977 
2978     *c = 0;
2979     hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
2980 
2981 unlock:
2982 
2983     nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
2984 
2985     nxt_unit_debug(ctx, "process %d allocated_chunks %d",
2986                    process->pid,
2987                    (int) process->outgoing.allocated_chunks);
2988 
2989     pthread_mutex_unlock(&process->outgoing.mutex);
2990 
2991     return hdr;
2992 }
2993 
2994 
2995 static int
2996 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
2997 {
2998     ssize_t          res;
2999     nxt_port_msg_t   msg;
3000     nxt_unit_impl_t  *lib;
3001 
3002     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3003 
3004     msg.stream = 0;
3005     msg.pid = lib->pid;
3006     msg.reply_port = 0;
3007     msg.type = _NXT_PORT_MSG_OOSM;
3008     msg.last = 0;
3009     msg.mmap = 0;
3010     msg.nf = 0;
3011     msg.mf = 0;
3012     msg.tracking = 0;
3013 
3014     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
3015     if (nxt_slow_path(res != sizeof(msg))) {
3016         nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
3017                       (int) port_id->pid, strerror(errno), errno);
3018 
3019         return NXT_UNIT_ERROR;
3020     }
3021 
3022     return NXT_UNIT_OK;
3023 }
3024 
3025 
3026 static int
3027 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3028 {
3029     nxt_port_msg_t       *port_msg;
3030     nxt_unit_ctx_impl_t  *ctx_impl;
3031     nxt_unit_read_buf_t  *rbuf;
3032 
3033     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3034 
3035     while (1) {
3036         rbuf = nxt_unit_read_buf_get(ctx);
3037         if (nxt_slow_path(rbuf == NULL)) {
3038             return NXT_UNIT_ERROR;
3039         }
3040 
3041         nxt_unit_read_buf(ctx, rbuf);
3042         if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
3043             nxt_unit_read_buf_release(ctx, rbuf);
3044 
3045             return NXT_UNIT_ERROR;
3046         }
3047 
3048         port_msg = (nxt_port_msg_t *) rbuf->buf;
3049 
3050         if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3051             nxt_unit_read_buf_release(ctx, rbuf);
3052 
3053             break;
3054         }
3055 
3056         pthread_mutex_lock(&ctx_impl->mutex);
3057 
3058         *ctx_impl->pending_read_tail = rbuf;
3059         ctx_impl->pending_read_tail = &rbuf->next;
3060         rbuf->next = NULL;
3061 
3062         pthread_mutex_unlock(&ctx_impl->mutex);
3063 
3064         if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3065             nxt_unit_debug(ctx, "oosm: quit received");
3066 
3067             return NXT_UNIT_ERROR;
3068         }
3069     }
3070 
3071     return NXT_UNIT_OK;
3072 }
3073 
3074 
3075 static nxt_unit_mmap_t *
3076 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3077 {
3078     uint32_t  cap;
3079 
3080     cap = mmaps->cap;
3081 
3082     if (cap == 0) {
3083         cap = i + 1;
3084     }
3085 
3086     while (i + 1 > cap) {
3087 
3088         if (cap < 16) {
3089             cap = cap * 2;
3090 
3091         } else {
3092             cap = cap + cap / 2;
3093         }
3094     }
3095 
3096     if (cap != mmaps->cap) {
3097 
3098         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
3099         if (nxt_slow_path(mmaps->elts == NULL)) {
3100             return NULL;
3101         }
3102 
3103         memset(mmaps->elts + mmaps->cap, 0,
3104                sizeof(*mmaps->elts) * (cap - mmaps->cap));
3105 
3106         mmaps->cap = cap;
3107     }
3108 
3109     if (i + 1 > mmaps->size) {
3110         mmaps->size = i + 1;
3111     }
3112 
3113     return mmaps->elts + i;
3114 }
3115 
3116 
3117 static nxt_port_mmap_header_t *
3118 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3119     nxt_unit_port_id_t *port_id, int n)
3120 {
3121     int                     i, fd, rc;
3122     void                    *mem;
3123     char                    name[64];
3124     nxt_unit_mmap_t         *mm;
3125     nxt_unit_impl_t         *lib;
3126     nxt_port_mmap_header_t  *hdr;
3127 
3128     lib = process->lib;
3129 
3130     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
3131     if (nxt_slow_path(mm == NULL)) {
3132         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
3133 
3134         return NULL;
3135     }
3136 
3137     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3138              lib->pid, (void *) pthread_self());
3139 
3140 #if (NXT_HAVE_MEMFD_CREATE)
3141 
3142     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3143     if (nxt_slow_path(fd == -1)) {
3144         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3145                        strerror(errno), errno);
3146 
3147         goto remove_fail;
3148     }
3149 
3150     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3151 
3152 #elif (NXT_HAVE_SHM_OPEN_ANON)
3153 
3154     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3155     if (nxt_slow_path(fd == -1)) {
3156         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3157                        strerror(errno), errno);
3158 
3159         goto remove_fail;
3160     }
3161 
3162 #elif (NXT_HAVE_SHM_OPEN)
3163 
3164     /* Just in case. */
3165     shm_unlink(name);
3166 
3167     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3168     if (nxt_slow_path(fd == -1)) {
3169         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3170                        strerror(errno), errno);
3171 
3172         goto remove_fail;
3173     }
3174 
3175     if (nxt_slow_path(shm_unlink(name) == -1)) {
3176         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3177                       strerror(errno), errno);
3178     }
3179 
3180 #else
3181 
3182 #error No working shared memory implementation.
3183 
3184 #endif
3185 
3186     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
3187         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3188                        strerror(errno), errno);
3189 
3190         goto remove_fail;
3191     }
3192 
3193     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3194     if (nxt_slow_path(mem == MAP_FAILED)) {
3195         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3196                        strerror(errno), errno);
3197 
3198         goto remove_fail;
3199     }
3200 
3201     mm->hdr = mem;
3202     hdr = mem;
3203 
3204     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3205     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3206 
3207     hdr->id = process->outgoing.size - 1;
3208     hdr->src_pid = lib->pid;
3209     hdr->dst_pid = process->pid;
3210     hdr->sent_over = port_id->id;
3211 
3212     /* Mark first n chunk(s) as busy */
3213     for (i = 0; i < n; i++) {
3214         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3215     }
3216 
3217     /* Mark as busy chunk followed the last available chunk. */
3218     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3219     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3220 
3221     pthread_mutex_unlock(&process->outgoing.mutex);
3222 
3223     rc = nxt_unit_send_mmap(ctx, port_id, fd);
3224     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3225         munmap(mem, PORT_MMAP_SIZE);
3226         hdr = NULL;
3227 
3228     } else {
3229         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3230                        hdr->id, (int) lib->pid, (int) process->pid);
3231     }
3232 
3233     close(fd);
3234 
3235     pthread_mutex_lock(&process->outgoing.mutex);
3236 
3237     if (nxt_fast_path(hdr != NULL)) {
3238         return hdr;
3239     }
3240 
3241 remove_fail:
3242 
3243     process->outgoing.size--;
3244 
3245     return NULL;
3246 }
3247 
3248 
3249 static int
3250 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
3251 {
3252     ssize_t          res;
3253     nxt_port_msg_t   msg;
3254     nxt_unit_impl_t  *lib;
3255     union {
3256         struct cmsghdr  cm;
3257         char            space[CMSG_SPACE(sizeof(int))];
3258     } cmsg;
3259 
3260     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3261 
3262     msg.stream = 0;
3263     msg.pid = lib->pid;
3264     msg.reply_port = 0;
3265     msg.type = _NXT_PORT_MSG_MMAP;
3266     msg.last = 0;
3267     msg.mmap = 0;
3268     msg.nf = 0;
3269     msg.mf = 0;
3270     msg.tracking = 0;
3271 
3272     /*
3273      * Fill all padding fields with 0.
3274      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
3275      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
3276      */
3277     memset(&cmsg, 0, sizeof(cmsg));
3278 
3279     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3280     cmsg.cm.cmsg_level = SOL_SOCKET;
3281     cmsg.cm.cmsg_type = SCM_RIGHTS;
3282 
3283     /*
3284      * memcpy() is used instead of simple
3285      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3286      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3287      *   dereferencing type-punned pointer will break strict-aliasing rules
3288      *
3289      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3290      * in the same simple assignment as in the code above.
3291      */
3292     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3293 
3294     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
3295                                    &cmsg, sizeof(cmsg));
3296     if (nxt_slow_path(res != sizeof(msg))) {
3297         nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
3298                       (int) port_id->pid, strerror(errno), errno);
3299 
3300         return NXT_UNIT_ERROR;
3301     }
3302 
3303     return NXT_UNIT_OK;
3304 }
3305 
3306 
3307 static int
3308 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3309     nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
3310     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3311 {
3312     int                     nchunks, min_nchunks;
3313     nxt_chunk_id_t          c;
3314     nxt_port_mmap_header_t  *hdr;
3315 
3316     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3317         if (local_buf != NULL) {
3318             mmap_buf->free_ptr = NULL;
3319             mmap_buf->plain_ptr = local_buf;
3320 
3321         } else {
3322             mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
3323             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3324                 return NXT_UNIT_ERROR;
3325             }
3326 
3327             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3328         }
3329 
3330         mmap_buf->hdr = NULL;
3331         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3332         mmap_buf->buf.free = mmap_buf->buf.start;
3333         mmap_buf->buf.end = mmap_buf->buf.start + size;
3334         mmap_buf->port_id = *port_id;
3335         mmap_buf->process = process;
3336 
3337         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3338                        mmap_buf->buf.start, (int) size);
3339 
3340         return NXT_UNIT_OK;
3341     }
3342 
3343     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3344     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3345 
3346     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
3347     if (nxt_slow_path(hdr == NULL)) {
3348         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3349             mmap_buf->hdr = NULL;
3350             mmap_buf->buf.start = NULL;
3351             mmap_buf->buf.free = NULL;
3352             mmap_buf->buf.end = NULL;
3353             mmap_buf->free_ptr = NULL;
3354 
3355             return NXT_UNIT_OK;
3356         }
3357 
3358         return NXT_UNIT_ERROR;
3359     }
3360 
3361     mmap_buf->hdr = hdr;
3362     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3363     mmap_buf->buf.free = mmap_buf->buf.start;
3364     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3365     mmap_buf->port_id = *port_id;
3366     mmap_buf->process = process;
3367     mmap_buf->free_ptr = NULL;
3368     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3369 
3370     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3371                   (int) hdr->id, (int) c,
3372                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3373 
3374     return NXT_UNIT_OK;
3375 }
3376 
3377 
3378 static int
3379 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
3380 {
3381     int                      rc;
3382     void                     *mem;
3383     struct stat              mmap_stat;
3384     nxt_unit_mmap_t          *mm;
3385     nxt_unit_impl_t          *lib;
3386     nxt_unit_process_t       *process;
3387     nxt_port_mmap_header_t   *hdr;
3388 
3389     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3390 
3391     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
3392 
3393     pthread_mutex_lock(&lib->mutex);
3394 
3395     process = nxt_unit_process_find(ctx, pid, 0);
3396 
3397     pthread_mutex_unlock(&lib->mutex);
3398 
3399     if (nxt_slow_path(process == NULL)) {
3400         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
3401                       (int) pid, fd);
3402 
3403         return NXT_UNIT_ERROR;
3404     }
3405 
3406     rc = NXT_UNIT_ERROR;
3407 
3408     if (fstat(fd, &mmap_stat) == -1) {
3409         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
3410                       strerror(errno), errno);
3411 
3412         goto fail;
3413     }
3414 
3415     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
3416                MAP_SHARED, fd, 0);
3417     if (nxt_slow_path(mem == MAP_FAILED)) {
3418         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
3419                       strerror(errno), errno);
3420 
3421         goto fail;
3422     }
3423 
3424     hdr = mem;
3425 
3426     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
3427 
3428         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
3429                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
3430                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
3431 
3432         munmap(mem, PORT_MMAP_SIZE);
3433 
3434         goto fail;
3435     }
3436 
3437     pthread_mutex_lock(&process->incoming.mutex);
3438 
3439     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
3440     if (nxt_slow_path(mm == NULL)) {
3441         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
3442 
3443         munmap(mem, PORT_MMAP_SIZE);
3444 
3445     } else {
3446         mm->hdr = hdr;
3447 
3448         hdr->sent_over = 0xFFFFu;
3449 
3450         rc = NXT_UNIT_OK;
3451     }
3452 
3453     pthread_mutex_unlock(&process->incoming.mutex);
3454 
3455 fail:
3456 
3457     nxt_unit_process_use(ctx, process, -1);
3458 
3459     return rc;
3460 }
3461 
3462 
3463 static void
3464 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
3465 {
3466     pthread_mutex_init(&mmaps->mutex, NULL);
3467 
3468     mmaps->size = 0;
3469     mmaps->cap = 0;
3470     mmaps->elts = NULL;
3471     mmaps->allocated_chunks = 0;
3472 }
3473 
3474 
3475 static void
3476 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
3477 {
3478     long c;
3479 
3480     c = nxt_atomic_fetch_add(&process->use_count, i);
3481 
3482     if (i < 0 && c == -i) {
3483         nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
3484 
3485         nxt_unit_mmaps_destroy(&process->incoming);
3486         nxt_unit_mmaps_destroy(&process->outgoing);
3487 
3488         free(process);
3489     }
3490 }
3491 
3492 
3493 static void
3494 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
3495 {
3496     nxt_unit_mmap_t  *mm, *end;
3497 
3498     if (mmaps->elts != NULL) {
3499         end = mmaps->elts + mmaps->size;
3500 
3501         for (mm = mmaps->elts; mm < end; mm++) {
3502             munmap(mm->hdr, PORT_MMAP_SIZE);
3503         }
3504 
3505         free(mmaps->elts);
3506     }
3507 
3508     pthread_mutex_destroy(&mmaps->mutex);
3509 }
3510 
3511 
3512 static nxt_port_mmap_header_t *
3513 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
3514     uint32_t id)
3515 {
3516     nxt_port_mmap_header_t  *hdr;
3517 
3518     if (nxt_fast_path(process->incoming.size > id)) {
3519         hdr = process->incoming.elts[id].hdr;
3520 
3521     } else {
3522         hdr = NULL;
3523     }
3524 
3525     return hdr;
3526 }
3527 
3528 
3529 static int
3530 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3531 {
3532     int                           rc;
3533     nxt_chunk_id_t                c;
3534     nxt_unit_process_t            *process;
3535     nxt_port_mmap_header_t        *hdr;
3536     nxt_port_mmap_tracking_msg_t  *tracking_msg;
3537 
3538     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3539         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3540                       recv_msg->stream, (int) recv_msg->size);
3541 
3542         return 0;
3543     }
3544 
3545     tracking_msg = recv_msg->start;
3546 
3547     recv_msg->start = tracking_msg + 1;
3548     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3549 
3550     process = nxt_unit_msg_get_process(ctx, recv_msg);
3551     if (nxt_slow_path(process == NULL)) {
3552         return 0;
3553     }
3554 
3555     pthread_mutex_lock(&process->incoming.mutex);
3556 
3557     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
3558     if (nxt_slow_path(hdr == NULL)) {
3559         pthread_mutex_unlock(&process->incoming.mutex);
3560 
3561         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
3562                       "invalid mmap id %d,%"PRIu32,
3563                       recv_msg->stream, (int) process->pid,
3564                       tracking_msg->mmap_id);
3565 
3566         return 0;
3567     }
3568 
3569     c = tracking_msg->tracking_id;
3570     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3571 
3572     if (rc == 0) {
3573         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3574                        recv_msg->stream);
3575 
3576         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3577     }
3578 
3579     pthread_mutex_unlock(&process->incoming.mutex);
3580 
3581     return rc;
3582 }
3583 
3584 
3585 static int
3586 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
3587 {
3588     void                    *start;
3589     uint32_t                size;
3590     nxt_unit_process_t      *process;
3591     nxt_unit_mmap_buf_t     *b, **incoming_tail;
3592     nxt_port_mmap_msg_t     *mmap_msg, *end;
3593     nxt_port_mmap_header_t  *hdr;
3594 
3595     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
3596         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
3597                       recv_msg->stream, (int) recv_msg->size);
3598 
3599         return NXT_UNIT_ERROR;
3600     }
3601 
3602     process = nxt_unit_msg_get_process(ctx, recv_msg);
3603     if (nxt_slow_path(process == NULL)) {
3604         return NXT_UNIT_ERROR;
3605     }
3606 
3607     mmap_msg = recv_msg->start;
3608     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3609 
3610     incoming_tail = &recv_msg->incoming_buf;
3611 
3612     for (; mmap_msg < end; mmap_msg++) {
3613         b = nxt_unit_mmap_buf_get(ctx);
3614         if (nxt_slow_path(b == NULL)) {
3615             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3616                           recv_msg->stream);
3617 
3618             return NXT_UNIT_ERROR;
3619         }
3620 
3621         nxt_unit_mmap_buf_insert(incoming_tail, b);
3622         incoming_tail = &b->next;
3623     }
3624 
3625     b = recv_msg->incoming_buf;
3626     mmap_msg = recv_msg->start;
3627 
3628     pthread_mutex_lock(&process->incoming.mutex);
3629 
3630     for (; mmap_msg < end; mmap_msg++) {
3631         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
3632         if (nxt_slow_path(hdr == NULL)) {
3633             pthread_mutex_unlock(&process->incoming.mutex);
3634 
3635             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
3636                           "invalid mmap id %d,%"PRIu32,
3637                           recv_msg->stream, (int) process->pid,
3638                           mmap_msg->mmap_id);
3639 
3640             return NXT_UNIT_ERROR;
3641         }
3642 
3643         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3644         size = mmap_msg->size;
3645 
3646         if (recv_msg->start == mmap_msg) {
3647             recv_msg->start = start;
3648             recv_msg->size = size;
3649         }
3650 
3651         b->buf.start = start;
3652         b->buf.free = start;
3653         b->buf.end = b->buf.start + size;
3654         b->hdr = hdr;
3655         b->process = process;
3656 
3657         b = b->next;
3658 
3659         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3660                        recv_msg->stream,
3661                        start, (int) size,
3662                        (int) hdr->src_pid, (int) hdr->dst_pid,
3663                        (int) hdr->id, (int) mmap_msg->chunk_id,
3664                        (int) mmap_msg->size);
3665     }
3666 
3667     pthread_mutex_unlock(&process->incoming.mutex);
3668 
3669     return NXT_UNIT_OK;
3670 }
3671 
3672 
3673 static void
3674 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
3675     nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
3676     void *start, uint32_t size)
3677 {
3678     int              freed_chunks;
3679     u_char           *p, *end;
3680     nxt_chunk_id_t   c;
3681     nxt_unit_impl_t  *lib;
3682 
3683     memset(start, 0xA5, size);
3684 
3685     p = start;
3686     end = p + size;
3687     c = nxt_port_mmap_chunk_id(hdr, p);
3688     freed_chunks = 0;
3689 
3690     while (p < end) {
3691         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3692 
3693         p += PORT_MMAP_CHUNK_SIZE;
3694         c++;
3695         freed_chunks++;
3696     }
3697 
3698     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3699 
3700     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
3701         nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
3702                              -freed_chunks);
3703 
3704         nxt_unit_debug(ctx, "process %d allocated_chunks %d",
3705                        process->pid,
3706                        (int) process->outgoing.allocated_chunks);
3707     }
3708 
3709     if (hdr->dst_pid == lib->pid
3710         && freed_chunks != 0
3711         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
3712     {
3713         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
3714     }
3715 }
3716 
3717 
3718 static int
3719 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
3720 {
3721     ssize_t             res;
3722     nxt_port_msg_t      msg;
3723     nxt_unit_impl_t     *lib;
3724     nxt_unit_port_id_t  port_id;
3725 
3726     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3727 
3728     nxt_unit_port_id_init(&port_id, pid, 0);
3729 
3730     msg.stream = 0;
3731     msg.pid = lib->pid;
3732     msg.reply_port = 0;
3733     msg.type = _NXT_PORT_MSG_SHM_ACK;
3734     msg.last = 0;
3735     msg.mmap = 0;
3736     msg.nf = 0;
3737     msg.mf = 0;
3738     msg.tracking = 0;
3739 
3740     res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
3741     if (nxt_slow_path(res != sizeof(msg))) {
3742         nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
3743                       (int) port_id.pid, strerror(errno), errno);
3744 
3745         return NXT_UNIT_ERROR;
3746     }
3747 
3748     return NXT_UNIT_OK;
3749 }
3750 
3751 
3752 static nxt_int_t
3753 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3754 {
3755     nxt_process_t  *process;
3756 
3757     process = data;
3758 
3759     if (lhq->key.length == sizeof(pid_t)
3760         && *(pid_t *) lhq->key.start == process->pid)
3761     {
3762         return NXT_OK;
3763     }
3764 
3765     return NXT_DECLINED;
3766 }
3767 
3768 
3769 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
3770     NXT_LVLHSH_DEFAULT,
3771     nxt_unit_lvlhsh_pid_test,
3772     nxt_lvlhsh_alloc,
3773     nxt_lvlhsh_free,
3774 };
3775 
3776 
3777 static inline void
3778 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3779 {
3780     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3781     lhq->key.length = sizeof(*pid);
3782     lhq->key.start = (u_char *) pid;
3783     lhq->proto = &lvlhsh_processes_proto;
3784 }
3785 
3786 
3787 static nxt_unit_process_t *
3788 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
3789 {
3790     nxt_unit_impl_t     *lib;
3791     nxt_unit_process_t  *process;
3792     nxt_lvlhsh_query_t  lhq;
3793 
3794     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3795 
3796     nxt_unit_process_lhq_pid(&lhq, &pid);
3797 
3798     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3799         process = lhq.value;
3800         nxt_unit_process_use(ctx, process, 1);
3801 
3802         return process;
3803     }
3804 
3805     process = malloc(sizeof(nxt_unit_process_t));
3806     if (nxt_slow_path(process == NULL)) {
3807         nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
3808 
3809         return NULL;
3810     }
3811 
3812     process->pid = pid;
3813     process->use_count = 1;
3814     process->next_port_id = 0;
3815     process->lib = lib;
3816 
3817     nxt_queue_init(&process->ports);
3818 
3819     nxt_unit_mmaps_init(&process->incoming);
3820     nxt_unit_mmaps_init(&process->outgoing);
3821 
3822     lhq.replace = 0;
3823     lhq.value = process;
3824 
3825     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3826 
3827     case NXT_OK:
3828         break;
3829 
3830     default:
3831         nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
3832 
3833         pthread_mutex_destroy(&process->outgoing.mutex);
3834         pthread_mutex_destroy(&process->incoming.mutex);
3835         free(process);
3836         process = NULL;
3837         break;
3838     }
3839 
3840     nxt_unit_process_use(ctx, process, 1);
3841 
3842     return process;
3843 }
3844 
3845 
3846 static nxt_unit_process_t *
3847 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
3848 {
3849     int                 rc;
3850     nxt_unit_impl_t     *lib;
3851     nxt_unit_process_t  *process;
3852     nxt_lvlhsh_query_t  lhq;
3853 
3854     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3855 
3856     nxt_unit_process_lhq_pid(&lhq, &pid);
3857 
3858     if (remove) {
3859         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3860 
3861     } else {
3862         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3863     }
3864 
3865     if (rc == NXT_OK) {
3866         process = lhq.value;
3867 
3868         if (!remove) {
3869             nxt_unit_process_use(ctx, process, 1);
3870         }
3871 
3872         return process;
3873     }
3874 
3875     return NULL;
3876 }
3877 
3878 
3879 static nxt_unit_process_t *
3880 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3881 {
3882     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3883 }
3884 
3885 
3886 int
3887 nxt_unit_run(nxt_unit_ctx_t *ctx)
3888 {
3889     int              rc;
3890     nxt_unit_impl_t  *lib;
3891 
3892     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3893     rc = NXT_UNIT_OK;
3894 
3895     while (nxt_fast_path(lib->online)) {
3896         rc = nxt_unit_run_once(ctx);
3897     }
3898 
3899     return rc;
3900 }
3901 
3902 
3903 int
3904 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
3905 {
3906     int                  rc;
3907     nxt_unit_ctx_impl_t  *ctx_impl;
3908     nxt_unit_read_buf_t  *rbuf;
3909 
3910     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3911 
3912     pthread_mutex_lock(&ctx_impl->mutex);
3913 
3914     if (ctx_impl->pending_read_head != NULL) {
3915         rbuf = ctx_impl->pending_read_head;
3916         ctx_impl->pending_read_head = rbuf->next;
3917 
3918         if (ctx_impl->pending_read_tail == &rbuf->next) {
3919             ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
3920         }
3921 
3922         pthread_mutex_unlock(&ctx_impl->mutex);
3923 
3924     } else {
3925         rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
3926         if (nxt_slow_path(rbuf == NULL)) {
3927             return NXT_UNIT_ERROR;
3928         }
3929 
3930         nxt_unit_read_buf(ctx, rbuf);
3931     }
3932 
3933     if (nxt_fast_path(rbuf->size > 0)) {
3934         rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
3935                                   rbuf->buf, rbuf->size,
3936                                   rbuf->oob, sizeof(rbuf->oob));
3937 
3938 #if (NXT_DEBUG)
3939         memset(rbuf->buf, 0xAC, rbuf->size);
3940 #endif
3941 
3942     } else {
3943         rc = NXT_UNIT_ERROR;
3944     }
3945 
3946     nxt_unit_read_buf_release(ctx, rbuf);
3947 
3948     return rc;
3949 }
3950 
3951 
3952 static void
3953 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
3954 {
3955     nxt_unit_impl_t      *lib;
3956     nxt_unit_ctx_impl_t  *ctx_impl;
3957 
3958     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3959 
3960     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
3961 
3962     if (ctx_impl->read_port_fd != -1) {
3963         rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
3964                                         rbuf->buf, sizeof(rbuf->buf),
3965                                         rbuf->oob, sizeof(rbuf->oob));
3966 
3967     } else {
3968         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3969 
3970         rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
3971                                               rbuf->buf, sizeof(rbuf->buf),
3972                                               rbuf->oob, sizeof(rbuf->oob));
3973     }
3974 }
3975 
3976 
3977 void
3978 nxt_unit_done(nxt_unit_ctx_t *ctx)
3979 {
3980     nxt_unit_impl_t      *lib;
3981     nxt_unit_process_t   *process;
3982     nxt_unit_ctx_impl_t  *ctx_impl;
3983 
3984     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3985 
3986     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
3987 
3988         nxt_unit_ctx_free(&ctx_impl->ctx);
3989 
3990     } nxt_queue_loop;
3991 
3992     for ( ;; ) {
3993         pthread_mutex_lock(&lib->mutex);
3994 
3995         process = nxt_unit_process_pop_first(lib);
3996         if (process == NULL) {
3997             pthread_mutex_unlock(&lib->mutex);
3998 
3999             break;
4000         }
4001 
4002         nxt_unit_remove_process(ctx, process);
4003     }
4004 
4005     pthread_mutex_destroy(&lib->mutex);
4006 
4007     free(lib);
4008 }
4009 
4010 
4011 nxt_unit_ctx_t *
4012 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4013 {
4014     int                  rc, fd;
4015     nxt_unit_impl_t      *lib;
4016     nxt_unit_port_id_t   new_port_id;
4017     nxt_unit_ctx_impl_t  *new_ctx;
4018 
4019     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4020 
4021     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4022     if (nxt_slow_path(new_ctx == NULL)) {
4023         nxt_unit_warn(ctx, "failed to allocate context");
4024 
4025         return NULL;
4026     }
4027 
4028     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
4029     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4030         free(new_ctx);
4031 
4032         return NULL;
4033     }
4034 
4035     rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
4036     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4037         lib->callbacks.remove_port(ctx, &new_port_id);
4038 
4039         close(fd);
4040 
4041         free(new_ctx);
4042 
4043         return NULL;
4044     }
4045 
4046     close(fd);
4047 
4048     rc = nxt_unit_ctx_init(lib, new_ctx, data);
4049     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4050         lib->callbacks.remove_port(ctx, &new_port_id);
4051 
4052         free(new_ctx);
4053 
4054         return NULL;
4055     }
4056 
4057     new_ctx->read_port_id = new_port_id;
4058 
4059     return &new_ctx->ctx;
4060 }
4061 
4062 
4063 void
4064 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
4065 {
4066     nxt_unit_impl_t                  *lib;
4067     nxt_unit_ctx_impl_t              *ctx_impl;
4068     nxt_unit_mmap_buf_t              *mmap_buf;
4069     nxt_unit_request_info_impl_t     *req_impl;
4070     nxt_unit_websocket_frame_impl_t  *ws_impl;
4071 
4072     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4073     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4074 
4075     nxt_queue_each(req_impl, &ctx_impl->active_req,
4076                    nxt_unit_request_info_impl_t, link)
4077     {
4078         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
4079 
4080         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
4081 
4082     } nxt_queue_loop;
4083 
4084     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
4085     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
4086 
4087     while (ctx_impl->free_buf != NULL) {
4088         mmap_buf = ctx_impl->free_buf;
4089         nxt_unit_mmap_buf_unlink(mmap_buf);
4090         free(mmap_buf);
4091     }
4092 
4093     nxt_queue_each(req_impl, &ctx_impl->free_req,
4094                    nxt_unit_request_info_impl_t, link)
4095     {
4096         nxt_unit_request_info_free(req_impl);
4097 
4098     } nxt_queue_loop;
4099 
4100     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
4101                    nxt_unit_websocket_frame_impl_t, link)
4102     {
4103         nxt_unit_websocket_frame_free(ws_impl);
4104 
4105     } nxt_queue_loop;
4106 
4107     pthread_mutex_destroy(&ctx_impl->mutex);
4108 
4109     nxt_queue_remove(&ctx_impl->link);
4110 
4111     if (ctx_impl != &lib->main_ctx) {
4112         free(ctx_impl);
4113     }
4114 }
4115 
4116 
/*
 * Socket type used for port socket pairs: SOCK_SEQPACKET where the platform
 * provides it for AF_UNIX, SOCK_DGRAM otherwise.
 *
 * The leading "0 ||" looks like a manual switch: as written SOCK_SEQPACKET
 * stays enabled; changing it to "0 &&" would disable SOCK_SEQPACKET so that
 * SOCK_DGRAM can be tested on all platforms.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif
4123 
4124 
4125 void
4126 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
4127 {
4128     nxt_unit_port_hash_id_t  port_hash_id;
4129 
4130     port_hash_id.pid = pid;
4131     port_hash_id.id = id;
4132 
4133     port_id->pid = pid;
4134     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
4135     port_id->id = id;
4136 }
4137 
4138 
4139 int
4140 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
4141     nxt_unit_port_id_t *port_id)
4142 {
4143     int                 rc, fd;
4144     nxt_unit_impl_t     *lib;
4145     nxt_unit_port_id_t  new_port_id;
4146 
4147     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4148 
4149     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
4150     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4151         return rc;
4152     }
4153 
4154     rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
4155 
4156     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
4157         *port_id = new_port_id;
4158 
4159     } else {
4160         lib->callbacks.remove_port(ctx, &new_port_id);
4161     }
4162 
4163     close(fd);
4164 
4165     return rc;
4166 }
4167 
4168 
4169 static int
4170 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
4171 {
4172     int                 rc, port_sockets[2];
4173     nxt_unit_impl_t     *lib;
4174     nxt_unit_port_t     new_port;
4175     nxt_unit_process_t  *process;
4176 
4177     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4178 
4179     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
4180     if (nxt_slow_path(rc != 0)) {
4181         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
4182                       strerror(errno), errno);
4183 
4184         return NXT_UNIT_ERROR;
4185     }
4186 
4187     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
4188                    port_sockets[0], port_sockets[1]);
4189 
4190     pthread_mutex_lock(&lib->mutex);
4191 
4192     process = nxt_unit_process_get(ctx, lib->pid);
4193     if (nxt_slow_path(process == NULL)) {
4194         pthread_mutex_unlock(&lib->mutex);
4195 
4196         close(port_sockets[0]);
4197         close(port_sockets[1]);
4198 
4199         return NXT_UNIT_ERROR;
4200     }
4201 
4202     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
4203 
4204     new_port.in_fd = port_sockets[0];
4205     new_port.out_fd = -1;
4206     new_port.data = NULL;
4207 
4208     pthread_mutex_unlock(&lib->mutex);
4209 
4210     nxt_unit_process_use(ctx, process, -1);
4211 
4212     rc = lib->callbacks.add_port(ctx, &new_port);
4213     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4214         nxt_unit_warn(ctx, "create_port: add_port() failed");
4215 
4216         close(port_sockets[0]);
4217         close(port_sockets[1]);
4218 
4219         return rc;
4220     }
4221 
4222     *port_id = new_port.id;
4223     *fd = port_sockets[1];
4224 
4225     return rc;
4226 }
4227 
4228 
4229 static int
4230 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
4231     nxt_unit_port_id_t *new_port, int fd)
4232 {
4233     ssize_t          res;
4234     nxt_unit_impl_t  *lib;
4235 
4236     struct {
4237         nxt_port_msg_t            msg;
4238         nxt_port_msg_new_port_t   new_port;
4239     } m;
4240 
4241     union {
4242         struct cmsghdr  cm;
4243         char            space[CMSG_SPACE(sizeof(int))];
4244     } cmsg;
4245 
4246     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4247 
4248     m.msg.stream = 0;
4249     m.msg.pid = lib->pid;
4250     m.msg.reply_port = 0;
4251     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
4252     m.msg.last = 0;
4253     m.msg.mmap = 0;
4254     m.msg.nf = 0;
4255     m.msg.mf = 0;
4256     m.msg.tracking = 0;
4257 
4258     m.new_port.id = new_port->id;
4259     m.new_port.pid = new_port->pid;
4260     m.new_port.type = NXT_PROCESS_WORKER;
4261     m.new_port.max_size = 16 * 1024;
4262     m.new_port.max_share = 64 * 1024;
4263 
4264     memset(&cmsg, 0, sizeof(cmsg));
4265 
4266     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
4267     cmsg.cm.cmsg_level = SOL_SOCKET;
4268     cmsg.cm.cmsg_type = SCM_RIGHTS;
4269 
4270     /*
4271      * memcpy() is used instead of simple
4272      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
4273      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
4274      *   dereferencing type-punned pointer will break strict-aliasing rules
4275      *
4276      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
4277      * in the same simple assignment as in the code above.
4278      */
4279     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
4280 
4281     res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
4282                                    &cmsg, sizeof(cmsg));
4283 
4284     return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
4285 }
4286 
4287 
4288 int
4289 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4290 {
4291     int                   rc;
4292     nxt_unit_impl_t       *lib;
4293     nxt_unit_process_t    *process;
4294     nxt_unit_port_impl_t  *new_port;
4295 
4296     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4297 
4298     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
4299                    port->id.pid, port->id.id,
4300                    port->in_fd, port->out_fd);
4301 
4302     pthread_mutex_lock(&lib->mutex);
4303 
4304     process = nxt_unit_process_get(ctx, port->id.pid);
4305     if (nxt_slow_path(process == NULL)) {
4306         rc = NXT_UNIT_ERROR;
4307         goto unlock;
4308     }
4309 
4310     if (port->id.id >= process->next_port_id) {
4311         process->next_port_id = port->id.id + 1;
4312     }
4313 
4314     new_port = malloc(sizeof(nxt_unit_port_impl_t));
4315     if (nxt_slow_path(new_port == NULL)) {
4316         rc = NXT_UNIT_ERROR;
4317         goto unlock;
4318     }
4319 
4320     new_port->port = *port;
4321 
4322     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
4323     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4324         goto unlock;
4325     }
4326 
4327     nxt_queue_insert_tail(&process->ports, &new_port->link);
4328 
4329     rc = NXT_UNIT_OK;
4330 
4331     new_port->process = process;
4332 
4333 unlock:
4334 
4335     pthread_mutex_unlock(&lib->mutex);
4336 
4337     if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
4338         nxt_unit_process_use(ctx, process, -1);
4339     }
4340 
4341     return rc;
4342 }
4343 
4344 
4345 void
4346 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
4347 {
4348     nxt_unit_find_remove_port(ctx, port_id, NULL);
4349 }
4350 
4351 
4352 void
4353 nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4354     nxt_unit_port_t *r_port)
4355 {
4356     nxt_unit_impl_t     *lib;
4357     nxt_unit_process_t  *process;
4358 
4359     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4360 
4361     pthread_mutex_lock(&lib->mutex);
4362 
4363     process = NULL;
4364 
4365     nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
4366 
4367     pthread_mutex_unlock(&lib->mutex);
4368 
4369     if (nxt_slow_path(process != NULL)) {
4370         nxt_unit_process_use(ctx, process, -1);
4371     }
4372 }
4373 
4374 
4375 static void
4376 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4377     nxt_unit_port_t *r_port, nxt_unit_process_t **process)
4378 {
4379     nxt_unit_impl_t       *lib;
4380     nxt_unit_port_impl_t  *port;
4381 
4382     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4383 
4384     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
4385     if (nxt_slow_path(port == NULL)) {
4386         nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
4387                        (int) port_id->pid, (int) port_id->id);
4388 
4389         return;
4390     }
4391 
4392     nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
4393                    (int) port_id->pid, (int) port_id->id,
4394                    port->port.in_fd, port->port.out_fd, port->port.data);
4395 
4396     if (port->port.in_fd != -1) {
4397         close(port->port.in_fd);
4398     }
4399 
4400     if (port->port.out_fd != -1) {
4401         close(port->port.out_fd);
4402     }
4403 
4404     if (port->process != NULL) {
4405         nxt_queue_remove(&port->link);
4406     }
4407 
4408     if (process != NULL) {
4409         *process = port->process;
4410     }
4411 
4412     if (r_port != NULL) {
4413         *r_port = port->port;
4414     }
4415 
4416     free(port);
4417 }
4418 
4419 
4420 void
4421 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
4422 {
4423     nxt_unit_impl_t     *lib;
4424     nxt_unit_process_t  *process;
4425 
4426     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4427 
4428     pthread_mutex_lock(&lib->mutex);
4429 
4430     process = nxt_unit_process_find(ctx, pid, 1);
4431     if (nxt_slow_path(process == NULL)) {
4432         nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
4433 
4434         pthread_mutex_unlock(&lib->mutex);
4435 
4436         return;
4437     }
4438 
4439     nxt_unit_remove_process(ctx, process);
4440 }
4441 
4442 
4443 static void
4444 nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
4445 {
4446     nxt_queue_t           ports;
4447     nxt_unit_impl_t       *lib;
4448     nxt_unit_port_impl_t  *port;
4449 
4450     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4451 
4452     nxt_queue_init(&ports);
4453 
4454     nxt_queue_add(&ports, &process->ports);
4455 
4456     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
4457 
4458         nxt_unit_process_use(ctx, process, -1);
4459         port->process = NULL;
4460 
4461         /* Shortcut for default callback. */
4462         if (lib->callbacks.remove_port == nxt_unit_remove_port) {
4463             nxt_queue_remove(&port->link);
4464 
4465             nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
4466         }
4467 
4468     } nxt_queue_loop;
4469 
4470     pthread_mutex_unlock(&lib->mutex);
4471 
4472     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
4473 
4474         nxt_queue_remove(&port->link);
4475 
4476         lib->callbacks.remove_port(ctx, &port->port.id);
4477 
4478     } nxt_queue_loop;
4479 
4480     nxt_unit_process_use(ctx, process, -1);
4481 }
4482 
4483 
4484 void
4485 nxt_unit_quit(nxt_unit_ctx_t *ctx)
4486 {
4487     nxt_unit_impl_t  *lib;
4488 
4489     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4490 
4491     lib->online = 0;
4492 }
4493 
4494 
4495 static ssize_t
4496 nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4497     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4498 {
4499     int                   fd;
4500     nxt_unit_impl_t       *lib;
4501     nxt_unit_port_impl_t  *port;
4502 
4503     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4504 
4505     pthread_mutex_lock(&lib->mutex);
4506 
4507     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
4508 
4509     if (nxt_fast_path(port != NULL)) {
4510         fd = port->port.out_fd;
4511 
4512     } else {
4513         nxt_unit_warn(ctx, "port_send: port %d,%d not found",
4514                       (int) port_id->pid, (int) port_id->id);
4515         fd = -1;
4516     }
4517 
4518     pthread_mutex_unlock(&lib->mutex);
4519 
4520     if (nxt_slow_path(fd == -1)) {
4521         if (port != NULL) {
4522             nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
4523                           (int) port_id->pid, (int) port_id->id);
4524         }
4525 
4526         return -1;
4527     }
4528 
4529     nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
4530                    (int) port_id->pid, (int) port_id->id, fd);
4531 
4532     return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
4533 }
4534 
4535 
4536 ssize_t
4537 nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
4538     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
4539 {
4540     ssize_t        res;
4541     struct iovec   iov[1];
4542     struct msghdr  msg;
4543 
4544     iov[0].iov_base = (void *) buf;
4545     iov[0].iov_len = buf_size;
4546 
4547     msg.msg_name = NULL;
4548     msg.msg_namelen = 0;
4549     msg.msg_iov = iov;
4550     msg.msg_iovlen = 1;
4551     msg.msg_flags = 0;
4552     msg.msg_control = (void *) oob;
4553     msg.msg_controllen = oob_size;
4554 
4555     res = sendmsg(fd, &msg, 0);
4556 
4557     if (nxt_slow_path(res == -1)) {
4558         nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
4559                       fd, (int) buf_size, strerror(errno), errno);
4560 
4561     } else {
4562         nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
4563                        (int) res);
4564     }
4565 
4566     return res;
4567 }
4568 
4569 
4570 static ssize_t
4571 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
4572     void *buf, size_t buf_size, void *oob, size_t oob_size)
4573 {
4574     int                   fd;
4575     nxt_unit_impl_t       *lib;
4576     nxt_unit_ctx_impl_t   *ctx_impl;
4577     nxt_unit_port_impl_t  *port;
4578 
4579     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4580 
4581     pthread_mutex_lock(&lib->mutex);
4582 
4583     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
4584 
4585     if (nxt_fast_path(port != NULL)) {
4586         fd = port->port.in_fd;
4587 
4588     } else {
4589         nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
4590                        (int) port_id->pid, (int) port_id->id);
4591         fd = -1;
4592     }
4593 
4594     pthread_mutex_unlock(&lib->mutex);
4595 
4596     if (nxt_slow_path(fd == -1)) {
4597         return -1;
4598     }
4599 
4600     nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
4601                    (int) port_id->pid, (int) port_id->id, fd);
4602 
4603     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4604 
4605     if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
4606         ctx_impl->read_port_fd = fd;
4607     }
4608 
4609     return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
4610 }
4611 
4612 
4613 ssize_t
4614 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
4615     void *oob, size_t oob_size)
4616 {
4617     ssize_t        res;
4618     struct iovec   iov[1];
4619     struct msghdr  msg;
4620 
4621     iov[0].iov_base = buf;
4622     iov[0].iov_len = buf_size;
4623 
4624     msg.msg_name = NULL;
4625     msg.msg_namelen = 0;
4626     msg.msg_iov = iov;
4627     msg.msg_iovlen = 1;
4628     msg.msg_flags = 0;
4629     msg.msg_control = oob;
4630     msg.msg_controllen = oob_size;
4631 
4632     res = recvmsg(fd, &msg, 0);
4633 
4634     if (nxt_slow_path(res == -1)) {
4635         nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
4636                       fd, strerror(errno), errno);
4637 
4638     } else {
4639         nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
4640     }
4641 
4642     return res;
4643 }
4644 
4645 
4646 static nxt_int_t
4647 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4648 {
4649     nxt_unit_port_t          *port;
4650     nxt_unit_port_hash_id_t  *port_id;
4651 
4652     port = data;
4653     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
4654 
4655     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
4656         && port_id->pid == port->id.pid
4657         && port_id->id == port->id.id)
4658     {
4659         return NXT_OK;
4660     }
4661 
4662     return NXT_DECLINED;
4663 }
4664 
4665 
/*
 * Level-hash descriptor installed by nxt_unit_port_hash_lhq() for every
 * query against the port table.  It names nxt_unit_port_hash_test() as the
 * entry comparison callback; NXT_LVLHSH_DEFAULT, nxt_lvlhsh_alloc and
 * nxt_lvlhsh_free are presumably the default table shape and the memory
 * callbacks of the level hash.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
4672 
4673 
4674 static inline void
4675 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
4676     nxt_unit_port_hash_id_t *port_hash_id,
4677     nxt_unit_port_id_t *port_id)
4678 {
4679     port_hash_id->pid = port_id->pid;
4680     port_hash_id->id = port_id->id;
4681 
4682     if (nxt_fast_path(port_id->hash != 0)) {
4683         lhq->key_hash = port_id->hash;
4684 
4685     } else {
4686         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
4687 
4688         port_id->hash = lhq->key_hash;
4689 
4690         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
4691                        (int) port_id->pid, (int) port_id->id,
4692                        (int) port_id->hash);
4693     }
4694 
4695     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
4696     lhq->key.start = (u_char *) port_hash_id;
4697     lhq->proto = &lvlhsh_ports_proto;
4698     lhq->pool = NULL;
4699 }
4700 
4701 
4702 static int
4703 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
4704 {
4705     nxt_int_t                res;
4706     nxt_lvlhsh_query_t       lhq;
4707     nxt_unit_port_hash_id_t  port_hash_id;
4708 
4709     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
4710     lhq.replace = 0;
4711     lhq.value = port;
4712 
4713     res = nxt_lvlhsh_insert(port_hash, &lhq);
4714 
4715     switch (res) {
4716 
4717     case NXT_OK:
4718         return NXT_UNIT_OK;
4719 
4720     default:
4721         return NXT_UNIT_ERROR;
4722     }
4723 }
4724 
4725 
4726 static nxt_unit_port_impl_t *
4727 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
4728     int remove)
4729 {
4730     nxt_int_t                res;
4731     nxt_lvlhsh_query_t       lhq;
4732     nxt_unit_port_hash_id_t  port_hash_id;
4733 
4734     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
4735 
4736     if (remove) {
4737         res = nxt_lvlhsh_delete(port_hash, &lhq);
4738 
4739     } else {
4740         res = nxt_lvlhsh_find(port_hash, &lhq);
4741     }
4742 
4743     switch (res) {
4744 
4745     case NXT_OK:
4746         return lhq.value;
4747 
4748     default:
4749         return NULL;
4750     }
4751 }
4752 
4753 
4754 static nxt_int_t
4755 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4756 {
4757     return NXT_OK;
4758 }
4759 
4760 
/*
 * Level-hash descriptor used by nxt_unit_request_hash_add() and
 * nxt_unit_request_hash_find().  It names nxt_unit_request_hash_test() as
 * the entry comparison callback; NXT_LVLHSH_DEFAULT, nxt_lvlhsh_alloc and
 * nxt_lvlhsh_free are presumably the default table shape and the memory
 * callbacks of the level hash.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
4767 
4768 
4769 static int
4770 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4771     nxt_unit_request_info_impl_t *req_impl)
4772 {
4773     uint32_t            *stream;
4774     nxt_int_t           res;
4775     nxt_lvlhsh_query_t  lhq;
4776 
4777     stream = &req_impl->stream;
4778 
4779     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4780     lhq.key.length = sizeof(*stream);
4781     lhq.key.start = (u_char *) stream;
4782     lhq.proto = &lvlhsh_requests_proto;
4783     lhq.pool = NULL;
4784     lhq.replace = 0;
4785     lhq.value = req_impl;
4786 
4787     res = nxt_lvlhsh_insert(request_hash, &lhq);
4788 
4789     switch (res) {
4790 
4791     case NXT_OK:
4792         return NXT_UNIT_OK;
4793 
4794     default:
4795         return NXT_UNIT_ERROR;
4796     }
4797 }
4798 
4799 
4800 static nxt_unit_request_info_impl_t *
4801 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4802     int remove)
4803 {
4804     nxt_int_t           res;
4805     nxt_lvlhsh_query_t  lhq;
4806 
4807     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4808     lhq.key.length = sizeof(stream);
4809     lhq.key.start = (u_char *) &stream;
4810     lhq.proto = &lvlhsh_requests_proto;
4811     lhq.pool = NULL;
4812 
4813     if (remove) {
4814         res = nxt_lvlhsh_delete(request_hash, &lhq);
4815 
4816     } else {
4817         res = nxt_lvlhsh_find(request_hash, &lhq);
4818     }
4819 
4820     switch (res) {
4821 
4822     case NXT_OK:
4823         return lhq.value;
4824 
4825     default:
4826         return NULL;
4827     }
4828 }
4829 
4830 
4831 void
4832 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4833 {
4834     int              log_fd, n;
4835     char             msg[NXT_MAX_ERROR_STR], *p, *end;
4836     pid_t            pid;
4837     va_list          ap;
4838     nxt_unit_impl_t  *lib;
4839 
4840     if (nxt_fast_path(ctx != NULL)) {
4841         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4842 
4843         pid = lib->pid;
4844         log_fd = lib->log_fd;
4845 
4846     } else {
4847         pid = getpid();
4848         log_fd = STDERR_FILENO;
4849     }
4850 
4851     p = msg;
4852     end = p + sizeof(msg) - 1;
4853 
4854     p = nxt_unit_snprint_prefix(p, end, pid, level);
4855 
4856     va_start(ap, fmt);
4857     p += vsnprintf(p, end - p, fmt, ap);
4858     va_end(ap);
4859 
4860     if (nxt_slow_path(p > end)) {
4861         memcpy(end - 5, "[...]", 5);
4862         p = end;
4863     }
4864 
4865     *p++ = '\n';
4866 
4867     n = write(log_fd, msg, p - msg);
4868     if (nxt_slow_path(n < 0)) {
4869         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4870     }
4871 }
4872 
4873 
4874 void
4875 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
4876 {
4877     int                           log_fd, n;
4878     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
4879     pid_t                         pid;
4880     va_list                       ap;
4881     nxt_unit_impl_t               *lib;
4882     nxt_unit_request_info_impl_t  *req_impl;
4883 
4884     if (nxt_fast_path(req != NULL)) {
4885         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
4886 
4887         pid = lib->pid;
4888         log_fd = lib->log_fd;
4889 
4890     } else {
4891         pid = getpid();
4892         log_fd = STDERR_FILENO;
4893     }
4894 
4895     p = msg;
4896     end = p + sizeof(msg) - 1;
4897 
4898     p = nxt_unit_snprint_prefix(p, end, pid, level);
4899 
4900     if (nxt_fast_path(req != NULL)) {
4901         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4902 
4903         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4904     }
4905 
4906     va_start(ap, fmt);
4907     p += vsnprintf(p, end - p, fmt, ap);
4908     va_end(ap);
4909 
4910     if (nxt_slow_path(p > end)) {
4911         memcpy(end - 5, "[...]", 5);
4912         p = end;
4913     }
4914 
4915     *p++ = '\n';
4916 
4917     n = write(log_fd, msg, p - msg);
4918     if (nxt_slow_path(n < 0)) {
4919         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4920     }
4921 }
4922 
4923 
/*
 * Level names printed inside "[...]" in the log prefix.  The table is
 * indexed directly by the level argument in nxt_unit_snprint_prefix(), so
 * its order presumably has to match the numeric log level constants.
 */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};
4932 
4933 
4934 static char *
4935 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
4936 {
4937     struct tm        tm;
4938     struct timespec  ts;
4939 
4940     (void) clock_gettime(CLOCK_REALTIME, &ts);
4941 
4942 #if (NXT_HAVE_LOCALTIME_R)
4943     (void) localtime_r(&ts.tv_sec, &tm);
4944 #else
4945     tm = *localtime(&ts.tv_sec);
4946 #endif
4947 
4948     p += snprintf(p, end - p,
4949                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
4950                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
4951                   tm.tm_hour, tm.tm_min, tm.tm_sec,
4952                   (int) ts.tv_nsec / 1000000);
4953 
4954     p += snprintf(p, end - p,
4955                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
4956                   (int) pid,
4957                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
4958 
4959     return p;
4960 }
4961 
4962 
/* The functions required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */
4964 
4965 void *
4966 nxt_memalign(size_t alignment, size_t size)
4967 {
4968     void        *p;
4969     nxt_err_t   err;
4970 
4971     err = posix_memalign(&p, alignment, size);
4972 
4973     if (nxt_fast_path(err == 0)) {
4974         return p;
4975     }
4976 
4977     return NULL;
4978 }
4979 
#if (NXT_DEBUG)

/*
 * Plain free() wrapper, compiled only when NXT_DEBUG is set.
 * NOTE(review): presumably nxt_free() is available without a function
 * definition in non-debug builds (e.g. as a macro) -- confirm.
 */
void
nxt_free(void *p)
{
    free(p);
}

#endif
4989