xref: /unit/src/nxt_unit.c (revision 1182:325b315e48c4)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
22 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
23 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
24 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
25 typedef struct nxt_unit_process_s               nxt_unit_process_t;
26 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
27 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
28 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
29 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
30 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
31 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
32 
33 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
34 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
35     nxt_unit_ctx_impl_t *ctx_impl, void *data);
36 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
37     nxt_unit_mmap_buf_t *mmap_buf);
38 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
39     nxt_unit_mmap_buf_t *mmap_buf);
40 nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
41 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
42     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
43 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
44     uint32_t stream);
45 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
46     nxt_unit_recv_msg_t *recv_msg);
47 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
48     nxt_unit_recv_msg_t *recv_msg);
49 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
50     nxt_unit_recv_msg_t *recv_msg);
51 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
52     nxt_unit_ctx_t *ctx);
53 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
54 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
55 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
56     nxt_unit_ctx_t *ctx);
57 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
58 static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
59 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
60     nxt_unit_recv_msg_t *recv_msg);
61 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
62 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
63 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
64     nxt_unit_mmap_buf_t *mmap_buf, int last);
65 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
66 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
67     size_t size);
68 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
69     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
70     nxt_chunk_id_t *c, int n);
71 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
72 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
73     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
74 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
75     int fd);
76 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
77     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
78     nxt_unit_mmap_buf_t *mmap_buf);
79 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
80 
81 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
82 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
83     nxt_unit_process_t *process, int i);
84 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
85 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
86     nxt_unit_process_t *process, uint32_t id);
87 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
88     nxt_unit_recv_msg_t *recv_msg);
89 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
90     nxt_unit_recv_msg_t *recv_msg);
91 static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
92     uint32_t size);
93 
94 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
95     pid_t pid);
96 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
97     pid_t pid, int remove);
98 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
99 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
100     nxt_unit_port_id_t *port_id, int *fd);
101 
102 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
103     nxt_unit_port_id_t *new_port, int fd);
104 
105 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
106     nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
107     nxt_unit_process_t **process);
108 static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
109     nxt_unit_process_t *process);
110 
111 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
112     nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
113     const void *oob, size_t oob_size);
114 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
115     nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
116     void *oob, size_t oob_size);
117 
118 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
119     nxt_unit_port_t *port);
120 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
121     nxt_unit_port_id_t *port_id, int remove);
122 
123 static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
124     nxt_unit_request_info_impl_t *req_impl);
125 static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
126     nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
127 
128 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
129 
130 
131 struct nxt_unit_mmap_buf_s {
132     nxt_unit_buf_t           buf;
133 
134     nxt_unit_mmap_buf_t      *next;
135     nxt_unit_mmap_buf_t      **prev;
136 
137     nxt_port_mmap_header_t   *hdr;
138 //    nxt_queue_link_t         link;
139     nxt_unit_port_id_t       port_id;
140     nxt_unit_request_info_t  *req;
141     nxt_unit_ctx_impl_t      *ctx_impl;
142 };
143 
144 
145 struct nxt_unit_recv_msg_s {
146     uint32_t                 stream;
147     nxt_pid_t                pid;
148     nxt_port_id_t            reply_port;
149 
150     uint8_t                  last;      /* 1 bit */
151     uint8_t                  mmap;      /* 1 bit */
152 
153     void                     *start;
154     uint32_t                 size;
155 
156     int                      fd;
157     nxt_unit_process_t       *process;
158 
159     nxt_unit_mmap_buf_t      *incoming_buf;
160 };
161 
162 
163 typedef enum {
164     NXT_UNIT_RS_START           = 0,
165     NXT_UNIT_RS_RESPONSE_INIT,
166     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
167     NXT_UNIT_RS_RESPONSE_SENT,
168     NXT_UNIT_RS_RELEASED,
169 } nxt_unit_req_state_t;
170 
171 
172 struct nxt_unit_request_info_impl_s {
173     nxt_unit_request_info_t  req;
174 
175     uint32_t                 stream;
176 
177     nxt_unit_process_t       *process;
178 
179     nxt_unit_mmap_buf_t      *outgoing_buf;
180     nxt_unit_mmap_buf_t      *incoming_buf;
181 
182     nxt_unit_req_state_t     state;
183     uint8_t                  websocket;
184 
185     nxt_queue_link_t         link;
186 
187     char                     extra_data[];
188 };
189 
190 
191 struct nxt_unit_websocket_frame_impl_s {
192     nxt_unit_websocket_frame_t  ws;
193 
194     nxt_unit_mmap_buf_t         *buf;
195 
196     nxt_queue_link_t            link;
197 
198     nxt_unit_ctx_impl_t         *ctx_impl;
199 
200     void                        *retain_buf;
201 };
202 
203 
204 struct nxt_unit_ctx_impl_s {
205     nxt_unit_ctx_t                ctx;
206 
207     pthread_mutex_t               mutex;
208 
209     nxt_unit_port_id_t            read_port_id;
210     int                           read_port_fd;
211 
212     nxt_queue_link_t              link;
213 
214     nxt_unit_mmap_buf_t           *free_buf;
215 
216     /*  of nxt_unit_request_info_impl_t */
217     nxt_queue_t                   free_req;
218 
219     /*  of nxt_unit_websocket_frame_impl_t */
220     nxt_queue_t                   free_ws;
221 
222     /*  of nxt_unit_request_info_impl_t */
223     nxt_queue_t                   active_req;
224 
225     /*  of nxt_unit_request_info_impl_t */
226     nxt_lvlhsh_t                  requests;
227 
228     nxt_unit_mmap_buf_t           ctx_buf[2];
229 
230     nxt_unit_request_info_impl_t  req;
231 };
232 
233 
234 struct nxt_unit_impl_s {
235     nxt_unit_t               unit;
236     nxt_unit_callbacks_t     callbacks;
237 
238     uint32_t                 request_data_size;
239 
240     pthread_mutex_t          mutex;
241 
242     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
243     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
244 
245     nxt_unit_port_id_t       ready_port_id;
246 
247     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
248 
249     pid_t                    pid;
250     int                      log_fd;
251     int                      online;
252 
253     nxt_unit_ctx_impl_t      main_ctx;
254 };
255 
256 
257 struct nxt_unit_port_impl_s {
258     nxt_unit_port_t          port;
259 
260     nxt_queue_link_t         link;
261     nxt_unit_process_t       *process;
262 };
263 
264 
265 struct nxt_unit_mmap_s {
266     nxt_port_mmap_header_t   *hdr;
267 };
268 
269 
270 struct nxt_unit_mmaps_s {
271     pthread_mutex_t          mutex;
272     uint32_t                 size;
273     uint32_t                 cap;
274     nxt_unit_mmap_t          *elts;
275 };
276 
277 
278 struct nxt_unit_process_s {
279     pid_t                    pid;
280 
281     nxt_queue_t              ports;
282 
283     nxt_unit_mmaps_t         incoming;
284     nxt_unit_mmaps_t         outgoing;
285 
286     nxt_unit_impl_t          *lib;
287 
288     nxt_atomic_t             use_count;
289 
290     uint32_t                 next_port_id;
291 };
292 
293 
294 /* Explicitly using 32 bit types to avoid possible alignment. */
295 typedef struct {
296     int32_t   pid;
297     uint32_t  id;
298 } nxt_unit_port_hash_id_t;
299 
300 
301 nxt_unit_ctx_t *
302 nxt_unit_init(nxt_unit_init_t *init)
303 {
304     int              rc;
305     uint32_t         ready_stream;
306     nxt_unit_ctx_t   *ctx;
307     nxt_unit_impl_t  *lib;
308     nxt_unit_port_t  ready_port, read_port;
309 
310     lib = nxt_unit_create(init);
311     if (nxt_slow_path(lib == NULL)) {
312         return NULL;
313     }
314 
315     if (init->ready_port.id.pid != 0
316         && init->ready_stream != 0
317         && init->read_port.id.pid != 0)
318     {
319         ready_port = init->ready_port;
320         ready_stream = init->ready_stream;
321         read_port = init->read_port;
322         lib->log_fd = init->log_fd;
323 
324         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
325                               ready_port.id.id);
326         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
327                               read_port.id.id);
328     } else {
329         rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
330                                &ready_stream);
331         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
332             goto fail;
333         }
334     }
335 
336     lib->pid = read_port.id.pid;
337     ctx = &lib->main_ctx.ctx;
338 
339     rc = lib->callbacks.add_port(ctx, &ready_port);
340     if (rc != NXT_UNIT_OK) {
341         nxt_unit_alert(NULL, "failed to add ready_port");
342 
343         goto fail;
344     }
345 
346     rc = lib->callbacks.add_port(ctx, &read_port);
347     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
348         nxt_unit_alert(NULL, "failed to add read_port");
349 
350         goto fail;
351     }
352 
353     lib->main_ctx.read_port_id = read_port.id;
354     lib->ready_port_id = ready_port.id;
355 
356     rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
357     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
358         nxt_unit_alert(NULL, "failed to send READY message");
359 
360         goto fail;
361     }
362 
363     return ctx;
364 
365 fail:
366 
367     free(lib);
368 
369     return NULL;
370 }
371 
372 
373 static nxt_unit_impl_t *
374 nxt_unit_create(nxt_unit_init_t *init)
375 {
376     int                   rc;
377     nxt_unit_impl_t       *lib;
378     nxt_unit_callbacks_t  *cb;
379 
380     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
381     if (nxt_slow_path(lib == NULL)) {
382         nxt_unit_alert(NULL, "failed to allocate unit struct");
383 
384         return NULL;
385     }
386 
387     rc = pthread_mutex_init(&lib->mutex, NULL);
388     if (nxt_slow_path(rc != 0)) {
389         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
390 
391         goto fail;
392     }
393 
394     lib->unit.data = init->data;
395     lib->callbacks = init->callbacks;
396 
397     lib->request_data_size = init->request_data_size;
398 
399     lib->processes.slot = NULL;
400     lib->ports.slot = NULL;
401 
402     lib->log_fd = STDERR_FILENO;
403     lib->online = 1;
404 
405     nxt_queue_init(&lib->contexts);
406 
407     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
408     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
409         goto fail;
410     }
411 
412     cb = &lib->callbacks;
413 
414     if (cb->request_handler == NULL) {
415         nxt_unit_alert(NULL, "request_handler is NULL");
416 
417         goto fail;
418     }
419 
420     if (cb->add_port == NULL) {
421         cb->add_port = nxt_unit_add_port;
422     }
423 
424     if (cb->remove_port == NULL) {
425         cb->remove_port = nxt_unit_remove_port;
426     }
427 
428     if (cb->remove_pid == NULL) {
429         cb->remove_pid = nxt_unit_remove_pid;
430     }
431 
432     if (cb->quit == NULL) {
433         cb->quit = nxt_unit_quit;
434     }
435 
436     if (cb->port_send == NULL) {
437         cb->port_send = nxt_unit_port_send_default;
438     }
439 
440     if (cb->port_recv == NULL) {
441         cb->port_recv = nxt_unit_port_recv_default;
442     }
443 
444     return lib;
445 
446 fail:
447 
448     free(lib);
449 
450     return NULL;
451 }
452 
453 
454 static int
455 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
456     void *data)
457 {
458     int  rc;
459 
460     ctx_impl->ctx.data = data;
461     ctx_impl->ctx.unit = &lib->unit;
462 
463     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
464 
465     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
466     if (nxt_slow_path(rc != 0)) {
467         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
468 
469         return NXT_UNIT_ERROR;
470     }
471 
472     nxt_queue_init(&ctx_impl->free_req);
473     nxt_queue_init(&ctx_impl->free_ws);
474     nxt_queue_init(&ctx_impl->active_req);
475 
476     ctx_impl->free_buf = NULL;
477     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
478     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
479 
480     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
481 
482     ctx_impl->req.req.ctx = &ctx_impl->ctx;
483     ctx_impl->req.req.unit = &lib->unit;
484 
485     ctx_impl->read_port_fd = -1;
486     ctx_impl->requests.slot = 0;
487 
488     return NXT_UNIT_OK;
489 }
490 
491 
492 nxt_inline void
493 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
494     nxt_unit_mmap_buf_t *mmap_buf)
495 {
496     mmap_buf->next = *head;
497 
498     if (mmap_buf->next != NULL) {
499         mmap_buf->next->prev = &mmap_buf->next;
500     }
501 
502     *head = mmap_buf;
503     mmap_buf->prev = head;
504 }
505 
506 
507 nxt_inline void
508 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
509     nxt_unit_mmap_buf_t *mmap_buf)
510 {
511     while (*prev != NULL) {
512         prev = &(*prev)->next;
513     }
514 
515     nxt_unit_mmap_buf_insert(prev, mmap_buf);
516 }
517 
518 
519 nxt_inline void
520 nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
521 {
522     nxt_unit_mmap_buf_t  **prev;
523 
524     prev = mmap_buf->prev;
525 
526     if (mmap_buf->next != NULL) {
527         mmap_buf->next->prev = prev;
528     }
529 
530     if (prev != NULL) {
531         *prev = mmap_buf->next;
532     }
533 }
534 
535 
536 static int
537 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
538     int *log_fd, uint32_t *stream)
539 {
540     int       rc;
541     int       ready_fd, read_fd;
542     char      *unit_init, *version_end;
543     long      version_length;
544     int64_t   ready_pid, read_pid;
545     uint32_t  ready_stream, ready_id, read_id;
546 
547     unit_init = getenv(NXT_UNIT_INIT_ENV);
548     if (nxt_slow_path(unit_init == NULL)) {
549         nxt_unit_alert(NULL, "%s is not in the current environment",
550                        NXT_UNIT_INIT_ENV);
551 
552         return NXT_UNIT_ERROR;
553     }
554 
555     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
556 
557     version_length = nxt_length(NXT_VERSION);
558 
559     version_end = strchr(unit_init, ';');
560     if (version_end == NULL
561         || version_end - unit_init != version_length
562         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
563     {
564         nxt_unit_alert(NULL, "version check error");
565 
566         return NXT_UNIT_ERROR;
567     }
568 
569     rc = sscanf(version_end + 1,
570                 "%"PRIu32";"
571                 "%"PRId64",%"PRIu32",%d;"
572                 "%"PRId64",%"PRIu32",%d;"
573                 "%d",
574                 &ready_stream,
575                 &ready_pid, &ready_id, &ready_fd,
576                 &read_pid, &read_id, &read_fd,
577                 log_fd);
578 
579     if (nxt_slow_path(rc != 8)) {
580         nxt_unit_alert(NULL, "failed to scan variables");
581 
582         return NXT_UNIT_ERROR;
583     }
584 
585     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
586 
587     ready_port->in_fd = -1;
588     ready_port->out_fd = ready_fd;
589     ready_port->data = NULL;
590 
591     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
592 
593     read_port->in_fd = read_fd;
594     read_port->out_fd = -1;
595     read_port->data = NULL;
596 
597     *stream = ready_stream;
598 
599     return NXT_UNIT_OK;
600 }
601 
602 
603 static int
604 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
605     uint32_t stream)
606 {
607     ssize_t          res;
608     nxt_port_msg_t   msg;
609     nxt_unit_impl_t  *lib;
610 
611     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
612 
613     msg.stream = stream;
614     msg.pid = lib->pid;
615     msg.reply_port = 0;
616     msg.type = _NXT_PORT_MSG_PROCESS_READY;
617     msg.last = 1;
618     msg.mmap = 0;
619     msg.nf = 0;
620     msg.mf = 0;
621     msg.tracking = 0;
622 
623     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
624     if (res != sizeof(msg)) {
625         return NXT_UNIT_ERROR;
626     }
627 
628     return NXT_UNIT_OK;
629 }
630 
631 
632 int
633 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
634     void *buf, size_t buf_size, void *oob, size_t oob_size)
635 {
636     int                   rc;
637     pid_t                 pid;
638     struct cmsghdr        *cm;
639     nxt_port_msg_t        *port_msg;
640     nxt_unit_impl_t       *lib;
641     nxt_unit_recv_msg_t   recv_msg;
642     nxt_unit_callbacks_t  *cb;
643 
644     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
645 
646     rc = NXT_UNIT_ERROR;
647     recv_msg.fd = -1;
648     recv_msg.process = NULL;
649     port_msg = buf;
650     cm = oob;
651 
652     if (oob_size >= CMSG_SPACE(sizeof(int))
653         && cm->cmsg_len == CMSG_LEN(sizeof(int))
654         && cm->cmsg_level == SOL_SOCKET
655         && cm->cmsg_type == SCM_RIGHTS)
656     {
657         memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
658     }
659 
660     recv_msg.incoming_buf = NULL;
661 
662     if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
663         nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
664         goto fail;
665     }
666 
667     recv_msg.stream = port_msg->stream;
668     recv_msg.pid = port_msg->pid;
669     recv_msg.reply_port = port_msg->reply_port;
670     recv_msg.last = port_msg->last;
671     recv_msg.mmap = port_msg->mmap;
672 
673     recv_msg.start = port_msg + 1;
674     recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
675 
676     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
677         nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
678                       port_msg->stream, (int) port_msg->type);
679         goto fail;
680     }
681 
682     if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
683         rc = NXT_UNIT_OK;
684 
685         goto fail;
686     }
687 
688     /* Fragmentation is unsupported. */
689     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
690         nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
691                       port_msg->stream, (int) port_msg->type);
692         goto fail;
693     }
694 
695     if (port_msg->mmap) {
696         if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
697             goto fail;
698         }
699     }
700 
701     cb = &lib->callbacks;
702 
703     switch (port_msg->type) {
704 
705     case _NXT_PORT_MSG_QUIT:
706         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
707 
708         cb->quit(ctx);
709         rc = NXT_UNIT_OK;
710         break;
711 
712     case _NXT_PORT_MSG_NEW_PORT:
713         rc = nxt_unit_process_new_port(ctx, &recv_msg);
714         break;
715 
716     case _NXT_PORT_MSG_CHANGE_FILE:
717         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
718                        port_msg->stream, recv_msg.fd);
719         break;
720 
721     case _NXT_PORT_MSG_MMAP:
722         if (nxt_slow_path(recv_msg.fd < 0)) {
723             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
724                            port_msg->stream, recv_msg.fd);
725 
726             goto fail;
727         }
728 
729         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
730         break;
731 
732     case _NXT_PORT_MSG_REQ_HEADERS:
733         rc = nxt_unit_process_req_headers(ctx, &recv_msg);
734         break;
735 
736     case _NXT_PORT_MSG_WEBSOCKET:
737         rc = nxt_unit_process_websocket(ctx, &recv_msg);
738         break;
739 
740     case _NXT_PORT_MSG_REMOVE_PID:
741         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
742             nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
743                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
744                           (int) sizeof(pid));
745 
746             goto fail;
747         }
748 
749         memcpy(&pid, recv_msg.start, sizeof(pid));
750 
751         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
752                        port_msg->stream, (int) pid);
753 
754         cb->remove_pid(ctx, pid);
755 
756         rc = NXT_UNIT_OK;
757         break;
758 
759     default:
760         nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
761                        port_msg->stream, (int) port_msg->type);
762 
763         goto fail;
764     }
765 
766 fail:
767 
768     if (recv_msg.fd != -1) {
769         close(recv_msg.fd);
770     }
771 
772     while (recv_msg.incoming_buf != NULL) {
773         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
774     }
775 
776     if (recv_msg.process != NULL) {
777         nxt_unit_process_use(ctx, recv_msg.process, -1);
778     }
779 
780     return rc;
781 }
782 
783 
784 static int
785 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
786 {
787     int                      nb;
788     nxt_unit_impl_t          *lib;
789     nxt_unit_port_t          new_port;
790     nxt_port_msg_new_port_t  *new_port_msg;
791 
792     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
793         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
794                       "invalid message size (%d)",
795                       recv_msg->stream, (int) recv_msg->size);
796 
797         return NXT_UNIT_ERROR;
798     }
799 
800     if (nxt_slow_path(recv_msg->fd < 0)) {
801         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
802                        recv_msg->stream, recv_msg->fd);
803 
804         return NXT_UNIT_ERROR;
805     }
806 
807     new_port_msg = recv_msg->start;
808 
809     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
810                    recv_msg->stream, (int) new_port_msg->pid,
811                    (int) new_port_msg->id, recv_msg->fd);
812 
813     nb = 0;
814 
815     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
816         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
817                        "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
818 
819         return NXT_UNIT_ERROR;
820     }
821 
822     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
823                           new_port_msg->id);
824 
825     new_port.in_fd = -1;
826     new_port.out_fd = recv_msg->fd;
827     new_port.data = NULL;
828 
829     recv_msg->fd = -1;
830 
831     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
832 
833     return lib->callbacks.add_port(ctx, &new_port);
834 }
835 
836 
837 static int
838 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
839 {
840     nxt_unit_impl_t               *lib;
841     nxt_unit_request_t            *r;
842     nxt_unit_mmap_buf_t           *b;
843     nxt_unit_request_info_t       *req;
844     nxt_unit_request_info_impl_t  *req_impl;
845 
846     if (nxt_slow_path(recv_msg->mmap == 0)) {
847         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
848                       recv_msg->stream);
849 
850         return NXT_UNIT_ERROR;
851     }
852 
853     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
854         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
855                       "%d expected", recv_msg->stream, (int) recv_msg->size,
856                       (int) sizeof(nxt_unit_request_t));
857 
858         return NXT_UNIT_ERROR;
859     }
860 
861     req_impl = nxt_unit_request_info_get(ctx);
862     if (nxt_slow_path(req_impl == NULL)) {
863         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
864                       recv_msg->stream);
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     req = &req_impl->req;
870 
871     nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
872                           recv_msg->reply_port);
873 
874     req->request = recv_msg->start;
875 
876     b = recv_msg->incoming_buf;
877 
878     req->request_buf = &b->buf;
879     req->response = NULL;
880     req->response_buf = NULL;
881 
882     r = req->request;
883 
884     req->content_length = r->content_length;
885 
886     req->content_buf = req->request_buf;
887     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
888 
889     /* "Move" process reference to req_impl. */
890     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
891     if (nxt_slow_path(req_impl->process == NULL)) {
892         return NXT_UNIT_ERROR;
893     }
894 
895     recv_msg->process = NULL;
896 
897     req_impl->stream = recv_msg->stream;
898 
899     req_impl->outgoing_buf = NULL;
900 
901     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
902         b->req = req;
903     }
904 
905     /* "Move" incoming buffer list to req_impl. */
906     req_impl->incoming_buf = recv_msg->incoming_buf;
907     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
908     recv_msg->incoming_buf = NULL;
909 
910     req->response_max_fields = 0;
911     req_impl->state = NXT_UNIT_RS_START;
912     req_impl->websocket = 0;
913 
914     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
915                    (int) r->method_length, nxt_unit_sptr_get(&r->method),
916                    (int) r->target_length, nxt_unit_sptr_get(&r->target),
917                    (int) r->content_length);
918 
919     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
920 
921     lib->callbacks.request_handler(req);
922 
923     return NXT_UNIT_OK;
924 }
925 
926 
927 static int
928 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
929 {
930     size_t                           hsize;
931     nxt_unit_impl_t                  *lib;
932     nxt_unit_mmap_buf_t              *b;
933     nxt_unit_ctx_impl_t              *ctx_impl;
934     nxt_unit_callbacks_t             *cb;
935     nxt_unit_request_info_t          *req;
936     nxt_unit_request_info_impl_t     *req_impl;
937     nxt_unit_websocket_frame_impl_t  *ws_impl;
938 
939     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
940 
941     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
942                                           recv_msg->last);
943     if (req_impl == NULL) {
944         return NXT_UNIT_OK;
945     }
946 
947     req = &req_impl->req;
948 
949     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
950     cb = &lib->callbacks;
951 
952     if (cb->websocket_handler && recv_msg->size >= 2) {
953         ws_impl = nxt_unit_websocket_frame_get(ctx);
954         if (nxt_slow_path(ws_impl == NULL)) {
955             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
956                           req_impl->stream);
957 
958             return NXT_UNIT_ERROR;
959         }
960 
961         ws_impl->ws.req = req;
962 
963         ws_impl->buf = NULL;
964         ws_impl->retain_buf = NULL;
965 
966         if (recv_msg->mmap) {
967             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
968                 b->req = req;
969             }
970 
971             /* "Move" incoming buffer list to ws_impl. */
972             ws_impl->buf = recv_msg->incoming_buf;
973             ws_impl->buf->prev = &ws_impl->buf;
974             recv_msg->incoming_buf = NULL;
975 
976             b = ws_impl->buf;
977 
978         } else {
979             b = nxt_unit_mmap_buf_get(ctx);
980             if (nxt_slow_path(b == NULL)) {
981                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
982                                req_impl->stream);
983 
984                 nxt_unit_websocket_frame_release(&ws_impl->ws);
985 
986                 return NXT_UNIT_ERROR;
987             }
988 
989             b->hdr = NULL;
990             b->req = req;
991             b->buf.start = recv_msg->start;
992             b->buf.free = b->buf.start;
993             b->buf.end = b->buf.start + recv_msg->size;
994 
995             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
996         }
997 
998         ws_impl->ws.header = (void *) b->buf.start;
999         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1000             ws_impl->ws.header);
1001 
1002         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1003 
1004         if (ws_impl->ws.header->mask) {
1005             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1006 
1007         } else {
1008             ws_impl->ws.mask = NULL;
1009         }
1010 
1011         b->buf.free += hsize;
1012 
1013         ws_impl->ws.content_buf = &b->buf;
1014         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1015 
1016         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1017                            "payload_len=%"PRIu64,
1018                             ws_impl->ws.header->opcode,
1019                             ws_impl->ws.payload_len);
1020 
1021         cb->websocket_handler(&ws_impl->ws);
1022     }
1023 
1024     if (recv_msg->last) {
1025         req_impl->websocket = 0;
1026 
1027         if (cb->close_handler) {
1028             nxt_unit_req_debug(req, "close_handler");
1029 
1030             cb->close_handler(req);
1031 
1032         } else {
1033             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1034         }
1035     }
1036 
1037     return NXT_UNIT_OK;
1038 }
1039 
1040 
1041 static nxt_unit_request_info_impl_t *
1042 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1043 {
1044     nxt_unit_impl_t               *lib;
1045     nxt_queue_link_t              *lnk;
1046     nxt_unit_ctx_impl_t           *ctx_impl;
1047     nxt_unit_request_info_impl_t  *req_impl;
1048 
1049     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1050 
1051     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1052 
1053     pthread_mutex_lock(&ctx_impl->mutex);
1054 
1055     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1056         pthread_mutex_unlock(&ctx_impl->mutex);
1057 
1058         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1059                           + lib->request_data_size);
1060         if (nxt_slow_path(req_impl == NULL)) {
1061             return NULL;
1062         }
1063 
1064         req_impl->req.unit = ctx->unit;
1065         req_impl->req.ctx = ctx;
1066 
1067         pthread_mutex_lock(&ctx_impl->mutex);
1068 
1069     } else {
1070         lnk = nxt_queue_first(&ctx_impl->free_req);
1071         nxt_queue_remove(lnk);
1072 
1073         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1074     }
1075 
1076     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1077 
1078     pthread_mutex_unlock(&ctx_impl->mutex);
1079 
1080     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1081 
1082     return req_impl;
1083 }
1084 
1085 
1086 static void
1087 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1088 {
1089     nxt_unit_ctx_impl_t           *ctx_impl;
1090     nxt_unit_request_info_impl_t  *req_impl;
1091 
1092     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1093     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1094 
1095     req->response = NULL;
1096     req->response_buf = NULL;
1097 
1098     if (req_impl->websocket) {
1099         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1100 
1101         req_impl->websocket = 0;
1102     }
1103 
1104     while (req_impl->outgoing_buf != NULL) {
1105         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1106     }
1107 
1108     while (req_impl->incoming_buf != NULL) {
1109         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1110     }
1111 
1112     /*
1113      * Process release should go after buffers release to guarantee mmap
1114      * existence.
1115      */
1116     if (req_impl->process != NULL) {
1117         nxt_unit_process_use(req->ctx, req_impl->process, -1);
1118 
1119         req_impl->process = NULL;
1120     }
1121 
1122     pthread_mutex_lock(&ctx_impl->mutex);
1123 
1124     nxt_queue_remove(&req_impl->link);
1125 
1126     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1127 
1128     pthread_mutex_unlock(&ctx_impl->mutex);
1129 
1130     req_impl->state = NXT_UNIT_RS_RELEASED;
1131 }
1132 
1133 
1134 static void
1135 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1136 {
1137     nxt_unit_ctx_impl_t  *ctx_impl;
1138 
1139     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1140 
1141     nxt_queue_remove(&req_impl->link);
1142 
1143     if (req_impl != &ctx_impl->req) {
1144         free(req_impl);
1145     }
1146 }
1147 
1148 
1149 static nxt_unit_websocket_frame_impl_t *
1150 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1151 {
1152     nxt_queue_link_t                 *lnk;
1153     nxt_unit_ctx_impl_t              *ctx_impl;
1154     nxt_unit_websocket_frame_impl_t  *ws_impl;
1155 
1156     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1157 
1158     pthread_mutex_lock(&ctx_impl->mutex);
1159 
1160     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1161         pthread_mutex_unlock(&ctx_impl->mutex);
1162 
1163         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1164         if (nxt_slow_path(ws_impl == NULL)) {
1165             return NULL;
1166         }
1167 
1168     } else {
1169         lnk = nxt_queue_first(&ctx_impl->free_ws);
1170         nxt_queue_remove(lnk);
1171 
1172         pthread_mutex_unlock(&ctx_impl->mutex);
1173 
1174         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1175     }
1176 
1177     ws_impl->ctx_impl = ctx_impl;
1178 
1179     return ws_impl;
1180 }
1181 
1182 
1183 static void
1184 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1185 {
1186     nxt_unit_websocket_frame_impl_t  *ws_impl;
1187 
1188     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1189 
1190     while (ws_impl->buf != NULL) {
1191         nxt_unit_mmap_buf_free(ws_impl->buf);
1192     }
1193 
1194     ws->req = NULL;
1195 
1196     if (ws_impl->retain_buf != NULL) {
1197         free(ws_impl->retain_buf);
1198 
1199         ws_impl->retain_buf = NULL;
1200     }
1201 
1202     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1203 
1204     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1205 
1206     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1207 }
1208 
1209 
1210 static void
1211 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1212 {
1213     nxt_queue_remove(&ws_impl->link);
1214 
1215     free(ws_impl);
1216 }
1217 
1218 
1219 uint16_t
1220 nxt_unit_field_hash(const char *name, size_t name_length)
1221 {
1222     u_char      ch;
1223     uint32_t    hash;
1224     const char  *p, *end;
1225 
1226     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1227     end = name + name_length;
1228 
1229     for (p = name; p < end; p++) {
1230         ch = *p;
1231         hash = (hash << 4) + hash + nxt_lowcase(ch);
1232     }
1233 
1234     hash = (hash >> 16) ^ hash;
1235 
1236     return hash;
1237 }
1238 
1239 
1240 void
1241 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1242 {
1243     uint32_t            i, j;
1244     nxt_unit_field_t    *fields, f;
1245     nxt_unit_request_t  *r;
1246 
1247     nxt_unit_req_debug(req, "group_dup_fields");
1248 
1249     r = req->request;
1250     fields = r->fields;
1251 
1252     for (i = 0; i < r->fields_count; i++) {
1253 
1254         switch (fields[i].hash) {
1255         case NXT_UNIT_HASH_CONTENT_LENGTH:
1256             r->content_length_field = i;
1257             break;
1258 
1259         case NXT_UNIT_HASH_CONTENT_TYPE:
1260             r->content_type_field = i;
1261             break;
1262 
1263         case NXT_UNIT_HASH_COOKIE:
1264             r->cookie_field = i;
1265             break;
1266         };
1267 
1268         for (j = i + 1; j < r->fields_count; j++) {
1269             if (fields[i].hash != fields[j].hash) {
1270                 continue;
1271             }
1272 
1273             if (j == i + 1) {
1274                 continue;
1275             }
1276 
1277             f = fields[j];
1278             f.name.offset += (j - (i + 1)) * sizeof(f);
1279             f.value.offset += (j - (i + 1)) * sizeof(f);
1280 
1281             while (j > i + 1) {
1282                 fields[j] = fields[j - 1];
1283                 fields[j].name.offset -= sizeof(f);
1284                 fields[j].value.offset -= sizeof(f);
1285                 j--;
1286             }
1287 
1288             fields[j] = f;
1289 
1290             i++;
1291         }
1292     }
1293 }
1294 
1295 
1296 int
1297 nxt_unit_response_init(nxt_unit_request_info_t *req,
1298     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1299 {
1300     uint32_t                      buf_size;
1301     nxt_unit_buf_t                *buf;
1302     nxt_unit_request_info_impl_t  *req_impl;
1303 
1304     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1305 
1306     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1307         nxt_unit_req_warn(req, "init: response already sent");
1308 
1309         return NXT_UNIT_ERROR;
1310     }
1311 
1312     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1313                        (int) max_fields_count, (int) max_fields_size);
1314 
1315     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1316         nxt_unit_req_debug(req, "duplicate response init");
1317     }
1318 
1319     buf_size = sizeof(nxt_unit_response_t)
1320                + max_fields_count * sizeof(nxt_unit_field_t)
1321                + max_fields_size;
1322 
1323     if (nxt_slow_path(req->response_buf != NULL)) {
1324         buf = req->response_buf;
1325 
1326         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1327             goto init_response;
1328         }
1329 
1330         nxt_unit_buf_free(buf);
1331 
1332         req->response_buf = NULL;
1333         req->response = NULL;
1334         req->response_max_fields = 0;
1335 
1336         req_impl->state = NXT_UNIT_RS_START;
1337     }
1338 
1339     buf = nxt_unit_response_buf_alloc(req, buf_size);
1340     if (nxt_slow_path(buf == NULL)) {
1341         return NXT_UNIT_ERROR;
1342     }
1343 
1344 init_response:
1345 
1346     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1347 
1348     req->response_buf = buf;
1349 
1350     req->response = (nxt_unit_response_t *) buf->start;
1351     req->response->status = status;
1352 
1353     buf->free = buf->start + sizeof(nxt_unit_response_t)
1354                 + max_fields_count * sizeof(nxt_unit_field_t);
1355 
1356     req->response_max_fields = max_fields_count;
1357     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1358 
1359     return NXT_UNIT_OK;
1360 }
1361 
1362 
1363 int
1364 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1365     uint32_t max_fields_count, uint32_t max_fields_size)
1366 {
1367     char                          *p;
1368     uint32_t                      i, buf_size;
1369     nxt_unit_buf_t                *buf;
1370     nxt_unit_field_t              *f, *src;
1371     nxt_unit_response_t           *resp;
1372     nxt_unit_request_info_impl_t  *req_impl;
1373 
1374     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1375 
1376     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1377         nxt_unit_req_warn(req, "realloc: response not init");
1378 
1379         return NXT_UNIT_ERROR;
1380     }
1381 
1382     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1383         nxt_unit_req_warn(req, "realloc: response already sent");
1384 
1385         return NXT_UNIT_ERROR;
1386     }
1387 
1388     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1389         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1390 
1391         return NXT_UNIT_ERROR;
1392     }
1393 
1394     buf_size = sizeof(nxt_unit_response_t)
1395                + max_fields_count * sizeof(nxt_unit_field_t)
1396                + max_fields_size;
1397 
1398     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1399 
1400     buf = nxt_unit_response_buf_alloc(req, buf_size);
1401     if (nxt_slow_path(buf == NULL)) {
1402         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1403         return NXT_UNIT_ERROR;
1404     }
1405 
1406     resp = (nxt_unit_response_t *) buf->start;
1407 
1408     memset(resp, 0, sizeof(nxt_unit_response_t));
1409 
1410     resp->status = req->response->status;
1411     resp->content_length = req->response->content_length;
1412 
1413     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1414     f = resp->fields;
1415 
1416     for (i = 0; i < req->response->fields_count; i++) {
1417         src = req->response->fields + i;
1418 
1419         if (nxt_slow_path(src->skip != 0)) {
1420             continue;
1421         }
1422 
1423         if (nxt_slow_path(src->name_length + src->value_length + 2
1424                           > (uint32_t) (buf->end - p)))
1425         {
1426             nxt_unit_req_warn(req, "realloc: not enough space for field"
1427                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1428                   i, src, src->name_length, src->value_length);
1429 
1430             goto fail;
1431         }
1432 
1433         nxt_unit_sptr_set(&f->name, p);
1434         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1435         *p++ = '\0';
1436 
1437         nxt_unit_sptr_set(&f->value, p);
1438         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1439         *p++ = '\0';
1440 
1441         f->hash = src->hash;
1442         f->skip = 0;
1443         f->name_length = src->name_length;
1444         f->value_length = src->value_length;
1445 
1446         resp->fields_count++;
1447         f++;
1448     }
1449 
1450     if (req->response->piggyback_content_length > 0) {
1451         if (nxt_slow_path(req->response->piggyback_content_length
1452                           > (uint32_t) (buf->end - p)))
1453         {
1454             nxt_unit_req_warn(req, "realloc: not enought space for content"
1455                   " #%"PRIu32", %"PRIu32" required",
1456                   i, req->response->piggyback_content_length);
1457 
1458             goto fail;
1459         }
1460 
1461         resp->piggyback_content_length = req->response->piggyback_content_length;
1462 
1463         nxt_unit_sptr_set(&resp->piggyback_content, p);
1464         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1465                        req->response->piggyback_content_length);
1466     }
1467 
1468     buf->free = p;
1469 
1470     nxt_unit_buf_free(req->response_buf);
1471 
1472     req->response = resp;
1473     req->response_buf = buf;
1474     req->response_max_fields = max_fields_count;
1475 
1476     return NXT_UNIT_OK;
1477 
1478 fail:
1479 
1480     nxt_unit_buf_free(buf);
1481 
1482     return NXT_UNIT_ERROR;
1483 }
1484 
1485 
1486 int
1487 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1488 {
1489     nxt_unit_request_info_impl_t  *req_impl;
1490 
1491     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1492 
1493     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1494 }
1495 
1496 
1497 int
1498 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1499     const char *name, uint8_t name_length,
1500     const char *value, uint32_t value_length)
1501 {
1502     nxt_unit_buf_t                *buf;
1503     nxt_unit_field_t              *f;
1504     nxt_unit_response_t           *resp;
1505     nxt_unit_request_info_impl_t  *req_impl;
1506 
1507     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1508 
1509     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1510         nxt_unit_req_warn(req, "add_field: response not initialized or "
1511                           "already sent");
1512 
1513         return NXT_UNIT_ERROR;
1514     }
1515 
1516     resp = req->response;
1517 
1518     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1519         nxt_unit_req_warn(req, "add_field: too many response fields");
1520 
1521         return NXT_UNIT_ERROR;
1522     }
1523 
1524     buf = req->response_buf;
1525 
1526     if (nxt_slow_path(name_length + value_length + 2
1527                       > (uint32_t) (buf->end - buf->free)))
1528     {
1529         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1530 
1531         return NXT_UNIT_ERROR;
1532     }
1533 
1534     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1535                        resp->fields_count,
1536                        (int) name_length, name,
1537                        (int) value_length, value);
1538 
1539     f = resp->fields + resp->fields_count;
1540 
1541     nxt_unit_sptr_set(&f->name, buf->free);
1542     buf->free = nxt_cpymem(buf->free, name, name_length);
1543     *buf->free++ = '\0';
1544 
1545     nxt_unit_sptr_set(&f->value, buf->free);
1546     buf->free = nxt_cpymem(buf->free, value, value_length);
1547     *buf->free++ = '\0';
1548 
1549     f->hash = nxt_unit_field_hash(name, name_length);
1550     f->skip = 0;
1551     f->name_length = name_length;
1552     f->value_length = value_length;
1553 
1554     resp->fields_count++;
1555 
1556     return NXT_UNIT_OK;
1557 }
1558 
1559 
1560 int
1561 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1562     const void* src, uint32_t size)
1563 {
1564     nxt_unit_buf_t                *buf;
1565     nxt_unit_response_t           *resp;
1566     nxt_unit_request_info_impl_t  *req_impl;
1567 
1568     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1569 
1570     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1571         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1572 
1573         return NXT_UNIT_ERROR;
1574     }
1575 
1576     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1577         nxt_unit_req_warn(req, "add_content: response already sent");
1578 
1579         return NXT_UNIT_ERROR;
1580     }
1581 
1582     buf = req->response_buf;
1583 
1584     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1585         nxt_unit_req_warn(req, "add_content: buffer overflow");
1586 
1587         return NXT_UNIT_ERROR;
1588     }
1589 
1590     resp = req->response;
1591 
1592     if (resp->piggyback_content_length == 0) {
1593         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1594         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1595     }
1596 
1597     resp->piggyback_content_length += size;
1598 
1599     buf->free = nxt_cpymem(buf->free, src, size);
1600 
1601     return NXT_UNIT_OK;
1602 }
1603 
1604 
1605 int
1606 nxt_unit_response_send(nxt_unit_request_info_t *req)
1607 {
1608     int                           rc;
1609     nxt_unit_mmap_buf_t           *mmap_buf;
1610     nxt_unit_request_info_impl_t  *req_impl;
1611 
1612     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1613 
1614     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1615         nxt_unit_req_warn(req, "send: response is not initialized yet");
1616 
1617         return NXT_UNIT_ERROR;
1618     }
1619 
1620     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1621         nxt_unit_req_warn(req, "send: response already sent");
1622 
1623         return NXT_UNIT_ERROR;
1624     }
1625 
1626     if (req->request->websocket_handshake && req->response->status == 101) {
1627         nxt_unit_response_upgrade(req);
1628     }
1629 
1630     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1631                        req->response->fields_count,
1632                        (int) (req->response_buf->free
1633                               - req->response_buf->start));
1634 
1635     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1636 
1637     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1638     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1639         req->response = NULL;
1640         req->response_buf = NULL;
1641         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1642 
1643         nxt_unit_mmap_buf_release(mmap_buf);
1644     }
1645 
1646     return rc;
1647 }
1648 
1649 
1650 int
1651 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1652 {
1653     nxt_unit_request_info_impl_t  *req_impl;
1654 
1655     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1656 
1657     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1658 }
1659 
1660 
1661 nxt_unit_buf_t *
1662 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1663 {
1664     int                           rc;
1665     nxt_unit_mmap_buf_t           *mmap_buf;
1666     nxt_unit_request_info_impl_t  *req_impl;
1667 
1668     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1669         nxt_unit_req_warn(req, "response_buf_alloc: "
1670                           "requested buffer (%"PRIu32") too big", size);
1671 
1672         return NULL;
1673     }
1674 
1675     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1676 
1677     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1678 
1679     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1680     if (nxt_slow_path(mmap_buf == NULL)) {
1681         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
1682 
1683         return NULL;
1684     }
1685 
1686     mmap_buf->req = req;
1687 
1688     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1689 
1690     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1691                                    &req->response_port, size, mmap_buf);
1692     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1693         nxt_unit_mmap_buf_release(mmap_buf);
1694 
1695         return NULL;
1696     }
1697 
1698     return &mmap_buf->buf;
1699 }
1700 
1701 
1702 static nxt_unit_process_t *
1703 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1704 {
1705     nxt_unit_impl_t  *lib;
1706 
1707     if (recv_msg->process != NULL) {
1708         return recv_msg->process;
1709     }
1710 
1711     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1712 
1713     pthread_mutex_lock(&lib->mutex);
1714 
1715     recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1716 
1717     pthread_mutex_unlock(&lib->mutex);
1718 
1719     if (recv_msg->process == NULL) {
1720         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1721                       recv_msg->stream, (int) recv_msg->pid);
1722     }
1723 
1724     return recv_msg->process;
1725 }
1726 
1727 
1728 static nxt_unit_mmap_buf_t *
1729 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1730 {
1731     nxt_unit_mmap_buf_t  *mmap_buf;
1732     nxt_unit_ctx_impl_t  *ctx_impl;
1733 
1734     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1735 
1736     pthread_mutex_lock(&ctx_impl->mutex);
1737 
1738     if (ctx_impl->free_buf == NULL) {
1739         pthread_mutex_unlock(&ctx_impl->mutex);
1740 
1741         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1742         if (nxt_slow_path(mmap_buf == NULL)) {
1743             return NULL;
1744         }
1745 
1746     } else {
1747         mmap_buf = ctx_impl->free_buf;
1748 
1749         nxt_unit_mmap_buf_remove(mmap_buf);
1750 
1751         pthread_mutex_unlock(&ctx_impl->mutex);
1752     }
1753 
1754     mmap_buf->ctx_impl = ctx_impl;
1755 
1756     return mmap_buf;
1757 }
1758 
1759 
1760 static void
1761 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1762 {
1763     nxt_unit_mmap_buf_remove(mmap_buf);
1764 
1765     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
1766 
1767     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1768 
1769     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
1770 }
1771 
1772 
1773 typedef struct {
1774     size_t      len;
1775     const char  *str;
1776 } nxt_unit_str_t;
1777 
1778 
1779 #define nxt_unit_str(str)  { nxt_length(str), str }
1780 
1781 
1782 int
1783 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1784 {
1785     return req->request->websocket_handshake;
1786 }
1787 
1788 
1789 int
1790 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1791 {
1792     int                           rc;
1793     nxt_unit_ctx_impl_t           *ctx_impl;
1794     nxt_unit_request_info_impl_t  *req_impl;
1795 
1796     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1797 
1798     if (nxt_slow_path(req_impl->websocket != 0)) {
1799         nxt_unit_req_debug(req, "upgrade: already upgraded");
1800 
1801         return NXT_UNIT_OK;
1802     }
1803 
1804     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1805         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1806 
1807         return NXT_UNIT_ERROR;
1808     }
1809 
1810     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1811         nxt_unit_req_warn(req, "upgrade: response already sent");
1812 
1813         return NXT_UNIT_ERROR;
1814     }
1815 
1816     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1817 
1818     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1819     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1820         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1821 
1822         return NXT_UNIT_ERROR;
1823     }
1824 
1825     req_impl->websocket = 1;
1826 
1827     req->response->status = 101;
1828 
1829     return NXT_UNIT_OK;
1830 }
1831 
1832 
1833 int
1834 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1835 {
1836     nxt_unit_request_info_impl_t  *req_impl;
1837 
1838     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1839 
1840     return req_impl->websocket;
1841 }
1842 
1843 
1844 nxt_unit_request_info_t *
1845 nxt_unit_get_request_info_from_data(void *data)
1846 {
1847     nxt_unit_request_info_impl_t  *req_impl;
1848 
1849     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1850 
1851     return &req_impl->req;
1852 }
1853 
1854 
1855 int
1856 nxt_unit_buf_send(nxt_unit_buf_t *buf)
1857 {
1858     int                           rc;
1859     nxt_unit_mmap_buf_t           *mmap_buf;
1860     nxt_unit_request_info_t       *req;
1861     nxt_unit_request_info_impl_t  *req_impl;
1862 
1863     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1864 
1865     req = mmap_buf->req;
1866     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1867 
1868     nxt_unit_req_debug(req, "buf_send: %d bytes",
1869                        (int) (buf->free - buf->start));
1870 
1871     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1872         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
1873 
1874         return NXT_UNIT_ERROR;
1875     }
1876 
1877     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1878         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1879 
1880         return NXT_UNIT_ERROR;
1881     }
1882 
1883     if (nxt_fast_path(buf->free > buf->start)) {
1884         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1885         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1886             return rc;
1887         }
1888     }
1889 
1890     nxt_unit_mmap_buf_release(mmap_buf);
1891 
1892     return NXT_UNIT_OK;
1893 }
1894 
1895 
1896 static void
1897 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
1898 {
1899     int                           rc;
1900     nxt_unit_mmap_buf_t           *mmap_buf;
1901     nxt_unit_request_info_t       *req;
1902     nxt_unit_request_info_impl_t  *req_impl;
1903 
1904     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1905 
1906     req = mmap_buf->req;
1907     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1908 
1909     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
1910     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1911         nxt_unit_mmap_buf_release(mmap_buf);
1912 
1913         nxt_unit_request_info_release(req);
1914 
1915     } else {
1916         nxt_unit_request_done(req, rc);
1917     }
1918 }
1919 
1920 
1921 static int
1922 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
1923     nxt_unit_mmap_buf_t *mmap_buf, int last)
1924 {
1925     struct {
1926         nxt_port_msg_t       msg;
1927         nxt_port_mmap_msg_t  mmap_msg;
1928     } m;
1929 
1930     u_char                   *end, *last_used, *first_free;
1931     ssize_t                  res;
1932     nxt_chunk_id_t           first_free_chunk;
1933     nxt_unit_buf_t           *buf;
1934     nxt_unit_impl_t          *lib;
1935     nxt_port_mmap_header_t   *hdr;
1936 
1937     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1938 
1939     buf = &mmap_buf->buf;
1940     hdr = mmap_buf->hdr;
1941 
1942     m.mmap_msg.size = buf->free - buf->start;
1943 
1944     m.msg.stream = stream;
1945     m.msg.pid = lib->pid;
1946     m.msg.reply_port = 0;
1947     m.msg.type = _NXT_PORT_MSG_DATA;
1948     m.msg.last = last != 0;
1949     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
1950     m.msg.nf = 0;
1951     m.msg.mf = 0;
1952     m.msg.tracking = 0;
1953 
1954     if (hdr != NULL) {
1955         m.mmap_msg.mmap_id = hdr->id;
1956         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1957     }
1958 
1959     nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1960                    stream,
1961                    (int) m.mmap_msg.mmap_id,
1962                    (int) m.mmap_msg.chunk_id,
1963                    (int) m.mmap_msg.size);
1964 
1965     res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1966                                    m.msg.mmap ? sizeof(m) : sizeof(m.msg),
1967                                    NULL, 0);
1968     if (nxt_slow_path(res != sizeof(m))) {
1969         return NXT_UNIT_ERROR;
1970     }
1971 
1972     if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
1973         last_used = (u_char *) buf->free - 1;
1974 
1975         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1976         first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1977         end = (u_char *) buf->end;
1978 
1979         nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1980 
1981         buf->end = (char *) first_free;
1982     }
1983 
1984     return NXT_UNIT_OK;
1985 }
1986 
1987 
1988 void
1989 nxt_unit_buf_free(nxt_unit_buf_t *buf)
1990 {
1991     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
1992 }
1993 
1994 
1995 static void
1996 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
1997 {
1998     if (nxt_fast_path(mmap_buf->hdr != NULL)) {
1999         nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
2000                               mmap_buf->buf.end - mmap_buf->buf.start);
2001     }
2002 
2003     nxt_unit_mmap_buf_release(mmap_buf);
2004 }
2005 
2006 
2007 nxt_unit_buf_t *
2008 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2009 {
2010     nxt_unit_mmap_buf_t  *mmap_buf;
2011 
2012     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2013 
2014     if (mmap_buf->next == NULL) {
2015         return NULL;
2016     }
2017 
2018     return &mmap_buf->next->buf;
2019 }
2020 
2021 
2022 uint32_t
2023 nxt_unit_buf_max(void)
2024 {
2025     return PORT_MMAP_DATA_SIZE;
2026 }
2027 
2028 
2029 uint32_t
2030 nxt_unit_buf_min(void)
2031 {
2032     return PORT_MMAP_CHUNK_SIZE;
2033 }
2034 
2035 
2036 int
2037 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2038     size_t size)
2039 {
2040     int                           rc;
2041     uint32_t                      part_size;
2042     const char                    *part_start;
2043     nxt_unit_mmap_buf_t           mmap_buf;
2044     nxt_unit_request_info_impl_t  *req_impl;
2045 
2046     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2047 
2048     part_start = start;
2049 
2050     /* Check if response is not send yet. */
2051     if (nxt_slow_path(req->response_buf)) {
2052         part_size = req->response_buf->end - req->response_buf->free;
2053         part_size = nxt_min(size, part_size);
2054 
2055         rc = nxt_unit_response_add_content(req, part_start, part_size);
2056         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2057             return rc;
2058         }
2059 
2060         rc = nxt_unit_response_send(req);
2061         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2062             return rc;
2063         }
2064 
2065         size -= part_size;
2066         part_start += part_size;
2067     }
2068 
2069     while (size > 0) {
2070         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2071 
2072         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2073                                        &req->response_port, part_size,
2074                                        &mmap_buf);
2075         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2076             return rc;
2077         }
2078 
2079         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2080                                        part_start, part_size);
2081 
2082         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2083         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2084             nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
2085                                   mmap_buf.buf.end - mmap_buf.buf.start);
2086 
2087             return rc;
2088         }
2089 
2090         size -= part_size;
2091         part_start += part_size;
2092     }
2093 
2094     return NXT_UNIT_OK;
2095 }
2096 
2097 
2098 int
2099 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2100     nxt_unit_read_info_t *read_info)
2101 {
2102     int             rc;
2103     ssize_t         n;
2104     nxt_unit_buf_t  *buf;
2105 
2106     /* Check if response is not send yet. */
2107     if (nxt_slow_path(req->response_buf)) {
2108 
2109         /* Enable content in headers buf. */
2110         rc = nxt_unit_response_add_content(req, "", 0);
2111         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2112             nxt_unit_req_error(req, "Failed to add piggyback content");
2113 
2114             return rc;
2115         }
2116 
2117         buf = req->response_buf;
2118 
2119         while (buf->end - buf->free > 0) {
2120             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2121             if (nxt_slow_path(n < 0)) {
2122                 nxt_unit_req_error(req, "Read error");
2123 
2124                 return NXT_UNIT_ERROR;
2125             }
2126 
2127             /* Manually increase sizes. */
2128             buf->free += n;
2129             req->response->piggyback_content_length += n;
2130 
2131             if (read_info->eof) {
2132                 break;
2133             }
2134         }
2135 
2136         rc = nxt_unit_response_send(req);
2137         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2138             nxt_unit_req_error(req, "Failed to send headers with content");
2139 
2140             return rc;
2141         }
2142 
2143         if (read_info->eof) {
2144             return NXT_UNIT_OK;
2145         }
2146     }
2147 
2148     while (!read_info->eof) {
2149         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2150                            read_info->buf_size);
2151 
2152         buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
2153                                                        PORT_MMAP_DATA_SIZE));
2154         if (nxt_slow_path(buf == NULL)) {
2155             nxt_unit_req_error(req, "Failed to allocate buf for content");
2156 
2157             return NXT_UNIT_ERROR;
2158         }
2159 
2160         while (!read_info->eof && buf->end > buf->free) {
2161             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2162             if (nxt_slow_path(n < 0)) {
2163                 nxt_unit_req_error(req, "Read error");
2164 
2165                 nxt_unit_buf_free(buf);
2166 
2167                 return NXT_UNIT_ERROR;
2168             }
2169 
2170             buf->free += n;
2171         }
2172 
2173         rc = nxt_unit_buf_send(buf);
2174         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2175             nxt_unit_req_error(req, "Failed to send content");
2176 
2177             return rc;
2178         }
2179     }
2180 
2181     return NXT_UNIT_OK;
2182 }
2183 
2184 
2185 ssize_t
2186 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2187 {
2188     return nxt_unit_buf_read(&req->content_buf, &req->content_length,
2189                              dst, size);
2190 }
2191 
2192 
2193 static ssize_t
2194 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2195 {
2196     u_char          *p;
2197     size_t          rest, copy, read;
2198     nxt_unit_buf_t  *buf;
2199 
2200     p = dst;
2201     rest = size;
2202 
2203     buf = *b;
2204 
2205     while (buf != NULL) {
2206         copy = buf->end - buf->free;
2207         copy = nxt_min(rest, copy);
2208 
2209         p = nxt_cpymem(p, buf->free, copy);
2210 
2211         buf->free += copy;
2212         rest -= copy;
2213 
2214         if (rest == 0) {
2215             if (buf->end == buf->free) {
2216                 buf = nxt_unit_buf_next(buf);
2217             }
2218 
2219             break;
2220         }
2221 
2222         buf = nxt_unit_buf_next(buf);
2223     }
2224 
2225     *b = buf;
2226 
2227     read = size - rest;
2228 
2229     *len -= read;
2230 
2231     return read;
2232 }
2233 
2234 
2235 void
2236 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2237 {
2238     ssize_t                       res;
2239     uint32_t                      size;
2240     nxt_port_msg_t                msg;
2241     nxt_unit_impl_t               *lib;
2242     nxt_unit_request_info_impl_t  *req_impl;
2243 
2244     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2245 
2246     nxt_unit_req_debug(req, "done: %d", rc);
2247 
2248     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2249         goto skip_response_send;
2250     }
2251 
2252     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2253 
2254         size = nxt_length("Content-Type") + nxt_length("text/plain");
2255 
2256         rc = nxt_unit_response_init(req, 200, 1, size);
2257         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2258             goto skip_response_send;
2259         }
2260 
2261         rc = nxt_unit_response_add_field(req, "Content-Type",
2262                                    nxt_length("Content-Type"),
2263                                    "text/plain", nxt_length("text/plain"));
2264         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2265             goto skip_response_send;
2266         }
2267     }
2268 
2269     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2270 
2271         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2272 
2273         nxt_unit_buf_send_done(req->response_buf);
2274 
2275         return;
2276     }
2277 
2278 skip_response_send:
2279 
2280     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2281 
2282     msg.stream = req_impl->stream;
2283     msg.pid = lib->pid;
2284     msg.reply_port = 0;
2285     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2286                                    : _NXT_PORT_MSG_RPC_ERROR;
2287     msg.last = 1;
2288     msg.mmap = 0;
2289     msg.nf = 0;
2290     msg.mf = 0;
2291     msg.tracking = 0;
2292 
2293     res = lib->callbacks.port_send(req->ctx, &req->response_port,
2294                                    &msg, sizeof(msg), NULL, 0);
2295     if (nxt_slow_path(res != sizeof(msg))) {
2296         nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2297                            strerror(errno), errno);
2298     }
2299 
2300     nxt_unit_request_info_release(req);
2301 }
2302 
2303 
2304 int
2305 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2306     uint8_t last, const void *start, size_t size)
2307 {
2308     const struct iovec  iov = { (void *) start, size };
2309 
2310     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2311 }
2312 
2313 
2314 int
2315 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2316     uint8_t last, const struct iovec *iov, int iovcnt)
2317 {
2318     int                     i, rc;
2319     size_t                  l, copy;
2320     uint32_t                payload_len, buf_size;
2321     const uint8_t           *b;
2322     nxt_unit_buf_t          *buf;
2323     nxt_websocket_header_t  *wh;
2324 
2325     payload_len = 0;
2326 
2327     for (i = 0; i < iovcnt; i++) {
2328         payload_len += iov[i].iov_len;
2329     }
2330 
2331     buf_size = 10 + payload_len;
2332 
2333     buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2334                                                    PORT_MMAP_DATA_SIZE));
2335     if (nxt_slow_path(buf == NULL)) {
2336         nxt_unit_req_error(req, "Failed to allocate buf for content");
2337 
2338         return NXT_UNIT_ERROR;
2339     }
2340 
2341     buf->start[0] = 0;
2342     buf->start[1] = 0;
2343 
2344     wh = (void *) buf->free;
2345 
2346     buf->free = nxt_websocket_frame_init(wh, payload_len);
2347     wh->fin = last;
2348     wh->opcode = opcode;
2349 
2350     for (i = 0; i < iovcnt; i++) {
2351         b = iov[i].iov_base;
2352         l = iov[i].iov_len;
2353 
2354         while (l > 0) {
2355             copy = buf->end - buf->free;
2356             copy = nxt_min(l, copy);
2357 
2358             buf->free = nxt_cpymem(buf->free, b, copy);
2359             b += copy;
2360             l -= copy;
2361 
2362             if (l > 0) {
2363                 buf_size -= buf->end - buf->start;
2364 
2365                 rc = nxt_unit_buf_send(buf);
2366                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2367                     nxt_unit_req_error(req, "Failed to send content");
2368 
2369                     return NXT_UNIT_ERROR;
2370                 }
2371 
2372                 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2373                                                           PORT_MMAP_DATA_SIZE));
2374                 if (nxt_slow_path(buf == NULL)) {
2375                     nxt_unit_req_error(req,
2376                                        "Failed to allocate buf for content");
2377 
2378                     return NXT_UNIT_ERROR;
2379                 }
2380             }
2381         }
2382     }
2383 
2384     if (buf->free > buf->start) {
2385         rc = nxt_unit_buf_send(buf);
2386         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2387             nxt_unit_req_error(req, "Failed to send content");
2388         }
2389     }
2390 
2391     return rc;
2392 }
2393 
2394 
2395 ssize_t
2396 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2397     size_t size)
2398 {
2399     ssize_t   res;
2400     uint8_t   *b;
2401     uint64_t  i, d;
2402 
2403     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2404                             dst, size);
2405 
2406     if (ws->mask == NULL) {
2407         return res;
2408     }
2409 
2410     b = dst;
2411     d = (ws->payload_len - ws->content_length - res) % 4;
2412 
2413     for (i = 0; i < (uint64_t) res; i++) {
2414         b[i] ^= ws->mask[ (i + d) % 4 ];
2415     }
2416 
2417     return res;
2418 }
2419 
2420 
2421 int
2422 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2423 {
2424     char                             *b;
2425     size_t                           size;
2426     nxt_unit_websocket_frame_impl_t  *ws_impl;
2427 
2428     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2429 
2430     if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
2431         return NXT_UNIT_OK;
2432     }
2433 
2434     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2435 
2436     b = malloc(size);
2437     if (nxt_slow_path(b == NULL)) {
2438         return NXT_UNIT_ERROR;
2439     }
2440 
2441     memcpy(b, ws_impl->buf->buf.start, size);
2442 
2443     ws_impl->buf->buf.start = b;
2444     ws_impl->buf->buf.free = b;
2445     ws_impl->buf->buf.end = b + size;
2446 
2447     ws_impl->retain_buf = b;
2448 
2449     return NXT_UNIT_OK;
2450 }
2451 
2452 
2453 void
2454 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
2455 {
2456     nxt_unit_websocket_frame_release(ws);
2457 }
2458 
2459 
2460 static nxt_port_mmap_header_t *
2461 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2462     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
2463 {
2464     int                     res, nchunks, i;
2465     nxt_unit_mmap_t         *mm, *mm_end;
2466     nxt_port_mmap_header_t  *hdr;
2467 
2468     pthread_mutex_lock(&process->outgoing.mutex);
2469 
2470     mm_end = process->outgoing.elts + process->outgoing.size;
2471 
2472     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2473         hdr = mm->hdr;
2474 
2475         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
2476             continue;
2477         }
2478 
2479         *c = 0;
2480 
2481         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2482             nchunks = 1;
2483 
2484             while (nchunks < n) {
2485                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2486                                                        *c + nchunks);
2487 
2488                 if (res == 0) {
2489                     for (i = 0; i < nchunks; i++) {
2490                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
2491                     }
2492 
2493                     *c += nchunks + 1;
2494                     nchunks = 0;
2495                     break;
2496                 }
2497 
2498                 nchunks++;
2499             }
2500 
2501             if (nchunks == n) {
2502                 goto unlock;
2503             }
2504         }
2505     }
2506 
2507     *c = 0;
2508     hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
2509 
2510 unlock:
2511 
2512     pthread_mutex_unlock(&process->outgoing.mutex);
2513 
2514     return hdr;
2515 }
2516 
2517 
2518 static nxt_unit_mmap_t *
2519 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
2520 {
2521     uint32_t  cap;
2522 
2523     cap = mmaps->cap;
2524 
2525     if (cap == 0) {
2526         cap = i + 1;
2527     }
2528 
2529     while (i + 1 > cap) {
2530 
2531         if (cap < 16) {
2532             cap = cap * 2;
2533 
2534         } else {
2535             cap = cap + cap / 2;
2536         }
2537     }
2538 
2539     if (cap != mmaps->cap) {
2540 
2541         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
2542         if (nxt_slow_path(mmaps->elts == NULL)) {
2543             return NULL;
2544         }
2545 
2546         memset(mmaps->elts + mmaps->cap, 0,
2547                sizeof(*mmaps->elts) * (cap - mmaps->cap));
2548 
2549         mmaps->cap = cap;
2550     }
2551 
2552     if (i + 1 > mmaps->size) {
2553         mmaps->size = i + 1;
2554     }
2555 
2556     return mmaps->elts + i;
2557 }
2558 
2559 
2560 static nxt_port_mmap_header_t *
2561 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2562     nxt_unit_port_id_t *port_id, int n)
2563 {
2564     int                     i, fd, rc;
2565     void                    *mem;
2566     char                    name[64];
2567     nxt_unit_mmap_t         *mm;
2568     nxt_unit_impl_t         *lib;
2569     nxt_port_mmap_header_t  *hdr;
2570 
2571     lib = process->lib;
2572 
2573     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
2574     if (nxt_slow_path(mm == NULL)) {
2575         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
2576 
2577         return NULL;
2578     }
2579 
2580     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
2581              lib->pid, (void *) pthread_self());
2582 
2583 #if (NXT_HAVE_MEMFD_CREATE)
2584 
2585     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
2586     if (nxt_slow_path(fd == -1)) {
2587         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
2588                        strerror(errno), errno);
2589 
2590         goto remove_fail;
2591     }
2592 
2593     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
2594 
2595 #elif (NXT_HAVE_SHM_OPEN_ANON)
2596 
2597     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
2598     if (nxt_slow_path(fd == -1)) {
2599         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
2600                        strerror(errno), errno);
2601 
2602         goto remove_fail;
2603     }
2604 
2605 #elif (NXT_HAVE_SHM_OPEN)
2606 
2607     /* Just in case. */
2608     shm_unlink(name);
2609 
2610     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
2611     if (nxt_slow_path(fd == -1)) {
2612         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
2613                        strerror(errno), errno);
2614 
2615         goto remove_fail;
2616     }
2617 
2618     if (nxt_slow_path(shm_unlink(name) == -1)) {
2619         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
2620                       strerror(errno), errno);
2621     }
2622 
2623 #else
2624 
2625 #error No working shared memory implementation.
2626 
2627 #endif
2628 
2629     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
2630         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
2631                        strerror(errno), errno);
2632 
2633         goto remove_fail;
2634     }
2635 
2636     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2637     if (nxt_slow_path(mem == MAP_FAILED)) {
2638         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
2639                        strerror(errno), errno);
2640 
2641         goto remove_fail;
2642     }
2643 
2644     mm->hdr = mem;
2645     hdr = mem;
2646 
2647     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
2648     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
2649 
2650     hdr->id = process->outgoing.size - 1;
2651     hdr->src_pid = lib->pid;
2652     hdr->dst_pid = process->pid;
2653     hdr->sent_over = port_id->id;
2654 
2655     /* Mark first n chunk(s) as busy */
2656     for (i = 0; i < n; i++) {
2657         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
2658     }
2659 
2660     /* Mark as busy chunk followed the last available chunk. */
2661     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
2662     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
2663 
2664     pthread_mutex_unlock(&process->outgoing.mutex);
2665 
2666     rc = nxt_unit_send_mmap(ctx, port_id, fd);
2667     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2668         munmap(mem, PORT_MMAP_SIZE);
2669         hdr = NULL;
2670 
2671     } else {
2672         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
2673                        hdr->id, (int) lib->pid, (int) process->pid);
2674     }
2675 
2676     close(fd);
2677 
2678     pthread_mutex_lock(&process->outgoing.mutex);
2679 
2680     if (nxt_fast_path(hdr != NULL)) {
2681         return hdr;
2682     }
2683 
2684 remove_fail:
2685 
2686     process->outgoing.size--;
2687 
2688     return NULL;
2689 }
2690 
2691 
2692 static int
2693 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
2694 {
2695     ssize_t          res;
2696     nxt_port_msg_t   msg;
2697     nxt_unit_impl_t  *lib;
2698     union {
2699         struct cmsghdr  cm;
2700         char            space[CMSG_SPACE(sizeof(int))];
2701     } cmsg;
2702 
2703     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2704 
2705     msg.stream = 0;
2706     msg.pid = lib->pid;
2707     msg.reply_port = 0;
2708     msg.type = _NXT_PORT_MSG_MMAP;
2709     msg.last = 0;
2710     msg.mmap = 0;
2711     msg.nf = 0;
2712     msg.mf = 0;
2713     msg.tracking = 0;
2714 
2715     /*
2716      * Fill all padding fields with 0.
2717      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
2718      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
2719      */
2720     memset(&cmsg, 0, sizeof(cmsg));
2721 
2722     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
2723     cmsg.cm.cmsg_level = SOL_SOCKET;
2724     cmsg.cm.cmsg_type = SCM_RIGHTS;
2725 
2726     /*
2727      * memcpy() is used instead of simple
2728      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
2729      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
2730      *   dereferencing type-punned pointer will break strict-aliasing rules
2731      *
2732      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
2733      * in the same simple assignment as in the code above.
2734      */
2735     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
2736 
2737     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
2738                                    &cmsg, sizeof(cmsg));
2739     if (nxt_slow_path(res != sizeof(msg))) {
2740         nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
2741                       (int) port_id->pid, strerror(errno), errno);
2742 
2743         return NXT_UNIT_ERROR;
2744     }
2745 
2746     return NXT_UNIT_OK;
2747 }
2748 
2749 
2750 static int
2751 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2752     nxt_unit_port_id_t *port_id, uint32_t size,
2753     nxt_unit_mmap_buf_t *mmap_buf)
2754 {
2755     uint32_t                nchunks;
2756     nxt_chunk_id_t          c;
2757     nxt_port_mmap_header_t  *hdr;
2758 
2759     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
2760 
2761     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
2762     if (nxt_slow_path(hdr == NULL)) {
2763         return NXT_UNIT_ERROR;
2764     }
2765 
2766     mmap_buf->hdr = hdr;
2767     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
2768     mmap_buf->buf.free = mmap_buf->buf.start;
2769     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
2770     mmap_buf->port_id = *port_id;
2771 
2772     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
2773                   (int) hdr->id, (int) c,
2774                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
2775 
2776     return NXT_UNIT_OK;
2777 }
2778 
2779 
2780 static int
2781 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
2782 {
2783     int                      rc;
2784     void                     *mem;
2785     struct stat              mmap_stat;
2786     nxt_unit_mmap_t          *mm;
2787     nxt_unit_impl_t          *lib;
2788     nxt_unit_process_t       *process;
2789     nxt_port_mmap_header_t   *hdr;
2790 
2791     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2792 
2793     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
2794 
2795     pthread_mutex_lock(&lib->mutex);
2796 
2797     process = nxt_unit_process_find(ctx, pid, 0);
2798 
2799     pthread_mutex_unlock(&lib->mutex);
2800 
2801     if (nxt_slow_path(process == NULL)) {
2802         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
2803                       (int) pid, fd);
2804 
2805         return NXT_UNIT_ERROR;
2806     }
2807 
2808     rc = NXT_UNIT_ERROR;
2809 
2810     if (fstat(fd, &mmap_stat) == -1) {
2811         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
2812                       strerror(errno), errno);
2813 
2814         goto fail;
2815     }
2816 
2817     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
2818                MAP_SHARED, fd, 0);
2819     if (nxt_slow_path(mem == MAP_FAILED)) {
2820         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
2821                       strerror(errno), errno);
2822 
2823         goto fail;
2824     }
2825 
2826     hdr = mem;
2827 
2828     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
2829 
2830         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
2831                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
2832                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
2833 
2834         munmap(mem, PORT_MMAP_SIZE);
2835 
2836         goto fail;
2837     }
2838 
2839     pthread_mutex_lock(&process->incoming.mutex);
2840 
2841     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
2842     if (nxt_slow_path(mm == NULL)) {
2843         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
2844 
2845         munmap(mem, PORT_MMAP_SIZE);
2846 
2847     } else {
2848         mm->hdr = hdr;
2849 
2850         hdr->sent_over = 0xFFFFu;
2851 
2852         rc = NXT_UNIT_OK;
2853     }
2854 
2855     pthread_mutex_unlock(&process->incoming.mutex);
2856 
2857 fail:
2858 
2859     nxt_unit_process_use(ctx, process, -1);
2860 
2861     return rc;
2862 }
2863 
2864 
2865 static void
2866 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
2867 {
2868     pthread_mutex_init(&mmaps->mutex, NULL);
2869 
2870     mmaps->size = 0;
2871     mmaps->cap = 0;
2872     mmaps->elts = NULL;
2873 }
2874 
2875 
2876 static void
2877 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
2878 {
2879     long c;
2880 
2881     c = nxt_atomic_fetch_add(&process->use_count, i);
2882 
2883     if (i < 0 && c == -i) {
2884         nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
2885 
2886         nxt_unit_mmaps_destroy(&process->incoming);
2887         nxt_unit_mmaps_destroy(&process->outgoing);
2888 
2889         free(process);
2890     }
2891 }
2892 
2893 
2894 static void
2895 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
2896 {
2897     nxt_unit_mmap_t  *mm, *end;
2898 
2899     if (mmaps->elts != NULL) {
2900         end = mmaps->elts + mmaps->size;
2901 
2902         for (mm = mmaps->elts; mm < end; mm++) {
2903             munmap(mm->hdr, PORT_MMAP_SIZE);
2904         }
2905 
2906         free(mmaps->elts);
2907     }
2908 
2909     pthread_mutex_destroy(&mmaps->mutex);
2910 }
2911 
2912 
2913 static nxt_port_mmap_header_t *
2914 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2915     uint32_t id)
2916 {
2917     nxt_port_mmap_header_t  *hdr;
2918 
2919     if (nxt_fast_path(process->incoming.size > id)) {
2920         hdr = process->incoming.elts[id].hdr;
2921 
2922     } else {
2923         hdr = NULL;
2924     }
2925 
2926     return hdr;
2927 }
2928 
2929 
2930 static int
2931 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2932 {
2933     int                           rc;
2934     nxt_chunk_id_t                c;
2935     nxt_unit_process_t            *process;
2936     nxt_port_mmap_header_t        *hdr;
2937     nxt_port_mmap_tracking_msg_t  *tracking_msg;
2938 
2939     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2940         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2941                       recv_msg->stream, (int) recv_msg->size);
2942 
2943         return 0;
2944     }
2945 
2946     tracking_msg = recv_msg->start;
2947 
2948     recv_msg->start = tracking_msg + 1;
2949     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
2950 
2951     process = nxt_unit_msg_get_process(ctx, recv_msg);
2952     if (nxt_slow_path(process == NULL)) {
2953         return 0;
2954     }
2955 
2956     pthread_mutex_lock(&process->incoming.mutex);
2957 
2958     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2959     if (nxt_slow_path(hdr == NULL)) {
2960         pthread_mutex_unlock(&process->incoming.mutex);
2961 
2962         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2963                       "invalid mmap id %d,%"PRIu32,
2964                       recv_msg->stream, (int) process->pid,
2965                       tracking_msg->mmap_id);
2966 
2967         return 0;
2968     }
2969 
2970     c = tracking_msg->tracking_id;
2971     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
2972 
2973     if (rc == 0) {
2974         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2975                        recv_msg->stream);
2976 
2977         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2978     }
2979 
2980     pthread_mutex_unlock(&process->incoming.mutex);
2981 
2982     return rc;
2983 }
2984 
2985 
2986 static int
2987 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2988 {
2989     void                    *start;
2990     uint32_t                size;
2991     nxt_unit_process_t      *process;
2992     nxt_unit_mmap_buf_t     *b, **incoming_tail;
2993     nxt_port_mmap_msg_t     *mmap_msg, *end;
2994     nxt_port_mmap_header_t  *hdr;
2995 
2996     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2997         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2998                       recv_msg->stream, (int) recv_msg->size);
2999 
3000         return NXT_UNIT_ERROR;
3001     }
3002 
3003     process = nxt_unit_msg_get_process(ctx, recv_msg);
3004     if (nxt_slow_path(process == NULL)) {
3005         return NXT_UNIT_ERROR;
3006     }
3007 
3008     mmap_msg = recv_msg->start;
3009     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3010 
3011     incoming_tail = &recv_msg->incoming_buf;
3012 
3013     pthread_mutex_lock(&process->incoming.mutex);
3014 
3015     for (; mmap_msg < end; mmap_msg++) {
3016         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
3017         if (nxt_slow_path(hdr == NULL)) {
3018             pthread_mutex_unlock(&process->incoming.mutex);
3019 
3020             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
3021                           "invalid mmap id %d,%"PRIu32,
3022                           recv_msg->stream, (int) process->pid,
3023                           mmap_msg->mmap_id);
3024 
3025             return NXT_UNIT_ERROR;
3026         }
3027 
3028         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3029         size = mmap_msg->size;
3030 
3031         if (recv_msg->start == mmap_msg) {
3032             recv_msg->start = start;
3033             recv_msg->size = size;
3034         }
3035 
3036         b = nxt_unit_mmap_buf_get(ctx);
3037         if (nxt_slow_path(b == NULL)) {
3038             pthread_mutex_unlock(&process->incoming.mutex);
3039 
3040             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3041                           recv_msg->stream);
3042 
3043             nxt_unit_mmap_release(hdr, start, size);
3044 
3045             return NXT_UNIT_ERROR;
3046         }
3047 
3048         nxt_unit_mmap_buf_insert(incoming_tail, b);
3049         incoming_tail = &b->next;
3050 
3051         b->buf.start = start;
3052         b->buf.free = start;
3053         b->buf.end = b->buf.start + size;
3054         b->hdr = hdr;
3055 
3056         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3057                        recv_msg->stream,
3058                        start, (int) size,
3059                        (int) hdr->src_pid, (int) hdr->dst_pid,
3060                        (int) hdr->id, (int) mmap_msg->chunk_id,
3061                        (int) mmap_msg->size);
3062     }
3063 
3064     pthread_mutex_unlock(&process->incoming.mutex);
3065 
3066     return NXT_UNIT_OK;
3067 }
3068 
3069 
3070 static int
3071 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
3072 {
3073     u_char          *p, *end;
3074     nxt_chunk_id_t  c;
3075 
3076     memset(start, 0xA5, size);
3077 
3078     p = start;
3079     end = p + size;
3080     c = nxt_port_mmap_chunk_id(hdr, p);
3081 
3082     while (p < end) {
3083         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3084 
3085         p += PORT_MMAP_CHUNK_SIZE;
3086         c++;
3087     }
3088 
3089     return NXT_UNIT_OK;
3090 }
3091 
3092 
3093 static nxt_int_t
3094 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3095 {
3096     nxt_process_t  *process;
3097 
3098     process = data;
3099 
3100     if (lhq->key.length == sizeof(pid_t)
3101         && *(pid_t *) lhq->key.start == process->pid)
3102     {
3103         return NXT_OK;
3104     }
3105 
3106     return NXT_DECLINED;
3107 }
3108 
3109 
3110 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
3111     NXT_LVLHSH_DEFAULT,
3112     nxt_unit_lvlhsh_pid_test,
3113     nxt_lvlhsh_alloc,
3114     nxt_lvlhsh_free,
3115 };
3116 
3117 
3118 static inline void
3119 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3120 {
3121     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3122     lhq->key.length = sizeof(*pid);
3123     lhq->key.start = (u_char *) pid;
3124     lhq->proto = &lvlhsh_processes_proto;
3125 }
3126 
3127 
3128 static nxt_unit_process_t *
3129 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
3130 {
3131     nxt_unit_impl_t     *lib;
3132     nxt_unit_process_t  *process;
3133     nxt_lvlhsh_query_t  lhq;
3134 
3135     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3136 
3137     nxt_unit_process_lhq_pid(&lhq, &pid);
3138 
3139     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3140         process = lhq.value;
3141         nxt_unit_process_use(ctx, process, 1);
3142 
3143         return process;
3144     }
3145 
3146     process = malloc(sizeof(nxt_unit_process_t));
3147     if (nxt_slow_path(process == NULL)) {
3148         nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
3149 
3150         return NULL;
3151     }
3152 
3153     process->pid = pid;
3154     process->use_count = 1;
3155     process->next_port_id = 0;
3156     process->lib = lib;
3157 
3158     nxt_queue_init(&process->ports);
3159 
3160     nxt_unit_mmaps_init(&process->incoming);
3161     nxt_unit_mmaps_init(&process->outgoing);
3162 
3163     lhq.replace = 0;
3164     lhq.value = process;
3165 
3166     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3167 
3168     case NXT_OK:
3169         break;
3170 
3171     default:
3172         nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
3173 
3174         pthread_mutex_destroy(&process->outgoing.mutex);
3175         pthread_mutex_destroy(&process->incoming.mutex);
3176         free(process);
3177         process = NULL;
3178         break;
3179     }
3180 
3181     nxt_unit_process_use(ctx, process, 1);
3182 
3183     return process;
3184 }
3185 
3186 
3187 static nxt_unit_process_t *
3188 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
3189 {
3190     int                 rc;
3191     nxt_unit_impl_t     *lib;
3192     nxt_unit_process_t  *process;
3193     nxt_lvlhsh_query_t  lhq;
3194 
3195     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3196 
3197     nxt_unit_process_lhq_pid(&lhq, &pid);
3198 
3199     if (remove) {
3200         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3201 
3202     } else {
3203         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3204     }
3205 
3206     if (rc == NXT_OK) {
3207         process = lhq.value;
3208 
3209         if (!remove) {
3210             nxt_unit_process_use(ctx, process, 1);
3211         }
3212 
3213         return process;
3214     }
3215 
3216     return NULL;
3217 }
3218 
3219 
3220 static nxt_unit_process_t *
3221 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3222 {
3223     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3224 }
3225 
3226 
3227 int
3228 nxt_unit_run(nxt_unit_ctx_t *ctx)
3229 {
3230     int              rc;
3231     nxt_unit_impl_t  *lib;
3232 
3233     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3234     rc = NXT_UNIT_OK;
3235 
3236     while (nxt_fast_path(lib->online)) {
3237         rc = nxt_unit_run_once(ctx);
3238     }
3239 
3240     return rc;
3241 }
3242 
3243 
3244 int
3245 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
3246 {
3247     int                  rc;
3248     char                 buf[4096];
3249     char                 oob[256];
3250     ssize_t              rsize;
3251     nxt_unit_impl_t      *lib;
3252     nxt_unit_ctx_impl_t  *ctx_impl;
3253 
3254     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3255     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3256 
3257     memset(oob, 0, sizeof(struct cmsghdr));
3258 
3259     if (ctx_impl->read_port_fd != -1) {
3260         rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
3261                                          buf, sizeof(buf),
3262                                          oob, sizeof(oob));
3263     } else {
3264         rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
3265                                          buf, sizeof(buf),
3266                                          oob, sizeof(oob));
3267     }
3268 
3269     if (nxt_fast_path(rsize > 0)) {
3270         rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
3271                                   oob, sizeof(oob));
3272 
3273 #if (NXT_DEBUG)
3274         memset(buf, 0xAC, rsize);
3275 #endif
3276 
3277     } else {
3278         rc = NXT_UNIT_ERROR;
3279     }
3280 
3281     return rc;
3282 }
3283 
3284 
3285 void
3286 nxt_unit_done(nxt_unit_ctx_t *ctx)
3287 {
3288     nxt_unit_impl_t      *lib;
3289     nxt_unit_process_t   *process;
3290     nxt_unit_ctx_impl_t  *ctx_impl;
3291 
3292     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3293 
3294     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
3295 
3296         nxt_unit_ctx_free(&ctx_impl->ctx);
3297 
3298     } nxt_queue_loop;
3299 
3300     for ( ;; ) {
3301         pthread_mutex_lock(&lib->mutex);
3302 
3303         process = nxt_unit_process_pop_first(lib);
3304         if (process == NULL) {
3305             pthread_mutex_unlock(&lib->mutex);
3306 
3307             break;
3308         }
3309 
3310         nxt_unit_remove_process(ctx, process);
3311     }
3312 
3313     pthread_mutex_destroy(&lib->mutex);
3314 
3315     free(lib);
3316 }
3317 
3318 
3319 nxt_unit_ctx_t *
3320 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
3321 {
3322     int                  rc, fd;
3323     nxt_unit_impl_t      *lib;
3324     nxt_unit_port_id_t   new_port_id;
3325     nxt_unit_ctx_impl_t  *new_ctx;
3326 
3327     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3328 
3329     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
3330     if (nxt_slow_path(new_ctx == NULL)) {
3331         nxt_unit_warn(ctx, "failed to allocate context");
3332 
3333         return NULL;
3334     }
3335 
3336     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
3337     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3338         free(new_ctx);
3339 
3340         return NULL;
3341     }
3342 
3343     rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
3344     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3345         lib->callbacks.remove_port(ctx, &new_port_id);
3346 
3347         close(fd);
3348 
3349         free(new_ctx);
3350 
3351         return NULL;
3352     }
3353 
3354     close(fd);
3355 
3356     rc = nxt_unit_ctx_init(lib, new_ctx, data);
3357     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3358         lib->callbacks.remove_port(ctx, &new_port_id);
3359 
3360         free(new_ctx);
3361 
3362         return NULL;
3363     }
3364 
3365     new_ctx->read_port_id = new_port_id;
3366 
3367     return &new_ctx->ctx;
3368 }
3369 
3370 
3371 void
3372 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
3373 {
3374     nxt_unit_impl_t                  *lib;
3375     nxt_unit_ctx_impl_t              *ctx_impl;
3376     nxt_unit_mmap_buf_t              *mmap_buf;
3377     nxt_unit_request_info_impl_t     *req_impl;
3378     nxt_unit_websocket_frame_impl_t  *ws_impl;
3379 
3380     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3381     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3382 
3383     nxt_queue_each(req_impl, &ctx_impl->active_req,
3384                    nxt_unit_request_info_impl_t, link)
3385     {
3386         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
3387 
3388         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
3389 
3390     } nxt_queue_loop;
3391 
3392     nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
3393     nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
3394 
3395     while (ctx_impl->free_buf != NULL) {
3396         mmap_buf = ctx_impl->free_buf;
3397         nxt_unit_mmap_buf_remove(mmap_buf);
3398         free(mmap_buf);
3399     }
3400 
3401     nxt_queue_each(req_impl, &ctx_impl->free_req,
3402                    nxt_unit_request_info_impl_t, link)
3403     {
3404         nxt_unit_request_info_free(req_impl);
3405 
3406     } nxt_queue_loop;
3407 
3408     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
3409                    nxt_unit_websocket_frame_impl_t, link)
3410     {
3411         nxt_unit_websocket_frame_free(ws_impl);
3412 
3413     } nxt_queue_loop;
3414 
3415     pthread_mutex_destroy(&ctx_impl->mutex);
3416 
3417     nxt_queue_remove(&ctx_impl->link);
3418 
3419     if (ctx_impl != &lib->main_ctx) {
3420         free(ctx_impl);
3421     }
3422 }
3423 
3424 
3425 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
3426 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
3427 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
3428 #else
3429 #define NXT_UNIX_SOCKET  SOCK_DGRAM
3430 #endif
3431 
3432 
3433 void
3434 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
3435 {
3436     nxt_unit_port_hash_id_t  port_hash_id;
3437 
3438     port_hash_id.pid = pid;
3439     port_hash_id.id = id;
3440 
3441     port_id->pid = pid;
3442     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
3443     port_id->id = id;
3444 }
3445 
3446 
3447 int
3448 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
3449     nxt_unit_port_id_t *port_id)
3450 {
3451     int                 rc, fd;
3452     nxt_unit_impl_t     *lib;
3453     nxt_unit_port_id_t  new_port_id;
3454 
3455     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3456 
3457     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
3458     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3459         return rc;
3460     }
3461 
3462     rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
3463 
3464     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
3465         *port_id = new_port_id;
3466 
3467     } else {
3468         lib->callbacks.remove_port(ctx, &new_port_id);
3469     }
3470 
3471     close(fd);
3472 
3473     return rc;
3474 }
3475 
3476 
3477 static int
3478 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
3479 {
3480     int                 rc, port_sockets[2];
3481     nxt_unit_impl_t     *lib;
3482     nxt_unit_port_t     new_port;
3483     nxt_unit_process_t  *process;
3484 
3485     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3486 
3487     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
3488     if (nxt_slow_path(rc != 0)) {
3489         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
3490                       strerror(errno), errno);
3491 
3492         return NXT_UNIT_ERROR;
3493     }
3494 
3495     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
3496                    port_sockets[0], port_sockets[1]);
3497 
3498     pthread_mutex_lock(&lib->mutex);
3499 
3500     process = nxt_unit_process_get(ctx, lib->pid);
3501     if (nxt_slow_path(process == NULL)) {
3502         pthread_mutex_unlock(&lib->mutex);
3503 
3504         close(port_sockets[0]);
3505         close(port_sockets[1]);
3506 
3507         return NXT_UNIT_ERROR;
3508     }
3509 
3510     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
3511 
3512     new_port.in_fd = port_sockets[0];
3513     new_port.out_fd = -1;
3514     new_port.data = NULL;
3515 
3516     pthread_mutex_unlock(&lib->mutex);
3517 
3518     nxt_unit_process_use(ctx, process, -1);
3519 
3520     rc = lib->callbacks.add_port(ctx, &new_port);
3521     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3522         nxt_unit_warn(ctx, "create_port: add_port() failed");
3523 
3524         close(port_sockets[0]);
3525         close(port_sockets[1]);
3526 
3527         return rc;
3528     }
3529 
3530     *port_id = new_port.id;
3531     *fd = port_sockets[1];
3532 
3533     return rc;
3534 }
3535 
3536 
3537 static int
3538 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
3539     nxt_unit_port_id_t *new_port, int fd)
3540 {
3541     ssize_t          res;
3542     nxt_unit_impl_t  *lib;
3543 
3544     struct {
3545         nxt_port_msg_t            msg;
3546         nxt_port_msg_new_port_t   new_port;
3547     } m;
3548 
3549     union {
3550         struct cmsghdr  cm;
3551         char            space[CMSG_SPACE(sizeof(int))];
3552     } cmsg;
3553 
3554     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3555 
3556     m.msg.stream = 0;
3557     m.msg.pid = lib->pid;
3558     m.msg.reply_port = 0;
3559     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
3560     m.msg.last = 0;
3561     m.msg.mmap = 0;
3562     m.msg.nf = 0;
3563     m.msg.mf = 0;
3564     m.msg.tracking = 0;
3565 
3566     m.new_port.id = new_port->id;
3567     m.new_port.pid = new_port->pid;
3568     m.new_port.type = NXT_PROCESS_WORKER;
3569     m.new_port.max_size = 16 * 1024;
3570     m.new_port.max_share = 64 * 1024;
3571 
3572     memset(&cmsg, 0, sizeof(cmsg));
3573 
3574     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3575     cmsg.cm.cmsg_level = SOL_SOCKET;
3576     cmsg.cm.cmsg_type = SCM_RIGHTS;
3577 
3578     /*
3579      * memcpy() is used instead of simple
3580      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3581      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3582      *   dereferencing type-punned pointer will break strict-aliasing rules
3583      *
3584      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3585      * in the same simple assignment as in the code above.
3586      */
3587     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3588 
3589     res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
3590                                    &cmsg, sizeof(cmsg));
3591 
3592     return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
3593 }
3594 
3595 
3596 int
3597 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3598 {
3599     int                   rc;
3600     nxt_unit_impl_t       *lib;
3601     nxt_unit_process_t    *process;
3602     nxt_unit_port_impl_t  *new_port;
3603 
3604     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3605 
3606     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
3607                    port->id.pid, port->id.id,
3608                    port->in_fd, port->out_fd);
3609 
3610     pthread_mutex_lock(&lib->mutex);
3611 
3612     process = nxt_unit_process_get(ctx, port->id.pid);
3613     if (nxt_slow_path(process == NULL)) {
3614         rc = NXT_UNIT_ERROR;
3615         goto unlock;
3616     }
3617 
3618     if (port->id.id >= process->next_port_id) {
3619         process->next_port_id = port->id.id + 1;
3620     }
3621 
3622     new_port = malloc(sizeof(nxt_unit_port_impl_t));
3623     if (nxt_slow_path(new_port == NULL)) {
3624         rc = NXT_UNIT_ERROR;
3625         goto unlock;
3626     }
3627 
3628     new_port->port = *port;
3629 
3630     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
3631     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3632         goto unlock;
3633     }
3634 
3635     nxt_queue_insert_tail(&process->ports, &new_port->link);
3636 
3637     rc = NXT_UNIT_OK;
3638 
3639     new_port->process = process;
3640 
3641 unlock:
3642 
3643     pthread_mutex_unlock(&lib->mutex);
3644 
3645     if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
3646         nxt_unit_process_use(ctx, process, -1);
3647     }
3648 
3649     return rc;
3650 }
3651 
3652 
3653 void
3654 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
3655 {
3656     nxt_unit_find_remove_port(ctx, port_id, NULL);
3657 }
3658 
3659 
3660 void
3661 nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3662     nxt_unit_port_t *r_port)
3663 {
3664     nxt_unit_impl_t     *lib;
3665     nxt_unit_process_t  *process;
3666 
3667     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3668 
3669     pthread_mutex_lock(&lib->mutex);
3670 
3671     process = NULL;
3672 
3673     nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
3674 
3675     pthread_mutex_unlock(&lib->mutex);
3676 
3677     if (nxt_slow_path(process != NULL)) {
3678         nxt_unit_process_use(ctx, process, -1);
3679     }
3680 }
3681 
3682 
3683 static void
3684 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3685     nxt_unit_port_t *r_port, nxt_unit_process_t **process)
3686 {
3687     nxt_unit_impl_t       *lib;
3688     nxt_unit_port_impl_t  *port;
3689 
3690     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3691 
3692     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
3693     if (nxt_slow_path(port == NULL)) {
3694         nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
3695                        (int) port_id->pid, (int) port_id->id);
3696 
3697         return;
3698     }
3699 
3700     nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
3701                    (int) port_id->pid, (int) port_id->id,
3702                    port->port.in_fd, port->port.out_fd, port->port.data);
3703 
3704     if (port->port.in_fd != -1) {
3705         close(port->port.in_fd);
3706     }
3707 
3708     if (port->port.out_fd != -1) {
3709         close(port->port.out_fd);
3710     }
3711 
3712     if (port->process != NULL) {
3713         nxt_queue_remove(&port->link);
3714     }
3715 
3716     if (process != NULL) {
3717         *process = port->process;
3718     }
3719 
3720     if (r_port != NULL) {
3721         *r_port = port->port;
3722     }
3723 
3724     free(port);
3725 }
3726 
3727 
3728 void
3729 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
3730 {
3731     nxt_unit_impl_t     *lib;
3732     nxt_unit_process_t  *process;
3733 
3734     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3735 
3736     pthread_mutex_lock(&lib->mutex);
3737 
3738     process = nxt_unit_process_find(ctx, pid, 1);
3739     if (nxt_slow_path(process == NULL)) {
3740         nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
3741 
3742         pthread_mutex_unlock(&lib->mutex);
3743 
3744         return;
3745     }
3746 
3747     nxt_unit_remove_process(ctx, process);
3748 }
3749 
3750 
3751 static void
3752 nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
3753 {
3754     nxt_queue_t           ports;
3755     nxt_unit_impl_t       *lib;
3756     nxt_unit_port_impl_t  *port;
3757 
3758     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3759 
3760     nxt_queue_init(&ports);
3761 
3762     nxt_queue_add(&ports, &process->ports);
3763 
3764     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
3765 
3766         nxt_unit_process_use(ctx, process, -1);
3767         port->process = NULL;
3768 
3769         /* Shortcut for default callback. */
3770         if (lib->callbacks.remove_port == nxt_unit_remove_port) {
3771             nxt_queue_remove(&port->link);
3772 
3773             nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
3774         }
3775 
3776     } nxt_queue_loop;
3777 
3778     pthread_mutex_unlock(&lib->mutex);
3779 
3780     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
3781 
3782         nxt_queue_remove(&port->link);
3783 
3784         lib->callbacks.remove_port(ctx, &port->port.id);
3785 
3786     } nxt_queue_loop;
3787 
3788     nxt_unit_process_use(ctx, process, -1);
3789 }
3790 
3791 
3792 void
3793 nxt_unit_quit(nxt_unit_ctx_t *ctx)
3794 {
3795     nxt_unit_impl_t  *lib;
3796 
3797     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3798 
3799     lib->online = 0;
3800 }
3801 
3802 
3803 static ssize_t
3804 nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3805     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
3806 {
3807     int                   fd;
3808     nxt_unit_impl_t       *lib;
3809     nxt_unit_port_impl_t  *port;
3810 
3811     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3812 
3813     pthread_mutex_lock(&lib->mutex);
3814 
3815     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
3816 
3817     if (nxt_fast_path(port != NULL)) {
3818         fd = port->port.out_fd;
3819 
3820     } else {
3821         nxt_unit_warn(ctx, "port_send: port %d,%d not found",
3822                       (int) port_id->pid, (int) port_id->id);
3823         fd = -1;
3824     }
3825 
3826     pthread_mutex_unlock(&lib->mutex);
3827 
3828     if (nxt_slow_path(fd == -1)) {
3829         if (port != NULL) {
3830             nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
3831                           (int) port_id->pid, (int) port_id->id);
3832         }
3833 
3834         return -1;
3835     }
3836 
3837     nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
3838                    (int) port_id->pid, (int) port_id->id, fd);
3839 
3840     return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
3841 }
3842 
3843 
3844 ssize_t
3845 nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
3846     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
3847 {
3848     ssize_t        res;
3849     struct iovec   iov[1];
3850     struct msghdr  msg;
3851 
3852     iov[0].iov_base = (void *) buf;
3853     iov[0].iov_len = buf_size;
3854 
3855     msg.msg_name = NULL;
3856     msg.msg_namelen = 0;
3857     msg.msg_iov = iov;
3858     msg.msg_iovlen = 1;
3859     msg.msg_flags = 0;
3860     msg.msg_control = (void *) oob;
3861     msg.msg_controllen = oob_size;
3862 
3863     res = sendmsg(fd, &msg, 0);
3864 
3865     if (nxt_slow_path(res == -1)) {
3866         nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
3867                       fd, (int) buf_size, strerror(errno), errno);
3868 
3869     } else {
3870         nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
3871                        (int) res);
3872     }
3873 
3874     return res;
3875 }
3876 
3877 
3878 static ssize_t
3879 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3880     void *buf, size_t buf_size, void *oob, size_t oob_size)
3881 {
3882     int                   fd;
3883     nxt_unit_impl_t       *lib;
3884     nxt_unit_ctx_impl_t   *ctx_impl;
3885     nxt_unit_port_impl_t  *port;
3886 
3887     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3888 
3889     pthread_mutex_lock(&lib->mutex);
3890 
3891     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
3892 
3893     if (nxt_fast_path(port != NULL)) {
3894         fd = port->port.in_fd;
3895 
3896     } else {
3897         nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
3898                        (int) port_id->pid, (int) port_id->id);
3899         fd = -1;
3900     }
3901 
3902     pthread_mutex_unlock(&lib->mutex);
3903 
3904     if (nxt_slow_path(fd == -1)) {
3905         return -1;
3906     }
3907 
3908     nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
3909                    (int) port_id->pid, (int) port_id->id, fd);
3910 
3911     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3912 
3913     if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
3914         ctx_impl->read_port_fd = fd;
3915     }
3916 
3917     return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
3918 }
3919 
3920 
3921 ssize_t
3922 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
3923     void *oob, size_t oob_size)
3924 {
3925     ssize_t        res;
3926     struct iovec   iov[1];
3927     struct msghdr  msg;
3928 
3929     iov[0].iov_base = buf;
3930     iov[0].iov_len = buf_size;
3931 
3932     msg.msg_name = NULL;
3933     msg.msg_namelen = 0;
3934     msg.msg_iov = iov;
3935     msg.msg_iovlen = 1;
3936     msg.msg_flags = 0;
3937     msg.msg_control = oob;
3938     msg.msg_controllen = oob_size;
3939 
3940     res = recvmsg(fd, &msg, 0);
3941 
3942     if (nxt_slow_path(res == -1)) {
3943         nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
3944                       fd, strerror(errno), errno);
3945 
3946     } else {
3947         nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
3948     }
3949 
3950     return res;
3951 }
3952 
3953 
3954 static nxt_int_t
3955 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
3956 {
3957     nxt_unit_port_t          *port;
3958     nxt_unit_port_hash_id_t  *port_id;
3959 
3960     port = data;
3961     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
3962 
3963     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
3964         && port_id->pid == port->id.pid
3965         && port_id->id == port->id.id)
3966     {
3967         return NXT_OK;
3968     }
3969 
3970     return NXT_DECLINED;
3971 }
3972 
3973 
3974 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
3975     NXT_LVLHSH_DEFAULT,
3976     nxt_unit_port_hash_test,
3977     nxt_lvlhsh_alloc,
3978     nxt_lvlhsh_free,
3979 };
3980 
3981 
3982 static inline void
3983 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
3984     nxt_unit_port_hash_id_t *port_hash_id,
3985     nxt_unit_port_id_t *port_id)
3986 {
3987     port_hash_id->pid = port_id->pid;
3988     port_hash_id->id = port_id->id;
3989 
3990     if (nxt_fast_path(port_id->hash != 0)) {
3991         lhq->key_hash = port_id->hash;
3992 
3993     } else {
3994         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
3995 
3996         port_id->hash = lhq->key_hash;
3997 
3998         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
3999                        (int) port_id->pid, (int) port_id->id,
4000                        (int) port_id->hash);
4001     }
4002 
4003     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
4004     lhq->key.start = (u_char *) port_hash_id;
4005     lhq->proto = &lvlhsh_ports_proto;
4006     lhq->pool = NULL;
4007 }
4008 
4009 
4010 static int
4011 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
4012 {
4013     nxt_int_t                res;
4014     nxt_lvlhsh_query_t       lhq;
4015     nxt_unit_port_hash_id_t  port_hash_id;
4016 
4017     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
4018     lhq.replace = 0;
4019     lhq.value = port;
4020 
4021     res = nxt_lvlhsh_insert(port_hash, &lhq);
4022 
4023     switch (res) {
4024 
4025     case NXT_OK:
4026         return NXT_UNIT_OK;
4027 
4028     default:
4029         return NXT_UNIT_ERROR;
4030     }
4031 }
4032 
4033 
4034 static nxt_unit_port_impl_t *
4035 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
4036     int remove)
4037 {
4038     nxt_int_t                res;
4039     nxt_lvlhsh_query_t       lhq;
4040     nxt_unit_port_hash_id_t  port_hash_id;
4041 
4042     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
4043 
4044     if (remove) {
4045         res = nxt_lvlhsh_delete(port_hash, &lhq);
4046 
4047     } else {
4048         res = nxt_lvlhsh_find(port_hash, &lhq);
4049     }
4050 
4051     switch (res) {
4052 
4053     case NXT_OK:
4054         return lhq.value;
4055 
4056     default:
4057         return NULL;
4058     }
4059 }
4060 
4061 
4062 static nxt_int_t
4063 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4064 {
4065     return NXT_OK;
4066 }
4067 
4068 
4069 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
4070     NXT_LVLHSH_DEFAULT,
4071     nxt_unit_request_hash_test,
4072     nxt_lvlhsh_alloc,
4073     nxt_lvlhsh_free,
4074 };
4075 
4076 
4077 static int
4078 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4079     nxt_unit_request_info_impl_t *req_impl)
4080 {
4081     uint32_t            *stream;
4082     nxt_int_t           res;
4083     nxt_lvlhsh_query_t  lhq;
4084 
4085     stream = &req_impl->stream;
4086 
4087     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4088     lhq.key.length = sizeof(*stream);
4089     lhq.key.start = (u_char *) stream;
4090     lhq.proto = &lvlhsh_requests_proto;
4091     lhq.pool = NULL;
4092     lhq.replace = 0;
4093     lhq.value = req_impl;
4094 
4095     res = nxt_lvlhsh_insert(request_hash, &lhq);
4096 
4097     switch (res) {
4098 
4099     case NXT_OK:
4100         return NXT_UNIT_OK;
4101 
4102     default:
4103         return NXT_UNIT_ERROR;
4104     }
4105 }
4106 
4107 
4108 static nxt_unit_request_info_impl_t *
4109 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4110     int remove)
4111 {
4112     nxt_int_t           res;
4113     nxt_lvlhsh_query_t  lhq;
4114 
4115     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4116     lhq.key.length = sizeof(stream);
4117     lhq.key.start = (u_char *) &stream;
4118     lhq.proto = &lvlhsh_requests_proto;
4119     lhq.pool = NULL;
4120 
4121     if (remove) {
4122         res = nxt_lvlhsh_delete(request_hash, &lhq);
4123 
4124     } else {
4125         res = nxt_lvlhsh_find(request_hash, &lhq);
4126     }
4127 
4128     switch (res) {
4129 
4130     case NXT_OK:
4131         return lhq.value;
4132 
4133     default:
4134         return NULL;
4135     }
4136 }
4137 
4138 
4139 void
4140 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4141 {
4142     int              log_fd, n;
4143     char             msg[NXT_MAX_ERROR_STR], *p, *end;
4144     pid_t            pid;
4145     va_list          ap;
4146     nxt_unit_impl_t  *lib;
4147 
4148     if (nxt_fast_path(ctx != NULL)) {
4149         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4150 
4151         pid = lib->pid;
4152         log_fd = lib->log_fd;
4153 
4154     } else {
4155         pid = getpid();
4156         log_fd = STDERR_FILENO;
4157     }
4158 
4159     p = msg;
4160     end = p + sizeof(msg) - 1;
4161 
4162     p = nxt_unit_snprint_prefix(p, end, pid, level);
4163 
4164     va_start(ap, fmt);
4165     p += vsnprintf(p, end - p, fmt, ap);
4166     va_end(ap);
4167 
4168     if (nxt_slow_path(p > end)) {
4169         memcpy(end - 5, "[...]", 5);
4170         p = end;
4171     }
4172 
4173     *p++ = '\n';
4174 
4175     n = write(log_fd, msg, p - msg);
4176     if (nxt_slow_path(n < 0)) {
4177         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4178     }
4179 }
4180 
4181 
4182 void
4183 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
4184 {
4185     int                           log_fd, n;
4186     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
4187     pid_t                         pid;
4188     va_list                       ap;
4189     nxt_unit_impl_t               *lib;
4190     nxt_unit_request_info_impl_t  *req_impl;
4191 
4192     if (nxt_fast_path(req != NULL)) {
4193         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
4194 
4195         pid = lib->pid;
4196         log_fd = lib->log_fd;
4197 
4198     } else {
4199         pid = getpid();
4200         log_fd = STDERR_FILENO;
4201     }
4202 
4203     p = msg;
4204     end = p + sizeof(msg) - 1;
4205 
4206     p = nxt_unit_snprint_prefix(p, end, pid, level);
4207 
4208     if (nxt_fast_path(req != NULL)) {
4209         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4210 
4211         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4212     }
4213 
4214     va_start(ap, fmt);
4215     p += vsnprintf(p, end - p, fmt, ap);
4216     va_end(ap);
4217 
4218     if (nxt_slow_path(p > end)) {
4219         memcpy(end - 5, "[...]", 5);
4220         p = end;
4221     }
4222 
4223     *p++ = '\n';
4224 
4225     n = write(log_fd, msg, p - msg);
4226     if (nxt_slow_path(n < 0)) {
4227         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4228     }
4229 }
4230 
4231 
4232 static const char * nxt_unit_log_levels[] = {
4233     "alert",
4234     "error",
4235     "warn",
4236     "notice",
4237     "info",
4238     "debug",
4239 };
4240 
4241 
4242 static char *
4243 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
4244 {
4245     struct tm        tm;
4246     struct timespec  ts;
4247 
4248     (void) clock_gettime(CLOCK_REALTIME, &ts);
4249 
4250 #if (NXT_HAVE_LOCALTIME_R)
4251     (void) localtime_r(&ts.tv_sec, &tm);
4252 #else
4253     tm = *localtime(&ts.tv_sec);
4254 #endif
4255 
4256     p += snprintf(p, end - p,
4257                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
4258                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
4259                   tm.tm_hour, tm.tm_min, tm.tm_sec,
4260                   (int) ts.tv_nsec / 1000000);
4261 
4262     p += snprintf(p, end - p,
4263                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
4264                   (int) pid,
4265                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
4266 
4267     return p;
4268 }
4269 
4270 
4271 /* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */
4272 
4273 void *
4274 nxt_memalign(size_t alignment, size_t size)
4275 {
4276     void        *p;
4277     nxt_err_t   err;
4278 
4279     err = posix_memalign(&p, alignment, size);
4280 
4281     if (nxt_fast_path(err == 0)) {
4282         return p;
4283     }
4284 
4285     return NULL;
4286 }
4287 
4288 #if (NXT_DEBUG)
4289 
4290 void
4291 nxt_free(void *p)
4292 {
4293     free(p);
4294 }
4295 
4296 #endif
4297