25RCSID(
"$Id: b7df5c377c5e3778bcc5c2b7a7185d59bf6bed06 $")
27#include <freeradius-devel/io/channel.h>
28#include <freeradius-devel/io/control.h>
29#include <freeradius-devel/util/log.h>
30#include <freeradius-devel/util/debug.h>
32#ifdef HAVE_STDATOMIC_H
35# include <freeradius-devel/util/stdatomic.h>
42#define MPRINT(...) fprintf(stdout, __VA_ARGS__)
50#define ENABLE_SKIPS (0)
62size_t channel_direction_len =
NUM_ELEMENTS(channel_direction);
66#define SIGNAL_INTERVAL (1000000)
80#define ATOMIC_QUEUE_SIZE (1024)
284 MPRINT(
"Signalling %s, with %s\n",
292#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
327 cd->live.sequence = sequence;
328 cd->live.ack = requestor->
ack;
351 "Channel data timestamp (%" PRId64
") older than last channel data sent (%" PRId64
")",
383 MPRINT(
"REQUESTOR SKIPS signal\n");
396 MPRINT(
"REQUESTOR SIGNALS\n");
453 requestor->
ack = cd->live.sequence;
490 responder->
ack = cd->live.sequence;
533 cd->live.sequence = sequence;
534 cd->live.ack = responder->
ack;
554 "Channel data timestamp (%" PRId64
") older than last channel data sent (%" PRId64
")",
601 if (((responder->
sequence - their_view_of_my_sequence) <= 1000) &&
604 MPRINT(
"\tRESPONDER SKIPS signal\n");
666 MPRINT(
"\tRESPONDER SLEEPING num_outstanding %"PRIu64
", packets in %"PRIu64
", packets out %"PRIu64
"\n", responder->
stats.
outstanding,
698 memcpy(&cc,
data, data_size);
704 *p_channel = ch = cc.
ch;
717 MPRINT(
"channel got %d\n", cs);
726 MPRINT(
"channel got data_done_responder\n");
732 MPRINT(
"channel got responder_sleeping\n");
746 MPRINT(
"REQUESTOR SKIPS signal AFTER CE %d num_outstanding %"PRIu64
"\n", cs, requestor->
stats.
outstanding);
767 MPRINT(
"REQUESTOR SIGNALS AFTER CE %d\n", cs);
831 if (!active)
return 0;
862 if (!active)
return 0;
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define L(_str)
Helper for initialising arrays of string literals.
fr_atomic_queue_t * aq
The queue of messages - visible only to this channel.
atomic_bool active
Whether the channel is active.
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
@ FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER
@ FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR
@ FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER
@ FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING
@ FR_CHANNEL_SIGNAL_ERROR
@ FR_CHANNEL_SIGNAL_CLOSE
uint64_t sequence_at_last_signal
When we last signaled.
uint64_t sequence
Sequence number for this channel.
bool must_signal
we need to signal the other end
fr_channel_signal_t signal
the signal to send
fr_table_num_sorted_t const channel_signals[]
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
fr_channel_direction_t direction
Use for debug messages.
size_t channel_signals_len
void * uctx
Worker context.
size_t channel_packet_priority_len
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
fr_table_num_sorted_t const channel_packet_priority[]
static int fr_channel_data_ready(fr_channel_t *ch, fr_time_t when, fr_channel_end_t *end, fr_channel_signal_t which)
Send a message via a kq user signal.
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
fr_ring_buffer_t * rb
Ring buffer for control-plane messages.
#define ATOMIC_QUEUE_SIZE
Size of the atomic queues.
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
fr_channel_stats_t stats
channel statistics
fr_control_t * control
The control plane, consisting of an atomic queue and kqueue.
bool same_thread
are both ends in the same thread?
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
uint64_t ack
or the endpoint..
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
fr_time_delta_t cpu_time
Total time used by the responder for this channel.
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
int fr_channel_responder_sleeping(fr_channel_t *ch)
Signal a channel that the responder is sleeping.
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
fr_channel_end_t end[2]
Two ends of the channel.
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
fr_channel_t * ch
the channel
uint64_t their_view_of_my_sequence
Should be clear.
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
fr_time_delta_t processing_time
Time spent by the responder processing requests.
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
uint64_t ack
Sequence number of the other end.
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
fr_channel_recv_callback_t recv
callback for receiving messages
void * recv_uctx
context for receiving messages
A full channel, which consists of two ends.
fr_message_t m
the message header
@ FR_CHANNEL_DATA_READY_REQUESTOR
@ FR_CHANNEL_DATA_READY_RESPONDER
fr_time_delta_t message_interval
Interval between messages.
void(* fr_channel_recv_callback_t)(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
uint64_t packets
Number of actual data packets.
uint64_t resignals
Number of signals resent.
uint64_t outstanding
Number of outstanding requests with no reply.
fr_time_t last_sent_signal
The last time we signaled the other end.
fr_time_t last_read_other
Last time we successfully read a message from the other end of the channel.
fr_time_t last_write
Last write to the channel.
uint64_t kevents
Number of times we've looked at kevents.
uint64_t signals
Number of kevent signals we've sent.
Channel information which is added to a message.
Statistics for the channel.
#define FR_CONTROL_ID_CHANNEL
#define FR_CONTROL_MAX_SIZE
#define FR_CONTROL_MAX_MESSAGES
static fr_atomic_queue_t * aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
@ L_INFO
Informational message.
fr_time_t when
when this message was sent
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_store(object, desired)
#define atomic_load(object)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a lexicographically sorted array of name to num mappings.
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
static int64_t fr_time_unwrap(fr_time_t time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_sub(_a, _b)
Subtract one time from another.
A time delta, a difference in time measured in nanoseconds.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)