xref: /unit/src/nxt_unit.c (revision 1235:4d5998f60f20)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 
11 #include "nxt_unit.h"
12 #include "nxt_unit_request.h"
13 #include "nxt_unit_response.h"
14 #include "nxt_unit_websocket.h"
15 
16 #include "nxt_websocket.h"
17 
18 #if (NXT_HAVE_MEMFD_CREATE)
19 #include <linux/memfd.h>
20 #endif
21 
/*
 * Forward typedefs for the implementation structures defined later in this
 * file, so that prototypes and structures can refer to each other.
 */
typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s               nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
32 
/*
 * Prototypes of the file-local helpers.  They are declared up front so the
 * definitions below may appear in any order.
 */
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
    nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
    nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
    nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
    size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
    nxt_chunk_id_t *c, int n);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
    nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);

static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
    nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
    uint32_t size);

static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
    pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
    pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, int *fd);

static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd);

static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
    nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
    nxt_unit_process_t *process);

static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
    const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
    nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
    void *oob, size_t oob_size);

static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
    nxt_unit_port_id_t *port_id, int remove);

static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
    nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
    nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);

static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
129 
130 
/*
 * A buffer together with its list linkage.
 *
 * Buffers form a singly linked list through "next".  "prev" does not point
 * to the previous element; it holds the address of the pointer that refers
 * to this element (the list head or the previous element's "next"), which
 * lets nxt_unit_mmap_buf_remove() unlink an element without knowing the head.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    /* Set to NULL for buffers that wrap plain (non-mmap) message data. */
    nxt_port_mmap_header_t   *hdr;
    nxt_unit_port_id_t       port_id;
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
};
143 
144 
/*
 * Decoded view of one received port message; filled in by
 * nxt_unit_process_msg() and passed to the per-type handlers.
 */
struct nxt_unit_recv_msg_s {
    /* Copied from the nxt_port_msg_t header. */
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload that follows the header and its length in bytes. */
    void                     *start;
    uint32_t                 size;

    /* Descriptor received as SCM_RIGHTS ancillary data, or -1. */
    int                      fd;
    nxt_unit_process_t       *process;

    nxt_unit_mmap_buf_t      *incoming_buf;
};
161 
162 
/*
 * States of a request object.  A freshly received request starts in
 * NXT_UNIT_RS_START (see nxt_unit_process_req_headers()).
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
170 
171 
/*
 * Library-private part of a request.  The public nxt_unit_request_info_t is
 * the first member, followed by bookkeeping and a variable-size tail.
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_process_t       *process;

    /* Heads of nxt_unit_mmap_buf_t lists. */
    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;

    nxt_queue_link_t         link;

    /*
     * Flexible tail; allocations add request_data_size bytes after the
     * structure for it.
     */
    char                     extra_data[];
};
189 
190 
/*
 * Library-private part of a WebSocket frame; the public
 * nxt_unit_websocket_frame_t is the first member.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    /* Head of the buffer list that holds the frame data. */
    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;

    /* Reset to NULL when a frame is received. */
    void                        *retain_buf;
};
202 
203 
/*
 * Library-private part of a context; the public nxt_unit_ctx_t is the
 * first member.  Initialized by nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    pthread_mutex_t               mutex;

    nxt_unit_port_id_t            read_port_id;
    int                           read_port_fd;

    /* Link in nxt_unit_impl_t.contexts. */
    nxt_queue_link_t              link;

    /* List of unused buffers; seeded with ctx_buf[] at initialization. */
    nxt_unit_mmap_buf_t           *free_buf;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /*  of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /*  of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    nxt_unit_mmap_buf_t           ctx_buf[2];

    /*
     * Embedded request object placed on free_req at initialization.  It
     * ends with a flexible array, so it has to stay the last member.
     */
    nxt_unit_request_info_impl_t  req;
};
232 
233 
/*
 * Library instance; the public nxt_unit_t is the first member.
 * Allocated and filled in by nxt_unit_create().
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    nxt_unit_callbacks_t     callbacks;

    /* Extra bytes allocated after each request object. */
    uint32_t                 request_data_size;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_id_t       ready_port_id;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    pid_t                    pid;
    int                      log_fd;
    int                      online;

    /*
     * Embedded main context.  nxt_unit_create() allocates
     * request_data_size bytes past the end of this structure, so this
     * has to stay the last member.
     */
    nxt_unit_ctx_impl_t      main_ctx;
};
255 
256 
/*
 * Library-private part of a port; the public nxt_unit_port_t is the first
 * member.
 */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;
};
263 
264 
/* One entry of a nxt_unit_mmaps_t array: a shared memory segment header. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
};
268 
269 
/*
 * Array of nxt_unit_mmap_t entries with its own mutex.
 * NOTE(review): "size" and "cap" look like the used and allocated element
 * counts of "elts" - confirm against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_unit_mmap_t          *elts;
};
276 
277 
/*
 * Per-process state, identified by pid: its ports and the shared memory
 * segments exchanged with it.
 */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;

    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    nxt_unit_impl_t          *lib;

    /* Reference counter; adjusted through nxt_unit_process_use(). */
    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};
292 
293 
/*
 * Explicitly using 32 bit types to avoid possible alignment padding
 * between or after the members.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
299 
300 
301 nxt_unit_ctx_t *
302 nxt_unit_init(nxt_unit_init_t *init)
303 {
304     int              rc;
305     uint32_t         ready_stream;
306     nxt_unit_ctx_t   *ctx;
307     nxt_unit_impl_t  *lib;
308     nxt_unit_port_t  ready_port, read_port;
309 
310     lib = nxt_unit_create(init);
311     if (nxt_slow_path(lib == NULL)) {
312         return NULL;
313     }
314 
315     if (init->ready_port.id.pid != 0
316         && init->ready_stream != 0
317         && init->read_port.id.pid != 0)
318     {
319         ready_port = init->ready_port;
320         ready_stream = init->ready_stream;
321         read_port = init->read_port;
322         lib->log_fd = init->log_fd;
323 
324         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
325                               ready_port.id.id);
326         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
327                               read_port.id.id);
328     } else {
329         rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
330                                &ready_stream);
331         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
332             goto fail;
333         }
334     }
335 
336     lib->pid = read_port.id.pid;
337     ctx = &lib->main_ctx.ctx;
338 
339     rc = lib->callbacks.add_port(ctx, &ready_port);
340     if (rc != NXT_UNIT_OK) {
341         nxt_unit_alert(NULL, "failed to add ready_port");
342 
343         goto fail;
344     }
345 
346     rc = lib->callbacks.add_port(ctx, &read_port);
347     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
348         nxt_unit_alert(NULL, "failed to add read_port");
349 
350         goto fail;
351     }
352 
353     lib->main_ctx.read_port_id = read_port.id;
354     lib->ready_port_id = ready_port.id;
355 
356     rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
357     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
358         nxt_unit_alert(NULL, "failed to send READY message");
359 
360         goto fail;
361     }
362 
363     return ctx;
364 
365 fail:
366 
367     free(lib);
368 
369     return NULL;
370 }
371 
372 
373 static nxt_unit_impl_t *
374 nxt_unit_create(nxt_unit_init_t *init)
375 {
376     int                   rc;
377     nxt_unit_impl_t       *lib;
378     nxt_unit_callbacks_t  *cb;
379 
380     lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
381     if (nxt_slow_path(lib == NULL)) {
382         nxt_unit_alert(NULL, "failed to allocate unit struct");
383 
384         return NULL;
385     }
386 
387     rc = pthread_mutex_init(&lib->mutex, NULL);
388     if (nxt_slow_path(rc != 0)) {
389         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
390 
391         goto fail;
392     }
393 
394     lib->unit.data = init->data;
395     lib->callbacks = init->callbacks;
396 
397     lib->request_data_size = init->request_data_size;
398 
399     lib->processes.slot = NULL;
400     lib->ports.slot = NULL;
401 
402     lib->log_fd = STDERR_FILENO;
403     lib->online = 1;
404 
405     nxt_queue_init(&lib->contexts);
406 
407     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
408     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
409         goto fail;
410     }
411 
412     cb = &lib->callbacks;
413 
414     if (cb->request_handler == NULL) {
415         nxt_unit_alert(NULL, "request_handler is NULL");
416 
417         goto fail;
418     }
419 
420     if (cb->add_port == NULL) {
421         cb->add_port = nxt_unit_add_port;
422     }
423 
424     if (cb->remove_port == NULL) {
425         cb->remove_port = nxt_unit_remove_port;
426     }
427 
428     if (cb->remove_pid == NULL) {
429         cb->remove_pid = nxt_unit_remove_pid;
430     }
431 
432     if (cb->quit == NULL) {
433         cb->quit = nxt_unit_quit;
434     }
435 
436     if (cb->port_send == NULL) {
437         cb->port_send = nxt_unit_port_send_default;
438     }
439 
440     if (cb->port_recv == NULL) {
441         cb->port_recv = nxt_unit_port_recv_default;
442     }
443 
444     return lib;
445 
446 fail:
447 
448     free(lib);
449 
450     return NULL;
451 }
452 
453 
454 static int
455 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
456     void *data)
457 {
458     int  rc;
459 
460     ctx_impl->ctx.data = data;
461     ctx_impl->ctx.unit = &lib->unit;
462 
463     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
464 
465     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
466     if (nxt_slow_path(rc != 0)) {
467         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
468 
469         return NXT_UNIT_ERROR;
470     }
471 
472     nxt_queue_init(&ctx_impl->free_req);
473     nxt_queue_init(&ctx_impl->free_ws);
474     nxt_queue_init(&ctx_impl->active_req);
475 
476     ctx_impl->free_buf = NULL;
477     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
478     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
479 
480     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
481 
482     ctx_impl->req.req.ctx = &ctx_impl->ctx;
483     ctx_impl->req.req.unit = &lib->unit;
484 
485     ctx_impl->read_port_fd = -1;
486     ctx_impl->requests.slot = 0;
487 
488     return NXT_UNIT_OK;
489 }
490 
491 
492 nxt_inline void
493 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
494     nxt_unit_mmap_buf_t *mmap_buf)
495 {
496     mmap_buf->next = *head;
497 
498     if (mmap_buf->next != NULL) {
499         mmap_buf->next->prev = &mmap_buf->next;
500     }
501 
502     *head = mmap_buf;
503     mmap_buf->prev = head;
504 }
505 
506 
507 nxt_inline void
508 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
509     nxt_unit_mmap_buf_t *mmap_buf)
510 {
511     while (*prev != NULL) {
512         prev = &(*prev)->next;
513     }
514 
515     nxt_unit_mmap_buf_insert(prev, mmap_buf);
516 }
517 
518 
519 nxt_inline void
520 nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
521 {
522     nxt_unit_mmap_buf_t  **prev;
523 
524     prev = mmap_buf->prev;
525 
526     if (mmap_buf->next != NULL) {
527         mmap_buf->next->prev = prev;
528     }
529 
530     if (prev != NULL) {
531         *prev = mmap_buf->next;
532     }
533 }
534 
535 
536 static int
537 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
538     int *log_fd, uint32_t *stream)
539 {
540     int       rc;
541     int       ready_fd, read_fd;
542     char      *unit_init, *version_end;
543     long      version_length;
544     int64_t   ready_pid, read_pid;
545     uint32_t  ready_stream, ready_id, read_id;
546 
547     unit_init = getenv(NXT_UNIT_INIT_ENV);
548     if (nxt_slow_path(unit_init == NULL)) {
549         nxt_unit_alert(NULL, "%s is not in the current environment",
550                        NXT_UNIT_INIT_ENV);
551 
552         return NXT_UNIT_ERROR;
553     }
554 
555     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
556 
557     version_length = nxt_length(NXT_VERSION);
558 
559     version_end = strchr(unit_init, ';');
560     if (version_end == NULL
561         || version_end - unit_init != version_length
562         || memcmp(unit_init, NXT_VERSION, version_length) != 0)
563     {
564         nxt_unit_alert(NULL, "version check error");
565 
566         return NXT_UNIT_ERROR;
567     }
568 
569     rc = sscanf(version_end + 1,
570                 "%"PRIu32";"
571                 "%"PRId64",%"PRIu32",%d;"
572                 "%"PRId64",%"PRIu32",%d;"
573                 "%d",
574                 &ready_stream,
575                 &ready_pid, &ready_id, &ready_fd,
576                 &read_pid, &read_id, &read_fd,
577                 log_fd);
578 
579     if (nxt_slow_path(rc != 8)) {
580         nxt_unit_alert(NULL, "failed to scan variables");
581 
582         return NXT_UNIT_ERROR;
583     }
584 
585     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
586 
587     ready_port->in_fd = -1;
588     ready_port->out_fd = ready_fd;
589     ready_port->data = NULL;
590 
591     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
592 
593     read_port->in_fd = read_fd;
594     read_port->out_fd = -1;
595     read_port->data = NULL;
596 
597     *stream = ready_stream;
598 
599     return NXT_UNIT_OK;
600 }
601 
602 
603 static int
604 nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
605     uint32_t stream)
606 {
607     ssize_t          res;
608     nxt_port_msg_t   msg;
609     nxt_unit_impl_t  *lib;
610 
611     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
612 
613     msg.stream = stream;
614     msg.pid = lib->pid;
615     msg.reply_port = 0;
616     msg.type = _NXT_PORT_MSG_PROCESS_READY;
617     msg.last = 1;
618     msg.mmap = 0;
619     msg.nf = 0;
620     msg.mf = 0;
621     msg.tracking = 0;
622 
623     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
624     if (res != sizeof(msg)) {
625         return NXT_UNIT_ERROR;
626     }
627 
628     return NXT_UNIT_OK;
629 }
630 
631 
/*
 * Decodes one message received on a port and dispatches it by type.
 *
 * buf holds a nxt_port_msg_t header followed by the payload; oob may carry
 * one file descriptor as SCM_RIGHTS ancillary data.
 *
 * Returns the result of the selected handler.  The result starts out as
 * NXT_UNIT_ERROR, which is therefore what malformed, fragmented and
 * ignored messages produce.
 *
 * Before returning, the function closes a received descriptor that is
 * still recorded in recv_msg.fd, frees the remaining incoming buffers and
 * drops the process reference kept in recv_msg.
 */
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   rc;
    pid_t                 pid;
    struct cmsghdr        *cm;
    nxt_port_msg_t        *port_msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_recv_msg_t   recv_msg;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = NXT_UNIT_ERROR;
    recv_msg.fd = -1;
    recv_msg.process = NULL;
    port_msg = buf;
    cm = oob;

    /* Pick up a descriptor only from a well-formed SCM_RIGHTS record. */
    if (oob_size >= CMSG_SPACE(sizeof(int))
        && cm->cmsg_len == CMSG_LEN(sizeof(int))
        && cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
    }

    /* Has to be set before the first "goto fail": the cleanup walks it. */
    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
        goto fail;
    }

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* The payload starts right after the fixed-size header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    /*
     * For a tracked message, a zero result from nxt_unit_tracking_read()
     * ends processing here and the call reports success.
     */
    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
        rc = NXT_UNIT_OK;

        goto fail;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
                      port_msg->stream, (int) port_msg->type);
        goto fail;
    }

    if (port_msg->mmap) {
        if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
            goto fail;
        }
    }

    cb = &lib->callbacks;

    switch (port_msg->type) {

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        cb->quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        /*
         * NOTE(review): rc is not assigned in this case, so the call
         * returns the initial NXT_UNIT_ERROR and the descriptor is closed
         * below - confirm that this is intended.
         */
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd);

            goto fail;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        /* The payload has to be exactly one pid_t. */
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                          (int) sizeof(pid));

            goto fail;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        cb->remove_pid(ctx, pid);

        rc = NXT_UNIT_OK;
        break;

    default:
        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        goto fail;
    }

/* Common exit: reached on success as well as on failure. */
fail:

    /* Handlers that keep the descriptor reset recv_msg.fd to -1. */
    if (recv_msg.fd != -1) {
        close(recv_msg.fd);
    }

    /* Terminates once the list head has become NULL. */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (recv_msg.process != NULL) {
        nxt_unit_process_use(ctx, recv_msg.process, -1);
    }

    return rc;
}
782 
783 
784 static int
785 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
786 {
787     int                      nb;
788     nxt_unit_impl_t          *lib;
789     nxt_unit_port_t          new_port;
790     nxt_port_msg_new_port_t  *new_port_msg;
791 
792     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
793         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
794                       "invalid message size (%d)",
795                       recv_msg->stream, (int) recv_msg->size);
796 
797         return NXT_UNIT_ERROR;
798     }
799 
800     if (nxt_slow_path(recv_msg->fd < 0)) {
801         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
802                        recv_msg->stream, recv_msg->fd);
803 
804         return NXT_UNIT_ERROR;
805     }
806 
807     new_port_msg = recv_msg->start;
808 
809     nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
810                    recv_msg->stream, (int) new_port_msg->pid,
811                    (int) new_port_msg->id, recv_msg->fd);
812 
813     nb = 0;
814 
815     if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
816         nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
817                        "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
818 
819         return NXT_UNIT_ERROR;
820     }
821 
822     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
823                           new_port_msg->id);
824 
825     new_port.in_fd = -1;
826     new_port.out_fd = recv_msg->fd;
827     new_port.data = NULL;
828 
829     recv_msg->fd = -1;
830 
831     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
832 
833     return lib->callbacks.add_port(ctx, &new_port);
834 }
835 
836 
837 static int
838 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
839 {
840     nxt_unit_impl_t               *lib;
841     nxt_unit_request_t            *r;
842     nxt_unit_mmap_buf_t           *b;
843     nxt_unit_request_info_t       *req;
844     nxt_unit_request_info_impl_t  *req_impl;
845 
846     if (nxt_slow_path(recv_msg->mmap == 0)) {
847         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
848                       recv_msg->stream);
849 
850         return NXT_UNIT_ERROR;
851     }
852 
853     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
854         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
855                       "%d expected", recv_msg->stream, (int) recv_msg->size,
856                       (int) sizeof(nxt_unit_request_t));
857 
858         return NXT_UNIT_ERROR;
859     }
860 
861     req_impl = nxt_unit_request_info_get(ctx);
862     if (nxt_slow_path(req_impl == NULL)) {
863         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
864                       recv_msg->stream);
865 
866         return NXT_UNIT_ERROR;
867     }
868 
869     req = &req_impl->req;
870 
871     nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
872                           recv_msg->reply_port);
873 
874     req->request = recv_msg->start;
875 
876     b = recv_msg->incoming_buf;
877 
878     req->request_buf = &b->buf;
879     req->response = NULL;
880     req->response_buf = NULL;
881 
882     r = req->request;
883 
884     req->content_length = r->content_length;
885 
886     req->content_buf = req->request_buf;
887     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
888 
889     /* "Move" process reference to req_impl. */
890     req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
891     if (nxt_slow_path(req_impl->process == NULL)) {
892         return NXT_UNIT_ERROR;
893     }
894 
895     recv_msg->process = NULL;
896 
897     req_impl->stream = recv_msg->stream;
898 
899     req_impl->outgoing_buf = NULL;
900 
901     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
902         b->req = req;
903     }
904 
905     /* "Move" incoming buffer list to req_impl. */
906     req_impl->incoming_buf = recv_msg->incoming_buf;
907     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
908     recv_msg->incoming_buf = NULL;
909 
910     req->response_max_fields = 0;
911     req_impl->state = NXT_UNIT_RS_START;
912     req_impl->websocket = 0;
913 
914     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
915                    (int) r->method_length, nxt_unit_sptr_get(&r->method),
916                    (int) r->target_length, nxt_unit_sptr_get(&r->target),
917                    (int) r->content_length);
918 
919     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
920 
921     lib->callbacks.request_handler(req);
922 
923     return NXT_UNIT_OK;
924 }
925 
926 
927 static int
928 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
929 {
930     size_t                           hsize;
931     nxt_unit_impl_t                  *lib;
932     nxt_unit_mmap_buf_t              *b;
933     nxt_unit_ctx_impl_t              *ctx_impl;
934     nxt_unit_callbacks_t             *cb;
935     nxt_unit_request_info_t          *req;
936     nxt_unit_request_info_impl_t     *req_impl;
937     nxt_unit_websocket_frame_impl_t  *ws_impl;
938 
939     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
940 
941     req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
942                                           recv_msg->last);
943     if (req_impl == NULL) {
944         return NXT_UNIT_OK;
945     }
946 
947     req = &req_impl->req;
948 
949     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
950     cb = &lib->callbacks;
951 
952     if (cb->websocket_handler && recv_msg->size >= 2) {
953         ws_impl = nxt_unit_websocket_frame_get(ctx);
954         if (nxt_slow_path(ws_impl == NULL)) {
955             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
956                           req_impl->stream);
957 
958             return NXT_UNIT_ERROR;
959         }
960 
961         ws_impl->ws.req = req;
962 
963         ws_impl->buf = NULL;
964         ws_impl->retain_buf = NULL;
965 
966         if (recv_msg->mmap) {
967             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
968                 b->req = req;
969             }
970 
971             /* "Move" incoming buffer list to ws_impl. */
972             ws_impl->buf = recv_msg->incoming_buf;
973             ws_impl->buf->prev = &ws_impl->buf;
974             recv_msg->incoming_buf = NULL;
975 
976             b = ws_impl->buf;
977 
978         } else {
979             b = nxt_unit_mmap_buf_get(ctx);
980             if (nxt_slow_path(b == NULL)) {
981                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
982                                req_impl->stream);
983 
984                 nxt_unit_websocket_frame_release(&ws_impl->ws);
985 
986                 return NXT_UNIT_ERROR;
987             }
988 
989             b->hdr = NULL;
990             b->req = req;
991             b->buf.start = recv_msg->start;
992             b->buf.free = b->buf.start;
993             b->buf.end = b->buf.start + recv_msg->size;
994 
995             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
996         }
997 
998         ws_impl->ws.header = (void *) b->buf.start;
999         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1000             ws_impl->ws.header);
1001 
1002         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1003 
1004         if (ws_impl->ws.header->mask) {
1005             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1006 
1007         } else {
1008             ws_impl->ws.mask = NULL;
1009         }
1010 
1011         b->buf.free += hsize;
1012 
1013         ws_impl->ws.content_buf = &b->buf;
1014         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1015 
1016         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1017                            "payload_len=%"PRIu64,
1018                             ws_impl->ws.header->opcode,
1019                             ws_impl->ws.payload_len);
1020 
1021         cb->websocket_handler(&ws_impl->ws);
1022     }
1023 
1024     if (recv_msg->last) {
1025         req_impl->websocket = 0;
1026 
1027         if (cb->close_handler) {
1028             nxt_unit_req_debug(req, "close_handler");
1029 
1030             cb->close_handler(req);
1031 
1032         } else {
1033             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1034         }
1035     }
1036 
1037     return NXT_UNIT_OK;
1038 }
1039 
1040 
1041 static nxt_unit_request_info_impl_t *
1042 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1043 {
1044     nxt_unit_impl_t               *lib;
1045     nxt_queue_link_t              *lnk;
1046     nxt_unit_ctx_impl_t           *ctx_impl;
1047     nxt_unit_request_info_impl_t  *req_impl;
1048 
1049     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1050 
1051     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1052 
1053     pthread_mutex_lock(&ctx_impl->mutex);
1054 
1055     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1056         pthread_mutex_unlock(&ctx_impl->mutex);
1057 
1058         req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
1059                           + lib->request_data_size);
1060         if (nxt_slow_path(req_impl == NULL)) {
1061             return NULL;
1062         }
1063 
1064         req_impl->req.unit = ctx->unit;
1065         req_impl->req.ctx = ctx;
1066 
1067         pthread_mutex_lock(&ctx_impl->mutex);
1068 
1069     } else {
1070         lnk = nxt_queue_first(&ctx_impl->free_req);
1071         nxt_queue_remove(lnk);
1072 
1073         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1074     }
1075 
1076     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1077 
1078     pthread_mutex_unlock(&ctx_impl->mutex);
1079 
1080     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1081 
1082     return req_impl;
1083 }
1084 
1085 
1086 static void
1087 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1088 {
1089     nxt_unit_ctx_impl_t           *ctx_impl;
1090     nxt_unit_request_info_impl_t  *req_impl;
1091 
1092     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1093     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1094 
1095     req->response = NULL;
1096     req->response_buf = NULL;
1097 
1098     if (req_impl->websocket) {
1099         nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1100 
1101         req_impl->websocket = 0;
1102     }
1103 
1104     while (req_impl->outgoing_buf != NULL) {
1105         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1106     }
1107 
1108     while (req_impl->incoming_buf != NULL) {
1109         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1110     }
1111 
1112     /*
1113      * Process release should go after buffers release to guarantee mmap
1114      * existence.
1115      */
1116     if (req_impl->process != NULL) {
1117         nxt_unit_process_use(req->ctx, req_impl->process, -1);
1118 
1119         req_impl->process = NULL;
1120     }
1121 
1122     pthread_mutex_lock(&ctx_impl->mutex);
1123 
1124     nxt_queue_remove(&req_impl->link);
1125 
1126     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1127 
1128     pthread_mutex_unlock(&ctx_impl->mutex);
1129 
1130     req_impl->state = NXT_UNIT_RS_RELEASED;
1131 }
1132 
1133 
1134 static void
1135 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1136 {
1137     nxt_unit_ctx_impl_t  *ctx_impl;
1138 
1139     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1140 
1141     nxt_queue_remove(&req_impl->link);
1142 
1143     if (req_impl != &ctx_impl->req) {
1144         free(req_impl);
1145     }
1146 }
1147 
1148 
1149 static nxt_unit_websocket_frame_impl_t *
1150 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1151 {
1152     nxt_queue_link_t                 *lnk;
1153     nxt_unit_ctx_impl_t              *ctx_impl;
1154     nxt_unit_websocket_frame_impl_t  *ws_impl;
1155 
1156     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1157 
1158     pthread_mutex_lock(&ctx_impl->mutex);
1159 
1160     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1161         pthread_mutex_unlock(&ctx_impl->mutex);
1162 
1163         ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1164         if (nxt_slow_path(ws_impl == NULL)) {
1165             return NULL;
1166         }
1167 
1168     } else {
1169         lnk = nxt_queue_first(&ctx_impl->free_ws);
1170         nxt_queue_remove(lnk);
1171 
1172         pthread_mutex_unlock(&ctx_impl->mutex);
1173 
1174         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1175     }
1176 
1177     ws_impl->ctx_impl = ctx_impl;
1178 
1179     return ws_impl;
1180 }
1181 
1182 
1183 static void
1184 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1185 {
1186     nxt_unit_websocket_frame_impl_t  *ws_impl;
1187 
1188     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1189 
1190     while (ws_impl->buf != NULL) {
1191         nxt_unit_mmap_buf_free(ws_impl->buf);
1192     }
1193 
1194     ws->req = NULL;
1195 
1196     if (ws_impl->retain_buf != NULL) {
1197         free(ws_impl->retain_buf);
1198 
1199         ws_impl->retain_buf = NULL;
1200     }
1201 
1202     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1203 
1204     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1205 
1206     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1207 }
1208 
1209 
1210 static void
1211 nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1212 {
1213     nxt_queue_remove(&ws_impl->link);
1214 
1215     free(ws_impl);
1216 }
1217 
1218 
1219 uint16_t
1220 nxt_unit_field_hash(const char *name, size_t name_length)
1221 {
1222     u_char      ch;
1223     uint32_t    hash;
1224     const char  *p, *end;
1225 
1226     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1227     end = name + name_length;
1228 
1229     for (p = name; p < end; p++) {
1230         ch = *p;
1231         hash = (hash << 4) + hash + nxt_lowcase(ch);
1232     }
1233 
1234     hash = (hash >> 16) ^ hash;
1235 
1236     return hash;
1237 }
1238 
1239 
1240 void
1241 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1242 {
1243     uint32_t            i, j;
1244     nxt_unit_field_t    *fields, f;
1245     nxt_unit_request_t  *r;
1246 
1247     nxt_unit_req_debug(req, "group_dup_fields");
1248 
1249     r = req->request;
1250     fields = r->fields;
1251 
1252     for (i = 0; i < r->fields_count; i++) {
1253 
1254         switch (fields[i].hash) {
1255         case NXT_UNIT_HASH_CONTENT_LENGTH:
1256             r->content_length_field = i;
1257             break;
1258 
1259         case NXT_UNIT_HASH_CONTENT_TYPE:
1260             r->content_type_field = i;
1261             break;
1262 
1263         case NXT_UNIT_HASH_COOKIE:
1264             r->cookie_field = i;
1265             break;
1266         };
1267 
1268         for (j = i + 1; j < r->fields_count; j++) {
1269             if (fields[i].hash != fields[j].hash) {
1270                 continue;
1271             }
1272 
1273             if (j == i + 1) {
1274                 continue;
1275             }
1276 
1277             f = fields[j];
1278             f.name.offset += (j - (i + 1)) * sizeof(f);
1279             f.value.offset += (j - (i + 1)) * sizeof(f);
1280 
1281             while (j > i + 1) {
1282                 fields[j] = fields[j - 1];
1283                 fields[j].name.offset -= sizeof(f);
1284                 fields[j].value.offset -= sizeof(f);
1285                 j--;
1286             }
1287 
1288             fields[j] = f;
1289 
1290             i++;
1291         }
1292     }
1293 }
1294 
1295 
1296 int
1297 nxt_unit_response_init(nxt_unit_request_info_t *req,
1298     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
1299 {
1300     uint32_t                      buf_size;
1301     nxt_unit_buf_t                *buf;
1302     nxt_unit_request_info_impl_t  *req_impl;
1303 
1304     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1305 
1306     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1307         nxt_unit_req_warn(req, "init: response already sent");
1308 
1309         return NXT_UNIT_ERROR;
1310     }
1311 
1312     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
1313                        (int) max_fields_count, (int) max_fields_size);
1314 
1315     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
1316         nxt_unit_req_debug(req, "duplicate response init");
1317     }
1318 
1319     buf_size = sizeof(nxt_unit_response_t)
1320                + max_fields_count * sizeof(nxt_unit_field_t)
1321                + max_fields_size;
1322 
1323     if (nxt_slow_path(req->response_buf != NULL)) {
1324         buf = req->response_buf;
1325 
1326         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
1327             goto init_response;
1328         }
1329 
1330         nxt_unit_buf_free(buf);
1331 
1332         req->response_buf = NULL;
1333         req->response = NULL;
1334         req->response_max_fields = 0;
1335 
1336         req_impl->state = NXT_UNIT_RS_START;
1337     }
1338 
1339     buf = nxt_unit_response_buf_alloc(req, buf_size);
1340     if (nxt_slow_path(buf == NULL)) {
1341         return NXT_UNIT_ERROR;
1342     }
1343 
1344 init_response:
1345 
1346     memset(buf->start, 0, sizeof(nxt_unit_response_t));
1347 
1348     req->response_buf = buf;
1349 
1350     req->response = (nxt_unit_response_t *) buf->start;
1351     req->response->status = status;
1352 
1353     buf->free = buf->start + sizeof(nxt_unit_response_t)
1354                 + max_fields_count * sizeof(nxt_unit_field_t);
1355 
1356     req->response_max_fields = max_fields_count;
1357     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
1358 
1359     return NXT_UNIT_OK;
1360 }
1361 
1362 
1363 int
1364 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
1365     uint32_t max_fields_count, uint32_t max_fields_size)
1366 {
1367     char                          *p;
1368     uint32_t                      i, buf_size;
1369     nxt_unit_buf_t                *buf;
1370     nxt_unit_field_t              *f, *src;
1371     nxt_unit_response_t           *resp;
1372     nxt_unit_request_info_impl_t  *req_impl;
1373 
1374     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1375 
1376     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1377         nxt_unit_req_warn(req, "realloc: response not init");
1378 
1379         return NXT_UNIT_ERROR;
1380     }
1381 
1382     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1383         nxt_unit_req_warn(req, "realloc: response already sent");
1384 
1385         return NXT_UNIT_ERROR;
1386     }
1387 
1388     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
1389         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
1390 
1391         return NXT_UNIT_ERROR;
1392     }
1393 
1394     buf_size = sizeof(nxt_unit_response_t)
1395                + max_fields_count * sizeof(nxt_unit_field_t)
1396                + max_fields_size;
1397 
1398     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
1399 
1400     buf = nxt_unit_response_buf_alloc(req, buf_size);
1401     if (nxt_slow_path(buf == NULL)) {
1402         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
1403         return NXT_UNIT_ERROR;
1404     }
1405 
1406     resp = (nxt_unit_response_t *) buf->start;
1407 
1408     memset(resp, 0, sizeof(nxt_unit_response_t));
1409 
1410     resp->status = req->response->status;
1411     resp->content_length = req->response->content_length;
1412 
1413     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
1414     f = resp->fields;
1415 
1416     for (i = 0; i < req->response->fields_count; i++) {
1417         src = req->response->fields + i;
1418 
1419         if (nxt_slow_path(src->skip != 0)) {
1420             continue;
1421         }
1422 
1423         if (nxt_slow_path(src->name_length + src->value_length + 2
1424                           > (uint32_t) (buf->end - p)))
1425         {
1426             nxt_unit_req_warn(req, "realloc: not enough space for field"
1427                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
1428                   i, src, src->name_length, src->value_length);
1429 
1430             goto fail;
1431         }
1432 
1433         nxt_unit_sptr_set(&f->name, p);
1434         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
1435         *p++ = '\0';
1436 
1437         nxt_unit_sptr_set(&f->value, p);
1438         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
1439         *p++ = '\0';
1440 
1441         f->hash = src->hash;
1442         f->skip = 0;
1443         f->name_length = src->name_length;
1444         f->value_length = src->value_length;
1445 
1446         resp->fields_count++;
1447         f++;
1448     }
1449 
1450     if (req->response->piggyback_content_length > 0) {
1451         if (nxt_slow_path(req->response->piggyback_content_length
1452                           > (uint32_t) (buf->end - p)))
1453         {
1454             nxt_unit_req_warn(req, "realloc: not enought space for content"
1455                   " #%"PRIu32", %"PRIu32" required",
1456                   i, req->response->piggyback_content_length);
1457 
1458             goto fail;
1459         }
1460 
1461         resp->piggyback_content_length =
1462                                        req->response->piggyback_content_length;
1463 
1464         nxt_unit_sptr_set(&resp->piggyback_content, p);
1465         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
1466                        req->response->piggyback_content_length);
1467     }
1468 
1469     buf->free = p;
1470 
1471     nxt_unit_buf_free(req->response_buf);
1472 
1473     req->response = resp;
1474     req->response_buf = buf;
1475     req->response_max_fields = max_fields_count;
1476 
1477     return NXT_UNIT_OK;
1478 
1479 fail:
1480 
1481     nxt_unit_buf_free(buf);
1482 
1483     return NXT_UNIT_ERROR;
1484 }
1485 
1486 
1487 int
1488 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
1489 {
1490     nxt_unit_request_info_impl_t  *req_impl;
1491 
1492     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1493 
1494     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
1495 }
1496 
1497 
1498 int
1499 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
1500     const char *name, uint8_t name_length,
1501     const char *value, uint32_t value_length)
1502 {
1503     nxt_unit_buf_t                *buf;
1504     nxt_unit_field_t              *f;
1505     nxt_unit_response_t           *resp;
1506     nxt_unit_request_info_impl_t  *req_impl;
1507 
1508     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1509 
1510     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
1511         nxt_unit_req_warn(req, "add_field: response not initialized or "
1512                           "already sent");
1513 
1514         return NXT_UNIT_ERROR;
1515     }
1516 
1517     resp = req->response;
1518 
1519     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
1520         nxt_unit_req_warn(req, "add_field: too many response fields");
1521 
1522         return NXT_UNIT_ERROR;
1523     }
1524 
1525     buf = req->response_buf;
1526 
1527     if (nxt_slow_path(name_length + value_length + 2
1528                       > (uint32_t) (buf->end - buf->free)))
1529     {
1530         nxt_unit_req_warn(req, "add_field: response buffer overflow");
1531 
1532         return NXT_UNIT_ERROR;
1533     }
1534 
1535     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
1536                        resp->fields_count,
1537                        (int) name_length, name,
1538                        (int) value_length, value);
1539 
1540     f = resp->fields + resp->fields_count;
1541 
1542     nxt_unit_sptr_set(&f->name, buf->free);
1543     buf->free = nxt_cpymem(buf->free, name, name_length);
1544     *buf->free++ = '\0';
1545 
1546     nxt_unit_sptr_set(&f->value, buf->free);
1547     buf->free = nxt_cpymem(buf->free, value, value_length);
1548     *buf->free++ = '\0';
1549 
1550     f->hash = nxt_unit_field_hash(name, name_length);
1551     f->skip = 0;
1552     f->name_length = name_length;
1553     f->value_length = value_length;
1554 
1555     resp->fields_count++;
1556 
1557     return NXT_UNIT_OK;
1558 }
1559 
1560 
1561 int
1562 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
1563     const void* src, uint32_t size)
1564 {
1565     nxt_unit_buf_t                *buf;
1566     nxt_unit_response_t           *resp;
1567     nxt_unit_request_info_impl_t  *req_impl;
1568 
1569     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1570 
1571     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1572         nxt_unit_req_warn(req, "add_content: response not initialized yet");
1573 
1574         return NXT_UNIT_ERROR;
1575     }
1576 
1577     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1578         nxt_unit_req_warn(req, "add_content: response already sent");
1579 
1580         return NXT_UNIT_ERROR;
1581     }
1582 
1583     buf = req->response_buf;
1584 
1585     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
1586         nxt_unit_req_warn(req, "add_content: buffer overflow");
1587 
1588         return NXT_UNIT_ERROR;
1589     }
1590 
1591     resp = req->response;
1592 
1593     if (resp->piggyback_content_length == 0) {
1594         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
1595         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
1596     }
1597 
1598     resp->piggyback_content_length += size;
1599 
1600     buf->free = nxt_cpymem(buf->free, src, size);
1601 
1602     return NXT_UNIT_OK;
1603 }
1604 
1605 
1606 int
1607 nxt_unit_response_send(nxt_unit_request_info_t *req)
1608 {
1609     int                           rc;
1610     nxt_unit_mmap_buf_t           *mmap_buf;
1611     nxt_unit_request_info_impl_t  *req_impl;
1612 
1613     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1614 
1615     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1616         nxt_unit_req_warn(req, "send: response is not initialized yet");
1617 
1618         return NXT_UNIT_ERROR;
1619     }
1620 
1621     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1622         nxt_unit_req_warn(req, "send: response already sent");
1623 
1624         return NXT_UNIT_ERROR;
1625     }
1626 
1627     if (req->request->websocket_handshake && req->response->status == 101) {
1628         nxt_unit_response_upgrade(req);
1629     }
1630 
1631     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1632                        req->response->fields_count,
1633                        (int) (req->response_buf->free
1634                               - req->response_buf->start));
1635 
1636     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1637 
1638     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1639     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1640         req->response = NULL;
1641         req->response_buf = NULL;
1642         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1643 
1644         nxt_unit_mmap_buf_release(mmap_buf);
1645     }
1646 
1647     return rc;
1648 }
1649 
1650 
1651 int
1652 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
1653 {
1654     nxt_unit_request_info_impl_t  *req_impl;
1655 
1656     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1657 
1658     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1659 }
1660 
1661 
1662 nxt_unit_buf_t *
1663 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1664 {
1665     int                           rc;
1666     nxt_unit_mmap_buf_t           *mmap_buf;
1667     nxt_unit_request_info_impl_t  *req_impl;
1668 
1669     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1670         nxt_unit_req_warn(req, "response_buf_alloc: "
1671                           "requested buffer (%"PRIu32") too big", size);
1672 
1673         return NULL;
1674     }
1675 
1676     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1677 
1678     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1679 
1680     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1681     if (nxt_slow_path(mmap_buf == NULL)) {
1682         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
1683 
1684         return NULL;
1685     }
1686 
1687     mmap_buf->req = req;
1688 
1689     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1690 
1691     rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1692                                    &req->response_port, size, mmap_buf);
1693     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1694         nxt_unit_mmap_buf_release(mmap_buf);
1695 
1696         return NULL;
1697     }
1698 
1699     return &mmap_buf->buf;
1700 }
1701 
1702 
1703 static nxt_unit_process_t *
1704 nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1705 {
1706     nxt_unit_impl_t  *lib;
1707 
1708     if (recv_msg->process != NULL) {
1709         return recv_msg->process;
1710     }
1711 
1712     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1713 
1714     pthread_mutex_lock(&lib->mutex);
1715 
1716     recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1717 
1718     pthread_mutex_unlock(&lib->mutex);
1719 
1720     if (recv_msg->process == NULL) {
1721         nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1722                       recv_msg->stream, (int) recv_msg->pid);
1723     }
1724 
1725     return recv_msg->process;
1726 }
1727 
1728 
1729 static nxt_unit_mmap_buf_t *
1730 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1731 {
1732     nxt_unit_mmap_buf_t  *mmap_buf;
1733     nxt_unit_ctx_impl_t  *ctx_impl;
1734 
1735     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1736 
1737     pthread_mutex_lock(&ctx_impl->mutex);
1738 
1739     if (ctx_impl->free_buf == NULL) {
1740         pthread_mutex_unlock(&ctx_impl->mutex);
1741 
1742         mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1743         if (nxt_slow_path(mmap_buf == NULL)) {
1744             return NULL;
1745         }
1746 
1747     } else {
1748         mmap_buf = ctx_impl->free_buf;
1749 
1750         nxt_unit_mmap_buf_remove(mmap_buf);
1751 
1752         pthread_mutex_unlock(&ctx_impl->mutex);
1753     }
1754 
1755     mmap_buf->ctx_impl = ctx_impl;
1756 
1757     return mmap_buf;
1758 }
1759 
1760 
1761 static void
1762 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1763 {
1764     nxt_unit_mmap_buf_remove(mmap_buf);
1765 
1766     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
1767 
1768     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1769 
1770     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
1771 }
1772 
1773 
/*
 * Reference to constant character data together with its length.
 * The member order matters: nxt_unit_str() initializes positionally.
 */
typedef struct {
    size_t      len;    /* length associated with str */
    const char  *str;   /* the characters; not owned by the structure */
} nxt_unit_str_t;


/*
 * Brace initializer for nxt_unit_str_t: len is taken from nxt_length() of
 * the argument, str is the argument itself.
 */
#define nxt_unit_str(s)  { nxt_length(s), s }
1781 
1782 
1783 int
1784 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1785 {
1786     return req->request->websocket_handshake;
1787 }
1788 
1789 
1790 int
1791 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1792 {
1793     int                           rc;
1794     nxt_unit_ctx_impl_t           *ctx_impl;
1795     nxt_unit_request_info_impl_t  *req_impl;
1796 
1797     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1798 
1799     if (nxt_slow_path(req_impl->websocket != 0)) {
1800         nxt_unit_req_debug(req, "upgrade: already upgraded");
1801 
1802         return NXT_UNIT_OK;
1803     }
1804 
1805     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1806         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1807 
1808         return NXT_UNIT_ERROR;
1809     }
1810 
1811     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1812         nxt_unit_req_warn(req, "upgrade: response already sent");
1813 
1814         return NXT_UNIT_ERROR;
1815     }
1816 
1817     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1818 
1819     rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1820     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1821         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1822 
1823         return NXT_UNIT_ERROR;
1824     }
1825 
1826     req_impl->websocket = 1;
1827 
1828     req->response->status = 101;
1829 
1830     return NXT_UNIT_OK;
1831 }
1832 
1833 
1834 int
1835 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1836 {
1837     nxt_unit_request_info_impl_t  *req_impl;
1838 
1839     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1840 
1841     return req_impl->websocket;
1842 }
1843 
1844 
1845 nxt_unit_request_info_t *
1846 nxt_unit_get_request_info_from_data(void *data)
1847 {
1848     nxt_unit_request_info_impl_t  *req_impl;
1849 
1850     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1851 
1852     return &req_impl->req;
1853 }
1854 
1855 
1856 int
1857 nxt_unit_buf_send(nxt_unit_buf_t *buf)
1858 {
1859     int                           rc;
1860     nxt_unit_mmap_buf_t           *mmap_buf;
1861     nxt_unit_request_info_t       *req;
1862     nxt_unit_request_info_impl_t  *req_impl;
1863 
1864     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1865 
1866     req = mmap_buf->req;
1867     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1868 
1869     nxt_unit_req_debug(req, "buf_send: %d bytes",
1870                        (int) (buf->free - buf->start));
1871 
1872     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1873         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
1874 
1875         return NXT_UNIT_ERROR;
1876     }
1877 
1878     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1879         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1880 
1881         return NXT_UNIT_ERROR;
1882     }
1883 
1884     if (nxt_fast_path(buf->free > buf->start)) {
1885         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1886         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1887             return rc;
1888         }
1889     }
1890 
1891     nxt_unit_mmap_buf_release(mmap_buf);
1892 
1893     return NXT_UNIT_OK;
1894 }
1895 
1896 
1897 static void
1898 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
1899 {
1900     int                           rc;
1901     nxt_unit_mmap_buf_t           *mmap_buf;
1902     nxt_unit_request_info_t       *req;
1903     nxt_unit_request_info_impl_t  *req_impl;
1904 
1905     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1906 
1907     req = mmap_buf->req;
1908     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1909 
1910     rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
1911     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1912         nxt_unit_mmap_buf_release(mmap_buf);
1913 
1914         nxt_unit_request_info_release(req);
1915 
1916     } else {
1917         nxt_unit_request_done(req, rc);
1918     }
1919 }
1920 
1921 
1922 static int
1923 nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
1924     nxt_unit_mmap_buf_t *mmap_buf, int last)
1925 {
1926     struct {
1927         nxt_port_msg_t       msg;
1928         nxt_port_mmap_msg_t  mmap_msg;
1929     } m;
1930 
1931     u_char                   *end, *last_used, *first_free;
1932     ssize_t                  res;
1933     nxt_chunk_id_t           first_free_chunk;
1934     nxt_unit_buf_t           *buf;
1935     nxt_unit_impl_t          *lib;
1936     nxt_port_mmap_header_t   *hdr;
1937 
1938     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1939 
1940     buf = &mmap_buf->buf;
1941     hdr = mmap_buf->hdr;
1942 
1943     m.mmap_msg.size = buf->free - buf->start;
1944 
1945     m.msg.stream = stream;
1946     m.msg.pid = lib->pid;
1947     m.msg.reply_port = 0;
1948     m.msg.type = _NXT_PORT_MSG_DATA;
1949     m.msg.last = last != 0;
1950     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
1951     m.msg.nf = 0;
1952     m.msg.mf = 0;
1953     m.msg.tracking = 0;
1954 
1955     if (hdr != NULL) {
1956         m.mmap_msg.mmap_id = hdr->id;
1957         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
1958                                                      (u_char *) buf->start);
1959     }
1960 
1961     nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1962                    stream,
1963                    (int) m.mmap_msg.mmap_id,
1964                    (int) m.mmap_msg.chunk_id,
1965                    (int) m.mmap_msg.size);
1966 
1967     res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1968                                    m.msg.mmap ? sizeof(m) : sizeof(m.msg),
1969                                    NULL, 0);
1970     if (nxt_slow_path(res != sizeof(m))) {
1971         return NXT_UNIT_ERROR;
1972     }
1973 
1974     if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
1975         last_used = (u_char *) buf->free - 1;
1976 
1977         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1978         first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1979         end = (u_char *) buf->end;
1980 
1981         nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1982 
1983         buf->end = (char *) first_free;
1984     }
1985 
1986     return NXT_UNIT_OK;
1987 }
1988 
1989 
1990 void
1991 nxt_unit_buf_free(nxt_unit_buf_t *buf)
1992 {
1993     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
1994 }
1995 
1996 
1997 static void
1998 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
1999 {
2000     if (nxt_fast_path(mmap_buf->hdr != NULL)) {
2001         nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
2002                               mmap_buf->buf.end - mmap_buf->buf.start);
2003     }
2004 
2005     nxt_unit_mmap_buf_release(mmap_buf);
2006 }
2007 
2008 
2009 nxt_unit_buf_t *
2010 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2011 {
2012     nxt_unit_mmap_buf_t  *mmap_buf;
2013 
2014     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2015 
2016     if (mmap_buf->next == NULL) {
2017         return NULL;
2018     }
2019 
2020     return &mmap_buf->next->buf;
2021 }
2022 
2023 
2024 uint32_t
2025 nxt_unit_buf_max(void)
2026 {
2027     return PORT_MMAP_DATA_SIZE;
2028 }
2029 
2030 
2031 uint32_t
2032 nxt_unit_buf_min(void)
2033 {
2034     return PORT_MMAP_CHUNK_SIZE;
2035 }
2036 
2037 
2038 int
2039 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2040     size_t size)
2041 {
2042     int                           rc;
2043     uint32_t                      part_size;
2044     const char                    *part_start;
2045     nxt_unit_mmap_buf_t           mmap_buf;
2046     nxt_unit_request_info_impl_t  *req_impl;
2047 
2048     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2049 
2050     part_start = start;
2051 
2052     /* Check if response is not send yet. */
2053     if (nxt_slow_path(req->response_buf)) {
2054         part_size = req->response_buf->end - req->response_buf->free;
2055         part_size = nxt_min(size, part_size);
2056 
2057         rc = nxt_unit_response_add_content(req, part_start, part_size);
2058         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2059             return rc;
2060         }
2061 
2062         rc = nxt_unit_response_send(req);
2063         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2064             return rc;
2065         }
2066 
2067         size -= part_size;
2068         part_start += part_size;
2069     }
2070 
2071     while (size > 0) {
2072         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2073 
2074         rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2075                                        &req->response_port, part_size,
2076                                        &mmap_buf);
2077         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2078             return rc;
2079         }
2080 
2081         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2082                                        part_start, part_size);
2083 
2084         rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2085         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2086             nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
2087                                   mmap_buf.buf.end - mmap_buf.buf.start);
2088 
2089             return rc;
2090         }
2091 
2092         size -= part_size;
2093         part_start += part_size;
2094     }
2095 
2096     return NXT_UNIT_OK;
2097 }
2098 
2099 
2100 int
2101 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2102     nxt_unit_read_info_t *read_info)
2103 {
2104     int             rc;
2105     ssize_t         n;
2106     nxt_unit_buf_t  *buf;
2107 
2108     /* Check if response is not send yet. */
2109     if (nxt_slow_path(req->response_buf)) {
2110 
2111         /* Enable content in headers buf. */
2112         rc = nxt_unit_response_add_content(req, "", 0);
2113         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2114             nxt_unit_req_error(req, "Failed to add piggyback content");
2115 
2116             return rc;
2117         }
2118 
2119         buf = req->response_buf;
2120 
2121         while (buf->end - buf->free > 0) {
2122             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2123             if (nxt_slow_path(n < 0)) {
2124                 nxt_unit_req_error(req, "Read error");
2125 
2126                 return NXT_UNIT_ERROR;
2127             }
2128 
2129             /* Manually increase sizes. */
2130             buf->free += n;
2131             req->response->piggyback_content_length += n;
2132 
2133             if (read_info->eof) {
2134                 break;
2135             }
2136         }
2137 
2138         rc = nxt_unit_response_send(req);
2139         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2140             nxt_unit_req_error(req, "Failed to send headers with content");
2141 
2142             return rc;
2143         }
2144 
2145         if (read_info->eof) {
2146             return NXT_UNIT_OK;
2147         }
2148     }
2149 
2150     while (!read_info->eof) {
2151         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
2152                            read_info->buf_size);
2153 
2154         buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
2155                                                        PORT_MMAP_DATA_SIZE));
2156         if (nxt_slow_path(buf == NULL)) {
2157             nxt_unit_req_error(req, "Failed to allocate buf for content");
2158 
2159             return NXT_UNIT_ERROR;
2160         }
2161 
2162         while (!read_info->eof && buf->end > buf->free) {
2163             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2164             if (nxt_slow_path(n < 0)) {
2165                 nxt_unit_req_error(req, "Read error");
2166 
2167                 nxt_unit_buf_free(buf);
2168 
2169                 return NXT_UNIT_ERROR;
2170             }
2171 
2172             buf->free += n;
2173         }
2174 
2175         rc = nxt_unit_buf_send(buf);
2176         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2177             nxt_unit_req_error(req, "Failed to send content");
2178 
2179             return rc;
2180         }
2181     }
2182 
2183     return NXT_UNIT_OK;
2184 }
2185 
2186 
2187 ssize_t
2188 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2189 {
2190     return nxt_unit_buf_read(&req->content_buf, &req->content_length,
2191                              dst, size);
2192 }
2193 
2194 
2195 static ssize_t
2196 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2197 {
2198     u_char          *p;
2199     size_t          rest, copy, read;
2200     nxt_unit_buf_t  *buf;
2201 
2202     p = dst;
2203     rest = size;
2204 
2205     buf = *b;
2206 
2207     while (buf != NULL) {
2208         copy = buf->end - buf->free;
2209         copy = nxt_min(rest, copy);
2210 
2211         p = nxt_cpymem(p, buf->free, copy);
2212 
2213         buf->free += copy;
2214         rest -= copy;
2215 
2216         if (rest == 0) {
2217             if (buf->end == buf->free) {
2218                 buf = nxt_unit_buf_next(buf);
2219             }
2220 
2221             break;
2222         }
2223 
2224         buf = nxt_unit_buf_next(buf);
2225     }
2226 
2227     *b = buf;
2228 
2229     read = size - rest;
2230 
2231     *len -= read;
2232 
2233     return read;
2234 }
2235 
2236 
2237 void
2238 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2239 {
2240     ssize_t                       res;
2241     uint32_t                      size;
2242     nxt_port_msg_t                msg;
2243     nxt_unit_impl_t               *lib;
2244     nxt_unit_request_info_impl_t  *req_impl;
2245 
2246     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2247 
2248     nxt_unit_req_debug(req, "done: %d", rc);
2249 
2250     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2251         goto skip_response_send;
2252     }
2253 
2254     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2255 
2256         size = nxt_length("Content-Type") + nxt_length("text/plain");
2257 
2258         rc = nxt_unit_response_init(req, 200, 1, size);
2259         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2260             goto skip_response_send;
2261         }
2262 
2263         rc = nxt_unit_response_add_field(req, "Content-Type",
2264                                    nxt_length("Content-Type"),
2265                                    "text/plain", nxt_length("text/plain"));
2266         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2267             goto skip_response_send;
2268         }
2269     }
2270 
2271     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2272 
2273         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2274 
2275         nxt_unit_buf_send_done(req->response_buf);
2276 
2277         return;
2278     }
2279 
2280 skip_response_send:
2281 
2282     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2283 
2284     msg.stream = req_impl->stream;
2285     msg.pid = lib->pid;
2286     msg.reply_port = 0;
2287     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2288                                    : _NXT_PORT_MSG_RPC_ERROR;
2289     msg.last = 1;
2290     msg.mmap = 0;
2291     msg.nf = 0;
2292     msg.mf = 0;
2293     msg.tracking = 0;
2294 
2295     res = lib->callbacks.port_send(req->ctx, &req->response_port,
2296                                    &msg, sizeof(msg), NULL, 0);
2297     if (nxt_slow_path(res != sizeof(msg))) {
2298         nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2299                            strerror(errno), errno);
2300     }
2301 
2302     nxt_unit_request_info_release(req);
2303 }
2304 
2305 
2306 int
2307 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2308     uint8_t last, const void *start, size_t size)
2309 {
2310     const struct iovec  iov = { (void *) start, size };
2311 
2312     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2313 }
2314 
2315 
2316 int
2317 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2318     uint8_t last, const struct iovec *iov, int iovcnt)
2319 {
2320     int                     i, rc;
2321     size_t                  l, copy;
2322     uint32_t                payload_len, buf_size;
2323     const uint8_t           *b;
2324     nxt_unit_buf_t          *buf;
2325     nxt_websocket_header_t  *wh;
2326 
2327     payload_len = 0;
2328 
2329     for (i = 0; i < iovcnt; i++) {
2330         payload_len += iov[i].iov_len;
2331     }
2332 
2333     buf_size = 10 + payload_len;
2334 
2335     buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2336                                                    PORT_MMAP_DATA_SIZE));
2337     if (nxt_slow_path(buf == NULL)) {
2338         nxt_unit_req_error(req, "Failed to allocate buf for content");
2339 
2340         return NXT_UNIT_ERROR;
2341     }
2342 
2343     buf->start[0] = 0;
2344     buf->start[1] = 0;
2345 
2346     wh = (void *) buf->free;
2347 
2348     buf->free = nxt_websocket_frame_init(wh, payload_len);
2349     wh->fin = last;
2350     wh->opcode = opcode;
2351 
2352     for (i = 0; i < iovcnt; i++) {
2353         b = iov[i].iov_base;
2354         l = iov[i].iov_len;
2355 
2356         while (l > 0) {
2357             copy = buf->end - buf->free;
2358             copy = nxt_min(l, copy);
2359 
2360             buf->free = nxt_cpymem(buf->free, b, copy);
2361             b += copy;
2362             l -= copy;
2363 
2364             if (l > 0) {
2365                 buf_size -= buf->end - buf->start;
2366 
2367                 rc = nxt_unit_buf_send(buf);
2368                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2369                     nxt_unit_req_error(req, "Failed to send content");
2370 
2371                     return NXT_UNIT_ERROR;
2372                 }
2373 
2374                 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2375                                                           PORT_MMAP_DATA_SIZE));
2376                 if (nxt_slow_path(buf == NULL)) {
2377                     nxt_unit_req_error(req,
2378                                        "Failed to allocate buf for content");
2379 
2380                     return NXT_UNIT_ERROR;
2381                 }
2382             }
2383         }
2384     }
2385 
2386     if (buf->free > buf->start) {
2387         rc = nxt_unit_buf_send(buf);
2388         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2389             nxt_unit_req_error(req, "Failed to send content");
2390         }
2391     }
2392 
2393     return rc;
2394 }
2395 
2396 
2397 ssize_t
2398 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2399     size_t size)
2400 {
2401     ssize_t   res;
2402     uint8_t   *b;
2403     uint64_t  i, d;
2404 
2405     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2406                             dst, size);
2407 
2408     if (ws->mask == NULL) {
2409         return res;
2410     }
2411 
2412     b = dst;
2413     d = (ws->payload_len - ws->content_length - res) % 4;
2414 
2415     for (i = 0; i < (uint64_t) res; i++) {
2416         b[i] ^= ws->mask[ (i + d) % 4 ];
2417     }
2418 
2419     return res;
2420 }
2421 
2422 
2423 int
2424 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2425 {
2426     char                             *b;
2427     size_t                           size;
2428     nxt_unit_websocket_frame_impl_t  *ws_impl;
2429 
2430     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2431 
2432     if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
2433         return NXT_UNIT_OK;
2434     }
2435 
2436     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2437 
2438     b = malloc(size);
2439     if (nxt_slow_path(b == NULL)) {
2440         return NXT_UNIT_ERROR;
2441     }
2442 
2443     memcpy(b, ws_impl->buf->buf.start, size);
2444 
2445     ws_impl->buf->buf.start = b;
2446     ws_impl->buf->buf.free = b;
2447     ws_impl->buf->buf.end = b + size;
2448 
2449     ws_impl->retain_buf = b;
2450 
2451     return NXT_UNIT_OK;
2452 }
2453 
2454 
/*
 * Tells the library that the application has finished with the frame:
 * the frame is passed to nxt_unit_websocket_frame_release().
 */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}
2460 
2461 
2462 static nxt_port_mmap_header_t *
2463 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2464     nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
2465 {
2466     int                     res, nchunks, i;
2467     nxt_unit_mmap_t         *mm, *mm_end;
2468     nxt_port_mmap_header_t  *hdr;
2469 
2470     pthread_mutex_lock(&process->outgoing.mutex);
2471 
2472     mm_end = process->outgoing.elts + process->outgoing.size;
2473 
2474     for (mm = process->outgoing.elts; mm < mm_end; mm++) {
2475         hdr = mm->hdr;
2476 
2477         if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
2478             continue;
2479         }
2480 
2481         *c = 0;
2482 
2483         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
2484             nchunks = 1;
2485 
2486             while (nchunks < n) {
2487                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
2488                                                        *c + nchunks);
2489 
2490                 if (res == 0) {
2491                     for (i = 0; i < nchunks; i++) {
2492                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
2493                     }
2494 
2495                     *c += nchunks + 1;
2496                     nchunks = 0;
2497                     break;
2498                 }
2499 
2500                 nchunks++;
2501             }
2502 
2503             if (nchunks == n) {
2504                 goto unlock;
2505             }
2506         }
2507     }
2508 
2509     *c = 0;
2510     hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
2511 
2512 unlock:
2513 
2514     pthread_mutex_unlock(&process->outgoing.mutex);
2515 
2516     return hdr;
2517 }
2518 
2519 
2520 static nxt_unit_mmap_t *
2521 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
2522 {
2523     uint32_t  cap;
2524 
2525     cap = mmaps->cap;
2526 
2527     if (cap == 0) {
2528         cap = i + 1;
2529     }
2530 
2531     while (i + 1 > cap) {
2532 
2533         if (cap < 16) {
2534             cap = cap * 2;
2535 
2536         } else {
2537             cap = cap + cap / 2;
2538         }
2539     }
2540 
2541     if (cap != mmaps->cap) {
2542 
2543         mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
2544         if (nxt_slow_path(mmaps->elts == NULL)) {
2545             return NULL;
2546         }
2547 
2548         memset(mmaps->elts + mmaps->cap, 0,
2549                sizeof(*mmaps->elts) * (cap - mmaps->cap));
2550 
2551         mmaps->cap = cap;
2552     }
2553 
2554     if (i + 1 > mmaps->size) {
2555         mmaps->size = i + 1;
2556     }
2557 
2558     return mmaps->elts + i;
2559 }
2560 
2561 
2562 static nxt_port_mmap_header_t *
2563 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2564     nxt_unit_port_id_t *port_id, int n)
2565 {
2566     int                     i, fd, rc;
2567     void                    *mem;
2568     char                    name[64];
2569     nxt_unit_mmap_t         *mm;
2570     nxt_unit_impl_t         *lib;
2571     nxt_port_mmap_header_t  *hdr;
2572 
2573     lib = process->lib;
2574 
2575     mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
2576     if (nxt_slow_path(mm == NULL)) {
2577         nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
2578 
2579         return NULL;
2580     }
2581 
2582     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
2583              lib->pid, (void *) pthread_self());
2584 
2585 #if (NXT_HAVE_MEMFD_CREATE)
2586 
2587     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
2588     if (nxt_slow_path(fd == -1)) {
2589         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
2590                        strerror(errno), errno);
2591 
2592         goto remove_fail;
2593     }
2594 
2595     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
2596 
2597 #elif (NXT_HAVE_SHM_OPEN_ANON)
2598 
2599     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
2600     if (nxt_slow_path(fd == -1)) {
2601         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
2602                        strerror(errno), errno);
2603 
2604         goto remove_fail;
2605     }
2606 
2607 #elif (NXT_HAVE_SHM_OPEN)
2608 
2609     /* Just in case. */
2610     shm_unlink(name);
2611 
2612     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
2613     if (nxt_slow_path(fd == -1)) {
2614         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
2615                        strerror(errno), errno);
2616 
2617         goto remove_fail;
2618     }
2619 
2620     if (nxt_slow_path(shm_unlink(name) == -1)) {
2621         nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
2622                       strerror(errno), errno);
2623     }
2624 
2625 #else
2626 
2627 #error No working shared memory implementation.
2628 
2629 #endif
2630 
2631     if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
2632         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
2633                        strerror(errno), errno);
2634 
2635         goto remove_fail;
2636     }
2637 
2638     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2639     if (nxt_slow_path(mem == MAP_FAILED)) {
2640         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
2641                        strerror(errno), errno);
2642 
2643         goto remove_fail;
2644     }
2645 
2646     mm->hdr = mem;
2647     hdr = mem;
2648 
2649     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
2650     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
2651 
2652     hdr->id = process->outgoing.size - 1;
2653     hdr->src_pid = lib->pid;
2654     hdr->dst_pid = process->pid;
2655     hdr->sent_over = port_id->id;
2656 
2657     /* Mark first n chunk(s) as busy */
2658     for (i = 0; i < n; i++) {
2659         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
2660     }
2661 
2662     /* Mark as busy chunk followed the last available chunk. */
2663     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
2664     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
2665 
2666     pthread_mutex_unlock(&process->outgoing.mutex);
2667 
2668     rc = nxt_unit_send_mmap(ctx, port_id, fd);
2669     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2670         munmap(mem, PORT_MMAP_SIZE);
2671         hdr = NULL;
2672 
2673     } else {
2674         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
2675                        hdr->id, (int) lib->pid, (int) process->pid);
2676     }
2677 
2678     close(fd);
2679 
2680     pthread_mutex_lock(&process->outgoing.mutex);
2681 
2682     if (nxt_fast_path(hdr != NULL)) {
2683         return hdr;
2684     }
2685 
2686 remove_fail:
2687 
2688     process->outgoing.size--;
2689 
2690     return NULL;
2691 }
2692 
2693 
2694 static int
2695 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
2696 {
2697     ssize_t          res;
2698     nxt_port_msg_t   msg;
2699     nxt_unit_impl_t  *lib;
2700     union {
2701         struct cmsghdr  cm;
2702         char            space[CMSG_SPACE(sizeof(int))];
2703     } cmsg;
2704 
2705     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2706 
2707     msg.stream = 0;
2708     msg.pid = lib->pid;
2709     msg.reply_port = 0;
2710     msg.type = _NXT_PORT_MSG_MMAP;
2711     msg.last = 0;
2712     msg.mmap = 0;
2713     msg.nf = 0;
2714     msg.mf = 0;
2715     msg.tracking = 0;
2716 
2717     /*
2718      * Fill all padding fields with 0.
2719      * Code in Go 1.11 validate cmsghdr using padding field as part of len.
2720      * See Cmsghdr definition and socketControlMessageHeaderAndData function.
2721      */
2722     memset(&cmsg, 0, sizeof(cmsg));
2723 
2724     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
2725     cmsg.cm.cmsg_level = SOL_SOCKET;
2726     cmsg.cm.cmsg_type = SCM_RIGHTS;
2727 
2728     /*
2729      * memcpy() is used instead of simple
2730      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
2731      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
2732      *   dereferencing type-punned pointer will break strict-aliasing rules
2733      *
2734      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
2735      * in the same simple assignment as in the code above.
2736      */
2737     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
2738 
2739     res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
2740                                    &cmsg, sizeof(cmsg));
2741     if (nxt_slow_path(res != sizeof(msg))) {
2742         nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
2743                       (int) port_id->pid, strerror(errno), errno);
2744 
2745         return NXT_UNIT_ERROR;
2746     }
2747 
2748     return NXT_UNIT_OK;
2749 }
2750 
2751 
2752 static int
2753 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2754     nxt_unit_port_id_t *port_id, uint32_t size,
2755     nxt_unit_mmap_buf_t *mmap_buf)
2756 {
2757     uint32_t                nchunks;
2758     nxt_chunk_id_t          c;
2759     nxt_port_mmap_header_t  *hdr;
2760 
2761     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
2762 
2763     hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
2764     if (nxt_slow_path(hdr == NULL)) {
2765         return NXT_UNIT_ERROR;
2766     }
2767 
2768     mmap_buf->hdr = hdr;
2769     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
2770     mmap_buf->buf.free = mmap_buf->buf.start;
2771     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
2772     mmap_buf->port_id = *port_id;
2773 
2774     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
2775                   (int) hdr->id, (int) c,
2776                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
2777 
2778     return NXT_UNIT_OK;
2779 }
2780 
2781 
2782 static int
2783 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
2784 {
2785     int                      rc;
2786     void                     *mem;
2787     struct stat              mmap_stat;
2788     nxt_unit_mmap_t          *mm;
2789     nxt_unit_impl_t          *lib;
2790     nxt_unit_process_t       *process;
2791     nxt_port_mmap_header_t   *hdr;
2792 
2793     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2794 
2795     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
2796 
2797     pthread_mutex_lock(&lib->mutex);
2798 
2799     process = nxt_unit_process_find(ctx, pid, 0);
2800 
2801     pthread_mutex_unlock(&lib->mutex);
2802 
2803     if (nxt_slow_path(process == NULL)) {
2804         nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
2805                       (int) pid, fd);
2806 
2807         return NXT_UNIT_ERROR;
2808     }
2809 
2810     rc = NXT_UNIT_ERROR;
2811 
2812     if (fstat(fd, &mmap_stat) == -1) {
2813         nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
2814                       strerror(errno), errno);
2815 
2816         goto fail;
2817     }
2818 
2819     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
2820                MAP_SHARED, fd, 0);
2821     if (nxt_slow_path(mem == MAP_FAILED)) {
2822         nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
2823                       strerror(errno), errno);
2824 
2825         goto fail;
2826     }
2827 
2828     hdr = mem;
2829 
2830     if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
2831 
2832         nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
2833                       "detected: %d != %d or %d != %d", (int) hdr->src_pid,
2834                       (int) pid, (int) hdr->dst_pid, (int) lib->pid);
2835 
2836         munmap(mem, PORT_MMAP_SIZE);
2837 
2838         goto fail;
2839     }
2840 
2841     pthread_mutex_lock(&process->incoming.mutex);
2842 
2843     mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
2844     if (nxt_slow_path(mm == NULL)) {
2845         nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
2846 
2847         munmap(mem, PORT_MMAP_SIZE);
2848 
2849     } else {
2850         mm->hdr = hdr;
2851 
2852         hdr->sent_over = 0xFFFFu;
2853 
2854         rc = NXT_UNIT_OK;
2855     }
2856 
2857     pthread_mutex_unlock(&process->incoming.mutex);
2858 
2859 fail:
2860 
2861     nxt_unit_process_use(ctx, process, -1);
2862 
2863     return rc;
2864 }
2865 
2866 
2867 static void
2868 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
2869 {
2870     pthread_mutex_init(&mmaps->mutex, NULL);
2871 
2872     mmaps->size = 0;
2873     mmaps->cap = 0;
2874     mmaps->elts = NULL;
2875 }
2876 
2877 
2878 static void
2879 nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
2880 {
2881     long c;
2882 
2883     c = nxt_atomic_fetch_add(&process->use_count, i);
2884 
2885     if (i < 0 && c == -i) {
2886         nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
2887 
2888         nxt_unit_mmaps_destroy(&process->incoming);
2889         nxt_unit_mmaps_destroy(&process->outgoing);
2890 
2891         free(process);
2892     }
2893 }
2894 
2895 
2896 static void
2897 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
2898 {
2899     nxt_unit_mmap_t  *mm, *end;
2900 
2901     if (mmaps->elts != NULL) {
2902         end = mmaps->elts + mmaps->size;
2903 
2904         for (mm = mmaps->elts; mm < end; mm++) {
2905             munmap(mm->hdr, PORT_MMAP_SIZE);
2906         }
2907 
2908         free(mmaps->elts);
2909     }
2910 
2911     pthread_mutex_destroy(&mmaps->mutex);
2912 }
2913 
2914 
2915 static nxt_port_mmap_header_t *
2916 nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2917     uint32_t id)
2918 {
2919     nxt_port_mmap_header_t  *hdr;
2920 
2921     if (nxt_fast_path(process->incoming.size > id)) {
2922         hdr = process->incoming.elts[id].hdr;
2923 
2924     } else {
2925         hdr = NULL;
2926     }
2927 
2928     return hdr;
2929 }
2930 
2931 
2932 static int
2933 nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2934 {
2935     int                           rc;
2936     nxt_chunk_id_t                c;
2937     nxt_unit_process_t            *process;
2938     nxt_port_mmap_header_t        *hdr;
2939     nxt_port_mmap_tracking_msg_t  *tracking_msg;
2940 
2941     if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2942         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2943                       recv_msg->stream, (int) recv_msg->size);
2944 
2945         return 0;
2946     }
2947 
2948     tracking_msg = recv_msg->start;
2949 
2950     recv_msg->start = tracking_msg + 1;
2951     recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
2952 
2953     process = nxt_unit_msg_get_process(ctx, recv_msg);
2954     if (nxt_slow_path(process == NULL)) {
2955         return 0;
2956     }
2957 
2958     pthread_mutex_lock(&process->incoming.mutex);
2959 
2960     hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2961     if (nxt_slow_path(hdr == NULL)) {
2962         pthread_mutex_unlock(&process->incoming.mutex);
2963 
2964         nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2965                       "invalid mmap id %d,%"PRIu32,
2966                       recv_msg->stream, (int) process->pid,
2967                       tracking_msg->mmap_id);
2968 
2969         return 0;
2970     }
2971 
2972     c = tracking_msg->tracking_id;
2973     rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
2974 
2975     if (rc == 0) {
2976         nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2977                        recv_msg->stream);
2978 
2979         nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2980     }
2981 
2982     pthread_mutex_unlock(&process->incoming.mutex);
2983 
2984     return rc;
2985 }
2986 
2987 
2988 static int
2989 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2990 {
2991     void                    *start;
2992     uint32_t                size;
2993     nxt_unit_process_t      *process;
2994     nxt_unit_mmap_buf_t     *b, **incoming_tail;
2995     nxt_port_mmap_msg_t     *mmap_msg, *end;
2996     nxt_port_mmap_header_t  *hdr;
2997 
2998     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2999         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
3000                       recv_msg->stream, (int) recv_msg->size);
3001 
3002         return NXT_UNIT_ERROR;
3003     }
3004 
3005     process = nxt_unit_msg_get_process(ctx, recv_msg);
3006     if (nxt_slow_path(process == NULL)) {
3007         return NXT_UNIT_ERROR;
3008     }
3009 
3010     mmap_msg = recv_msg->start;
3011     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
3012 
3013     incoming_tail = &recv_msg->incoming_buf;
3014 
3015     pthread_mutex_lock(&process->incoming.mutex);
3016 
3017     for (; mmap_msg < end; mmap_msg++) {
3018         hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
3019         if (nxt_slow_path(hdr == NULL)) {
3020             pthread_mutex_unlock(&process->incoming.mutex);
3021 
3022             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
3023                           "invalid mmap id %d,%"PRIu32,
3024                           recv_msg->stream, (int) process->pid,
3025                           mmap_msg->mmap_id);
3026 
3027             return NXT_UNIT_ERROR;
3028         }
3029 
3030         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
3031         size = mmap_msg->size;
3032 
3033         if (recv_msg->start == mmap_msg) {
3034             recv_msg->start = start;
3035             recv_msg->size = size;
3036         }
3037 
3038         b = nxt_unit_mmap_buf_get(ctx);
3039         if (nxt_slow_path(b == NULL)) {
3040             pthread_mutex_unlock(&process->incoming.mutex);
3041 
3042             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
3043                           recv_msg->stream);
3044 
3045             nxt_unit_mmap_release(hdr, start, size);
3046 
3047             return NXT_UNIT_ERROR;
3048         }
3049 
3050         nxt_unit_mmap_buf_insert(incoming_tail, b);
3051         incoming_tail = &b->next;
3052 
3053         b->buf.start = start;
3054         b->buf.free = start;
3055         b->buf.end = b->buf.start + size;
3056         b->hdr = hdr;
3057 
3058         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3059                        recv_msg->stream,
3060                        start, (int) size,
3061                        (int) hdr->src_pid, (int) hdr->dst_pid,
3062                        (int) hdr->id, (int) mmap_msg->chunk_id,
3063                        (int) mmap_msg->size);
3064     }
3065 
3066     pthread_mutex_unlock(&process->incoming.mutex);
3067 
3068     return NXT_UNIT_OK;
3069 }
3070 
3071 
3072 static int
3073 nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
3074 {
3075     u_char          *p, *end;
3076     nxt_chunk_id_t  c;
3077 
3078     memset(start, 0xA5, size);
3079 
3080     p = start;
3081     end = p + size;
3082     c = nxt_port_mmap_chunk_id(hdr, p);
3083 
3084     while (p < end) {
3085         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
3086 
3087         p += PORT_MMAP_CHUNK_SIZE;
3088         c++;
3089     }
3090 
3091     return NXT_UNIT_OK;
3092 }
3093 
3094 
3095 static nxt_int_t
3096 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
3097 {
3098     nxt_process_t  *process;
3099 
3100     process = data;
3101 
3102     if (lhq->key.length == sizeof(pid_t)
3103         && *(pid_t *) lhq->key.start == process->pid)
3104     {
3105         return NXT_OK;
3106     }
3107 
3108     return NXT_DECLINED;
3109 }
3110 
3111 
/*
 * Level-hash prototype used for lib->processes (installed into queries by
 * nxt_unit_process_lhq_pid()): NXT_LVLHSH_DEFAULT parameters,
 * nxt_unit_lvlhsh_pid_test() as the key test callback, and
 * nxt_lvlhsh_alloc()/nxt_lvlhsh_free() as the remaining two callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
3118 
3119 
3120 static inline void
3121 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
3122 {
3123     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
3124     lhq->key.length = sizeof(*pid);
3125     lhq->key.start = (u_char *) pid;
3126     lhq->proto = &lvlhsh_processes_proto;
3127 }
3128 
3129 
3130 static nxt_unit_process_t *
3131 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
3132 {
3133     nxt_unit_impl_t     *lib;
3134     nxt_unit_process_t  *process;
3135     nxt_lvlhsh_query_t  lhq;
3136 
3137     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3138 
3139     nxt_unit_process_lhq_pid(&lhq, &pid);
3140 
3141     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
3142         process = lhq.value;
3143         nxt_unit_process_use(ctx, process, 1);
3144 
3145         return process;
3146     }
3147 
3148     process = malloc(sizeof(nxt_unit_process_t));
3149     if (nxt_slow_path(process == NULL)) {
3150         nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
3151 
3152         return NULL;
3153     }
3154 
3155     process->pid = pid;
3156     process->use_count = 1;
3157     process->next_port_id = 0;
3158     process->lib = lib;
3159 
3160     nxt_queue_init(&process->ports);
3161 
3162     nxt_unit_mmaps_init(&process->incoming);
3163     nxt_unit_mmaps_init(&process->outgoing);
3164 
3165     lhq.replace = 0;
3166     lhq.value = process;
3167 
3168     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
3169 
3170     case NXT_OK:
3171         break;
3172 
3173     default:
3174         nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
3175 
3176         pthread_mutex_destroy(&process->outgoing.mutex);
3177         pthread_mutex_destroy(&process->incoming.mutex);
3178         free(process);
3179         process = NULL;
3180         break;
3181     }
3182 
3183     nxt_unit_process_use(ctx, process, 1);
3184 
3185     return process;
3186 }
3187 
3188 
3189 static nxt_unit_process_t *
3190 nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
3191 {
3192     int                 rc;
3193     nxt_unit_impl_t     *lib;
3194     nxt_unit_process_t  *process;
3195     nxt_lvlhsh_query_t  lhq;
3196 
3197     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3198 
3199     nxt_unit_process_lhq_pid(&lhq, &pid);
3200 
3201     if (remove) {
3202         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
3203 
3204     } else {
3205         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
3206     }
3207 
3208     if (rc == NXT_OK) {
3209         process = lhq.value;
3210 
3211         if (!remove) {
3212             nxt_unit_process_use(ctx, process, 1);
3213         }
3214 
3215         return process;
3216     }
3217 
3218     return NULL;
3219 }
3220 
3221 
3222 static nxt_unit_process_t *
3223 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
3224 {
3225     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
3226 }
3227 
3228 
3229 int
3230 nxt_unit_run(nxt_unit_ctx_t *ctx)
3231 {
3232     int              rc;
3233     nxt_unit_impl_t  *lib;
3234 
3235     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3236     rc = NXT_UNIT_OK;
3237 
3238     while (nxt_fast_path(lib->online)) {
3239         rc = nxt_unit_run_once(ctx);
3240     }
3241 
3242     return rc;
3243 }
3244 
3245 
3246 int
3247 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
3248 {
3249     int                  rc;
3250     char                 buf[4096];
3251     char                 oob[256];
3252     ssize_t              rsize;
3253     nxt_unit_impl_t      *lib;
3254     nxt_unit_ctx_impl_t  *ctx_impl;
3255 
3256     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3257     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3258 
3259     memset(oob, 0, sizeof(struct cmsghdr));
3260 
3261     if (ctx_impl->read_port_fd != -1) {
3262         rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
3263                                          buf, sizeof(buf),
3264                                          oob, sizeof(oob));
3265     } else {
3266         rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
3267                                          buf, sizeof(buf),
3268                                          oob, sizeof(oob));
3269     }
3270 
3271     if (nxt_fast_path(rsize > 0)) {
3272         rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
3273                                   oob, sizeof(oob));
3274 
3275 #if (NXT_DEBUG)
3276         memset(buf, 0xAC, rsize);
3277 #endif
3278 
3279     } else {
3280         rc = NXT_UNIT_ERROR;
3281     }
3282 
3283     return rc;
3284 }
3285 
3286 
3287 void
3288 nxt_unit_done(nxt_unit_ctx_t *ctx)
3289 {
3290     nxt_unit_impl_t      *lib;
3291     nxt_unit_process_t   *process;
3292     nxt_unit_ctx_impl_t  *ctx_impl;
3293 
3294     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3295 
3296     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
3297 
3298         nxt_unit_ctx_free(&ctx_impl->ctx);
3299 
3300     } nxt_queue_loop;
3301 
3302     for ( ;; ) {
3303         pthread_mutex_lock(&lib->mutex);
3304 
3305         process = nxt_unit_process_pop_first(lib);
3306         if (process == NULL) {
3307             pthread_mutex_unlock(&lib->mutex);
3308 
3309             break;
3310         }
3311 
3312         nxt_unit_remove_process(ctx, process);
3313     }
3314 
3315     pthread_mutex_destroy(&lib->mutex);
3316 
3317     free(lib);
3318 }
3319 
3320 
3321 nxt_unit_ctx_t *
3322 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
3323 {
3324     int                  rc, fd;
3325     nxt_unit_impl_t      *lib;
3326     nxt_unit_port_id_t   new_port_id;
3327     nxt_unit_ctx_impl_t  *new_ctx;
3328 
3329     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3330 
3331     new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
3332     if (nxt_slow_path(new_ctx == NULL)) {
3333         nxt_unit_warn(ctx, "failed to allocate context");
3334 
3335         return NULL;
3336     }
3337 
3338     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
3339     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3340         free(new_ctx);
3341 
3342         return NULL;
3343     }
3344 
3345     rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
3346     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3347         lib->callbacks.remove_port(ctx, &new_port_id);
3348 
3349         close(fd);
3350 
3351         free(new_ctx);
3352 
3353         return NULL;
3354     }
3355 
3356     close(fd);
3357 
3358     rc = nxt_unit_ctx_init(lib, new_ctx, data);
3359     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3360         lib->callbacks.remove_port(ctx, &new_port_id);
3361 
3362         free(new_ctx);
3363 
3364         return NULL;
3365     }
3366 
3367     new_ctx->read_port_id = new_port_id;
3368 
3369     return &new_ctx->ctx;
3370 }
3371 
3372 
3373 void
3374 nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
3375 {
3376     nxt_unit_impl_t                  *lib;
3377     nxt_unit_ctx_impl_t              *ctx_impl;
3378     nxt_unit_mmap_buf_t              *mmap_buf;
3379     nxt_unit_request_info_impl_t     *req_impl;
3380     nxt_unit_websocket_frame_impl_t  *ws_impl;
3381 
3382     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3383     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3384 
3385     nxt_queue_each(req_impl, &ctx_impl->active_req,
3386                    nxt_unit_request_info_impl_t, link)
3387     {
3388         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
3389 
3390         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
3391 
3392     } nxt_queue_loop;
3393 
3394     nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
3395     nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
3396 
3397     while (ctx_impl->free_buf != NULL) {
3398         mmap_buf = ctx_impl->free_buf;
3399         nxt_unit_mmap_buf_remove(mmap_buf);
3400         free(mmap_buf);
3401     }
3402 
3403     nxt_queue_each(req_impl, &ctx_impl->free_req,
3404                    nxt_unit_request_info_impl_t, link)
3405     {
3406         nxt_unit_request_info_free(req_impl);
3407 
3408     } nxt_queue_loop;
3409 
3410     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
3411                    nxt_unit_websocket_frame_impl_t, link)
3412     {
3413         nxt_unit_websocket_frame_free(ws_impl);
3414 
3415     } nxt_queue_loop;
3416 
3417     pthread_mutex_destroy(&ctx_impl->mutex);
3418 
3419     nxt_queue_remove(&ctx_impl->link);
3420 
3421     if (ctx_impl != &lib->main_ctx) {
3422         free(ctx_impl);
3423     }
3424 }
3425 
3426 
/*
 * Socket type used by socketpair() in nxt_unit_create_port():
 * SOCK_SEQPACKET when NXT_HAVE_AF_UNIX_SOCK_SEQPACKET is set, SOCK_DGRAM
 * otherwise.
 *
 * NOTE(review): this block used to be described as "SOCK_SEQPACKET is
 * disabled to test SOCK_DGRAM on all platforms", but the leading "0 ||"
 * does not change the condition, so SOCK_SEQPACKET is in fact selected
 * wherever it is available.  Confirm which behavior is intended.
 */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif
3433 
3434 
3435 void
3436 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
3437 {
3438     nxt_unit_port_hash_id_t  port_hash_id;
3439 
3440     port_hash_id.pid = pid;
3441     port_hash_id.id = id;
3442 
3443     port_id->pid = pid;
3444     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
3445     port_id->id = id;
3446 }
3447 
3448 
3449 int
3450 nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
3451     nxt_unit_port_id_t *port_id)
3452 {
3453     int                 rc, fd;
3454     nxt_unit_impl_t     *lib;
3455     nxt_unit_port_id_t  new_port_id;
3456 
3457     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3458 
3459     rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
3460     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3461         return rc;
3462     }
3463 
3464     rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
3465 
3466     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
3467         *port_id = new_port_id;
3468 
3469     } else {
3470         lib->callbacks.remove_port(ctx, &new_port_id);
3471     }
3472 
3473     close(fd);
3474 
3475     return rc;
3476 }
3477 
3478 
3479 static int
3480 nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
3481 {
3482     int                 rc, port_sockets[2];
3483     nxt_unit_impl_t     *lib;
3484     nxt_unit_port_t     new_port;
3485     nxt_unit_process_t  *process;
3486 
3487     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3488 
3489     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
3490     if (nxt_slow_path(rc != 0)) {
3491         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
3492                       strerror(errno), errno);
3493 
3494         return NXT_UNIT_ERROR;
3495     }
3496 
3497     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
3498                    port_sockets[0], port_sockets[1]);
3499 
3500     pthread_mutex_lock(&lib->mutex);
3501 
3502     process = nxt_unit_process_get(ctx, lib->pid);
3503     if (nxt_slow_path(process == NULL)) {
3504         pthread_mutex_unlock(&lib->mutex);
3505 
3506         close(port_sockets[0]);
3507         close(port_sockets[1]);
3508 
3509         return NXT_UNIT_ERROR;
3510     }
3511 
3512     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
3513 
3514     new_port.in_fd = port_sockets[0];
3515     new_port.out_fd = -1;
3516     new_port.data = NULL;
3517 
3518     pthread_mutex_unlock(&lib->mutex);
3519 
3520     nxt_unit_process_use(ctx, process, -1);
3521 
3522     rc = lib->callbacks.add_port(ctx, &new_port);
3523     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3524         nxt_unit_warn(ctx, "create_port: add_port() failed");
3525 
3526         close(port_sockets[0]);
3527         close(port_sockets[1]);
3528 
3529         return rc;
3530     }
3531 
3532     *port_id = new_port.id;
3533     *fd = port_sockets[1];
3534 
3535     return rc;
3536 }
3537 
3538 
3539 static int
3540 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
3541     nxt_unit_port_id_t *new_port, int fd)
3542 {
3543     ssize_t          res;
3544     nxt_unit_impl_t  *lib;
3545 
3546     struct {
3547         nxt_port_msg_t            msg;
3548         nxt_port_msg_new_port_t   new_port;
3549     } m;
3550 
3551     union {
3552         struct cmsghdr  cm;
3553         char            space[CMSG_SPACE(sizeof(int))];
3554     } cmsg;
3555 
3556     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3557 
3558     m.msg.stream = 0;
3559     m.msg.pid = lib->pid;
3560     m.msg.reply_port = 0;
3561     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
3562     m.msg.last = 0;
3563     m.msg.mmap = 0;
3564     m.msg.nf = 0;
3565     m.msg.mf = 0;
3566     m.msg.tracking = 0;
3567 
3568     m.new_port.id = new_port->id;
3569     m.new_port.pid = new_port->pid;
3570     m.new_port.type = NXT_PROCESS_WORKER;
3571     m.new_port.max_size = 16 * 1024;
3572     m.new_port.max_share = 64 * 1024;
3573 
3574     memset(&cmsg, 0, sizeof(cmsg));
3575 
3576     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
3577     cmsg.cm.cmsg_level = SOL_SOCKET;
3578     cmsg.cm.cmsg_type = SCM_RIGHTS;
3579 
3580     /*
3581      * memcpy() is used instead of simple
3582      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
3583      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
3584      *   dereferencing type-punned pointer will break strict-aliasing rules
3585      *
3586      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
3587      * in the same simple assignment as in the code above.
3588      */
3589     memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
3590 
3591     res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
3592                                    &cmsg, sizeof(cmsg));
3593 
3594     return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
3595 }
3596 
3597 
3598 int
3599 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3600 {
3601     int                   rc;
3602     nxt_unit_impl_t       *lib;
3603     nxt_unit_process_t    *process;
3604     nxt_unit_port_impl_t  *new_port;
3605 
3606     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3607 
3608     nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
3609                    port->id.pid, port->id.id,
3610                    port->in_fd, port->out_fd);
3611 
3612     pthread_mutex_lock(&lib->mutex);
3613 
3614     process = nxt_unit_process_get(ctx, port->id.pid);
3615     if (nxt_slow_path(process == NULL)) {
3616         rc = NXT_UNIT_ERROR;
3617         goto unlock;
3618     }
3619 
3620     if (port->id.id >= process->next_port_id) {
3621         process->next_port_id = port->id.id + 1;
3622     }
3623 
3624     new_port = malloc(sizeof(nxt_unit_port_impl_t));
3625     if (nxt_slow_path(new_port == NULL)) {
3626         rc = NXT_UNIT_ERROR;
3627         goto unlock;
3628     }
3629 
3630     new_port->port = *port;
3631 
3632     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
3633     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3634         goto unlock;
3635     }
3636 
3637     nxt_queue_insert_tail(&process->ports, &new_port->link);
3638 
3639     rc = NXT_UNIT_OK;
3640 
3641     new_port->process = process;
3642 
3643 unlock:
3644 
3645     pthread_mutex_unlock(&lib->mutex);
3646 
3647     if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
3648         nxt_unit_process_use(ctx, process, -1);
3649     }
3650 
3651     return rc;
3652 }
3653 
3654 
/*
 * Removes the port identified by port_id.  Thin wrapper around
 * nxt_unit_find_remove_port() that does not ask for a copy of the removed
 * port.  nxt_unit_remove_process() compares the remove_port callback
 * against this function to detect the default implementation.
 */
void
nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    nxt_unit_find_remove_port(ctx, port_id, NULL);
}
3660 
3661 
3662 void
3663 nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3664     nxt_unit_port_t *r_port)
3665 {
3666     nxt_unit_impl_t     *lib;
3667     nxt_unit_process_t  *process;
3668 
3669     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3670 
3671     pthread_mutex_lock(&lib->mutex);
3672 
3673     process = NULL;
3674 
3675     nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
3676 
3677     pthread_mutex_unlock(&lib->mutex);
3678 
3679     if (nxt_slow_path(process != NULL)) {
3680         nxt_unit_process_use(ctx, process, -1);
3681     }
3682 }
3683 
3684 
3685 static void
3686 nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3687     nxt_unit_port_t *r_port, nxt_unit_process_t **process)
3688 {
3689     nxt_unit_impl_t       *lib;
3690     nxt_unit_port_impl_t  *port;
3691 
3692     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3693 
3694     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
3695     if (nxt_slow_path(port == NULL)) {
3696         nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
3697                        (int) port_id->pid, (int) port_id->id);
3698 
3699         return;
3700     }
3701 
3702     nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
3703                    (int) port_id->pid, (int) port_id->id,
3704                    port->port.in_fd, port->port.out_fd, port->port.data);
3705 
3706     if (port->port.in_fd != -1) {
3707         close(port->port.in_fd);
3708     }
3709 
3710     if (port->port.out_fd != -1) {
3711         close(port->port.out_fd);
3712     }
3713 
3714     if (port->process != NULL) {
3715         nxt_queue_remove(&port->link);
3716     }
3717 
3718     if (process != NULL) {
3719         *process = port->process;
3720     }
3721 
3722     if (r_port != NULL) {
3723         *r_port = port->port;
3724     }
3725 
3726     free(port);
3727 }
3728 
3729 
3730 void
3731 nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
3732 {
3733     nxt_unit_impl_t     *lib;
3734     nxt_unit_process_t  *process;
3735 
3736     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3737 
3738     pthread_mutex_lock(&lib->mutex);
3739 
3740     process = nxt_unit_process_find(ctx, pid, 1);
3741     if (nxt_slow_path(process == NULL)) {
3742         nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
3743 
3744         pthread_mutex_unlock(&lib->mutex);
3745 
3746         return;
3747     }
3748 
3749     nxt_unit_remove_process(ctx, process);
3750 }
3751 
3752 
/*
 * Detaches and removes all ports of a process entry that has already been
 * taken out of lib->processes, then drops one reference on the entry.
 *
 * Locking: the callers in this file enter with lib->mutex locked; this
 * function unlocks it and returns with the mutex released.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Work on a local list built from the process's ports. */
    nxt_queue_init(&ports);

    nxt_queue_add(&ports, &process->ports);

    /* First pass, still under lib->mutex. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        /* Drop one process reference per port and detach the port. */
        nxt_unit_process_use(ctx, process, -1);
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            /* Frees the port; no copy or process pointer is requested. */
            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /*
     * Second pass, without the mutex: ports still on the local list (those
     * not handled by the shortcut above) are removed through the
     * remove_port callback.
     */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    /* Drop one more reference on the process entry itself. */
    nxt_unit_process_use(ctx, process, -1);
}
3792 
3793 
/*
 * Clears lib->online.  nxt_unit_run() tests that flag before every
 * iteration, so its loop ends once the current nxt_unit_run_once() call
 * has returned.
 */
void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    lib->online = 0;
}
3803 
3804 
3805 static ssize_t
3806 nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3807     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
3808 {
3809     int                   fd;
3810     nxt_unit_impl_t       *lib;
3811     nxt_unit_port_impl_t  *port;
3812 
3813     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3814 
3815     pthread_mutex_lock(&lib->mutex);
3816 
3817     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
3818 
3819     if (nxt_fast_path(port != NULL)) {
3820         fd = port->port.out_fd;
3821 
3822     } else {
3823         nxt_unit_warn(ctx, "port_send: port %d,%d not found",
3824                       (int) port_id->pid, (int) port_id->id);
3825         fd = -1;
3826     }
3827 
3828     pthread_mutex_unlock(&lib->mutex);
3829 
3830     if (nxt_slow_path(fd == -1)) {
3831         if (port != NULL) {
3832             nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
3833                           (int) port_id->pid, (int) port_id->id);
3834         }
3835 
3836         return -1;
3837     }
3838 
3839     nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
3840                    (int) port_id->pid, (int) port_id->id, fd);
3841 
3842     return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
3843 }
3844 
3845 
3846 ssize_t
3847 nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
3848     const void *buf, size_t buf_size, const void *oob, size_t oob_size)
3849 {
3850     ssize_t        res;
3851     struct iovec   iov[1];
3852     struct msghdr  msg;
3853 
3854     iov[0].iov_base = (void *) buf;
3855     iov[0].iov_len = buf_size;
3856 
3857     msg.msg_name = NULL;
3858     msg.msg_namelen = 0;
3859     msg.msg_iov = iov;
3860     msg.msg_iovlen = 1;
3861     msg.msg_flags = 0;
3862     msg.msg_control = (void *) oob;
3863     msg.msg_controllen = oob_size;
3864 
3865     res = sendmsg(fd, &msg, 0);
3866 
3867     if (nxt_slow_path(res == -1)) {
3868         nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
3869                       fd, (int) buf_size, strerror(errno), errno);
3870 
3871     } else {
3872         nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
3873                        (int) res);
3874     }
3875 
3876     return res;
3877 }
3878 
3879 
3880 static ssize_t
3881 nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
3882     void *buf, size_t buf_size, void *oob, size_t oob_size)
3883 {
3884     int                   fd;
3885     nxt_unit_impl_t       *lib;
3886     nxt_unit_ctx_impl_t   *ctx_impl;
3887     nxt_unit_port_impl_t  *port;
3888 
3889     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3890 
3891     pthread_mutex_lock(&lib->mutex);
3892 
3893     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
3894 
3895     if (nxt_fast_path(port != NULL)) {
3896         fd = port->port.in_fd;
3897 
3898     } else {
3899         nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
3900                        (int) port_id->pid, (int) port_id->id);
3901         fd = -1;
3902     }
3903 
3904     pthread_mutex_unlock(&lib->mutex);
3905 
3906     if (nxt_slow_path(fd == -1)) {
3907         return -1;
3908     }
3909 
3910     nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
3911                    (int) port_id->pid, (int) port_id->id, fd);
3912 
3913     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3914 
3915     if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
3916         ctx_impl->read_port_fd = fd;
3917     }
3918 
3919     return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
3920 }
3921 
3922 
3923 ssize_t
3924 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
3925     void *oob, size_t oob_size)
3926 {
3927     ssize_t        res;
3928     struct iovec   iov[1];
3929     struct msghdr  msg;
3930 
3931     iov[0].iov_base = buf;
3932     iov[0].iov_len = buf_size;
3933 
3934     msg.msg_name = NULL;
3935     msg.msg_namelen = 0;
3936     msg.msg_iov = iov;
3937     msg.msg_iovlen = 1;
3938     msg.msg_flags = 0;
3939     msg.msg_control = oob;
3940     msg.msg_controllen = oob_size;
3941 
3942     res = recvmsg(fd, &msg, 0);
3943 
3944     if (nxt_slow_path(res == -1)) {
3945         nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
3946                       fd, strerror(errno), errno);
3947 
3948     } else {
3949         nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
3950     }
3951 
3952     return res;
3953 }
3954 
3955 
3956 static nxt_int_t
3957 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
3958 {
3959     nxt_unit_port_t          *port;
3960     nxt_unit_port_hash_id_t  *port_id;
3961 
3962     port = data;
3963     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
3964 
3965     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
3966         && port_id->pid == port->id.pid
3967         && port_id->id == port->id.id)
3968     {
3969         return NXT_OK;
3970     }
3971 
3972     return NXT_DECLINED;
3973 }
3974 
3975 
/*
 * Level-hash prototype for port tables (installed into queries by
 * nxt_unit_port_hash_lhq()): NXT_LVLHSH_DEFAULT parameters,
 * nxt_unit_port_hash_test() as the key test callback, and
 * nxt_lvlhsh_alloc()/nxt_lvlhsh_free() as the remaining two callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
3982 
3983 
3984 static inline void
3985 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
3986     nxt_unit_port_hash_id_t *port_hash_id,
3987     nxt_unit_port_id_t *port_id)
3988 {
3989     port_hash_id->pid = port_id->pid;
3990     port_hash_id->id = port_id->id;
3991 
3992     if (nxt_fast_path(port_id->hash != 0)) {
3993         lhq->key_hash = port_id->hash;
3994 
3995     } else {
3996         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
3997 
3998         port_id->hash = lhq->key_hash;
3999 
4000         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
4001                        (int) port_id->pid, (int) port_id->id,
4002                        (int) port_id->hash);
4003     }
4004 
4005     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
4006     lhq->key.start = (u_char *) port_hash_id;
4007     lhq->proto = &lvlhsh_ports_proto;
4008     lhq->pool = NULL;
4009 }
4010 
4011 
4012 static int
4013 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
4014 {
4015     nxt_int_t                res;
4016     nxt_lvlhsh_query_t       lhq;
4017     nxt_unit_port_hash_id_t  port_hash_id;
4018 
4019     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
4020     lhq.replace = 0;
4021     lhq.value = port;
4022 
4023     res = nxt_lvlhsh_insert(port_hash, &lhq);
4024 
4025     switch (res) {
4026 
4027     case NXT_OK:
4028         return NXT_UNIT_OK;
4029 
4030     default:
4031         return NXT_UNIT_ERROR;
4032     }
4033 }
4034 
4035 
4036 static nxt_unit_port_impl_t *
4037 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
4038     int remove)
4039 {
4040     nxt_int_t                res;
4041     nxt_lvlhsh_query_t       lhq;
4042     nxt_unit_port_hash_id_t  port_hash_id;
4043 
4044     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
4045 
4046     if (remove) {
4047         res = nxt_lvlhsh_delete(port_hash, &lhq);
4048 
4049     } else {
4050         res = nxt_lvlhsh_find(port_hash, &lhq);
4051     }
4052 
4053     switch (res) {
4054 
4055     case NXT_OK:
4056         return lhq.value;
4057 
4058     default:
4059         return NULL;
4060     }
4061 }
4062 
4063 
/*
 * Level-hash "test" callback for the requests table.  Accepts every
 * candidate unconditionally; neither the query key nor the entry is
 * inspected.
 *
 * NOTE(review): presumably this relies on the hash of the 32-bit stream id
 * already identifying the entry uniquely - confirm.
 */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}
4069 
4070 
/*
 * Level-hash prototype for request tables (used by
 * nxt_unit_request_hash_add() and nxt_unit_request_hash_find()):
 * NXT_LVLHSH_DEFAULT parameters, nxt_unit_request_hash_test() as the key
 * test callback, and nxt_lvlhsh_alloc()/nxt_lvlhsh_free() as the remaining
 * two callbacks.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
4077 
4078 
4079 static int
4080 nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4081     nxt_unit_request_info_impl_t *req_impl)
4082 {
4083     uint32_t            *stream;
4084     nxt_int_t           res;
4085     nxt_lvlhsh_query_t  lhq;
4086 
4087     stream = &req_impl->stream;
4088 
4089     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4090     lhq.key.length = sizeof(*stream);
4091     lhq.key.start = (u_char *) stream;
4092     lhq.proto = &lvlhsh_requests_proto;
4093     lhq.pool = NULL;
4094     lhq.replace = 0;
4095     lhq.value = req_impl;
4096 
4097     res = nxt_lvlhsh_insert(request_hash, &lhq);
4098 
4099     switch (res) {
4100 
4101     case NXT_OK:
4102         return NXT_UNIT_OK;
4103 
4104     default:
4105         return NXT_UNIT_ERROR;
4106     }
4107 }
4108 
4109 
4110 static nxt_unit_request_info_impl_t *
4111 nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4112     int remove)
4113 {
4114     nxt_int_t           res;
4115     nxt_lvlhsh_query_t  lhq;
4116 
4117     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4118     lhq.key.length = sizeof(stream);
4119     lhq.key.start = (u_char *) &stream;
4120     lhq.proto = &lvlhsh_requests_proto;
4121     lhq.pool = NULL;
4122 
4123     if (remove) {
4124         res = nxt_lvlhsh_delete(request_hash, &lhq);
4125 
4126     } else {
4127         res = nxt_lvlhsh_find(request_hash, &lhq);
4128     }
4129 
4130     switch (res) {
4131 
4132     case NXT_OK:
4133         return lhq.value;
4134 
4135     default:
4136         return NULL;
4137     }
4138 }
4139 
4140 
4141 void
4142 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4143 {
4144     int              log_fd, n;
4145     char             msg[NXT_MAX_ERROR_STR], *p, *end;
4146     pid_t            pid;
4147     va_list          ap;
4148     nxt_unit_impl_t  *lib;
4149 
4150     if (nxt_fast_path(ctx != NULL)) {
4151         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4152 
4153         pid = lib->pid;
4154         log_fd = lib->log_fd;
4155 
4156     } else {
4157         pid = getpid();
4158         log_fd = STDERR_FILENO;
4159     }
4160 
4161     p = msg;
4162     end = p + sizeof(msg) - 1;
4163 
4164     p = nxt_unit_snprint_prefix(p, end, pid, level);
4165 
4166     va_start(ap, fmt);
4167     p += vsnprintf(p, end - p, fmt, ap);
4168     va_end(ap);
4169 
4170     if (nxt_slow_path(p > end)) {
4171         memcpy(end - 5, "[...]", 5);
4172         p = end;
4173     }
4174 
4175     *p++ = '\n';
4176 
4177     n = write(log_fd, msg, p - msg);
4178     if (nxt_slow_path(n < 0)) {
4179         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4180     }
4181 }
4182 
4183 
4184 void
4185 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
4186 {
4187     int                           log_fd, n;
4188     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
4189     pid_t                         pid;
4190     va_list                       ap;
4191     nxt_unit_impl_t               *lib;
4192     nxt_unit_request_info_impl_t  *req_impl;
4193 
4194     if (nxt_fast_path(req != NULL)) {
4195         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
4196 
4197         pid = lib->pid;
4198         log_fd = lib->log_fd;
4199 
4200     } else {
4201         pid = getpid();
4202         log_fd = STDERR_FILENO;
4203     }
4204 
4205     p = msg;
4206     end = p + sizeof(msg) - 1;
4207 
4208     p = nxt_unit_snprint_prefix(p, end, pid, level);
4209 
4210     if (nxt_fast_path(req != NULL)) {
4211         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4212 
4213         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4214     }
4215 
4216     va_start(ap, fmt);
4217     p += vsnprintf(p, end - p, fmt, ap);
4218     va_end(ap);
4219 
4220     if (nxt_slow_path(p > end)) {
4221         memcpy(end - 5, "[...]", 5);
4222         p = end;
4223     }
4224 
4225     *p++ = '\n';
4226 
4227     n = write(log_fd, msg, p - msg);
4228     if (nxt_slow_path(n < 0)) {
4229         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
4230     }
4231 }
4232 
4233 
4234 static const char * nxt_unit_log_levels[] = {
4235     "alert",
4236     "error",
4237     "warn",
4238     "notice",
4239     "info",
4240     "debug",
4241 };
4242 
4243 
4244 static char *
4245 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
4246 {
4247     struct tm        tm;
4248     struct timespec  ts;
4249 
4250     (void) clock_gettime(CLOCK_REALTIME, &ts);
4251 
4252 #if (NXT_HAVE_LOCALTIME_R)
4253     (void) localtime_r(&ts.tv_sec, &tm);
4254 #else
4255     tm = *localtime(&ts.tv_sec);
4256 #endif
4257 
4258     p += snprintf(p, end - p,
4259                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
4260                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
4261                   tm.tm_hour, tm.tm_min, tm.tm_sec,
4262                   (int) ts.tv_nsec / 1000000);
4263 
4264     p += snprintf(p, end - p,
4265                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
4266                   (int) pid,
4267                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
4268 
4269     return p;
4270 }
4271 
4272 
4273 /* The function required by nxt_lvlhsh_alloc() and nxt_lvlhsh_free(). */
4274 
/*
 * Allocates "size" bytes aligned to "alignment" using posix_memalign().
 *
 * Returns the allocated block, or NULL if posix_memalign() reports an error.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void  *block;

    /* posix_memalign() returns zero on success and an error number otherwise. */
    if (nxt_fast_path(posix_memalign(&block, alignment, size) == 0)) {
        return block;
    }

    return NULL;
}
4289 
4290 #if (NXT_DEBUG)
4291 
/*
 * Releases memory by handing the pointer to free().
 * Compiled only when NXT_DEBUG is enabled.
 */
void
nxt_free(void *p)
{
    /* free() accepts NULL, so no guard is required. */
    free(p);
}
4297 
4298 #endif
4299