149#include <freeradius-devel/util/debug.h>
150#include <freeradius-devel/server/cf_parse.h>
152#include <freeradius-devel/util/fifo.h>
153#include <freeradius-devel/util/misc.h>
154#include <freeradius-devel/util/rand.h>
162# undef HAVE_REDIS_SSL
166#include <freeradius-devel/tls/strerror.h>
167#include <hiredis/hiredis_ssl.h>
170#define KEY_SLOTS 16384
178#define CLOSED_PERIOD 10000
181#define CLOSED_WEIGHT 1
184#define FAILED_PERIOD 10000
187#define FAILED_WEIGHT 1
190#define RELEASED_PERIOD 10000
194#define RELEASED_MIN_WEIGHT 1000
202 unsigned int cumulative;
302 p = memchr(key,
'{', key_len);
308 q = memchr(p,
'}', key_len - (p - key));
309 if (!q || (q == p + 1))
goto all;
329 if (ret != 0)
return ret;
331 return CMP(a->
addr.inet.dst_port, b->
addr.inet.dst_port);
406 TALLOC_FREE(node->
pool);
447 redisReply *redirect)
454 if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
467 if ((
size_t)(q - p) >= (
size_t)redirect->len) {
472 key = strtoul(p, &q, 10);
485 if (
fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC,
true,
true) < 0) {
490 if (key_slot) *key_slot = key;
492 node_addr->inet.dst_ipaddr = ipaddr;
493 node_addr->inet.dst_port = port;
548# define SET_ADDR(_addr, _map) \
551 _ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
552 fr_assert(_ret == 0);\
553 _addr.inet.dst_port = _map->element[1]->integer; \
556# define SET_ADDR(_addr, _map) \
558 fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
559 _addr.inet.dst_port = _map->element[1]->integer; \
563#define SET_INACTIVE(_node) \
565 (_node)->is_active = false; \
566 (_node)->is_master = false; \
567 fr_rb_delete(cluster->used_nodes, _node); \
568 fr_fifo_push(cluster->free_nodes, _node); \
571#define SET_ACTIVE(_node) \
573 (_node)->is_active = true; \
574 fr_rb_insert(cluster->used_nodes, _node); \
575 fr_fifo_pop(cluster->free_nodes); \
576 active[(_node)->id] = true; \
577 rollback[r++] = (_node)->id; \
580 fr_assert(reply->type == REDIS_REPLY_ARRAY);
582 memset(&rollback, 0,
sizeof(rollback));
583 memset(active, 0,
sizeof(active));
584 memset(master, 0,
sizeof(master));
603 for (i = 0; i < reply->elements; i++) {
610 redisReply *map = reply->element[i];
612 memset(&tmpl_slot, 0,
sizeof(tmpl_slot));
617 active[found->
id] =
true;
618 goto reuse_master_node;
643 if (rcode < 0)
goto error;
656 master[found->
id] =
true;
665 for (j = 3; (j < map->elements); j++) {
669 active[found->
id] =
true;
674 if (!spare)
goto out_of_nodes;
683 tmpl_slot.
slave[slaves++] = found->
id;
694 for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
736 for (i = 1; i < cluster->
conf->max_nodes; i++) {
756 }
else if (master[i]) {
789 if (node->type != REDIS_REPLY_ARRAY) {
808 if (node->elements < 2) {
809 fr_strerror_printf(
"Cluster map %i node %i has incorrect number of elements, expected at least "
810 "2 got %zu", map_idx, node_idx, node->elements);
814 if (node->element[0]->type != REDIS_REPLY_STRING) {
815 fr_strerror_printf(
"Cluster map %i node %i ip address is wrong type, expected string got %s",
821 if (
fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC,
true,
true) < 0) {
825 if (node->element[1]->type != REDIS_REPLY_INTEGER) {
826 fr_strerror_printf(
"Cluster map %i node %i port is wrong type, expected integer got %s",
832 if (node->element[1]->integer < 0) {
834 map_idx, node_idx, node->element[1]->integer);
838 if (node->element[1]->integer > UINT16_MAX) {
840 "got %lli", map_idx, node_idx, node->element[1]->integer);
870 reply = redisCommand(conn->
handle,
"cluster slots");
879 if (reply && reply->type == REDIS_REPLY_ERROR) {
891 if (reply->type != REDIS_REPLY_ARRAY) {
892 fr_strerror_printf(
"Bad response to \"cluster slots\" command, expected array got %s",
900 if (reply->elements == 0) {
901 fr_strerror_printf(
"Empty response to \"cluster slots\" command (zero length array)");
908 for (i = 0; i < reply->elements; i++) {
912 map = reply->element[i];
913 if (map->type != REDIS_REPLY_ARRAY) {
921 if (map->elements < 3) {
922 fr_strerror_printf(
"Cluster map %zu has too few elements, expected at least 3, got %zu",
930 if (map->element[0]->type != REDIS_REPLY_INTEGER) {
931 fr_strerror_printf(
"Cluster map %zu key slot start is wrong type, expected integer got %s",
936 if (map->element[0]->integer < 0) {
937 fr_strerror_printf(
"Cluster map %zu key slot start is too low, expected >= 0 got %lli",
938 i, map->element[0]->integer);
942 if (map->element[0]->integer >
KEY_SLOTS) {
951 if (map->element[1]->type != REDIS_REPLY_INTEGER) {
952 fr_strerror_printf(
"Cluster map %zu key slot end is wrong type, expected integer got %s",
957 if (map->element[1]->integer < 0) {
959 i, map->element[1]->integer);
963 if (map->element[1]->integer >
KEY_SLOTS) {
969 if (map->element[1]->integer < map->element[0]->integer) {
971 "Start was %lli, end was %lli", i, map->element[0]->integer,
972 map->element[1]->integer);
984 for (j = 3; j < map->elements; j++) {
1031 ROPTIONAL(
RWARN,
WARN,
"Cluster was updated less than a second ago, ignoring remap request");
1059 for (i = 0; i < map->elements; i++) {
1060 redisReply *map_node = map->element[i];
1063 map_node->element[0]->integer,
1064 map_node->element[1]->integer);
1068 map_node->element[2]->element[0]->str,
1069 map_node->element[2]->element[1]->integer);
1070 for (j = 3; j < map_node->elements; j++) {
1072 map_node->element[j]->element[0]->str,
1073 map_node->element[j]->element[1]->integer);
1084 pthread_mutex_lock(&cluster->
mutex);
1086 pthread_mutex_unlock(&cluster->
mutex);
1091 pthread_mutex_unlock(&cluster->
mutex);
1097 pthread_mutex_unlock(&cluster->
mutex);
1125 memset(&find, 0,
sizeof(find));
1131 pthread_mutex_lock(&cluster->
mutex);
1139 pthread_mutex_unlock(&cluster->
mutex);
1151 pthread_mutex_unlock(&cluster->
mutex);
1156 pthread_mutex_unlock(&cluster->
mutex);
1164 pthread_mutex_unlock(&cluster->
mutex);
1179 pthread_mutex_lock(&cluster->
mutex);
1181 pthread_mutex_unlock(&cluster->
mutex);
1249 reply = redisCommand(conn->
handle,
"PING");
1257 if (reply->type != REDIS_REPLY_STATUS) {
1259 node->
id, node->
name, node->
addr.inet.dst_port,
1331 pthread_mutex_lock(&cluster->
mutex);
1336 if (live->
skip == node->
id)
continue;
1341 pthread_mutex_unlock(&cluster->
mutex);
1344 if (live->
next == 1)
goto no_alts;
1351 for (i = 0; (i < cluster->
conf->max_alt) && live->
next; i++) {
1354 int first, last, pivot;
1355 unsigned int find, cumulative = 0;
1359 for (j = 0; j < live->
next; j++) {
1364 live->
node[j].cumulative = (cumulative += weight);
1371 find = (
fr_rand() & (cumulative - 1));
1373 last = live->
next - 1;
1374 pivot = (first + last) / 2;
1376 while (first <= last) {
1377 if (live->
node[pivot].cumulative < find) {
1379 }
else if (live->
node[pivot].cumulative == find) {
1384 pivot = (first + last) / 2;
1389 if (first > last) pivot = last + 1;
1395 node = &cluster->
node[live->
node[pivot].id];
1402 node->
name, node->
addr.inet.dst_port);
1408 if (pivot == live->
next) {
1412 memcpy(&live->
node[pivot], &live->
node[live->
next - 1],
sizeof(live->
node[pivot]));
1471 redisContext *handle;
1472 redisReply *reply = NULL;
1475 DEBUG2(
"%s - [%i] Connecting to node %s:%i", log_prefix, node->
id, node->
name, node->
addr.inet.dst_port);
1478 if ((handle != NULL) && handle->err) {
1479 ERROR(
"%s - [%i] Connection failed: %s", log_prefix, node->
id, handle->errstr);
1482 }
else if (!handle) {
1483 ERROR(
"%s - [%i] Connection failed", log_prefix, node->
id);
1492#ifdef HAVE_REDIS_SSL
1493 if (node->
cluster->ssl_ctx != NULL) {
1494 fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->
cluster->ssl_ctx);
1496 fr_tls_strerror_printf(
"%s - [%i]", log_prefix, node->
id);
1497 ERROR(
"%s - [%i] Failed to allocate TLS session", log_prefix, node->
id);
1503 SSL_up_ref(tls_session->ssl);
1504 if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1505 ERROR(
"%s - [%i] Failed to initiate SSL: %s", log_prefix, node->
id, handle->errstr);
1506 SSL_free(tls_session->ssl);
1515 DEBUG3(
"%s - [%i] Executing: AUTH %s %s", log_prefix, node->
id,
1518 reply = redisCommand(handle,
"AUTH %s %s",
1522 DEBUG3(
"%s - [%i] Executing: AUTH %s", log_prefix, node->
id, node->
cluster->
conf->password);
1523 reply = redisCommand(handle,
"AUTH %s", node->
cluster->
conf->password);
1526 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix, node->
id, handle->errstr);
1533 switch (reply->type) {
1534 case REDIS_REPLY_STATUS:
1535 if (strcmp(reply->str,
"OK") != 0) {
1536 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix,
1537 node->
id, reply->str);
1543 case REDIS_REPLY_ERROR:
1544 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix, node->
id, reply->str);
1548 ERROR(
"%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->
id,
1555 DEBUG3(
"%s - [%i] Executing: SELECT %i", log_prefix, node->
id, node->
cluster->
conf->database);
1556 reply = redisCommand(handle,
"SELECT %i", node->
cluster->
conf->database);
1558 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1563 switch (reply->type) {
1564 case REDIS_REPLY_STATUS:
1565 if (strcmp(reply->str,
"OK") != 0) {
1566 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1573 case REDIS_REPLY_ERROR:
1574 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1579 ERROR(
"%s - [%i] Unexpected reply of type %s, to SELECT", log_prefix, node->
id,
1603 uint8_t const *key,
size_t key_len)
1607 if (!key || (key_len == 0)) {
1658 if (slave_num >= key_slot->
slave_num)
return NULL;
1660 return &cluster->
node[key_slot->
slave[slave_num]];
1673 if (!node)
return -1;
1675 memcpy(
out, &node->
addr.inet.dst_ipaddr,
sizeof(*
out));
1690 if (!node)
return -1;
1692 *
out = node->
addr.inet.dst_port;
1742 uint8_t const *key,
size_t key_len,
bool read_only)
1747 uint64_t used_nodes;
1753 memset(state, 0,
sizeof(*state));
1757 if (used_nodes == 0) {
1771 for (i = 0; i < key_slot->
slave_num; i++) {
1775 node = &cluster->
node[node_id];
1927 if (state->
retries++ >= cluster->
conf->max_retries) {
2012 if (
new == state->
node) {
2020 state->
node->
addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2076 .dst_ipaddr = node_addr->inet.dst_ipaddr,
2077 .dst_port = node_addr->inet.dst_port,
2081 pthread_mutex_lock(&cluster->
mutex);
2085 char buffer[INET6_ADDRSTRLEN];
2086 char const *hostname;
2089 pthread_mutex_unlock(&cluster->
mutex);
2091 hostname =
inet_ntop(node_addr->inet.dst_ipaddr.
af, &node_addr->inet.dst_ipaddr.addr,
buffer,
sizeof(
buffer));
2094 hostname, node_addr->inet.dst_port);
2101 pthread_mutex_unlock(&cluster->
mutex);
2106 pthread_mutex_unlock(&cluster->
mutex);
2118 pthread_mutex_unlock(&cluster->
mutex);
2120 *pool = found->
pool;
2152 found = talloc_zero_array(ctx,
fr_socket_t, in_use);
2158 pthread_mutex_lock(&cluster->
mutex);
2166 pthread_mutex_unlock(&cluster->
mutex);
2186 pthread_mutex_destroy(&cluster->
mutex);
2208 bool all_above =
true;
2210 pthread_mutex_lock(&cluster->
mutex);
2216 if (!conn)
continue;
2225 if (ret < 0)
continue;
2235 pthread_mutex_unlock(&cluster->
mutex);
2262 char const *log_prefix,
2263 char const *trigger_prefix,
2269 char const *cs_name1, *cs_name2;
2288 if (!trigger_prefix) {
2304 if (!cs_name2) cs_name2 = cs_name1;
2320 if (
conf->use_tls) {
2321#ifdef HAVE_REDIS_SSL
2323 fr_tls_conf_t *tls_conf;
2330 tls_conf = fr_tls_conf_parse_client(tls_cs);
2337 cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf,
true);
2338 if (!cluster->ssl_ctx) {
2344 WARN(
"%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->
log_prefix);
2354 if (
conf->max_nodes == 0) {
2355 ERROR(
"%s - Minimum number of nodes allowed is 1", cluster->
log_prefix);
2367 cluster->module =
module;
2384 pthread_mutex_init(&cluster->
mutex, NULL);
2391 for (i = 1; i < (cluster->
conf->max_nodes + 1); i++) {
2425 ERROR(
"%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->
log_prefix);
2446 WARN(
"%s - Skipping duplicate bootstrap server \"%s\"", cluster->
log_prefix, server);
2455 if (af == AF_UNSPEC) af = node->
addr.inet.dst_ipaddr.
af;
2467 WARN(
"%s - Can't contact bootstrap server \"%s\"", cluster->
log_prefix, server);
2474 if (!cluster->
conf->use_cluster_map) {
2486 DEBUG(
"%s - Cluster map consists of %zu key ranges", cluster->
log_prefix, map->elements);
2487 for (j = 0; j < map->elements; j++) {
2488 redisReply *map_node = map->element[j];
2491 map_node->element[0]->integer,
2492 map_node->element[1]->integer);
2494 map_node->element[2]->element[0]->str,
2495 map_node->element[2]->element[1]->integer);
2496 for (k = 3; k < map_node->elements; k++) {
2498 map_node->element[k]->element[0]->str,
2499 map_node->element[k]->element[1]->integer);
2516 PWARN(
"%s - Bootstrap server \"%s\" returned invalid data", cluster->
log_prefix, server);
2521 PWARN(
"%s - Can't contact bootstrap server \"%s\"", cluster->
log_prefix, server);
2542 ERROR(
"%s - Can't contact any bootstrap servers", cluster->
log_prefix);
static int const char char buffer[256]
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
char const * cf_section_name1(CONF_SECTION const *cs)
Return the first identifier of a CONF_SECTION.
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
bool remapping
True when cluster is being remapped.
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVED.
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
CONF_SECTION *fr_redis_conf_t * conf
Module configuration.
char const * log_prefix
What to prepend to log messages.
fr_pool_t * pool
Pool associated with this node.
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
#define CLOSED_WEIGHT
What weight to give to nodes that had a connection closed recently.
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
#define MAX_SLAVES
Maximum number of slaves associated with a keyslot.
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
#define CLOSED_PERIOD
How recently the connection close must have occurred for us to care.
struct cluster_nodes_live_t::@53 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
fr_rb_tree_t * used_nodes
Tree of used nodes.
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
#define KEY_SLOTS
Maximum number of keyslots (should not change).
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
char const * trigger_prefix
Trigger path.
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
fr_time_t last_updated
Last time the cluster mappings were updated.
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
fr_pair_list_t trigger_args
Arguments to pass to triggers.
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
uint8_t next
Next index in live.
size_t fr_redis_cluster_rcodes_table_len
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
#define FAILED_WEIGHT
What weight to give to nodes that had a spawn failure recently.
uint8_t master
R/W node (master) for this key slot.
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
uint8_t id
Node ID (index in node array).
bool triggers_enabled
Whether triggers are enabled.
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
fr_socket_t addr
Current node address.
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
bool is_master
Whether this node is a master.
#define FAILED_PERIOD
How recently the spawn failure must have occurred for us to care.
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
#define RELEASED_PERIOD
Period after which we don't care about when the last connection was released.
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
uint8_t slave_num
Number of slaves associated with this key slot.
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Live nodes data, used to perform weighted random selection of alternative nodes.
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Common functions for interacting with Redis cluster via Hiredis.
size_t key_len
Length of the key.
bool close_conn
Set by caller of fr_redis_cluster_state_next, to indicate that connection must be closed,...
uint32_t retries
How many times we've received TRYAGAIN.
uint32_t reconnects
How many connections we've tried in this pool.
uint8_t const * key
Key we performed hashing on.
fr_redis_cluster_rcode_t
Return values for internal functions.
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
uint32_t redirects
How many redirects have we followed.
fr_redis_cluster_node_t * node
Node we're communicating with.
uint32_t in_pool
How many available connections are there in the pool.
Redis connection sequence state.
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses an IPv4/6 address + port into an fr_ipaddr_t and an integer (port).
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two ip addresses.
#define REXDENT()
Exdent (unindent) R* messages by one level.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RPDEBUG2(fmt,...)
#define PDEBUG2(_fmt,...)
#define RPEDEBUG(fmt,...)
#define RINDENT()
Indent R* messages by one level.
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
int fr_pool_start(fr_pool_t *pool)
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
int fr_pool_start_num(fr_pool_t *pool)
Connection pool get start.
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
fr_time_t last_closed
Last time a connection was closed.
fr_time_t last_released
Last time a connection was released.
uint32_t num
Number of connections in the pool.
uint32_t fr_rand(void)
Return a 32-bit random number.
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black tree.
Iterator structure for in-order traversal of an rbtree.
The main red black tree structure.
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx, fr_redis_rcode_t status)
Print the response data in a useful treelike form.
fr_table_num_sorted_t const redis_rcodes[]
redisContext * handle
Hiredis context used when issuing commands.
fr_redis_cluster_node_t * node
Node this connection is to.
#define REDIS_ERROR_MOVED_STR
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
uint32_t fr_redis_version_num(char const *version)
Convert version string into a 32bit unsigned integer for comparisons.
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
fr_table_num_sorted_t const redis_reply_types[]
#define REDIS_ERROR_ASK_STR
fr_redis_rcode_t
Codes are ordered inversely by priority.
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
@ REDIS_RCODE_SUCCESS
Operation was successful.
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Configuration parameters for a redis connection.
Connection handle, holding a redis context.
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a lexicographically sorted array of name to num mappings.
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
#define talloc_strdup(_ctx, _str)
static size_t talloc_strlen(char const *s)
Returns the length of a talloc array containing a string.
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to number of sec since the unix epoch (wallclock time)
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
#define fr_time_delta_ispos(_a)
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
#define fr_time_sub(_a, _b)
Subtract one time from another.
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
bool trigger_enabled(void)
Return whether triggers are enabled.
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Is a valuepair list empty.
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
int af
AF_INET, AF_INET6, or AF_UNIX.
Holds information necessary for binding or connecting to a socket.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const(_msg)
#define fr_box_strvalue_len(_val, _len)
static size_t char ** out