25#include <freeradius-devel/bio/bio_priv.h>
26#include <freeradius-devel/bio/queue.h>
27#include <freeradius-devel/bio/null.h>
28#include <freeradius-devel/util/dlist.h>
87 if (!
my->cancel)
return;
89 if (fr_bio_queue_list_num_elements(&
my->pending) == 0)
return;
94 while ((
item = fr_bio_queue_list_pop_head(&
my->pending)) != NULL) {
97 fr_bio_queue_list_insert_head(&
my->free,
item);
119 item = fr_bio_queue_list_pop_head(&
my->free);
127 fr_assert((fr_bio_queue_list_num_elements(&
my->pending) == 0) ||
128 (already_written == 0));
130 item->packet_ctx = packet_ctx;
133 item->already_written = already_written;
134 item->cancelled =
false;
136 fr_bio_queue_list_insert_tail(&
my->pending,
item);
156 fr_assert(fr_bio_queue_list_num_elements(&
my->pending) == 0);
164 rcode = next->
write(next, packet_ctx,
buffer, size);
165 if ((
size_t) rcode == size)
return rcode;
171 if (rcode !=
fr_bio_error(IO_WOULD_BLOCK))
return rcode;
183 if (!
buffer)
return rcode;
191 fr_assert(fr_bio_queue_list_num_elements(&
my->free) > 0);
207 if (fr_bio_queue_list_num_elements(&
my->pending) == 0) {
219 while (written < size) {
226 item = fr_bio_queue_list_head(&
my->pending);
239 if (rcode == 0)
break;
251 item->already_written += rcode;
253 if (
item->already_written <
item->size)
break;
258 if (
item->cancelled) {
264 (void) fr_bio_queue_list_pop_head(&
my->pending);
267 item->packet_ctx = NULL;
269 item->already_written = 0;
271 item->cancelled =
true;
273 fr_bio_queue_list_insert_head(&
my->free,
item);
319 rcode = next->
read(next, packet_ctx,
buffer, size);
320 if (rcode >= 0)
return rcode;
325 if (rcode ==
fr_bio_error(IO_WOULD_BLOCK))
return rcode;
406 if (!
my)
return NULL;
412 fr_bio_queue_list_init(&
my->pending);
413 fr_bio_queue_list_init(&
my->free);
420 my->array[i].my =
my;
421 my->array[i].cancelled =
true;
422 fr_bio_queue_list_insert_tail(&
my->free, &
my->array[i]);
458 if (!(
item >= &
my->array[0]) && (item < &my->array[
my->max_saved])) {
465 if (
item->cancelled)
return 0;
482 if (rcode <= 0)
return -1;
487 item->already_written += rcode;
488 if (
item->already_written <
item->size)
return -1;
498 (void) fr_bio_queue_list_remove(&
my->pending,
item);
499 fr_bio_queue_list_insert_head(&
my->free,
item);
501 if (
my->cancel)
my->cancel(bio,
item->packet_ctx,
item->buffer,
item->size);
static int const char char buffer[256]
fr_bio_write_t _CONST write
write to the underlying bio
fr_bio_read_t _CONST read
read from the underlying bio
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
fr_bio_t * fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_queue_saved_t saved, fr_bio_queue_callback_t sent, fr_bio_queue_callback_t cancel, fr_bio_t *next)
Allocate a packet-based bio.
fr_bio_queue_callback_t cancel
static int fr_bio_queue_destructor(fr_bio_queue_t *my)
static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
Push a packet onto a list.
int fr_bio_queue_cancel(fr_bio_t *bio, fr_bio_queue_entry_t *item)
Cancel the write for a packet.
fr_bio_queue_callback_t sent
static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
Flush the packet list.
struct fr_bio_queue_list_s fr_bio_queue_list_t
static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Read one packet from next bio.
struct fr_bio_queue_s fr_bio_queue_t
fr_bio_queue_saved_t saved
static int fr_bio_queue_shutdown(fr_bio_t *bio)
Shutdown.
static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write one packet to the next bio.
static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write to the packet list buffer.
static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
Forcibly cancel all outstanding packets.
void(* fr_bio_queue_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
struct fr_bio_queue_entry_s fr_bio_queue_entry_t
void(* fr_bio_queue_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_queue_entry_t *queue_ctx)
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
#define FR_BIO_DESTRUCTOR_COMMON
Define a common destructor pattern.
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
void fr_bio_shutdown & my
int fr_bio_destructor(fr_bio_t *bio)
Free this bio.
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
ssize_t fr_bio_fail_read(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void *buffer, UNUSED size_t size)
Always return error on read.
ssize_t fr_bio_fail_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return error on write.