25RCSID(
"$Id: 3c23215e1cdff6a1bb9546a7520a23da7c11a262 $")
27#include <freeradius-devel/io/message.h>
28#include <freeradius-devel/util/strerror.h>
36#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
41#define MSG_ARRAY_SIZE (16)
43#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
136 if (num_messages < 8) num_messages = 8;
138 if ((num_messages & (num_messages - 1)) != 0) {
148 if (message_size > 1024) {
179 ms->
max_allocation = unlimited_size ? 1 << 29 : ring_buffer_size / 2;
262 l = talloc_memdup(ctx, m, message_size);
314 int messages_cleaned = 0;
320 if (size == 0)
break;
341 if (messages_cleaned >= max_to_clean)
break;
344 MPRINT(
"CLEANED %d (%p) left\n", messages_cleaned, mr);
345 return messages_cleaned;
361 int arrays_freed, arrays_used, empty_slot;
362 int largest_free_slot;
364 size_t largest_free_size;
374 for (i = 0; i <= ms->
mr_max; i++) {
378 total_cleaned += cleaned;
379 fr_assert(total_cleaned <= max_to_clean);
384 if (total_cleaned == max_to_clean)
break;
390 if (total_cleaned == 0)
return;
399 for (i = ms->
mr_max; i >= 0; i--) {
402 if (arrays_used < 2) {
403 MPRINT(
"\tleaving entry %d alone\n", i);
413 MPRINT(
"\tfreeing entry %d\n", i);
419 MPRINT(
"\tstill in use entry %d\n", i);
427 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
435 for (i = 0; i <= ms->
mr_max; i++) {
444 if (empty_slot < 0) empty_slot = i;
454 if (empty_slot < 0)
continue;
465 for (j = empty_slot + 1; j <= i; j++) {
477 ms->
mr_max -= arrays_freed;
482 for (i = 0; i <= ms->
mr_max; i++) {
502 for (i = ms->
rb_max; i >= 0; i--) {
505 if (arrays_used < 2) {
506 MPRINT(
"\tleaving entry %d alone\n", i);
512 MPRINT(
"\tfreeing entry %d\n", i);
518 MPRINT(
"\tstill in use entry %d\n", i);
524 if (arrays_freed > 0) {
525 MPRINT(
"TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->
rb_max + 1);
533 for (i = 0; i <= ms->
rb_max; i++) {
542 if (empty_slot < 0) empty_slot = i;
552 if (empty_slot < 0)
continue;
563 for (j = empty_slot + 1; j <= i; j++) {
575 ms->
rb_max -= arrays_freed;
580 for (i = 0; i <= ms->
rb_max; i++) {
596 largest_free_slot = ms->
rb_max;
600 for (i = 0; i < ms->
rb_max; i++) {
607 if (largest_free_size < free_size) {
608 largest_free_slot = i;
609 largest_free_size = free_size;
719 MPRINT(
"ALLOC after cleanup\n");
736 for (i = ms->
mr_max; i >= 0; i--) {
742 MPRINT(
"ALLOC from changed ring buffer\n");
784 MPRINT(
"ALLOC after doubled message ring\n");
812 if (m->
data)
return m;
824 if (ms->
rb_max == 0)
goto alloc_rb;
842 if (m->
data)
return m;
844 MPRINT(
"CLEANUP RING BUFFER FAILED\n");
860 for (i = ms->
rb_max; i >= 0; i--) {
865 MPRINT(
"MOVED TO RING BUFFER %d\n", i);
888 if (alloc_size < m->rb_size) {
897 MPRINT(
"RING BUFFER DOUBLES\n");
913 if (m->
data)
return m;
916 MPRINT(
"OUT OF MEMORY\n");
964 MPRINT(
"Failed to reserve message\n");
1012 if (!m)
return NULL;
1024 if (actual_packet_size == 0) {
1096 size_t m_rb_size, align_size;
1100 align_size = actual_packet_size;
1136 if (!m2)
return NULL;
1166 if (m2->
data != (m->
data + actual_packet_size)) {
1167 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1189 if (m2->
data != (m->
data + actual_packet_size)) {
1190 memmove(m2->
data, m->
data + actual_packet_size, leftover);
1200 if (m2->
rb != m->
rb) {
1232 for (i = 0; i <= ms->
mr_max; i++) {
1260 for (i = 0; i <= ms->
mr_max; i++) {
1281 fprintf(fp,
"message arrays = %d\t(current %d)\n", ms->
mr_max + 1, ms->
mr_current);
1282 fprintf(fp,
"ring buffers = %d\t(current %d)\n", ms->
rb_max + 1, ms->
rb_current);
1284 for (i = 0; i <= ms->
mr_max; i++) {
1287 fprintf(fp,
"messages[%d] =\tsize %zu, used %zu\n",
1291 for (i = 0; i <= ms->
rb_max; i++) {
1292 fprintf(fp,
"ring buffer[%d] =\tsize %zu, used %zu\n",
#define fr_cond_assert(_x)
Calls panic_action if NDEBUG is not defined; otherwise logs an error and evaluates to the value of _x.
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
size_t message_size
size of the callers message, including fr_message_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
int mr_current
current used message ring entry
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
int fr_message_done(fr_message_t *m)
Mark a message as done.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
int rb_max
max used ring buffer entry
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
int mr_max
max used message ring entry
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
int mr_cleaned
where we last cleaned
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
size_t max_allocation
maximum allocation size
int rb_current
current used ring buffer entry
A Message set, composed of message headers and ring buffer data.
fr_ring_buffer_t * rb
pointer to the ring buffer
size_t rb_size
cache-aligned size in the ring buffer
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
fr_message_status_t status
free, used, done, etc.
static size_t reserve_size
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)