xref: /unit/src/nxt_app_nncq.h (revision 1555:1d84b9e4b459)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_APP_NNCQ_H_INCLUDED_
7 #define _NXT_APP_NNCQ_H_INCLUDED_
8 
9 
10 /* Appilcation Numeric Naive Circular Queue */
11 
12 #define NXT_APP_NNCQ_SIZE  131072
13 
14 typedef uint32_t nxt_app_nncq_atomic_t;
15 typedef uint16_t nxt_app_nncq_cycle_t;
16 
17 typedef struct {
18     nxt_app_nncq_atomic_t  head;
19     nxt_app_nncq_atomic_t  entries[NXT_APP_NNCQ_SIZE];
20     nxt_app_nncq_atomic_t  tail;
21 } nxt_app_nncq_t;
22 
23 
24 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_head(nxt_app_nncq_t const volatile * q)25 nxt_app_nncq_head(nxt_app_nncq_t const volatile *q)
26 {
27     return q->head;
28 }
29 
30 
31 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_tail(nxt_app_nncq_t const volatile * q)32 nxt_app_nncq_tail(nxt_app_nncq_t const volatile *q)
33 {
34     return q->tail;
35 }
36 
37 
38 static inline void
nxt_app_nncq_tail_cmp_inc(nxt_app_nncq_t volatile * q,nxt_app_nncq_atomic_t t)39 nxt_app_nncq_tail_cmp_inc(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t t)
40 {
41     nxt_atomic_cmp_set(&q->tail, t, t + 1);
42 }
43 
44 
45 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_index(nxt_app_nncq_t const volatile * q,nxt_app_nncq_atomic_t i)46 nxt_app_nncq_index(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
47 {
48     return i % NXT_APP_NNCQ_SIZE;
49 }
50 
51 
52 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_map(nxt_app_nncq_t const volatile * q,nxt_app_nncq_atomic_t i)53 nxt_app_nncq_map(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
54 {
55     return i % NXT_APP_NNCQ_SIZE;
56 }
57 
58 
59 static inline nxt_app_nncq_cycle_t
nxt_app_nncq_cycle(nxt_app_nncq_t const volatile * q,nxt_app_nncq_atomic_t i)60 nxt_app_nncq_cycle(nxt_app_nncq_t const volatile *q, nxt_app_nncq_atomic_t i)
61 {
62     return i / NXT_APP_NNCQ_SIZE;
63 }
64 
65 
66 static inline nxt_app_nncq_cycle_t
nxt_app_nncq_next_cycle(nxt_app_nncq_t const volatile * q,nxt_app_nncq_cycle_t i)67 nxt_app_nncq_next_cycle(nxt_app_nncq_t const volatile *q,
68     nxt_app_nncq_cycle_t i)
69 {
70     return i + 1;
71 }
72 
73 
74 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_new_entry(nxt_app_nncq_t const volatile * q,nxt_app_nncq_cycle_t cycle,nxt_app_nncq_atomic_t i)75 nxt_app_nncq_new_entry(nxt_app_nncq_t const volatile *q,
76     nxt_app_nncq_cycle_t cycle,
77     nxt_app_nncq_atomic_t i)
78 {
79     return cycle * NXT_APP_NNCQ_SIZE + (i % NXT_APP_NNCQ_SIZE);
80 }
81 
82 
83 static inline nxt_app_nncq_atomic_t
nxt_app_nncq_empty(nxt_app_nncq_t const volatile * q)84 nxt_app_nncq_empty(nxt_app_nncq_t const volatile *q)
85 {
86     return NXT_APP_NNCQ_SIZE;
87 }
88 
89 
90 static void
nxt_app_nncq_init(nxt_app_nncq_t volatile * q)91 nxt_app_nncq_init(nxt_app_nncq_t volatile *q)
92 {
93     q->head = NXT_APP_NNCQ_SIZE;
94     nxt_memzero((void *) q->entries,
95                 NXT_APP_NNCQ_SIZE * sizeof(nxt_app_nncq_atomic_t));
96     q->tail = NXT_APP_NNCQ_SIZE;
97 }
98 
99 
100 static void
nxt_app_nncq_enqueue(nxt_app_nncq_t volatile * q,nxt_app_nncq_atomic_t val)101 nxt_app_nncq_enqueue(nxt_app_nncq_t volatile *q, nxt_app_nncq_atomic_t val)
102 {
103     nxt_app_nncq_cycle_t   e_cycle, t_cycle;
104     nxt_app_nncq_atomic_t  n, t, e, j;
105 
106     for ( ;; ) {
107         t = nxt_app_nncq_tail(q);
108         j = nxt_app_nncq_map(q, t);
109         e = q->entries[j];
110 
111         e_cycle = nxt_app_nncq_cycle(q, e);
112         t_cycle = nxt_app_nncq_cycle(q, t);
113 
114         if (e_cycle == t_cycle) {
115             nxt_app_nncq_tail_cmp_inc(q, t);
116             continue;
117         }
118 
119         if (nxt_app_nncq_next_cycle(q, e_cycle) != t_cycle) {
120             continue;
121         }
122 
123         n = nxt_app_nncq_new_entry(q, t_cycle, val);
124 
125         if (nxt_atomic_cmp_set(&q->entries[j], e, n)) {
126             break;
127         }
128     }
129 
130     nxt_app_nncq_tail_cmp_inc(q, t);
131 }
132 
133 
134 static nxt_app_nncq_atomic_t
nxt_app_nncq_dequeue(nxt_app_nncq_t volatile * q)135 nxt_app_nncq_dequeue(nxt_app_nncq_t volatile *q)
136 {
137     nxt_app_nncq_cycle_t   e_cycle, h_cycle;
138     nxt_app_nncq_atomic_t  h, j, e;
139 
140     for ( ;; ) {
141         h = nxt_app_nncq_head(q);
142         j = nxt_app_nncq_map(q, h);
143         e = q->entries[j];
144 
145         e_cycle = nxt_app_nncq_cycle(q, e);
146         h_cycle = nxt_app_nncq_cycle(q, h);
147 
148         if (e_cycle != h_cycle) {
149             if (nxt_app_nncq_next_cycle(q, e_cycle) == h_cycle) {
150                 return nxt_app_nncq_empty(q);
151             }
152 
153             continue;
154         }
155 
156         if (nxt_atomic_cmp_set(&q->head, h, h + 1)) {
157             break;
158         }
159     }
160 
161     return nxt_app_nncq_index(q, e);
162 }
163 
164 
165 #endif /* _NXT_APP_NNCQ_H_INCLUDED_ */
166