xref: /unit/src/nxt_unit.c (revision 743:e0f0cd7d244a)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 
15 #if (NXT_HAVE_MEMFD_CREATE)
16 #include <linux/memfd.h>
17 #endif
18 
/*
 * Short type names for the structures private to this file.  The structure
 * bodies are defined further below; the names are listed here in the order
 * of those definitions.
 */
typedef struct nxt_unit_mmap_buf_s  nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s  nxt_unit_recv_msg_t;
typedef struct nxt_unit_request_info_impl_s  nxt_unit_request_info_impl_t;
typedef struct nxt_unit_ctx_impl_s  nxt_unit_ctx_impl_t;
typedef struct nxt_unit_impl_s  nxt_unit_impl_t;
typedef struct nxt_unit_port_impl_s  nxt_unit_port_impl_t;
typedef struct nxt_unit_mmap_s  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s  nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s  nxt_unit_process_t;
28 
29 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
30 static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
31     nxt_unit_ctx_impl_t *ctx_impl, void *data);
32 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
33     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
34 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
35     uint32_t stream);
36 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
37     nxt_unit_ctx_t *ctx);
38 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
39 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
40 static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
41     nxt_unit_recv_msg_t *recv_msg);
42 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
43 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
44 static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
45     nxt_unit_mmap_buf_t *mmap_buf, int last);
46 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
47     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
48     nxt_chunk_id_t *c, int n);
49 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
50 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
51     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
52 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
53     int fd);
54 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
55     nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
56     nxt_unit_mmap_buf_t *mmap_buf);
57 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
58 
59 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
60 static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
61     nxt_unit_process_t *process, int i);
62 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
63 static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
64     nxt_unit_process_t *process, uint32_t id);
65 static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
68     nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf);
69 static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
70     uint32_t size);
71 
72 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
73     pid_t pid);
74 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
75     pid_t pid, int remove);
76 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
77 static int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
78 static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
79     nxt_unit_port_id_t *port_id, int *fd);
80 
81 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
82     nxt_unit_port_id_t *new_port, int fd);
83 
84 static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
85     nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
86     nxt_unit_process_t **process);
87 static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
88     nxt_unit_process_t *process);
89 
90 static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
91     nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
92     const void *oob, size_t oob_size);
93 static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
94     nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
95     void *oob, size_t oob_size);
96 
97 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
98     nxt_unit_port_t *port);
99 static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
100     nxt_unit_port_id_t *port_id, int remove);
101 
102 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
103 
104 
105 struct nxt_unit_mmap_buf_s {
106     nxt_unit_buf_t           buf;
107 
108     nxt_port_mmap_header_t   *hdr;
109     nxt_queue_link_t         link;
110     nxt_unit_port_id_t       port_id;
111     nxt_unit_request_info_t  *req;
112     nxt_unit_ctx_impl_t      *ctx_impl;
113 };
114 
115 
116 struct nxt_unit_recv_msg_s {
117     nxt_port_msg_t           port_msg;
118 
119     void                     *start;
120     uint32_t                 size;
121 
122     nxt_unit_process_t       *process;
123 };
124 
125 
/*
 * Progress of a request/response exchange.  The values are compared with
 * "<" and ">=" by the response functions, so their order is significant.
 */
typedef enum {
    NXT_UNIT_RS_START = 0,              /* request received, no response yet */
    NXT_UNIT_RS_RESPONSE_INIT,          /* nxt_unit_response_init() succeeded */
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,          /* init/realloc are refused from here */
    NXT_UNIT_RS_DONE,
} nxt_unit_req_state_t;
133 
134 
135 struct nxt_unit_request_info_impl_s {
136     nxt_unit_request_info_t  req;
137 
138     nxt_unit_recv_msg_t      recv_msg;
139     nxt_queue_t              outgoing_buf;    /*  of nxt_unit_mmap_buf_t */
140     nxt_queue_t              incoming_buf;    /*  of nxt_unit_mmap_buf_t */
141 
142     nxt_unit_req_state_t     state;
143 
144     nxt_queue_link_t         link;
145 
146     char                     extra_data[];
147 };
148 
149 
150 struct nxt_unit_ctx_impl_s {
151     nxt_unit_ctx_t                ctx;
152 
153     nxt_unit_port_id_t            read_port_id;
154     int                           read_port_fd;
155 
156     nxt_queue_link_t              link;
157 
158     nxt_queue_t                   free_buf;  /*  of nxt_unit_mmap_buf_t */
159 
160     /*  of nxt_unit_request_info_impl_t */
161     nxt_queue_t                   free_req;
162 
163     /*  of nxt_unit_request_info_impl_t */
164     nxt_queue_t                   active_req;
165 
166     nxt_unit_mmap_buf_t           ctx_buf[2];
167 
168     nxt_unit_request_info_impl_t  req;
169 };
170 
171 
172 struct nxt_unit_impl_s {
173     nxt_unit_t               unit;
174     nxt_unit_callbacks_t     callbacks;
175 
176     uint32_t                 request_data_size;
177 
178     pthread_mutex_t          mutex;
179 
180     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
181     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
182 
183     nxt_unit_port_id_t       ready_port_id;
184 
185     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
186 
187     pid_t                    pid;
188     int                      log_fd;
189     int                      online;
190 
191     nxt_unit_ctx_impl_t      main_ctx;
192 };
193 
194 
195 struct nxt_unit_port_impl_s {
196     nxt_unit_port_t          port;
197 
198     nxt_queue_link_t         link;
199     nxt_unit_process_t       *process;
200 };
201 
202 
203 struct nxt_unit_mmap_s {
204     nxt_port_mmap_header_t   *hdr;
205 };
206 
207 
208 struct nxt_unit_mmaps_s {
209     pthread_mutex_t          mutex;
210     uint32_t                 size;
211     uint32_t                 cap;
212     nxt_unit_mmap_t          *elts;
213 };
214 
215 
216 struct nxt_unit_process_s {
217     pid_t                    pid;
218 
219     nxt_queue_t              ports;
220 
221     nxt_unit_mmaps_t         incoming;
222     nxt_unit_mmaps_t         outgoing;
223 
224     nxt_unit_impl_t          *lib;
225 
226     nxt_atomic_t             use_count;
227 
228     uint32_t                 next_port_id;
229 };
230 
231 
/*
 * Port identity as a (pid, id) pair.  Both members are explicitly 32 bits
 * wide to avoid possible alignment padding inside the structure.
 */
typedef struct {
    int32_t  pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
237 
238 
239 nxt_unit_ctx_t *
240 nxt_unit_init(nxt_unit_init_t *init)
241 {
242     int              rc;
243     uint32_t         ready_stream;
244     nxt_unit_ctx_t   *ctx;
245     nxt_unit_impl_t  *lib;
246     nxt_unit_port_t  ready_port, read_port;
247 
248     lib = nxt_unit_create(init);
249     if (nxt_slow_path(lib == NULL)) {
250         return NULL;
251     }
252 
253     if (init->ready_port.id.pid != 0
254         && init->ready_stream != 0
255         && init->read_port.id.pid != 0)
256     {
257         ready_port = init->ready_port;
258         ready_stream = init->ready_stream;
259         read_port = init->read_port;
260         lib->log_fd = init->log_fd;
261 
262         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
263                               ready_port.id.id);
264         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
265                               read_port.id.id);
266     } else {
267         rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
268                                &ready_stream);
269         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
270             goto fail;
271         }
272     }
273 
274     ctx = &lib->main_ctx.ctx;
275 
276     rc = lib->callbacks.add_port(ctx, &ready_port);
277     if (rc != NXT_UNIT_OK) {
278         nxt_unit_alert(NULL, "failed to add ready_port");
279 
280         goto fail;
281     }
282 
283     rc = lib->callbacks.add_port(ctx, &read_port);
284     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
285         nxt_unit_alert(NULL, "failed to add read_port");
286 
287         goto fail;
288     }
289 
290     lib->main_ctx.read_port_id = read_port.id;
291     lib->ready_port_id = ready_port.id;
292 
293     rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
294     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
295         nxt_unit_alert(NULL, "failed to send READY message");
296 
297         goto fail;
298     }
299 
300     return ctx;
301 
302 fail:
303 
304     free(lib);
305 
306     return NULL;
307 }
308 
309 
310 static nxt_unit_impl_t *
311 nxt_unit_create(nxt_unit_init_t *init)
312 {
313     int                   rc;
314     nxt_unit_impl_t       *lib;
315     nxt_unit_callbacks_t  *cb;
316 
317     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
318     if (nxt_slow_path(lib == NULL)) {
319         nxt_unit_alert(NULL, "failed to allocate unit struct");
320 
321         return NULL;
322     }
323 
324     rc = pthread_mutex_init(&lib->mutex, NULL);
325     if (nxt_slow_path(rc != 0)) {
326         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
327 
328         goto fail;
329     }
330 
331     lib->unit.data = init->data;
332     lib->callbacks = init->callbacks;
333 
334     lib->request_data_size = init->request_data_size;
335 
336     lib->processes.slot = NULL;
337     lib->ports.slot = NULL;
338 
339     lib->pid = getpid();
340     lib->log_fd = STDERR_FILENO;
341     lib->online = 1;
342 
343     nxt_queue_init(&lib->contexts);
344 
345     nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
346 
347     cb = &lib->callbacks;
348 
349     if (cb->request_handler == NULL) {
350         nxt_unit_alert(NULL, "request_handler is NULL");
351 
352         goto fail;
353     }
354 
355     if (cb->add_port == NULL) {
356         cb->add_port = nxt_unit_add_port;
357     }
358 
359     if (cb->remove_port == NULL) {
360         cb->remove_port = nxt_unit_remove_port;
361     }
362 
363     if (cb->remove_pid == NULL) {
364         cb->remove_pid = nxt_unit_remove_pid;
365     }
366 
367     if (cb->quit == NULL) {
368         cb->quit = nxt_unit_quit;
369     }
370 
371     if (cb->port_send == NULL) {
372         cb->port_send = nxt_unit_port_send_default;
373     }
374 
375     if (cb->port_recv == NULL) {
376         cb->port_recv = nxt_unit_port_recv_default;
377     }
378 
379     return lib;
380 
381 fail:
382 
383     free(lib);
384 
385     return NULL;
386 }
387 
388 
389 static void
390 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
391     void *data)
392 {
393     ctx_impl->ctx.data = data;
394     ctx_impl->ctx.unit = &lib->unit;
395 
396     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
397 
398     nxt_queue_init(&ctx_impl->free_buf);
399     nxt_queue_init(&ctx_impl->free_req);
400     nxt_queue_init(&ctx_impl->active_req);
401 
402     nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link);
403     nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link);
404     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
405 
406     ctx_impl->req.req.ctx = &ctx_impl->ctx;
407     ctx_impl->req.req.unit = &lib->unit;
408 
409     ctx_impl->read_port_fd = -1;
410 }
411 
412 
413 static int
414 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
415     int *log_fd, uint32_t *stream)
416 {
417     int       rc;
418     int       ready_fd, read_fd;
419     char      *unit_init, *version_end;
420     long      version_length;
421     int64_t   ready_pid, read_pid;
422     uint32_t  ready_stream, ready_id, read_id;
423 
424     unit_init = getenv(NXT_UNIT_INIT_ENV);
425     if (nxt_slow_path(unit_init == NULL)) {
426         nxt_unit_alert(NULL, "%s is not in the current environment",
427                        NXT_UNIT_INIT_ENV);
428 
429         return NXT_UNIT_ERROR;
430     }
431 
432     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
433 
434     version_length = nxt_length(NXT_VERSION);
435 
436     version_end = strchr(unit_init, ';');
437     if (version_end == NULL
438         || version_end - unit_init != version_length
439         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
440     {
441         nxt_unit_alert(NULL, "version check error");
442 
443         return NXT_UNIT_ERROR;
444     }
445 
446     rc = sscanf(version_end + 1,
447                 "%"PRIu32";"
448                 "%"PRId64",%"PRIu32",%d;"
449                 "%"PRId64",%"PRIu32",%d;"
450                 "%d",
451                 &ready_stream,
452                 &ready_pid, &ready_id, &ready_fd,
453                 &read_pid, &read_id, &read_fd,
454                 log_fd);
455 
456     if (nxt_slow_path(rc != 8)) {
457         nxt_unit_alert(NULL, "failed to scan variables");
458 
459         return NXT_UNIT_ERROR;
460     }
461 
462     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
463 
464     ready_port->in_fd = -1;
465     ready_port->out_fd = ready_fd;
466     ready_port->data = NULL;
467 
468     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
469 
470     read_port->in_fd = read_fd;
471     read_port->out_fd = -1;
472     read_port->data = NULL;
473 
474     *stream = ready_stream;
475 
476     return NXT_UNIT_OK;
477 }
478 
479 
480 static int
481 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
482     uint32_t stream)
483 {
484     ssize_t          res;
485     nxt_port_msg_t   msg;
486     nxt_unit_impl_t  *lib;
487 
488     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
489 
490     msg.stream = stream;
491     msg.pid = lib->pid;
492     msg.reply_port = 0;
493     msg.type = _NXT_PORT_MSG_PROCESS_READY;
494     msg.last = 1;
495     msg.mmap = 0;
496     msg.nf = 0;
497     msg.mf = 0;
498     msg.tracking = 0;
499 
500     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
501     if (res != sizeof(msg)) {
502         return NXT_UNIT_ERROR;
503     }
504 
505     return NXT_UNIT_OK;
506 }
507 
508 
509 int
510 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
511     void *buf, size_t buf_size, void *oob, size_t oob_size)
512 {
513     int                           fd, rc;
514     pid_t                         pid;
515     nxt_queue_t                   incoming_buf;
516     struct cmsghdr                *cm;
517     nxt_port_msg_t                *port_msg;
518     nxt_unit_impl_t               *lib;
519     nxt_unit_port_t               new_port;
520     nxt_queue_link_t              *lnk;
521     nxt_unit_request_t            *r;
522     nxt_unit_mmap_buf_t           *b;
523     nxt_unit_recv_msg_t           recv_msg;
524     nxt_unit_callbacks_t          *cb;
525     nxt_port_msg_new_port_t       *new_port_msg;
526     nxt_unit_request_info_t       *req;
527     nxt_unit_request_info_impl_t  *req_impl;
528 
529     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
530 
531     rc = NXT_UNIT_ERROR;
532     fd = -1;
533     recv_msg.process = NULL;
534     port_msg = buf;
535     cm = oob;
536 
537     if (oob_size >= CMSG_SPACE(sizeof(int))
538         && cm->cmsg_len == CMSG_LEN(sizeof(int))
539         && cm->cmsg_level == SOL_SOCKET
540         && cm->cmsg_type == SCM_RIGHTS)
541     {
542         memcpy(&fd, CMSG_DATA(cm), sizeof(int));
543     }
544 
545     if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
546         nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
547         goto fail;
548     }
549 
550     recv_msg.port_msg = *port_msg;
551     recv_msg.start = port_msg + 1;
552     recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
553 
554     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
555         nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
556                       port_msg->stream, (int) port_msg->type);
557         goto fail;
558     }
559 
560     if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
561         rc = NXT_UNIT_OK;
562 
563         goto fail;
564     }
565 
566     /* Fragmentation is unsupported. */
567     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
568         nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
569                       port_msg->stream, (int) port_msg->type);
570         goto fail;
571     }
572 
573     if (port_msg->mmap) {
574         nxt_queue_init(&incoming_buf);
575 
576         if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) {
577             goto fail;
578         }
579     }
580 
581     cb = &lib->callbacks;
582 
583     switch (port_msg->type) {
584 
585     case _NXT_PORT_MSG_QUIT:
586         nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
587 
588         cb->quit(ctx);
589         rc = NXT_UNIT_OK;
590         break;
591 
592     case _NXT_PORT_MSG_NEW_PORT:
593         if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) {
594             nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
595                           "invalid message size (%d)",
596                           port_msg->stream, (int) recv_msg.size);
597 
598             goto fail;
599         }
600 
601         new_port_msg = recv_msg.start;
602 
603         nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
604                        port_msg->stream, (int) new_port_msg->pid,
605                        (int) new_port_msg->id, fd);
606 
607         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
608                               new_port_msg->id);
609 
610         new_port.in_fd = -1;
611         new_port.out_fd = fd;
612         new_port.data = NULL;
613 
614         fd = -1;
615 
616         rc = cb->add_port(ctx, &new_port);
617         break;
618 
619     case _NXT_PORT_MSG_CHANGE_FILE:
620         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
621                        port_msg->stream, fd);
622         break;
623 
624     case _NXT_PORT_MSG_MMAP:
625         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd);
626         break;
627 
628     case _NXT_PORT_MSG_DATA:
629         if (nxt_slow_path(port_msg->mmap == 0)) {
630             nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
631                           port_msg->stream);
632 
633             goto fail;
634         }
635 
636         if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) {
637             nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
638                           "%d expected", port_msg->stream, (int) recv_msg.size,
639                           (int) sizeof(nxt_unit_request_t));
640 
641             goto fail;
642         }
643 
644         req_impl = nxt_unit_request_info_get(ctx);
645         if (nxt_slow_path(req_impl == NULL)) {
646             nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
647                           port_msg->stream);
648 
649             goto fail;
650         }
651 
652         req = &req_impl->req;
653 
654         req->request_port = *port_id;
655 
656         nxt_unit_port_id_init(&req->response_port, port_msg->pid,
657                               port_msg->reply_port);
658 
659         req->request = recv_msg.start;
660 
661         lnk = nxt_queue_first(&incoming_buf);
662         b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
663 
664         req->request_buf = &b->buf;
665         req->response = NULL;
666         req->response_buf = NULL;
667 
668         r = req->request;
669 
670         req->content_length = r->content_length;
671 
672         req->content_buf = req->request_buf;
673         req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
674 
675         /* Move process to req_impl. */
676         req_impl->recv_msg = recv_msg;
677 
678         recv_msg.process = NULL;
679 
680         nxt_queue_init(&req_impl->outgoing_buf);
681         nxt_queue_init(&req_impl->incoming_buf);
682 
683         nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
684         {
685             b->req = req;
686         } nxt_queue_loop;
687 
688         nxt_queue_add(&req_impl->incoming_buf, &incoming_buf);
689         nxt_queue_init(&incoming_buf);
690 
691         req->response_max_fields = 0;
692         req_impl->state = NXT_UNIT_RS_START;
693 
694         nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream,
695                        (int) r->method_length, nxt_unit_sptr_get(&r->method),
696                        (int) r->target_length, nxt_unit_sptr_get(&r->target),
697                        (int) r->content_length);
698 
699         cb->request_handler(req);
700 
701         rc = NXT_UNIT_OK;
702         break;
703 
704     case _NXT_PORT_MSG_REMOVE_PID:
705         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
706             nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
707                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
708                           (int) sizeof(pid));
709 
710             goto fail;
711         }
712 
713         memcpy(&pid, recv_msg.start, sizeof(pid));
714 
715         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
716                        port_msg->stream, (int) pid);
717 
718         cb->remove_pid(ctx, pid);
719 
720         rc = NXT_UNIT_OK;
721         break;
722 
723     default:
724         nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
725                        port_msg->stream, (int) port_msg->type);
726 
727         goto fail;
728     }
729 
730 fail:
731 
732     if (fd != -1) {
733         close(fd);
734     }
735 
736     if (port_msg->mmap) {
737         nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
738         {
739             nxt_unit_mmap_release(b->hdr, b->buf.start,
740                                   b->buf.end - b->buf.start);
741 
742             nxt_unit_mmap_buf_release(b);
743         } nxt_queue_loop;
744     }
745 
746     if (recv_msg.process != NULL) {
747         nxt_unit_process_use(ctx, recv_msg.process, -1);
748     }
749 
750     return rc;
751 }
752 
753 
754 static nxt_unit_request_info_impl_t *
755 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
756 {
757     nxt_unit_impl_t               *lib;
758     nxt_queue_link_t              *lnk;
759     nxt_unit_ctx_impl_t           *ctx_impl;
760     nxt_unit_request_info_impl_t  *req_impl;
761 
762     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
763 
764     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
765 
766     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
767         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
768                           + lib->request_data_size);
769         if (nxt_slow_path(req_impl == NULL)) {
770             nxt_unit_warn(ctx, "request info allocation failed");
771 
772             return NULL;
773         }
774 
775         req_impl->req.unit = ctx->unit;
776         req_impl->req.ctx = ctx;
777 
778     } else {
779         lnk = nxt_queue_first(&ctx_impl->free_req);
780         nxt_queue_remove(lnk);
781 
782         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
783     }
784 
785     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
786 
787     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
788 
789     return req_impl;
790 }
791 
792 
793 static void
794 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
795 {
796     nxt_unit_mmap_buf_t           *b;
797     nxt_unit_ctx_impl_t           *ctx_impl;
798     nxt_unit_recv_msg_t           *recv_msg;
799     nxt_unit_request_info_impl_t  *req_impl;
800 
801     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
802     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
803 
804     req->response = NULL;
805     req->response_buf = NULL;
806 
807     recv_msg = &req_impl->recv_msg;
808 
809     if (recv_msg->process != NULL) {
810         nxt_unit_process_use(req->ctx, recv_msg->process, -1);
811 
812         recv_msg->process = NULL;
813     }
814 
815     nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) {
816 
817         nxt_unit_buf_free(&b->buf);
818 
819     } nxt_queue_loop;
820 
821     nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) {
822 
823         nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start);
824         nxt_unit_mmap_buf_release(b);
825 
826     } nxt_queue_loop;
827 
828     nxt_queue_remove(&req_impl->link);
829 
830     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
831 }
832 
833 
834 static void
835 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
836 {
837     nxt_unit_ctx_impl_t  *ctx_impl;
838 
839     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
840 
841     nxt_queue_remove(&req_impl->link);
842 
843     if (req_impl != &ctx_impl->req) {
844         free(req_impl);
845     }
846 }
847 
848 
849 uint16_t
850 nxt_unit_field_hash(const char *name, size_t name_length)
851 {
852     u_char      ch;
853     uint32_t    hash;
854     const char  *p, *end;
855 
856     hash = 159406; /* Magic value copied from nxt_http_parse.c */
857     end = name + name_length;
858 
859     for (p = name; p < end; p++) {
860         ch = *p;
861         hash = (hash << 4) + hash + nxt_lowcase(ch);
862     }
863 
864     hash = (hash >> 16) ^ hash;
865 
866     return hash;
867 }
868 
869 
/*
 * Splits "host" (host_length bytes, not necessarily NUL-terminated) at its
 * first ':' into a name part and a port part.
 *
 * The outputs point either into "host" or into static default strings:
 * "localhost" is used when the name part is missing or empty and "80" when
 * the port part is missing or empty.  A NULL or empty "host" yields both
 * defaults.
 */
void
nxt_unit_split_host(char *host, uint32_t host_length,
    char **name, uint32_t *name_length, char **port, uint32_t *port_length)
{
    char  *colon, *after;

    static char  default_host[] = "localhost";
    static char  default_port[] = "80";

    /* Start from the defaults and override the parts found in "host". */
    *name = default_host;
    *name_length = nxt_length(default_host);

    *port = default_port;
    *port_length = nxt_length(default_port);

    if (nxt_slow_path(host == NULL || host_length == 0)) {
        return;
    }

    colon = memchr(host, ':', host_length);

    if (nxt_slow_path(colon == NULL)) {
        /* No separator: the whole string is the name. */
        *name = host;
        *name_length = host_length;

        return;
    }

    if (nxt_fast_path(colon != host)) {
        *name = host;
        *name_length = colon - host;
    }

    after = colon + 1;

    if (nxt_fast_path(after != host + host_length)) {
        *port = after;
        *port_length = host_length - (after - host);
    }
}
921 
922 
923 void
924 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
925 {
926     uint32_t            i, j;
927     nxt_unit_field_t    *fields, f;
928     nxt_unit_request_t  *r;
929 
930     nxt_unit_req_debug(req, "group_dup_fields");
931 
932     r = req->request;
933     fields = r->fields;
934 
935     for (i = 0; i < r->fields_count; i++) {
936 
937         switch (fields[i].hash) {
938         case NXT_UNIT_HASH_HOST:
939             r->host_field = i;
940             break;
941 
942         case NXT_UNIT_HASH_CONTENT_LENGTH:
943             r->content_length_field = i;
944             break;
945 
946         case NXT_UNIT_HASH_CONTENT_TYPE:
947             r->content_type_field = i;
948             break;
949 
950         case NXT_UNIT_HASH_COOKIE:
951             r->cookie_field = i;
952             break;
953         };
954 
955         for (j = i + 1; j < r->fields_count; j++) {
956             if (fields[i].hash != fields[j].hash) {
957                 continue;
958             }
959 
960             if (j == i + 1) {
961                 continue;
962             }
963 
964             f = fields[j];
965             f.name.offset += (j - (i + 1)) * sizeof(f);
966             f.value.offset += (j - (i + 1)) * sizeof(f);
967 
968             while (j > i + 1) {
969                 fields[j] = fields[j - 1];
970                 fields[j].name.offset -= sizeof(f);
971                 fields[j].value.offset -= sizeof(f);
972                 j--;
973             }
974 
975             fields[j] = f;
976 
977             i++;
978         }
979     }
980 }
981 
982 
983 int
984 nxt_unit_response_init(nxt_unit_request_info_t *req,
985     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
986 {
987     uint32_t                      buf_size;
988     nxt_unit_buf_t                *buf;
989     nxt_unit_request_info_impl_t  *req_impl;
990 
991     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
992 
993     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
994         nxt_unit_req_warn(req, "init: response already sent");
995 
996         return NXT_UNIT_ERROR;
997     }
998 
999     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1000                        (int) max_fields_count, (int) max_fields_size);
1001 
1002     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1003         nxt_unit_req_debug(req, "duplicate response init");
1004     }
1005 
1006     buf_size = sizeof(nxt_unit_response_t)
1007                + max_fields_count * sizeof(nxt_unit_field_t)
1008                + max_fields_size;
1009 
1010     if (nxt_slow_path(req->response_buf != NULL)) {
1011         buf = req->response_buf;
1012 
1013         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1014             goto init_response;
1015         }
1016 
1017         nxt_unit_buf_free(buf);
1018 
1019         req->response_buf = NULL;
1020         req->response = NULL;
1021         req->response_max_fields = 0;
1022 
1023         req_impl->state = NXT_UNIT_RS_START;
1024     }
1025 
1026     buf = nxt_unit_response_buf_alloc(req, buf_size);
1027     if (nxt_slow_path(buf == NULL)) {
1028         return NXT_UNIT_ERROR;
1029     }
1030 
1031 init_response:
1032 
1033     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1034 
1035     req->response_buf = buf;
1036 
1037     req->response = (nxt_unit_response_t *) buf->start;
1038     req->response->status = status;
1039 
1040     buf->free = buf->start + sizeof(nxt_unit_response_t)
1041                 + max_fields_count * sizeof(nxt_unit_field_t);
1042 
1043     req->response_max_fields = max_fields_count;
1044     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1045 
1046     return NXT_UNIT_OK;
1047 }
1048 
1049 
1050 int
1051 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1052     uint32_t max_fields_count, uint32_t max_fields_size)
1053 {
1054     char                          *p;
1055     uint32_t                      i, buf_size;
1056     nxt_unit_buf_t                *buf;
1057     nxt_unit_field_t              *f, *src;
1058     nxt_unit_response_t           *resp;
1059     nxt_unit_request_info_impl_t  *req_impl;
1060 
1061     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1062 
1063     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1064         nxt_unit_req_warn(req, "realloc: response not init");
1065 
1066         return NXT_UNIT_ERROR;
1067     }
1068 
1069     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1070         nxt_unit_req_warn(req, "realloc: response already sent");
1071 
1072         return NXT_UNIT_ERROR;
1073     }
1074 
1075     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1076         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1077 
1078         return NXT_UNIT_ERROR;
1079     }
1080 
1081     buf_size = sizeof(nxt_unit_response_t)
1082                + max_fields_count * sizeof(nxt_unit_field_t)
1083                + max_fields_size;
1084 
1085     buf = nxt_unit_response_buf_alloc(req, buf_size);
1086     if (nxt_slow_path(buf == NULL)) {
1087         return NXT_UNIT_ERROR;
1088     }
1089 
1090     resp = (nxt_unit_response_t *) buf->start;
1091 
1092     memset(resp, 0, sizeof(nxt_unit_response_t));
1093 
1094     resp->status = req->response->status;
1095     resp->content_length = req->response->content_length;
1096 
1097     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1098     f = resp->fields;
1099 
1100     for (i = 0; i < req->response->fields_count; i++) {
1101         src = req->request->fields + i;
1102 
1103         if (nxt_slow_path(src->skip != 0)) {
1104             continue;
1105         }
1106 
1107         if (nxt_slow_path(src->name_length + src->value_length
1108                           > (uint32_t) (buf->end - p)))
1109         {
1110             goto fail;
1111         }
1112 
1113         nxt_unit_sptr_set(&f->name, p);
1114         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1115 
1116         nxt_unit_sptr_set(&f->value, p);
1117         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1118 
1119         f->hash = src->hash;
1120         f->skip = 0;
1121         f->name_length = src->name_length;
1122         f->value_length = src->value_length;
1123 
1124         resp->fields_count++;
1125         f++;
1126     }
1127 
1128     if (req->response->piggyback_content_length > 0) {
1129         if (nxt_slow_path(req->response->piggyback_content_length
1130                           > (uint32_t) (buf->end - p)))
1131         {
1132             goto fail;
1133         }
1134 
1135         resp->piggyback_content_length = req->response->piggyback_content_length;
1136 
1137         nxt_unit_sptr_set(&resp->piggyback_content, p);
1138         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1139                        req->response->piggyback_content_length);
1140     }
1141 
1142     buf->free = p;
1143 
1144     nxt_unit_buf_free(req->response_buf);
1145 
1146     req->response = resp;
1147     req->response_buf = buf;
1148     req->response_max_fields = max_fields_count;
1149 
1150     return NXT_UNIT_OK;
1151 
1152 fail:
1153 
1154     nxt_unit_buf_free(buf);
1155 
1156     return NXT_UNIT_ERROR;
1157 }
1158 
1159 
1160 int
1161 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1162 {
1163     nxt_unit_request_info_impl_t  *req_impl;
1164 
1165     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1166 
1167     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1168 }
1169 
1170 
1171 int
1172 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1173     const char *name, uint8_t name_length,
1174     const char *value, uint32_t value_length)
1175 {
1176     nxt_unit_buf_t                *buf;
1177     nxt_unit_field_t              *f;
1178     nxt_unit_response_t           *resp;
1179     nxt_unit_request_info_impl_t  *req_impl;
1180 
1181     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1182 
1183     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1184         nxt_unit_req_warn(req, "add_field: response not initialized or "
1185                           "already sent");
1186 
1187         return NXT_UNIT_ERROR;
1188     }
1189 
1190     resp = req->response;
1191 
1192     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1193         nxt_unit_req_warn(req, "add_field: too many response fields");
1194 
1195         return NXT_UNIT_ERROR;
1196     }
1197 
1198     buf = req->response_buf;
1199 
1200     if (nxt_slow_path(name_length + value_length
1201                       > (uint32_t) (buf->end - buf->free)))
1202     {
1203         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1204 
1205         return NXT_UNIT_ERROR;
1206     }
1207 
1208     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1209                        resp->fields_count,
1210                        (int) name_length, name,
1211                        (int) value_length, value);
1212 
1213     f = resp->fields + resp->fields_count;
1214 
1215     nxt_unit_sptr_set(&f->name, buf->free);
1216     buf->free = nxt_cpymem(buf->free, name, name_length);
1217 
1218     nxt_unit_sptr_set(&f->value, buf->free);
1219     buf->free = nxt_cpymem(buf->free, value, value_length);
1220 
1221     f->hash = nxt_unit_field_hash(name, name_length);
1222     f->skip = 0;
1223     f->name_length = name_length;
1224     f->value_length = value_length;
1225 
1226     resp->fields_count++;
1227 
1228     return NXT_UNIT_OK;
1229 }
1230 
1231 
1232 int
1233 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1234     const void* src, uint32_t size)
1235 {
1236     nxt_unit_buf_t                *buf;
1237     nxt_unit_response_t           *resp;
1238     nxt_unit_request_info_impl_t  *req_impl;
1239 
1240     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1241 
1242     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1243         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1244 
1245         return NXT_UNIT_ERROR;
1246     }
1247 
1248     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1249         nxt_unit_req_warn(req, "add_content: response already sent");
1250 
1251         return NXT_UNIT_ERROR;
1252     }
1253 
1254     buf = req->response_buf;
1255 
1256     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1257         nxt_unit_req_warn(req, "add_content: buffer overflow");
1258 
1259         return NXT_UNIT_ERROR;
1260     }
1261 
1262     resp = req->response;
1263 
1264     if (resp->piggyback_content_length == 0) {
1265         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1266         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1267     }
1268 
1269     resp->piggyback_content_length += size;
1270 
1271     buf->free = nxt_cpymem(buf->free, src, size);
1272 
1273     return NXT_UNIT_OK;
1274 }
1275 
1276 
1277 int
1278 nxt_unit_response_send(nxt_unit_request_info_t *req)
1279 {
1280     int                           rc;
1281     nxt_unit_mmap_buf_t           *mmap_buf;
1282     nxt_unit_request_info_impl_t  *req_impl;
1283 
1284     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1285 
1286     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1287         nxt_unit_req_warn(req, "send: response is not initialized yet");
1288 
1289         return NXT_UNIT_ERROR;
1290     }
1291 
1292     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1293         nxt_unit_req_warn(req, "send: response already sent");
1294 
1295         return NXT_UNIT_ERROR;
1296     }
1297 
1298     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1299                        req->response->fields_count,
1300                        (int) (req->response_buf->free
1301                               - req->response_buf->start));
1302 
1303     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1304 
1305     rc = nxt_unit_mmap_buf_send(req->ctx,
1306                                 req_impl->recv_msg.port_msg.stream,
1307                                 mmap_buf, 0);
1308     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1309         req->response = NULL;
1310         req->response_buf = NULL;
1311         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1312 
1313         nxt_unit_mmap_buf_release(mmap_buf);
1314     }
1315 
1316     return rc;
1317 }
1318 
1319 
1320 int
1321 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1322 {
1323     nxt_unit_request_info_impl_t  *req_impl;
1324 
1325     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1326 
1327     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1328 }
1329 
1330 
1331 nxt_unit_buf_t *
1332 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1333 {
1334     int                           rc;
1335     nxt_unit_process_t            *process;
1336     nxt_unit_mmap_buf_t           *mmap_buf;
1337     nxt_unit_request_info_impl_t  *req_impl;
1338 
1339     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1340         nxt_unit_req_warn(req, "response_buf_alloc: "
1341                           "requested buffer (%"PRIu32") too big", size);
1342 
1343         return NULL;
1344     }
1345 
1346     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1347 
1348     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1349 
1350     process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
1351     if (nxt_slow_path(process == NULL)) {
1352         return NULL;
1353     }
1354 
1355     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1356     if (nxt_slow_path(mmap_buf == NULL)) {
1357         return NULL;
1358     }
1359 
1360     mmap_buf->req = req;
1361 
1362     nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link);
1363 
1364     rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
1365                                    size, mmap_buf);
1366     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1367         nxt_unit_mmap_buf_release(mmap_buf);
1368 
1369         return NULL;
1370     }
1371 
1372     return &mmap_buf->buf;
1373 }
1374 
1375 
1376 static nxt_unit_process_t *
1377 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1378 {
1379     nxt_unit_impl_t  *lib;
1380 
1381     if (recv_msg->process != NULL) {
1382         return recv_msg->process;
1383     }
1384 
1385     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1386 
1387     pthread_mutex_lock(&lib->mutex);
1388 
1389     recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0);
1390 
1391     pthread_mutex_unlock(&lib->mutex);
1392 
1393     if (recv_msg->process == NULL) {
1394         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1395                       recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid);
1396     }
1397 
1398     return recv_msg->process;
1399 }
1400 
1401 
1402 static nxt_unit_mmap_buf_t *
1403 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1404 {
1405     nxt_queue_link_t     *lnk;
1406     nxt_unit_mmap_buf_t  *mmap_buf;
1407     nxt_unit_ctx_impl_t  *ctx_impl;
1408 
1409     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1410 
1411     if (nxt_queue_is_empty(&ctx_impl->free_buf)) {
1412         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1413         if (nxt_slow_path(mmap_buf == NULL)) {
1414             nxt_unit_warn(ctx, "failed to allocate buf");
1415         }
1416 
1417     } else {
1418         lnk = nxt_queue_first(&ctx_impl->free_buf);
1419         nxt_queue_remove(lnk);
1420 
1421         mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
1422     }
1423 
1424     mmap_buf->ctx_impl = ctx_impl;
1425 
1426     return mmap_buf;
1427 }
1428 
1429 
1430 static void
1431 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1432 {
1433     nxt_queue_remove(&mmap_buf->link);
1434 
1435     nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link);
1436 }
1437 
1438 
1439 int
1440 nxt_unit_buf_send(nxt_unit_buf_t *buf)
1441 {
1442     int                           rc;
1443     nxt_unit_mmap_buf_t           *mmap_buf;
1444     nxt_unit_request_info_t       *req;
1445     nxt_unit_request_info_impl_t  *req_impl;
1446 
1447     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1448 
1449     req = mmap_buf->req;
1450     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1451 
1452     nxt_unit_req_debug(req, "buf_send: %d bytes",
1453                        (int) (buf->free - buf->start));
1454 
1455     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1456         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
1457 
1458         return NXT_UNIT_ERROR;
1459     }
1460 
1461     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1462         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1463 
1464         return NXT_UNIT_ERROR;
1465     }
1466 
1467     if (nxt_fast_path(buf->free > buf->start)) {
1468         rc = nxt_unit_mmap_buf_send(req->ctx,
1469                                     req_impl->recv_msg.port_msg.stream,
1470                                     mmap_buf, 0);
1471         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1472             return rc;
1473         }
1474     }
1475 
1476     nxt_unit_mmap_buf_release(mmap_buf);
1477 
1478     return NXT_UNIT_OK;
1479 }
1480 
1481 
1482 static void
1483 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
1484 {
1485     int                           rc;
1486     nxt_unit_mmap_buf_t           *mmap_buf;
1487     nxt_unit_request_info_t       *req;
1488     nxt_unit_request_info_impl_t  *req_impl;
1489 
1490     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1491 
1492     req = mmap_buf->req;
1493     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1494 
1495     rc = nxt_unit_mmap_buf_send(req->ctx,
1496                                 req_impl->recv_msg.port_msg.stream,
1497                                 mmap_buf, 1);
1498 
1499     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1500         nxt_unit_mmap_buf_release(mmap_buf);
1501 
1502         nxt_unit_request_info_release(req);
1503 
1504     } else {
1505         nxt_unit_request_done(req, rc);
1506     }
1507 }
1508 
1509 
1510 static int
1511 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
1512     nxt_unit_mmap_buf_t *mmap_buf, int last)
1513 {
1514     struct {
1515         nxt_port_msg_t       msg;
1516         nxt_port_mmap_msg_t  mmap_msg;
1517     } m;
1518 
1519     u_char                   *end, *last_used, *first_free;
1520     ssize_t                  res;
1521     nxt_chunk_id_t           first_free_chunk;
1522     nxt_unit_buf_t           *buf;
1523     nxt_unit_impl_t          *lib;
1524     nxt_port_mmap_header_t   *hdr;
1525 
1526     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1527 
1528     buf = &mmap_buf->buf;
1529 
1530     m.mmap_msg.size = buf->free - buf->start;
1531 
1532     m.msg.stream = stream;
1533     m.msg.pid = lib->pid;
1534     m.msg.reply_port = 0;
1535     m.msg.type = _NXT_PORT_MSG_DATA;
1536     m.msg.last = last != 0;
1537     m.msg.mmap = m.mmap_msg.size > 0;
1538     m.msg.nf = 0;
1539     m.msg.mf = 0;
1540     m.msg.tracking = 0;
1541 
1542     hdr = mmap_buf->hdr;
1543 
1544     m.mmap_msg.mmap_id = hdr->id;
1545     m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1546 
1547     nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1548                    stream,
1549                    (int) m.mmap_msg.mmap_id,
1550                    (int) m.mmap_msg.chunk_id,
1551                    (int) m.mmap_msg.size);
1552 
1553     res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1554                                    m.mmap_msg.size > 0 ? sizeof(m)
1555                                                        : sizeof(m.msg),
1556                                    NULL, 0);
1557     if (nxt_slow_path(res != sizeof(m))) {
1558         return NXT_UNIT_ERROR;
1559     }
1560 
1561     if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
1562         last_used = (u_char *) buf->free - 1;
1563 
1564         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1565         first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1566         end = (u_char *) buf->end;
1567 
1568         nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1569 
1570         buf->end = (char *) first_free;
1571     }
1572 
1573     return NXT_UNIT_OK;
1574 }
1575 
1576 
1577 void
1578 nxt_unit_buf_free(nxt_unit_buf_t *buf)
1579 {
1580     nxt_unit_mmap_buf_t  *mmap_buf;
1581 
1582     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1583 
1584     nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start);
1585 
1586     nxt_unit_mmap_buf_release(mmap_buf);
1587 }
1588 
1589 
1590 nxt_unit_buf_t *
1591 nxt_unit_buf_next(nxt_unit_buf_t *buf)
1592 {
1593     nxt_queue_link_t              *lnk;
1594     nxt_unit_mmap_buf_t           *mmap_buf;
1595     nxt_unit_request_info_impl_t  *req_impl;
1596 
1597     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1598     req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t,
1599                                 req);
1600 
1601     lnk = &mmap_buf->link;
1602 
1603     if (lnk == nxt_queue_last(&req_impl->incoming_buf)) {
1604         return NULL;
1605     }
1606 
1607     lnk = nxt_queue_next(lnk);
1608     mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
1609 
1610     return &mmap_buf->buf;
1611 }
1612 
1613 
1614 uint32_t
1615 nxt_unit_buf_max(void)
1616 {
1617     return PORT_MMAP_DATA_SIZE;
1618 }
1619 
1620 
1621 uint32_t
1622 nxt_unit_buf_min(void)
1623 {
1624     return PORT_MMAP_CHUNK_SIZE;
1625 }
1626 
1627 
1628 int
1629 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
1630     size_t size)
1631 {
1632     int                           rc;
1633     uint32_t                      part_size;
1634     const char                    *part_start;
1635     nxt_unit_process_t            *process;
1636     nxt_unit_mmap_buf_t           mmap_buf;
1637     nxt_unit_request_info_impl_t  *req_impl;
1638 
1639     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1640 
1641     part_start = start;
1642 
1643     /* Check if response is not send yet. */
1644     if (nxt_slow_path(req->response_buf)) {
1645         part_size = req->response_buf->end - req->response_buf->free;
1646         part_size = nxt_min(size, part_size);
1647 
1648         rc = nxt_unit_response_add_content(req, part_start, part_size);
1649         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1650             return rc;
1651         }
1652 
1653         rc = nxt_unit_response_send(req);
1654         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1655             return rc;
1656         }
1657 
1658         size -= part_size;
1659         part_start += part_size;
1660     }
1661 
1662     process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
1663     if (nxt_slow_path(process == NULL)) {
1664         return NXT_UNIT_ERROR;
1665     }
1666 
1667     while (size > 0) {
1668         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
1669 
1670         rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
1671                                        part_size, &mmap_buf);
1672         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1673             return rc;
1674         }
1675 
1676         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
1677                                        part_start, part_size);
1678 
1679         rc = nxt_unit_mmap_buf_send(req->ctx,
1680                                     req_impl->recv_msg.port_msg.stream,
1681                                     &mmap_buf, 0);
1682         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1683             nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
1684                                   mmap_buf.buf.end - mmap_buf.buf.start);
1685 
1686             return rc;
1687         }
1688 
1689         size -= part_size;
1690         part_start += part_size;
1691     }
1692 
1693     return NXT_UNIT_OK;
1694 }
1695 
1696 
1697 int
1698 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
1699     nxt_unit_read_info_t *read_info)
1700 {
1701     int             rc;
1702     ssize_t         n;
1703     nxt_unit_buf_t  *buf;
1704 
1705     /* Check if response is not send yet. */
1706     if (nxt_slow_path(req->response_buf)) {
1707 
1708         /* Enable content in headers buf. */
1709         rc = nxt_unit_response_add_content(req, "", 0);
1710         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1711             nxt_unit_req_error(req, "Failed to add piggyback content");
1712 
1713             return rc;
1714         }
1715 
1716         buf = req->response_buf;
1717 
1718         while (buf->end - buf->free > 0) {
1719             n = read_info->read(read_info, buf->free, buf->end - buf->free);
1720             if (nxt_slow_path(n < 0)) {
1721                 nxt_unit_req_error(req, "Read error");
1722 
1723                 return NXT_UNIT_ERROR;
1724             }
1725 
1726             /* Manually increase sizes. */
1727             buf->free += n;
1728             req->response->piggyback_content_length += n;
1729 
1730             if (read_info->eof) {
1731                 break;
1732             }
1733         }
1734 
1735         rc = nxt_unit_response_send(req);
1736         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1737             nxt_unit_req_error(req, "Failed to send headers with content");
1738 
1739             return rc;
1740         }
1741 
1742         if (read_info->eof) {
1743             return NXT_UNIT_OK;
1744         }
1745     }
1746 
1747     while (!read_info->eof) {
1748         buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
1749                                                        PORT_MMAP_DATA_SIZE));
1750         if (nxt_slow_path(buf == NULL)) {
1751             nxt_unit_req_error(req, "Failed to allocate buf for content");
1752 
1753             return NXT_UNIT_ERROR;
1754         }
1755 
1756         while (!read_info->eof && buf->end > buf->free) {
1757             n = read_info->read(read_info, buf->free, buf->end - buf->free);
1758             if (nxt_slow_path(n < 0)) {
1759                 nxt_unit_req_error(req, "Read error");
1760 
1761                 nxt_unit_buf_free(buf);
1762 
1763                 return NXT_UNIT_ERROR;
1764             }
1765 
1766             buf->free += n;
1767         }
1768 
1769         rc = nxt_unit_buf_send(buf);
1770         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1771             nxt_unit_req_error(req, "Failed to send content");
1772 
1773             return rc;
1774         }
1775     }
1776 
1777     return NXT_UNIT_OK;
1778 }
1779 
1780 
1781 ssize_t
1782 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
1783 {
1784     u_char          *p;
1785     size_t          rest, copy, read;
1786     nxt_unit_buf_t  *buf;
1787 
1788     p = dst;
1789     rest = size;
1790 
1791     buf = req->content_buf;
1792 
1793     while (buf != NULL) {
1794         copy = buf->end - buf->free;
1795         copy = nxt_min(rest, copy);
1796 
1797         p = nxt_cpymem(p, buf->free, copy);
1798 
1799         buf->free += copy;
1800         rest -= copy;
1801 
1802         if (rest == 0) {
1803             if (buf->end == buf->free) {
1804                 buf = nxt_unit_buf_next(buf);
1805             }
1806 
1807             break;
1808         }
1809 
1810         buf = nxt_unit_buf_next(buf);
1811     }
1812 
1813     req->content_buf = buf;
1814 
1815     read = size - rest;
1816 
1817     req->content_length -= read;
1818 
1819     return read;
1820 }
1821 
1822 
1823 void
1824 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
1825 {
1826     ssize_t                       res;
1827     uint32_t                      size;
1828     nxt_port_msg_t                msg;
1829     nxt_unit_impl_t               *lib;
1830     nxt_unit_request_info_impl_t  *req_impl;
1831 
1832     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1833 
1834     nxt_unit_req_debug(req, "done: %d", rc);
1835 
1836     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1837         goto skip_response_send;
1838     }
1839 
1840     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1841 
1842         size = nxt_length("Content-Type") + nxt_length("text/plain");
1843 
1844         rc = nxt_unit_response_init(req, 200, 1, size);
1845         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1846             goto skip_response_send;
1847         }
1848 
1849         rc = nxt_unit_response_add_field(req, "Content-Type",
1850                                    nxt_length("Content-Type"),
1851                                    "text/plain", nxt_length("text/plain"));
1852         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1853             goto skip_response_send;
1854         }
1855     }
1856 
1857     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1858 
1859         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1860 
1861         nxt_unit_buf_send_done(req->response_buf);
1862 
1863         return;
1864     }
1865 
1866 skip_response_send:
1867 
1868     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
1869 
1870     msg.stream = req_impl->recv_msg.port_msg.stream;
1871     msg.pid = lib->pid;
1872     msg.reply_port = 0;
1873     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
1874                                    : _NXT_PORT_MSG_RPC_ERROR;
1875     msg.last = 1;
1876     msg.mmap = 0;
1877     msg.nf = 0;
1878     msg.mf = 0;
1879     msg.tracking = 0;
1880 
1881     res = lib->callbacks.port_send(req->ctx, &req->response_port,
1882                                    &msg, sizeof(msg), NULL, 0);
1883     if (nxt_slow_path(res != sizeof(msg))) {
1884         nxt_unit_req_alert(req, "last message send failed: %s (%d)",
1885                            strerror(errno), errno);
1886     }
1887 
1888     nxt_unit_request_info_release(req);
1889 }
1890 
1891 
1892 static nxt_port_mmap_header_t *
1893 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
1894     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
1895 {
1896     int                     res, nchunks, i;
1897     nxt_unit_mmap_t         *mm, *mm_end;
1898     nxt_port_mmap_header_t  *hdr;
1899 
1900     pthread_mutex_lock(&process->outgoing.mutex);
1901 
1902     mm_end = process->outgoing.elts + process->outgoing.size;
1903 
1904     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
1905         hdr = mm->hdr;
1906 
1907         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
1908             continue;
1909         }
1910 
1911         *c = 0;
1912 
1913         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
1914             nchunks = 1;
1915 
1916             while (nchunks < n) {
1917                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
1918                                                        *c + nchunks);
1919 
1920                 if (res == 0) {
1921                     for (i = 0; i < nchunks; i++) {
1922                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
1923                     }
1924 
1925                     *c += nchunks + 1;
1926                     nchunks = 0;
1927                     break;
1928                 }
1929 
1930                 nchunks++;
1931             }
1932 
1933             if (nchunks == n) {
1934                 goto unlock;
1935             }
1936         }
1937     }
1938 
1939     *c = 0;
1940     hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
1941 
1942 unlock:
1943 
1944     pthread_mutex_unlock(&process->outgoing.mutex);
1945 
1946     return hdr;
1947 }
1948 
1949 
1950 static nxt_unit_mmap_t *
1951 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
1952 {
1953     uint32_t  cap;
1954 
1955     cap = mmaps->cap;
1956 
1957     if (cap == 0) {
1958         cap = i + 1;
1959     }
1960 
1961     while (i + 1 > cap) {
1962 
1963         if (cap < 16) {
1964           cap = cap * 2;
1965 
1966         } else {
1967           cap = cap + cap / 2;
1968         }
1969     }
1970 
1971     if (cap != mmaps->cap) {
1972 
1973         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
1974         if (nxt_slow_path(mmaps->elts == NULL)) {
1975             return NULL;
1976         }
1977 
1978         memset(mmaps->elts + mmaps->cap, 0,
1979                sizeof(*mmaps->elts) * (cap - mmaps->cap));
1980 
1981         mmaps->cap = cap;
1982     }
1983 
1984     if (i + 1 > mmaps->size) {
1985         mmaps->size = i + 1;
1986     }
1987 
1988     return mmaps->elts + i;
1989 }
1990 
1991 
1992 static nxt_port_mmap_header_t *
1993 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
1994     nxt_unit_port_id_t *port_id, int n)
1995 {
1996     int                     i, fd, rc;
1997     void                    *mem;
1998     char                    name[64];
1999     nxt_unit_mmap_t         *mm;
2000     nxt_unit_impl_t         *lib;
2001     nxt_port_mmap_header_t  *hdr;
2002 
2003     lib = process->lib;
2004 
2005     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
2006     if (nxt_slow_path(mm == NULL)) {
2007         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
2008 
2009         return NULL;
2010     }
2011 
2012     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
2013              lib->pid, (void *) pthread_self());
2014 
2015 #if (NXT_HAVE_MEMFD_CREATE)
2016 
2017     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
2018     if (nxt_slow_path(fd == -1)) {
2019         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
2020                        strerror(errno), errno);
2021 
2022         goto remove_fail;
2023     }
2024 
2025     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
2026 
2027 #elif (NXT_HAVE_SHM_OPEN_ANON)
2028 
2029     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
2030     if (nxt_slow_path(fd == -1)) {
2031         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
2032                        strerror(errno), errno);
2033 
2034         goto remove_fail;
2035     }
2036 
2037 #elif (NXT_HAVE_SHM_OPEN)
2038 
2039     /* Just in case. */
2040     shm_unlink(name);
2041 
2042     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
2043     if (nxt_slow_path(fd == -1)) {
2044         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
2045                        strerror(errno), errno);
2046 
2047         goto remove_fail;
2048     }
2049 
2050     if (nxt_slow_path(shm_unlink(name) == -1)) {
2051         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
2052                       strerror(errno), errno);
2053     }
2054 
2055 #else
2056 
2057 #error No working shared memory implementation.
2058 
2059 #endif
2060 
2061     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
2062         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
2063                        strerror(errno), errno);
2064 
2065         goto remove_fail;
2066     }
2067 
2068     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2069     if (nxt_slow_path(mem == MAP_FAILED)) {
2070         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
2071                        strerror(errno), errno);
2072 
2073         goto remove_fail;
2074     }
2075 
2076     mm->hdr = mem;
2077     hdr = mem;
2078 
2079     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
2080     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
2081 
2082     hdr->id = process->outgoing.size - 1;
2083     hdr->src_pid = lib->pid;
2084     hdr->dst_pid = process->pid;
2085     hdr->sent_over = port_id->id;
2086 
2087     /* Mark first n chunk(s) as busy */
2088     for (i = 0; i < n; i++) {
2089         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
2090     }
2091 
2092     /* Mark as busy chunk followed the last available chunk. */
2093     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
2094     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
2095 
2096     pthread_mutex_unlock(&process->outgoing.mutex);
2097 
2098     rc = nxt_unit_send_mmap(ctx, port_id, fd);
2099     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2100         munmap(mem, PORT_MMAP_SIZE);
2101         hdr = NULL;
2102 
2103     } else {
2104         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
2105                        hdr->id, (int) lib->pid, (int) process->pid);
2106     }
2107 
2108     close(fd);
2109 
2110     pthread_mutex_lock(&process->outgoing.mutex);
2111 
2112     if (nxt_fast_path(hdr != NULL)) {
2113         return hdr;
2114     }
2115 
2116 remove_fail:
2117 
2118     process->outgoing.size--;
2119 
2120     return NULL;
2121 }
2122 
2123 
2124 static int
2125 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
2126 {
2127     ssize_t          res;
2128     nxt_port_msg_t   msg;
2129     nxt_unit_impl_t  *lib;
2130     union {
2131         struct cmsghdr  cm;
2132         char            space[CMSG_SPACE(sizeof(int))];
2133     } cmsg;
2134 
2135     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2136 
2137     msg.stream = 0;
2138     msg.pid = lib->pid;
2139     msg.reply_port = 0;
2140     msg.type = _NXT_PORT_MSG_MMAP;
2141     msg.last = 0;
2142     msg.mmap = 0;
2143     msg.nf = 0;
2144     msg.mf = 0;
2145     msg.tracking = 0;
2146 
2147 #if (NXT_VALGRIND)
2148     memset(&cmsg, 0, sizeof(cmsg));
2149 #endif
2150 
2151     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
2152     cmsg.cm.cmsg_level = SOL_SOCKET;
2153     cmsg.cm.cmsg_type = SCM_RIGHTS;
2154 
2155     /*
2156      * memcpy() is used instead of simple
2157      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
2158      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
2159      *   dereferencing type-punned pointer will break strict-aliasing rules
2160      *
2161      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
2162      * in the same simple assignment as in the code above.
2163      */
2164     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
2165 
2166     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
2167                                    &cmsg, sizeof(cmsg));
2168     if (nxt_slow_path(res != sizeof(msg))) {
2169         nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
2170                       (int) port_id->pid, strerror(errno), errno);
2171 
2172         return NXT_UNIT_ERROR;
2173     }
2174 
2175     return NXT_UNIT_OK;
2176 }
2177 
2178 
2179 static int
2180 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2181     nxt_unit_port_id_t *port_id, uint32_t size,
2182     nxt_unit_mmap_buf_t *mmap_buf)
2183 {
2184     uint32_t                nchunks;
2185     nxt_chunk_id_t          c;
2186     nxt_port_mmap_header_t  *hdr;
2187 
2188     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
2189 
2190     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
2191     if (nxt_slow_path(hdr == NULL)) {
2192         return NXT_UNIT_ERROR;
2193     }
2194 
2195     mmap_buf->hdr = hdr;
2196     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
2197     mmap_buf->buf.free = mmap_buf->buf.start;
2198     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
2199     mmap_buf->port_id = *port_id;
2200 
2201     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
2202                   (int) hdr->id, (int) c,
2203                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
2204 
2205     return NXT_UNIT_OK;
2206 }
2207 
2208 
2209 static int
2210 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
2211 {
2212     int                      rc;
2213     void                     *mem;
2214     struct stat              mmap_stat;
2215     nxt_unit_mmap_t          *mm;
2216     nxt_unit_impl_t          *lib;
2217     nxt_unit_process_t       *process;
2218     nxt_port_mmap_header_t   *hdr;
2219 
2220     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2221 
2222     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
2223 
2224     pthread_mutex_lock(&lib->mutex);
2225 
2226     process = nxt_unit_process_find(ctx, pid, 0);
2227 
2228     pthread_mutex_unlock(&lib->mutex);
2229 
2230     if (nxt_slow_path(process == NULL)) {
2231         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
2232                       (int) pid, fd);
2233 
2234         return NXT_UNIT_ERROR;
2235     }
2236 
2237     rc = NXT_UNIT_ERROR;
2238 
2239     if (fstat(fd, &mmap_stat) == -1) {
2240         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
2241                       strerror(errno), errno);
2242 
2243         goto fail;
2244     }
2245 
2246     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
2247                MAP_SHARED, fd, 0);
2248     if (nxt_slow_path(mem == MAP_FAILED)) {
2249         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
2250                       strerror(errno), errno);
2251 
2252         goto fail;
2253     }
2254 
2255     hdr = mem;
2256 
2257     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
2258 
2259         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
2260                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
2261                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
2262 
2263         munmap(mem, PORT_MMAP_SIZE);
2264 
2265         goto fail;
2266     }
2267 
2268     pthread_mutex_lock(&process->incoming.mutex);
2269 
2270     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
2271     if (nxt_slow_path(mm == NULL)) {
2272         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
2273 
2274         munmap(mem, PORT_MMAP_SIZE);
2275 
2276     } else {
2277         mm->hdr = hdr;
2278 
2279         hdr->sent_over = 0xFFFFu;
2280 
2281         rc = NXT_UNIT_OK;
2282     }
2283 
2284     pthread_mutex_unlock(&process->incoming.mutex);
2285 
2286 fail:
2287 
2288     nxt_unit_process_use(ctx, process, -1);
2289 
2290     return rc;
2291 }
2292 
2293 
2294 static void
2295 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
2296 {
2297     pthread_mutex_init(&mmaps->mutex, NULL);
2298 
2299     mmaps->size = 0;
2300     mmaps->cap = 0;
2301     mmaps->elts = NULL;
2302 }
2303 
2304 
2305 static void
2306 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
2307 {
2308     long c;
2309 
2310     c = nxt_atomic_fetch_add(&process->use_count, i);
2311 
2312     if (i < 0 && c == -i) {
2313         nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
2314 
2315         nxt_unit_mmaps_destroy(&process->incoming);
2316         nxt_unit_mmaps_destroy(&process->outgoing);
2317 
2318         free(process);
2319     }
2320 }
2321 
2322 
2323 static void
2324 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
2325 {
2326     nxt_unit_mmap_t  *mm, *end;
2327 
2328     if (mmaps->elts != NULL) {
2329         end = mmaps->elts + mmaps->size;
2330 
2331         for (mm = mmaps->elts; mm < end; mm++) {
2332             munmap(mm->hdr, PORT_MMAP_SIZE);
2333         }
2334 
2335         free(mmaps->elts);
2336     }
2337 
2338     pthread_mutex_destroy(&mmaps->mutex);
2339 }
2340 
2341 
2342 static nxt_port_mmap_header_t *
2343 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2344     uint32_t id)
2345 {
2346     nxt_port_mmap_header_t  *hdr;
2347 
2348     if (nxt_fast_path(process->incoming.size > id)) {
2349         hdr = process->incoming.elts[id].hdr;
2350 
2351     } else {
2352         hdr = NULL;
2353     }
2354 
2355     return hdr;
2356 }
2357 
2358 
2359 static int
2360 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2361 {
2362     int                           rc;
2363     nxt_chunk_id_t                c;
2364     nxt_unit_process_t            *process;
2365     nxt_port_mmap_header_t        *hdr;
2366     nxt_port_mmap_tracking_msg_t  *tracking_msg;
2367 
2368     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2369         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2370                       recv_msg->port_msg.stream, (int) recv_msg->size);
2371 
2372         return 0;
2373     }
2374 
2375     tracking_msg = recv_msg->start;
2376 
2377     recv_msg->start = tracking_msg + 1;
2378     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
2379 
2380     process = nxt_unit_msg_get_process(ctx, recv_msg);
2381     if (nxt_slow_path(process == NULL)) {
2382         return 0;
2383     }
2384 
2385     pthread_mutex_lock(&process->incoming.mutex);
2386 
2387     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2388     if (nxt_slow_path(hdr == NULL)) {
2389         pthread_mutex_unlock(&process->incoming.mutex);
2390 
2391         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2392                       "invalid mmap id %d,%"PRIu32,
2393                       recv_msg->port_msg.stream,
2394                       (int) process->pid, tracking_msg->mmap_id);
2395 
2396         return 0;
2397     }
2398 
2399     c = tracking_msg->tracking_id;
2400     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);
2401 
2402     if (rc == 0) {
2403         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2404                        recv_msg->port_msg.stream);
2405 
2406         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2407     }
2408 
2409     pthread_mutex_unlock(&process->incoming.mutex);
2410 
2411     return rc;
2412 }
2413 
2414 
2415 static int
2416 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
2417     nxt_queue_t *incoming_buf)
2418 {
2419     void                    *start;
2420     uint32_t                size;
2421     nxt_unit_process_t      *process;
2422     nxt_unit_mmap_buf_t     *b;
2423     nxt_port_mmap_msg_t     *mmap_msg, *end;
2424     nxt_port_mmap_header_t  *hdr;
2425 
2426     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2427         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2428                       recv_msg->port_msg.stream, (int) recv_msg->size);
2429 
2430         return NXT_UNIT_ERROR;
2431     }
2432 
2433     process = nxt_unit_msg_get_process(ctx, recv_msg);
2434     if (nxt_slow_path(process == NULL)) {
2435         return NXT_UNIT_ERROR;
2436     }
2437 
2438     mmap_msg = recv_msg->start;
2439     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
2440 
2441     pthread_mutex_lock(&process->incoming.mutex);
2442 
2443     for (; mmap_msg < end; mmap_msg++) {
2444         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
2445         if (nxt_slow_path(hdr == NULL)) {
2446             pthread_mutex_unlock(&process->incoming.mutex);
2447 
2448             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2449                           "invalid mmap id %d,%"PRIu32,
2450                           recv_msg->port_msg.stream,
2451                           (int) process->pid, mmap_msg->mmap_id);
2452 
2453             return NXT_UNIT_ERROR;
2454         }
2455 
2456         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
2457         size = mmap_msg->size;
2458 
2459         if (recv_msg->start == mmap_msg) {
2460             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2461                           "move start %p -> %p",
2462                           recv_msg->port_msg.stream,
2463                           recv_msg->start, start);
2464 
2465             recv_msg->start = start;
2466             recv_msg->size = size;
2467         }
2468 
2469         b = nxt_unit_mmap_buf_get(ctx);
2470         if (nxt_slow_path(b == NULL)) {
2471             pthread_mutex_unlock(&process->incoming.mutex);
2472 
2473             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2474                           "failed to allocate buf",
2475                           recv_msg->port_msg.stream);
2476 
2477             nxt_unit_mmap_release(hdr, start, size);
2478 
2479             return NXT_UNIT_ERROR;
2480         }
2481 
2482         nxt_queue_insert_tail(incoming_buf, &b->link);
2483 
2484         b->buf.start = start;
2485         b->buf.free = start;
2486         b->buf.end = b->buf.start + size;
2487         b->hdr = hdr;
2488 
2489         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)\n"
2490                        "%.*s",
2491                        recv_msg->port_msg.stream,
2492                        start, (int) size,
2493                        (int) hdr->src_pid, (int) hdr->dst_pid,
2494                        (int) hdr->id, (int) mmap_msg->chunk_id,
2495                        (int) mmap_msg->size,
2496                        (int) size, (char *) start);
2497     }
2498 
2499     pthread_mutex_unlock(&process->incoming.mutex);
2500 
2501     return NXT_UNIT_OK;
2502 }
2503 
2504 
2505 static int
2506 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
2507 {
2508     u_char          *p, *end;
2509     nxt_chunk_id_t  c;
2510 
2511     memset(start, 0xA5, size);
2512 
2513     p = start;
2514     end = p + size;
2515     c = nxt_port_mmap_chunk_id(hdr, p);
2516 
2517     while (p < end) {
2518         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
2519 
2520         p += PORT_MMAP_CHUNK_SIZE;
2521         c++;
2522     }
2523 
2524     return NXT_UNIT_OK;
2525 }
2526 
2527 
2528 static nxt_int_t
2529 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
2530 {
2531     nxt_process_t  *process;
2532 
2533     process = data;
2534 
2535     if (lhq->key.length == sizeof(pid_t)
2536         && *(pid_t *) lhq->key.start == process->pid)
2537     {
2538         return NXT_OK;
2539     }
2540 
2541     return NXT_DECLINED;
2542 }
2543 
2544 
/*
 * Level hash prototype installed into queries by nxt_unit_process_lhq_pid(),
 * with nxt_unit_lvlhsh_pid_test() as the callback that matches a query key
 * against a stored process.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
2551 
2552 
2553 static inline void
2554 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
2555 {
2556     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
2557     lhq->key.length = sizeof(*pid);
2558     lhq->key.start = (u_char *) pid;
2559     lhq->proto = &lvlhsh_processes_proto;
2560 }
2561 
2562 
2563 static nxt_unit_process_t *
2564 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
2565 {
2566     nxt_unit_impl_t     *lib;
2567     nxt_unit_process_t  *process;
2568     nxt_lvlhsh_query_t  lhq;
2569 
2570     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2571 
2572     nxt_unit_process_lhq_pid(&lhq, &pid);
2573 
2574     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
2575         process = lhq.value;
2576         nxt_unit_process_use(ctx, process, 1);
2577 
2578         return process;
2579     }
2580 
2581     process = malloc(sizeof(nxt_unit_process_t));
2582     if (nxt_slow_path(process == NULL)) {
2583         nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
2584 
2585         return NULL;
2586     }
2587 
2588     process->pid = pid;
2589     process->use_count = 1;
2590     process->next_port_id = 0;
2591     process->lib = lib;
2592 
2593     nxt_queue_init(&process->ports);
2594 
2595     nxt_unit_mmaps_init(&process->incoming);
2596     nxt_unit_mmaps_init(&process->outgoing);
2597 
2598     lhq.replace = 0;
2599     lhq.value = process;
2600 
2601     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
2602 
2603     case NXT_OK:
2604         break;
2605 
2606     default:
2607         nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
2608 
2609         pthread_mutex_destroy(&process->outgoing.mutex);
2610         pthread_mutex_destroy(&process->incoming.mutex);
2611         free(process);
2612         process = NULL;
2613         break;
2614     }
2615 
2616     nxt_unit_process_use(ctx, process, 1);
2617 
2618     return process;
2619 }
2620 
2621 
2622 static nxt_unit_process_t *
2623 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
2624 {
2625     int                 rc;
2626     nxt_unit_impl_t     *lib;
2627     nxt_unit_process_t  *process;
2628     nxt_lvlhsh_query_t  lhq;
2629 
2630     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2631 
2632     nxt_unit_process_lhq_pid(&lhq, &pid);
2633 
2634     if (remove) {
2635         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
2636 
2637     } else {
2638         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
2639     }
2640 
2641     if (rc == NXT_OK) {
2642         process = lhq.value;
2643 
2644         if (!remove) {
2645             nxt_unit_process_use(ctx, process, 1);
2646         }
2647 
2648         return process;
2649     }
2650 
2651     return NULL;
2652 }
2653 
2654 
2655 static nxt_unit_process_t *
2656 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
2657 {
2658     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
2659 }
2660 
2661 
2662 int
2663 nxt_unit_run(nxt_unit_ctx_t *ctx)
2664 {
2665     int              rc;
2666     nxt_unit_impl_t  *lib;
2667 
2668     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2669     rc = NXT_UNIT_OK;
2670 
2671     while (nxt_fast_path(lib->online)) {
2672         rc = nxt_unit_run_once(ctx);
2673     }
2674 
2675     return rc;
2676 }
2677 
2678 
2679 static int
2680 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
2681 {
2682     int                  rc;
2683     char                 buf[4096];
2684     char                 oob[256];
2685     ssize_t              rsize;
2686     nxt_unit_impl_t      *lib;
2687     nxt_unit_ctx_impl_t  *ctx_impl;
2688 
2689     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2690     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2691 
2692     memset(oob, 0, sizeof(struct cmsghdr));
2693 
2694     if (ctx_impl->read_port_fd != -1) {
2695         rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
2696                                          buf, sizeof(buf),
2697                                          oob, sizeof(oob));
2698     } else {
2699         rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
2700                                          buf, sizeof(buf),
2701                                          oob, sizeof(oob));
2702     }
2703 
2704     if (nxt_fast_path(rsize > 0)) {
2705         rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
2706                                   oob, sizeof(oob));
2707     } else {
2708         rc = NXT_UNIT_ERROR;
2709     }
2710 
2711     return rc;
2712 }
2713 
2714 
2715 void
2716 nxt_unit_done(nxt_unit_ctx_t *ctx)
2717 {
2718     nxt_unit_impl_t      *lib;
2719     nxt_unit_process_t   *process;
2720     nxt_unit_ctx_impl_t  *ctx_impl;
2721 
2722     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2723 
2724     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
2725 
2726         nxt_unit_ctx_free(&ctx_impl->ctx);
2727 
2728     } nxt_queue_loop;
2729 
2730     for ( ;; ) {
2731         process = nxt_unit_process_pop_first(lib);
2732         if (process == NULL) {
2733             break;
2734         }
2735 
2736         nxt_unit_remove_process(ctx, process);
2737     }
2738 
2739     free(lib);
2740 }
2741 
2742 
2743 nxt_unit_ctx_t *
2744 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
2745 {
2746     int                  rc, fd;
2747     nxt_unit_impl_t      *lib;
2748     nxt_unit_port_id_t   new_port_id;
2749     nxt_unit_ctx_impl_t  *new_ctx;
2750 
2751     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2752 
2753     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
2754     if (nxt_slow_path(new_ctx == NULL)) {
2755         nxt_unit_warn(ctx, "failed to allocate context");
2756 
2757         return NULL;
2758     }
2759 
2760     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
2761     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2762         free(new_ctx);
2763 
2764         return NULL;
2765     }
2766 
2767     rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
2768     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2769         lib->callbacks.remove_port(ctx, &new_port_id);
2770 
2771         close(fd);
2772 
2773         free(new_ctx);
2774 
2775         return NULL;
2776     }
2777 
2778     close(fd);
2779 
2780     nxt_unit_ctx_init(lib, new_ctx, data);
2781 
2782     new_ctx->read_port_id = new_port_id;
2783 
2784     return &new_ctx->ctx;
2785 }
2786 
2787 
2788 void
2789 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
2790 {
2791     nxt_unit_impl_t               *lib;
2792     nxt_unit_ctx_impl_t           *ctx_impl;
2793     nxt_unit_mmap_buf_t           *mmap_buf;
2794     nxt_unit_request_info_impl_t  *req_impl;
2795 
2796     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2797     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2798 
2799     nxt_queue_each(req_impl, &ctx_impl->active_req,
2800                    nxt_unit_request_info_impl_t, link)
2801     {
2802         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
2803 
2804         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
2805 
2806     } nxt_queue_loop;
2807 
2808     nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
2809     nxt_queue_remove(&ctx_impl->ctx_buf[1].link);
2810 
2811     nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {
2812 
2813         nxt_queue_remove(&mmap_buf->link);
2814         free(mmap_buf);
2815 
2816     } nxt_queue_loop;
2817 
2818     nxt_queue_each(req_impl, &ctx_impl->free_req,
2819                    nxt_unit_request_info_impl_t, link)
2820     {
2821         nxt_unit_request_info_free(req_impl);
2822 
2823     } nxt_queue_loop;
2824 
2825     nxt_queue_remove(&ctx_impl->link);
2826 
2827     if (ctx_impl != &lib->main_ctx) {
2828         free(ctx_impl);
2829     }
2830 }
2831 
2832 
/*
 * Socket type used for port socket pairs: SOCK_SEQPACKET where the platform
 * provides it for AF_UNIX, SOCK_DGRAM otherwise.  The leading "0 ||" does not
 * affect the condition; presumably it is a switch to be edited by hand
 * (e.g. into "0 &&") when SOCK_DGRAM has to be tested on all platforms.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif
2839 
2840 
2841 void
2842 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
2843 {
2844     nxt_unit_port_hash_id_t  port_hash_id;
2845 
2846     port_hash_id.pid = pid;
2847     port_hash_id.id = id;
2848 
2849     port_id->pid = pid;
2850     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
2851     port_id->id = id;
2852 }
2853 
2854 
2855 int
2856 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
2857     nxt_unit_port_id_t *port_id)
2858 {
2859     int                 rc, fd;
2860     nxt_unit_impl_t     *lib;
2861     nxt_unit_port_id_t  new_port_id;
2862 
2863     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2864 
2865     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
2866     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2867         return rc;
2868     }
2869 
2870     rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
2871 
2872     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2873         *port_id = new_port_id;
2874 
2875     } else {
2876         lib->callbacks.remove_port(ctx, &new_port_id);
2877     }
2878 
2879     close(fd);
2880 
2881     return rc;
2882 }
2883 
2884 
2885 static int
2886 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
2887 {
2888     int                 rc, port_sockets[2];
2889     nxt_unit_impl_t     *lib;
2890     nxt_unit_port_t     new_port;
2891     nxt_unit_process_t  *process;
2892 
2893     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2894 
2895     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
2896     if (nxt_slow_path(rc != 0)) {
2897         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
2898                       strerror(errno), errno);
2899 
2900         return NXT_UNIT_ERROR;
2901     }
2902 
2903     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
2904                    port_sockets[0], port_sockets[1]);
2905 
2906     pthread_mutex_lock(&lib->mutex);
2907 
2908     process = nxt_unit_process_get(ctx, lib->pid);
2909     if (nxt_slow_path(process == NULL)) {
2910         pthread_mutex_unlock(&lib->mutex);
2911 
2912         close(port_sockets[0]);
2913         close(port_sockets[1]);
2914 
2915         return NXT_UNIT_ERROR;
2916     }
2917 
2918     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
2919 
2920     new_port.in_fd = port_sockets[0];
2921     new_port.out_fd = -1;
2922     new_port.data = NULL;
2923 
2924     pthread_mutex_unlock(&lib->mutex);
2925 
2926     nxt_unit_process_use(ctx, process, -1);
2927 
2928     rc = lib->callbacks.add_port(ctx, &new_port);
2929     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2930         nxt_unit_warn(ctx, "create_port: add_port() failed");
2931 
2932         close(port_sockets[0]);
2933         close(port_sockets[1]);
2934 
2935         return rc;
2936     }
2937 
2938     *port_id = new_port.id;
2939     *fd = port_sockets[1];
2940 
2941     return rc;
2942 }
2943 
2944 
2945 static int
2946 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
2947     nxt_unit_port_id_t *new_port, int fd)
2948 {
2949     ssize_t          res;
2950     nxt_unit_impl_t  *lib;
2951 
2952     struct {
2953         nxt_port_msg_t            msg;
2954         nxt_port_msg_new_port_t   new_port;
2955     } m;
2956 
2957     union {
2958         struct cmsghdr  cm;
2959         char            space[CMSG_SPACE(sizeof(int))];
2960     } cmsg;
2961 
2962     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2963 
2964     m.msg.stream = 0;
2965     m.msg.pid = lib->pid;
2966     m.msg.reply_port = 0;
2967     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
2968     m.msg.last = 0;
2969     m.msg.mmap = 0;
2970     m.msg.nf = 0;
2971     m.msg.mf = 0;
2972     m.msg.tracking = 0;
2973 
2974     m.new_port.id = new_port->id;
2975     m.new_port.pid = new_port->pid;
2976     m.new_port.type = NXT_PROCESS_WORKER;
2977     m.new_port.max_size = 16 * 1024;
2978     m.new_port.max_share = 64 * 1024;
2979 
2980 #if (NXT_VALGRIND)
2981     memset(&cmsg, 0, sizeof(cmsg));
2982 #endif
2983 
2984     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
2985     cmsg.cm.cmsg_level = SOL_SOCKET;
2986     cmsg.cm.cmsg_type = SCM_RIGHTS;
2987 
2988     /*
2989      * memcpy() is used instead of simple
2990      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
2991      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
2992      *   dereferencing type-punned pointer will break strict-aliasing rules
2993      *
2994      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
2995      * in the same simple assignment as in the code above.
2996      */
2997     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
2998 
2999     res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
3000                                    &cmsg, sizeof(cmsg));
3001 
3002     return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
3003 }
3004 
3005 
3006 int
3007 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3008 {
3009     int                   rc;
3010     nxt_unit_impl_t       *lib;
3011     nxt_unit_process_t    *process;
3012     nxt_unit_port_impl_t  *new_port;
3013 
3014     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3015 
3016     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
3017                    port->id.pid, port->id.id,
3018                    port->in_fd, port->out_fd);
3019 
3020     pthread_mutex_lock(&lib->mutex);
3021 
3022     process = nxt_unit_process_get(ctx, port->id.pid);
3023     if (nxt_slow_path(process == NULL)) {
3024         rc = NXT_UNIT_ERROR;
3025         goto unlock;
3026     }
3027 
3028     if (port->id.id >= process->next_port_id) {
3029         process->next_port_id = port->id.id + 1;
3030     }
3031 
3032     new_port = malloc(sizeof(nxt_unit_port_impl_t));
3033     if (nxt_slow_path(new_port == NULL)) {
3034         rc = NXT_UNIT_ERROR;
3035         goto unlock;
3036     }
3037 
3038     new_port->port = *port;
3039 
3040     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
3041     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3042         goto unlock;
3043     }
3044 
3045     nxt_queue_insert_tail(&process->ports, &new_port->link);
3046 
3047     rc = NXT_UNIT_OK;
3048 
3049     new_port->process = process;
3050 
3051 unlock:
3052 
3053     pthread_mutex_unlock(&lib->mutex);
3054 
3055     if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
3056         nxt_unit_process_use(ctx, process, -1);
3057     }
3058 
3059     return rc;
3060 }
3061 
3062 
3063 void
3064 nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
3065 {
3066     nxt_unit_find_remove_port(ctx, port_id, NULL);
3067 }
3068 
3069 
3070 void
3071 nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3072     nxt_unit_port_t *r_port)
3073 {
3074     nxt_unit_impl_t     *lib;
3075     nxt_unit_process_t  *process;
3076 
3077     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3078 
3079     pthread_mutex_lock(&lib->mutex);
3080 
3081     process = NULL;
3082 
3083     nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
3084 
3085     pthread_mutex_unlock(&lib->mutex);
3086 
3087     if (nxt_slow_path(process != NULL)) {
3088         nxt_unit_process_use(ctx, process, -1);
3089     }
3090 }
3091 
3092 
3093 static void
3094 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3095     nxt_unit_port_t *r_port, nxt_unit_process_t **process)
3096 {
3097     nxt_unit_impl_t       *lib;
3098     nxt_unit_port_impl_t  *port;
3099 
3100     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3101 
3102     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
3103     if (nxt_slow_path(port == NULL)) {
3104         nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
3105                        (int) port_id->pid, (int) port_id->id);
3106 
3107         return;
3108     }
3109 
3110     nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
3111                    (int) port_id->pid, (int) port_id->id,
3112                    port->port.in_fd, port->port.out_fd, port->port.data);
3113 
3114     if (port->port.in_fd != -1) {
3115         close(port->port.in_fd);
3116     }
3117 
3118     if (port->port.out_fd != -1) {
3119         close(port->port.out_fd);
3120     }
3121 
3122     if (port->process != NULL) {
3123         nxt_queue_remove(&port->link);
3124     }
3125 
3126     if (process != NULL) {
3127         *process = port->process;
3128     }
3129 
3130     if (r_port != NULL) {
3131         *r_port = port->port;
3132     }
3133 
3134     free(port);
3135 }
3136 
3137 
3138 void
3139 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
3140 {
3141     nxt_unit_impl_t     *lib;
3142     nxt_unit_process_t  *process;
3143 
3144     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3145 
3146     pthread_mutex_lock(&lib->mutex);
3147 
3148     process = nxt_unit_process_find(ctx, pid, 1);
3149     if (nxt_slow_path(process == NULL)) {
3150         nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
3151 
3152         pthread_mutex_unlock(&lib->mutex);
3153 
3154         return;
3155     }
3156 
3157     nxt_unit_remove_process(ctx, process);
3158 }
3159 
3160 
3161 static void
3162 nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
3163 {
3164     nxt_queue_t           ports;
3165     nxt_unit_impl_t       *lib;
3166     nxt_unit_port_impl_t  *port;
3167 
3168     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3169 
3170     nxt_queue_init(&ports);
3171 
3172     nxt_queue_add(&ports, &process->ports);
3173 
3174     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
3175 
3176         nxt_unit_process_use(ctx, process, -1);
3177         port->process = NULL;
3178 
3179         /* Shortcut for default callback. */
3180         if (lib->callbacks.remove_port == nxt_unit_remove_port) {
3181             nxt_queue_remove(&port->link);
3182 
3183             nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
3184         }
3185 
3186     } nxt_queue_loop;
3187 
3188     pthread_mutex_unlock(&lib->mutex);
3189 
3190     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
3191 
3192         nxt_queue_remove(&port->link);
3193 
3194         lib->callbacks.remove_port(ctx, &port->port.id);
3195 
3196     } nxt_queue_loop;
3197 
3198     nxt_unit_process_use(ctx, process, -1);
3199 }
3200 
3201 
3202 void
3203 nxt_unit_quit(nxt_unit_ctx_t *ctx)
3204 {
3205     nxt_unit_impl_t  *lib;
3206 
3207     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3208