25RCSID(
"$Id: d6f3e31b83f774bebcfa7d3d2cb46109cc6bb828 $")
27#include <freeradius-devel/io/channel.h>
28#include <freeradius-devel/util/debug.h>
30#ifdef HAVE_STDATOMIC_H
33# include <freeradius-devel/util/stdatomic.h>
40#define MPRINT(...) fprintf(stdout, __VA_ARGS__)
48#define ENABLE_SKIPS (0)
60size_t channel_direction_len =
NUM_ELEMENTS(channel_direction);
64#define SIGNAL_INTERVAL (1000000)
78#define ATOMIC_QUEUE_SIZE (1024)
282 MPRINT(
"Signalling %s, with %s\n",
290#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
325 cd->live.sequence = sequence;
326 cd->live.ack = requestor->
ack;
349 "Channel data timestamp (%" PRId64
") older than last channel data sent (%" PRId64
")",
381 MPRINT(
"REQUESTOR SKIPS signal\n");
394 MPRINT(
"REQUESTOR SIGNALS\n");
451 requestor->
ack = cd->live.sequence;
488 responder->
ack = cd->live.sequence;
531 cd->live.sequence = sequence;
532 cd->live.ack = responder->
ack;
552 "Channel data timestamp (%" PRId64
") older than last channel data sent (%" PRId64
")",
599 if (((responder->
sequence - their_view_of_my_sequence) <= 1000) &&
602 MPRINT(
"\tRESPONDER SKIPS signal\n");
664 MPRINT(
"\tRESPONDER SLEEPING num_outstanding %"PRIu64
", packets in %"PRIu64
", packets out %"PRIu64
"\n", responder->
stats.
outstanding,
696 memcpy(&cc,
data, data_size);
702 *p_channel = ch = cc.
ch;
715 MPRINT(
"channel got %d\n", cs);
724 MPRINT(
"channel got data_done_responder\n");
730 MPRINT(
"channel got responder_sleeping\n");
744 MPRINT(
"REQUESTOR SKIPS signal AFTER CE %d num_outstanding %"PRIu64
"\n", cs, requestor->
stats.
outstanding);
765 MPRINT(
"REQUESTOR SIGNALS AFTER CE %d\n", cs);
829 if (!active)
return 0;
860 if (!active)
return 0;
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define L(_str)
Helper for initialising arrays of string literals.
fr_atomic_queue_t * aq
The queue of messages - visible only to this channel.
atomic_bool active
Whether the channel is active.
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
@ FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER
@ FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR
@ FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER
@ FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING
@ FR_CHANNEL_SIGNAL_ERROR
@ FR_CHANNEL_SIGNAL_CLOSE
uint64_t sequence_at_last_signal
When we last signaled.
uint64_t sequence
Sequence number for this channel.
bool must_signal
we need to signal the other end
fr_channel_signal_t signal
the signal to send
fr_table_num_sorted_t const channel_signals[]
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
fr_channel_direction_t direction
Used for debug messages.
size_t channel_signals_len
void * uctx
Worker context.
size_t channel_packet_priority_len
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
fr_table_num_sorted_t const channel_packet_priority[]
static int fr_channel_data_ready(fr_channel_t *ch, fr_time_t when, fr_channel_end_t *end, fr_channel_signal_t which)
Send a message via a kq user signal.
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
fr_ring_buffer_t * rb
Ring buffer for control-plane messages.
#define ATOMIC_QUEUE_SIZE
Size of the atomic queues.
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
fr_channel_stats_t stats
channel statistics
fr_control_t * control
The control plane, consisting of an atomic queue and kqueue.
bool same_thread
are both ends in the same thread?
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
uint64_t ack
or the endpoint..
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
fr_time_delta_t cpu_time
Total time used by the responder for this channel.
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
int fr_channel_responder_sleeping(fr_channel_t *ch)
Signal a channel that the responder is sleeping.
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
fr_channel_end_t end[2]
Two ends of the channel.
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
fr_channel_t * ch
the channel
uint64_t their_view_of_my_sequence
Should be clear.
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
fr_time_delta_t processing_time
Time spent by the responder processing requests.
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
uint64_t ack
Sequence number of the other end.
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
fr_channel_recv_callback_t recv
callback for receiving messages
void * recv_uctx
context for receiving messages
A full channel, which consists of two ends.
fr_message_t m
the message header
@ FR_CHANNEL_DATA_READY_REQUESTOR
@ FR_CHANNEL_DATA_READY_RESPONDER
fr_time_delta_t message_interval
Interval between messages.
#define FR_CONTROL_ID_CHANNEL
void(* fr_channel_recv_callback_t)(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
uint64_t packets
Number of actual data packets.
uint64_t resignals
Number of signals resent.
uint64_t outstanding
Number of outstanding requests with no reply.
fr_time_t last_sent_signal
The last time when we signaled the other end.
fr_time_t last_read_other
Last time we successfully read a message from the other end of the channel.
fr_time_t last_write
Last write to the channel.
uint64_t kevents
Number of times we've looked at kevents.
uint64_t signals
Number of kevent signals we've sent.
Channel information which is added to a message.
Statistics for the channel.
#define FR_CONTROL_MAX_SIZE
#define FR_CONTROL_MAX_MESSAGES
static fr_atomic_queue_t ** aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
@ L_INFO
Informational message.
fr_time_t when
when this message was sent
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
#define atomic_store(object, desired)
#define atomic_load(object)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a lexicographically sorted array of name to num mappings.
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
static int64_t fr_time_unwrap(fr_time_t time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_sub(_a, _b)
Subtract one time from another.
A time delta, a difference in time measured in nanoseconds.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)