64#include <freeradius-devel/bio/bio_priv.h>
65#include <freeradius-devel/bio/null.h>
66#include <freeradius-devel/bio/buf.h>
67#include <freeradius-devel/util/rb.h>
68#include <freeradius-devel/util/dlist.h>
70#define _BIO_DEDUP_PRIVATE
71#include <freeradius-devel/bio/dedup.h>
170 if (
my->first !=
item)
return;
186 (void) fr_bio_dedup_list_remove(&
my->active,
item);
220 if (!
item->reply || !
item->reply_size)
return 0;
222 switch (
item->state) {
231 (void) fr_bio_dedup_list_remove(&
my->active,
item);
232 fr_bio_dedup_list_insert_tail(&
my->pending,
item);
236 return item->reply_size;
265 fr_bio_dedup_list_insert_tail(&
my->pending,
item);
267 return item->reply_size;
295 if ((
size_t) rcode ==
item->reply_size) {
303 if ((rcode == 0) || (rcode ==
fr_bio_error(IO_WOULD_BLOCK))) {
305 (void) fr_bio_dedup_list_remove(&
my->active,
item);
306 goto save_in_pending;
310 goto move_to_pending;
362 if (first ==
my->first)
return 0;
380 switch (
item->state) {
385 fr_bio_dedup_list_remove(&
my->active,
item);
403 fr_bio_dedup_list_remove(&
my->active,
item);
424 item->packet_ctx = NULL;
427 fr_bio_dedup_list_insert_head(&
my->free,
item);
448 if (fr_bio_dedup_list_num_elements(&
my->pending) == 0)
return 0;
462 while ((
item = fr_bio_dedup_list_pop_head(&
my->pending)) != NULL) {
477 if (rcode <= 0)
return rcode;
482 if ((
size_t) rcode ==
item->reply_size) {
483 (void) fr_bio_dedup_list_remove(&
my->pending,
item);
515 if (!
my->buffer.start ||
549 if (rcode <= 0)
return rcode;
551 my->buffer.read += rcode;
584 if (rcode <= 0)
return rcode;
620 item->packet_ctx = NULL;
623 fr_bio_dedup_list_insert_head(&
my->free,
item);
630 if (rcode < 0)
return rcode;
656 fr_assert((
size_t) rcode < item->reply_size);
660 switch (
item->state) {
662 (void) fr_bio_dedup_list_remove(&
my->active,
item);
679 (void) fr_bio_dedup_list_remove(&
my->pending,
item);
716 if (rcode <= 0)
return rcode;
722 if (rcode < 0)
return rcode;
820 return next->
write(next, packet_ctx, NULL, size);
830 rcode = next->
write(next, packet_ctx,
buffer, size);
831 if (rcode <= 0)
return rcode;
838 if (
my->get_item)
item =
my->get_item(bio, packet_ctx);
839 if ((
size_t) rcode == size) {
869 rcode = next->
read(next, packet_ctx,
buffer, size);
870 if (rcode <= 0)
return rcode;
875 item = fr_bio_dedup_list_pop_head(&
my->free);
881 .packet_ctx = packet_ctx,
883 .packet_size = (
size_t) rcode,
908 if (!
my->receive(bio,
item, packet_ctx)) {
910 fr_bio_dedup_list_insert_head(&
my->free,
item);
914 fr_bio_dedup_list_insert_tail(&
my->active,
item);
957 switch (
item->state) {
970 item->expires = expires;
988 item->expires = expires;
998 if (
my->first !=
item)
return 0;
1052 if (!max_saved)
return NULL;
1053 if (max_saved > 65536)
return NULL;
1056 if (!
my)
return NULL;
1063 if (!items)
return NULL;
1068 fr_bio_dedup_list_init(&
my->free);
1070 for (i = 0; i < max_saved; i++) {
1073 fr_bio_dedup_list_insert_tail(&
my->free, &items[i]);
1076 fr_bio_dedup_list_init(&
my->active);
1077 fr_bio_dedup_list_init(&
my->pending);
1081 my->receive = receive;
1082 my->release = release;
1083 my->get_item = get_item;
static int const char char buffer[256]
fr_bio_write_t _CONST write
write to the underlying bio
fr_bio_read_t _CONST read
read from the underlying bio
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
fr_bio_dedup_release_t release
called to release a packet
static void fr_bio_dedup_timer_reset_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
fr_bio_dedup_config_t config
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
fr_bio_dedup_entry_t * partial
fr_event_timer_t const * ev
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
static int fr_bio_dedup_destructor(fr_bio_dedup_t *my)
Remove the dedup cache.
static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
fr_rb_tree_t rb
expire list
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
static int8_t _entry_cmp(void const *one, void const *two)
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
fr_bio_dedup_entry_t * first
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
@ FR_BIO_DEDUP_STATE_FREE
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
static int fr_bio_dedup_timer_reset(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
void * uctx
user-writable context
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release of the packet (timeout, or cancelled by the application)
fr_event_list_t * el
event list
fr_bio_dedup_t * my
so we can get to it from the event timer callback
fr_time_t expires
when this entry expires
size_t packet_size
size of the cached packet data
fr_bio_dedup_state_t state
which tree or list this item is in
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
void * packet_ctx
packet_ctx for dedup purposes
uint8_t * reply
reply cached by the application
size_t reply_size
size of the cached reply
void * reply_ctx
reply ctx
uint8_t * packet
cached packet data for finding duplicates
fr_rb_node_t dedup
user managed dedup node
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx)
Callback on read to see if we should receive the packet.
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
fr_bio_dedup_release_reason_t
@ FR_BIO_DEDUP_WRITE_ERROR
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
#define fr_event_timer_at(...)
Stores all information relating to an event list.
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
void fr_rb_iter_delete_inorder(fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
void * fr_rb_first(fr_rb_tree_t *tree)
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Iterator structure for in-order traversal of an rbtree.
The main red black tree structure.
#define fr_time()
Allow us to arbitrarily manipulate time.
static int talloc_const_free(void const *ptr)
Free const'd memory.
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
#define fr_time_lteq(_a, _b)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
static fr_event_list_t * el