The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
atomic_queue.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 4c9b7eac8aec45848041939320b3ab29e966ff9f $
 *
 * @brief Thread-safe queues.
 * @file io/atomic_queue.c
 *
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 * @copyright 2016 Alister Winfield
 */
RCSID("$Id: 4c9b7eac8aec45848041939320b3ab29e966ff9f $")

#include <stdint.h>
#include <stdalign.h>
#include <inttypes.h>

#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/util/talloc.h>

#define CACHE_LINE_SIZE	64

/** Entry in the queue
 *
 * @note This structure is cache line aligned for modern AMD/Intel CPUs.
 * This is to avoid contention when the producer and consumer are executing
 * on different CPU cores.
 */
typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
	atomic_int64_t		seq;		//!< Must be seq then data to ensure
						///< seq is 64bit aligned for 32bit address
						///< spaces.
	void			*data;
} fr_atomic_queue_entry_t;

/** Structure to hold the atomic queue
 *
 */
struct fr_atomic_queue_s {
	alignas(CACHE_LINE_SIZE) atomic_int64_t	head;	//!< Head, aligned bytes to ensure
							///< it's in a different cache line to tail
							///< to reduce memory contention.
	alignas(CACHE_LINE_SIZE) atomic_int64_t	tail;

	size_t			size;

	void			*chunk;			//!< To pass to free.  The non-aligned address.

	alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[];	//!< The entry array, also aligned
									///< to ensure it's not in the same cache
									///< line as tail and size.
};

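/*
 *	Illustrative sketch of the sequence number scheme (disabled, not part
 *	of the queue implementation).  Each entry's seq starts at its index.
 *	A producer may claim slot (head % size) only while seq == head, and
 *	publishes by setting seq = head + 1.  A consumer may take slot
 *	(tail % size) only while seq == tail + 1, and releases the slot by
 *	setting seq = tail + size, ready for the next lap of the ring.
 */
#if 0
	fr_atomic_queue_t	*aq = fr_atomic_queue_alloc(NULL, 4);	/* entry[i].seq == i, head == tail == 0 */
	int			x;
	void			*out;

	fr_atomic_queue_push(aq, &x);	/* claims slot 0: head 0 -> 1, then publishes entry[0].seq = 1 */
	fr_atomic_queue_pop(aq, &out);	/* takes slot 0: tail 0 -> 1, then releases entry[0].seq = 4 */
#endif
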
/** Create fixed-size atomic queue
 *
 * @note The queue is freed when the ctx it was allocated in is freed, or
 * explicitly via #fr_atomic_queue_free.
 *
 * @param[in] ctx	The talloc ctx to allocate the queue in.
 * @param[in] size	The number of entries in the queue.
 * @return
 *	- NULL on error.
 *	- fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
 */
fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
{
	size_t			i;
	int64_t			seq;
	fr_atomic_queue_t	*aq;
	TALLOC_CTX		*chunk;

	if (size == 0) return NULL;

	/*
	 *	Allocate a contiguous blob for the header and queue.
	 *	This helps with memory locality.
	 *
	 *	Since we're allocating a blob, we should also set the
	 *	name of the data, too.
	 */
	chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
				     sizeof(*aq) + (size) * sizeof(aq->entry[0]));
	if (!chunk) return NULL;
	aq->chunk = chunk;

	talloc_set_name_const(chunk, "fr_atomic_queue_t");

	/*
	 *	Initialize the array.  Data is NULL, and indexes are
	 *	the array entry number.
	 */
	for (i = 0; i < size; i++) {
		seq = i;

		aq->entry[i].data = NULL;
		store(aq->entry[i].seq, seq);
	}

	aq->size = size;

	/*
	 *	Set the head / tail indexes, and force other CPUs to
	 *	see the writes.
	 */
	store(aq->head, 0);
	store(aq->tail, 0);
	atomic_thread_fence(memory_order_seq_cst);

	return aq;
}

/** Free an atomic queue if it's not freed by ctx
 *
 * This function is needed because the atomic queue memory
 * must be cache line aligned.
 */
void fr_atomic_queue_free(fr_atomic_queue_t **aq)
{
	if (!*aq) return;

	talloc_free((*aq)->chunk);
	*aq = NULL;
}

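/*
 *	Typical lifetime (illustrative sketch only; worker_ctx is a
 *	hypothetical talloc context).  The queue may either be released
 *	when its ctx is freed, or explicitly via fr_atomic_queue_free().
 */
#if 0
	TALLOC_CTX		*worker_ctx = talloc_new(NULL);
	fr_atomic_queue_t	*aq;

	aq = fr_atomic_queue_alloc(worker_ctx, 1024);
	if (!aq) goto error;

	/* ... push/pop from producer and consumer threads ... */

	fr_atomic_queue_free(&aq);	/* frees the aligned chunk, sets aq to NULL */
	talloc_free(worker_ctx);
#endif
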
/** Push a pointer into the atomic queue
 *
 * @param[in] aq	The atomic queue to add data to.
 * @param[in] data	to push.
 * @return
 *	- true on successful push
 *	- false on queue full
 */
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
{
	int64_t			head;
	fr_atomic_queue_entry_t	*entry;

	if (!data) return false;

	head = load(aq->head);

	/*
	 *	Try to find the current head.
	 */
	for (;;) {
		int64_t seq, diff;

		entry = &aq->entry[ head % aq->size ];
		seq = aquire(entry->seq);
		diff = (seq - head);

		/*
		 *	head is larger than the current entry, the queue is full.
		 */
		if (diff < 0) {
#if 0
			fr_atomic_queue_debug(aq, stderr);
#endif
			return false;
		}

		/*
		 *	Someone else has already written to this entry.  Get the new head pointer, and continue.
		 */
		if (diff > 0) {
			head = load(aq->head);
			continue;
		}

		/*
		 *	We have the possibility that we can write to
		 *	this entry.  Try it.  If the write succeeds,
		 *	we're done.  If the write fails, re-load the
		 *	current head entry, and continue.
		 */
		if (cas_incr(aq->head, head)) {
			break;
		}
	}

	/*
	 *	Store the data in the queue, set the entry's sequence
	 *	number to the new index, and make the write visible to
	 *	other CPUs.
	 */
	entry->data = data;
	store(entry->seq, head + 1);
	return true;
}


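/*
 *	Producer-side sketch (illustrative only): push fails when the queue
 *	is full, so callers typically retry later, drop the item, or apply
 *	backpressure.  request and handle_full_queue() are hypothetical.
 */
#if 0
	if (!fr_atomic_queue_push(aq, request)) {
		handle_full_queue(request);	/* e.g. buffer locally and retry, or signal the producer to slow down */
	}
#endif
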
/** Pop a pointer from the atomic queue
 *
 * @param[in] aq	the atomic queue to retrieve data from.
 * @param[out] p_data	where to write the data.
 * @return
 *	- true on successful pop
 *	- false on queue empty
 */
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
{
	int64_t			tail, seq;
	fr_atomic_queue_entry_t	*entry;

	if (!p_data) return false;

	tail = load(aq->tail);

	for (;;) {
		int64_t diff;

		entry = &aq->entry[ tail % aq->size ];
		seq = aquire(entry->seq);

		diff = (seq - (tail + 1));

		/*
		 *	The entry's sequence number is behind tail, so no
		 *	producer has published data here yet.  The queue is empty.
		 */
		if (diff < 0) {
			return false;
		}

		if (diff > 0) {
			tail = load(aq->tail);
			continue;
		}

		if (cas_incr(aq->tail, tail)) {
			break;
		}
	}

	/*
	 *	Copy the pointer to the caller BEFORE updating the
	 *	queue entry.
	 */
	*p_data = entry->data;

	/*
	 *	Set the current entry to past the end of the queue.
	 *	i.e. it's unused.
	 */
	seq = tail + aq->size;
	store(entry->seq, seq);

	return true;
}

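/*
 *	Consumer-side sketch (illustrative only): drain everything currently
 *	visible in the queue.  process() is a hypothetical handler; pop
 *	returns false once the queue is empty.
 */
#if 0
	void *data;

	while (fr_atomic_queue_pop(aq, &data)) {
		process(data);
	}
#endif
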
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
{
	return aq->size;
}

#ifdef WITH_VERIFY_PTR
/** Check the talloc chunk is still valid
 *
 */
void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
{
	(void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
}
#endif

#ifndef NDEBUG

#if 0
typedef struct {
	int		status;			//!< status of this message
	size_t		data_size;		//!< size of the data we're sending

	int		signal;			//!< the signal to send
	uint64_t	ack;			//!< or the endpoint..
	void		*ch;			//!< the channel
} fr_control_message_t;
#endif


/** Dump an atomic queue.
 *
 * Absolutely NOT thread-safe.
 *
 * @param[in] aq	The atomic queue to debug.
 * @param[in] fp	where the debugging information will be printed.
 */
void fr_atomic_queue_debug(fr_atomic_queue_t *aq, FILE *fp)
{
	size_t i;
	int64_t head, tail;

	head = load(aq->head);
	tail = load(aq->tail);

	fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
		aq, aq->size, head, tail);

	for (i = 0; i < aq->size; i++) {
		fr_atomic_queue_entry_t *entry;

		entry = &aq->entry[i];

		fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
			i, entry->data, load(entry->seq));
#if 0
		if (entry->data) {
			fr_control_message_t *c;

			c = entry->data;

			fprintf(fp, "\tstatus %d, data_size %zd, signal %d, ack %zd, ch %p",
				c->status, c->data_size, c->signal, c->ack, c->ch);
		}
#endif
		fprintf(fp, "\n");
	}
}
#endif