25RCSID(
"$Id: 7fc62cb27893f4f8957139302e4609b18b2565cf $")
27#include <freeradius-devel/io/listen.h>
28#include <freeradius-devel/io/schedule.h>
29#include <freeradius-devel/io/thread.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/unlang/base.h>
32#include <freeradius-devel/util/syserror.h>
36#define FR_CONTROL_ID_COORD_WORKER_ATTACH (1)
37#define FR_CONTROL_ID_COORD_WORKER_DETACH (2)
38#define FR_CONTROL_ID_COORD_WORKER_ACK (3)
39#define FR_CONTROL_ID_COORD_DATA (4)
118static int8_t
coord_cmp(
void const *one,
void const *two)
179 if (
sc->coord_reg == coord_reg) {
180 if ((ret = pthread_join(
sc->thread.pthread_id, NULL)) != 0) {
183 DEBUG2(
"Coordinator %s joined (cleaned up)", coord_reg->
name);
208 memcpy(&cm,
data, data_size);
212 DEBUG3(
"Coordinator %s got data from worker %d for callback %d",
216 ERROR(
"Received data for callback %d which is not defined", cd->
coord_cb_id);
239 memcpy(&cm,
data, data_size);
246 ERROR(
"Received message for callback %d which is not defined", cd->
coord_cb_id);
312 bool single_thread,
uint32_t max_workers)
322 .coord_reg = coord_reg,
323 .single_thread = single_thread,
324 .max_workers = max_workers
386 if (!coord->
cb_inst[i])
goto fail;
415 DEBUG4(
"Gathering events");
417 DEBUG4(
"%u event(s) pending%s",
418 num_events == -1 ? 0 : num_events, num_events == -1 ?
" - event loop exiting" :
"");
419 if (num_events < 0)
break;
424 if (num_events > 0) {
425 DEBUG4(
"Servicing event(s)");
448 char coordinate_name[64];
450 snprintf(coordinate_name,
sizeof(coordinate_name),
"Coordinate %s", coord_reg->
name);
456 PERROR(
"%s - Failed creating coordinator thread", coordinate_name);
505 sc->thread.id = num++;
506 sc->coord_reg = coord_reg;
512 PERROR(
"Failed creating coordinator %s", coord_reg->name);
523 ERROR(
"Failed creating coordinator threads");
565 char coordinate_name[64];
568 snprintf(coordinate_name,
sizeof(coordinate_name),
"Coordinator %s", coord_reg->name);
570 INFO(
"%s - Starting", coordinate_name);
574 PERROR(
"%s - Failed creating coordinator thread", coordinate_name);
595 msg->exiting = exiting;
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER; use it when you don't really care about the ordering, you just want an ordering.
#define FR_CONTROL_MAX_SIZE
#define FR_CONTROL_MAX_MESSAGES
static size_t num_workers
static fr_atomic_queue_t ** aq
fr_dlist_t entry
Entry in list of registrations.
static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker attaches.
fr_atomic_queue_t * worker_recv_aq
Atomic queue for coordinator -> worker messages.
uint32_t max_workers
Maximum number of workers which will connect to this coordinator.
fr_control_t ** coord_send_control
Control planes for coordinator -> worker messages.
#define FR_CONTROL_ID_COORD_WORKER_DETACH
Message sent from worker to detach from a coordinator.
static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A worker got an ack from a coordinator in response to attach / detach.
fr_coord_reg_t * coord_reg
Coordinator registration details.
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
Start coordinators in single threaded mode.
fr_coord_reg_t * fr_coord_register(TALLOC_CTX *ctx, fr_coord_reg_ctx_t *reg_ctx)
Register a coordinator.
fr_control_t * worker_recv_control
Control plane to send messages to this worker.
static fr_dlist_head_t * coord_threads
static fr_coord_t * fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg, bool single_thread, uint32_t max_workers)
Create a coordinator from its registration.
fr_coord_worker_t * fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg)
Attach a worker to a coordinator.
fr_coord_reg_t * coord_reg
Coordinator registration details.
size_t worker_send_size
Initial size for worker -> coordinator ring buffer.
size_t coord_send_size
Initial size for coordinator -> worker ring buffer.
uint32_t worker
Worker ID.
int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
fr_ring_buffer_t ** coord_send_rb
Ring buffers for coordinator -> worker control messages.
uint32_t max_workers
Maximum number of workers we expect.
bool exiting
Is the server exiting.
fr_atomic_queue_t * worker_recv_aq
Atomic queue to send data to this worker.
static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a worker receiving data from a coordinator.
static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker detaches.
fr_coord_t * coord
Coordinator this worker is related to.
fr_coord_cb_inst_t ** cb_inst
Array of callback instance specific data.
void fr_coord_deregister(fr_coord_reg_t *coord_reg)
De-register a coordinator.
void fr_coords_destroy(void)
Clean up coordinators in single threaded mode.
static int8_t coord_cmp(void const *one, void const *two)
Compare coordinators by registration.
fr_control_t * coord_recv_control
Control plane for worker -> coordinator messages.
#define FR_CONTROL_ID_COORD_WORKER_ATTACH
Message sent from worker to attach to a coordinator.
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
static fr_dlist_head_t * coord_regs
fr_coord_cb_reg_t * callbacks
Array of callbacks for worker -> coordinator messages.
int fr_coord_post_event_insert(fr_event_list_t *el)
Insert instance specific post-event callbacks.
int fr_coord_pre_event_insert(fr_event_list_t *el)
Insert instance specific pre-event callbacks.
uint32_t num_callbacks
Number of callbacks registered.
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
fr_atomic_queue_t * coord_recv_aq
Atomic queue for worker -> coordinator.
static void * fr_coordinate_thread(void *arg)
Entry point for a coordinator thread.
static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a coordinator receiving data from a worker.
fr_atomic_queue_t ** coord_send_aq
Atomic queues for coordinator -> worker data.
uint32_t num_callbacks
Number of callbacks defined.
bool exiting
Is this coordinator shutting down.
uint32_t worker
Worker ID.
fr_rb_node_t node
Entry in the tree of coordinators.
fr_coord_worker_cb_reg_t * callbacks
Callbacks for coordinator -> worker messages.
fr_event_list_t * el
Coordinator event list.
int fr_coord_start(uint32_t num_workers, fr_sem_t *sem)
Start all registered coordinator threads in multi-threaded mode.
#define FR_CONTROL_ID_COORD_WORKER_ACK
Message sent to worker to acknowledge attach / detach.
#define FR_CONTROL_ID_COORD_DATA
Worker <-> coordinator message to pass data to a callback.
fr_message_set_t ** coord_send_ms
Message sets for coordinator -> worker data.
fr_thread_t thread
common thread information - must be first!
static void fr_coordinate(fr_coord_t *coord)
Run the event loop for a coordinator thread when in multi-threaded mode.
fr_control_t * worker_recv_control
Coordinator -> worker control plane.
bool single_thread
Are we in single thread mode.
char const * name
Name for debugging.
int fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff)
Broadcast data from a coordinator to all workers.
int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
Signal a coordinator that a worker wants to detach.
fr_ring_buffer_t * worker_send_rb
Ring buffer for worker -> coordinator control plane.
fr_coord_t * coord
The coordinator data structure.
static fr_rb_tree_t coords
uint32_t num_workers
How many workers are attached.
fr_message_set_t * worker_send_ms
Message set for worker -> coordinator messages.
fr_sem_t * sem
For inter-thread signaling.
A coordinator registration.
A coordinator which receives messages from workers.
Control plane message used for workers attaching / detaching to coordinators.
The worker end of worker <-> coordinator communication.
Scheduler specific information for coordinator threads.
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
struct fr_coord_s fr_coord_t
char const * name
Name for this coordinator.
fr_coord_worker_cb_t callback
size_t coord_send_size
Initial ring buffer size for coordinator -> worker data.
struct fr_coord_reg_s fr_coord_reg_t
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
size_t worker_send_size
Initial ring buffer size for worker -> coordinator data.
fr_coord_cb_inst_create_t inst_create
fr_coord_inst_event_cb_t event_cb
Event callback in multi thread mode.
void * inst_data
Instance data.
uint32_t coord_cb_id
Callback ID for this message.
uint32_t worker
Worker ID.
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
fr_event_post_cb_t event_post_cb
Post-event callback in single thread mode.
fr_message_t m
Message containing data being sent.
List / data message used between workers and coordinators.
Generic control message used between workers and coordinators.
#define fr_dbuff_used(_dbuff_or_marker)
Return the number of bytes remaining between the start of the dbuff or marker and the current position.
#define fr_dbuff_init(_out, _start, _len_or_end)
Initialise a dbuff for encoding or decoding.
#define fr_dbuff_buff(_dbuff_or_marker)
Return the underlying buffer in a dbuff or one of its markers.
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
#define fr_dlist_foreach(_list_head, _type, _iter)
Iterate over the contents of a list.
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Head of a doubly linked list.
Entry in a doubly linked list.
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
void fr_control_wait(fr_control_t *c)
Wait for a control plane to become readable.
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Stores all information relating to an event list.
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
int fr_message_done(fr_message_t *m)
Mark a message as done.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
A Message set, composed of message headers and ring buffer data.
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
struct fr_rb_tree_s fr_rb_tree_t
#define fr_rb_inline_talloc_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree that verifies elements are of a specific talloc type.
uint32_t num_elements
How many elements are inside the tree.
Iterator structure for in-order traversal of an rbtree.
The main red black tree structure.
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
static _Thread_local int worker_id
Internal ID of the current worker thread.
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
static const uchar sc[16]
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
void fr_thread_start(fr_thread_t *thread, fr_sem_t *sem)
Signal the parent that we're done.
int fr_thread_wait_list(fr_sem_t *sem, fr_dlist_head_t *head)
Wait for multiple threads to signal readiness via a semaphore.
int fr_thread_create(pthread_t *thread, fr_thread_entry_t func, void *arg)
Create a joinable thread.
int fr_thread_setup(fr_thread_t *out, char const *name)
Common setup for child threads: block signals, allocate a talloc context, and create an event list.
int fr_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el)
Instantiate thread-specific data for modules, virtual servers, xlats, unlang, and TLS.
void fr_thread_detach(void)
Detach thread-specific data for modules, virtual servers, xlats.
void fr_thread_exit(fr_thread_t *thread, fr_thread_status_t status, fr_sem_t *sem)
Signal the parent that we're done.
fr_thread_status_t
Track the child thread status.
@ FR_THREAD_EXITED
exited, and in the exited queue
@ FR_THREAD_FAIL
failed, and in the exited queue
static fr_event_list_t * el
#define fr_strerror_const(_msg)