xref: /unit/src/nxt_unit.c (revision 1131:ec7d924d8dfb)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
/*
 * Forward declarations of the implementation-private structures used in
 * this file.  Each *_impl_t / helper structure is defined further below.
 */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
32 
/*
 * Prototypes of the file-local helpers.  They are declared up front so the
 * definitions below can reference each other regardless of their order.
 */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int n);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
    uint32_t size);

static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process);

static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
129 
130 
/*
 * Buffer descriptor that can be chained into a list.
 *
 * The list is linked through "next", while "prev" stores the address of the
 * pointer that currently refers to this element (either the list head or the
 * "next" member of the preceding element).  This lets
 * nxt_unit_mmap_buf_remove() unlink an element without knowing the head.
 */
struct nxt_unit_mmap_buf_s {
    /* Public start/free/end view of the memory. */
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    /* Set to NULL when the buffer wraps memory that is not a shared mmap. */
    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    /* Request the buffer is currently attached to. */
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
};
143 
144 
/*
 * Decoded form of a single received port message.  It is filled in by
 * nxt_unit_process_msg() and passed to the per-message-type handlers.
 */
struct nxt_unit_recv_msg_s {
    /* Values copied from the nxt_port_msg_t header of the message. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload that follows the message header and its length in bytes. */
    void                     *start;
    uint32_t                 size;

    /* Descriptor received as SCM_RIGHTS ancillary data, or -1 if none. */
    int                      fd;
    /* Released with nxt_unit_process_use(..., -1) if still set afterwards. */
    nxt_unit_process_t       *process;

    /* Buffers still owned by the message; handlers may take the list over. */
    nxt_unit_mmap_buf_t      *incoming_buf;
};
161 
162 
/*
 * Processing state of a request, stored in
 * nxt_unit_request_info_impl_t.state.  A newly received request starts in
 * NXT_UNIT_RS_START.
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
170 
171 
/*
 * Private wrapper around the public nxt_unit_request_info_t.  The public
 * part is embedded, so the wrapper is recovered with nxt_container_of().
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    /* Stream id taken from the message that delivered the request. */
    uint32_t                 stream;

    /* Process reference taken over from the received message. */
    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    /* Buffer list taken over from the received message. */
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;

    /* Membership in the context's free_req or active_req queue. */
    nxt_queue_link_t         link;

    /*
     * Per-request user data area; request_data_size extra bytes are
     * allocated for it and it is exposed through req.data.
     */
    char                     extra_data[];
};
189 
190 
/*
 * Private wrapper around the public nxt_unit_websocket_frame_t.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    /* Buffer list that holds the frame header and payload. */
    nxt_unit_mmap_buf_t         *buf;

    /* NOTE(review): presumably links the frame into free_ws - confirm. */
    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;

    /* Reset to NULL for every received frame. */
    void                        *retain_buf;
};
202 
203 
/*
 * Private wrapper around the public nxt_unit_ctx_t; initialised by
 * nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    nxt_unit_port_id_t            read_port_id;
    /* Initialised to -1. */
    int                           read_port_fd;

    /* Membership in nxt_unit_impl_t.contexts. */
    nxt_queue_link_t              link;

    /* List of unused buffer descriptors, initially the two in ctx_buf. */
    nxt_unit_mmap_buf_t           *free_buf;

    /* Queue of reusable nxt_unit_request_info_impl_t objects. */
    nxt_queue_t                   free_req;

    /* Queue of reusable nxt_unit_websocket_frame_impl_t objects. */
    nxt_queue_t                   free_ws;

    /* Queue of nxt_unit_request_info_impl_t objects handed out for use. */
    nxt_queue_t                   active_req;

    /* Hash of nxt_unit_request_info_impl_t objects, looked up by stream. */
    nxt_lvlhsh_t                  requests;

    /* Preallocated descriptors placed on free_buf at initialisation. */
    nxt_unit_mmap_buf_t           ctx_buf[2];

    /*
     * Preallocated request placed on free_req at initialisation.  It ends
     * with a flexible array member, so it has to remain the last member.
     */
    nxt_unit_request_info_impl_t  req;
};
230 
231 
/*
 * Library instance: private wrapper around the public nxt_unit_t, created
 * by nxt_unit_create().
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    /* Copy of the user callbacks; unset optional ones get defaults. */
    nxt_unit_callbacks_t     callbacks;

    /* Size of the per-request user data area (extra_data). */
    uint32_t                 request_data_size;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    /* Result of getpid() at creation time. */
    pid_t                    pid;
    /* Defaults to STDERR_FILENO. */
    int                      log_fd;
    /* Set to 1 at creation time. */
    int                      online;

    /*
     * The main context.  Its embedded request ends with a flexible array
     * member and the allocation reserves request_data_size bytes after the
     * structure for it, so this has to remain the last member.
     */
    nxt_unit_ctx_impl_t      main_ctx;
};
253 
254 
/*
 * Private wrapper around the public nxt_unit_port_t.
 */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    /* NOTE(review): presumably an entry of nxt_unit_process_t.ports. */
    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;
};
261 
262 
/*
 * One shared memory segment, identified by a pointer to its header.
 */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};
266 
267 
/*
 * Array of shared memory segments together with its own mutex.
 * NOTE(review): "size" and "cap" look like the used and allocated element
 * counts of "elts" - confirm against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_unit_mmap_t          *elts;
};
274 
275 
/*
 * State kept for one process, identified by its pid.
 */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;

    /* Shared memory segments, kept separately for each direction. */
    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    /* Owning library instance. */
    nxt_unit_impl_t          *lib;

    /* Reference counter, adjusted through nxt_unit_process_use(). */
    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};
290 
291 
/*
 * Port identifier (pid, id) used with the port hash.  Both members are
 * explicitly 32 bit wide to avoid possible alignment padding inside the
 * structure.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
297 
298 
299 nxt_unit_ctx_t *
300 nxt_unit_init(nxt_unit_init_t *init)
301 {
302     int              rc;
303     uint32_t         ready_stream;
304     nxt_unit_ctx_t   *ctx;
305     nxt_unit_impl_t  *lib;
306     nxt_unit_port_t  ready_port, read_port;
307 
308     lib = nxt_unit_create(init);
309     if (nxt_slow_path(lib == NULL)) {
310         return NULL;
311     }
312 
313     if (init->ready_port.id.pid != 0
314         && init->ready_stream != 0
315         && init->read_port.id.pid != 0)
316     {
317         ready_port = init->ready_port;
318         ready_stream = init->ready_stream;
319         read_port = init->read_port;
320         lib->log_fd = init->log_fd;
321 
322         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
323                               ready_port.id.id);
324         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
325                               read_port.id.id);
326     } else {
327         rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
328                                &ready_stream);
329         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
330             goto fail;
331         }
332     }
333 
334     ctx = &lib->main_ctx.ctx;
335 
336     rc = lib->callbacks.add_port(ctx, &ready_port);
337     if (rc != NXT_UNIT_OK) {
338         nxt_unit_alert(NULL, "failed to add ready_port");
339 
340         goto fail;
341     }
342 
343     rc = lib->callbacks.add_port(ctx, &read_port);
344     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
345         nxt_unit_alert(NULL, "failed to add read_port");
346 
347         goto fail;
348     }
349 
350     lib->main_ctx.read_port_id = read_port.id;
351     lib->ready_port_id = ready_port.id;
352 
353     rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
354     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
355         nxt_unit_alert(NULL, "failed to send READY message");
356 
357         goto fail;
358     }
359 
360     return ctx;
361 
362 fail:
363 
364     free(lib);
365 
366     return NULL;
367 }
368 
369 
370 static nxt_unit_impl_t *
371 nxt_unit_create(nxt_unit_init_t *init)
372 {
373     int                   rc;
374     nxt_unit_impl_t       *lib;
375     nxt_unit_callbacks_t  *cb;
376 
377     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
378     if (nxt_slow_path(lib == NULL)) {
379         nxt_unit_alert(NULL, "failed to allocate unit struct");
380 
381         return NULL;
382     }
383 
384     rc = pthread_mutex_init(&lib->mutex, NULL);
385     if (nxt_slow_path(rc != 0)) {
386         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
387 
388         goto fail;
389     }
390 
391     lib->unit.data = init->data;
392     lib->callbacks = init->callbacks;
393 
394     lib->request_data_size = init->request_data_size;
395 
396     lib->processes.slot = NULL;
397     lib->ports.slot = NULL;
398 
399     lib->pid = getpid();
400     lib->log_fd = STDERR_FILENO;
401     lib->online = 1;
402 
403     nxt_queue_init(&lib->contexts);
404 
405     nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
406 
407     cb = &lib->callbacks;
408 
409     if (cb->request_handler == NULL) {
410         nxt_unit_alert(NULL, "request_handler is NULL");
411 
412         goto fail;
413     }
414 
415     if (cb->add_port == NULL) {
416         cb->add_port = nxt_unit_add_port;
417     }
418 
419     if (cb->remove_port == NULL) {
420         cb->remove_port = nxt_unit_remove_port;
421     }
422 
423     if (cb->remove_pid == NULL) {
424         cb->remove_pid = nxt_unit_remove_pid;
425     }
426 
427     if (cb->quit == NULL) {
428         cb->quit = nxt_unit_quit;
429     }
430 
431     if (cb->port_send == NULL) {
432         cb->port_send = nxt_unit_port_send_default;
433     }
434 
435     if (cb->port_recv == NULL) {
436         cb->port_recv = nxt_unit_port_recv_default;
437     }
438 
439     return lib;
440 
441 fail:
442 
443     free(lib);
444 
445     return NULL;
446 }
447 
448 
449 static void
450 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
451     void *data)
452 {
453     ctx_impl->ctx.data = data;
454     ctx_impl->ctx.unit = &lib->unit;
455 
456     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
457 
458     nxt_queue_init(&ctx_impl->free_req);
459     nxt_queue_init(&ctx_impl->free_ws);
460     nxt_queue_init(&ctx_impl->active_req);
461 
462     ctx_impl->free_buf = NULL;
463     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
464     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
465 
466     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
467 
468     ctx_impl->req.req.ctx = &ctx_impl->ctx;
469     ctx_impl->req.req.unit = &lib->unit;
470 
471     ctx_impl->read_port_fd = -1;
472     ctx_impl->requests.slot = 0;
473 }
474 
475 
476 nxt_inline void
477 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
478     nxt_unit_mmap_buf_t *mmap_buf)
479 {
480     mmap_buf->next = *head;
481 
482     if (mmap_buf->next != NULL) {
483         mmap_buf->next->prev = &mmap_buf->next;
484     }
485 
486     *head = mmap_buf;
487     mmap_buf->prev = head;
488 }
489 
490 
491 nxt_inline void
492 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
493     nxt_unit_mmap_buf_t *mmap_buf)
494 {
495     while (*prev != NULL) {
496         prev = &(*prev)->next;
497     }
498 
499     nxt_unit_mmap_buf_insert(prev, mmap_buf);
500 }
501 
502 
503 nxt_inline void
504 nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
505 {
506     nxt_unit_mmap_buf_t  **prev;
507 
508     prev = mmap_buf->prev;
509 
510     if (mmap_buf->next != NULL) {
511         mmap_buf->next->prev = prev;
512     }
513 
514     if (prev != NULL) {
515         *prev = mmap_buf->next;
516     }
517 }
518 
519 
520 static int
521 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
522     int *log_fd, uint32_t *stream)
523 {
524     int       rc;
525     int       ready_fd, read_fd;
526     char      *unit_init, *version_end;
527     long      version_length;
528     int64_t   ready_pid, read_pid;
529     uint32_t  ready_stream, ready_id, read_id;
530 
531     unit_init = getenv(NXT_UNIT_INIT_ENV);
532     if (nxt_slow_path(unit_init == NULL)) {
533         nxt_unit_alert(NULL, "%s is not in the current environment",
534                        NXT_UNIT_INIT_ENV);
535 
536         return NXT_UNIT_ERROR;
537     }
538 
539     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
540 
541     version_length = nxt_length(NXT_VERSION);
542 
543     version_end = strchr(unit_init, ';');
544     if (version_end == NULL
545         || version_end - unit_init != version_length
546         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
547     {
548         nxt_unit_alert(NULL, "version check error");
549 
550         return NXT_UNIT_ERROR;
551     }
552 
553     rc = sscanf(version_end + 1,
554                 "%"PRIu32";"
555                 "%"PRId64",%"PRIu32",%d;"
556                 "%"PRId64",%"PRIu32",%d;"
557                 "%d",
558                 &ready_stream,
559                 &ready_pid, &ready_id, &ready_fd,
560                 &read_pid, &read_id, &read_fd,
561                 log_fd);
562 
563     if (nxt_slow_path(rc != 8)) {
564         nxt_unit_alert(NULL, "failed to scan variables");
565 
566         return NXT_UNIT_ERROR;
567     }
568 
569     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
570 
571     ready_port->in_fd = -1;
572     ready_port->out_fd = ready_fd;
573     ready_port->data = NULL;
574 
575     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
576 
577     read_port->in_fd = read_fd;
578     read_port->out_fd = -1;
579     read_port->data = NULL;
580 
581     *stream = ready_stream;
582 
583     return NXT_UNIT_OK;
584 }
585 
586 
587 static int
588 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
589     uint32_t stream)
590 {
591     ssize_t          res;
592     nxt_port_msg_t   msg;
593     nxt_unit_impl_t  *lib;
594 
595     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
596 
597     msg.stream = stream;
598     msg.pid = lib->pid;
599     msg.reply_port = 0;
600     msg.type = _NXT_PORT_MSG_PROCESS_READY;
601     msg.last = 1;
602     msg.mmap = 0;
603     msg.nf = 0;
604     msg.mf = 0;
605     msg.tracking = 0;
606 
607     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
608     if (res != sizeof(msg)) {
609         return NXT_UNIT_ERROR;
610     }
611 
612     return NXT_UNIT_OK;
613 }
614 
615 
/*
 * Decodes one message received from a port and dispatches it by type.
 *
 * buf/buf_size hold the message (an nxt_port_msg_t header followed by the
 * payload); oob/oob_size hold the ancillary data, from which a single
 * SCM_RIGHTS descriptor is extracted if present.  port_id is not used by
 * this function.
 *
 * Returns the result of the selected handler, or NXT_UNIT_ERROR if the
 * message is malformed or not handled.
 *
 * Despite its name, the "fail" label is the common exit: it closes the
 * received descriptor unless a handler has taken it over (by resetting
 * recv_msg.fd to -1), frees the incoming buffers still attached to the
 * message and drops the process reference if one is still held.
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   rc;
    pid_t                 pid;
    struct cmsghdr        *cm;
    nxt_port_msg_t        *port_msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_recv_msg_t   recv_msg;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Everything the common exit path looks at is initialised up front. */
    rc = NXT_UNIT_ERROR;
    recv_msg.fd = -1;
    recv_msg.process = NULL;
    port_msg = buf;
    cm = oob;

    /* Accept exactly one descriptor passed as SCM_RIGHTS. */
    if (oob_size >= CMSG_SPACE(sizeof(int))
        && cm->cmsg_len == CMSG_LEN(sizeof(int))
        && cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
        goto fail;
    }

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* The payload starts right after the header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /*
     * A tracked message for which nxt_unit_tracking_read() returns 0 is
     * dropped without dispatching it and is reported as success.
     */
    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
        rc = NXT_UNIT_OK;

        goto fail;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    if (port_msg->mmap) {
        if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
            goto fail;
        }
    }

    cb = &lib->callbacks;

    switch (port_msg->type) {

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        cb->quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /*
         * NOTE(review): the message is only logged and rc keeps its initial
         * NXT_UNIT_ERROR value - confirm that returning an error is
         * intended here.
         */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd);

            goto fail;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        /* The payload has to be exactly one pid_t. */
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                          (int) sizeof(pid));

            goto fail;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        cb->remove_pid(ctx, pid);

        rc = NXT_UNIT_OK;
        break;

    default:
        /*
         * NOTE(review): other message types are ignored, yet rc stays
         * NXT_UNIT_ERROR - confirm that this is intended.
         */
        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        goto fail;
    }

fail:

    if (recv_msg.fd != -1) {
        close(recv_msg.fd);
    }

    /*
     * The loop relies on nxt_unit_mmap_buf_free() unlinking the buffer,
     * which advances recv_msg.incoming_buf to the next element.
     */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (recv_msg.process != NULL) {
        nxt_unit_process_use(ctx, recv_msg.process, -1);
    }

    return rc;
}
766 
767 
768 static int
769 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
770 {
771     int                      nb;
772     nxt_unit_impl_t          *lib;
773     nxt_unit_port_t          new_port;
774     nxt_port_msg_new_port_t  *new_port_msg;
775 
776     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
777         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
778                       "invalid message size (%d)",
779                       recv_msg->stream, (int) recv_msg->size);
780 
781         return NXT_UNIT_ERROR;
782     }
783 
784     if (nxt_slow_path(recv_msg->fd < 0)) {
785         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
786                        recv_msg->stream, recv_msg->fd);
787 
788         return NXT_UNIT_ERROR;
789     }
790 
791     new_port_msg = recv_msg->start;
792 
793     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
794                    recv_msg->stream, (int) new_port_msg->pid,
795                    (int) new_port_msg->id, recv_msg->fd);
796 
797     nb = 0;
798 
799     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
800         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
801                        "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
802 
803         return NXT_UNIT_ERROR;
804     }
805 
806     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
807                           new_port_msg->id);
808 
809     new_port.in_fd = -1;
810     new_port.out_fd = recv_msg->fd;
811     new_port.data = NULL;
812 
813     recv_msg->fd = -1;
814 
815     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
816 
817     return lib->callbacks.add_port(ctx, &new_port);
818 }
819 
820 
821 static int
822 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
823 {
824     nxt_unit_impl_t               *lib;
825     nxt_unit_request_t            *r;
826     nxt_unit_mmap_buf_t           *b;
827     nxt_unit_request_info_t       *req;
828     nxt_unit_request_info_impl_t  *req_impl;
829 
830     if (nxt_slow_path(recv_msg->mmap == 0)) {
831         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
832                       recv_msg->stream);
833 
834         return NXT_UNIT_ERROR;
835     }
836 
837     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
838         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
839                       "%d expected", recv_msg->stream, (int) recv_msg->size,
840                       (int) sizeof(nxt_unit_request_t));
841 
842         return NXT_UNIT_ERROR;
843     }
844 
845     req_impl = nxt_unit_request_info_get(ctx);
846     if (nxt_slow_path(req_impl == NULL)) {
847         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
848                       recv_msg->stream);
849 
850         return NXT_UNIT_ERROR;
851     }
852 
853     req = &req_impl->req;
854 
855     nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
856                           recv_msg->reply_port);
857 
858     req->request = recv_msg->start;
859 
860     b = recv_msg->incoming_buf;
861 
862     req->request_buf = &b->buf;
863     req->response = NULL;
864     req->response_buf = NULL;
865 
866     r = req->request;
867 
868     req->content_length = r->content_length;
869 
870     req->content_buf = req->request_buf;
871     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
872 
873     /* "Move" process reference to req_impl. */
874     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
875     if (nxt_slow_path(req_impl->process == NULL)) {
876         return NXT_UNIT_ERROR;
877     }
878 
879     recv_msg->process = NULL;
880 
881     req_impl->stream = recv_msg->stream;
882 
883     req_impl->outgoing_buf = NULL;
884 
885     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
886         b->req = req;
887     }
888 
889     /* "Move" incoming buffer list to req_impl. */
890     req_impl->incoming_buf = recv_msg->incoming_buf;
891     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
892     recv_msg->incoming_buf = NULL;
893 
894     req->response_max_fields = 0;
895     req_impl->state = NXT_UNIT_RS_START;
896     req_impl->websocket = 0;
897 
898     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
899                    (int) r->method_length, nxt_unit_sptr_get(&r->method),
900                    (int) r->target_length, nxt_unit_sptr_get(&r->target),
901                    (int) r->content_length);
902 
903     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
904 
905     lib->callbacks.request_handler(req);
906 
907     return NXT_UNIT_OK;
908 }
909 
910 
911 static int
912 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
913 {
914     size_t                           hsize;
915     nxt_unit_impl_t                  *lib;
916     nxt_unit_mmap_buf_t              *b;
917     nxt_unit_ctx_impl_t              *ctx_impl;
918     nxt_unit_callbacks_t             *cb;
919     nxt_unit_request_info_t          *req;
920     nxt_unit_request_info_impl_t     *req_impl;
921     nxt_unit_websocket_frame_impl_t  *ws_impl;
922 
923     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
924 
925     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
926                                           recv_msg->last);
927     if (req_impl == NULL) {
928         return NXT_UNIT_OK;
929     }
930 
931     req = &req_impl->req;
932 
933     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
934     cb = &lib->callbacks;
935 
936     if (cb->websocket_handler && recv_msg->size >= 2) {
937         ws_impl = nxt_unit_websocket_frame_get(ctx);
938         if (nxt_slow_path(ws_impl == NULL)) {
939             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
940                           req_impl->stream);
941 
942             return NXT_UNIT_ERROR;
943         }
944 
945         ws_impl->ws.req = req;
946 
947         ws_impl->buf = NULL;
948         ws_impl->retain_buf = NULL;
949 
950         if (recv_msg->mmap) {
951             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
952                 b->req = req;
953             }
954 
955             /* "Move" incoming buffer list to ws_impl. */
956             ws_impl->buf = recv_msg->incoming_buf;
957             ws_impl->buf->prev = &ws_impl->buf;
958             recv_msg->incoming_buf = NULL;
959 
960             b = ws_impl->buf;
961 
962         } else {
963             b = nxt_unit_mmap_buf_get(ctx);
964             if (nxt_slow_path(b == NULL)) {
965                 return NXT_UNIT_ERROR;
966             }
967 
968             b->hdr = NULL;
969             b->req = req;
970             b->buf.start = recv_msg->start;
971             b->buf.free = b->buf.start;
972             b->buf.end = b->buf.start + recv_msg->size;
973 
974             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
975         }
976 
977         ws_impl->ws.header = (void *) b->buf.start;
978         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
979             ws_impl->ws.header);
980 
981         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
982 
983         if (ws_impl->ws.header->mask) {
984             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
985 
986         } else {
987             ws_impl->ws.mask = NULL;
988         }
989 
990         b->buf.free += hsize;
991 
992         ws_impl->ws.content_buf = &b->buf;
993         ws_impl->ws.content_length = ws_impl->ws.payload_len;
994 
995         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
996                            "payload_len=%"PRIu64,
997                             ws_impl->ws.header->opcode,
998                             ws_impl->ws.payload_len);
999 
1000         cb->websocket_handler(&ws_impl->ws);
1001     }
1002 
1003     if (recv_msg->last) {
1004         req_impl->websocket = 0;
1005 
1006         if (cb->close_handler) {
1007             nxt_unit_req_debug(req, "close_handler");
1008 
1009             cb->close_handler(req);
1010 
1011         } else {
1012             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1013         }
1014     }
1015 
1016     return NXT_UNIT_OK;
1017 }
1018 
1019 
1020 static nxt_unit_request_info_impl_t *
1021 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1022 {
1023     nxt_unit_impl_t               *lib;
1024     nxt_queue_link_t              *lnk;
1025     nxt_unit_ctx_impl_t           *ctx_impl;
1026     nxt_unit_request_info_impl_t  *req_impl;
1027 
1028     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1029 
1030     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1031 
1032     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1033         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1034                           + lib->request_data_size);
1035         if (nxt_slow_path(req_impl == NULL)) {
1036             nxt_unit_warn(ctx, "request info allocation failed");
1037 
1038             return NULL;
1039         }
1040 
1041         req_impl->req.unit = ctx->unit;
1042         req_impl->req.ctx = ctx;
1043 
1044     } else {
1045         lnk = nxt_queue_first(&ctx_impl->free_req);
1046         nxt_queue_remove(lnk);
1047 
1048         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1049     }
1050 
1051     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1052 
1053     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1054 
1055     return req_impl;
1056 }
1057 
1058 
1059 static void
1060 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1061 {
1062     nxt_unit_ctx_impl_t           *ctx_impl;
1063     nxt_unit_request_info_impl_t  *req_impl;
1064 
1065     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1066     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1067 
1068     req->response = NULL;
1069     req->response_buf = NULL;
1070 
1071     if (req_impl->process != NULL) {
1072         nxt_unit_process_use(req->ctx, req_impl->process, -1);
1073 
1074         req_impl->process = NULL;
1075     }
1076 
1077     if (req_impl->websocket) {
1078         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1079 
1080         req_impl->websocket = 0;
1081     }
1082 
1083     while (req_impl->outgoing_buf != NULL) {
1084         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1085     }
1086 
1087     while (req_impl->incoming_buf != NULL) {
1088         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1089     }
1090 
1091     nxt_queue_remove(&req_impl->link);
1092 
1093     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1094 
1095     req_impl->state = NXT_UNIT_RS_RELEASED;
1096 }
1097 
1098 
1099 static void
1100 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1101 {
1102     nxt_unit_ctx_impl_t  *ctx_impl;
1103 
1104     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1105 
1106     nxt_queue_remove(&req_impl->link);
1107 
1108     if (req_impl != &ctx_impl->req) {
1109         free(req_impl);
1110     }
1111 }
1112 
1113 
1114 static nxt_unit_websocket_frame_impl_t *
1115 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1116 {
1117     nxt_queue_link_t                 *lnk;
1118     nxt_unit_ctx_impl_t              *ctx_impl;
1119     nxt_unit_websocket_frame_impl_t  *ws_impl;
1120 
1121     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1122 
1123     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1124         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1125         if (nxt_slow_path(ws_impl == NULL)) {
1126             nxt_unit_warn(ctx, "websocket frame allocation failed");
1127 
1128             return NULL;
1129         }
1130 
1131     } else {
1132         lnk = nxt_queue_first(&ctx_impl->free_ws);
1133         nxt_queue_remove(lnk);
1134 
1135         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1136     }
1137 
1138     ws_impl->ctx_impl = ctx_impl;
1139 
1140     return ws_impl;
1141 }
1142 
1143 
1144 static void
1145 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1146 {
1147     nxt_unit_websocket_frame_impl_t  *ws_impl;
1148 
1149     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1150 
1151     while (ws_impl->buf != NULL) {
1152         nxt_unit_mmap_buf_free(ws_impl->buf);
1153     }
1154 
1155     ws->req = NULL;
1156 
1157     if (ws_impl->retain_buf != NULL) {
1158         free(ws_impl->retain_buf);
1159 
1160         ws_impl->retain_buf = NULL;
1161     }
1162 
1163     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1164 }
1165 
1166 
1167 static void
1168 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1169 {
1170     nxt_queue_remove(&ws_impl->link);
1171 
1172     free(ws_impl);
1173 }
1174 
1175 
1176 uint16_t
1177 nxt_unit_field_hash(const char *name, size_t name_length)
1178 {
1179     u_char      ch;
1180     uint32_t    hash;
1181     const char  *p, *end;
1182 
1183     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1184     end = name + name_length;
1185 
1186     for (p = name; p < end; p++) {
1187         ch = *p;
1188         hash = (hash << 4) + hash + nxt_lowcase(ch);
1189     }
1190 
1191     hash = (hash >> 16) ^ hash;
1192 
1193     return hash;
1194 }
1195 
1196 
1197 void
1198 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1199 {
1200     uint32_t            i, j;
1201     nxt_unit_field_t    *fields, f;
1202     nxt_unit_request_t  *r;
1203 
1204     nxt_unit_req_debug(req, "group_dup_fields");
1205 
1206     r = req->request;
1207     fields = r->fields;
1208 
1209     for (i = 0; i < r->fields_count; i++) {
1210 
1211         switch (fields[i].hash) {
1212         case NXT_UNIT_HASH_CONTENT_LENGTH:
1213             r->content_length_field = i;
1214             break;
1215 
1216         case NXT_UNIT_HASH_CONTENT_TYPE:
1217             r->content_type_field = i;
1218             break;
1219 
1220         case NXT_UNIT_HASH_COOKIE:
1221             r->cookie_field = i;
1222             break;
1223         };
1224 
1225         for (j = i + 1; j < r->fields_count; j++) {
1226             if (fields[i].hash != fields[j].hash) {
1227                 continue;
1228             }
1229 
1230             if (j == i + 1) {
1231                 continue;
1232             }
1233 
1234             f = fields[j];
1235             f.name.offset += (j - (i + 1)) * sizeof(f);
1236             f.value.offset += (j - (i + 1)) * sizeof(f);
1237 
1238             while (j > i + 1) {
1239                 fields[j] = fields[j - 1];
1240                 fields[j].name.offset -= sizeof(f);
1241                 fields[j].value.offset -= sizeof(f);
1242                 j--;
1243             }
1244 
1245             fields[j] = f;
1246 
1247             i++;
1248         }
1249     }
1250 }
1251 
1252 
1253 int
1254 nxt_unit_response_init(nxt_unit_request_info_t *req,
1255     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1256 {
1257     uint32_t                      buf_size;
1258     nxt_unit_buf_t                *buf;
1259     nxt_unit_request_info_impl_t  *req_impl;
1260 
1261     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1262 
1263     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1264         nxt_unit_req_warn(req, "init: response already sent");
1265 
1266         return NXT_UNIT_ERROR;
1267     }
1268 
1269     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1270                        (int) max_fields_count, (int) max_fields_size);
1271 
1272     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1273         nxt_unit_req_debug(req, "duplicate response init");
1274     }
1275 
1276     buf_size = sizeof(nxt_unit_response_t)
1277                + max_fields_count * sizeof(nxt_unit_field_t)
1278                + max_fields_size;
1279 
1280     if (nxt_slow_path(req->response_buf != NULL)) {
1281         buf = req->response_buf;
1282 
1283         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1284             goto init_response;
1285         }
1286 
1287         nxt_unit_buf_free(buf);
1288 
1289         req->response_buf = NULL;
1290         req->response = NULL;
1291         req->response_max_fields = 0;
1292 
1293         req_impl->state = NXT_UNIT_RS_START;
1294     }
1295 
1296     buf = nxt_unit_response_buf_alloc(req, buf_size);
1297     if (nxt_slow_path(buf == NULL)) {
1298         return NXT_UNIT_ERROR;
1299     }
1300 
1301 init_response:
1302 
1303     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1304 
1305     req->response_buf = buf;
1306 
1307     req->response = (nxt_unit_response_t *) buf->start;
1308     req->response->status = status;
1309 
1310     buf->free = buf->start + sizeof(nxt_unit_response_t)
1311                 + max_fields_count * sizeof(nxt_unit_field_t);
1312 
1313     req->response_max_fields = max_fields_count;
1314     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1315 
1316     return NXT_UNIT_OK;
1317 }
1318 
1319 
1320 int
1321 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1322     uint32_t max_fields_count, uint32_t max_fields_size)
1323 {
1324     char                          *p;
1325     uint32_t                      i, buf_size;
1326     nxt_unit_buf_t                *buf;
1327     nxt_unit_field_t              *f, *src;
1328     nxt_unit_response_t           *resp;
1329     nxt_unit_request_info_impl_t  *req_impl;
1330 
1331     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1332 
1333     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1334         nxt_unit_req_warn(req, "realloc: response not init");
1335 
1336         return NXT_UNIT_ERROR;
1337     }
1338 
1339     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1340         nxt_unit_req_warn(req, "realloc: response already sent");
1341 
1342         return NXT_UNIT_ERROR;
1343     }
1344 
1345     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1346         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1347 
1348         return NXT_UNIT_ERROR;
1349     }
1350 
1351     buf_size = sizeof(nxt_unit_response_t)
1352                + max_fields_count * sizeof(nxt_unit_field_t)
1353                + max_fields_size;
1354 
1355     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1356 
1357     buf = nxt_unit_response_buf_alloc(req, buf_size);
1358     if (nxt_slow_path(buf == NULL)) {
1359         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1360         return NXT_UNIT_ERROR;
1361     }
1362 
1363     resp = (nxt_unit_response_t *) buf->start;
1364 
1365     memset(resp, 0, sizeof(nxt_unit_response_t));
1366 
1367     resp->status = req->response->status;
1368     resp->content_length = req->response->content_length;
1369 
1370     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1371     f = resp->fields;
1372 
1373     for (i = 0; i < req->response->fields_count; i++) {
1374         src = req->response->fields + i;
1375 
1376         if (nxt_slow_path(src->skip != 0)) {
1377             continue;
1378         }
1379 
1380         if (nxt_slow_path(src->name_length + src->value_length + 2
1381                           > (uint32_t) (buf->end - p)))
1382         {
1383             nxt_unit_req_warn(req, "realloc: not enough space for field"
1384                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1385                   i, src, src->name_length, src->value_length);
1386 
1387             goto fail;
1388         }
1389 
1390         nxt_unit_sptr_set(&f->name, p);
1391         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1392         *p++ = '\0';
1393 
1394         nxt_unit_sptr_set(&f->value, p);
1395         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1396         *p++ = '\0';
1397 
1398         f->hash = src->hash;
1399         f->skip = 0;
1400         f->name_length = src->name_length;
1401         f->value_length = src->value_length;
1402 
1403         resp->fields_count++;
1404         f++;
1405     }
1406 
1407     if (req->response->piggyback_content_length > 0) {
1408         if (nxt_slow_path(req->response->piggyback_content_length
1409                           > (uint32_t) (buf->end - p)))
1410         {
1411             nxt_unit_req_warn(req, "realloc: not enought space for content"
1412                   " #%"PRIu32", %"PRIu32" required",
1413                   i, req->response->piggyback_content_length);
1414 
1415             goto fail;
1416         }
1417 
1418         resp->piggyback_content_length = req->response->piggyback_content_length;
1419 
1420         nxt_unit_sptr_set(&resp->piggyback_content, p);
1421         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1422                        req->response->piggyback_content_length);
1423     }
1424 
1425     buf->free = p;
1426 
1427     nxt_unit_buf_free(req->response_buf);
1428 
1429     req->response = resp;
1430     req->response_buf = buf;
1431     req->response_max_fields = max_fields_count;
1432 
1433     return NXT_UNIT_OK;
1434 
1435 fail:
1436 
1437     nxt_unit_buf_free(buf);
1438 
1439     return NXT_UNIT_ERROR;
1440 }
1441 
1442 
1443 int
1444 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1445 {
1446     nxt_unit_request_info_impl_t  *req_impl;
1447 
1448     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1449 
1450     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1451 }
1452 
1453 
1454 int
1455 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1456     const char *name, uint8_t name_length,
1457     const char *value, uint32_t value_length)
1458 {
1459     nxt_unit_buf_t                *buf;
1460     nxt_unit_field_t              *f;
1461     nxt_unit_response_t           *resp;
1462     nxt_unit_request_info_impl_t  *req_impl;
1463 
1464     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1465 
1466     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1467         nxt_unit_req_warn(req, "add_field: response not initialized or "
1468                           "already sent");
1469 
1470         return NXT_UNIT_ERROR;
1471     }
1472 
1473     resp = req->response;
1474 
1475     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1476         nxt_unit_req_warn(req, "add_field: too many response fields");
1477 
1478         return NXT_UNIT_ERROR;
1479     }
1480 
1481     buf = req->response_buf;
1482 
1483     if (nxt_slow_path(name_length + value_length + 2
1484                       > (uint32_t) (buf->end - buf->free)))
1485     {
1486         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1487 
1488         return NXT_UNIT_ERROR;
1489     }
1490 
1491     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1492                        resp->fields_count,
1493                        (int) name_length, name,
1494                        (int) value_length, value);
1495 
1496     f = resp->fields + resp->fields_count;
1497 
1498     nxt_unit_sptr_set(&f->name, buf->free);
1499     buf->free = nxt_cpymem(buf->free, name, name_length);
1500     *buf->free++ = '\0';
1501 
1502     nxt_unit_sptr_set(&f->value, buf->free);
1503     buf->free = nxt_cpymem(buf->free, value, value_length);
1504     *buf->free++ = '\0';
1505 
1506     f->hash = nxt_unit_field_hash(name, name_length);
1507     f->skip = 0;
1508     f->name_length = name_length;
1509     f->value_length = value_length;
1510 
1511     resp->fields_count++;
1512 
1513     return NXT_UNIT_OK;
1514 }
1515 
1516 
1517 int
1518 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1519     const void* src, uint32_t size)
1520 {
1521     nxt_unit_buf_t                *buf;
1522     nxt_unit_response_t           *resp;
1523     nxt_unit_request_info_impl_t  *req_impl;
1524 
1525     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1526 
1527     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1528         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1529 
1530         return NXT_UNIT_ERROR;
1531     }
1532 
1533     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1534         nxt_unit_req_warn(req, "add_content: response already sent");
1535 
1536         return NXT_UNIT_ERROR;
1537     }
1538 
1539     buf = req->response_buf;
1540 
1541     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1542         nxt_unit_req_warn(req, "add_content: buffer overflow");
1543 
1544         return NXT_UNIT_ERROR;
1545     }
1546 
1547     resp = req->response;
1548 
1549     if (resp->piggyback_content_length == 0) {
1550         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1551         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1552     }
1553 
1554     resp->piggyback_content_length += size;
1555 
1556     buf->free = nxt_cpymem(buf->free, src, size);
1557 
1558     return NXT_UNIT_OK;
1559 }
1560 
1561 
1562 int
1563 nxt_unit_response_send(nxt_unit_request_info_t *req)
1564 {
1565     int                           rc;
1566     nxt_unit_mmap_buf_t           *mmap_buf;
1567     nxt_unit_request_info_impl_t  *req_impl;
1568 
1569     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1570 
1571     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1572         nxt_unit_req_warn(req, "send: response is not initialized yet");
1573 
1574         return NXT_UNIT_ERROR;
1575     }
1576 
1577     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1578         nxt_unit_req_warn(req, "send: response already sent");
1579 
1580         return NXT_UNIT_ERROR;
1581     }
1582 
1583     if (req->request->websocket_handshake && req->response->status == 101) {
1584         nxt_unit_response_upgrade(req);
1585     }
1586 
1587     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1588                        req->response->fields_count,
1589                        (int) (req->response_buf->free
1590                               - req->response_buf->start));
1591 
1592     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1593 
1594     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1595     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1596         req->response = NULL;
1597         req->response_buf = NULL;
1598         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1599 
1600         nxt_unit_mmap_buf_release(mmap_buf);
1601     }
1602 
1603     return rc;
1604 }
1605 
1606 
1607 int
1608 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1609 {
1610     nxt_unit_request_info_impl_t  *req_impl;
1611 
1612     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1613 
1614     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1615 }
1616 
1617 
1618 nxt_unit_buf_t *
1619 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1620 {
1621     int                           rc;
1622     nxt_unit_mmap_buf_t           *mmap_buf;
1623     nxt_unit_request_info_impl_t  *req_impl;
1624 
1625     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1626         nxt_unit_req_warn(req, "response_buf_alloc: "
1627                           "requested buffer (%"PRIu32") too big", size);
1628 
1629         return NULL;
1630     }
1631 
1632     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1633 
1634     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1635 
1636     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1637     if (nxt_slow_path(mmap_buf == NULL)) {
1638         return NULL;
1639     }
1640 
1641     mmap_buf->req = req;
1642 
1643     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1644 
1645     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1646                                    &req->response_port, size, mmap_buf);
1647     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1648         nxt_unit_mmap_buf_release(mmap_buf);
1649 
1650         return NULL;
1651     }
1652 
1653     return &mmap_buf->buf;
1654 }
1655 
1656 
1657 static nxt_unit_process_t *
1658 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1659 {
1660     nxt_unit_impl_t  *lib;
1661 
1662     if (recv_msg->process != NULL) {
1663         return recv_msg->process;
1664     }
1665 
1666     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1667 
1668     pthread_mutex_lock(&lib->mutex);
1669 
1670     recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1671 
1672     pthread_mutex_unlock(&lib->mutex);
1673 
1674     if (recv_msg->process == NULL) {
1675         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1676                       recv_msg->stream, (int) recv_msg->pid);
1677     }
1678 
1679     return recv_msg->process;
1680 }
1681 
1682 
1683 static nxt_unit_mmap_buf_t *
1684 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1685 {
1686     nxt_unit_mmap_buf_t  *mmap_buf;
1687     nxt_unit_ctx_impl_t  *ctx_impl;
1688 
1689     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1690 
1691     if (ctx_impl->free_buf == NULL) {
1692         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1693         if (nxt_slow_path(mmap_buf == NULL)) {
1694             nxt_unit_warn(ctx, "failed to allocate buf");
1695         }
1696 
1697     } else {
1698         mmap_buf = ctx_impl->free_buf;
1699 
1700         nxt_unit_mmap_buf_remove(mmap_buf);
1701     }
1702 
1703     mmap_buf->ctx_impl = ctx_impl;
1704 
1705     return mmap_buf;
1706 }
1707 
1708 
1709 static void
1710 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1711 {
1712     nxt_unit_mmap_buf_remove(mmap_buf);
1713 
1714     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1715 }
1716 
1717 
/* A string described by an explicit length and a pointer to its bytes. */
typedef struct {
    size_t      len;    /* number of bytes in str */
    const char  *str;   /* string data */
} nxt_unit_str_t;


/*
 * Brace initializer for nxt_unit_str_t.  The values are positional: first
 * len (computed by nxt_length()), then str.
 */
#define nxt_unit_str(s)  { nxt_length(s), s }
1725 
1726 
1727 int
1728 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1729 {
1730     return req->request->websocket_handshake;
1731 }
1732 
1733 
1734 int
1735 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1736 {
1737     int                           rc;
1738     nxt_unit_ctx_impl_t           *ctx_impl;
1739     nxt_unit_request_info_impl_t  *req_impl;
1740 
1741     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1742 
1743     if (nxt_slow_path(req_impl->websocket != 0)) {
1744         nxt_unit_req_debug(req, "upgrade: already upgraded");
1745 
1746         return NXT_UNIT_OK;
1747     }
1748 
1749     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1750         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1751 
1752         return NXT_UNIT_ERROR;
1753     }
1754 
1755     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1756         nxt_unit_req_warn(req, "upgrade: response already sent");
1757 
1758         return NXT_UNIT_ERROR;
1759     }
1760 
1761     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1762 
1763     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1764     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1765         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1766 
1767         return NXT_UNIT_ERROR;
1768     }
1769 
1770     req_impl->websocket = 1;
1771 
1772     req->response->status = 101;
1773 
1774     return NXT_UNIT_OK;
1775 }
1776 
1777 
1778 int
1779 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1780 {
1781     nxt_unit_request_info_impl_t  *req_impl;
1782 
1783     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784 
1785     return req_impl->websocket;
1786 }
1787 
1788 
1789 nxt_unit_request_info_t *
1790 nxt_unit_get_request_info_from_data(void *data)
1791 {
1792     nxt_unit_request_info_impl_t  *req_impl;
1793 
1794     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1795 
1796     return &req_impl->req;
1797 }
1798 
1799 
1800 int
1801 nxt_unit_buf_send(nxt_unit_buf_t *buf)
1802 {
1803     int                           rc;
1804     nxt_unit_mmap_buf_t           *mmap_buf;
1805     nxt_unit_request_info_t       *req;
1806     nxt_unit_request_info_impl_t  *req_impl;
1807 
1808     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1809 
1810     req = mmap_buf->req;
1811     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1812 
1813     nxt_unit_req_debug(req, "buf_send: %d bytes",
1814                        (int) (buf->free - buf->start));
1815 
1816     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1817         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
1818 
1819         return NXT_UNIT_ERROR;
1820     }
1821 
1822     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1823         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1824 
1825         return NXT_UNIT_ERROR;
1826     }
1827 
1828     if (nxt_fast_path(buf->free > buf->start)) {
1829         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1830         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1831             return rc;
1832         }
1833     }
1834 
1835     nxt_unit_mmap_buf_release(mmap_buf);
1836 
1837     return NXT_UNIT_OK;
1838 }
1839 
1840 
1841 static void
1842 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
1843 {
1844     int                           rc;
1845     nxt_unit_mmap_buf_t           *mmap_buf;
1846     nxt_unit_request_info_t       *req;
1847     nxt_unit_request_info_impl_t  *req_impl;
1848 
1849     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1850 
1851     req = mmap_buf->req;
1852     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1853 
1854     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
1855     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1856         nxt_unit_mmap_buf_release(mmap_buf);
1857 
1858         nxt_unit_request_info_release(req);
1859 
1860     } else {
1861         nxt_unit_request_done(req, rc);
1862     }
1863 }
1864 
1865 
1866 static int
1867 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
1868     nxt_unit_mmap_buf_t *mmap_buf, int last)
1869 {
1870     struct {
1871         nxt_port_msg_t       msg;
1872         nxt_port_mmap_msg_t  mmap_msg;
1873     } m;
1874 
1875     u_char                   *end, *last_used, *first_free;
1876     ssize_t                  res;
1877     nxt_chunk_id_t           first_free_chunk;
1878     nxt_unit_buf_t           *buf;
1879     nxt_unit_impl_t          *lib;
1880     nxt_port_mmap_header_t   *hdr;
1881 
1882     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1883 
1884     buf = &mmap_buf->buf;
1885     hdr = mmap_buf->hdr;
1886 
1887     m.mmap_msg.size = buf->free - buf->start;
1888 
1889     m.msg.stream = stream;
1890     m.msg.pid = lib->pid;
1891     m.msg.reply_port = 0;
1892     m.msg.type = _NXT_PORT_MSG_DATA;
1893     m.msg.last = last != 0;
1894     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
1895     m.msg.nf = 0;
1896     m.msg.mf = 0;
1897     m.msg.tracking = 0;
1898 
1899     if (hdr != NULL) {
1900         m.mmap_msg.mmap_id = hdr->id;
1901         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1902     }
1903 
1904     nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1905                    stream,
1906                    (int) m.mmap_msg.mmap_id,
1907                    (int) m.mmap_msg.chunk_id,
1908                    (int) m.mmap_msg.size);
1909 
1910     res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1911                                    m.msg.mmap ? sizeof(m) : sizeof(m.msg),
1912                                    NULL, 0);
1913     if (nxt_slow_path(res != sizeof(m))) {
1914         return NXT_UNIT_ERROR;
1915     }
1916 
1917     if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
1918         last_used = (u_char *) buf->free - 1;
1919 
1920         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1921         first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1922         end = (u_char *) buf->end;
1923 
1924         nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1925 
1926         buf->end = (char *) first_free;
1927     }
1928 
1929     return NXT_UNIT_OK;
1930 }
1931 
1932 
1933 void
1934 nxt_unit_buf_free(nxt_unit_buf_t *buf)
1935 {
1936     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
1937 }
1938 
1939 
1940 static void
1941 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
1942 {
1943     if (nxt_fast_path(mmap_buf->hdr != NULL)) {
1944         nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
1945                               mmap_buf->buf.end - mmap_buf->buf.start);
1946     }
1947 
1948     nxt_unit_mmap_buf_release(mmap_buf);
1949 }
1950 
1951 
1952 nxt_unit_buf_t *
1953 nxt_unit_buf_next(nxt_unit_buf_t *buf)
1954 {
1955     nxt_unit_mmap_buf_t  *mmap_buf;
1956 
1957     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1958 
1959     if (mmap_buf->next == NULL) {
1960         return NULL;
1961     }
1962 
1963     return &mmap_buf->next->buf;
1964 }
1965 
1966 
1967 uint32_t
1968 nxt_unit_buf_max(void)
1969 {
1970     return PORT_MMAP_DATA_SIZE;
1971 }
1972 
1973 
1974 uint32_t
1975 nxt_unit_buf_min(void)
1976 {
1977     return PORT_MMAP_CHUNK_SIZE;
1978 }
1979 
1980 
1981 int
1982 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
1983     size_t size)
1984 {
1985     int                           rc;
1986     uint32_t                      part_size;
1987     const char                    *part_start;
1988     nxt_unit_mmap_buf_t           mmap_buf;
1989     nxt_unit_request_info_impl_t  *req_impl;
1990 
1991     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1992 
1993     part_start = start;
1994 
1995     /* Check if response is not send yet. */
1996     if (nxt_slow_path(req->response_buf)) {
1997         part_size = req->response_buf->end - req->response_buf->free;
1998         part_size = nxt_min(size, part_size);
1999 
2000         rc = nxt_unit_response_add_content(req, part_start, part_size);
2001         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2002             return rc;
2003         }
2004 
2005         rc = nxt_unit_response_send(req);
2006         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2007             return rc;
2008         }
2009 
2010         size -= part_size;
2011         part_start += part_size;
2012     }
2013 
2014     while (size > 0) {
2015         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2016 
2017         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2018                                        &req->response_port, part_size,
2019                                        &mmap_buf);
2020         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2021             return rc;
2022         }
2023 
2024         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2025                                        part_start, part_size);
2026 
2027         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2028         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2029             nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
2030                                   mmap_buf.buf.end - mmap_buf.buf.start);
2031 
2032             return rc;
2033         }
2034 
2035         size -= part_size;
2036         part_start += part_size;
2037     }
2038 
2039     return NXT_UNIT_OK;
2040 }
2041 
2042 
2043 int
2044 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2045     nxt_unit_read_info_t *read_info)
2046 {
2047     int             rc;
2048     ssize_t         n;
2049     nxt_unit_buf_t  *buf;
2050 
2051     /* Check if response is not send yet. */
2052     if (nxt_slow_path(req->response_buf)) {
2053 
2054         /* Enable content in headers buf. */
2055         rc = nxt_unit_response_add_content(req, "", 0);
2056         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2057             nxt_unit_req_error(req, "Failed to add piggyback content");
2058 
2059             return rc;
2060         }
2061 
2062         buf = req->response_buf;
2063 
2064         while (buf->end - buf->free > 0) {
2065             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2066             if (nxt_slow_path(n < 0)) {
2067                 nxt_unit_req_error(req, "Read error");
2068 
2069                 return NXT_UNIT_ERROR;
2070             }
2071 
2072             /* Manually increase sizes. */
2073             buf->free += n;
2074             req->response->piggyback_content_length += n;
2075 
2076             if (read_info->eof) {
2077                 break;
2078             }
2079         }
2080 
2081         rc = nxt_unit_response_send(req);
2082         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2083             nxt_unit_req_error(req, "Failed to send headers with content");
2084 
2085             return rc;
2086         }
2087 
2088         if (read_info->eof) {
2089             return NXT_UNIT_OK;
2090         }
2091     }
2092 
2093     while (!read_info->eof) {
2094         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2095                            read_info->buf_size);
2096 
2097         buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
2098                                                        PORT_MMAP_DATA_SIZE));
2099         if (nxt_slow_path(buf == NULL)) {
2100             nxt_unit_req_error(req, "Failed to allocate buf for content");
2101 
2102             return NXT_UNIT_ERROR;
2103         }
2104 
2105         while (!read_info->eof && buf->end > buf->free) {
2106             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2107             if (nxt_slow_path(n < 0)) {
2108                 nxt_unit_req_error(req, "Read error");
2109 
2110                 nxt_unit_buf_free(buf);
2111 
2112                 return NXT_UNIT_ERROR;
2113             }
2114 
2115             buf->free += n;
2116         }
2117 
2118         rc = nxt_unit_buf_send(buf);
2119         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2120             nxt_unit_req_error(req, "Failed to send content");
2121 
2122             return rc;
2123         }
2124     }
2125 
2126     return NXT_UNIT_OK;
2127 }
2128 
2129 
2130 ssize_t
2131 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2132 {
2133     return nxt_unit_buf_read(&req->content_buf, &req->content_length,
2134                              dst, size);
2135 }
2136 
2137 
2138 static ssize_t
2139 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2140 {
2141     u_char          *p;
2142     size_t          rest, copy, read;
2143     nxt_unit_buf_t  *buf;
2144 
2145     p = dst;
2146     rest = size;
2147 
2148     buf = *b;
2149 
2150     while (buf != NULL) {
2151         copy = buf->end - buf->free;
2152         copy = nxt_min(rest, copy);
2153 
2154         p = nxt_cpymem(p, buf->free, copy);
2155 
2156         buf->free += copy;
2157         rest -= copy;
2158 
2159         if (rest == 0) {
2160             if (buf->end == buf->free) {
2161                 buf = nxt_unit_buf_next(buf);
2162             }
2163 
2164             break;
2165         }
2166 
2167         buf = nxt_unit_buf_next(buf);
2168     }
2169 
2170     *b = buf;
2171 
2172     read = size - rest;
2173 
2174     *len -= read;
2175 
2176     return read;
2177 }
2178 
2179 
2180 void
2181 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2182 {
2183     ssize_t                       res;
2184     uint32_t                      size;
2185     nxt_port_msg_t                msg;
2186     nxt_unit_impl_t               *lib;
2187     nxt_unit_request_info_impl_t  *req_impl;
2188 
2189     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2190 
2191     nxt_unit_req_debug(req, "done: %d", rc);
2192 
2193     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2194         goto skip_response_send;
2195     }
2196 
2197     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2198 
2199         size = nxt_length("Content-Type") + nxt_length("text/plain");
2200 
2201         rc = nxt_unit_response_init(req, 200, 1, size);
2202         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2203             goto skip_response_send;
2204         }
2205 
2206         rc = nxt_unit_response_add_field(req, "Content-Type",
2207                                    nxt_length("Content-Type"),
2208                                    "text/plain", nxt_length("text/plain"));
2209         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2210             goto skip_response_send;
2211         }
2212     }
2213 
2214     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2215 
2216         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2217 
2218         nxt_unit_buf_send_done(req->response_buf);
2219 
2220         return;
2221     }
2222 
2223 skip_response_send:
2224 
2225     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2226 
2227     msg.stream = req_impl->stream;
2228     msg.pid = lib->pid;
2229     msg.reply_port = 0;
2230     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2231                                    : _NXT_PORT_MSG_RPC_ERROR;
2232     msg.last = 1;
2233     msg.mmap = 0;
2234     msg.nf = 0;
2235     msg.mf = 0;
2236     msg.tracking = 0;
2237 
2238     res = lib->callbacks.port_send(req->ctx, &req->response_port,
2239                                    &msg, sizeof(msg), NULL, 0);
2240     if (nxt_slow_path(res != sizeof(msg))) {
2241         nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2242                            strerror(errno), errno);
2243     }
2244 
2245     nxt_unit_request_info_release(req);
2246 }
2247 
2248 
2249 int
2250 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2251     uint8_t last, const void *start, size_t size)
2252 {
2253     const struct iovec  iov = { (void *) start, size };
2254 
2255     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2256 }
2257 
2258 
2259 int
2260 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2261     uint8_t last, const struct iovec *iov, int iovcnt)
2262 {
2263     int                     i, rc;
2264     size_t                  l, copy;
2265     uint32_t                payload_len, buf_size;
2266     const uint8_t           *b;
2267     nxt_unit_buf_t          *buf;
2268     nxt_websocket_header_t  *wh;
2269 
2270     payload_len = 0;
2271 
2272     for (i = 0; i < iovcnt; i++) {
2273         payload_len += iov[i].iov_len;
2274     }
2275 
2276     buf_size = 10 + payload_len;
2277 
2278     buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2279                                                    PORT_MMAP_DATA_SIZE));
2280     if (nxt_slow_path(buf == NULL)) {
2281         nxt_unit_req_error(req, "Failed to allocate buf for content");
2282 
2283         return NXT_UNIT_ERROR;
2284     }
2285 
2286     buf->start[0] = 0;
2287     buf->start[1] = 0;
2288 
2289     wh = (void *) buf->free;
2290 
2291     buf->free = nxt_websocket_frame_init(wh, payload_len);
2292     wh->fin = last;
2293     wh->opcode = opcode;
2294 
2295     for (i = 0; i < iovcnt; i++) {
2296         b = iov[i].iov_base;
2297         l = iov[i].iov_len;
2298 
2299         while (l > 0) {
2300             copy = buf->end - buf->free;
2301             copy = nxt_min(l, copy);
2302 
2303             buf->free = nxt_cpymem(buf->free, b, copy);
2304             b += copy;
2305             l -= copy;
2306 
2307             if (l > 0) {
2308                 buf_size -= buf->end - buf->start;
2309 
2310                 rc = nxt_unit_buf_send(buf);
2311                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2312                     nxt_unit_req_error(req, "Failed to send content");
2313 
2314                     return NXT_UNIT_ERROR;
2315                 }
2316 
2317                 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2318                                                           PORT_MMAP_DATA_SIZE));
2319                 if (nxt_slow_path(buf == NULL)) {
2320                     nxt_unit_req_error(req,
2321                                        "Failed to allocate buf for content");
2322 
2323                     return NXT_UNIT_ERROR;
2324                 }
2325             }
2326         }
2327     }
2328 
2329     if (buf->free > buf->start) {
2330         rc = nxt_unit_buf_send(buf);
2331         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2332             nxt_unit_req_error(req, "Failed to send content");
2333         }
2334     }
2335 
2336     return rc;
2337 }
2338 
2339 
2340 ssize_t
2341 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2342     size_t size)
2343 {
2344     ssize_t   res;
2345     uint8_t   *b;
2346     uint64_t  i, d;
2347 
2348     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2349                             dst, size);
2350 
2351     if (ws->mask == NULL) {
2352         return res;
2353     }
2354 
2355     b = dst;
2356     d = (ws->payload_len - ws->content_length - res) % 4;
2357 
2358     for (i = 0; i < (uint64_t) res; i++) {
2359         b[i] ^= ws->mask[ (i + d) % 4 ];
2360     }
2361 
2362     return res;
2363 }
2364 
2365 
2366 int
2367 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2368 {
2369     char                             *b;
2370     size_t                           size;
2371     nxt_unit_websocket_frame_impl_t  *ws_impl;
2372 
2373     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2374 
2375     if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
2376         return NXT_UNIT_OK;
2377     }
2378 
2379     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2380 
2381     b = malloc(size);
2382     if (nxt_slow_path(b == NULL)) {
2383         return NXT_UNIT_ERROR;
2384     }
2385 
2386     memcpy(b, ws_impl->buf->buf.start, size);
2387 
2388     ws_impl->buf->buf.start = b;
2389     ws_impl->buf->buf.free = b;
2390     ws_impl->buf->buf.end = b + size;
2391 
2392     ws_impl->retain_buf = b;
2393 
2394     return NXT_UNIT_OK;
2395 }
2396 
2397 
/*
 * Signals that the application has finished with the frame "ws";
 * the frame is handed to nxt_unit_websocket_frame_release().
 */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}
2403 
2404 
2405 static nxt_port_mmap_header_t *
2406 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2407     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
2408 {
2409     int                     res, nchunks, i;
2410     nxt_unit_mmap_t         *mm, *mm_end;
2411     nxt_port_mmap_header_t  *hdr;
2412 
2413     pthread_mutex_lock(&process->outgoing.mutex);
2414 
2415     mm_end = process->outgoing.elts + process->outgoing.size;
2416 
2417     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2418         hdr = mm->hdr;
2419 
2420         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
2421             continue;
2422         }
2423 
2424         *c = 0;
2425 
2426         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2427             nchunks = 1;
2428 
2429             while (nchunks < n) {
2430                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2431                                                        *c + nchunks);
2432 
2433                 if (res == 0) {
2434                     for (i = 0; i < nchunks; i++) {
2435                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
2436                     }
2437 
2438                     *c += nchunks + 1;
2439                     nchunks = 0;
2440                     break;
2441                 }
2442 
2443                 nchunks++;
2444             }
2445 
2446             if (nchunks == n) {
2447                 goto unlock;
2448             }
2449         }
2450     }
2451 
2452     *c = 0;
2453     hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
2454 
2455 unlock:
2456 
2457     pthread_mutex_unlock(&process->outgoing.mutex);
2458 
2459     return hdr;
2460 }
2461 
2462 
2463 static nxt_unit_mmap_t *
2464 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
2465 {
2466     uint32_t  cap;
2467 
2468     cap = mmaps->cap;
2469 
2470     if (cap == 0) {
2471         cap = i + 1;
2472     }
2473 
2474     while (i + 1 > cap) {
2475 
2476         if (cap < 16) {
2477             cap = cap * 2;
2478 
2479         } else {
2480             cap = cap + cap / 2;
2481         }
2482     }
2483 
2484     if (cap != mmaps->cap) {
2485 
2486         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
2487         if (nxt_slow_path(mmaps->elts == NULL)) {
2488             return NULL;
2489         }
2490 
2491         memset(mmaps->elts + mmaps->cap, 0,
2492                sizeof(*mmaps->elts) * (cap - mmaps->cap));
2493 
2494         mmaps->cap = cap;
2495     }
2496 
2497     if (i + 1 > mmaps->size) {
2498         mmaps->size = i + 1;
2499     }
2500 
2501     return mmaps->elts + i;
2502 }
2503 
2504 
2505 static nxt_port_mmap_header_t *
2506 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2507     nxt_unit_port_id_t *port_id, int n)
2508 {
2509     int                     i, fd, rc;
2510     void                    *mem;
2511     char                    name[64];
2512     nxt_unit_mmap_t         *mm;
2513     nxt_unit_impl_t         *lib;
2514     nxt_port_mmap_header_t  *hdr;
2515 
2516     lib = process->lib;
2517 
2518     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
2519     if (nxt_slow_path(mm == NULL)) {
2520         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
2521 
2522         return NULL;
2523     }
2524 
2525     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
2526              lib->pid, (void *) pthread_self());
2527 
2528 #if (NXT_HAVE_MEMFD_CREATE)
2529 
2530     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
2531     if (nxt_slow_path(fd == -1)) {
2532         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
2533                        strerror(errno), errno);
2534 
2535         goto remove_fail;
2536     }
2537 
2538     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
2539 
2540 #elif (NXT_HAVE_SHM_OPEN_ANON)
2541 
2542     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
2543     if (nxt_slow_path(fd == -1)) {
2544         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
2545                        strerror(errno), errno);
2546 
2547         goto remove_fail;
2548     }
2549 
2550 #elif (NXT_HAVE_SHM_OPEN)
2551 
2552     /* Just in case. */
2553     shm_unlink(name);
2554 
2555     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
2556     if (nxt_slow_path(fd == -1)) {
2557         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
2558                        strerror(errno), errno);
2559 
2560         goto remove_fail;
2561     }
2562 
2563     if (nxt_slow_path(shm_unlink(name) == -1)) {
2564         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
2565                       strerror(errno), errno);
2566     }
2567 
2568 #else
2569 
2570 #error No working shared memory implementation.
2571 
2572 #endif
2573 
2574     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
2575         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
2576                        strerror(errno), errno);
2577 
2578         goto remove_fail;
2579     }
2580 
2581     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2582     if (nxt_slow_path(mem == MAP_FAILED)) {
2583         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
2584                        strerror(errno), errno);
2585 
2586         goto remove_fail;
2587     }
2588 
2589     mm->hdr = mem;
2590     hdr = mem;
2591 
2592     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
2593     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
2594 
2595     hdr->id = process->outgoing.size - 1;
2596     hdr->src_pid = lib->pid;
2597     hdr->dst_pid = process->pid;
2598     hdr->sent_over = port_id->id;
2599 
2600     /* Mark first n chunk(s) as busy */
2601     for (i = 0; i < n; i++) {
2602         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
2603     }
2604 
2605     /* Mark as busy chunk followed the last available chunk. */
2606     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
2607     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
2608 
2609     pthread_mutex_unlock(&process->outgoing.mutex);
2610 
2611     rc = nxt_unit_send_mmap(ctx, port_id, fd);
2612     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2613         munmap(mem, PORT_MMAP_SIZE);
2614         hdr = NULL;
2615 
2616     } else {
2617         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
2618                        hdr->id, (int) lib->pid, (int) process->pid);
2619     }
2620 
2621     close(fd);
2622 
2623     pthread_mutex_lock(&process->outgoing.mutex);
2624 
2625     if (nxt_fast_path(hdr != NULL)) {
2626         return hdr;
2627     }
2628 
2629 remove_fail:
2630 
2631     process->outgoing.size--;
2632 
2633     return NULL;
2634 }
2635 
2636 
2637 static int
2638 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
2639 {
2640     ssize_t          res;
2641     nxt_port_msg_t   msg;
2642     nxt_unit_impl_t  *lib;
2643     union {
2644         struct cmsghdr  cm;
2645         char            space[CMSG_SPACE(sizeof(int))];
2646     } cmsg;
2647 
2648     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2649 
2650     msg.stream = 0;
2651     msg.pid = lib->pid;
2652     msg.reply_port = 0;
2653     msg.type = _NXT_PORT_MSG_MMAP;
2654     msg.last = 0;
2655     msg.mmap = 0;
2656     msg.nf = 0;
2657     msg.mf = 0;
2658     msg.tracking = 0;
2659 
2660     /*
2661      * Fill all padding fields with 0.
2662      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
2663      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
2664      */
2665     memset(&cmsg, 0, sizeof(cmsg));
2666 
2667     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
2668     cmsg.cm.cmsg_level = SOL_SOCKET;
2669     cmsg.cm.cmsg_type = SCM_RIGHTS;
2670 
2671     /*
2672      * memcpy() is used instead of simple
2673      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
2674      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
2675      *   dereferencing type-punned pointer will break strict-aliasing rules
2676      *
2677      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
2678      * in the same simple assignment as in the code above.
2679      */
2680     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
2681 
2682     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
2683                                    &cmsg, sizeof(cmsg));
2684     if (nxt_slow_path(res != sizeof(msg))) {
2685         nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
2686                       (int) port_id->pid, strerror(errno), errno);
2687 
2688         return NXT_UNIT_ERROR;
2689     }
2690 
2691     return NXT_UNIT_OK;
2692 }
2693 
2694 
2695 static int
2696 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2697     nxt_unit_port_id_t *port_id, uint32_t size,
2698     nxt_unit_mmap_buf_t *mmap_buf)
2699 {
2700     uint32_t                nchunks;
2701     nxt_chunk_id_t          c;
2702     nxt_port_mmap_header_t  *hdr;
2703 
2704     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
2705 
2706     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
2707     if (nxt_slow_path(hdr == NULL)) {
2708         return NXT_UNIT_ERROR;
2709     }
2710 
2711     mmap_buf->hdr = hdr;
2712     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
2713     mmap_buf->buf.free = mmap_buf->buf.start;
2714     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
2715     mmap_buf->port_id = *port_id;
2716 
2717     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
2718                   (int) hdr->id, (int) c,
2719                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
2720 
2721     return NXT_UNIT_OK;
2722 }
2723 
2724 
2725 static int
2726 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
2727 {
2728     int                      rc;
2729     void                     *mem;
2730     struct stat              mmap_stat;
2731     nxt_unit_mmap_t          *mm;
2732     nxt_unit_impl_t          *lib;
2733     nxt_unit_process_t       *process;
2734     nxt_port_mmap_header_t   *hdr;
2735 
2736     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2737 
2738     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
2739 
2740     pthread_mutex_lock(&lib->mutex);
2741 
2742     process = nxt_unit_process_find(ctx, pid, 0);
2743 
2744     pthread_mutex_unlock(&lib->mutex);
2745 
2746     if (nxt_slow_path(process == NULL)) {
2747         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
2748                       (int) pid, fd);
2749 
2750         return NXT_UNIT_ERROR;
2751     }
2752 
2753     rc = NXT_UNIT_ERROR;
2754 
2755     if (fstat(fd, &mmap_stat) == -1) {
2756         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
2757                       strerror(errno), errno);
2758 
2759         goto fail;
2760     }
2761 
2762     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
2763                MAP_SHARED, fd, 0);
2764     if (nxt_slow_path(mem == MAP_FAILED)) {
2765         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
2766                       strerror(errno), errno);
2767 
2768         goto fail;
2769     }
2770 
2771     hdr = mem;
2772 
2773     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
2774 
2775         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
2776                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
2777                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
2778 
2779         munmap(mem, PORT_MMAP_SIZE);
2780 
2781         goto fail;
2782     }
2783 
2784     pthread_mutex_lock(&process->incoming.mutex);
2785 
2786     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
2787     if (nxt_slow_path(mm == NULL)) {
2788         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
2789 
2790         munmap(mem, PORT_MMAP_SIZE);
2791 
2792     } else {
2793         mm->hdr = hdr;
2794 
2795         hdr->sent_over = 0xFFFFu;
2796 
2797         rc = NXT_UNIT_OK;
2798     }
2799 
2800     pthread_mutex_unlock(&process->incoming.mutex);
2801 
2802 fail:
2803 
2804     nxt_unit_process_use(ctx, process, -1);
2805 
2806     return rc;
2807 }
2808 
2809 
2810 static void
2811 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
2812 {
2813     pthread_mutex_init(&mmaps->mutex, NULL);
2814 
2815     mmaps->size = 0;
2816     mmaps->cap = 0;
2817     mmaps->elts = NULL;
2818 }
2819 
2820 
2821 static void
2822 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
2823 {
2824     long c;
2825 
2826     c = nxt_atomic_fetch_add(&process->use_count, i);
2827 
2828     if (i < 0 && c == -i) {
2829         nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
2830 
2831         nxt_unit_mmaps_destroy(&process->incoming);
2832         nxt_unit_mmaps_destroy(&process->outgoing);
2833 
2834         free(process);
2835     }
2836 }
2837 
2838 
2839 static void
2840 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
2841 {
2842     nxt_unit_mmap_t  *mm, *end;
2843 
2844     if (mmaps->elts != NULL) {
2845         end = mmaps->elts + mmaps->size;
2846 
2847         for (mm = mmaps->elts; mm < end; mm++) {
2848             munmap(mm->hdr, PORT_MMAP_SIZE);
2849         }
2850 
2851         free(mmaps->elts);
2852     }
2853 
2854     pthread_mutex_destroy(&mmaps->mutex);
2855 }
2856 
2857 
2858 static nxt_port_mmap_header_t *
2859 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2860     uint32_t id)
2861 {
2862     nxt_port_mmap_header_t  *hdr;
2863 
2864     if (nxt_fast_path(process->incoming.size > id)) {
2865         hdr = process->incoming.elts[id].hdr;
2866 
2867     } else {
2868         hdr = NULL;
2869     }
2870 
2871     return hdr;
2872 }
2873 
2874 
2875 static int
2876 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2877 {
2878     int                           rc;
2879     nxt_chunk_id_t                c;
2880     nxt_unit_process_t            *process;
2881     nxt_port_mmap_header_t        *hdr;
2882     nxt_port_mmap_tracking_msg_t  *tracking_msg;
2883 
2884     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2885         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2886                       recv_msg->stream, (int) recv_msg->size);
2887 
2888         return 0;
2889     }
2890 
2891     tracking_msg = recv_msg->start;
2892 
2893     recv_msg->start = tracking_msg + 1;
2894     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
2895 
2896     process = nxt_unit_msg_get_process(ctx, recv_msg);
2897     if (nxt_slow_path(process == NULL)) {
2898         return 0;
2899     }
2900 
2901     pthread_mutex_lock(&process->incoming.mutex);
2902 
2903     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2904     if (nxt_slow_path(hdr == NULL)) {
2905         pthread_mutex_unlock(&process->incoming.mutex);
2906 
2907         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2908                       "invalid mmap id %d,%"PRIu32,
2909                       recv_msg->stream, (int) process->pid,
2910                       tracking_msg->mmap_id);
2911 
2912         return 0;
2913     }
2914 
2915     c = tracking_msg->tracking_id;
2916     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
2917 
2918     if (rc == 0) {
2919         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2920                        recv_msg->stream);
2921 
2922         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2923     }
2924 
2925     pthread_mutex_unlock(&process->incoming.mutex);
2926 
2927     return rc;
2928 }
2929 
2930 
2931 static int
2932 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2933 {
2934     void                    *start;
2935     uint32_t                size;
2936     nxt_unit_process_t      *process;
2937     nxt_unit_mmap_buf_t     *b, **incoming_tail;
2938     nxt_port_mmap_msg_t     *mmap_msg, *end;
2939     nxt_port_mmap_header_t  *hdr;
2940 
2941     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2942         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2943                       recv_msg->stream, (int) recv_msg->size);
2944 
2945         return NXT_UNIT_ERROR;
2946     }
2947 
2948     process = nxt_unit_msg_get_process(ctx, recv_msg);
2949     if (nxt_slow_path(process == NULL)) {
2950         return NXT_UNIT_ERROR;
2951     }
2952 
2953     mmap_msg = recv_msg->start;
2954     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
2955 
2956     incoming_tail = &recv_msg->incoming_buf;
2957 
2958     pthread_mutex_lock(&process->incoming.mutex);
2959 
2960     for (; mmap_msg < end; mmap_msg++) {
2961         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
2962         if (nxt_slow_path(hdr == NULL)) {
2963             pthread_mutex_unlock(&process->incoming.mutex);
2964 
2965             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2966                           "invalid mmap id %d,%"PRIu32,
2967                           recv_msg->stream, (int) process->pid,
2968                           mmap_msg->mmap_id);
2969 
2970             return NXT_UNIT_ERROR;
2971         }
2972 
2973         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
2974         size = mmap_msg->size;
2975 
2976         if (recv_msg->start == mmap_msg) {
2977             recv_msg->start = start;
2978             recv_msg->size = size;
2979         }
2980 
2981         b = nxt_unit_mmap_buf_get(ctx);
2982         if (nxt_slow_path(b == NULL)) {
2983             pthread_mutex_unlock(&process->incoming.mutex);
2984 
2985             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
2986                           recv_msg->stream);
2987 
2988             nxt_unit_mmap_release(hdr, start, size);
2989 
2990             return NXT_UNIT_ERROR;
2991         }
2992 
2993         nxt_unit_mmap_buf_insert(incoming_tail, b);
2994         incoming_tail = &b->next;
2995 
2996         b->buf.start = start;
2997         b->buf.free = start;
2998         b->buf.end = b->buf.start + size;
2999         b->hdr = hdr;
3000 
3001         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3002                        recv_msg->stream,
3003                        start, (int) size,
3004                        (int) hdr->src_pid, (int) hdr->dst_pid,
3005                        (int) hdr->id, (int) mmap_msg->chunk_id,
3006                        (int) mmap_msg->size);
3007     }
3008 
3009     pthread_mutex_unlock(&process->incoming.mutex);
3010 
3011     return NXT_UNIT_OK;
3012 }
3013 
3014 
3015 static int
3016 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
3017 {
3018     u_char          *p, *end;
3019     nxt_chunk_id_t  c;
3020 
3021     memset(start, 0xA5, size);
3022 
3023     p = start;
3024     end = p + size;
3025     c = nxt_port_mmap_chunk_id(hdr, p);
3026 
3027     while (p < end) {
3028         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3029 
3030         p += PORT_MMAP_CHUNK_SIZE;
3031         c++;
3032     }
3033 
3034     return NXT_UNIT_OK;
3035 }
3036 
3037 
3038 static nxt_int_t
3039 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3040 {
3041     nxt_process_t  *process;
3042 
3043     process = data;
3044 
3045     if (lhq->key.length == sizeof(pid_t)
3046         && *(pid_t *) lhq->key.start == process->pid)
3047     {
3048         return NXT_OK;
3049     }
3050 
3051     return NXT_DECLINED;
3052 }
3053 
3054 
/*
 * Level hash prototype used for the table of known processes
 * (lib->processes); stored values are matched against a pid key by
 * nxt_unit_lvlhsh_pid_test().
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
3061 
3062 
3063 static inline void
3064 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3065 {
3066     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3067     lhq->key.length = sizeof(*pid);
3068     lhq->key.start = (u_char *) pid;
3069     lhq->proto = &lvlhsh_processes_proto;
3070 }
3071 
3072 
3073 static nxt_unit_process_t *
3074 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
3075 {
3076     nxt_unit_impl_t     *lib;
3077     nxt_unit_process_t  *process;
3078     nxt_lvlhsh_query_t  lhq;
3079 
3080     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3081 
3082     nxt_unit_process_lhq_pid(&lhq, &pid);
3083 
3084     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3085         process = lhq.value;
3086         nxt_unit_process_use(ctx, process, 1);
3087 
3088         return process;
3089     }
3090 
3091     process = malloc(sizeof(nxt_unit_process_t));
3092     if (nxt_slow_path(process == NULL)) {
3093         nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
3094 
3095         return NULL;
3096     }
3097 
3098     process->pid = pid;
3099     process->use_count = 1;
3100     process->next_port_id = 0;
3101     process->lib = lib;
3102 
3103     nxt_queue_init(&process->ports);
3104 
3105     nxt_unit_mmaps_init(&process->incoming);
3106     nxt_unit_mmaps_init(&process->outgoing);
3107 
3108     lhq.replace = 0;
3109     lhq.value = process;
3110 
3111     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3112 
3113     case NXT_OK:
3114         break;
3115 
3116     default:
3117         nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
3118 
3119         pthread_mutex_destroy(&process->outgoing.mutex);
3120         pthread_mutex_destroy(&process->incoming.mutex);
3121         free(process);
3122         process = NULL;
3123         break;
3124     }
3125 
3126     nxt_unit_process_use(ctx, process, 1);
3127 
3128     return process;
3129 }
3130 
3131 
3132 static nxt_unit_process_t *
3133 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
3134 {
3135     int                 rc;
3136     nxt_unit_impl_t     *lib;
3137     nxt_unit_process_t  *process;
3138     nxt_lvlhsh_query_t  lhq;
3139 
3140     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3141 
3142     nxt_unit_process_lhq_pid(&lhq, &pid);
3143 
3144     if (remove) {
3145         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3146 
3147     } else {
3148         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3149     }
3150 
3151     if (rc == NXT_OK) {
3152         process = lhq.value;
3153 
3154         if (!remove) {
3155             nxt_unit_process_use(ctx, process, 1);
3156         }
3157 
3158         return process;
3159     }
3160 
3161     return NULL;
3162 }
3163 
3164 
3165 static nxt_unit_process_t *
3166 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3167 {
3168     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3169 }
3170 
3171 
3172 int
3173 nxt_unit_run(nxt_unit_ctx_t *ctx)
3174 {
3175     int              rc;
3176     nxt_unit_impl_t  *lib;
3177 
3178     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3179     rc = NXT_UNIT_OK;
3180 
3181     while (nxt_fast_path(lib->online)) {
3182         rc = nxt_unit_run_once(ctx);
3183     }
3184 
3185     return rc;
3186 }
3187 
3188 
3189 int
3190 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
3191 {
3192     int                  rc;
3193     char                 buf[4096];
3194     char                 oob[256];
3195     ssize_t              rsize;
3196     nxt_unit_impl_t      *lib;
3197     nxt_unit_ctx_impl_t  *ctx_impl;
3198 
3199     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3200     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3201 
3202     memset(oob, 0, sizeof(struct cmsghdr));
3203 
3204     if (ctx_impl->read_port_fd != -1) {
3205         rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
3206                                          buf, sizeof(buf),
3207                                          oob, sizeof(oob));
3208     } else {
3209         rsize = lib->callbacks.port_recv(