/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 9558099c99507d3fc8bffd3127c04f8b5517a573 $
 *
 * @brief Thread-safe queues.
 * @file io/atomic_queue.c
 *
 * This is an implementation of a bounded MPMC ring buffer with per-slot
 * sequence numbers, described by Dmitry Vyukov.
 *
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 * @copyright 2016 Alister Winfield
 */

RCSID("$Id: 9558099c99507d3fc8bffd3127c04f8b5517a573 $")

#include <stdint.h>
#include <stdalign.h>
#include <inttypes.h>
#include <stdlib.h>

#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/util/math.h>

/*
 *	Some macros to make our life easier.
 */
#define atomic_int64_t	_Atomic(int64_t)
#define atomic_uint32_t	_Atomic(uint32_t)
#define atomic_uint64_t	_Atomic(uint64_t)

#define cas_incr(_store, _var)	atomic_compare_exchange_strong_explicit(&_store, &_var, _var + 1, memory_order_release, memory_order_relaxed)
#define cas_decr(_store, _var)	atomic_compare_exchange_strong_explicit(&_store, &_var, _var - 1, memory_order_release, memory_order_relaxed)
#define load(_var)		atomic_load_explicit(&_var, memory_order_relaxed)
#define acquire(_var)		atomic_load_explicit(&_var, memory_order_acquire)
#define store(_store, _var)	atomic_store_explicit(&_store, _var, memory_order_release)

#define CACHE_LINE_SIZE	64

/** Entry in the queue
 *
 * @note This structure is cache line aligned for modern AMD/Intel CPUs.
 * This is to avoid contention when the producer and consumer are executing
 * on different CPU cores.
 */
typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
	atomic_int64_t		seq;		//!< Must be seq then data to ensure
						///< seq is 64bit aligned for 32bit address
						///< spaces.
	void			*data;
} fr_atomic_queue_entry_t;

/** Structure to hold the atomic queue
 *
 * @note DO NOT reorder these fields without understanding how alignas works
 * and maintaining separation.  The head and tail must be in different cache lines
 * to reduce contention between producers and consumers.  Cold data (size, chunk)
 * can share a line, but must be separated from head, tail and entry.
 * A compile-time layout check sketch follows the struct definition.
 */
struct fr_atomic_queue_s {
	alignas(CACHE_LINE_SIZE) atomic_int64_t	head;		//!< Position of the producer.
								///< Cache aligned to ensure it's in a
								///< different cache line to tail to reduce
								///< memory contention.

	alignas(CACHE_LINE_SIZE) atomic_int64_t	tail;		//!< Position of the consumer.
								///< Cache aligned to ensure it's in a
								///< different cache line to head to reduce
								///< memory contention.
								///< Reads may still need to occur from size
								///< whilst the producer is writing to tail.

	alignas(CACHE_LINE_SIZE) size_t		size;		//!< The length of the queue.  This is static.
								///< Also needs to be cache aligned, otherwise
								///< it can end up directly after tail in memory
								///< and share a cache line.

	void					*chunk;		//!< The start of the talloc chunk to pass to free,
								///< or NULL if this queue was allocated raw via
								///< #fr_atomic_queue_malloc.  We need to play
								///< tricks to get aligned memory with talloc.

	alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];	//!< The entry array, also aligned
									///< to ensure it's not in the same cache
									///< line as tail and size.
};
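
/*
 *	Illustrative sketch only (not code from this project): compile-time
 *	checks along these lines could enforce the cache line separation
 *	described above.  Assumes C11 static_assert (<assert.h>) and offsetof
 *	(<stddef.h>); alignof comes from <stdalign.h>, which is already
 *	included above.
 */
#if 0
static_assert(alignof(fr_atomic_queue_entry_t) == CACHE_LINE_SIZE,
	      "queue entries should start on a cache line boundary");
static_assert((offsetof(struct fr_atomic_queue_s, tail) - offsetof(struct fr_atomic_queue_s, head)) >= CACHE_LINE_SIZE,
	      "head and tail must not share a cache line");
static_assert((offsetof(struct fr_atomic_queue_s, size) - offsetof(struct fr_atomic_queue_s, tail)) >= CACHE_LINE_SIZE,
	      "tail and cold fields must not share a cache line");
#endif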

/** Initialise the sequence numbers and head/tail on a fresh queue buffer
 *
 * Shared between the talloc and raw allocators.  The buffer must already
 * be cache-line aligned and sized to hold `size` entries.
 *
 * @param[in] aq	The queue buffer to initialise.
 * @param[in] size	Entry count, already rounded up to a power of 2.
 */
static void atomic_queue_init(fr_atomic_queue_t *aq, size_t size)
{
	size_t i;

	/*
	 *	Initialize the array.  Data is NULL, and indexes are
	 *	the array entry number.
	 */
	for (i = 0; i < size; i++) {
		aq->entry[i].data = NULL;
		store(aq->entry[i].seq, (int64_t)i);
	}

	aq->size = size;

	store(aq->head, 0);
	store(aq->tail, 0);
}
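
/*
 *	Worked example of the sequence numbers (illustrative; the size of 4
 *	and the example function are assumptions, not part of this file).
 *	After initialisation seq[i] == i, so a push claims the slot whose seq
 *	equals head and then stores head + 1; a pop claims the slot whose seq
 *	equals tail + 1 and then stores tail + size, the value head will have
 *	when it next laps onto that slot.
 */
#if 0
static void atomic_queue_seq_example(void)
{
	fr_atomic_queue_t	*aq = fr_atomic_queue_malloc(4);
	int			item = 1;
	void			*out = NULL;

	/* after init:  seq = { 0, 1, 2, 3 }, head = 0, tail = 0 */
	fr_atomic_queue_push(aq, &item);	/* slot 0: seq 0 -> 1, head -> 1 */
	fr_atomic_queue_pop(aq, &out);		/* slot 0: seq 1 -> 4 (tail + size), tail -> 1 */
	/* now:         seq = { 4, 1, 2, 3 }, head = 1, tail = 1, i.e. empty again */

	fr_atomic_queue_free(&aq);
}
#endif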

/** Create fixed-size atomic queue
 *
 * @note the queue must be freed explicitly, either by freeing the ctx, or by
 *	 calling the #fr_atomic_queue_free function.
 *
 * @param[in] ctx	The talloc ctx to allocate the queue in.
 * @param[in] size	The number of entries in the queue.
 * @return
 *	- NULL on error.
 *	- fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
 */
fr_atomic_queue_t *fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size)
{
	fr_atomic_queue_t	*aq;
	TALLOC_CTX		*chunk;

	if (size == 0) return NULL;

	/*
	 *	Round up to the next power of 2 so we don't need modulo.
	 */
	size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);

	/*
	 *	Allocate a contiguous blob for the header and queue.
	 *	This helps with memory locality.
	 *
	 *	Since we're allocating a blob, we should also set the
	 *	name of the data, too.
	 */
	chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
				     sizeof(*aq) + (size) * sizeof(aq->entry[0]));
	if (!chunk) return NULL;
	aq->chunk = chunk;

	talloc_set_name_const(chunk, "fr_atomic_queue_t");

	atomic_queue_init(aq, size);

	return aq;
}
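
/*
 *	Usage sketch (illustrative; the example function is an assumption,
 *	not part of this API): a queue created with fr_atomic_queue_talloc()
 *	is owned by the talloc ctx, so freeing the ctx also frees the queue.
 */
#if 0
static void talloc_queue_example(TALLOC_CTX *ctx)
{
	fr_atomic_queue_t *aq;

	aq = fr_atomic_queue_talloc(ctx, 1000);		/* capacity is rounded up to 1024 */
	if (!aq) return;

	fr_atomic_queue_push(aq, ctx);			/* any non-NULL pointer */

	talloc_free(ctx);				/* frees the queue as well */
}
#endif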

/** Create fixed-size atomic queue outside any talloc hierarchy
 *
 * Backed by `posix_memalign`; the resulting queue is released with
 * #fr_atomic_queue_free (which detects the raw allocation via a NULL
 * `chunk` field) or plain `free()` on the queue pointer.
 *
 * Intended for callers that push or pop from threads where talloc is
 * not safe (for example, library-owned callback threads).
 *
 * @param[in] size	The number of entries in the queue.
 * @return
 *	- NULL on error.
 *	- fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
 */
fr_atomic_queue_t *fr_atomic_queue_malloc(size_t size)
{
	fr_atomic_queue_t	*aq;
	size_t			bytes;

	if (size == 0) return NULL;

	size = (size_t)fr_roundup_pow2_uint64((uint64_t)size);
	bytes = sizeof(*aq) + (size) * sizeof(aq->entry[0]);

	if (posix_memalign((void **)&aq, CACHE_LINE_SIZE, bytes) != 0) return NULL;

	aq->chunk = NULL;	/* sentinel: raw allocation, free with free() */
	atomic_queue_init(aq, size);

	return aq;
}

/** Free an atomic queue if it's not freed by ctx
 *
 * This function is needed because the atomic queue memory
 * must be cache line aligned, and may live either in a talloc chunk
 * or a raw `posix_memalign` allocation (`aq->chunk == NULL`).
 */
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
{
	if (!*aq) return;

	if ((*aq)->chunk) {
		talloc_free((*aq)->chunk);
	} else {
		free(*aq);
	}
	*aq = NULL;
}
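
/*
 *	Usage sketch (illustrative; the function and variable names are
 *	assumptions): a queue created with fr_atomic_queue_malloc() lives
 *	outside any talloc hierarchy, so it can be created and freed from a
 *	thread that must not touch talloc.
 */
#if 0
static fr_atomic_queue_t *callback_queue;

static int callback_queue_open(void)
{
	callback_queue = fr_atomic_queue_malloc(256);
	return callback_queue ? 0 : -1;
}

static void callback_queue_close(void)
{
	fr_atomic_queue_free(&callback_queue);	/* chunk == NULL, so this calls free() */
}
#endif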

/** Push a pointer into the atomic queue
 *
 * @note A short producer-side usage sketch follows this function.
 *
 * @param[in] aq	The atomic queue to add data to.
 * @param[in] data	to push.
 * @return
 *	- true on successful push
 *	- false on queue full
 */
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
{
	int64_t				head;
	fr_atomic_queue_entry_t		*entry;

	if (!data) return false;

	/*
	 *	Here we're essentially racing with other producers
	 *	to find the current head of the queue.
	 *
	 *	1. Load the current head (which may be incremented
	 *	   by another producer before we enter the loop).
	 *	2. Find the head entry, which is head modulo the
	 *	   queue size (keeps head looping through the queue).
	 *	3. Read the sequence number of the entry.
	 *	   The sequence numbers are initialised to the index
	 *	   of the entries in the queue.  Each pass of the
	 *	   producer increments the sequence number by one.
	 *	4.
	 *	   a. If the sequence number is equal to the head,
	 *	      then we can use the entry.  Increment the head
	 *	      so other producers know we've used it.
	 *	   b. If it's greater than head, another producer has
	 *	      already written to this entry, so we need to re-load
	 *	      the head and race the other producers again.
	 *	   c. If it's less than the head, the entry has not yet
	 *	      been consumed, and the queue is full.
	 */
	head = load(aq->head);

	/*
	 *	Try to find the current head.
	 */
	for (;;) {
		int64_t seq, diff;

		/*
		 *	The alloc functions guarantee size is a power
		 *	of 2, so we can use this hack to avoid
		 *	modulo.
		 */
		entry = &aq->entry[head & (aq->size - 1)];
		seq = acquire(entry->seq);
		diff = (seq - head);

		/*
		 *	head is ahead of the current entry's sequence
		 *	number, so the queue is full.
		 *	The consumer will set the entry's seq to tail +
		 *	queue size, marking it as free for the
		 *	producer to use.
		 */
		if (diff < 0) {
#if 0
			fr_atomic_queue_debug(stderr, aq);
#endif
			return false;
		}

		/*
		 *	Someone else has already written to this entry,
		 *	we lost the race, try again.
		 */
		if (diff > 0) {
			head = load(aq->head);
			continue;
		}

		/*
		 *	See if we can increment the head value
		 *	(and check it's still at its old value).
		 *
		 *	This means no two producers can have the same
		 *	entry in the queue, because they can't exit
		 *	the loop until they've incremented the head
		 *	successfully.
		 *
		 *	When we fail, we don't increment head before
		 *	trying again, because we need to detect queue
		 *	full conditions.
		 */
		if (cas_incr(aq->head, head)) {
			break;
		}
	}

	/*
	 *	Store the data in the queue, increment the entry
	 *	with the new index, and make the write visible to
	 *	other CPUs.
	 */
	entry->data = data;

	/*
	 *	Technically head can overflow.  Practically, with a
	 *	3GHz CPU doing nothing but incrementing head
	 *	uncontended, it'd take about 100 years for this to
	 *	happen.  But hey, maybe someone invents an optical
	 *	CPU with a significantly higher clock speed; it's OK
	 *	for us to exit every 9 quintillion packets.
	 */
#ifdef __clang_analyzer__
	if (unlikely((head + 1) == INT64_MAX)) exit(1);
#endif

	/*
	 *	Mark the entry as written to.  Any other producer
	 *	attempting to write will see (diff > 0) and retry.
	 */
	store(entry->seq, head + 1);
	return true;
}
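
/*
 *	Producer-side sketch (illustrative; the retry limit and sched_yield()
 *	backoff are assumptions, and sched_yield() needs <sched.h>): a false
 *	return means the queue is full, so the caller must retry, back off,
 *	or drop.
 */
#if 0
static bool example_enqueue(fr_atomic_queue_t *aq, void *msg)
{
	unsigned int i;

	for (i = 0; i < 1000; i++) {
		if (fr_atomic_queue_push(aq, msg)) return true;

		sched_yield();			/* queue full, give a consumer a chance to run */
	}

	return false;				/* still full, let the caller decide */
}
#endif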

/** Pop a pointer from the atomic queue
 *
 * @note A short consumer-side usage sketch follows this function.
 *
 * @param[in] aq	the atomic queue to retrieve data from.
 * @param[out] p_data	where to write the data.
 * @return
 *	- true on successful pop
 *	- false on queue empty
 */
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
{
	int64_t				tail, seq;
	fr_atomic_queue_entry_t		*entry;

	if (!p_data) return false;

	tail = load(aq->tail);

	for (;;) {
		int64_t diff;

		entry = &aq->entry[tail & (aq->size - 1)];
		seq = acquire(entry->seq);

		diff = (seq - (tail + 1));

		/*
		 *	The entry's sequence number is behind (tail + 1),
		 *	so the producer hasn't written it yet: the
		 *	queue is empty.
		 *
		 *	Tail should now be equal to the head.
		 */
		if (diff < 0) {
			return false;
		}

		/*
		 *	The shared tail is now ahead of our local copy.
		 *	Something else has consumed this entry.
		 *	We lost the race with another consumer.
		 */
		if (diff > 0) {
			tail = load(aq->tail);
			continue;
		}

		/*
		 *	Same deal as push.
		 *	After this point we own the entry.
		 */
		if (cas_incr(aq->tail, tail)) {
			break;
		}
	}

	/*
	 *	Copy the pointer to the caller BEFORE updating the
	 *	queue entry.
	 */
	*p_data = entry->data;

	/*
	 *	Set the current entry to past the end of the queue.
	 *	This is equal to what head will be on its next pass
	 *	through the queue.  This marks the entry as free.
	 */
	store(entry->seq, tail + aq->size);

	return true;
}
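
/*
 *	Consumer-side sketch (illustrative; handle_message() is a hypothetical
 *	callback, not part of this API): drain everything currently visible,
 *	then return to whatever signalling mechanism wakes the consumer.
 */
#if 0
static void example_drain(fr_atomic_queue_t *aq)
{
	void *msg;

	while (fr_atomic_queue_pop(aq, &msg)) {
		handle_message(msg);
	}
}
#endif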

/** Return the capacity of the queue (always a power of 2) */
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
{
	return aq->size;
}

/*
 *	Segmented single-producer / single-consumer ring.
 *
 *	Safety argument: one producer owns `head` and writes `seg->next`
 *	exactly once (release).  One consumer owns `tail` and reads
 *	`tail->next` with acquire; by the release/acquire pair plus the
 *	invariant "no push into s after s->next is set", the consumer
 *	cannot advance past a segment that still has an in-flight push,
 *	as long as it retries `pop(tail->q)` once more after observing
 *	`tail->next != NULL`.
 */

typedef struct fr_atomic_ring_entry_s fr_atomic_ring_entry_t;

struct fr_atomic_ring_entry_s {
	fr_atomic_queue_t			*q;	//!< Per-segment MPMC ring (used SPSC here).
	_Atomic(fr_atomic_ring_entry_t *)	next;	//!< NULL until the producer seals this segment
							///< because it filled up and moved on to a
							///< fresh one.
};

struct fr_atomic_ring_s {
	size_t					seg_size;	//!< Capacity of each segment.
	_Atomic(fr_atomic_ring_entry_t *)	head;		//!< Producer end.  Writer is the single producer,
								///< reader is also the producer - the consumer
								///< never loads this.
	fr_atomic_ring_entry_t			*tail;		//!< Consumer end.  Touched only by the consumer.
};

/** Allocate a fresh segment and its embedded queue
 *
 * Uses the raw (non-talloc) allocator so this function is safe to call
 * from the producer thread even when that thread cannot safely use talloc.
 */
static fr_atomic_ring_entry_t *atomic_ring_entry_alloc(size_t seg_size)
{
	fr_atomic_ring_entry_t *s;

	s = malloc(sizeof(*s));
	if (!s) return NULL;

	s->q = fr_atomic_queue_malloc(seg_size);
	if (!s->q) {
		free(s);
		return NULL;
	}
	atomic_init(&s->next, NULL);

	return s;
}

static void atomic_ring_entry_free(fr_atomic_ring_entry_t *s)
{
	fr_atomic_queue_free(&s->q);
	free(s);
}

/** talloc destructor for #fr_atomic_ring_t: walk the chain and free segments */
static int _atomic_ring_free(fr_atomic_ring_t *ring)
{
	fr_atomic_ring_entry_t *s = ring->tail;

	while (s) {
		fr_atomic_ring_entry_t *next = load(s->next);

		atomic_ring_entry_free(s);
		s = next;
	}

	return 0;
}

/** Allocate an empty SPSC ring
 *
 * @param[in] ctx	talloc ctx that owns the ring handle (segments live
 *			outside talloc; they are freed by the ring's
 *			destructor).
 * @param[in] seg_size	Per-segment capacity.  Rounded up to a power of 2.
 * @return
 *	- NULL on error.
 *	- A ring containing one initial (empty) segment.
 */
fr_atomic_ring_t *fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size)
{
	fr_atomic_ring_t	*ring;
	fr_atomic_ring_entry_t	*seg;

	if (seg_size == 0) return NULL;

	ring = talloc(ctx, fr_atomic_ring_t);
	if (!ring) return NULL;

	seg = atomic_ring_entry_alloc(seg_size);
	if (!seg) {
		talloc_free(ring);
		return NULL;
	}

	ring->seg_size = seg_size;
	ring->tail = seg;
	atomic_init(&ring->head, seg);
	talloc_set_destructor(ring, _atomic_ring_free);

	return ring;
}

/** Free the ring and all remaining segments
 *
 * Equivalent to `talloc_free()` on the ring, but nulls the caller's
 * handle in the style of #fr_atomic_queue_free.
 */
void fr_atomic_ring_free(fr_atomic_ring_t **ring_p)
{
	if (!*ring_p) return;

	talloc_free(*ring_p);
	*ring_p = NULL;
}
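
/*
 *	Ownership sketch (illustrative; the example function is an assumption):
 *	the ring handle is talloc'd, the segments are not, and both are
 *	released together either via the talloc destructor or explicitly.
 */
#if 0
static int ring_example(TALLOC_CTX *ctx)
{
	fr_atomic_ring_t *ring;

	ring = fr_atomic_ring_alloc(ctx, 128);		/* per-segment capacity of 128 */
	if (!ring) return -1;

	fr_atomic_ring_free(&ring);			/* or talloc_free(ctx), via the destructor */
	return 0;
}
#endif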

/** Push a pointer into the ring; allocate a new segment on overflow
 *
 * Single-producer only.  Must not be called concurrently with itself.
 *
 * @param[in] ring	Ring to push into.
 * @param[in] data	Value to push (must be non-NULL).
 * @return
 *	- true on success.
 *	- false if the current segment is full and a new segment
 *	  could not be allocated.
 */
bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data)
{
	fr_atomic_ring_entry_t	*h;
	fr_atomic_ring_entry_t	*n;

	h = load(ring->head);

	if (likely(fr_atomic_queue_push(h->q, data))) return true;

	n = atomic_ring_entry_alloc(ring->seg_size);
	if (unlikely(!n)) return false;

	/*
	 *	Publish ordering matters: the consumer only inspects `h->next`
	 *	and advances past `h` once it sees a non-NULL value there.
	 *	Release here pairs with acquire in fr_atomic_ring_pop.
	 */
	store(h->next, n);
	store(ring->head, n);

	/*
	 *	coverity[leaked_storage]
	 *
	 *	Coverity doesn't track atomic stores as reference
	 *	publication, so it sees `n` going out of scope and
	 *	flags it as leaked.  It isn't: the two atomic stores
	 *	above have published `n` into both `h->next` and
	 *	`ring->head`, and the consumer will free it via
	 *	atomic_ring_entry_free() once it advances past it.
	 */
	return fr_atomic_queue_push(n->q, data);
}

/** Pop a pointer from the ring, advancing past drained segments
 *
 * Single-consumer only.  Must not be called concurrently with itself.
 *
 * @param[in] ring	Ring to pop from.
 * @param[out] p_data	Where to write the popped value on success.
 * @return
 *	- true if a value was popped.
 *	- false if the ring is currently empty.
 */
bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data)
{
	fr_atomic_ring_entry_t	*cur;
	fr_atomic_ring_entry_t	*old;
	fr_atomic_ring_entry_t	*n;

	for (;;) {
		cur = ring->tail;

		if (likely(fr_atomic_queue_pop(cur->q, p_data))) return true;

		/*
		 *	Empty from our point of view.  If the producer hasn't
		 *	sealed this segment there might be pushes in our
		 *	future - return and let the caller come back.
		 */
		n = acquire(cur->next);
		if (!n) return false;

		/*
		 *	Sealed.  One more pop to drain anything the producer
		 *	committed before sealing but after our first (empty)
		 *	pop observation.  Without this re-check, late commits
		 *	in the (empty-observation, seal-observation) window
		 *	would be stranded when we advance past `cur`.
		 */
		if (fr_atomic_queue_pop(cur->q, p_data)) return true;

		old = cur;
		ring->tail = n;
		atomic_ring_entry_free(old);
		/* loop to pop from the new tail */
	}
}
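
/*
 *	Two-thread sketch (illustrative; the globals, handle_message() and the
 *	thread entry points are assumptions, not part of this API).  Exactly
 *	one thread may call fr_atomic_ring_push() and exactly one thread may
 *	call fr_atomic_ring_pop().
 */
#if 0
static fr_atomic_ring_t *example_ring;		/* from fr_atomic_ring_alloc(ctx, 128) */

/* producer thread */
static void example_send(void *msg)
{
	if (!fr_atomic_ring_push(example_ring, msg)) {
		/* only fails when a new segment can't be allocated */
	}
}

/* consumer thread */
static void example_recv(void)
{
	void *msg;

	while (fr_atomic_ring_pop(example_ring, &msg)) {
		handle_message(msg);
	}
}
#endif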

#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
 *
 */
void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
{
	(void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
}
#endif

#ifndef NDEBUG

#if 0
typedef struct {
	int		status;			//!< status of this message
	size_t		data_size;		//!< size of the data we're sending

	int		signal;			//!< the signal to send
	uint64_t	ack;			//!< or the endpoint..
	void		*ch;			//!< the channel
} fr_control_message_t;
#endif


/** Dump an atomic queue.
 *
 * Absolutely NOT thread-safe.
 *
 * @param[in] fp	where the debugging information will be printed.
 * @param[in] aq	The atomic queue to debug.
 */
void fr_atomic_queue_debug(FILE *fp, fr_atomic_queue_t *aq)
{
	size_t i;
	int64_t head, tail;

	head = load(aq->head);
	tail = load(aq->tail);

	fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
		aq, aq->size, head, tail);

	for (i = 0; i < aq->size; i++) {
		fr_atomic_queue_entry_t *entry;

		entry = &aq->entry[i];

		fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
			i, entry->data, load(entry->seq));
#if 0
		if (entry->data) {
			fr_control_message_t *c;

			c = entry->data;

			fprintf(fp, "\tstatus %d, data_size %zd, signal %d, ack %zd, ch %p",
				c->status, c->data_size, c->signal, c->ack, c->ch);
		}
#endif
		fprintf(fp, "\n");
	}
}
#endif