26RCSID(
"$Id: d95dfd206454a23f94d9a1dd9eaa01bd4e802a7c $")
28#define LOG_PREFIX pool->log_prefix
30#include <freeradius-devel/server/main_config.h>
31#include <freeradius-devel/server/modpriv.h>
32#include <freeradius-devel/server/trigger.h>
34#include <freeradius-devel/util/debug.h>
36#include <freeradius-devel/util/heap.h>
37#include <freeradius-devel/util/misc.h>
211 this->prev->next = this->next;
214 pool->
head = this->next;
218 this->next->prev = this->prev;
221 pool->
tail = this->prev;
224 this->prev = this->next = NULL;
245 this->next = pool->
head;
293 if (!pool || !conn)
return NULL;
295 pthread_mutex_lock(&pool->
mutex);
302 for (
this = pool->
head;
this != NULL;
this = this->next) {
303 if (this->connection == conn) {
305 pthread_t pthread_id;
307 pthread_id = pthread_self();
308 fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
316 pthread_mutex_unlock(&pool->
mutex);
357 pthread_mutex_lock(&pool->
mutex);
364 pthread_mutex_unlock(&pool->
mutex);
366 ERROR(
"Cannot open new connection, already at max");
376 bool complain =
false;
384 pthread_mutex_unlock(&pool->
mutex);
387 ERROR(
"Last connection attempt failed, waiting %pV seconds before retrying",
398 pthread_mutex_unlock(&pool->
mutex);
401 "Cannot open a new connection due to rate limit after failure");
431 pthread_mutex_unlock(&pool->
mutex);
437 ctx = talloc_init(
"connection_ctx");
438 if (!ctx)
return NULL;
448 ERROR(
"Opening connection failed (%" PRIu64
")", number);
451 pthread_mutex_lock(&pool->
mutex);
461 pthread_mutex_unlock(&pool->
mutex);
472 pthread_mutex_lock(&pool->
mutex);
477 pthread_mutex_unlock(&pool->
mutex);
486 this->connection = conn;
487 this->in_use = in_use;
489 this->number = number;
490 this->last_reserved =
fr_time();
491 this->last_released = this->last_reserved;
532 if (unlock) pthread_mutex_unlock(&pool->
mutex);
556 pthread_t pthread_id = pthread_self();
557 fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
560 this->in_use =
false;
605 if (this->in_use)
return 1;
607 if (this->needs_reconnecting) {
619 (this->num_uses >= pool->
max_uses)) {
634 ROPTIONAL(
RINFO,
INFO,
"Closing connection (%" PRIu64
"): Hit idle_timeout, was idle for %pVs",
664 pthread_mutex_unlock(&pool->
mutex);
683 if (num < pool->
min) {
692 if (num > pool->
max) {
702 goto close_connection;
714 if (spare == pool->
spare)
goto manage_connections;
719 if (spare > pool->
spare) {
731 if (num <= pool->
min)
goto manage_connections;
750 for (
this = pool->
tail;
this != NULL;
this = this->prev) {
751 if (this->in_use)
continue;
770 goto manage_connections;
776 if (spare < pool->spare) {
789 if (num >= pool->
max)
goto manage_connections;
801 pthread_mutex_unlock(&pool->
mutex);
803 pthread_mutex_lock(&pool->
mutex);
804 goto manage_connections;
812 for (
this = pool->
head;
this != NULL;
this = next) {
820 pthread_mutex_unlock(&pool->
mutex);
841 if (!pool)
return NULL;
843 pthread_mutex_lock(&pool->
mutex);
867 bool complain =
false;
877 pthread_mutex_unlock(&pool->
mutex);
879 ERROR(
"No connections available and at max connection limit");
890 pthread_mutex_unlock(&pool->
mutex);
892 if (!spawn)
return NULL;
901 if (!
this)
return NULL;
906 this->last_reserved =
fr_time();
910 this->pthread_id = pthread_self();
912 pthread_mutex_unlock(&pool->
mutex);
916 return this->connection;
942 if (!trigger_args)
return;
971 char const *log_prefix)
975 if (!cs || !opaque || !c)
return NULL;
990 PERROR(
"%s: Failed linking pool ctx", __FUNCTION__);
1045 ERROR(
"%s: Failed creating connection heap", __FUNCTION__);
1052 pthread_mutex_init(&pool->
mutex, NULL);
1056 DEBUG2(
"Initialising connection pool");
1060 PERROR(
"Configuration parsing failed");
1067 if (pool->
max == 0) {
1081 if (pool->
min > pool->
max) {
1082 cf_log_err(cs,
"Cannot set 'min' to more than 'max'");
1130 for (i = 0; i < pool->
start; i++) {
1137 ERROR(
"Failed spawning initial connections");
1161 if (!copy)
return NULL;
1175 return &pool->
state;
1250 pthread_mutex_lock(&pool->
mutex);
1272 for (i = 0; i < pool->
start; i++) {
1283 for (
this = pool->
head;
this;
this = this->next) this->needs_reconnecting =
true;
1304 pthread_mutex_unlock(&pool->
mutex);
1311 for (i = 0; i < pool->
start; i++) {
1313 if (!
this)
return -1;
1339 if (pool->
ref > 0) {
1344 DEBUG2(
"Removing connection pool");
1346 pthread_mutex_lock(&pool->
mutex);
1352 while ((
this = pool->
head) != NULL) {
1353 INFO(
"Closing connection (%" PRIu64
")", this->number);
1364 pthread_mutex_unlock(&pool->
mutex);
1365 pthread_mutex_destroy(&pool->
mutex);
1411 bool trigger_min =
false, trigger_max =
false;
1416 this->in_use =
false;
1421 this->last_released =
fr_time();
1428 held =
fr_time_sub(this->last_released, this->last_reserved);
1504 if (!pool || !conn)
return NULL;
1510 if (!
this)
return NULL;
1542 if (!
this)
return 0;
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
#define CONF_PARSER_TERMINATOR
#define FR_INTEGER_BOUND_CHECK(_name, _var, _op, _bound)
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_TIME_DELTA_COND_CHECK(_name, _var, _cond, _new)
#define cf_section_rules_push(_cs, _rule)
char const * name1
Name of the CONF_ITEM to parse.
#define FR_TIME_DELTA_BOUND_CHECK(_name, _var, _op, _bound)
Defines a CONF_PAIR to C data type mapping.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
#define cf_log_err(_cf, _fmt,...)
static size_t min(size_t x, size_t y)
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
unsigned int fr_heap_index_t
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
unlang_interpret_t * unlang_interpret_get_thread_default(void)
Get the default interpreter for this thread.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RATE_LIMIT_GLOBAL_ROPTIONAL(_l_request, _l_global, _fmt,...)
Rate limit messages using a global limiting entry.
static bool fr_rate_limit_enabled(void)
Whether rate limiting is enabled.
main_config_t const * main_config
Main server configuration.
uint32_t max_workers
for the scheduler
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
fr_pool_reconnect_t reconnect
Called during connection pool reconnect.
fr_time_delta_t fr_pool_timeout(fr_pool_t *pool)
Connection pool get timeout.
static void connection_link_head(fr_pool_t *pool, fr_pool_connection_t *this)
Adds a connection to the head of the connection list.
int fr_pool_start(fr_pool_t *pool)
bool spread
If true we spread requests over the connections, using the connection released longest ago,...
uint32_t max_pending
Max number of pending connections to allow.
fr_time_delta_t delay_interval
When we next do a cleanup.
static int8_t last_released_cmp(void const *one, void const *two)
Order connections by released longest ago.
fr_time_t last_reserved
Last time the connection was reserved.
fr_heap_t * heap
For the next connection heap.
fr_time_delta_t held_trigger_min
If a connection is held for less than the specified period, fire a trigger.
fr_pool_connection_create_t create
Function used to create new connections.
uint32_t start
Number of initial connections.
fr_time_delta_t idle_timeout
How long a connection can be idle before being closed.
static void connection_unlink(fr_pool_t *pool, fr_pool_connection_t *this)
Removes a connection from the connection list.
static int connection_check(fr_pool_t *pool, request_t *request)
Check whether any connections need to be removed from the pool.
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
uint32_t min
Minimum number of concurrent connections to keep open.
fr_heap_index_t heap_id
For the next connection heap.
uint64_t number
Unique ID assigned when the connection is created, these will monotonically increase over the lifetim...
CONF_SECTION const * cs
Configuration section holding the section of parsed config file that relates to this pool.
void fr_pool_free(fr_pool_t *pool)
Delete a connection pool.
static fr_pool_connection_t * connection_find(fr_pool_t *pool, void *conn)
Find a connection handle in the connection list.
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
void fr_pool_ref(fr_pool_t *pool)
Increment pool reference by one.
uint32_t pending_window
Sliding window of pending connections.
static void connection_close_internal(fr_pool_t *pool, fr_pool_connection_t *this)
Close an existing connection.
char const * trigger_prefix
Prefix to prepend to names of all triggers fired by the connection pool code.
fr_time_delta_t cleanup_interval
Initial timer for how often we sweep the pool for free connections.
void * connection
Pointer to whatever the module uses for a connection handle.
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
fr_pool_connection_t * tail
End of the connection list.
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
bool needs_reconnecting
Reconnect this connection before use.
fr_pool_t * fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
Allocate a new pool using an existing one as a template.
static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now)
Check whether a connection needs to be removed from the pool.
fr_time_delta_t held_trigger_max
If a connection is held for longer than the specified period, fire a trigger.
fr_pool_connection_alive_t alive
Function used to check status of connections.
uint32_t num_uses
Number of times the connection has been reserved.
bool triggers_enabled
Whether we call the trigger functions.
fr_time_delta_t connect_timeout
New connection timeout, enforced by the create callback.
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
uint32_t spare
Number of spare connections to try.
static void fr_pool_trigger_exec(fr_pool_t *pool, char const *event)
Send a connection pool trigger.
uint64_t max_uses
Maximum number of times a connection can be used before being closed.
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
void const * fr_pool_opaque(fr_pool_t *pool)
Return the opaque data associated with a connection pool.
pthread_cond_t done_reconnecting
Before calling the create callback, threads should block on this condition if reconnecting == true.
fr_time_delta_t lifetime
How long a connection can be open before being closed (irrespective of whether it's idle or not).
pthread_mutex_t mutex
Mutex used to keep consistent state when making modifications in threaded mode.
uint32_t max
Maximum number of concurrent connections to allow.
fr_pool_connection_t * head
Start of the connection list.
int ref
Reference counter to prevent connection pool being freed multiple times.
int fr_pool_start_num(fr_pool_t *pool)
Connection pool get start.
fr_pool_state_t state
Stats and state of the connection pool.
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
fr_pair_list_t trigger_args
Arguments to make available in connection pool triggers.
fr_time_t created
Time connection was created.
bool in_use
Whether the connection is currently reserved.
fr_time_delta_t retry_delay
seconds to delay re-open after a failed open.
static int8_t last_reserved_cmp(void const *one, void const *two)
Order connections by reserved most recently.
pthread_cond_t done_spawn
Threads that need to ensure no spawning is in progress, should block on this condition if pending !...
static void * connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
Get a connection from the connection pool.
fr_pool_connection_t * prev
Previous connection in list.
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
static const conf_parser_t pool_config[]
fr_pool_connection_t * next
Next connection in list.
void * opaque
Pointer to context data that will be passed to callbacks.
static fr_pool_connection_t * connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
Spawns a new connection.
fr_time_t last_released
Time the connection was released.
void * fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
Reconnect a suspected inviable connection.
char const * log_prefix
Log prefix to prepend to all log messages created by the connection pool code.
An individual connection within the connection pool.
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
fr_time_t last_closed
Last time a connection was closed.
fr_time_t last_throttled
Last time we refused to spawn a connection because the last connection failed, or we were already spa...
uint32_t active
Number of currently reserved connections.
fr_time_t last_released
Last time a connection was released.
fr_time_delta_t next_delay
The next delay time.
fr_time_t last_checked
Last time we pruned the connection pool.
void(* fr_pool_reconnect_t)(fr_pool_t *pool, void *opaque)
Alter the opaque data of a connection pool during reconnection event.
uint64_t count
Number of connections spawned over the lifetime of the pool.
fr_time_t last_held_min
Last time we warned about a low latency event.
fr_time_t last_at_max
Last time we hit the maximum number of allowed connections.
void *(* fr_pool_connection_create_t)(TALLOC_CTX *ctx, void *opaque, fr_time_delta_t timeout)
Create a new connection handle.
uint32_t pending
Number of pending open connections.
fr_time_t last_spawned
Last time we spawned a connection.
fr_time_t last_held_max
Last time we warned about a high latency event.
bool reconnecting
We are currently reconnecting the pool.
uint32_t num
Number of connections in the pool.
int(* fr_pool_connection_alive_t)(void *opaque, void *connection)
Check a connection handle is still viable.
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
#define fr_time()
Allow us to arbitrarily manipulate time.
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
char * talloc_typed_strdup(TALLOC_CTX *ctx, char const *p)
Call talloc_strdup, setting the type on the new chunk correctly.
static int talloc_const_free(void const *ptr)
Free const'd memory.
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
#define fr_time_delta_lt(_a, _b)
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
#define fr_time_delta_wrap(_time)
#define fr_time_wrap(_time)
#define fr_time_delta_ispos(_a)
#define fr_time_delta_lteq(_a, _b)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_delta_gteq(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
#define fr_time_lt(_a, _b)
#define fr_time_delta_gt(_a, _b)
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
A time delta, a difference in time measured in nanoseconds.
int trigger_exec(unlang_interpret_t *intp, CONF_SECTION const *cs, char const *name, bool rate_limit, fr_pair_list_t *args)
Execute a trigger - call an executable to process an event.
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
#define fr_box_time_delta(_val)
static size_t char ** out