25RCSID(
"$Id: d116ad620a109a0a53505763b64f93e24cac018d $")
27#include <freeradius-devel/internal/internal.h>
28#include <freeradius-devel/io/listen.h>
29#include <freeradius-devel/io/coord_pair.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/server/main_config.h>
32#include <freeradius-devel/unlang/base.h>
34static _Atomic(uint64_t) request_number = 0;
46struct fr_coord_pair_reg_s {
99 TALLOC_FREE(coord_pair_regs);
100 TALLOC_FREE(coord_pair_modules);
123 if (!attr_worker_id) {
125 if (!attr_worker_id) {
126 ERROR(
"Failed to resolve Worker-Id attribute");
131 if (!coord_pair_regs) {
139 .root = reg_ctx->
root,
140 .cb_id = reg_ctx->
cb_id,
146 if (cb_reg->
packet_type > coord_pair_reg->max_packet_type) {
147 coord_pair_reg->max_packet_type = cb_reg->
packet_type;
156 fr_assert(coord_pair_reg->max_packet_type <= 256);
159 coord_pair_reg->max_packet_type + 1));
163 coord_pair_reg->callbacks[cb_reg->
packet_type] = cb_reg;
181 if (
cf_section_parse(coord_pair_reg, &coord_pair_reg->reuse, cs) < 0)
goto fail;
186 if (coord_pair_reg->reuse.child_pool_size == 0) coord_pair_reg->reuse.child_pool_size =
REQUEST_POOL_SIZE;
187 if (coord_pair_reg->reuse.num_children == 0) coord_pair_reg->reuse.num_children =
REQUEST_POOL_HEADERS;
191 cf_log_err(reg_ctx->
cs,
"Missing virtual_server option");
196 if (!coord_pair_reg->vs) {
205 cf_log_err(cp,
"Virtual server has namespace %s, should be %s",
215 return coord_pair_reg;
223 return coord_pair_reg->cb_id;
250 REDEBUG(
"Request has reached max_request_time - signalling it to stop");
269 RERROR(
"Failed to create request timeout timer");
284 RERROR(
"Failed to set request timeout");
288 RDEBUG3(
"Time tracking started in yielded state");
303 RDEBUG3(
"Time tracking ended");
308 TALLOC_FREE(request->timeout);
312static inline CC_HINT(always_inline)
318 request->packet->timestamp = now;
319 request->async = talloc_zero(request,
fr_async_t);
320 request->async->recv_time = now;
321 request->async->el =
el;
322 request->async->packet_ctx = packet_ctx;
326static inline CC_HINT(always_inline)
331 request->name =
talloc_asprintf(request,
"Coord-%"PRIu64, request->number);
347 request = request_slab_reserve(coord_pair->
slab);
349 ERROR(
"Coordinator failed allocating new request");
359 ERROR(
"Coordinator failed initializing new request");
361 request_slab_release(request);
375 if (
fr_pair_append_by_da(request->request_ctx, &
vp, &request->request_pairs, attr_worker_id) < 0)
goto error;
381 RERROR(
"Failed decoding packet");
391 request->packet->code =
vp->vp_uint32;
394 RERROR(
"Protocol failed to set 'process' function");
409 request->packet->timestamp = now;
410 request->async = talloc_zero(request,
fr_async_t);
411 request->async->recv_time = now;
412 request->async->el = coord_pair->
el;
455 TALLOC_FREE(request->timeout);
469 RDEBUG4(
"%s - Request detaching", __FUNCTION__);
478 RDEBUG3(
"Forcing time tracking to running state, from yielded, for request detach");
485 RDEBUG3(
"Request is detached");
498 RDEBUG4(
"%s - Request marked as runnable", __FUNCTION__);
507 RDEBUG4(
"%s - Request yielded", __FUNCTION__);
516 RDEBUG4(
"%s - Request resuming", __FUNCTION__);
535 RDEBUG4(
"%s - Request priority changed", __FUNCTION__);
551 ret =
CMP(b->priority, a->priority);
552 if (ret != 0)
return ret;
554 return CMP(a->sequence, b->sequence);
560 bool single_thread,
void *uctx)
568 .coord_pair_reg = coord_pair_reg,
603 if (!coord_pair->
intp)
goto fail;
605 if (!(coord_pair->
slab = request_slab_list_alloc(coord_pair,
el, &coord_pair_reg->reuse, NULL, NULL,
606 coord_pair,
true,
false))) {
643 return request ? 1 : 0;
693 PERROR(
"Failed to decode data as pair list");
700 ERROR(
"Message received without %s", coord_pair_reg->attr_packet_type->name);
704 if (
vp->vp_uint32 > coord_pair_reg->max_packet_type || !coord_pair_reg->callbacks[
vp->vp_uint32]) {
705 ERROR(
"Message received with invalid value %pP",
vp);
709 coord_pair_reg->callbacks[
vp->vp_uint32]->callback(cw, coord_pair_reg, &list, now,
710 coord_pair_reg->callbacks[
vp->vp_uint32]->uctx);
727 fr_dbuff_uctx_talloc_t tctx;
758 fr_dbuff_uctx_talloc_t tctx;
784 bool single_thread,
void *uctx)
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
#define CONF_PARSER_TERMINATOR
#define cf_section_rules_push(_cs, _rule)
Defines a CONF_PAIR to C data type mapping.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
#define cf_log_err(_cf, _fmt,...)
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
fr_event_list_t * el
Coordinator event list.
A coordinator which receives messages from workers.
The worker end of worker <-> coordinator communication.
struct fr_coord_cb_inst_s fr_coord_cb_inst_t
static void coord_pair_request_time_tracking_end(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
End time tracking for a request.
static int _coord_pair_reg_free(fr_coord_pair_reg_t *to_free)
Remove a coord pair registration from the list when it is freed.
static void _coord_pair_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
fr_timer_list_t * timeout
Track when requests timeout using a dlist.
fr_time_tracking_t tracking
How much time the coordinator has spent doing things.
static void _coord_pair_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
fr_coord_pair_reg_t * coord_pair_reg
Registration details for this coord pair.
void fr_coord_worker_pair_data_recv(fr_coord_worker_t *cw, fr_dbuff_t *dbuff, fr_time_t now, void *uctx)
Callback run when a worker receives pair list data.
int fr_coord_to_worker_reply_send(request_t *request, uint32_t worker_id)
Send a reply list from a coordinator to a worker.
static void _coord_pair_request_internal_init(request_t *request, void *uctx)
static void _coord_pair_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
int fr_worker_to_coord_pair_send(fr_coord_worker_t *cw, fr_coord_pair_reg_t *coord_pair_reg, fr_pair_list_t *list)
Send a pair list from a worker to a coordinator.
static int _coord_pair_request_deinit(request_t *request, UNUSED void *uctx)
fr_time_delta_t predicted
How long we predict a request will take to execute.
uint32_t fr_coord_pair_reg_cb_id(fr_coord_pair_reg_t *coord_pair_reg)
Return the coordinator callback ID associated with a coord_pair_reg_t.
void * uctx
Source specific ctx.
static bool _coord_pair_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
static int coord_pair_request_time_tracking_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
fr_coord_pair_reg_t * fr_coord_pair_register(TALLOC_CTX *ctx, fr_coord_pair_reg_ctx_t *reg_ctx)
Register a set of callbacks for pair list based coordinator messages.
fr_coord_t * coord
Coordinator which this coord pair is attached to.
static void fr_coord_pair_event(UNUSED fr_event_list_t *el, void *uctx)
Event callback in multi threaded mode.
static void coord_pair_request_name_number(request_t *request)
static void _coord_pair_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
fr_coord_pair_t * coord_pair
Coordinator pair this packet is for.
unlang_interpret_t * intp
Interpreter for running requests.
static int8_t coord_pair_runnable_cmp(void const *one, void const *two)
Compare two requests by priority and sequence.
static void _coord_pair_request_prioritise(request_t *request, void *uctx)
Update a request's priority.
fr_coord_cb_inst_t * fr_coord_pair_inst_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, bool single_thread, void *uctx)
Plugin creation called during coordinator creation.
static int fr_coord_pair_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
fr_heap_t * runnable
Current runnable requests.
static int fr_coord_pair_request_timeout_set(fr_coord_pair_t *coord_pair, request_t *request, fr_time_delta_t timeout)
Set, or re-set the request timer.
static void _coord_pair_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
static void coord_pair_request_init(fr_event_list_t *el, request_t *request, fr_time_t now, void *packet_ctx)
static void coord_pair_request_bootstrap(fr_coord_pair_t *coord_pair, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *uctx)
static void coord_run_request(fr_coord_pair_t *coord_pair, fr_time_t start)
fr_event_list_t * el
Event list for interpreter.
static void coord_pair_stop_request(request_t *request)
Signal the unlang interpreter that it needs to stop running the request.
static const conf_parser_t request_reuse_config[]
Conf parser to read slab settings from module config.
static void _coord_pair_request_done_external(UNUSED request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
External request is now complete - will never happen with coordinators.
static fr_coord_pair_t * fr_coord_pair_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, bool single_thread, void *uctx)
Create the coord_pair coord instance data.
static void _coord_pair_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
uint64_t num_active
Number of active requests.
void fr_coord_pair_data_recv(UNUSED fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *inst, void *uctx)
Callback run when a coordinator receives pair list data.
static void fr_coord_pair_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
request_slab_list_t * slab
slab allocator for request_t
static void _coord_pair_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Packet context used when coordinator messages are processed through an interpreter.
struct fr_coord_pair_s fr_coord_pair_t
fr_coord_worker_pair_cb_reg_t * worker_cb
Callbacks for coordinator -> worker pair messages.
uint32_t packet_type
Packet type value for this callback.
fr_time_delta_t max_request_time
Maximum time for coordinator request processing.
fr_dict_attr_t const * root
Root attribute for decoding pair list messages.
CONF_SECTION * cs
Module conf section.
fr_coord_worker_pair_cb_t callback
Function to call.
struct fr_coord_pair_reg_s fr_coord_pair_reg_t
uint32_t cb_id
Coordinator callback id used for pair list messages.
void * inst_data
Instance data.
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
static void fr_dbuff_free_talloc(fr_dbuff_t *dbuff)
Free the talloc buffer associated with a dbuff.
static fr_dbuff_t * fr_dbuff_init_talloc(TALLOC_CTX *ctx, fr_dbuff_t *dbuff, fr_dbuff_uctx_talloc_t *tctx, size_t init, size_t max)
Initialise a special dbuff which automatically extends as additional data is written.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
static fr_dict_attr_t const * attr_packet_type
fr_dict_attr_t const * fr_dict_attr_by_name(fr_dict_attr_err_t *err, fr_dict_attr_t const *parent, char const *attr))
Locate a fr_dict_attr_t by its name.
fr_dict_attr_t const * fr_dict_root(fr_dict_t const *dict)
Return the root attribute of a dictionary.
fr_dict_t const * fr_dict_internal(void)
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Head of a doubly linked list.
Entry in a doubly linked list.
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for a current request.
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
#define UNLANG_REQUEST_RESUME
External functions provided by the owner of the interpret.
Minimal data structure to use the new code.
#define RPEDEBUG(fmt,...)
Stores all information relating to an event list.
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
main_config_t const * main_config
Main server configuration.
fr_worker_config_t worker
Worker thread configuration.
int fr_pair_append_by_da(TALLOC_CTX *ctx, fr_pair_t **out, fr_pair_list_t *list, fr_dict_attr_t const *da)
Alloc a new fr_pair_t (and append)
fr_pair_t * fr_pair_find_by_da_nested(fr_pair_list_t const *list, fr_pair_t const *prev, fr_dict_attr_t const *da)
Find a pair with a matching fr_dict_attr_t, by walking the nested fr_dict_attr_t tree.
fr_pair_t * fr_pair_find_by_da(fr_pair_list_t const *list, fr_pair_t const *prev, fr_dict_attr_t const *da)
Find the first pair with a matching da.
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
ssize_t fr_internal_decode_list_dbuff(TALLOC_CTX *ctx, fr_pair_list_t *out, fr_dict_attr_t const *parent, fr_dbuff_t *dbuff, void *decode_ctx)
Retrieve all pairs from the dbuff.
ssize_t fr_internal_encode_list(fr_dbuff_t *dbuff, fr_pair_list_t const *list, void *encode_ctx)
Encode a list of pairs using the internal encoder.
rlm_rcode_t
Return codes indicating the result of the module call.
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
int request_detach(request_t *child)
Unlink a subrequest from its parent.
#define REQUEST_VERIFY(_x)
#define request_is_detached(_x)
#define REQUEST_POOL_HEADERS
#define request_is_internal(_x)
@ REQUEST_TYPE_INTERNAL
A request generated internally.
#define request_is_detachable(_x)
#define request_init(_ctx, _type, _args)
#define REQUEST_POOL_SIZE
Optional arguments for initialising requests.
static _Thread_local int worker_id
Internal ID of the current worker thread.
@ FR_SIGNAL_CANCEL
Request has been cancelled.
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
#define FR_SLAB_CONFIG_CONF_PARSER
conf_parser_t entries to populate user configurable slab values
Tuneable parameters for slabs.
module_list_t * module_list_alloc(TALLOC_CTX *ctx, module_list_type_t const *type, char const *name, bool write_protect)
Allocate a new module list.
module_list_type_t const module_list_type_global
Callbacks for a global module list.
eap_aka_sim_process_conf_t * inst
#define atomic_fetch_add_explicit(object, operand, order)
Stores an attribute, a value and various bits of other data.
static int talloc_const_free(void const *ptr)
Free const'd memory.
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
#define fr_time_delta_lt(_a, _b)
#define fr_time_delta_eq(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
A time delta, a difference in time measured in nanoseconds.
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
static bool fr_timer_armed(fr_timer_t *ev)
static fr_event_list_t * el
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
#define fr_strerror_const(_msg)
unlang_action_t virtual_server_push(unlang_result_t *p_result, request_t *request, virtual_server_t const *vs, bool top_frame)
Set the request processing function.
virtual_server_t const * virtual_server_find(char const *name)
Return virtual server matching the specified name.
CONF_SECTION * virtual_server_cs(virtual_server_t const *vs)
Return the configuration section for a virtual server.
fr_dict_t const * virtual_server_dict_by_cs(CONF_SECTION const *server_cs)
Return the namespace for the virtual server specified by a config section.
fr_dict_attr_t const * virtual_server_packet_type_by_cs(CONF_SECTION const *server_cs)
Return the packet type attribute for a virtual server specified by a config section.
fr_time_delta_t max_request_time
maximum time a request can be processed