The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
message.c File Reference

Messages for inter-thread communication.

#include <freeradius-devel/io/message.h>
#include <freeradius-devel/util/strerror.h>

Data Structures

struct  fr_message_set_s
 A Message set, composed of message headers and ring buffer data.
 

Macros

#define CACHE_ALIGN(_x)   do { _x += 63; _x &= ~(size_t) 63; } while (0)
 
#define MPRINT(...)
 
#define MSG_ARRAY_SIZE   (16)
 

Typedefs

typedef uint8_t *(* fr_ring_buffer_op_t) (fr_ring_buffer_t *rb, size_t size)
 

Functions

static uint8_t * _message_ring_find (bool *p_cleaned, fr_message_set_t *ms, fr_ring_buffer_op_t op)
 Find a message ring with space, performing GC and expansion as needed.
 
fr_message_t * fr_message_and_data_alloc (fr_message_set_t *ms, size_t size)
 Reserve and commit a message atomically.
 
fr_message_t * fr_message_and_data_commit (fr_message_set_t *ms, fr_message_t *m, size_t total_size)
 Commit a previously reserved message, allocating exactly total_size bytes of packet data.
 
fr_message_t * fr_message_and_data_commit_with_leftover (fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
 Allocate packet data for a message, and reserve a new message.
 
fr_message_t * fr_message_and_data_reserve (fr_message_set_t *ms, size_t reserve_size)
 Reserve a message.
 
void fr_message_and_data_reset (fr_message_set_t *ms, fr_message_t *m)
 Cancel a reservation made by fr_message_and_data_reserve(), returning the slot to the set.
 
int fr_message_done (fr_message_t *m)
 Mark a message as done.
 
fr_message_t * fr_message_localize (TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
 Localize a message by copying it to local storage.
 
fr_message_set_t * fr_message_set_create (TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
 Create a message set.
 
void fr_message_set_debug (FILE *fp, fr_message_set_t *ms)
 Print debug information about the message set.
 
void fr_message_set_gc (fr_message_set_t *ms)
 Garbage collect the message set.
 
int fr_message_set_messages_used (fr_message_set_t *ms)
 Count the number of used messages.
 
static fr_message_t * message_alloc (fr_message_set_t *ms, bool *p_cleaned)
 Allocate a fr_message_t, WITHOUT a ring buffer.
 
static fr_message_t * message_data_reserve (fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
 Reserve data in a ring buffer for this message.
 
static int message_finalise (fr_message_set_t *ms, fr_message_t *m)
 Finalise a reserved message allocation.
 
static void message_gc (fr_message_set_t *ms, int max_to_clean)
 Garbage collect "done" messages.
 
static fr_message_t * message_reserve (fr_message_set_t *ms, bool *p_cleaned)
 Reserve a fr_message_t, WITHOUT a ring buffer.
 
static int message_ring_gc (fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
 Clean up messages in a message ring.
 

Detailed Description

Messages for inter-thread communication.

Id
aab82433d7163a5110f15178d907c4ce0562cc5d

Definition in file message.c.


Data Structure Documentation

◆ fr_message_set_s

struct fr_message_set_s

A Message set, composed of message headers and ring buffer data.

A message set is intended to send short-lived messages. The message headers are fixed in size, and allocated from an array which is treated like a circular buffer. Message bodies (i.e. raw packets) are variable in size, and live in a separate ring buffer.

The message set starts off with a small array of message headers, and a small ring buffer. If an array/buffer fills up, a new one is allocated at double the size of the previous one.

The array / buffers are themselves kept in fixed-size arrays, of MSG_ARRAY_SIZE. The reason is that the memory for fr_message_set_t should be contiguous, and not in a linked list scattered in memory.

The originator allocates a message, and sends it to a recipient. The recipient (usually in another thread) uses the message, and marks it as FR_MESSAGE_DONE. The originator then asynchronously cleans up the message.

This asynchronous cleanup is done via self-clocking. If there is no need to clean up the messages, it isn't done. Only when we run out of space to store messages (or packets) is the cleanup done.

This cleanup latency ensures that we don't have cache line bouncing, where the originator sends the message and then, while the recipient is reading it, thrashes the cache line with checks for "are you done? Are you done?"
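The self-clocking idea can be illustrated with a toy model. The sketch below is not the FreeRADIUS implementation: it is single-threaded, scans a tiny fixed array linearly, and every name in it (`toy_set_t`, `toy_alloc()`, `toy_done()`, `toy_gc()`) is hypothetical. It only shows the policy described above, namely that the recipient merely flips a status flag and the originator runs cleanup only when it has run out of space.

```c
#include <assert.h>
#include <stddef.h>

/* Toy states, loosely mirroring the free / used / done lifecycle. */
typedef enum { TOY_FREE = 0, TOY_USED, TOY_DONE } toy_status_t;

#define TOY_SLOTS 4

typedef struct {
	toy_status_t	status[TOY_SLOTS];
	int		gc_runs;	/* how many times cleanup actually ran */
} toy_set_t;

/* Recipient side: only flips the state, never frees anything. */
static void toy_done(toy_set_t *set, int slot)
{
	assert(slot >= 0 && slot < TOY_SLOTS);
	assert(set->status[slot] == TOY_USED);
	set->status[slot] = TOY_DONE;
}

/* Originator side: reclaim every slot the recipient has finished with. */
static void toy_gc(toy_set_t *set)
{
	set->gc_runs++;
	for (int i = 0; i < TOY_SLOTS; i++) {
		if (set->status[i] == TOY_DONE) set->status[i] = TOY_FREE;
	}
}

/* Return the index of the first free slot, or -1 if there is none. */
static int toy_find_free(toy_set_t const *set)
{
	for (int i = 0; i < TOY_SLOTS; i++) {
		if (set->status[i] == TOY_FREE) return i;
	}
	return -1;
}

/* Originator side: allocate a slot, cleaning up ONLY when out of space. */
static int toy_alloc(toy_set_t *set)
{
	int slot = toy_find_free(set);

	if (slot < 0) {
		toy_gc(set);
		slot = toy_find_free(set);
		if (slot < 0) return -1;	/* still full after cleanup */
	}
	set->status[slot] = TOY_USED;
	return slot;
}
```

In this model the originator never polls the recipient's slots while there is still free space, which is the property the paragraph above attributes to the cleanup latency.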

If there is more than one used entry in either array, we then try to coalesce the buffers on cleanup. If we discover that one array is empty, we discard it, and move the used array entries into its place.

This process ensures that we don't have too many buffers in progress. It is better to have a few large buffers than many small ones.

MSG_ARRAY_SIZE is defined to be large (16 doublings) to allow for the edge case where messages are stuck for long periods of time.

With an initial message array size of 64, this gives us room for 2M packets, if all of the mr_array entries have packets stuck in them that aren't cleaned up for extended periods of time.
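The arithmetic behind this figure can be checked with a short sketch. It assumes that slot 0 holds the initial array, that each later slot holds an array double the size of the previous one, and that all MSG_ARRAY_SIZE slots are in use. The helper names `slot_capacity()` and `total_capacity()` are hypothetical and do not exist in message.c.

```c
#include <assert.h>
#include <stddef.h>

#define MSG_ARRAY_SIZE (16)

/* Hypothetical helper: number of entries in the array stored in slot
 * `index`, assuming slot 0 holds `initial` entries and each later slot
 * is double the size of the previous one. */
static size_t slot_capacity(size_t initial, int index)
{
	assert(index >= 0 && index < MSG_ARRAY_SIZE);
	return initial << index;
}

/* Hypothetical helper: total entries if every slot is populated. */
static size_t total_capacity(size_t initial)
{
	size_t total = 0;

	for (int i = 0; i < MSG_ARRAY_SIZE; i++) total += slot_capacity(initial, i);
	return total;
}
```

Under these assumptions, `slot_capacity(64, 15)` is 64 * 2^15 = 2,097,152, which is the "2M" quoted above for the largest array, and `total_capacity(64)` is 64 * (2^16 - 1) = 4,194,240 across all slots.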

Todo:
Add a flag for UDP-style protocols, where we can put the message into the ring buffer. This helps with locality of reference, and removes the need to track two separate things.

Definition at line 94 of file message.c.

Data Fields
int allocated
int freed
size_t max_allocation maximum allocation size
size_t message_size size of the caller's message, including fr_message_t
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE] array of message arrays
int mr_cleaned where we last cleaned
int mr_current current used message ring entry
int mr_max max used message ring entry
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE] array of ring buffers
int rb_current current used ring buffer entry
int rb_max max used ring buffer entry

Macro Definition Documentation

◆ CACHE_ALIGN

#define CACHE_ALIGN (   _x)    do { _x += 63; _x &= ~(size_t) 63; } while (0)

Definition at line 42 of file message.c.
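As an illustration of what the macro computes, the sketch below reproduces the definition above and wraps it in a function so that the result can be inspected. The wrapper `cache_aligned()` is hypothetical and exists only for this example; the macro itself modifies its argument in place, so it must be given a modifiable lvalue.

```c
#include <assert.h>
#include <stddef.h>

/* Same definition as above: round _x up to the next multiple of 64. */
#define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)

/* Hypothetical wrapper, for illustration only: returns the aligned size. */
static size_t cache_aligned(size_t size)
{
	CACHE_ALIGN(size);
	assert((size & 63) == 0);	/* the result is always a multiple of 64 */
	return size;
}
```

With this wrapper, sizes of 0, 1, 64 and 65 become 0, 64, 64 and 128 respectively: values that are already a multiple of 64 are left unchanged, and everything else is rounded up.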

◆ MPRINT

#define MPRINT (   ...)

Definition at line 37 of file message.c.

◆ MSG_ARRAY_SIZE

#define MSG_ARRAY_SIZE   (16)

Definition at line 40 of file message.c.

Typedef Documentation

◆ fr_ring_buffer_op_t

typedef uint8_t *(* fr_ring_buffer_op_t) (fr_ring_buffer_t *rb, size_t size)

Definition at line 617 of file message.c.

Function Documentation

◆ _message_ring_find()

static uint8_t * _message_ring_find ( bool *p_cleaned,
fr_message_set_t *ms,
fr_ring_buffer_op_t op
)
inline static

Find a message ring with space, performing GC and expansion as needed.

The caller passes in the operation to perform on the ring buffer (fr_ring_buffer_alloc or fr_ring_buffer_reserve). This function handles GC, searching across all message rings, and doubling the ring size when all are full.

Parameters
[out] p_cleaned: a flag to indicate if we cleaned the message array
[in] ms: the message set
[in] op: the ring buffer operation (alloc or reserve)
Returns
  • NULL on error
  • pointer to message-sized region on success

Definition at line 634 of file message.c.

◆ fr_message_and_data_alloc()

fr_message_t * fr_message_and_data_alloc ( fr_message_set_t *ms,
size_t  size 
)

Reserve and commit a message atomically.

Combines fr_message_and_data_reserve() and fr_message_and_data_commit() into a single call for callers that know the exact packet size upfront.

Parameters
[in] ms: the message set
[in] size: packet size in bytes
Returns
  • NULL on error
  • fr_message_t* on success

Definition at line 1108 of file message.c.

◆ fr_message_and_data_commit()

fr_message_t * fr_message_and_data_commit ( fr_message_set_t *ms,
fr_message_t *m,
size_t  total_size 
)

Commit a previously reserved message, allocating exactly total_size bytes of packet data.

Commits both the message ring slot and the data ring buffer. total_size is the final packet size; it may be less than the original reservation. For streaming reads where m->data_size is already non-zero (leftover bytes from a prior read), total_size must include those bytes.

Parameters
[in] ms: the message set
[in] m: the previously reserved message (m is NOT talloc'd)
[in] total_size: final packet size in bytes
Returns
  • NULL on error, input message m is left alone
  • m on success

Definition at line 1051 of file message.c.

◆ fr_message_and_data_commit_with_leftover()

fr_message_t * fr_message_and_data_commit_with_leftover ( fr_message_set_t *ms,
fr_message_t *m,
size_t  actual_packet_size,
size_t  leftover,
size_t  reserve_size 
)

Allocate packet data for a message, and reserve a new message.

This function allocates a previously reserved message, and then reserves a new message.

The application should call fr_message_and_data_reserve() with a large buffer, and then read data into the buffer. If the buffer contains multiple packets, the application should call fr_message_and_data_commit_with_leftover() repeatedly to allocate the full packets, while reserving room for the partial packet.

When the application determines that there is only one full packet and one partial packet in the buffer, it should call this function with actual_packet_size, and a large reserve_size. The partial packet will be reserved. If the ring buffer is full, the partial packet will be copied to a new ring buffer.

When the application determines that there are multiple full packets in the buffer, it should call this function with actual_packet_size for each buffer, and reserve_size which reserves all of the data in the buffer. i.e. the full packets + partial packets, which should start off as the original reserve_size.

The application should call this function to allocate each packet, while decreasing reserve_size by each actual_packet_size that was allocated. Once there is only one full and a partial packet in the buffer, it should use a large reserve_size, as above.

The application could just always call this function with a large reserve_size, at the cost of substantially more memcpy()s.
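The caller-side bookkeeping described above can be sketched without the message set itself. The toy below assumes a made-up framing in which the first byte of every packet is that packet's total length; this framing, the `toy_split_t` type, and `toy_split()` are all hypothetical and are not part of the FreeRADIUS API. It walks a buffer that was filled by a single read, counts the complete packets, shrinks the remaining reservation by each packet's size, and reports the trailing partial packet as leftover bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Result of scanning one filled buffer (hypothetical type). */
typedef struct {
	size_t	packets;	/* number of complete packets found */
	size_t	leftover;	/* bytes belonging to a trailing partial packet */
	size_t	reserve_left;	/* reservation remaining after the full packets */
} toy_split_t;

/* Hypothetical framing: byte 0 of each packet is its total length (>= 1). */
static toy_split_t toy_split(uint8_t const *buf, size_t data_len, size_t reserve_size)
{
	toy_split_t	out = { 0, 0, reserve_size };
	size_t		offset = 0;

	assert(data_len <= reserve_size);

	while (offset < data_len) {
		size_t packet_len = buf[offset];

		if (packet_len == 0) break;			/* malformed, stop in this toy */
		if (packet_len > data_len - offset) break;	/* partial packet */

		/*
		 *	A real caller would commit this packet here, and keep
		 *	the rest of the buffer reserved for the next one.
		 */
		out.packets++;
		out.reserve_left -= packet_len;
		offset += packet_len;
	}

	out.leftover = data_len - offset;
	return out;
}
```

For a 16-byte reservation holding a 3-byte packet, a 4-byte packet, and the first 2 bytes of a 5-byte packet, this reports two full packets, 2 leftover bytes, and 9 bytes of reservation remaining.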

Parameters
[in] ms: the message set
[in] m: the message to allocate packet data for
[in] actual_packet_size: the actual packet size to use
[in] leftover: "dirty" bytes in the buffer
[in] reserve_size: the size to reserve for the new message
Returns
  • NULL on error, and input message m is left alone
  • fr_message_t* on success. Will always be a new message.

Definition at line 1159 of file message.c.

◆ fr_message_and_data_reserve()

fr_message_t * fr_message_and_data_reserve ( fr_message_set_t *ms,
size_t  reserve_size 
)

Reserve a message.

A later call to fr_message_and_data_commit() will commit the reservation. This call just reserves a message header and space for the packet data.

If the caller later decides that the message is not needed, they should call fr_message_and_data_reset() to cancel the reservation.

We assume that the caller will call fr_message_and_data_reserve(), and then almost immediately fr_message_and_data_commit(). Multiple calls in series to fr_message_and_data_reserve() MUST NOT be done. The caller could also just call fr_ring_buffer_alloc(m->rb, size) if they wanted, and then update m->data_size by hand...

The message is returned

Parameters
[in] ms: the message set
[in] reserve_size: the size to reserve
Returns
  • NULL on error
  • fr_message_t* on success

Definition at line 973 of file message.c.

◆ fr_message_and_data_reset()

void fr_message_and_data_reset ( fr_message_set_t *ms,
fr_message_t *m
)

Cancel a reservation made by fr_message_and_data_reserve(), returning the slot to the set.

Both rings are uncommitted at this point (write_offset was not advanced). Clearing the message fields is sufficient: the next reserve overwrites the slot, and the data ring's reserved field is replaced by the new reservation size.

Callers MUST verify m->data_size == 0 before calling this. If a concurrent fr_message_and_data_alloc() aliased this reservation (same ring slot, same data address), m->data_size will be non-zero and the slot is already committed; calling this function in that case corrupts the committed message.

Parameters
[in] ms: the message set the reservation was made against.
[in] m: the previously reserved message to cancel.

Definition at line 1026 of file message.c.

◆ fr_message_done()

int fr_message_done ( fr_message_t *m )

Mark a message as done.

Note that this call is usually done from a thread OTHER than the originator of the message. As such, the message is NOT actually freed. Instead, it is just marked as FR_MESSAGE_DONE, and the originator cleans it up later.

Parameters
[in] m: the message to mark as done.
Returns
  • <0 on error
  • 0 on success

Definition at line 195 of file message.c.

◆ fr_message_localize()

fr_message_t * fr_message_localize ( TALLOC_CTX *  ctx,
fr_message_t *m,
size_t  message_size 
)

Localize a message by copying it to local storage.

This function "localizes" a message by copying it to local storage. In the case where the recipient of a message has to sit on it for a while, that blocks the originator from cleaning up the message. The recipient can then copy the message to local storage, so that the originator can clean it up.

The localized message is marked as FR_MESSAGE_LOCALIZED, so that the recipient can call the normal fr_message_done() function to free it.

Parameters
[in] ctx: the talloc context to use for localization
[in] m: the message to be localized
[in] message_size: the size of the message, including the fr_message_t
Returns
  • NULL on allocation error
  • a newly localized message

Definition at line 247 of file message.c.

◆ fr_message_set_create()

fr_message_set_t * fr_message_set_create ( TALLOC_CTX *  ctx,
int  num_messages,
size_t  message_size,
size_t  ring_buffer_size,
bool  unlimited_size 
)

Create a message set.

Parameters
[in] ctx: the context for talloc
[in] num_messages: size of the initial message array. MUST be a power of 2.
[in] message_size: the size of each message, INCLUDING fr_message_t, which MUST be at the start of the struct
[in] ring_buffer_size: size of the ring buffer. MUST be a power of 2.
[in] unlimited_size: allow any message size to be allocated. If false, it is limited to ring_buffer_size / 2.
Returns
  • NULL on error
  • newly allocated fr_message_set_t on success
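Since num_messages and ring_buffer_size MUST both be powers of 2, a caller may want to validate or round up its configuration before creating the set. The following is a minimal sketch of such a check; `is_power_of_2()` and `next_power_of_2()` are hypothetical helpers written for this example and are not functions provided by message.c.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: true if x is a non-zero power of 2. */
static bool is_power_of_2(size_t x)
{
	return (x != 0) && ((x & (x - 1)) == 0);
}

/* Hypothetical helper: smallest power of 2 that is >= x (1 for x == 0). */
static size_t next_power_of_2(size_t x)
{
	size_t p = 1;

	/* Guard against overflow: the result must fit in a size_t. */
	assert(x <= ((size_t) 1 << (sizeof(size_t) * CHAR_BIT - 1)));

	while (p < x) p <<= 1;
	return p;
}
```

A caller holding a configured value of 96 could, for example, round it up to 128 with `next_power_of_2()` and confirm the result with `is_power_of_2()` before passing it on as num_messages or ring_buffer_size.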

Definition at line 127 of file message.c.

◆ fr_message_set_debug()

void fr_message_set_debug ( FILE *  fp,
fr_message_set_t *ms
)

Print debug information about the message set.

Parameters
[in] fp: the FILE where the messages are printed.
[in] ms: the message set

Definition at line 1345 of file message.c.

◆ fr_message_set_gc()

void fr_message_set_gc ( fr_message_set_t *ms )

Garbage collect the message set.

This function should ONLY be called just before freeing the message set. It is intended only for debugging, and will cause huge latency spikes if used at run time.

Parameters
[in] ms: the message set

Definition at line 1321 of file message.c.

◆ fr_message_set_messages_used()

int fr_message_set_messages_used ( fr_message_set_t *ms )

Count the number of used messages.

Parameters
[in] ms: the message set
Returns
  • number of used messages

Definition at line 1295 of file message.c.

◆ message_alloc()

static fr_message_t * message_alloc ( fr_message_set_t *ms,
bool *p_cleaned
)
static

Allocate a fr_message_t, WITHOUT a ring buffer.

Parameters
[in] ms: the message set
[out] p_cleaned: a flag to indicate if we cleaned the message array
Returns
  • NULL on error
  • fr_message_t* on success

Definition at line 747 of file message.c.

◆ message_data_reserve()

static fr_message_t * message_data_reserve ( fr_message_set_t *ms,
fr_message_t *m,
bool  cleaned_up 
)
static

Reserve data in a ring buffer for this message.

Parameters
[in] ms: the message set
[in] m: the message
[in] cleaned_up: whether the message set was partially garbage collected
Returns
  • NULL on error, and m is deallocated
  • m on success

Definition at line 827 of file message.c.

◆ message_finalise()

static int message_finalise ( fr_message_set_t *ms,
fr_message_t *m
)
static

Finalise a reserved message allocation.

Note
This finalises the allocation of a previous message. No additional reserve calls can be made between the one that initialised memory for m, and this finalise call.
Parameters
[in] ms: the message set to finalise the allocation in.
[in] m: the message to perform the allocation for.
Returns
  • <0 on error
  • 0 on success.

Definition at line 798 of file message.c.

◆ message_gc()

static void message_gc ( fr_message_set_t *ms,
int  max_to_clean 
)
static

Garbage collect "done" messages.

Called only from the originating thread. We also clean a limited number of messages at a time, so that we don't have sudden latency spikes when cleaning 1M messages.
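The effect of bounding the work per call can be seen in a toy model. The sketch below is not the code in message.c: the status enum, the flat array standing in for a message ring, and `toy_ring_gc()` are hypothetical. It frees messages starting from the oldest, stops at the first one that is not yet done, and never frees more than max_to_clean in one call.

```c
#include <assert.h>

/* Toy message states (hypothetical names). */
typedef enum { TOY_MSG_FREE = 0, TOY_MSG_USED, TOY_MSG_DONE } toy_msg_status_t;

/*
 *	Clean at most max_to_clean messages, starting from the oldest.
 *	Stops early at the first message that is not yet done.
 *	Returns the number of messages cleaned, and advances *oldest.
 */
static int toy_ring_gc(toy_msg_status_t *ring, int ring_size, int *oldest, int max_to_clean)
{
	int cleaned = 0;

	assert(ring_size > 0);
	assert(*oldest >= 0 && *oldest < ring_size);

	while ((cleaned < max_to_clean) && (ring[*oldest] == TOY_MSG_DONE)) {
		ring[*oldest] = TOY_MSG_FREE;
		*oldest = (*oldest + 1) % ring_size;
		cleaned++;
	}

	return cleaned;
}
```

With three done messages followed by one still in use, a first call with a limit of 2 cleans two messages, and a second call cleans the remaining one and then stops at the message that is still in use. The cost of each call is therefore capped by the limit, regardless of how many messages are waiting.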

Parameters
[in] ms: the message set
[in] max_to_clean: the maximum number of messages to clean

Definition at line 357 of file message.c.

◆ message_reserve()

static fr_message_t * message_reserve ( fr_message_set_t *ms,
bool *p_cleaned
)
static

Reserve a fr_message_t, WITHOUT a ring buffer.

Note
This only reserves the message, i.e. checks there's space, and initialises the message memory, it does not perform the actual allocation. This is useful if there are error paths which could result in the message needing to be returned to the message set without being used. Multiple pending reservations are NOT permitted.
Parameters
[in] ms: the message set to reserve the message in.
[out] p_cleaned: a flag to indicate if we cleaned the message array
Returns
  • NULL on error
  • fr_message_t* on success

Definition at line 774 of file message.c.

◆ message_ring_gc()

static int message_ring_gc ( fr_message_set_t *ms,
fr_ring_buffer_t *mr,
int  max_to_clean 
)
static

Clean up messages in a message ring.

Find the oldest messages which are marked FR_MESSAGE_DONE, and mark them FR_MESSAGE_FREE.

FIXME: If we care, track which ring buffer is in use, and how many contiguous chunks we can free. Then, free the chunks at once, instead of piecemeal. Realistically, though, this will probably make little difference.

Parameters
[in] ms: the message set
[in] mr: the message ring
[in] max_to_clean: maximum number of messages to clean at a time.

Definition at line 311 of file message.c.
