The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
atomic_queue.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 7a4d7837f69247a69624f8b33786b9b1587175c1 $
 *
 * @brief Thread-safe queues.
 * @file io/atomic_queue.c
 *
 * This is an implementation of a bounded MPMC ring buffer with per-slot
 * sequence numbers, described by Dmitry Vyukov.
 *
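 * As a usage sketch (illustrative only; "my_ptr" stands in for caller
 * data, and error handling is elided):
 *
 * @code
 *	fr_atomic_queue_t	*aq;
 *	void			*item = NULL;
 *
 *	aq = fr_atomic_queue_alloc(NULL, 8);	// size is rounded up to a power of 2
 *	if (!aq) return;
 *
 *	if (!fr_atomic_queue_push(aq, my_ptr)) {
 *		// queue full, try again later
 *	}
 *
 *	if (fr_atomic_queue_pop(aq, &item)) {
 *		// item now holds my_ptr
 *	}
 *
 *	fr_atomic_queue_free(&aq);
 * @endcode
 *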
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 * @copyright 2016 Alister Winfield
 */

RCSID("$Id: 7a4d7837f69247a69624f8b33786b9b1587175c1 $")

#include <stdint.h>
#include <stdalign.h>
#include <inttypes.h>

#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/util/talloc.h>
#include <freeradius-devel/util/math.h>

/*
 *	Some macros to make our life easier.
 */
#define atomic_int64_t _Atomic(int64_t)
#define atomic_uint32_t _Atomic(uint32_t)
#define atomic_uint64_t _Atomic(uint64_t)

#define cas_incr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var + 1, memory_order_release, memory_order_relaxed)
#define cas_decr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var - 1, memory_order_release, memory_order_relaxed)
#define load(_var) atomic_load_explicit(&_var, memory_order_relaxed)
#define acquire(_var) atomic_load_explicit(&_var, memory_order_acquire)
#define store(_store, _var) atomic_store_explicit(&_store, _var, memory_order_release)
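
/*
 *	Note: cas_incr() / cas_decr() wrap C11
 *	atomic_compare_exchange_strong_explicit(), which only succeeds if
 *	_store still holds _var.  On failure the current value of _store is
 *	written back into _var, which is why the push/pop loops below can
 *	retry a failed CAS without always reloading head/tail themselves.
 */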

#define CACHE_LINE_SIZE 64

/** Entry in the queue
 *
 * @note This structure is cache line aligned for modern AMD/Intel CPUs.
 * This is to avoid contention when the producer and consumer are executing
 * on different CPU cores.
 */
typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
	atomic_int64_t	seq;		//!< Must be seq then data to ensure
					///< seq is 64bit aligned for 32bit address
					///< spaces.
	void		*data;
} fr_atomic_queue_entry_t;

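/*
 *	Illustrative compile-time checks (a sketch, not part of the original
 *	file): with the packed/aligned hints above, each entry should own
 *	one full cache line.
 */
_Static_assert(alignof(fr_atomic_queue_entry_t) == CACHE_LINE_SIZE,
	       "fr_atomic_queue_entry_t must be cache line aligned");
_Static_assert((sizeof(fr_atomic_queue_entry_t) % CACHE_LINE_SIZE) == 0,
	       "fr_atomic_queue_entry_t must fill whole cache lines");
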
/** Structure to hold the atomic queue
 *
 * @note DO NOT reorder these fields without understanding how alignas works
 * and maintaining separation.  The head and tail must be in different cache lines
 * to reduce contention between producers and consumers.  Cold data (size, chunk)
 * can share a line, but must be separated from head, tail and entry.
 */
struct fr_atomic_queue_s {
	alignas(CACHE_LINE_SIZE) atomic_int64_t head;	//!< Position of the producer.
							///< Cache aligned to ensure it's in a
							///< different cache line to tail to reduce
							///< memory contention.

	alignas(CACHE_LINE_SIZE) atomic_int64_t tail;	//!< Position of the consumer.
							///< Cache aligned to ensure it's in a
							///< different cache line to head to reduce
							///< memory contention.
							///< Reads may still need to occur from size
							///< whilst the producer is writing to tail.

	alignas(CACHE_LINE_SIZE) size_t size;		//!< The length of the queue.  This is static.
							///< Also needs to be cache aligned, otherwise
							///< it can end up directly after tail in memory
							///< and share a cache line.

	void *chunk;					//!< The start of the talloc chunk to pass to free.
							///< We need to play tricks to get aligned memory
							///< with talloc.

	alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];	//!< The entry array, also aligned
							///< to ensure it's not in the same cache
							///< line as tail and size.
};
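
/*
 *	On a typical 64-bit build the fields above land at offsets 0 (head),
 *	64 (tail), 128 (size, with chunk alongside it) and 192 (entry[]),
 *	so the producer and consumer counters never share a cache line.
 *	(Illustrative only; exact offsets depend on the ABI.)
 */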

/** Create fixed-size atomic queue
 *
 * @note the queue must be freed explicitly by the ctx being freed, or by using
 * the #fr_atomic_queue_free function.
 *
 * @param[in] ctx	The talloc ctx to allocate the queue in.
 * @param[in] size	The number of entries in the queue.
 * @return
 *	- NULL on error.
 *	- fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
 */
fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
{
	size_t			i;
	int64_t			seq;
	fr_atomic_queue_t	*aq;
	TALLOC_CTX		*chunk;

	if (size == 0) return NULL;

	/*
	 *	Round up to the next power of 2 so we don't need modulo.
	 */
	size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
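
	/*
	 *	For example (not in the original comment), a requested size
	 *	of 6 becomes 8, so push/pop below can use
	 *	"index & (size - 1)" (here "& 7") instead of "index % size".
	 */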

	/*
	 *	Allocate a contiguous blob for the header and queue.
	 *	This helps with memory locality.
	 *
	 *	Since we're allocating a blob, we should also set the
	 *	name of the data, too.
	 */
	chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
				     sizeof(*aq) + (size) * sizeof(aq->entry[0]));
	if (!chunk) return NULL;
	aq->chunk = chunk;

	talloc_set_name_const(chunk, "fr_atomic_queue_t");

	/*
	 *	Initialize the array.  Data is NULL, and indexes are
	 *	the array entry number.
	 */
	for (i = 0; i < size; i++) {
		seq = i;

		aq->entry[i].data = NULL;
		store(aq->entry[i].seq, seq);
	}

	aq->size = size;

	/*
	 *	Set the head / tail indexes, and force other cores to
	 *	see the writes.
	 */
	store(aq->head, 0);
	store(aq->tail, 0);
	atomic_thread_fence(memory_order_seq_cst);

	return aq;
}

/** Free an atomic queue if it's not freed by ctx
 *
 * This function is needed because the atomic queue memory
 * must be cache line aligned.
 */
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
{
	if (!*aq) return;

	talloc_free((*aq)->chunk);
	*aq = NULL;
}

/** Push a pointer into the atomic queue
 *
 * @param[in] aq	The atomic queue to add data to.
 * @param[in] data	to push.
 * @return
 *	- true on successful push
 *	- false on queue full
 */
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
{
	int64_t			head;
	fr_atomic_queue_entry_t	*entry;

	if (!data) return false;

	/*
	 *	Here we're essentially racing with other producers
	 *	to find the current head of the queue.
	 *
	 *	1. Load the current head (which may be incremented
	 *	   by another producer before we enter the loop).
	 *	2. Find the head entry, which is head modulo the
	 *	   queue size (keeps head looping through the queue).
	 *	3. Read the sequence number of the entry.
	 *	   The sequence numbers are initialised to the index
	 *	   of the entries in the queue.  Each pass of the
	 *	   producer increments the sequence number by one.
	 *	4.
	 *	   a. If the sequence number is equal to the head,
	 *	      then we can use the entry.  Increment the head
	 *	      so other producers know we've used it.
	 *	   b. If it's greater than head, another producer has
	 *	      already written to this entry, so we need to
	 *	      re-load the head and race the other producers again.
	 *	   c. If it's less than the head, the entry has not yet
	 *	      been consumed, and the queue is full.
	 */
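	/*
	 *	Worked example (illustrative, not from the original comment),
	 *	for a queue of size 4, slot 0:
	 *
	 *	  initial state:          seq == 0   (seq == slot index)
	 *	  producer claims slot:   seq == 1   (head + 1)
	 *	  consumer drains slot:   seq == 4   (tail + size)
	 *
	 *	A producer arriving with head == 4 then sees seq == head and
	 *	may claim the slot again; if the consumer hasn't run yet it
	 *	sees seq == 1 < head, i.e. the queue is full.
	 */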
	head = load(aq->head);

	/*
	 *	Try to find the current head.
	 */
	for (;;) {
		int64_t seq, diff;

		/*
		 *	Alloc function guarantees size is a power
		 *	of 2, so we can use this hack to avoid
		 *	modulo.
		 */
		entry = &aq->entry[head & (aq->size - 1)];
		seq = acquire(entry->seq);
		diff = (seq - head);

		/*
		 *	head is larger than the current entry's
		 *	sequence number, so the queue is full.
		 *	The consumer will set the entry's seq to
		 *	tail + queue size, marking it as free for
		 *	the producer to use.
		 */
		if (diff < 0) {
#if 0
			fr_atomic_queue_debug(stderr, aq);
#endif
			return false;
		}

		/*
		 *	Someone else has already written to this entry.
		 *	We lost the race, try again.
		 */
		if (diff > 0) {
			head = load(aq->head);
			continue;
		}

		/*
		 *	See if we can increment the head value
		 *	(and check it's still at its old value).
		 *
		 *	This means no two producers can have the same
		 *	entry in the queue, because they can't exit
		 *	the loop until they've incremented the head
		 *	successfully.
		 *
		 *	When we fail, we don't increment head before
		 *	trying again, because we need to detect queue
		 *	full conditions.
		 */
		if (cas_incr(aq->head, head)) {
			break;
		}
	}

	/*
	 *	Store the data in the queue, update the entry's
	 *	sequence number, and make the write visible to
	 *	other CPUs.
	 */
	entry->data = data;

	/*
	 *	Technically head can overflow.  Practically, with a
	 *	3GHz CPU doing nothing but incrementing head
	 *	uncontended, it'd take about 100 years for this to
	 *	happen.  But hey, maybe someone invents an optical
	 *	CPU with a significantly higher clock speed; it's OK
	 *	for us to exit every 9 quintillion packets.
	 */
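	/*
	 *	(Roughly: 2^63 increments at ~3e9 increments per second is
	 *	about 3.1e9 seconds, i.e. a little under 100 years.)
	 */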
#ifdef __clang_analyzer__
	if (unlikely((head + 1) == INT64_MAX)) exit(1);
#endif

	/*
	 *	Mark up the entry as written to.  Any other producer
	 *	attempting to write will see (diff > 0) and retry.
	 */
	store(entry->seq, head + 1);
	return true;
}


/** Pop a pointer from the atomic queue
 *
 * @param[in] aq	the atomic queue to retrieve data from.
 * @param[out] p_data	where to write the data.
 * @return
 *	- true on successful pop
 *	- false on queue empty
 */
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
{
	int64_t			tail, seq;
	fr_atomic_queue_entry_t	*entry;

	if (!p_data) return false;

	tail = load(aq->tail);

	for (;;) {
		int64_t diff;

		entry = &aq->entry[tail % aq->size];
		seq = acquire(entry->seq);

		diff = (seq - (tail + 1));

		/*
		 *	The entry's sequence number is still equal to
		 *	tail, so no producer has written to this slot
		 *	yet and the queue is empty.
		 *
		 *	Tail should now be equal to the head.
		 */
		if (diff < 0) {
			return false;
		}

		/*
		 *	Tail is now ahead of us.
		 *	Another consumer has already claimed this entry.
		 *	We lost the race, try again.
		 */
		if (diff > 0) {
			tail = load(aq->tail);
			continue;
		}

		/*
		 *	Same deal as push.
		 *	After this point we own the entry.
		 */
		if (cas_incr(aq->tail, tail)) {
			break;
		}
	}

	/*
	 *	Copy the pointer to the caller BEFORE updating the
	 *	queue entry, otherwise a producer could reuse the slot
	 *	and overwrite the data before we've read it.
	 */
	*p_data = entry->data;

	/*
	 *	Set the current entry to past the end of the queue.
	 *	This is equal to what head will be on its next pass
	 *	through the queue.  This marks the entry as free.
	 */
	store(entry->seq, tail + aq->size);

	return true;
}

/** Return the size (capacity) of the atomic queue
 *
 */
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
{
	return aq->size;
}

#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
 *
 */
void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
{
	(void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
}
#endif

#ifndef NDEBUG

#if 0
typedef struct {
	int		status;		//!< status of this message
	size_t		data_size;	//!< size of the data we're sending

	int		signal;		//!< the signal to send
	uint64_t	ack;		//!< or the endpoint..
	void		*ch;		//!< the channel
} fr_control_message_t;
#endif


/** Dump an atomic queue.
 *
 * Absolutely NOT thread-safe.
 *
 * @param[in] fp	where the debugging information will be printed.
 * @param[in] aq	The atomic queue to debug.
 */
void fr_atomic_queue_debug(FILE *fp, fr_atomic_queue_t *aq)
{
	size_t i;
	int64_t head, tail;

	head = load(aq->head);
	tail = load(aq->tail);

	fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
		aq, aq->size, head, tail);

	for (i = 0; i < aq->size; i++) {
		fr_atomic_queue_entry_t *entry;

		entry = &aq->entry[i];

		fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
			i, entry->data, load(entry->seq));
#if 0
		if (entry->data) {
			fr_control_message_t *c;

			c = entry->data;

			fprintf(fp, "\tstatus %d, data_size %zd, signal %d, ack %zd, ch %p",
				c->status, c->data_size, c->signal, c->ack, c->ch);
		}
#endif
		fprintf(fp, "\n");
	}
}
#endif