xref: /unit/src/nxt_port_queue.h (revision 1555:1d84b9e4b459)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_PORT_QUEUE_H_INCLUDED_
7 #define _NXT_PORT_QUEUE_H_INCLUDED_
8 
9 
10 #include <nxt_nncq.h>
11 
12 
13 /* Using Numeric Naive Circular Queue as a backend. */
14 
15 #define NXT_PORT_QUEUE_SIZE      NXT_NNCQ_SIZE
16 #define NXT_PORT_QUEUE_MSG_SIZE  31
17 
18 
19 typedef struct {
20     uint8_t   size;
21     uint8_t   data[NXT_PORT_QUEUE_MSG_SIZE];
22 } nxt_port_queue_item_t;
23 
24 
25 typedef struct {
26     nxt_nncq_atomic_t      nitems;
27     nxt_nncq_t             free_items;
28     nxt_nncq_t             queue;
29     nxt_port_queue_item_t  items[NXT_PORT_QUEUE_SIZE];
30 } nxt_port_queue_t;
31 
32 
33 nxt_inline void
nxt_port_queue_init(nxt_port_queue_t volatile * q)34 nxt_port_queue_init(nxt_port_queue_t volatile *q)
35 {
36     nxt_nncq_atomic_t  i;
37 
38     nxt_nncq_init(&q->free_items);
39     nxt_nncq_init(&q->queue);
40 
41     for (i = 0; i < NXT_PORT_QUEUE_SIZE; i++) {
42         nxt_nncq_enqueue(&q->free_items, i);
43     }
44 
45     q->nitems = 0;
46 }
47 
48 
49 nxt_inline nxt_int_t
nxt_port_queue_send(nxt_port_queue_t volatile * q,const void * p,uint8_t size,int * notify)50 nxt_port_queue_send(nxt_port_queue_t volatile *q, const void *p, uint8_t size,
51     int *notify)
52 {
53     nxt_nncq_atomic_t      i;
54     nxt_port_queue_item_t  *qi;
55 
56     i = nxt_nncq_dequeue(&q->free_items);
57     if (i == nxt_nncq_empty(&q->free_items)) {
58         *notify = 0;
59         return NXT_AGAIN;
60     }
61 
62     qi = (nxt_port_queue_item_t *) &q->items[i];
63 
64     qi->size = size;
65     nxt_memcpy(qi->data, p, size);
66 
67     nxt_nncq_enqueue(&q->queue, i);
68 
69     i = nxt_atomic_fetch_add(&q->nitems, 1);
70 
71     *notify = (i == 0);
72 
73     return NXT_OK;
74 }
75 
76 
77 nxt_inline ssize_t
nxt_port_queue_recv(nxt_port_queue_t volatile * q,void * p)78 nxt_port_queue_recv(nxt_port_queue_t volatile *q, void *p)
79 {
80     ssize_t                res;
81     nxt_nncq_atomic_t      i;
82     nxt_port_queue_item_t  *qi;
83 
84     i = nxt_nncq_dequeue(&q->queue);
85     if (i == nxt_nncq_empty(&q->queue)) {
86         return -1;
87     }
88 
89     qi = (nxt_port_queue_item_t *) &q->items[i];
90 
91     res = qi->size;
92     nxt_memcpy(p, qi->data, qi->size);
93 
94     nxt_nncq_enqueue(&q->free_items, i);
95 
96     nxt_atomic_fetch_add(&q->nitems, -1);
97 
98     return res;
99 }
100 
101 
102 #endif /* _NXT_PORT_QUEUE_H_INCLUDED_ */
103