25RCSID(
"$Id: aab82433d7163a5110f15178d907c4ce0562cc5d $")
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
35#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
40#define MSG_ARRAY_SIZE (16)
42#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
135 if (num_messages < 8) num_messages = 8;
137 if ((num_messages & (num_messages - 1)) != 0) {
147 if (message_size > 1024) {
178 ms->
max_allocation = unlimited_size ? 1 << 29 : ring_buffer_size / 2;
261 l = talloc_memdup(ctx, m, message_size);
313 int messages_cleaned = 0;
319 if (size == 0)
break;
340 if (messages_cleaned >= max_to_clean)
break;
343 MPRINT(
"CLEANED %d (%p) left\n", messages_cleaned, mr);
344 return messages_cleaned;
360 int arrays_freed, arrays_used, empty_slot;
361 int largest_free_slot;
363 size_t largest_free_size;
373 for (i = 0; i <= ms->
mr_max; i++) {
377 total_cleaned += cleaned;
378 fr_assert(total_cleaned <= max_to_clean);
383 if (total_cleaned == max_to_clean)
break;
389 if (total_cleaned == 0)
return;
398 for (i = ms->
mr_max; i >= 0; i--) {
401 if (arrays_used < 2) {
402 MPRINT(
"\tleaving entry %d alone\n", i);
412 MPRINT(
"\tfreeing entry %d\n", i);
418 MPRINT(
"\tstill in use entry %d\n", i);
426 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
434 for (i = 0; i <= ms->
mr_max; i++) {
443 if (empty_slot < 0) empty_slot = i;
453 if (empty_slot < 0)
continue;
464 for (j = empty_slot + 1; j <= i; j++) {
476 ms->
mr_max -= arrays_freed;
481 for (i = 0; i <= ms->
mr_max; i++) {
501 for (i = ms->
rb_max; i >= 0; i--) {
504 if (arrays_used < 2) {
505 MPRINT(
"\tleaving entry %d alone\n", i);
511 MPRINT(
"\tfreeing entry %d\n", i);
517 MPRINT(
"\tstill in use entry %d\n", i);
523 if (arrays_freed > 0) {
524 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
532 for (i = 0; i <= ms->
rb_max; i++) {
541 if (empty_slot < 0) empty_slot = i;
551 if (empty_slot < 0)
continue;
562 for (j = empty_slot + 1; j <= i; j++) {
574 ms->
rb_max -= arrays_freed;
579 for (i = 0; i <= ms->
rb_max; i++) {
595 largest_free_slot = ms->
rb_max;
599 for (i = 0; i < ms->
rb_max; i++) {
606 if (largest_free_size < free_size) {
607 largest_free_slot = i;
608 largest_free_size = free_size;
633static inline CC_HINT(always_inline)
647 mr = ms->mr_array[ms->mr_current];
648 p = op(mr, ms->message_size);
650 MPRINT(
"RING FIND normal\n");
654 MPRINT(
"CLEANING UP (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
655 ms->allocated - ms->freed);
672 mr = ms->mr_array[ms->mr_current];
674 p = op(mr, ms->message_size);
676 MPRINT(
"RING FIND after cleanup\n");
693 for (i = ms->mr_max; i >= 0; i--) {
694 mr = ms->mr_array[i];
697 p = op(mr, ms->message_size);
700 MPRINT(
"RING FIND from changed ring buffer\n");
701 MPRINT(
"SET MR to changed %d\n", ms->mr_current);
731 ms->mr_current = ms->mr_max;
732 ms->mr_array[ms->mr_max] = mr;
734 MPRINT(
"SET MR to doubled %d\n", ms->mr_current);
736 return op(mr, ms->message_size);
800 bool cleaned =
false;
809 if (!
fr_cond_assert_msg(nm == m,
"Alloc interleaved between reserving and finalising message allocation %p", m)) {
839 if (m->
data)
return m;
851 if (ms->
rb_max == 0)
goto alloc_rb;
869 if (m->
data)
return m;
871 MPRINT(
"CLEANUP RING BUFFER FAILED\n");
887 for (i = ms->
rb_max; i >= 0; i--) {
892 MPRINT(
"MOVED TO RING BUFFER %d\n", i);
915 if (alloc_size < m->rb_size) {
924 MPRINT(
"RING BUFFER DOUBLES\n");
940 if (m->
data)
return m;
943 MPRINT(
"OUT OF MEMORY\n");
990 MPRINT(
"Failed to reserve message\n");
1061 if (!m)
return NULL;
1067 if (total_size == 0) {
1078 align_size = total_size;
1113 if (!m)
return NULL;
1160 size_t actual_packet_size,
1166 size_t m_rb_size, align_size;
1170 align_size = actual_packet_size;
1206 if (!m2)
return NULL;
1236 if (m2->
data != (m->
data + actual_packet_size)) {
1237 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1259 if (m2->
data != (m->
data + actual_packet_size)) {
1260 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1270 if (m2->
rb != m->
rb) {
1302 for (i = 0; i <= ms->
mr_max; i++) {
1330 for (i = 0; i <= ms->
mr_max; i++) {
1351 fprintf(fp,
"message arrays = %d\t(current %d)\n", ms->
mr_max + 1, ms->
mr_current);
1352 fprintf(fp,
"ring buffers = %d\t(current %d)\n", ms->
rb_max + 1, ms->
rb_current);
1354 for (i = 0; i <= ms->
mr_max; i++) {
1357 fprintf(fp,
"messages[%d] =\tsize %zu, used %zu\n",
1361 for (i = 0; i <= ms->
rb_max; i++) {
1362 fprintf(fp,
"ring buffer[%d] =\tsize %zu, used %zu\n",
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
size_t message_size
size of the callers message, including fr_message_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
int mr_current
current used message ring entry
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
static void message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
void fr_message_and_data_reset(fr_message_set_t *ms, fr_message_t *m)
Cancel a reservation made by fr_message_and_data_reserve(), returning the slot to the set.
int fr_message_done(fr_message_t *m)
Mark a message as done.
static fr_message_t * message_reserve(fr_message_set_t *ms, bool *p_cleaned)
Reserve a fr_message_t, WITHOUT a ring buffer.
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
static int message_finalise(fr_message_set_t *ms, fr_message_t *m)
Finalise a reserved message allocation.
static int message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
int rb_max
max used ring buffer entry
fr_message_t * fr_message_and_data_commit_with_leftover(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
fr_message_t * fr_message_and_data_commit(fr_message_set_t *ms, fr_message_t *m, size_t total_size)
Commit a previously reserved message, allocating exactly total_size bytes of packet data.
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
static fr_message_t * message_alloc(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
static uint8_t * _message_ring_find(bool *p_cleaned, fr_message_set_t *ms, fr_ring_buffer_op_t op)
Find a message ring with space, performing GC and expansion as needed.
int mr_max
max used message ring entry
uint8_t *(* fr_ring_buffer_op_t)(fr_ring_buffer_t *rb, size_t size)
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
fr_message_t * fr_message_and_data_alloc(fr_message_set_t *ms, size_t size)
Reserve and commit a message atomically.
int mr_cleaned
where we last cleaned
static fr_message_t * message_data_reserve(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Reserve data in a ring buffer for this message.
size_t max_allocation
maximum allocation size
fr_message_t * fr_message_and_data_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
int rb_current
current used ring buffer entry
A Message set, composed of message headers and ring buffer data.
fr_ring_buffer_t * rb
pointer to the ring buffer
size_t rb_size
cache-aligned size in the ring buffer
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
fr_message_status_t status
free, used, done, etc.
static size_t reserve_size
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)