25RCSID(
"$Id: 15f4364765c192611e3c4df1a0d3f9ebfa6eee14 $")
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
35#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
40#define MSG_ARRAY_SIZE (16)
42#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
135 if (num_messages < 8) num_messages = 8;
137 if ((num_messages & (num_messages - 1)) != 0) {
147 if (message_size > 1024) {
178 ms->
max_allocation = unlimited_size ? 1 << 29 : ring_buffer_size / 2;
261 l = talloc_memdup(ctx, m, message_size);
313 int messages_cleaned = 0;
319 if (size == 0)
break;
340 if (messages_cleaned >= max_to_clean)
break;
343 MPRINT(
"CLEANED %d (%p) left\n", messages_cleaned, mr);
344 return messages_cleaned;
360 int arrays_freed, arrays_used, empty_slot;
361 int largest_free_slot;
363 size_t largest_free_size;
373 for (i = 0; i <= ms->
mr_max; i++) {
377 total_cleaned += cleaned;
378 fr_assert(total_cleaned <= max_to_clean);
383 if (total_cleaned == max_to_clean)
break;
389 if (total_cleaned == 0)
return;
398 for (i = ms->
mr_max; i >= 0; i--) {
401 if (arrays_used < 2) {
402 MPRINT(
"\tleaving entry %d alone\n", i);
412 MPRINT(
"\tfreeing entry %d\n", i);
418 MPRINT(
"\tstill in use entry %d\n", i);
426 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
434 for (i = 0; i <= ms->
mr_max; i++) {
443 if (empty_slot < 0) empty_slot = i;
453 if (empty_slot < 0)
continue;
464 for (j = empty_slot + 1; j <= i; j++) {
476 ms->
mr_max -= arrays_freed;
481 for (i = 0; i <= ms->
mr_max; i++) {
501 for (i = ms->
rb_max; i >= 0; i--) {
504 if (arrays_used < 2) {
505 MPRINT(
"\tleaving entry %d alone\n", i);
511 MPRINT(
"\tfreeing entry %d\n", i);
517 MPRINT(
"\tstill in use entry %d\n", i);
523 if (arrays_freed > 0) {
524 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
532 for (i = 0; i <= ms->
rb_max; i++) {
541 if (empty_slot < 0) empty_slot = i;
551 if (empty_slot < 0)
continue;
562 for (j = empty_slot + 1; j <= i; j++) {
574 ms->
rb_max -= arrays_freed;
579 for (i = 0; i <= ms->
rb_max; i++) {
595 largest_free_slot = ms->
rb_max;
599 for (i = 0; i < ms->
rb_max; i++) {
606 if (largest_free_size < free_size) {
607 largest_free_slot = i;
608 largest_free_size = free_size;
718 MPRINT(
"ALLOC after cleanup\n");
735 for (i = ms->
mr_max; i >= 0; i--) {
741 MPRINT(
"ALLOC from changed ring buffer\n");
783 MPRINT(
"ALLOC after doubled message ring\n");
811 if (m->
data)
return m;
823 if (ms->
rb_max == 0)
goto alloc_rb;
841 if (m->
data)
return m;
843 MPRINT(
"CLEANUP RING BUFFER FAILED\n");
859 for (i = ms->
rb_max; i >= 0; i--) {
864 MPRINT(
"MOVED TO RING BUFFER %d\n", i);
887 if (alloc_size < m->rb_size) {
896 MPRINT(
"RING BUFFER DOUBLES\n");
912 if (m->
data)
return m;
915 MPRINT(
"OUT OF MEMORY\n");
963 MPRINT(
"Failed to reserve message\n");
1011 if (!m)
return NULL;
1023 if (actual_packet_size == 0) {
1095 size_t m_rb_size, align_size;
1099 align_size = actual_packet_size;
1135 if (!m2)
return NULL;
1165 if (m2->
data != (m->
data + actual_packet_size)) {
1166 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1188 if (m2->
data != (m->
data + actual_packet_size)) {
1189 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1199 if (m2->
rb != m->
rb) {
1231 for (i = 0; i <= ms->
mr_max; i++) {
1259 for (i = 0; i <= ms->
mr_max; i++) {
1280 fprintf(fp,
"message arrays = %d\t(current %d)\n", ms->
mr_max + 1, ms->
mr_current);
1281 fprintf(fp,
"ring buffers = %d\t(current %d)\n", ms->
rb_max + 1, ms->
rb_current);
1283 for (i = 0; i <= ms->
mr_max; i++) {
1286 fprintf(fp,
"messages[%d] =\tsize %zu, used %zu\n",
1290 for (i = 0; i <= ms->
rb_max; i++) {
1291 fprintf(fp,
"ring buffer[%d] =\tsize %zu, used %zu\n",
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
size_t message_size
size of the callers message, including fr_message_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
int mr_current
current used message ring entry
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
int fr_message_done(fr_message_t *m)
Mark a message as done.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
int rb_max
max used ring buffer entry
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
int mr_max
max used message ring entry
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
int mr_cleaned
where we last cleaned
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
size_t max_allocation
maximum allocation size
int rb_current
current used ring buffer entry
A Message set, composed of message headers and ring buffer data.
fr_ring_buffer_t * rb
pointer to the ring buffer
size_t rb_size
cache-aligned size in the ring buffer
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
fr_message_status_t status
free, used, done, etc.
static size_t reserve_size
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)