The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
message.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 15f4364765c192611e3c4df1a0d3f9ebfa6eee14 $
19 *
20 * @brief Messages for inter-thread communication
21 * @file io/message.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: 15f4364765c192611e3c4df1a0d3f9ebfa6eee14 $")
26
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
29
30
31/*
32 * Debugging, mainly for message_set_test
33 */
34#if 0
35#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
36#else
37#define MPRINT(...)
38#endif
39
40#define MSG_ARRAY_SIZE (16)
41
42#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
43
44/** A Message set, composed of message headers and ring buffer data.
45 *
46 * A message set is intended to send short-lived messages. The
47 * message headers are fixed in size, and allocated from an array
48 * which is treated like a circular buffer. Message bodies (i.e. raw
49 * packets) are variable in size, and live in a separate ring buffer.
50 *
51 * The message set starts off with a small array of message headers,
52 * and a small ring buffer. If an array/buffer fills up, a new one
53 * is allocated at double the size of the previous one.
54 *
55 * The array / buffers are themselves kept in fixed-size arrays, of
56 * MSG_ARRAY_SIZE. The reason is that the memory for fr_message_set_t
57 * should be contiguous, and not in a linked list scattered in
58 * memory.
59 *
60 * The originator allocates a message, and sends it to a recipient.
61 * The recipient (usually in another thread) uses the message, and
62 * marks it as FR_MESSAGE_DONE. The originator then asynchronously
63 * cleans up the message.
64 *
65 * This asynchronous cleanup is done via self-clocking. If there is
66 * no need to clean up the messages, it isn't done. Only when we run
67 * out of space to store messages (or packets) is the cleanup done.
68 *
69 * This cleanup latency ensures that we don't have cache line
70 * bouncing, where the originator sends the message, and then while
71 * the recipieent is reading it... thrashes the cache line with
72 * checks for "are you done? Are you done?"
73 *
74 * If there are more than one used entry in either array, we then try
75 * to coalesce the buffers on cleanup. If we discover that one array
76 * is empty, we discard it, and move the used array entries into it's
77 * place.
78 *
79 * This process ensures that we don't have too many buffers in
80 * progress. It is better to have a few large buffers than many
81 * small ones.
82 *
83 * MSG_ARRAY_SIZE is defined to be large (16 doublings) to allow for
84 * the edge case where messages are stuck for long periods of time.
85 *
86 * With an initial message array size of 64, this gives us room for
87 * 2M packets, if *all* of the mr_array entries have packets stuck in
88 * them that aren't cleaned up for extended periods of time.
89 *
90 * @todo Add a flag for UDP-style protocols, where we can put the
91 * message into the ring buffer. This helps with locality of
92 * reference, and removes the need to track two separate things.
93 */
95 int mr_current; //!< current used message ring entry
96 int mr_max; //!< max used message ring entry
97
98 size_t message_size; //!< size of the callers message, including fr_message_t
99
100 int mr_cleaned; //!< where we last cleaned
101
102 int rb_current; //!< current used ring buffer entry
103 int rb_max; //!< max used ring buffer entry
104
105 size_t max_allocation; //!< maximum allocation size
106
108 int freed;
109
110 fr_ring_buffer_t *mr_array[MSG_ARRAY_SIZE]; //!< array of message arrays
111
112 fr_ring_buffer_t *rb_array[MSG_ARRAY_SIZE]; //!< array of ring buffers
113};
114
115
116/** Create a message set
117 *
118 * @param[in] ctx the context for talloc
119 * @param[in] num_messages size of the initial message array. MUST be a power of 2.
120 * @param[in] message_size the size of each message, INCLUDING fr_message_t, which MUST be at the start of the struct
121 * @param[in] ring_buffer_size of the ring buffer. MUST be a power of 2.
122 * @param[in] unlimited_size allow any message size to be allocated. If false it is limited to ring_buffer_size / 2.
123 * @return
124 * - NULL on error
125 * - newly allocated fr_message_set_t on success
126 */
127fr_message_set_t *fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size,
128 bool unlimited_size)
129{
131
132 /*
133 * Too small, or not a power of 2.
134 */
135 if (num_messages < 8) num_messages = 8;
136
137 if ((num_messages & (num_messages - 1)) != 0) {
138 fr_strerror_const("Number of messages must be a power of 2");
139 return NULL;
140 }
141
142 if (message_size < sizeof(fr_message_t)) {
143 fr_strerror_printf("Message size must be at least %zd", sizeof(fr_message_t));
144 return NULL;
145 }
146
147 if (message_size > 1024) {
148 fr_strerror_const("Message size must be no larger than 1024");
149 return NULL;
150 }
151
152 ms = talloc_zero(ctx, fr_message_set_t);
153 if (!ms) {
154 fr_strerror_const("Failed allocating memory");
155 return NULL;
156 }
157
158 CACHE_ALIGN(message_size);
159 ms->message_size = message_size;
160
161 ms->rb_array[0] = fr_ring_buffer_create(ms, ring_buffer_size);
162 if (!ms->rb_array[0]) {
163 talloc_free(ms);
164 return NULL;
165 }
166 ms->rb_max = 0;
167
168 ms->mr_array[0] = fr_ring_buffer_create(ms, num_messages * message_size);
169 if (!ms->mr_array[0]) {
170 talloc_free(ms);
171 return NULL;
172 }
173
174 /*
175 * If unlimited size is allowed, set the max allocation to 1 << 29
176 * which is based on the maximum ring buffer reservation of 1 << 30
177 */
178 ms->max_allocation = unlimited_size ? 1 << 29 : ring_buffer_size / 2;
179
180 return ms;
181}
182
183
184/** Mark a message as done
185 *
186 * Note that this call is usually done from a thread OTHER than the
187 * originator of the message. As such, the message is NOT actually
188 * freed. Instead, it is just marked as freed.
189 *
190 * @param[in] m the message to make as done.
191 * @return
192 * - <0 on error
193 * - 0 on success
194 */
196{
199
200 /*
201 * Mark a message as freed. The originator will take
202 * care of cleaning it up.
203 */
204 if (m->status == FR_MESSAGE_USED) {
206 return 0;
207 }
208
209 /*
210 * This message was localized, so we can free it via
211 * talloc.
212 */
213 if (m->status == FR_MESSAGE_LOCALIZED) {
214 talloc_free(m);
215 return 0;
216 }
217
218 /*
219 * A catastrophic error.
220 */
221 fr_assert(0 == 1);
222
223 fr_strerror_const("Failed marking message as done");
224 return -1;
225}
226
227
228/** Localize a message by copying it to local storage
229 *
230 * This function "localizes" a message by copying it to local
231 * storage. In the case where the recipient of a message has to sit
232 * on it for a while, that blocks the originator from cleaning up the
233 * message. The recipient can then copy the message to local
234 * storage, so that the originator can clean it up.
235 *
236 * The localized message is marked as FR_MESSAGE_LOCALIZED, so that
237 * the recipient can call the normal fr_message_done() function to
238 * free it.
239 *
240 * @param[in] ctx the talloc context to use for localization
241 * @param[in] m the message to be localized
242 * @param[in] message_size the size of the message, including the fr_message_t
243 * @return
244 * - NULL on allocation error
245 * - a newly localized message
246 */
247fr_message_t *fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
248{
249 fr_message_t *l;
250
251 if (m->status != FR_MESSAGE_USED) {
252 fr_strerror_const("Cannot localize message unless it is in use");
253 return NULL;
254 }
255
256 if (message_size <= sizeof(fr_message_t)) {
257 fr_strerror_const("Message size is too small");
258 return NULL;
259 }
260
261 l = talloc_memdup(ctx, m, message_size);
262 if (!l) {
263 nomem:
264 fr_strerror_const("Failed allocating memory");
265 return NULL;
266 }
267
268 l->data = NULL;
269
270 if (l->data_size) {
271 l->data = talloc_memdup(l, m->data, l->data_size);
272 if (!l->data) {
273 talloc_free(l);
274 goto nomem;
275 }
276 }
277
279
280 /*
281 * After this change, "m" should not be used for
282 * anything.
283 */
285
286 /*
287 * Now clean up the other fields of the newly localized
288 * message.
289 */
290 l->rb = NULL;
291 l->rb_size = 0;
292
293 return l;
294}
295
296
297/** Clean up messages in a message ring.
298 *
299 * Find the oldest messages which are marked FR_MESSAGE_DONE,
300 * and mark them FR_MESSAGE_FREE.
301 *
302 * FIXME: If we care, track which ring buffer is in use, and how
303 * many contiguous chunks we can free. Then, free the chunks at
304 * once, instead of piecemeal. Realistically tho... this will
305 * probably make little difference.
306 *
307 * @param[in] ms the message set
308 * @param[in] mr the message ring
309 * @param[in] max_to_clean maximum number of messages to clean at a time.
310 */
311static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
312{
313 int messages_cleaned = 0;
314 size_t size;
315 fr_message_t *m;
316
317 while (true) {
318 (void) fr_ring_buffer_start(mr, (uint8_t **) &m, &size);
319 if (size == 0) break;
320
321 fr_assert(m != NULL);
322 fr_assert(size >= ms->message_size);
323
325 if (m->status != FR_MESSAGE_DONE) break;
326
327 messages_cleaned++;
329 ms->freed++;
330
331 if (m->rb) {
332 (void) fr_ring_buffer_free(m->rb, m->rb_size);
333#ifndef NDEBUG
334 memset(m, 0, ms->message_size);
335#endif
336 }
337
339
340 if (messages_cleaned >= max_to_clean) break;
341 }
342
343 MPRINT("CLEANED %d (%p) left\n", messages_cleaned, mr);
344 return messages_cleaned;
345}
346
347
348/** Garbage collect "done" messages.
349 *
350 * Called only from the originating thread. We also clean a limited
351 * number of messages at a time, so that we don't have sudden latency
352 * spikes when cleaning 1M messages.
353 *
354 * @param[in] ms the message set
355 * @param[in] max_to_clean the maximum number of messages to clean
356 */
357static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
358{
359 int i;
360 int arrays_freed, arrays_used, empty_slot;
361 int largest_free_slot;
362 int total_cleaned;
363 size_t largest_free_size;
364
365 /*
366 * Clean up "done" messages.
367 */
368 total_cleaned = 0;
369
370 /*
371 * Garbage collect the smaller buffers first.
372 */
373 for (i = 0; i <= ms->mr_max; i++) {
374 int cleaned;
375
376 cleaned = fr_message_ring_gc(ms, ms->mr_array[i], max_to_clean - total_cleaned);
377 total_cleaned += cleaned;
378 fr_assert(total_cleaned <= max_to_clean);
379
380 /*
381 * Stop when we've reached our GC limit.
382 */
383 if (total_cleaned == max_to_clean) break;
384 }
385
386 /*
387 * Couldn't GC anything. Don't do more work.
388 */
389 if (total_cleaned == 0) return;
390
391 arrays_freed = 0;
392 arrays_used = 0;
393
394 /*
395 * Keep the two largest message buffers (used or not),
396 * and free all smaller ones which are empty.
397 */
398 for (i = ms->mr_max; i >= 0; i--) {
399 fr_assert(ms->mr_array[i] != NULL);
400
401 if (arrays_used < 2) {
402 MPRINT("\tleaving entry %d alone\n", i);
403 arrays_used++;
404 continue;
405 }
406
407 /*
408 * If the message ring buffer is empty, check if
409 * we should perhaps delete it.
410 */
411 if (fr_ring_buffer_used(ms->mr_array[i]) == 0) {
412 MPRINT("\tfreeing entry %d\n", i);
413 TALLOC_FREE(ms->mr_array[i]);
414 arrays_freed++;
415 continue;
416 }
417
418 MPRINT("\tstill in use entry %d\n", i);
419 }
420
421 /*
422 * Some entries have been freed. We need to coalesce the
423 * remaining entries.
424 */
425 if (arrays_freed) {
426 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
427
428 empty_slot = -1;
429
430 /*
431 * Pack the rb array by moving used entries to
432 * the bottom of the array.
433 */
434 for (i = 0; i <= ms->mr_max; i++) {
435 int j;
436
437 /*
438 * Skip over empty entries, but set
439 * "empty_slot" to the first empty on we
440 * found.
441 */
442 if (!ms->mr_array[i]) {
443 if (empty_slot < 0) empty_slot = i;
444
445 continue;
446 }
447
448 /*
449 * This array entry is used, but there is
450 * no empty slot to put it into. Ignore
451 * it, and continue
452 */
453 if (empty_slot < 0) continue;
454
455 fr_assert(ms->mr_array[empty_slot] == NULL);
456
457 ms->mr_array[empty_slot] = ms->mr_array[i];
458 ms->mr_array[i] = NULL;
459
460 /*
461 * Find the next empty slot which is
462 * greater than the one we just used.
463 */
464 for (j = empty_slot + 1; j <= i; j++) {
465 if (!ms->mr_array[j]) {
466 empty_slot = j;
467 break;
468 }
469 }
470 }
471
472 /*
473 * Lower max, and set current to the largest
474 * array, whether or not it's used.
475 */
476 ms->mr_max -= arrays_freed;
477 ms->mr_current = ms->mr_max;
478
479#ifndef NDEBUG
480 MPRINT("NUM RB ARRAYS NOW %d\n", ms->mr_max + 1);
481 for (i = 0; i <= ms->mr_max; i++) {
482 MPRINT("\t%d %p\n", i, ms->mr_array[i]);
483 fr_assert(ms->mr_array[i] != NULL);
484 }
485#endif
486 }
487
488 /*
489 * And now we do the same thing for the ring buffers.
490 * Except that freeing the messages above also cleaned up
491 * the contents of each ring buffer, so all we need to do
492 * is find the largest empty ring buffer.
493 *
494 * We do this by keeping the two largest ring buffers
495 * (used or not), and then freeing all smaller ones which
496 * are empty.
497 */
498 arrays_used = 0;
499 arrays_freed = 0;
500 MPRINT("TRYING TO FREE ARRAYS %d\n", ms->rb_max);
501 for (i = ms->rb_max; i >= 0; i--) {
502 fr_assert(ms->rb_array[i] != NULL);
503
504 if (arrays_used < 2) {
505 MPRINT("\tleaving entry %d alone\n", i);
506 arrays_used++;
507 continue;
508 }
509
510 if (fr_ring_buffer_used(ms->rb_array[i]) == 0) {
511 MPRINT("\tfreeing entry %d\n", i);
512 TALLOC_FREE(ms->rb_array[i]);
513 arrays_freed++;
514 continue;
515 }
516
517 MPRINT("\tstill in use entry %d\n", i);
518 }
519
520 /*
521 * Pack the array entries back down.
522 */
523 if (arrays_freed > 0) {
524 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
525
526 empty_slot = -1;
527
528 /*
529 * Pack the rb array by moving used entries to
530 * the bottom of the array.
531 */
532 for (i = 0; i <= ms->rb_max; i++) {
533 int j;
534
535 /*
536 * Skip over empty entries, but set
537 * "empty_slot" to the first empty on we
538 * found.
539 */
540 if (!ms->rb_array[i]) {
541 if (empty_slot < 0) empty_slot = i;
542
543 continue;
544 }
545
546 /*
547 * This array entry is used, but there is
548 * no empty slot to put it into. Ignore
549 * it, and continue
550 */
551 if (empty_slot < 0) continue;
552
553 fr_assert(ms->rb_array[empty_slot] == NULL);
554
555 ms->rb_array[empty_slot] = ms->rb_array[i];
556 ms->rb_array[i] = NULL;
557
558 /*
559 * Find the next empty slot which is
560 * greater than the one we just used.
561 */
562 for (j = empty_slot + 1; j <= i; j++) {
563 if (!ms->rb_array[j]) {
564 empty_slot = j;
565 break;
566 }
567 }
568 }
569
570 /*
571 * Lower max, and set current to the largest
572 * array, whether or not it's used.
573 */
574 ms->rb_max -= arrays_freed;
575 ms->rb_current = ms->rb_max;
576
577#ifndef NDEBUG
578 MPRINT("NUM RB ARRAYS NOW %d\n", ms->rb_max + 1);
579 for (i = 0; i <= ms->rb_max; i++) {
580 MPRINT("\t%d %p\n", i, ms->rb_array[i]);
581 fr_assert(ms->rb_array[i] != NULL);
582 }
583#endif
584 }
585
586 /*
587 * Set the current ring buffer to the one with the
588 * largest free space in it.
589 *
590 * This is different from the allocation strategy for
591 * messages.
592 */
593 if (!fr_cond_assert(ms->rb_array[ms->rb_max] != NULL)) return;
594
595 largest_free_slot = ms->rb_max;
596 largest_free_size = (fr_ring_buffer_size(ms->rb_array[ms->rb_max]) -
598
599 for (i = 0; i < ms->rb_max; i++) {
600 size_t free_size;
601
602 fr_assert(ms->rb_array[i] != NULL);
603
604 free_size = (fr_ring_buffer_size(ms->rb_array[i]) -
606 if (largest_free_size < free_size) {
607 largest_free_slot = i;
608 largest_free_size = free_size;
609 }
610 }
611
612 ms->rb_current = largest_free_slot;
613 fr_assert(ms->rb_current >= 0);
614 fr_assert(ms->rb_current <= ms->rb_max);
615}
616
617/** Allocate a message from a message ring.
618 *
619 * The newly allocated message is zeroed.
620 *
621 * @param[in] ms the message set
622 * @param[in] mr the message ring to allocate from
623 * @param[in] clean whether to clean the message ring
624 * @return
625 * - NULL on failed allocation
626 * - fr_message_t* on successful allocation.
627 */
629{
630 fr_message_t *m;
631
632 /*
633 * We're at the start of a buffer with data, and there's
634 * no room. Do a quick check to see if we can free up
635 * the oldest entry. If not, return.
636 *
637 * Otherwise, fall through to allocating a entry, of
638 * which there must now be at least one free one.
639 *
640 * This check results in a small amount of cache line
641 * thrashing. But if the buffer is full, it's likely
642 * that the oldest entry can be freed. If not, we have a
643 * small amount of cache thrashing, which should be
644 * extremely rare.
645 */
646 if (clean) {
647 if (fr_message_ring_gc(ms, mr, 4) == 0) {
648 fr_strerror_const("No free memory after GC attempt");
649 return NULL;
650 }
651
652 /*
653 * Else we cleaned up some entries in this array.
654 * Go allocate a message.
655 */
656 }
657
658 /*
659 * Grab a new message from the underlying ring buffer.
660 */
662 if (!m) return NULL;
663
664#ifndef NDEBUG
665 memset(m, 0, ms->message_size);
666#endif
668 return m;
669}
670
671/** Allocate a fr_message_t, WITHOUT a ring buffer.
672 *
673 * @param[in] ms the message set
674 * @param[out] p_cleaned a flag to indicate if we cleaned the message array
675 * @return
676 * - NULL on error
677 * - fr_message_t* on success
678 */
680{
681 int i;
682 fr_message_t *m;
684
685 ms->allocated++;
686 *p_cleaned = false;
687
688 /*
689 * Grab the current message array. In the general case,
690 * there's room, so we grab a message and go find a ring
691 * buffer.
692 */
693 mr = ms->mr_array[ms->mr_current];
695 if (m) {
696 memset(m, 0, ms->message_size);
698 MPRINT("ALLOC normal\n");
699 return m;
700 }
701
702 MPRINT("CLEANING UP (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
703 ms->allocated - ms->freed);
704
705 /*
706 * Else the buffer is full. Do a global cleanup.
707 */
708 fr_message_gc(ms, 128);
709 *p_cleaned = true;
710
711 /*
712 * If we're lucky, the cleanup has given us a new
713 * "current" buffer, which is empty. If so, use it.
714 */
715 mr = ms->mr_array[ms->mr_current];
716 m = fr_message_ring_alloc(ms, mr, true);
717 if (m) {
718 MPRINT("ALLOC after cleanup\n");
719 return m;
720 }
721
722 /*
723 * We've tried two allocations, and both failed. Brute
724 * force over all arrays, trying to allocate one
725 * somewhere... anywhere. We start from the largest
726 * array, because that is the one we want to use the
727 * most.
728 *
729 * We want to avoid allocations in the smallest array,
730 * because that array will quickly wrap, and will cause
731 * us to do cleanups more often. That also lets old
732 * entries in the smallest array age out, so that we can
733 * free the smallest arrays.
734 */
735 for (i = ms->mr_max; i >= 0; i--) {
736 mr = ms->mr_array[i];
737
738 m = fr_message_ring_alloc(ms, mr, true);
739 if (m) {
740 ms->mr_current = i;
741 MPRINT("ALLOC from changed ring buffer\n");
742 MPRINT("SET MR to changed %d\n", ms->mr_current);
743 return m;
744 }
745 }
746
747 /*
748 * All of the arrays are full. If we don't have
749 * room to allocate another array, we're dead.
750 */
751 if ((ms->mr_max + 1) >= MSG_ARRAY_SIZE) {
752 fr_strerror_const("All message arrays are full");
753 return NULL;
754 }
755
756 /*
757 * Allocate another message ring, double the size
758 * of the previous maximum.
759 */
761 if (!mr) {
762 fr_strerror_const_push("Failed allocating ring buffer");
763 return NULL;
764 }
765
766 /*
767 * Set the new one as current for all new
768 * allocations, allocate a message, and go try to
769 * reserve room for the raw packet data.
770 */
771 ms->mr_max++;
772 ms->mr_current = ms->mr_max;
773 ms->mr_array[ms->mr_max] = mr;
774
775 MPRINT("SET MR to doubled %d\n", ms->mr_current);
776
777 /*
778 * And we should now have an entirely empty message ring.
779 */
780 m = fr_message_ring_alloc(ms, mr, false);
781 if (!m) return NULL;
782
783 MPRINT("ALLOC after doubled message ring\n");
784
785 return m;
786}
787
788
789/** Get a ring buffer for a message
790 *
791 * @param[in] ms the message set
792 * @param[in] m the message
793 * @param[in] cleaned_up whether the message set was partially garbage collected
794 * @return
795 * - NULL on error, and m is deallocated
796 * - m on success
797 */
799 bool cleaned_up)
800{
801 int i;
803 size_t alloc_size;
804
805 /*
806 * And... we go through a bunch of hoops, all over again.
807 */
808 m->rb = ms->rb_array[ms->rb_current];
809 fr_assert(m->rb != NULL);
811 if (m->data) return m;
812
813 /*
814 * When the simple allocation fails, ensure we don't do
815 * the cleanup twice in one allocation.
816 */
817 if (!cleaned_up) {
818 /*
819 * If we run out of room in the current ring
820 * buffer, AND it's our only one, then just
821 * double it in size.
822 */
823 if (ms->rb_max == 0) goto alloc_rb;
824
825 /*
826 * We're using multiple ring buffers, and we
827 * haven't already done a cleanup. Force a
828 * cleanup.
829 */
830 MPRINT("CLEANED UP BECAUSE OF RING BUFFER (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
831 ms->allocated - ms->freed);
832
833 fr_message_gc(ms, 128);
834
835 /*
836 * Try to allocate the packet from the newly current ring buffer.
837 */
838 m->rb = ms->rb_array[ms->rb_current];
839 fr_assert(m->rb != NULL);
841 if (m->data) return m;
842
843 MPRINT("CLEANUP RING BUFFER FAILED\n");
844 }
845
846 /*
847 * We've tried two allocations, and both failed. Brute
848 * force over all arrays, trying to allocate one
849 * somewhere... anywhere. We start from the largest
850 * array, because that is the one we want to use the
851 * most.
852 *
853 * We want to avoid allocations in the smallest array,
854 * because that array will quickly wrap, and will cause
855 * us to do cleanups more often. That also lets old
856 * entries in the smallest array age out, so that we can
857 * free the smallest arrays.
858 */
859 for (i = ms->rb_max; i >= 0; i--) {
860 m->rb = ms->rb_array[i];
861 fr_assert(m->rb != NULL);
863 if (m->data) {
864 MPRINT("MOVED TO RING BUFFER %d\n", i);
865 ms->rb_current = i;
866 return m;
867 }
868 }
869
870 /*
871 * All of the arrays are full. If we don't have
872 * room to allocate another array, we're dead.
873 */
874 if ((ms->rb_max + 1) >= MSG_ARRAY_SIZE) {
875 fr_strerror_const("Message arrays are full");
876 goto cleanup;
877 }
878
879alloc_rb:
880 /*
881 * Allocate another message ring, double the size
882 * of the previous maximum, or large enough to hold
883 * twice the requested size if that is larger.
884 * fr_ring_buffer_create will round to the next power of 2.
885 */
886 alloc_size = fr_ring_buffer_size(ms->rb_array[ms->rb_max]) * 2;
887 if (alloc_size < m->rb_size) {
888 alloc_size = m->rb_size * 2;
889 }
890 rb = fr_ring_buffer_create(ms, alloc_size);
891 if (!rb) {
892 fr_strerror_const_push("Failed allocating ring buffer");
893 goto cleanup;
894 }
895
896 MPRINT("RING BUFFER DOUBLES\n");
897
898 /*
899 * Set the new one as current for all new
900 * allocations, allocate a message, and go try to
901 * reserve room for the raw packet data.
902 */
903 ms->rb_max++;
904 ms->rb_current = ms->rb_max;
905 ms->rb_array[ms->rb_current] = rb;
906
907 /*
908 * And we should now have an entirely empty message ring.
909 */
910 m->rb = rb;
912 if (m->data) return m;
913
914cleanup:
915 MPRINT("OUT OF MEMORY\n");
916
917 m->rb = NULL;
919 return NULL;
920}
921
922
923/** Reserve a message
924 *
925 * A later call to fr_message_alloc() will allocate the correct
926 * packet ring buffer size. This call just allocates a message
927 * header, and reserves space for the packet.
928 *
929 * If the caller later decides that the message is not needed, he
930 * should call fr_message_free() to free the message.
931 *
932 * We assume that the caller will call fr_message_reserve(), and then
933 * almost immediately fr_message_alloc(). Multiple calls in series
934 * to fr_message_reserve() MUST NOT be done. The caller could also
935 * just call fr_ring_buffer_alloc(m->rb, size) if they wanted, and
936 * then update m->data_size by hand...
937 *
938 * The message is returned
939 *
940 * @param[in] ms the message set
941 * @param[in] reserve_size to reserve
942 * @return
943 * - NULL on error
944 * - fr_message_t* on success
945 */
947{
948 bool cleaned_up;
949 fr_message_t *m;
950
951 (void) talloc_get_type_abort(ms, fr_message_set_t);
952
953 if (reserve_size > ms->max_allocation) {
954 fr_strerror_printf("Cannot reserve %zd > max allocation %zd\n", reserve_size, ms->max_allocation);
955 return NULL;
956 }
957
958 /*
959 * Allocate a bare message.
960 */
961 m = fr_message_get_message(ms, &cleaned_up);
962 if (!m) {
963 MPRINT("Failed to reserve message\n");
964 return NULL;
965 }
966
967 /*
968 * If the caller is not allocating any packet data, just
969 * return the empty message.
970 */
971 if (!reserve_size) return m;
972
973 /*
974 * We leave m->data_size as zero, and m->rb_size as the
975 * reserved size. This indicates that the message has
976 * reserved room for the packet data, but nothing has
977 * been allocated.
978 */
981
982 return fr_message_get_ring_buffer(ms, m, cleaned_up);
983}
984
985/** Allocate packet data for a message
986 *
987 * The caller will normally call fr_message_reserve() before calling
988 * this function, and pass the resulting message 'm' here. If 'm' is
989 * NULL, however, this function will call fr_message_reserve() of
990 * 'actual_packet_size'. This capability is there for callers who
991 * know the size of the message in advance.
992 *
993 * @param[in] ms the message set
994 * @param[in] m the message message to allocate packet data for
995 * @param[in] actual_packet_size to use
996 * @return
997 * - NULL on error, and input message m is left alone
998 * - fr_message_t* on success. Will always be input message m.
999 */
1001{
1002 uint8_t *p;
1003 size_t reserve_size;
1004
1005 (void) talloc_get_type_abort(ms, fr_message_set_t);
1006
1007 /* m is NOT talloc'd */
1008
1009 if (!m) {
1010 m = fr_message_reserve(ms, actual_packet_size); /* will cache align it */
1011 if (!m) return NULL;
1012 }
1013
1015 fr_assert(m->rb != NULL);
1016 fr_assert(m->data != NULL);
1017 fr_assert(m->data_size == 0);
1018 fr_assert(m->rb_size >= actual_packet_size);
1019
1020 /*
1021 * No data to send? Just send a bare message;
1022 */
1023 if (actual_packet_size == 0) {
1024 m->data = NULL;
1025 m->rb = NULL;
1026 m->data_size = m->rb_size = 0;
1027 return m;
1028 }
1029
1030 reserve_size = actual_packet_size;
1032
1034 fr_assert(p != NULL);
1035 if (!p) {
1036 fr_strerror_const_push("Failed allocating from ring buffer");
1037 return NULL;
1038 }
1039
1040 fr_assert(p == m->data);
1041
1042 m->data_size = actual_packet_size;
1043 m->rb_size = reserve_size;
1044
1045 return m;
1046}
1047
1048/** Allocate packet data for a message, and reserve a new message
1049 *
1050 * This function allocates a previously reserved message, and then
1051 * reserves a new message.
1052 *
1053 * The application should call fr_message_reserve() with a large
1054 * buffer, and then read data into the buffer. If the buffer
1055 * contains multiple packets, the application should call
1056 * fr_message_alloc_reserve() repeatedly to allocate the full
1057 * packets, while reserving room for the partial packet.
1058 *
1059 * When the application is determines that there is only one full
1060 * packet, and one partial packet in the buffer, it should call this
1061 * function with actual_packet_size, and a large reserve_size. The
1062 * partial packet will be reserved. If the ring buffer is full, the
1063 * partial packet will be copied to a new ring buffer.
1064 *
1065 * When the application determines that there are multiple full
1066 * packets in the buffer, it should call this function with
1067 * actual_packet_size for each buffer, and reserve_size which
1068 * reserves all of the data in the buffer. i.e. the full packets +
1069 * partial packets, which should start off as the original
1070 * reserve_size.
1071 *
1072 * The application should call this function to allocate each packet,
1073 * while decreasing reserve_size by each actual_packet_size that was
1074 * allocated. Once there is only one full and a partial packet in
1075 * the buffer, it should use a large reserve_size, as above.
1076 *
1077 * The application could just always ecall this function with a large
1078 * reserve_size, at the cost of substantially more memcpy()s.
1079 *
1080 * @param[in] ms the message set
1081 * @param[in] m the message message to allocate packet data for
1082 * @param[in] actual_packet_size to use
1083 * @param[in] leftover "dirty" bytes in the buffer
1084 * @param[in] reserve_size to reserve for new message
1085 * @return
1086 * - NULL on error, and input message m is left alone
1087 * - fr_message_t* on success. Will always be a new message.
1088 */
1090 size_t leftover, size_t reserve_size)
1091{
1092 bool cleaned_up;
1093 uint8_t *p;
1094 fr_message_t *m2;
1095 size_t m_rb_size, align_size;
1096
1097 (void) talloc_get_type_abort(ms, fr_message_set_t);
1098
1099 align_size = actual_packet_size;
1100 CACHE_ALIGN(align_size);
1101
1102 /* m is NOT talloc'd */
1103
1105 fr_assert(m->rb != NULL);
1106 fr_assert(m->data != NULL);
1107 fr_assert(m->rb_size >= actual_packet_size);
1108
1109 p = fr_ring_buffer_alloc(m->rb, align_size);
1110 fr_assert(p != NULL);
1111 if (!p) {
1112 fr_strerror_const_push("Failed allocating from ring buffer");
1113 return NULL;
1114 }
1115
1116 fr_assert(p == m->data);
1117
1118 m_rb_size = m->rb_size; /* for ring buffer cleanups */
1119
1120 m->data_size = actual_packet_size;
1121 m->rb_size = align_size;
1122
1123 /*
1124 * If we've allocated all of the reserved ring buffer
1125 * data, then just reserve a brand new reservation.
1126 *
1127 * This will be automatically cache aligned.
1128 */
1129 if (!leftover) return fr_message_reserve(ms, reserve_size);
1130
1131 /*
1132 * Allocate a new message.
1133 */
1134 m2 = fr_message_get_message(ms, &cleaned_up);
1135 if (!m2) return NULL;
1136
1137 /*
1138 * Ensure that there's enough room to shift the next
1139 * packet, so that it's cache aligned. Moving small
1140 * amounts of memory is likely faster than having two
1141 * CPUs fight over the same cache lines.
1142 */
1143 reserve_size += (align_size - actual_packet_size);
1145
1146 /*
1147 * Track how much data there is in the packet.
1148 */
1149 m2->rb = m->rb;
1150 m2->data_size = leftover;
1151 m2->rb_size = reserve_size;
1152
1153 /*
1154 * Try to extend the reservation. If we can do it,
1155 * return.
1156 */
1158 if (m2->data) {
1159 /*
1160 * The next packet pointer doesn't point to the
1161 * actual data after the current packet. Move
1162 * the next packet to match up with the ring
1163 * buffer allocation.
1164 */
1165 if (m2->data != (m->data + actual_packet_size)) {
1166 memmove(m2->data, m->data + actual_packet_size, leftover);
1167 }
1168 return m2;
1169 }
1170
1171 /*
1172 * We failed reserving more memory at the end of the
1173 * current ring buffer.
1174 *
1175 * Reserve data from a new ring buffer. If it doesn't
1176 * succeed, ensure that the old message will properly
1177 * clean up the old ring buffer.
1178 */
1179 if (!fr_message_get_ring_buffer(ms, m2, false)) {
1180 m->rb_size = m_rb_size;
1181 return NULL;
1182 }
1183
1184 /*
1185 * If necessary, copy the remaining data from the old
1186 * buffer to the new one.
1187 */
1188 if (m2->data != (m->data + actual_packet_size)) {
1189 memmove(m2->data, m->data + actual_packet_size, leftover);
1190 }
1191
1192 /*
1193 * The messages are in different ring buffers. We've
1194 * aligned m->rb_size above for the current packet, but
1195 * there's no subsequent message to clean up this
1196 * reservation. Re-extend the current message to it's
1197 * original size, so that cleaning it up will clean up the ring buffer.
1198 */
1199 if (m2->rb != m->rb) {
1200 m->rb_size = m_rb_size;
1201 return m2;
1202 }
1203
1204 /*
1205 * If we've managed to allocate the next message in the
1206 * current ring buffer, then it really should have
1207 * wrapped around. In which case, re-extend the current
1208 * message as above.
1209 */
1210 if (m2->data < m->data) {
1211 m->rb_size = m_rb_size;
1212 return m2;
1213 }
1214
1215 return m2;
1216}
1217
1218/** Count the number of used messages
1219 *
1220 * @param[in] ms the message set
1221 * @return
1222 * - number of used messages
1223 */
1225{
1226 int i, used;
1227
1228 (void) talloc_get_type_abort(ms, fr_message_set_t);
1229
1230 used = 0;
1231 for (i = 0; i <= ms->mr_max; i++) {
1232 fr_ring_buffer_t *mr;
1233
1234 mr = ms->mr_array[i];
1235
1237 }
1238
1239 return used;
1240}
1241
1242/** Garbage collect the message set.
1243 *
1244 * This function should ONLY be called just before freeing the
1245 * message set. It is intended only for debugging, and will cause
1246 * huge latency spikes if used at run time.
1247 *
1248 * @param[in] ms the message set
1249 */
1251{
1252 int i;
1253
1254 (void) talloc_get_type_abort(ms, fr_message_set_t);
1255
1256 /*
1257 * Manually clean up each message ring.
1258 */
1259 for (i = 0; i <= ms->mr_max; i++) {
1260 (void) fr_message_ring_gc(ms, ms->mr_array[i], INT_MAX);
1261 }
1262
1263 /*
1264 * And then do one last pass to clean up the arrays.
1265 */
1266 fr_message_gc(ms, INT_MAX);
1267}
1268
1269/** Print debug information about the message set.
1270 *
1271 * @param[in] ms the message set
1272 * @param[in] fp the FILE where the messages are printed.
1273 */
1275{
1276 int i;
1277
1278 (void) talloc_get_type_abort(ms, fr_message_set_t);
1279
1280 fprintf(fp, "message arrays = %d\t(current %d)\n", ms->mr_max + 1, ms->mr_current);
1281 fprintf(fp, "ring buffers = %d\t(current %d)\n", ms->rb_max + 1, ms->rb_current);
1282
1283 for (i = 0; i <= ms->mr_max; i++) {
1284 fr_ring_buffer_t *mr = ms->mr_array[i];
1285
1286 fprintf(fp, "messages[%d] =\tsize %zu, used %zu\n",
1288 }
1289
1290 for (i = 0; i <= ms->rb_max; i++) {
1291 fprintf(fp, "ring buffer[%d] =\tsize %zu, used %zu\n",
1293 }
1294}
#define RCSID(id)
Definition build.h:506
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
talloc_free(hp)
unsigned char uint8_t
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
Definition message.c:112
size_t message_size
size of the callers message, including fr_message_t
Definition message.c:98
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int mr_current
current used message ring entry
Definition message.c:95
#define MPRINT(...)
Definition message.c:37
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
Definition message.c:110
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
Definition message.c:357
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
Definition message.c:798
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:247
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
Definition message.c:679
int rb_max
max used ring buffer entry
Definition message.c:103
#define MSG_ARRAY_SIZE
Definition message.c:40
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
Definition message.c:628
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition message.c:1224
#define CACHE_ALIGN(_x)
Definition message.c:42
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1250
int mr_max
max used message ring entry
Definition message.c:96
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
Definition message.c:1274
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:946
int mr_cleaned
where we last cleaned
Definition message.c:100
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
Definition message.c:311
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1089
size_t max_allocation
maximum allocation size
Definition message.c:105
int rb_current
current used ring buffer entry
Definition message.c:102
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
fr_ring_buffer_t * rb
pointer to the ring buffer
Definition message.h:48
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
@ FR_MESSAGE_DONE
Definition message.h:41
@ FR_MESSAGE_FREE
Definition message.h:38
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
static size_t reserve_size
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:37
static bool cleanup
Definition radsniff.c:59
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223