The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
message.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 3c23215e1cdff6a1bb9546a7520a23da7c11a262 $
19 *
20 * @brief Messages for inter-thread communication
21 * @file io/message.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: 3c23215e1cdff6a1bb9546a7520a23da7c11a262 $")
26
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
29
30#include <string.h>
31
32/*
33 * Debugging, mainly for message_set_test
34 */
35#if 0
36#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
37#else
38#define MPRINT(...)
39#endif
40
/*
 *	Maximum number of slots in the mr_array / rb_array fixed-size
 *	arrays.  Each new array/buffer doubles the previous size, so 16
 *	slots allow very large growth from the initial allocation (see
 *	the message-set description above).
 */
41#define MSG_ARRAY_SIZE (16)
42
/*
 *	Round _x up to the next multiple of 64 bytes.  64 is assumed
 *	here to be the CPU cache line size; aligning message headers
 *	and packet data avoids false sharing between originator and
 *	recipient threads.
 */
43#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
44
45/** A Message set, composed of message headers and ring buffer data.
46 *
47 * A message set is intended to send short-lived messages. The
48 * message headers are fixed in size, and allocated from an array
49 * which is treated like a circular buffer. Message bodies (i.e. raw
50 * packets) are variable in size, and live in a separate ring buffer.
51 *
52 * The message set starts off with a small array of message headers,
53 * and a small ring buffer. If an array/buffer fills up, a new one
54 * is allocated at double the size of the previous one.
55 *
56 * The array / buffers are themselves kept in fixed-size arrays, of
57 * MSG_ARRAY_SIZE. The reason is that the memory for fr_message_set_t
58 * should be contiguous, and not in a linked list scattered in
59 * memory.
60 *
61 * The originator allocates a message, and sends it to a recipient.
62 * The recipient (usually in another thread) uses the message, and
63 * marks it as FR_MESSAGE_DONE. The originator then asynchronously
64 * cleans up the message.
65 *
66 * This asynchronous cleanup is done via self-clocking. If there is
67 * no need to clean up the messages, it isn't done. Only when we run
68 * out of space to store messages (or packets) is the cleanup done.
69 *
70 * This cleanup latency ensures that we don't have cache line
71 * bouncing, where the originator sends the message, and then while
72 * the recipient is reading it... thrashes the cache line with
73 * checks for "are you done? Are you done?"
74 *
75 * If there are more than one used entry in either array, we then try
76 * to coalesce the buffers on cleanup. If we discover that one array
77 * is empty, we discard it, and move the used array entries into its
78 * place.
79 *
80 * This process ensures that we don't have too many buffers in
81 * progress. It is better to have a few large buffers than many
82 * small ones.
83 *
84 * MSG_ARRAY_SIZE is defined to be large (16 doublings) to allow for
85 * the edge case where messages are stuck for long periods of time.
86 *
87 * With an initial message array size of 64, this gives us room for
88 * 2M packets, if *all* of the mr_array entries have packets stuck in
89 * them that aren't cleaned up for extended periods of time.
90 *
91 * @todo Add a flag for UDP-style protocols, where we can put the
92 * message into the ring buffer. This helps with locality of
93 * reference, and removes the need to track two separate things.
94 */
/*
 * NOTE(review): the struct's opening line (original line 95) was elided
 * by the documentation extraction, as was original line 108, which the
 * rest of this file suggests declared the "allocated" counter
 * (referenced later as ms->allocated) -- confirm against upstream.
 */
96 int mr_current; //!< current used message ring entry
97 int mr_max; //!< max used message ring entry
98
99 size_t message_size; //!< size of the callers message, including fr_message_t
100
101 int mr_cleaned; //!< where we last cleaned
102
103 int rb_current; //!< current used ring buffer entry
104 int rb_max; //!< max used ring buffer entry
105
106 size_t max_allocation; //!< maximum allocation size
107
109 int freed; //!< count of messages freed by garbage collection
110
111 fr_ring_buffer_t *mr_array[MSG_ARRAY_SIZE]; //!< array of message arrays
112
113 fr_ring_buffer_t *rb_array[MSG_ARRAY_SIZE]; //!< array of ring buffers
114};
115
116
117/** Create a message set
118 *
119 * @param[in] ctx the context for talloc
120 * @param[in] num_messages size of the initial message array. MUST be a power of 2.
121 * @param[in] message_size the size of each message, INCLUDING fr_message_t, which MUST be at the start of the struct
122 * @param[in] ring_buffer_size of the ring buffer. MUST be a power of 2.
123 * @param[in] unlimited_size allow any message size to be allocated. If false it is limited to ring_buffer_size / 2.
124 * @return
125 * - NULL on error
126 * - newly allocated fr_message_set_t on success
127 */
128fr_message_set_t *fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size,
129 bool unlimited_size)
130{
/* NOTE(review): original line 131, presumably "fr_message_set_t *ms;", was elided by the doc extraction. */
132
133 /*
134 * Too small, or not a power of 2.
135 */
136 if (num_messages < 8) num_messages = 8;
137
138 if ((num_messages & (num_messages - 1)) != 0) {
139 fr_strerror_const("Number of messages must be a power of 2");
140 return NULL;
141 }
142
143 if (message_size < sizeof(fr_message_t)) {
144 fr_strerror_printf("Message size must be at least %zd", sizeof(fr_message_t));
145 return NULL;
146 }
147
148 if (message_size > 1024) {
149 fr_strerror_const("Message size must be no larger than 1024");
150 return NULL;
151 }
152
153 ms = talloc_zero(ctx, fr_message_set_t);
154 if (!ms) {
155 fr_strerror_const("Failed allocating memory");
156 return NULL;
157 }
158
/* Cache-align the per-message size so adjacent headers don't share cache lines. */
159 CACHE_ALIGN(message_size);
160 ms->message_size = message_size;
161
/* Slot 0 of each array holds the initial ring buffer / message ring; both grow by doubling later. */
162 ms->rb_array[0] = fr_ring_buffer_create(ms, ring_buffer_size);
163 if (!ms->rb_array[0]) {
164 talloc_free(ms);
165 return NULL;
166 }
167 ms->rb_max = 0;
168
169 ms->mr_array[0] = fr_ring_buffer_create(ms, num_messages * message_size);
170 if (!ms->mr_array[0]) {
171 talloc_free(ms);
172 return NULL;
173 }
174
175 /*
176 * If unlimited size is allowed, set the max allocation to 1 << 29
177 * which is based on the maximum ring buffer reservation of 1 << 30
178 */
179 ms->max_allocation = unlimited_size ? 1 << 29 : ring_buffer_size / 2;
180
181 return ms;
182}
183
184
185/** Mark a message as done
186 *
187 * Note that this call is usually done from a thread OTHER than the
188 * originator of the message. As such, the message is NOT actually
189 * freed. Instead, it is just marked as freed.
190 *
191 * @param[in] m the message to mark as done.
192 * @return
193 * - <0 on error
194 * - 0 on success
195 */
/*
 * NOTE(review): the function signature (original line 196,
 * "int fr_message_done(fr_message_t *m)" per the index below) and
 * original line 206 were elided by the doc extraction.  Line 206
 * presumably set m->status = FR_MESSAGE_DONE so the originator's GC
 * can reclaim the slot -- confirm against upstream.
 */
197{
200
201 /*
202 * Mark a message as freed. The originator will take
203 * care of cleaning it up.
204 */
205 if (m->status == FR_MESSAGE_USED) {
207 return 0;
208 }
209
210 /*
211 * This message was localized, so we can free it via
212 * talloc.
213 */
214 if (m->status == FR_MESSAGE_LOCALIZED) {
215 talloc_free(m);
216 return 0;
217 }
218
219 /*
220 * A catastrophic error: the message is in neither the
221 * USED nor LOCALIZED state.
222 */
222 fr_assert(0 == 1);
223
224 fr_strerror_const("Failed marking message as done");
225 return -1;
226}
227
228
229/** Localize a message by copying it to local storage
230 *
231 * This function "localizes" a message by copying it to local
232 * storage. In the case where the recipient of a message has to sit
233 * on it for a while, that blocks the originator from cleaning up the
234 * message. The recipient can then copy the message to local
235 * storage, so that the originator can clean it up.
236 *
237 * The localized message is marked as FR_MESSAGE_LOCALIZED, so that
238 * the recipient can call the normal fr_message_done() function to
239 * free it.
240 *
241 * @param[in] ctx the talloc context to use for localization
242 * @param[in] m the message to be localized
243 * @param[in] message_size the size of the message, including the fr_message_t
244 * @return
245 * - NULL on allocation error
246 * - a newly localized message
247 */
248fr_message_t *fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
249{
250 fr_message_t *l;
251
252 if (m->status != FR_MESSAGE_USED) {
253 fr_strerror_const("Cannot localize message unless it is in use");
254 return NULL;
255 }
256
257 if (message_size <= sizeof(fr_message_t)) {
258 fr_strerror_const("Message size is too small");
259 return NULL;
260 }
261
/* Copy the caller's full message struct (header + trailing caller data). */
262 l = talloc_memdup(ctx, m, message_size);
263 if (!l) {
264 nomem:
265 fr_strerror_const("Failed allocating memory");
266 return NULL;
267 }
268
269 l->data = NULL;
270
/* Deep-copy the packet body out of the shared ring buffer, if any. */
271 if (l->data_size) {
272 l->data = talloc_memdup(l, m->data, l->data_size);
273 if (!l->data) {
274 talloc_free(l);
275 goto nomem;
276 }
277 }
278
/* NOTE(review): original line 279 elided -- presumably "l->status = FR_MESSAGE_LOCALIZED;" -- confirm against upstream. */
280
281 /*
282 * After this change, "m" should not be used for
283 * anything.
284 */
/* NOTE(review): original line 285 elided -- presumably marks the original message as done so the originator can GC it -- confirm. */
286
287 /*
288 * Now clean up the other fields of the newly localized
289 * message.
290 */
291 l->rb = NULL;
292 l->rb_size = 0;
293
294 return l;
295}
296
297
298/** Clean up messages in a message ring.
299 *
300 * Find the oldest messages which are marked FR_MESSAGE_DONE,
301 * and mark them FR_MESSAGE_FREE.
302 *
303 * FIXME: If we care, track which ring buffer is in use, and how
304 * many contiguous chunks we can free. Then, free the chunks at
305 * once, instead of piecemeal. Realistically tho... this will
306 * probably make little difference.
307 *
308 * @param[in] ms the message set
309 * @param[in] mr the message ring
310 * @param[in] max_to_clean maximum number of messages to clean at a time.
311 */
/*
 * NOTE(review): original lines 325, 329 and 339 were elided by the doc
 * extraction.  From context: 325/329 presumably update m->status
 * (DONE -> FREE), and 339 presumably frees the header slot from the
 * message ring ("fr_ring_buffer_free(mr, ms->message_size)") -- confirm
 * against upstream before relying on this listing.
 */
312static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
313{
314 int messages_cleaned = 0;
315 size_t size;
316 fr_message_t *m;
317
/* Walk the oldest entries of the ring; stop at the first one not yet DONE. */
318 while (true) {
319 (void) fr_ring_buffer_start(mr, (uint8_t **) &m, &size);
320 if (size == 0) break;
321
322 fr_assert(m != NULL);
323 fr_assert(size >= ms->message_size);
324
326 if (m->status != FR_MESSAGE_DONE) break;
327
328 messages_cleaned++;
330 ms->freed++;
331
/* Release the packet-body reservation in its ring buffer, if one was made. */
332 if (m->rb) {
333 (void) fr_ring_buffer_free(m->rb, m->rb_size);
334#ifndef NDEBUG
335 memset(m, 0, ms->message_size);
336#endif
337 }
338
340
/* Bound the work per call so cleanup doesn't cause latency spikes. */
341 if (messages_cleaned >= max_to_clean) break;
342 }
343
344 MPRINT("CLEANED %d (%p) left\n", messages_cleaned, mr);
345 return messages_cleaned;
346}
347
348
349/** Garbage collect "done" messages.
350 *
351 * Called only from the originating thread. We also clean a limited
352 * number of messages at a time, so that we don't have sudden latency
353 * spikes when cleaning 1M messages.
354 *
355 * @param[in] ms the message set
356 * @param[in] max_to_clean the maximum number of messages to clean
357 */
/*
 * NOTE(review): original lines 598 and 606 were elided by the doc
 * extraction; both are the second operand of a "size - used"
 * subtraction, presumably "fr_ring_buffer_used(...)" on the same
 * buffer -- confirm against upstream.
 */
358static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
359{
360 int i;
361 int arrays_freed, arrays_used, empty_slot;
362 int largest_free_slot;
363 int total_cleaned;
364 size_t largest_free_size;
365
366 /*
367 * Clean up "done" messages.
368 */
369 total_cleaned = 0;
370
371 /*
372 * Garbage collect the smaller buffers first.
373 */
374 for (i = 0; i <= ms->mr_max; i++) {
375 int cleaned;
376
377 cleaned = fr_message_ring_gc(ms, ms->mr_array[i], max_to_clean - total_cleaned);
378 total_cleaned += cleaned;
379 fr_assert(total_cleaned <= max_to_clean);
380
381 /*
382 * Stop when we've reached our GC limit.
383 */
384 if (total_cleaned == max_to_clean) break;
385 }
386
387 /*
388 * Couldn't GC anything. Don't do more work.
389 */
390 if (total_cleaned == 0) return;
391
392 arrays_freed = 0;
393 arrays_used = 0;
394
395 /*
396 * Keep the two largest message buffers (used or not),
397 * and free all smaller ones which are empty.
398 */
399 for (i = ms->mr_max; i >= 0; i--) {
400 fr_assert(ms->mr_array[i] != NULL);
401
402 if (arrays_used < 2) {
403 MPRINT("\tleaving entry %d alone\n", i);
404 arrays_used++;
405 continue;
406 }
407
408 /*
409 * If the message ring buffer is empty, check if
410 * we should perhaps delete it.
411 */
412 if (fr_ring_buffer_used(ms->mr_array[i]) == 0) {
413 MPRINT("\tfreeing entry %d\n", i);
414 TALLOC_FREE(ms->mr_array[i]);
415 arrays_freed++;
416 continue;
417 }
418
419 MPRINT("\tstill in use entry %d\n", i);
420 }
421
422 /*
423 * Some entries have been freed. We need to coalesce the
424 * remaining entries.
425 */
426 if (arrays_freed) {
427 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
428
429 empty_slot = -1;
430
431 /*
432 * Pack the rb array by moving used entries to
433 * the bottom of the array.
434 */
435 for (i = 0; i <= ms->mr_max; i++) {
436 int j;
437
438 /*
439 * Skip over empty entries, but set
440 * "empty_slot" to the first empty one we
441 * found.
442 */
443 if (!ms->mr_array[i]) {
444 if (empty_slot < 0) empty_slot = i;
445
446 continue;
447 }
448
449 /*
450 * This array entry is used, but there is
451 * no empty slot to put it into. Ignore
452 * it, and continue
453 */
454 if (empty_slot < 0) continue;
455
456 fr_assert(ms->mr_array[empty_slot] == NULL);
457
458 ms->mr_array[empty_slot] = ms->mr_array[i];
459 ms->mr_array[i] = NULL;
460
461 /*
462 * Find the next empty slot which is
463 * greater than the one we just used.
464 */
465 for (j = empty_slot + 1; j <= i; j++) {
466 if (!ms->mr_array[j]) {
467 empty_slot = j;
468 break;
469 }
470 }
471 }
472
473 /*
474 * Lower max, and set current to the largest
475 * array, whether or not it's used.
476 */
477 ms->mr_max -= arrays_freed;
478 ms->mr_current = ms->mr_max;
479
480#ifndef NDEBUG
481 MPRINT("NUM RB ARRAYS NOW %d\n", ms->mr_max + 1);
482 for (i = 0; i <= ms->mr_max; i++) {
483 MPRINT("\t%d %p\n", i, ms->mr_array[i]);
484 fr_assert(ms->mr_array[i] != NULL);
485 }
486#endif
487 }
488
489 /*
490 * And now we do the same thing for the ring buffers.
491 * Except that freeing the messages above also cleaned up
492 * the contents of each ring buffer, so all we need to do
493 * is find the largest empty ring buffer.
494 *
495 * We do this by keeping the two largest ring buffers
496 * (used or not), and then freeing all smaller ones which
497 * are empty.
498 */
499 arrays_used = 0;
500 arrays_freed = 0;
501 MPRINT("TRYING TO FREE ARRAYS %d\n", ms->rb_max);
502 for (i = ms->rb_max; i >= 0; i--) {
503 fr_assert(ms->rb_array[i] != NULL);
504
505 if (arrays_used < 2) {
506 MPRINT("\tleaving entry %d alone\n", i);
507 arrays_used++;
508 continue;
509 }
510
511 if (fr_ring_buffer_used(ms->rb_array[i]) == 0) {
512 MPRINT("\tfreeing entry %d\n", i);
513 TALLOC_FREE(ms->rb_array[i]);
514 arrays_freed++;
515 continue;
516 }
517
518 MPRINT("\tstill in use entry %d\n", i);
519 }
520
521 /*
522 * Pack the array entries back down.
523 */
524 if (arrays_freed > 0) {
525 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
526
527 empty_slot = -1;
528
529 /*
530 * Pack the rb array by moving used entries to
531 * the bottom of the array.
532 */
533 for (i = 0; i <= ms->rb_max; i++) {
534 int j;
535
536 /*
537 * Skip over empty entries, but set
538 * "empty_slot" to the first empty one we
539 * found.
540 */
541 if (!ms->rb_array[i]) {
542 if (empty_slot < 0) empty_slot = i;
543
544 continue;
545 }
546
547 /*
548 * This array entry is used, but there is
549 * no empty slot to put it into. Ignore
550 * it, and continue
551 */
552 if (empty_slot < 0) continue;
553
554 fr_assert(ms->rb_array[empty_slot] == NULL);
555
556 ms->rb_array[empty_slot] = ms->rb_array[i];
557 ms->rb_array[i] = NULL;
558
559 /*
560 * Find the next empty slot which is
561 * greater than the one we just used.
562 */
563 for (j = empty_slot + 1; j <= i; j++) {
564 if (!ms->rb_array[j]) {
565 empty_slot = j;
566 break;
567 }
568 }
569 }
570
571 /*
572 * Lower max, and set current to the largest
573 * array, whether or not it's used.
574 */
575 ms->rb_max -= arrays_freed;
576 ms->rb_current = ms->rb_max;
577
578#ifndef NDEBUG
579 MPRINT("NUM RB ARRAYS NOW %d\n", ms->rb_max + 1);
580 for (i = 0; i <= ms->rb_max; i++) {
581 MPRINT("\t%d %p\n", i, ms->rb_array[i]);
582 fr_assert(ms->rb_array[i] != NULL);
583 }
584#endif
585 }
586
587 /*
588 * Set the current ring buffer to the one with the
589 * largest free space in it.
590 *
591 * This is different from the allocation strategy for
592 * messages.
593 */
594 if (!fr_cond_assert(ms->rb_array[ms->rb_max] != NULL)) return;
595
596 largest_free_slot = ms->rb_max;
597 largest_free_size = (fr_ring_buffer_size(ms->rb_array[ms->rb_max]) -
/* NOTE(review): original line 598 elided -- presumably "fr_ring_buffer_used(ms->rb_array[ms->rb_max]));" -- confirm. */
599
600 for (i = 0; i < ms->rb_max; i++) {
601 size_t free_size;
602
603 fr_assert(ms->rb_array[i] != NULL);
604
605 free_size = (fr_ring_buffer_size(ms->rb_array[i]) -
/* NOTE(review): original line 606 elided -- presumably "fr_ring_buffer_used(ms->rb_array[i]));" -- confirm. */
607 if (largest_free_size < free_size) {
608 largest_free_slot = i;
609 largest_free_size = free_size;
610 }
611 }
612
613 ms->rb_current = largest_free_slot;
614 fr_assert(ms->rb_current >= 0);
615 fr_assert(ms->rb_current <= ms->rb_max);
616}
617
618/** Allocate a message from a message ring.
619 *
620 * The newly allocated message is zeroed.
621 *
622 * @param[in] ms the message set
623 * @param[in] mr the message ring to allocate from
624 * @param[in] clean whether to clean the message ring
625 * @return
626 * - NULL on failed allocation
627 * - fr_message_t* on successful allocation.
628 */
/*
 * NOTE(review): the function signature (original line 629, per the
 * index: "static fr_message_t *fr_message_ring_alloc(fr_message_set_t
 * *ms, fr_ring_buffer_t *mr, bool clean)") and original lines 662 and
 * 668 were elided by the doc extraction.  Line 662 presumably
 * reserves/allocates ms->message_size bytes from "mr" into "m" --
 * confirm against upstream.
 */
630{
631 fr_message_t *m;
632
633 /*
634 * We're at the start of a buffer with data, and there's
635 * no room. Do a quick check to see if we can free up
636 * the oldest entry. If not, return.
637 *
638 * Otherwise, fall through to allocating a entry, of
639 * which there must now be at least one free one.
640 *
641 * This check results in a small amount of cache line
642 * thrashing. But if the buffer is full, it's likely
643 * that the oldest entry can be freed. If not, we have a
644 * small amount of cache thrashing, which should be
645 * extremely rare.
646 */
647 if (clean) {
648 if (fr_message_ring_gc(ms, mr, 4) == 0) {
649 fr_strerror_const("No free memory after GC attempt");
650 return NULL;
651 }
652
653 /*
654 * Else we cleaned up some entries in this array.
655 * Go allocate a message.
656 */
657 }
658
659 /*
660 * Grab a new message from the underlying ring buffer.
661 */
663 if (!m) return NULL;
664
665#ifndef NDEBUG
666 memset(m, 0, ms->message_size);
667#endif
669 return m;
670}
671
672/** Allocate a fr_message_t, WITHOUT a ring buffer.
673 *
674 * @param[in] ms the message set
675 * @param[out] p_cleaned a flag to indicate if we cleaned the message array
676 * @return
677 * - NULL on error
678 * - fr_message_t* on success
679 */
/*
 * NOTE(review): several original lines were elided by the doc
 * extraction: 680 (the signature, per the index: "static fr_message_t
 * *fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)"),
 * 684 (presumably "fr_ring_buffer_t *mr;"), 695/698 (presumably the
 * fr_message_ring_alloc() call and status initialization), and 761
 * (presumably the fr_ring_buffer_create() call that doubles the
 * largest message ring) -- confirm against upstream.
 */
681{
682 int i;
683 fr_message_t *m;
685
686 ms->allocated++;
687 *p_cleaned = false;
688
689 /*
690 * Grab the current message array. In the general case,
691 * there's room, so we grab a message and go find a ring
692 * buffer.
693 */
694 mr = ms->mr_array[ms->mr_current];
696 if (m) {
697 memset(m, 0, ms->message_size);
699 MPRINT("ALLOC normal\n");
700 return m;
701 }
702
703 MPRINT("CLEANING UP (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
704 ms->allocated - ms->freed);
705
706 /*
707 * Else the buffer is full. Do a global cleanup.
708 */
709 fr_message_gc(ms, 128);
710 *p_cleaned = true;
711
712 /*
713 * If we're lucky, the cleanup has given us a new
714 * "current" buffer, which is empty. If so, use it.
715 */
716 mr = ms->mr_array[ms->mr_current];
717 m = fr_message_ring_alloc(ms, mr, true);
718 if (m) {
719 MPRINT("ALLOC after cleanup\n");
720 return m;
721 }
722
723 /*
724 * We've tried two allocations, and both failed. Brute
725 * force over all arrays, trying to allocate one
726 * somewhere... anywhere. We start from the largest
727 * array, because that is the one we want to use the
728 * most.
729 *
730 * We want to avoid allocations in the smallest array,
731 * because that array will quickly wrap, and will cause
732 * us to do cleanups more often. That also lets old
733 * entries in the smallest array age out, so that we can
734 * free the smallest arrays.
735 */
736 for (i = ms->mr_max; i >= 0; i--) {
737 mr = ms->mr_array[i];
738
739 m = fr_message_ring_alloc(ms, mr, true);
740 if (m) {
741 ms->mr_current = i;
742 MPRINT("ALLOC from changed ring buffer\n");
743 MPRINT("SET MR to changed %d\n", ms->mr_current);
744 return m;
745 }
746 }
747
748 /*
749 * All of the arrays are full. If we don't have
750 * room to allocate another array, we're dead.
751 */
752 if ((ms->mr_max + 1) >= MSG_ARRAY_SIZE) {
753 fr_strerror_const("All message arrays are full");
754 return NULL;
755 }
756
757 /*
758 * Allocate another message ring, double the size
759 * of the previous maximum.
760 */
762 if (!mr) {
763 fr_strerror_const_push("Failed allocating ring buffer");
764 return NULL;
765 }
766
767 /*
768 * Set the new one as current for all new
769 * allocations, allocate a message, and go try to
770 * reserve room for the raw packet data.
771 */
772 ms->mr_max++;
773 ms->mr_current = ms->mr_max;
774 ms->mr_array[ms->mr_max] = mr;
775
776 MPRINT("SET MR to doubled %d\n", ms->mr_current);
777
778 /*
779 * And we should now have an entirely empty message ring.
780 */
781 m = fr_message_ring_alloc(ms, mr, false);
782 if (!m) return NULL;
783
784 MPRINT("ALLOC after doubled message ring\n");
785
786 return m;
787}
788
789
790/** Get a ring buffer for a message
791 *
792 * @param[in] ms the message set
793 * @param[in] m the message
794 * @param[in] cleaned_up whether the message set was partially garbage collected
795 * @return
796 * - NULL on error, and m is deallocated
797 * - m on success
798 */
/*
 * NOTE(review): several original lines were elided by the doc
 * extraction: 799 (first line of the signature, per the index:
 * "static fr_message_t *fr_message_get_ring_buffer(fr_message_set_t
 * *ms, fr_message_t *m, ...)"), 803 (presumably "fr_ring_buffer_t
 * *rb;"), and 811/841/863/912/919 (presumably the
 * fr_ring_buffer_reserve() calls that set m->data, and the final
 * fr_message_done()/status update on the cleanup path) -- confirm
 * against upstream.
 */
800 bool cleaned_up)
801{
802 int i;
804 size_t alloc_size;
805
806 /*
807 * And... we go through a bunch of hoops, all over again.
808 */
809 m->rb = ms->rb_array[ms->rb_current];
810 fr_assert(m->rb != NULL);
812 if (m->data) return m;
813
814 /*
815 * When the simple allocation fails, ensure we don't do
816 * the cleanup twice in one allocation.
817 */
818 if (!cleaned_up) {
819 /*
820 * If we run out of room in the current ring
821 * buffer, AND it's our only one, then just
822 * double it in size.
823 */
824 if (ms->rb_max == 0) goto alloc_rb;
825
826 /*
827 * We're using multiple ring buffers, and we
828 * haven't already done a cleanup. Force a
829 * cleanup.
830 */
831 MPRINT("CLEANED UP BECAUSE OF RING BUFFER (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
832 ms->allocated - ms->freed);
833
834 fr_message_gc(ms, 128);
835
836 /*
837 * Try to allocate the packet from the newly current ring buffer.
838 */
839 m->rb = ms->rb_array[ms->rb_current];
840 fr_assert(m->rb != NULL);
842 if (m->data) return m;
843
844 MPRINT("CLEANUP RING BUFFER FAILED\n");
845 }
846
847 /*
848 * We've tried two allocations, and both failed. Brute
849 * force over all arrays, trying to allocate one
850 * somewhere... anywhere. We start from the largest
851 * array, because that is the one we want to use the
852 * most.
853 *
854 * We want to avoid allocations in the smallest array,
855 * because that array will quickly wrap, and will cause
856 * us to do cleanups more often. That also lets old
857 * entries in the smallest array age out, so that we can
858 * free the smallest arrays.
859 */
860 for (i = ms->rb_max; i >= 0; i--) {
861 m->rb = ms->rb_array[i];
862 fr_assert(m->rb != NULL);
864 if (m->data) {
865 MPRINT("MOVED TO RING BUFFER %d\n", i);
866 ms->rb_current = i;
867 return m;
868 }
869 }
870
871 /*
872 * All of the arrays are full. If we don't have
873 * room to allocate another array, we're dead.
874 */
875 if ((ms->rb_max + 1) >= MSG_ARRAY_SIZE) {
876 fr_strerror_const("Message arrays are full");
877 goto cleanup;
878 }
879
880alloc_rb:
881 /*
882 * Allocate another message ring, double the size
883 * of the previous maximum, or large enough to hold
884 * twice the requested size if that is larger.
885 * fr_ring_buffer_create will round to the next power of 2.
886 */
887 alloc_size = fr_ring_buffer_size(ms->rb_array[ms->rb_max]) * 2;
888 if (alloc_size < m->rb_size) {
889 alloc_size = m->rb_size * 2;
890 }
891 rb = fr_ring_buffer_create(ms, alloc_size);
892 if (!rb) {
893 fr_strerror_const_push("Failed allocating ring buffer");
894 goto cleanup;
895 }
896
897 MPRINT("RING BUFFER DOUBLES\n");
898
899 /*
900 * Set the new one as current for all new
901 * allocations, allocate a message, and go try to
902 * reserve room for the raw packet data.
903 */
904 ms->rb_max++;
905 ms->rb_current = ms->rb_max;
906 ms->rb_array[ms->rb_current] = rb;
907
908 /*
909 * And we should now have an entirely empty message ring.
910 */
911 m->rb = rb;
913 if (m->data) return m;
914
915cleanup:
916 MPRINT("OUT OF MEMORY\n");
917
918 m->rb = NULL;
920 return NULL;
921}
922
923
924/** Reserve a message
925 *
926 * A later call to fr_message_alloc() will allocate the correct
927 * packet ring buffer size. This call just allocates a message
928 * header, and reserves space for the packet.
929 *
930 * If the caller later decides that the message is not needed, he
931 * should call fr_message_free() to free the message.
932 *
933 * We assume that the caller will call fr_message_reserve(), and then
934 * almost immediately fr_message_alloc(). Multiple calls in series
935 * to fr_message_reserve() MUST NOT be done. The caller could also
936 * just call fr_ring_buffer_alloc(m->rb, size) if they wanted, and
937 * then update m->data_size by hand...
938 *
939 * The message is returned
940 *
941 * @param[in] ms the message set
942 * @param[in] reserve_size to reserve
943 * @return
944 * - NULL on error
945 * - fr_message_t* on success
946 */
/*
 * NOTE(review): the signature (original line 947, presumably
 * "fr_message_t *fr_message_reserve(fr_message_set_t *ms, size_t
 * reserve_size)") and original lines 980-981 (presumably setting
 * m->rb_size = reserve_size, per the comment just above them) were
 * elided by the doc extraction -- confirm against upstream.
 */
948{
949 bool cleaned_up;
950 fr_message_t *m;
951
952 (void) talloc_get_type_abort(ms, fr_message_set_t);
953
954 if (reserve_size > ms->max_allocation) {
955 fr_strerror_printf("Cannot reserve %zd > max allocation %zd\n", reserve_size, ms->max_allocation);
956 return NULL;
957 }
958
959 /*
960 * Allocate a bare message.
961 */
962 m = fr_message_get_message(ms, &cleaned_up);
963 if (!m) {
964 MPRINT("Failed to reserve message\n");
965 return NULL;
966 }
967
968 /*
969 * If the caller is not allocating any packet data, just
970 * return the empty message.
971 */
972 if (!reserve_size) return m;
973
974 /*
975 * We leave m->data_size as zero, and m->rb_size as the
976 * reserved size. This indicates that the message has
977 * reserved room for the packet data, but nothing has
978 * been allocated.
979 */
982
983 return fr_message_get_ring_buffer(ms, m, cleaned_up);
984}
985
986/** Allocate packet data for a message
987 *
988 * The caller will normally call fr_message_reserve() before calling
989 * this function, and pass the resulting message 'm' here. If 'm' is
990 * NULL, however, this function will call fr_message_reserve() of
991 * 'actual_packet_size'. This capability is there for callers who
992 * know the size of the message in advance.
993 *
994 * @param[in] ms the message set
995 * @param[in] m the message to allocate packet data for
996 * @param[in] actual_packet_size to use
997 * @return
998 * - NULL on error, and input message m is left alone
999 * - fr_message_t* on success. Will always be input message m.
1000 */
/*
 * NOTE(review): the signature (original line 1001, per the index:
 * "fr_message_t *fr_message_alloc(fr_message_set_t *ms, fr_message_t
 * *m, size_t actual_packet_size)") and original lines 1015 and
 * 1032-1034 were elided by the doc extraction.  Lines 1032-1034
 * presumably cache-align reserve_size and shrink the ring-buffer
 * reservation to it via fr_ring_buffer_alloc(m->rb, ...), producing
 * "p" -- confirm against upstream.
 */
1002{
1003 uint8_t *p;
1004 size_t reserve_size;
1005
1006 (void) talloc_get_type_abort(ms, fr_message_set_t);
1007
1008 /* m is NOT talloc'd */
1009
1010 if (!m) {
1011 m = fr_message_reserve(ms, actual_packet_size); /* will cache align it */
1012 if (!m) return NULL;
1013 }
1014
1016 fr_assert(m->rb != NULL);
1017 fr_assert(m->data != NULL);
1018 fr_assert(m->data_size == 0);
1019 fr_assert(m->rb_size >= actual_packet_size);
1020
1021 /*
1022 * No data to send? Just send a bare message;
1023 */
1024 if (actual_packet_size == 0) {
1025 m->data = NULL;
1026 m->rb = NULL;
1027 m->data_size = m->rb_size = 0;
1028 return m;
1029 }
1030
1031 reserve_size = actual_packet_size;
1033
1035 fr_assert(p != NULL);
1036 if (!p) {
1037 fr_strerror_const_push("Failed allocating from ring buffer");
1038 return NULL;
1039 }
1040
/* The finalized allocation must cover exactly the reserved region. */
1041 fr_assert(p == m->data);
1042
1043 m->data_size = actual_packet_size;
1044 m->rb_size = reserve_size;
1045
1046 return m;
1047}
1048
1049/** Allocate packet data for a message, and reserve a new message
1050 *
1051 * This function allocates a previously reserved message, and then
1052 * reserves a new message.
1053 *
1054 * The application should call fr_message_reserve() with a large
1055 * buffer, and then read data into the buffer. If the buffer
1056 * contains multiple packets, the application should call
1057 * fr_message_alloc_reserve() repeatedly to allocate the full
1058 * packets, while reserving room for the partial packet.
1059 *
1060 * When the application determines that there is only one full
1061 * packet, and one partial packet in the buffer, it should call this
1062 * function with actual_packet_size, and a large reserve_size. The
1063 * partial packet will be reserved. If the ring buffer is full, the
1064 * partial packet will be copied to a new ring buffer.
1065 *
1066 * When the application determines that there are multiple full
1067 * packets in the buffer, it should call this function with
1068 * actual_packet_size for each buffer, and reserve_size which
1069 * reserves all of the data in the buffer. i.e. the full packets +
1070 * partial packets, which should start off as the original
1071 * reserve_size.
1072 *
1073 * The application should call this function to allocate each packet,
1074 * while decreasing reserve_size by each actual_packet_size that was
1075 * allocated. Once there is only one full and a partial packet in
1076 * the buffer, it should use a large reserve_size, as above.
1077 *
1078 * The application could just always call this function with a large
1079 * reserve_size, at the cost of substantially more memcpy()s.
1080 *
1081 * @param[in] ms the message set
1082 * @param[in] m the message to allocate packet data for
1083 * @param[in] actual_packet_size to use
1084 * @param[in] leftover "dirty" bytes in the buffer
1085 * @param[in] reserve_size to reserve for new message
1086 * @return
1087 * - NULL on error, and input message m is left alone
1088 * - fr_message_t* on success. Will always be a new message.
1089 */
/*
 * NOTE(review): several original lines were elided by the doc
 * extraction: 1090 (first line of the signature, presumably
 * "fr_message_t *fr_message_alloc_reserve(fr_message_set_t *ms,
 * fr_message_t *m, size_t actual_packet_size,"), 1105 (presumably
 * asserts on m's state), 1145 (presumably CACHE_ALIGN(reserve_size)),
 * and 1158 (presumably the fr_ring_buffer_reserve() call that extends
 * the reservation into m2->data) -- confirm against upstream.
 */
1091 size_t leftover, size_t reserve_size)
1092{
1093 bool cleaned_up;
1094 uint8_t *p;
1095 fr_message_t *m2;
1096 size_t m_rb_size, align_size;
1097
1098 (void) talloc_get_type_abort(ms, fr_message_set_t);
1099
1100 align_size = actual_packet_size;
1101 CACHE_ALIGN(align_size);
1102
1103 /* m is NOT talloc'd */
1104
1106 fr_assert(m->rb != NULL);
1107 fr_assert(m->data != NULL);
1108 fr_assert(m->rb_size >= actual_packet_size);
1109
1110 p = fr_ring_buffer_alloc(m->rb, align_size);
1111 fr_assert(p != NULL);
1112 if (!p) {
1113 fr_strerror_const_push("Failed allocating from ring buffer");
1114 return NULL;
1115 }
1116
1117 fr_assert(p == m->data);
1118
1119 m_rb_size = m->rb_size; /* for ring buffer cleanups */
1120
1121 m->data_size = actual_packet_size;
1122 m->rb_size = align_size;
1123
1124 /*
1125 * If we've allocated all of the reserved ring buffer
1126 * data, then just reserve a brand new reservation.
1127 *
1128 * This will be automatically cache aligned.
1129 */
1130 if (!leftover) return fr_message_reserve(ms, reserve_size);
1131
1132 /*
1133 * Allocate a new message.
1134 */
1135 m2 = fr_message_get_message(ms, &cleaned_up);
1136 if (!m2) return NULL;
1137
1138 /*
1139 * Ensure that there's enough room to shift the next
1140 * packet, so that it's cache aligned. Moving small
1141 * amounts of memory is likely faster than having two
1142 * CPUs fight over the same cache lines.
1143 */
1144 reserve_size += (align_size - actual_packet_size);
1146
1147 /*
1148 * Track how much data there is in the packet.
1149 */
1150 m2->rb = m->rb;
1151 m2->data_size = leftover;
1152 m2->rb_size = reserve_size;
1153
1154 /*
1155 * Try to extend the reservation. If we can do it,
1156 * return.
1157 */
1159 if (m2->data) {
1160 /*
1161 * The next packet pointer doesn't point to the
1162 * actual data after the current packet. Move
1163 * the next packet to match up with the ring
1164 * buffer allocation.
1165 */
1166 if (m2->data != (m->data + actual_packet_size)) {
1167 memmove(m2->data, m->data + actual_packet_size, leftover);
1168 }
1169 return m2;
1170 }
1171
1172 /*
1173 * We failed reserving more memory at the end of the
1174 * current ring buffer.
1175 *
1176 * Reserve data from a new ring buffer. If it doesn't
1177 * succeed, ensure that the old message will properly
1178 * clean up the old ring buffer.
1179 */
1180 if (!fr_message_get_ring_buffer(ms, m2, false)) {
1181 m->rb_size = m_rb_size;
1182 return NULL;
1183 }
1184
1185 /*
1186 * If necessary, copy the remaining data from the old
1187 * buffer to the new one.
1188 */
1189 if (m2->data != (m->data + actual_packet_size)) {
1190 memmove(m2->data, m->data + actual_packet_size, leftover);
1191 }
1192
1193 /*
1194 * The messages are in different ring buffers. We've
1195 * aligned m->rb_size above for the current packet, but
1196 * there's no subsequent message to clean up this
1197 * reservation. Re-extend the current message to its
1198 * original size, so that cleaning it up will clean up the ring buffer.
1199 */
1200 if (m2->rb != m->rb) {
1201 m->rb_size = m_rb_size;
1202 return m2;
1203 }
1204
1205 /*
1206 * If we've managed to allocate the next message in the
1207 * current ring buffer, then it really should have
1208 * wrapped around. In which case, re-extend the current
1209 * message as above.
1210 */
1211 if (m2->data < m->data) {
1212 m->rb_size = m_rb_size;
1213 return m2;
1214 }
1215
1216 return m2;
1217}
1218
1219/** Count the number of used messages
1220 *
1221 * @param[in] ms the message set
1222 * @return
1223 * - number of used messages
1224 */
/*
 * NOTE(review): the signature (original line 1225, per the index:
 * "int fr_message_set_messages_used(fr_message_set_t *ms)") and
 * original line 1237 were elided by the doc extraction.  Line 1237
 * presumably accumulates the per-ring usage into "used" (e.g.
 * fr_ring_buffer_used(mr) / ms->message_size) -- confirm against
 * upstream.
 */
1226{
1227 int i, used;
1228
1229 (void) talloc_get_type_abort(ms, fr_message_set_t);
1230
1231 used = 0;
1232 for (i = 0; i <= ms->mr_max; i++) {
1233 fr_ring_buffer_t *mr;
1234
1235 mr = ms->mr_array[i];
1236
1238 }
1239
1240 return used;
1241}
1242
1243/** Garbage collect the message set.
1244 *
1245 * This function should ONLY be called just before freeing the
1246 * message set. It is intended only for debugging, and will cause
1247 * huge latency spikes if used at run time.
1248 *
1249 * @param[in] ms the message set
1250 */
/*
 * NOTE(review): the signature (original line 1251, presumably
 * "void fr_message_set_gc(fr_message_set_t *ms)") was elided by the
 * doc extraction -- confirm against upstream.
 */
1252{
1253 int i;
1254
1255 (void) talloc_get_type_abort(ms, fr_message_set_t);
1256
1257 /*
1258 * Manually clean up each message ring.
1259 */
1260 for (i = 0; i <= ms->mr_max; i++) {
1261 (void) fr_message_ring_gc(ms, ms->mr_array[i], INT_MAX);
1262 }
1263
1264 /*
1265 * And then do one last pass to clean up the arrays.
1266 */
1267 fr_message_gc(ms, INT_MAX);
1268}
1269
1270/** Print debug information about the message set.
1271 *
1272 * @param[in] ms the message set
1273 * @param[in] fp the FILE where the messages are printed.
1274 */
/*
 * NOTE(review): the signature (original line 1275, presumably
 * "void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)") and
 * original lines 1288/1293 (presumably the fr_ring_buffer_size()/
 * fr_ring_buffer_used() arguments for the two fprintf calls) were
 * elided by the doc extraction -- confirm against upstream.
 */
1276{
1277 int i;
1278
1279 (void) talloc_get_type_abort(ms, fr_message_set_t);
1280
1281 fprintf(fp, "message arrays = %d\t(current %d)\n", ms->mr_max + 1, ms->mr_current);
1282 fprintf(fp, "ring buffers = %d\t(current %d)\n", ms->rb_max + 1, ms->rb_current);
1283
1284 for (i = 0; i <= ms->mr_max; i++) {
1285 fr_ring_buffer_t *mr = ms->mr_array[i];
1286
1287 fprintf(fp, "messages[%d] =\tsize %zu, used %zu\n",
1289 }
1290
1291 for (i = 0; i <= ms->rb_max; i++) {
1292 fprintf(fp, "ring buffer[%d] =\tsize %zu, used %zu\n",
1294 }
1295}
#define RCSID(id)
Definition build.h:487
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
talloc_free(hp)
unsigned char uint8_t
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
Definition message.c:113
size_t message_size
size of the callers message, including fr_message_t
Definition message.c:99
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:128
int mr_current
current used message ring entry
Definition message.c:96
#define MPRINT(...)
Definition message.c:38
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
Definition message.c:111
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
Definition message.c:358
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:196
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1001
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
Definition message.c:799
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:248
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
Definition message.c:680
int rb_max
max used ring buffer entry
Definition message.c:104
#define MSG_ARRAY_SIZE
Definition message.c:41
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
Definition message.c:629
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition message.c:1225
#define CACHE_ALIGN(_x)
Definition message.c:43
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1251
int mr_max
max used message ring entry
Definition message.c:97
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
Definition message.c:1275
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:947
int mr_cleaned
where we last cleaned
Definition message.c:101
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
Definition message.c:312
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1090
size_t max_allocation
maximum allocation size
Definition message.c:106
int rb_current
current used ring buffer entry
Definition message.c:103
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
fr_ring_buffer_t * rb
pointer to the ring buffer
Definition message.h:48
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
@ FR_MESSAGE_DONE
Definition message.h:41
@ FR_MESSAGE_FREE
Definition message.h:38
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
static size_t reserve_size
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:38
static bool cleanup
Definition radsniff.c:60
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223