The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
atomic_queue.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 12ceb6293da8f179a27ae753cafd6712dacd7f38 $
19 *
20 * @brief Thread-safe queues.
21 * @file io/atomic_queue.c
22 *
23 * This is an implementation of a bounded MPMC ring buffer with per-slot
24 * sequence numbers, described by Dmitry Vyukov.
25 *
26 * @copyright 2016 Alan DeKok (aland@freeradius.org)
27 * @copyright 2016 Alister Winfield
28 */
29
30RCSID("$Id: 12ceb6293da8f179a27ae753cafd6712dacd7f38 $")
31
32#include <stdint.h>
33#include <stdalign.h>
34#include <inttypes.h>
35
36#include <freeradius-devel/autoconf.h>
37#include <freeradius-devel/io/atomic_queue.h>
38#include <freeradius-devel/util/math.h>
39
/*
 *	Some macros to make our life easier.
 *
 *	cas_incr/cas_decr: atomically replace *(&_store) with _var +/- 1
 *	iff it still equals _var (strong CAS, release on success, relaxed
 *	on failure — on failure _var is updated with the current value).
 */
#define atomic_int64_t	_Atomic(int64_t)
#define atomic_uint32_t	_Atomic(uint32_t)
#define atomic_uint64_t	_Atomic(uint64_t)

#define cas_incr(_store, _var)	atomic_compare_exchange_strong_explicit(&_store, &_var, _var + 1, memory_order_release, memory_order_relaxed)
#define cas_decr(_store, _var)	atomic_compare_exchange_strong_explicit(&_store, &_var, _var - 1, memory_order_release, memory_order_relaxed)
#define load(_var)		atomic_load_explicit(&_var, memory_order_relaxed)
#define acquire(_var)		atomic_load_explicit(&_var, memory_order_acquire)
#define store(_store, _var)	atomic_store_explicit(&_store, _var, memory_order_release)

/*
 *	Assumed line size for modern AMD/Intel CPUs — used to pad hot
 *	fields apart and avoid false sharing.
 */
#define CACHE_LINE_SIZE	64
54
55/** Entry in the queue
56 *
57 * @note This structure is cache line aligned for modern AMD/Intel CPUs.
58 * This is to avoid contention when the producer and consumer are executing
59 * on different CPU cores.
60 */
61typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
62 atomic_int64_t seq; //!< Must be seq then data to ensure
63 ///< seq is 64bit aligned for 32bit address
64 ///< spaces.
65 void *data;
67
68/** Structure to hold the atomic queue
69 *
70 * @note DO NOT redorder these fields without understanding how alignas works
71 * and maintaining separation. The head and tail must be in different cache lines
72 * to reduce contention between producers and consumers. Cold data (size, chunk)
73 * can share a line, but must be separated from head and tail and entry.
74 */
76 alignas(CACHE_LINE_SIZE) atomic_int64_t head; //!< Position of the producer.
77 ///< Cache aligned bytes to ensure it's in a
78 ///< different cache line to tail to reduce
79 ///< memory contention.
80
81 alignas(CACHE_LINE_SIZE) atomic_int64_t tail; //!< Position of the consumer.
82 ///< Cache aligned bytes to ensure it's in a
83 ///< different cache line to tail to reduce
84 ///< memory contention.
85 ///< Reads may still need to occur from size
86 ///< whilst the producer is writing to tail.
87
88 alignas(CACHE_LINE_SIZE) size_t size; //!< The length of the queue. This is static.
89 ///< Also needs to be cache aligned, otherwise
90 ///< it can end up directly after tail in memory
91 ///< and share a cache line.
92
93 void *chunk; //!< The start of the talloc chunk to pass to free.
94 ///< We need to play tricks to get aligned memory
95 ///< with talloc.
96
97 alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[]; //!< The entry array, also aligned
98 ///< to ensure it's not in the same cache
99 ///< line as tail and size.
100};
101
102/** Create fixed-size atomic queue
103 *
104 * @note the queue must be freed explicitly by the ctx being freed, or by using
105 * the #fr_atomic_queue_free function.
106 *
107 * @param[in] ctx The talloc ctx to allocate the queue in.
108 * @param[in] size The number of entries in the queue.
109 * @return
110 * - NULL on error.
111 * - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
112 */
113fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
114{
115 size_t i;
116 int64_t seq;
118 TALLOC_CTX *chunk;
119
120 if (size == 0) return NULL;
121
122 /*
123 * Roundup to the next power of 2 so we don't need modulo.
124 */
125 size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
126
127 /*
128 * Allocate a contiguous blob for the header and queue.
129 * This helps with memory locality.
130 *
131 * Since we're allocating a blob, we should also set the
132 * name of the data, too.
133 */
134 chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
135 sizeof(*aq) + (size) * sizeof(aq->entry[0]));
136 if (!chunk) return NULL;
137 aq->chunk = chunk;
138
139 talloc_set_name_const(chunk, "fr_atomic_queue_t");
140
141 /*
142 * Initialize the array. Data is NULL, and indexes are
143 * the array entry number.
144 */
145 for (i = 0; i < size; i++) {
146 seq = i;
147
148 aq->entry[i].data = NULL;
149 store(aq->entry[i].seq, seq);
150 }
151
152 aq->size = size;
153
154 /*
155 * Set the head / tail indexes, and force other cores to
156 * see the writes.
157 */
158 store(aq->head, 0);
159 store(aq->tail, 0);
161
162 return aq;
163}
164
165/** Free an atomic queue if it's not freed by ctx
166 *
167 * This function is needed because the atomic queue memory
168 * must be cache line aligned.
169 */
171{
172 if (!*aq) return;
173
174 talloc_free((*aq)->chunk);
175 *aq = NULL;
176}
177
178/** Push a pointer into the atomic queue
179 *
180 * @param[in] aq The atomic queue to add data to.
181 * @param[in] data to push.
182 * @return
183 * - true on successful push
184 * - false on queue full
185 */
187{
188 int64_t head;
190
191 if (!data) return false;
192
193 /*
194 * Here we're essentially racing with other producers
195 * to find the current head of the queue.
196 *
197 * 1. Load the current head (which may be incremented
198 * by another producer before we enter the loop).
199 * 2. Find the head entry, which is head modulo the
200 * queue size (keeps head looping through the queue).
201 * 3. Read the sequence number of the entry.
202 * The sequence numbers are initialised to the index
203 * of the entries in the queue. Each pass of the
204 * producer increments the sequence number by one.
205 * 4.
206 * a. If the sequence number is equal to the head,
207 * then we can use the entry. Increment the head
208 * so other producers know we've used it.
209 * b. If it's greater than head, the producer has
210 * already written to this entry, so we need to re-load
211 * the head and race other producers again.
212 * c. If it's less than the head, the entry has not yet
213 * been consumed, and the queue is full.
214 */
215 head = load(aq->head);
216
217 /*
218 * Try to find the current head.
219 */
220 for (;;) {
221 int64_t seq, diff;
222
223 /*
224 * Alloc function guarantees size is a power
225 * of 2, so we can use this hack to avoid
226 * modulo.
227 */
228 entry = &aq->entry[head & (aq->size - 1)];
229 seq = acquire(entry->seq);
230 diff = (seq - head);
231
232 /*
233 * head is larger than the current entry, the
234 * queue is full.
235 * The consumer will set entry seq to entry +
236 * queue size, marking it as free for the
237 * producer to use.
238 */
239 if (diff < 0) {
240#if 0
241 fr_atomic_queue_debug(stderr, aq);
242#endif
243 return false;
244 }
245
246 /*
247 * Someone else has already written to this entry
248 * we lost the race, try again.
249 */
250 if (diff > 0) {
251 head = load(aq->head);
252 continue;
253 }
254
255 /*
256 * See if we can increment the head value
257 * (and check it's still at its old value).
258 *
259 * This means no two producers can have the same
260 * entry in the queue, because they can't exit
261 * the loop until they've incremented the head
262 * successfully.
263 *
264 * When we fail, we don't increment head before
265 * trying again, because we need to detect queue
266 * full conditions.
267 */
268 if (cas_incr(aq->head, head)) {
269 break;
270 }
271 }
272
273 /*
274 * Store the data in the queue, and increment the entry
275 * with the new index, and make the write visible to
276 * other CPUs.
277 */
278 entry->data = data;
279
280 /*
281 * Technically head can overflow. Practically, with a
282 * 3GHz CPU, doing nothing but incrementing head
283 * uncontended it'd take about 100 years for this to
284 * happen. But hey, maybe someone invents an optical
285 * CPU with a significantly higher clock speed, it's ok
286 * for us to exit every 9 quintillion packets.
287 */
288#ifdef __clang_analyzer__
289 if (unlikely((head + 1) == INT64_MAX)) exit(1);
290#endif
291
292 /*
293 * Mark up the entry as written to. Any other producer
294 * attempting to write will see (diff > 0) and retry.
295 */
296 store(entry->seq, head + 1);
297 return true;
298}
299
300
301/** Pop a pointer from the atomic queue
302 *
303 * @param[in] aq the atomic queue to retrieve data from.
304 * @param[out] p_data where to write the data.
305 * @return
306 * - true on successful pop
307 * - false on queue empty
308 */
310{
311 int64_t tail, seq;
313
314 if (!p_data) return false;
315
316 tail = load(aq->tail);
317
318 for (;;) {
319 int64_t diff;
320
321 entry = &aq->entry[tail & (aq->size - 1)];
322 seq = acquire(entry->seq);
323
324 diff = (seq - (tail + 1));
325
326 /*
327 * Tail is smaller than the current entry,
328 * the queue is empty.
329 *
330 * Tail should now be equal to the head.
331 */
332 if (diff < 0) {
333 return false;
334 }
335
336 /*
337 * Tail is now ahead of us.
338 * Something else has consumed it.
339 * We lost the race with another consumer.
340 */
341 if (diff > 0) {
342 tail = load(aq->tail);
343 continue;
344 }
345
346 /*
347 * Same deal as push.
348 * After this point we own the entry.
349 */
350 if (cas_incr(aq->tail, tail)) {
351 break;
352 }
353 }
354
355 /*
356 * Copy the pointer to the caller BEFORE updating the
357 * queue entry.
358 */
359 *p_data = entry->data;
360
361 /*
362 * Set the current entry to past the end of the queue.
363 * This is equal to what head will be on its next pass
364 * through the queue. This marks the entry as free.
365 */
366 store(entry->seq, tail + aq->size);
367
368 return true;
369}
370
372{
373 return aq->size;
374}
375
#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
 *
 * Aborts if the chunk's talloc name no longer matches "fr_atomic_queue_t"
 * (set by fr_atomic_queue_alloc), i.e. on corruption or use-after-free.
 *
 * @param[in] aq	the atomic queue whose backing chunk is verified.
 */
void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
{
	(void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
}
#endif
385
386#ifndef NDEBUG
387
388#if 0
389typedef struct {
390 int status; //!< status of this message
391 size_t data_size; //!< size of the data we're sending
392
393 int signal; //!< the signal to send
394 uint64_t ack; //!< or the endpoint..
395 void *ch; //!< the channel
397#endif
398
399
400/** Dump an atomic queue.
401 *
402 * Absolutely NOT thread-safe.
403 *
404 * @param[in] aq The atomic queue to debug.
405 * @param[in] fp where the debugging information will be printed.
406 */
408{
409 size_t i;
410 int64_t head, tail;
411
412 head = load(aq->head);
413 tail = load(aq->tail);
414
415 fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
416 aq, aq->size, head, tail);
417
418 for (i = 0; i < aq->size; i++) {
420
421 entry = &aq->entry[i];
422
423 fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
424 i, entry->data, load(entry->seq));
425#if 0
426 if (entry->data) {
428
429 c = entry->data;
430
431 fprintf(fp, "\tstatus %d, data_size %zd, signal %d, ack %zd, ch %p",
432 c->status, c->data_size, c->signal, c->ack, c->ch);
433 }
434#endif
435 fprintf(fp, "\n");
436 }
437}
438#endif
atomic_int64_t head
Position of the producer.
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
#define atomic_int64_t
void fr_atomic_queue_debug(FILE *fp, fr_atomic_queue_t *aq)
Dump an atomic queue.
void * chunk
The start of the talloc chunk to pass to free.
atomic_int64_t tail
Position of the consumer.
#define cas_incr(_store, _var)
fr_atomic_queue_entry_t entry[]
The entry array, also aligned to ensure it's not in the same cache line as tail and size.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
size_t size
The length of the queue.
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
#define load(_var)
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
Free an atomic queue if it's not freed by ctx.
#define store(_store, _var)
#define acquire(_var)
atomic_int64_t seq
Must be seq then data to ensure seq is 64bit aligned for 32bit address spaces.
#define CACHE_LINE_SIZE
Entry in the queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:506
#define unlikely(_x)
Definition build.h:402
static fr_atomic_queue_t ** aq
talloc_free(hp)
fr_control_message_status_t status
status of this message
Definition control.c:60
size_t data_size
size of the data we're sending
Definition control.c:62
The header for the control message.
Definition control.c:59
static uint64_t fr_roundup_pow2_uint64(uint64_t v)
Definition math.h:32
unsigned long int size_t
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_thread_fence(order)
Definition stdatomic.h:148
TALLOC_CTX * talloc_aligned_array(TALLOC_CTX *ctx, void **start, size_t alignment, size_t size)
Return a page aligned talloc memory array.
Definition talloc.c:196
static fr_slen_t head
Definition xlat.h:420
static fr_slen_t data
Definition value.h:1340