The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
message.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 986864d4ac3fcfcc45754b9048f12929e9f54319 $
19 *
20 * @brief Messages for inter-thread communication
21 * @file io/message.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: 986864d4ac3fcfcc45754b9048f12929e9f54319 $")
26
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
29
30#include <string.h>
31
32/*
33 * Debugging, mainly for message_set_test
34 */
35#if 0
36#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
37#else
38#define MPRINT(...)
39#endif
40
41#define MSG_ARRAY_SIZE (16)
42
43#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
44
45/** A Message set, composed of message headers and ring buffer data.
46 *
47 * A message set is intended to send short-lived messages. The
48 * message headers are fixed in size, and allocated from an array
49 * which is treated like a circular buffer. Message bodies (i.e. raw
50 * packets) are variable in size, and live in a separate ring buffer.
51 *
52 * The message set starts off with a small array of message headers,
53 * and a small ring buffer. If an array/buffer fills up, a new one
54 * is allocated at double the size of the previous one.
55 *
56 * The array / buffers are themselves kept in fixed-size arrays, of
57 * MSG_ARRAY_SIZE. The reason is that the memory for fr_message_set_t
58 * should be contiguous, and not in a linked list scattered in
59 * memory.
60 *
61 * The originator allocates a message, and sends it to a recipient.
62 * The recipient (usually in another thread) uses the message, and
63 * marks it as FR_MESSAGE_DONE. The originator then asynchronously
64 * cleans up the message.
65 *
66 * This asynchronous cleanup is done via self-clocking. If there is
67 * no need to clean up the messages, it isn't done. Only when we run
68 * out of space to store messages (or packets) is the cleanup done.
69 *
70 * This cleanup latency ensures that we don't have cache line
71 * bouncing, where the originator sends the message, and then while
72 * the recipieent is reading it... thrashes the cache line with
73 * checks for "are you done? Are you done?"
74 *
75 * If there are more than one used entry in either array, we then try
76 * to coalesce the buffers on cleanup. If we discover that one array
77 * is empty, we discard it, and move the used array entries into it's
78 * place.
79 *
80 * This process ensures that we don't have too many buffers in
81 * progress. It is better to have a few large buffers than many
82 * small ones.
83 *
84 * MSG_ARRAY_SIZE is defined to be large (16 doublings) to allow for
85 * the edge case where messages are stuck for long periods of time.
86 *
87 * With an initial message array size of 64, this gives us room for
88 * 2M packets, if *all* of the mr_array entries have packets stuck in
89 * them that aren't cleaned up for extended periods of time.
90 *
91 * @todo Add a flag for UDP-style protocols, where we can put the
92 * message into the ring buffer. This helps with locality of
93 * reference, and removes the need to track two separate things.
94 */
96 int mr_current; //!< current used message ring entry
97 int mr_max; //!< max used message ring entry
98
99 size_t message_size; //!< size of the callers message, including fr_message_t
100
101 int mr_cleaned; //!< where we last cleaned
102
103 int rb_current; //!< current used ring buffer entry
104 int rb_max; //!< max used ring buffer entry
105
106 size_t max_allocation; //!< maximum allocation size
107
109 int freed;
110
111 fr_ring_buffer_t *mr_array[MSG_ARRAY_SIZE]; //!< array of message arrays
112
113 fr_ring_buffer_t *rb_array[MSG_ARRAY_SIZE]; //!< array of ring buffers
114};
115
116
117/** Create a message set
118 *
119 * @param[in] ctx the context for talloc
120 * @param[in] num_messages size of the initial message array. MUST be a power of 2.
121 * @param[in] message_size the size of each message, INCLUDING fr_message_t, which MUST be at the start of the struct
122 * @param[in] ring_buffer_size of the ring buffer. MUST be a power of 2.
123 * @return
124 * - NULL on error
125 * - newly allocated fr_message_set_t on success
126 */
127fr_message_set_t *fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
128{
130
131 /*
132 * Too small, or not a power of 2.
133 */
134 if (num_messages < 8) num_messages = 8;
135
136 if ((num_messages & (num_messages - 1)) != 0) {
137 fr_strerror_const("Number of messages must be a power of 2");
138 return NULL;
139 }
140
141 if (message_size < sizeof(fr_message_t)) {
142 fr_strerror_printf("Message size must be at least %zd", sizeof(fr_message_t));
143 return NULL;
144 }
145
146 if (message_size > 1024) {
147 fr_strerror_const("Message size must be no larger than 1024");
148 return NULL;
149 }
150
151 ms = talloc_zero(ctx, fr_message_set_t);
152 if (!ms) {
153 fr_strerror_const("Failed allocating memory");
154 return NULL;
155 }
156
157 CACHE_ALIGN(message_size);
158 ms->message_size = message_size;
159
160 ms->rb_array[0] = fr_ring_buffer_create(ms, ring_buffer_size);
161 if (!ms->rb_array[0]) {
162 talloc_free(ms);
163 return NULL;
164 }
165 ms->rb_max = 0;
166
167 ms->mr_array[0] = fr_ring_buffer_create(ms, num_messages * message_size);
168 if (!ms->mr_array[0]) {
169 talloc_free(ms);
170 return NULL;
171 }
172
173 ms->max_allocation = ring_buffer_size / 2;
174
175 return ms;
176}
177
178
179/** Mark a message as done
180 *
181 * Note that this call is usually done from a thread OTHER than the
182 * originator of the message. As such, the message is NOT actually
183 * freed. Instead, it is just marked as freed.
184 *
185 * @param[in] m the message to make as done.
186 * @return
187 * - <0 on error
188 * - 0 on success
189 */
191{
194
195 /*
196 * Mark a message as freed. The originator will take
197 * care of cleaning it up.
198 */
199 if (m->status == FR_MESSAGE_USED) {
201 return 0;
202 }
203
204 /*
205 * This message was localized, so we can free it via
206 * talloc.
207 */
208 if (m->status == FR_MESSAGE_LOCALIZED) {
209 talloc_free(m);
210 return 0;
211 }
212
213 /*
214 * A catastrophic error.
215 */
216 fr_assert(0 == 1);
217
218 fr_strerror_const("Failed marking message as done");
219 return -1;
220}
221
222
223/** Localize a message by copying it to local storage
224 *
225 * This function "localizes" a message by copying it to local
226 * storage. In the case where the recipient of a message has to sit
227 * on it for a while, that blocks the originator from cleaning up the
228 * message. The recipient can then copy the message to local
229 * storage, so that the originator can clean it up.
230 *
231 * The localized message is marked as FR_MESSAGE_LOCALIZED, so that
232 * the recipient can call the normal fr_message_done() function to
233 * free it.
234 *
235 * @param[in] ctx the talloc context to use for localization
236 * @param[in] m the message to be localized
237 * @param[in] message_size the size of the message, including the fr_message_t
238 * @return
239 * - NULL on allocation error
240 * - a newly localized message
241 */
242fr_message_t *fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
243{
244 fr_message_t *l;
245
246 if (m->status != FR_MESSAGE_USED) {
247 fr_strerror_const("Cannot localize message unless it is in use");
248 return NULL;
249 }
250
251 if (message_size <= sizeof(fr_message_t)) {
252 fr_strerror_const("Message size is too small");
253 return NULL;
254 }
255
256 l = talloc_memdup(ctx, m, message_size);
257 if (!l) {
258 nomem:
259 fr_strerror_const("Failed allocating memory");
260 return NULL;
261 }
262
263 l->data = NULL;
264
265 if (l->data_size) {
266 l->data = talloc_memdup(l, m->data, l->data_size);
267 if (!l->data) {
268 talloc_free(l);
269 goto nomem;
270 }
271 }
272
274
275 /*
276 * After this change, "m" should not be used for
277 * anything.
278 */
280
281 /*
282 * Now clean up the other fields of the newly localized
283 * message.
284 */
285 l->rb = NULL;
286 l->rb_size = 0;
287
288 return l;
289}
290
291
292/** Clean up messages in a message ring.
293 *
294 * Find the oldest messages which are marked FR_MESSAGE_DONE,
295 * and mark them FR_MESSAGE_FREE.
296 *
297 * FIXME: If we care, track which ring buffer is in use, and how
298 * many contiguous chunks we can free. Then, free the chunks at
299 * once, instead of piecemeal. Realistically tho... this will
300 * probably make little difference.
301 *
302 * @param[in] ms the message set
303 * @param[in] mr the message ring
304 * @param[in] max_to_clean maximum number of messages to clean at a time.
305 */
306static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
307{
308 int messages_cleaned = 0;
309 size_t size;
310 fr_message_t *m;
311
312 while (true) {
313 (void) fr_ring_buffer_start(mr, (uint8_t **) &m, &size);
314 if (size == 0) break;
315
316 fr_assert(m != NULL);
317 fr_assert(size >= ms->message_size);
318
320 if (m->status != FR_MESSAGE_DONE) break;
321
322 messages_cleaned++;
324 ms->freed++;
325
326 if (m->rb) {
327 (void) fr_ring_buffer_free(m->rb, m->rb_size);
328#ifndef NDEBUG
329 memset(m, 0, ms->message_size);
330#endif
331 }
332
334
335 if (messages_cleaned >= max_to_clean) break;
336 }
337
338 MPRINT("CLEANED %d (%p) left\n", messages_cleaned, mr);
339 return messages_cleaned;
340}
341
342
343/** Garbage collect "done" messages.
344 *
345 * Called only from the originating thread. We also clean a limited
346 * number of messages at a time, so that we don't have sudden latency
347 * spikes when cleaning 1M messages.
348 *
349 * @param[in] ms the message set
350 * @param[in] max_to_clean the maximum number of messages to clean
351 */
352static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
353{
354 int i;
355 int arrays_freed, arrays_used, empty_slot;
356 int largest_free_slot;
357 int total_cleaned;
358 size_t largest_free_size;
359
360 /*
361 * Clean up "done" messages.
362 */
363 total_cleaned = 0;
364
365 /*
366 * Garbage collect the smaller buffers first.
367 */
368 for (i = 0; i <= ms->mr_max; i++) {
369 int cleaned;
370
371 cleaned = fr_message_ring_gc(ms, ms->mr_array[i], max_to_clean - total_cleaned);
372 total_cleaned += cleaned;
373 fr_assert(total_cleaned <= max_to_clean);
374
375 /*
376 * Stop when we've reached our GC limit.
377 */
378 if (total_cleaned == max_to_clean) break;
379 }
380
381 /*
382 * Couldn't GC anything. Don't do more work.
383 */
384 if (total_cleaned == 0) return;
385
386 arrays_freed = 0;
387 arrays_used = 0;
388
389 /*
390 * Keep the two largest message buffers (used or not),
391 * and free all smaller ones which are empty.
392 */
393 for (i = ms->mr_max; i >= 0; i--) {
394 fr_assert(ms->mr_array[i] != NULL);
395
396 if (arrays_used < 2) {
397 MPRINT("\tleaving entry %d alone\n", i);
398 arrays_used++;
399 continue;
400 }
401
402 /*
403 * If the message ring buffer is empty, check if
404 * we should perhaps delete it.
405 */
406 if (fr_ring_buffer_used(ms->mr_array[i]) == 0) {
407 MPRINT("\tfreeing entry %d\n", i);
408 TALLOC_FREE(ms->mr_array[i]);
409 arrays_freed++;
410 continue;
411 }
412
413 MPRINT("\tstill in use entry %d\n", i);
414 }
415
416 /*
417 * Some entries have been freed. We need to coalesce the
418 * remaining entries.
419 */
420 if (arrays_freed) {
421 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
422
423 empty_slot = -1;
424
425 /*
426 * Pack the rb array by moving used entries to
427 * the bottom of the array.
428 */
429 for (i = 0; i <= ms->mr_max; i++) {
430 int j;
431
432 /*
433 * Skip over empty entries, but set
434 * "empty_slot" to the first empty on we
435 * found.
436 */
437 if (!ms->mr_array[i]) {
438 if (empty_slot < 0) empty_slot = i;
439
440 continue;
441 }
442
443 /*
444 * This array entry is used, but there is
445 * no empty slot to put it into. Ignore
446 * it, and continue
447 */
448 if (empty_slot < 0) continue;
449
450 fr_assert(ms->mr_array[empty_slot] == NULL);
451
452 ms->mr_array[empty_slot] = ms->mr_array[i];
453 ms->mr_array[i] = NULL;
454
455 /*
456 * Find the next empty slot which is
457 * greater than the one we just used.
458 */
459 for (j = empty_slot + 1; j <= i; j++) {
460 if (!ms->mr_array[j]) {
461 empty_slot = j;
462 break;
463 }
464 }
465 }
466
467 /*
468 * Lower max, and set current to the largest
469 * array, whether or not it's used.
470 */
471 ms->mr_max -= arrays_freed;
472 ms->mr_current = ms->mr_max;
473
474#ifndef NDEBUG
475 MPRINT("NUM RB ARRAYS NOW %d\n", ms->mr_max + 1);
476 for (i = 0; i <= ms->mr_max; i++) {
477 MPRINT("\t%d %p\n", i, ms->mr_array[i]);
478 fr_assert(ms->mr_array[i] != NULL);
479 }
480#endif
481 }
482
483 /*
484 * And now we do the same thing for the ring buffers.
485 * Except that freeing the messages above also cleaned up
486 * the contents of each ring buffer, so all we need to do
487 * is find the largest empty ring buffer.
488 *
489 * We do this by keeping the two largest ring buffers
490 * (used or not), and then freeing all smaller ones which
491 * are empty.
492 */
493 arrays_used = 0;
494 arrays_freed = 0;
495 MPRINT("TRYING TO FREE ARRAYS %d\n", ms->rb_max);
496 for (i = ms->rb_max; i >= 0; i--) {
497 fr_assert(ms->rb_array[i] != NULL);
498
499 if (arrays_used < 2) {
500 MPRINT("\tleaving entry %d alone\n", i);
501 arrays_used++;
502 continue;
503 }
504
505 if (fr_ring_buffer_used(ms->rb_array[i]) == 0) {
506 MPRINT("\tfreeing entry %d\n", i);
507 TALLOC_FREE(ms->rb_array[i]);
508 arrays_freed++;
509 continue;
510 }
511
512 MPRINT("\tstill in use entry %d\n", i);
513 }
514
515 /*
516 * Pack the array entries back down.
517 */
518 if (arrays_freed > 0) {
519 MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
520
521 empty_slot = -1;
522
523 /*
524 * Pack the rb array by moving used entries to
525 * the bottom of the array.
526 */
527 for (i = 0; i <= ms->rb_max; i++) {
528 int j;
529
530 /*
531 * Skip over empty entries, but set
532 * "empty_slot" to the first empty on we
533 * found.
534 */
535 if (!ms->rb_array[i]) {
536 if (empty_slot < 0) empty_slot = i;
537
538 continue;
539 }
540
541 /*
542 * This array entry is used, but there is
543 * no empty slot to put it into. Ignore
544 * it, and continue
545 */
546 if (empty_slot < 0) continue;
547
548 fr_assert(ms->rb_array[empty_slot] == NULL);
549
550 ms->rb_array[empty_slot] = ms->rb_array[i];
551 ms->rb_array[i] = NULL;
552
553 /*
554 * Find the next empty slot which is
555 * greater than the one we just used.
556 */
557 for (j = empty_slot + 1; j <= i; j++) {
558 if (!ms->rb_array[j]) {
559 empty_slot = j;
560 break;
561 }
562 }
563 }
564
565 /*
566 * Lower max, and set current to the largest
567 * array, whether or not it's used.
568 */
569 ms->rb_max -= arrays_freed;
570 ms->rb_current = ms->rb_max;
571
572#ifndef NDEBUG
573 MPRINT("NUM RB ARRAYS NOW %d\n", ms->rb_max + 1);
574 for (i = 0; i <= ms->rb_max; i++) {
575 MPRINT("\t%d %p\n", i, ms->rb_array[i]);
576 fr_assert(ms->rb_array[i] != NULL);
577 }
578#endif
579 }
580
581 /*
582 * Set the current ring buffer to the one with the
583 * largest free space in it.
584 *
585 * This is different from the allocation strategy for
586 * messages.
587 */
588 if (!fr_cond_assert(ms->rb_array[ms->rb_max] != NULL)) return;
589
590 largest_free_slot = ms->rb_max;
591 largest_free_size = (fr_ring_buffer_size(ms->rb_array[ms->rb_max]) -
593
594 for (i = 0; i < ms->rb_max; i++) {
595 size_t free_size;
596
597 fr_assert(ms->rb_array[i] != NULL);
598
599 free_size = (fr_ring_buffer_size(ms->rb_array[i]) -
601 if (largest_free_size < free_size) {
602 largest_free_slot = i;
603 largest_free_size = free_size;
604 }
605 }
606
607 ms->rb_current = largest_free_slot;
608 fr_assert(ms->rb_current >= 0);
609 fr_assert(ms->rb_current <= ms->rb_max);
610}
611
612/** Allocate a message from a message ring.
613 *
614 * The newly allocated message is zeroed.
615 *
616 * @param[in] ms the message set
617 * @param[in] mr the message ring to allocate from
618 * @param[in] clean whether to clean the message ring
619 * @return
620 * - NULL on failed allocation
621 * - fr_message_t* on successful allocation.
622 */
624{
625 fr_message_t *m;
626
627 /*
628 * We're at the start of a buffer with data, and there's
629 * no room. Do a quick check to see if we can free up
630 * the oldest entry. If not, return.
631 *
632 * Otherwise, fall through to allocating a entry, of
633 * which there must now be at least one free one.
634 *
635 * This check results in a small amount of cache line
636 * thrashing. But if the buffer is full, it's likely
637 * that the oldest entry can be freed. If not, we have a
638 * small amount of cache thrashing, which should be
639 * extremely rare.
640 */
641 if (clean) {
642 if (fr_message_ring_gc(ms, mr, 4) == 0) {
643 fr_strerror_const("No free memory after GC attempt");
644 return NULL;
645 }
646
647 /*
648 * Else we cleaned up some entries in this array.
649 * Go allocate a message.
650 */
651 }
652
653 /*
654 * Grab a new message from the underlying ring buffer.
655 */
657 if (!m) return NULL;
658
659#ifndef NDEBUG
660 memset(m, 0, ms->message_size);
661#endif
663 return m;
664}
665
666/** Allocate a fr_message_t, WITHOUT a ring buffer.
667 *
668 * @param[in] ms the message set
669 * @param[out] p_cleaned a flag to indicate if we cleaned the message array
670 * @return
671 * - NULL on error
672 * - fr_message_t* on success
673 */
675{
676 int i;
677 fr_message_t *m;
679
680 ms->allocated++;
681 *p_cleaned = false;
682
683 /*
684 * Grab the current message array. In the general case,
685 * there's room, so we grab a message and go find a ring
686 * buffer.
687 */
688 mr = ms->mr_array[ms->mr_current];
690 if (m) {
691 memset(m, 0, ms->message_size);
693 MPRINT("ALLOC normal\n");
694 return m;
695 }
696
697 MPRINT("CLEANING UP (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
698 ms->allocated - ms->freed);
699
700 /*
701 * Else the buffer is full. Do a global cleanup.
702 */
703 fr_message_gc(ms, 128);
704 *p_cleaned = true;
705
706 /*
707 * If we're lucky, the cleanup has given us a new
708 * "current" buffer, which is empty. If so, use it.
709 */
710 mr = ms->mr_array[ms->mr_current];
711 m = fr_message_ring_alloc(ms, mr, true);
712 if (m) {
713 MPRINT("ALLOC after cleanup\n");
714 return m;
715 }
716
717 /*
718 * We've tried two allocations, and both failed. Brute
719 * force over all arrays, trying to allocate one
720 * somewhere... anywhere. We start from the largest
721 * array, because that is the one we want to use the
722 * most.
723 *
724 * We want to avoid allocations in the smallest array,
725 * because that array will quickly wrap, and will cause
726 * us to do cleanups more often. That also lets old
727 * entries in the smallest array age out, so that we can
728 * free the smallest arrays.
729 */
730 for (i = ms->mr_max; i >= 0; i--) {
731 mr = ms->mr_array[i];
732
733 m = fr_message_ring_alloc(ms, mr, true);
734 if (m) {
735 ms->mr_current = i;
736 MPRINT("ALLOC from changed ring buffer\n");
737 MPRINT("SET MR to changed %d\n", ms->mr_current);
738 return m;
739 }
740 }
741
742 /*
743 * All of the arrays are full. If we don't have
744 * room to allocate another array, we're dead.
745 */
746 if ((ms->mr_max + 1) >= MSG_ARRAY_SIZE) {
747 fr_strerror_const("All message arrays are full");
748 return NULL;
749 }
750
751 /*
752 * Allocate another message ring, double the size
753 * of the previous maximum.
754 */
756 if (!mr) {
757 fr_strerror_const_push("Failed allocating ring buffer");
758 return NULL;
759 }
760
761 /*
762 * Set the new one as current for all new
763 * allocations, allocate a message, and go try to
764 * reserve room for the raw packet data.
765 */
766 ms->mr_max++;
767 ms->mr_current = ms->mr_max;
768 ms->mr_array[ms->mr_max] = mr;
769
770 MPRINT("SET MR to doubled %d\n", ms->mr_current);
771
772 /*
773 * And we should now have an entirely empty message ring.
774 */
775 m = fr_message_ring_alloc(ms, mr, false);
776 if (!m) return NULL;
777
778 MPRINT("ALLOC after doubled message ring\n");
779
780 return m;
781}
782
783
784/** Get a ring buffer for a message
785 *
786 * @param[in] ms the message set
787 * @param[in] m the message
788 * @param[in] cleaned_up whether the message set was partially garbage collected
789 * @return
790 * - NULL on error, and m is deallocated
791 * - m on success
792 */
794 bool cleaned_up)
795{
796 int i;
798
799 /*
800 * And... we go through a bunch of hoops, all over again.
801 */
802 m->rb = ms->rb_array[ms->rb_current];
803 fr_assert(m->rb != NULL);
805 if (m->data) return m;
806
807 /*
808 * When the simple allocation fails, ensure we don't do
809 * the cleanup twice in one allocation.
810 */
811 if (!cleaned_up) {
812 /*
813 * If we run out of room in the current ring
814 * buffer, AND it's our only one, then just
815 * double it in size.
816 */
817 if (ms->rb_max == 0) goto alloc_rb;
818
819 /*
820 * We're using multiple ring buffers, and we
821 * haven't already done a cleanup. Force a
822 * cleanup.
823 */
824 MPRINT("CLEANED UP BECAUSE OF RING BUFFER (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
825 ms->allocated - ms->freed);
826
827 fr_message_gc(ms, 128);
828
829 /*
830 * Try to allocate the packet from the newly current ring buffer.
831 */
832 m->rb = ms->rb_array[ms->rb_current];
833 fr_assert(m->rb != NULL);
835 if (m->data) return m;
836
837 MPRINT("CLEANUP RING BUFFER FAILED\n");
838 }
839
840 /*
841 * We've tried two allocations, and both failed. Brute
842 * force over all arrays, trying to allocate one
843 * somewhere... anywhere. We start from the largest
844 * array, because that is the one we want to use the
845 * most.
846 *
847 * We want to avoid allocations in the smallest array,
848 * because that array will quickly wrap, and will cause
849 * us to do cleanups more often. That also lets old
850 * entries in the smallest array age out, so that we can
851 * free the smallest arrays.
852 */
853 for (i = ms->rb_max; i >= 0; i--) {
854 m->rb = ms->rb_array[i];
855 fr_assert(m->rb != NULL);
857 if (m->data) {
858 MPRINT("MOVED TO RING BUFFER %d\n", i);
859 ms->rb_current = i;
860 return m;
861 }
862 }
863
864 /*
865 * All of the arrays are full. If we don't have
866 * room to allocate another array, we're dead.
867 */
868 if ((ms->rb_max + 1) >= MSG_ARRAY_SIZE) {
869 fr_strerror_const("Message arrays are full");
870 goto cleanup;
871 }
872
873alloc_rb:
874 /*
875 * Allocate another message ring, double the size
876 * of the previous maximum.
877 */
879 if (!rb) {
880 fr_strerror_const_push("Failed allocating ring buffer");
881 goto cleanup;
882 }
883
884 MPRINT("RING BUFFER DOUBLES\n");
885
886 /*
887 * Set the new one as current for all new
888 * allocations, allocate a message, and go try to
889 * reserve room for the raw packet data.
890 */
891 ms->rb_max++;
892 ms->rb_current = ms->rb_max;
893 ms->rb_array[ms->rb_current] = rb;
894
895 /*
896 * And we should now have an entirely empty message ring.
897 */
898 m->rb = rb;
900 if (m->data) return m;
901
902cleanup:
903 MPRINT("OUT OF MEMORY\n");
904
905 m->rb = NULL;
907 return NULL;
908}
909
910
911/** Reserve a message
912 *
913 * A later call to fr_message_alloc() will allocate the correct
914 * packet ring buffer size. This call just allocates a message
915 * header, and reserves space for the packet.
916 *
917 * If the caller later decides that the message is not needed, he
918 * should call fr_message_free() to free the message.
919 *
920 * We assume that the caller will call fr_message_reserve(), and then
921 * almost immediately fr_message_alloc(). Multiple calls in series
922 * to fr_message_reserve() MUST NOT be done. The caller could also
923 * just call fr_ring_buffer_alloc(m->rb, size) if they wanted, and
924 * then update m->data_size by hand...
925 *
926 * The message is returned
927 *
928 * @param[in] ms the message set
929 * @param[in] reserve_size to reserve
930 * @return
931 * - NULL on error
932 * - fr_message_t* on success
933 */
935{
936 bool cleaned_up;
937 fr_message_t *m;
938
939 (void) talloc_get_type_abort(ms, fr_message_set_t);
940
941 if (reserve_size > ms->max_allocation) {
942 fr_strerror_printf("Cannot reserve %zd > max allocation %zd\n", reserve_size, ms->max_allocation);
943 return NULL;
944 }
945
946 /*
947 * Allocate a bare message.
948 */
949 m = fr_message_get_message(ms, &cleaned_up);
950 if (!m) {
951 MPRINT("Failed to reserve message\n");
952 return NULL;
953 }
954
955 /*
956 * If the caller is not allocating any packet data, just
957 * return the empty message.
958 */
959 if (!reserve_size) return m;
960
961 /*
962 * We leave m->data_size as zero, and m->rb_size as the
963 * reserved size. This indicates that the message has
964 * reserved room for the packet data, but nothing has
965 * been allocated.
966 */
969
970 return fr_message_get_ring_buffer(ms, m, cleaned_up);
971}
972
973/** Allocate packet data for a message
974 *
975 * The caller will normally call fr_message_reserve() before calling
976 * this function, and pass the resulting message 'm' here. If 'm' is
977 * NULL, however, this function will call fr_message_reserve() of
978 * 'actual_packet_size'. This capability is there for callers who
979 * know the size of the message in advance.
980 *
981 * @param[in] ms the message set
982 * @param[in] m the message message to allocate packet data for
983 * @param[in] actual_packet_size to use
984 * @return
985 * - NULL on error, and input message m is left alone
986 * - fr_message_t* on success. Will always be input message m.
987 */
988fr_message_t *fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
989{
990 uint8_t *p;
991 size_t reserve_size;
992
993 (void) talloc_get_type_abort(ms, fr_message_set_t);
994
995 /* m is NOT talloc'd */
996
997 if (!m) {
998 m = fr_message_reserve(ms, actual_packet_size); /* will cache align it */
999 if (!m) return NULL;
1000 }
1001
1003 fr_assert(m->rb != NULL);
1004 fr_assert(m->data != NULL);
1005 fr_assert(m->data_size == 0);
1006 fr_assert(m->rb_size >= actual_packet_size);
1007
1008 /*
1009 * No data to send? Just send a bare message;
1010 */
1011 if (actual_packet_size == 0) {
1012 m->data = NULL;
1013 m->rb = NULL;
1014 m->data_size = m->rb_size = 0;
1015 return m;
1016 }
1017
1018 reserve_size = actual_packet_size;
1020
1022 fr_assert(p != NULL);
1023 if (!p) {
1024 fr_strerror_const_push("Failed allocating from ring buffer");
1025 return NULL;
1026 }
1027
1028 fr_assert(p == m->data);
1029
1030 m->data_size = actual_packet_size;
1031 m->rb_size = reserve_size;
1032
1033 return m;
1034}
1035
1036/** Allocate packet data for a message, and reserve a new message
1037 *
1038 * This function allocates a previously reserved message, and then
1039 * reserves a new message.
1040 *
1041 * The application should call fr_message_reserve() with a large
1042 * buffer, and then read data into the buffer. If the buffer
1043 * contains multiple packets, the application should call
1044 * fr_message_alloc_reserve() repeatedly to allocate the full
1045 * packets, while reserving room for the partial packet.
1046 *
1047 * When the application is determines that there is only one full
1048 * packet, and one partial packet in the buffer, it should call this
1049 * function with actual_packet_size, and a large reserve_size. The
1050 * partial packet will be reserved. If the ring buffer is full, the
1051 * partial packet will be copied to a new ring buffer.
1052 *
1053 * When the application determines that there are multiple full
1054 * packets in the buffer, it should call this function with
1055 * actual_packet_size for each buffer, and reserve_size which
1056 * reserves all of the data in the buffer. i.e. the full packets +
1057 * partial packets, which should start off as the original
1058 * reserve_size.
1059 *
1060 * The application should call this function to allocate each packet,
1061 * while decreasing reserve_size by each actual_packet_size that was
1062 * allocated. Once there is only one full and a partial packet in
1063 * the buffer, it should use a large reserve_size, as above.
1064 *
1065 * The application could just always ecall this function with a large
1066 * reserve_size, at the cost of substantially more memcpy()s.
1067 *
1068 * @param[in] ms the message set
1069 * @param[in] m the message message to allocate packet data for
1070 * @param[in] actual_packet_size to use
1071 * @param[in] leftover "dirty" bytes in the buffer
1072 * @param[in] reserve_size to reserve for new message
1073 * @return
1074 * - NULL on error, and input message m is left alone
1075 * - fr_message_t* on success. Will always be a new message.
1076 */
1078 size_t leftover, size_t reserve_size)
1079{
1080 bool cleaned_up;
1081 uint8_t *p;
1082 fr_message_t *m2;
1083 size_t m_rb_size, align_size;
1084
1085 (void) talloc_get_type_abort(ms, fr_message_set_t);
1086
1087 align_size = actual_packet_size;
1088 CACHE_ALIGN(align_size);
1089
1090 /* m is NOT talloc'd */
1091
1093 fr_assert(m->rb != NULL);
1094 fr_assert(m->data != NULL);
1095 fr_assert(m->rb_size >= actual_packet_size);
1096
1097 p = fr_ring_buffer_alloc(m->rb, align_size);
1098 fr_assert(p != NULL);
1099 if (!p) {
1100 fr_strerror_const_push("Failed allocating from ring buffer");
1101 return NULL;
1102 }
1103
1104 fr_assert(p == m->data);
1105
1106 m_rb_size = m->rb_size; /* for ring buffer cleanups */
1107
1108 m->data_size = actual_packet_size;
1109 m->rb_size = align_size;
1110
1111 /*
1112 * If we've allocated all of the reserved ring buffer
1113 * data, then just reserve a brand new reservation.
1114 *
1115 * This will be automatically cache aligned.
1116 */
1117 if (!leftover) return fr_message_reserve(ms, reserve_size);
1118
1119 /*
1120 * Allocate a new message.
1121 */
1122 m2 = fr_message_get_message(ms, &cleaned_up);
1123 if (!m2) return NULL;
1124
1125 /*
1126 * Ensure that there's enough room to shift the next
1127 * packet, so that it's cache aligned. Moving small
1128 * amounts of memory is likely faster than having two
1129 * CPUs fight over the same cache lines.
1130 */
1131 reserve_size += (align_size - actual_packet_size);
1133
1134 /*
1135 * Track how much data there is in the packet.
1136 */
1137 m2->rb = m->rb;
1138 m2->data_size = leftover;
1139 m2->rb_size = reserve_size;
1140
1141 /*
1142 * Try to extend the reservation. If we can do it,
1143 * return.
1144 */
1146 if (m2->data) {
1147 /*
1148 * The next packet pointer doesn't point to the
1149 * actual data after the current packet. Move
1150 * the next packet to match up with the ring
1151 * buffer allocation.
1152 */
1153 if (m2->data != (m->data + actual_packet_size)) {
1154 memmove(m2->data, m->data + actual_packet_size, leftover);
1155 }
1156 return m2;
1157 }
1158
1159 /*
1160 * We failed reserving more memory at the end of the
1161 * current ring buffer.
1162 *
1163 * Reserve data from a new ring buffer. If it doesn't
1164 * succeed, ensure that the old message will properly
1165 * clean up the old ring buffer.
1166 */
1167 if (!fr_message_get_ring_buffer(ms, m2, false)) {
1168 m->rb_size = m_rb_size;
1169 return NULL;
1170 }
1171
1172 /*
1173 * If necessary, copy the remaining data from the old
1174 * buffer to the new one.
1175 */
1176 if (m2->data != (m->data + actual_packet_size)) {
1177 memmove(m2->data, m->data + actual_packet_size, leftover);
1178 }
1179
1180 /*
1181 * The messages are in different ring buffers. We've
1182 * aligned m->rb_size above for the current packet, but
1183 * there's no subsequent message to clean up this
1184 * reservation. Re-extend the current message to it's
1185 * original size, so that cleaning it up will clean up the ring buffer.
1186 */
1187 if (m2->rb != m->rb) {
1188 m->rb_size = m_rb_size;
1189 return m2;
1190 }
1191
1192 /*
1193 * If we've managed to allocate the next message in the
1194 * current ring buffer, then it really should have
1195 * wrapped around. In which case, re-extend the current
1196 * message as above.
1197 */
1198 if (m2->data < m->data) {
1199 m->rb_size = m_rb_size;
1200 return m2;
1201 }
1202
1203 return m2;
1204}
1205
1206/** Count the number of used messages
1207 *
1208 * @param[in] ms the message set
1209 * @return
1210 * - number of used messages
1211 */
1213{
1214 int i, used;
1215
1216 (void) talloc_get_type_abort(ms, fr_message_set_t);
1217
1218 used = 0;
1219 for (i = 0; i <= ms->mr_max; i++) {
1220 fr_ring_buffer_t *mr;
1221
1222 mr = ms->mr_array[i];
1223
1225 }
1226
1227 return used;
1228}
1229
1230/** Garbage collect the message set.
1231 *
1232 * This function should ONLY be called just before freeing the
1233 * message set. It is intended only for debugging, and will cause
1234 * huge latency spikes if used at run time.
1235 *
1236 * @param[in] ms the message set
1237 */
1239{
1240 int i;
1241
1242 (void) talloc_get_type_abort(ms, fr_message_set_t);
1243
1244 /*
1245 * Manually clean up each message ring.
1246 */
1247 for (i = 0; i <= ms->mr_max; i++) {
1248 (void) fr_message_ring_gc(ms, ms->mr_array[i], ~0);
1249 }
1250
1251 /*
1252 * And then do one last pass to clean up the arrays.
1253 */
1254 fr_message_gc(ms, 1 << 24);
1255}
1256
1257/** Print debug information about the message set.
1258 *
1259 * @param[in] ms the message set
1260 * @param[in] fp the FILE where the messages are printed.
1261 */
1263{
1264 int i;
1265
1266 (void) talloc_get_type_abort(ms, fr_message_set_t);
1267
1268 fprintf(fp, "message arrays = %d\t(current %d)\n", ms->mr_max + 1, ms->mr_current);
1269 fprintf(fp, "ring buffers = %d\t(current %d)\n", ms->rb_max + 1, ms->rb_current);
1270
1271 for (i = 0; i <= ms->mr_max; i++) {
1272 fr_ring_buffer_t *mr = ms->mr_array[i];
1273
1274 fprintf(fp, "messages[%d] =\tsize %zu, used %zu\n",
1276 }
1277
1278 for (i = 0; i <= ms->rb_max; i++) {
1279 fprintf(fp, "ring buffer[%d] =\tsize %zu, used %zu\n",
1281 }
1282}
#define RCSID(id)
Definition build.h:483
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
talloc_free(reap)
unsigned char uint8_t
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
Definition message.c:113
size_t message_size
size of the callers message, including fr_message_t
Definition message.c:99
int mr_current
current used message ring entry
Definition message.c:96
#define MPRINT(...)
Definition message.c:38
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
Definition message.c:111
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
Definition message.c:352
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
Definition message.c:793
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:242
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
Definition message.c:674
int rb_max
max used ring buffer entry
Definition message.c:104
#define MSG_ARRAY_SIZE
Definition message.c:41
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
Definition message.c:623
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition message.c:1212
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
Definition message.c:1262
#define CACHE_ALIGN(_x)
Definition message.c:43
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1238
int mr_max
max used message ring entry
Definition message.c:97
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:934
int mr_cleaned
where we last cleaned
Definition message.c:101
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
Definition message.c:306
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1077
size_t max_allocation
maximum allocation size
Definition message.c:106
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
int rb_current
current used ring buffer entry
Definition message.c:103
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
fr_ring_buffer_t * rb
pointer to the ring buffer
Definition message.h:48
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
@ FR_MESSAGE_DONE
Definition message.h:41
@ FR_MESSAGE_FREE
Definition message.h:38
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
static size_t reserve_size
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:38
static bool cleanup
Definition radsniff.c:60
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223