atomic_queue.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 4c9b7eac8aec45848041939320b3ab29e966ff9f $
 *
 * @brief Thread-safe queues.
 * @file io/atomic_queue.c
 *
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 * @copyright 2016 Alister Winfield
 */
RCSID("$Id: 4c9b7eac8aec45848041939320b3ab29e966ff9f $")

#include <stdint.h>
#include <stdalign.h>
#include <inttypes.h>

#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/util/talloc.h>

#define CACHE_LINE_SIZE	64

/** Entry in the queue
 *
 * @note This structure is cache line aligned for modern AMD/Intel CPUs.
 * This is to avoid contention when the producer and consumer are executing
 * on different CPU cores.
 */
typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
	atomic_int64_t		seq;		//!< Must be seq then data to ensure
						///< seq is 64bit aligned for 32bit address
						///< spaces.
	void			*data;
} fr_atomic_queue_entry_t;

/** Structure to hold the atomic queue
 *
 */
struct fr_atomic_queue_s {
	alignas(CACHE_LINE_SIZE) atomic_int64_t	head;	//!< Head, aligned to ensure it's in a
							///< different cache line to tail, to
							///< reduce memory contention.
	alignas(CACHE_LINE_SIZE) atomic_int64_t	tail;

	size_t			size;

	void			*chunk;			//!< To pass to free.  The non-aligned address.

	alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];	//!< The entry array, also aligned
									///< to ensure it's not in the same cache
									///< line as tail and size.
};
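
#if 0
/*
 *	Not part of the original source: a minimal compile-time sketch of the
 *	cache-line layout described above, assuming C11 static_assert is
 *	available via <assert.h>.
 */
#include <assert.h>

static_assert(alignof(fr_atomic_queue_entry_t) == CACHE_LINE_SIZE,
	      "each queue entry should start on its own cache line");
static_assert(sizeof(fr_atomic_queue_entry_t) == CACHE_LINE_SIZE,
	      "entries are padded so adjacent entries never share a cache line");
#endif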

/** Create fixed-size atomic queue
 *
 * @note the queue must be freed explicitly by the ctx being freed, or by using
 *	the #fr_atomic_queue_free function.
 *
 * @param[in] ctx	The talloc ctx to allocate the queue in.
 * @param[in] size	The number of entries in the queue.
 * @return
 *	- NULL on error.
 *	- fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
 */
fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
{
	size_t			i;
	int64_t			seq;
	fr_atomic_queue_t	*aq;
	TALLOC_CTX		*chunk;

	if (size == 0) return NULL;

	/*
	 *	Allocate a contiguous blob for the header and queue.
	 *	This helps with memory locality.
	 *
	 *	Since we're allocating a blob, we should also set the
	 *	name of the data, too.
	 */
	chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
				     sizeof(*aq) + (size) * sizeof(aq->entry[0]));
	if (!chunk) return NULL;
	aq->chunk = chunk;

	talloc_set_name_const(chunk, "fr_atomic_queue_t");

	/*
	 *	Initialize the array.  Data is NULL, and indexes are
	 *	the array entry number.
	 */
	for (i = 0; i < size; i++) {
		seq = i;

		aq->entry[i].data = NULL;
		store(aq->entry[i].seq, seq);
	}

	aq->size = size;

	/*
	 *	Set the head / tail indexes, and force other CPUs to
	 *	see the writes.
	 */
	store(aq->head, 0);
	store(aq->tail, 0);
	atomic_thread_fence(memory_order_seq_cst);

	return aq;
}
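
#if 0
/*
 *	Not part of the original source: a usage sketch for the allocator
 *	above.  The queue depth (1024) and the wrapper function are arbitrary
 *	examples; the queue is released when ctx is freed, or earlier via
 *	fr_atomic_queue_free().
 */
static fr_atomic_queue_t *example_queue_create(TALLOC_CTX *ctx)
{
	fr_atomic_queue_t *aq;

	aq = fr_atomic_queue_alloc(ctx, 1024);		/* size must be > 0 */
	if (!aq) return NULL;				/* allocation failed */

	return aq;
}
#endif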

/** Free an atomic queue if it's not freed by ctx
 *
 * This function is needed because the atomic queue memory
 * must be cache line aligned.
 */
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
{
	if (!*aq) return;

	talloc_free((*aq)->chunk);
	*aq = NULL;
}
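
#if 0
/*
 *	Not part of the original source: a cleanup sketch.  Passing a pointer
 *	to the queue pointer lets fr_atomic_queue_free() NULL it out, guarding
 *	against reuse after free; freeing the talloc ctx passed to
 *	fr_atomic_queue_alloc() releases the same memory.
 */
static void example_queue_destroy(fr_atomic_queue_t **aq_p)
{
	fr_atomic_queue_free(aq_p);	/* frees the aligned chunk, sets *aq_p to NULL */
}
#endif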

/** Push a pointer into the atomic queue
 *
 * @param[in] aq	The atomic queue to add data to.
 * @param[in] data	to push.
 * @return
 *	- true on successful push
 *	- false on queue full
 */
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
{
	int64_t			head;
	fr_atomic_queue_entry_t	*entry;

	if (!data) return false;

	head = load(aq->head);

	/*
	 *	Try to find the current head.
	 */
	for (;;) {
		int64_t seq, diff;

		entry = &aq->entry[ head % aq->size ];
		seq = aquire(entry->seq);
		diff = (seq - head);

		/*
		 *	head is ahead of the current entry's sequence
		 *	number, so the queue is full.
		 */
		if (diff < 0) {
#if 0
			fr_atomic_queue_debug(aq, stderr);
#endif
			return false;
		}

		/*
		 *	Someone else has already written to this entry.
		 *	Get the new head pointer, and continue.
		 */
		if (diff > 0) {
			head = load(aq->head);
			continue;
		}

		/*
		 *	We have the possibility that we can write to
		 *	this entry.  Try it.  If the write succeeds,
		 *	we're done.  If the write fails, re-load the
		 *	current head entry, and continue.
		 */
		if (cas_incr(aq->head, head)) {
			break;
		}
	}

	/*
	 *	Store the data in the queue, update the entry's
	 *	sequence number to the new index, and make the write
	 *	visible to other CPUs.
	 */
	entry->data = data;
	store(entry->seq, head + 1);
	return true;
}
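
#if 0
/*
 *	Not part of the original source: a producer-side sketch.  A full queue
 *	is not an error; this example simply retries a bounded number of times
 *	and lets the caller decide what to do if the consumer never catches up.
 *	The retry limit is illustrative.
 */
static bool example_enqueue(fr_atomic_queue_t *aq, void *work)
{
	unsigned int tries;

	for (tries = 0; tries < 1000; tries++) {
		if (fr_atomic_queue_push(aq, work)) return true;	/* enqueued */
	}

	return false;		/* queue stayed full */
}
#endif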


/** Pop a pointer from the atomic queue
 *
 * @param[in] aq	the atomic queue to retrieve data from.
 * @param[out] p_data	where to write the data.
 * @return
 *	- true on successful pop
 *	- false on queue empty
 */
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
{
	int64_t			tail, seq;
	fr_atomic_queue_entry_t	*entry;

	if (!p_data) return false;

	tail = load(aq->tail);

	for (;;) {
		int64_t diff;

		entry = &aq->entry[ tail % aq->size ];
		seq = aquire(entry->seq);

		diff = (seq - (tail + 1));

		/*
		 *	The entry's sequence number is behind tail + 1,
		 *	so there's nothing to read: the queue is empty.
		 */
		if (diff < 0) {
			return false;
		}

		if (diff > 0) {
			tail = load(aq->tail);
			continue;
		}

		if (cas_incr(aq->tail, tail)) {
			break;
		}
	}

	/*
	 *	Copy the pointer to the caller BEFORE updating the
	 *	queue entry.
	 */
	*p_data = entry->data;

	/*
	 *	Set the current entry to past the end of the queue.
	 *	i.e. it's unused.
	 */
	seq = tail + aq->size;
	store(entry->seq, seq);

	return true;
}
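
#if 0
/*
 *	Not part of the original source: a consumer-side sketch draining the
 *	queue.  fr_atomic_queue_pop() returns false once the queue is empty,
 *	so the loop terminates naturally.  The handler callback is illustrative.
 */
static size_t example_drain(fr_atomic_queue_t *aq, void (*handler)(void *))
{
	void	*data;
	size_t	count = 0;

	while (fr_atomic_queue_pop(aq, &data)) {
		handler(data);
		count++;
	}

	return count;		/* number of entries consumed */
}
#endif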

size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
{
	return aq->size;
}

#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
 *
 */
void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
{
	(void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
}
#endif

#ifndef NDEBUG

#if 0
typedef struct {
	int		status;		//!< status of this message
	size_t		data_size;	//!< size of the data we're sending

	int		signal;		//!< the signal to send
	uint64_t	ack;		//!< or the endpoint..
	void		*ch;		//!< the channel
} fr_control_message_t;
#endif


/** Dump an atomic queue.
 *
 * Absolutely NOT thread-safe.
 *
 * @param[in] aq	The atomic queue to debug.
 * @param[in] fp	where the debugging information will be printed.
 */
void fr_atomic_queue_debug(fr_atomic_queue_t *aq, FILE *fp)
{
	size_t	i;
	int64_t	head, tail;

	head = load(aq->head);
	tail = load(aq->tail);

	fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
		aq, aq->size, head, tail);

	for (i = 0; i < aq->size; i++) {
		fr_atomic_queue_entry_t *entry;

		entry = &aq->entry[i];

		fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
			i, entry->data, load(entry->seq));
#if 0
		if (entry->data) {
			fr_control_message_t *c;

			c = entry->data;

			fprintf(fp, "\tstatus %d, data_size %zd, signal %d, ack %zd, ch %p",
				c->status, c->data_size, c->signal, c->ack, c->ch);
		}
#endif
		fprintf(fp, "\n");
	}
}
#endif