149#include <freeradius-devel/util/debug.h>
150#include <freeradius-devel/server/cf_parse.h>
152#include <freeradius-devel/util/fifo.h>
153#include <freeradius-devel/util/misc.h>
154#include <freeradius-devel/util/rand.h>
155#include <freeradius-devel/util/time.h>
163# undef HAVE_REDIS_SSL
167#include <freeradius-devel/tls/strerror.h>
168#include <hiredis/hiredis_ssl.h>
171#define KEY_SLOTS 16384
179#define CLOSED_PERIOD 10000
182#define CLOSED_WEIGHT 1
185#define FAILED_PERIOD 10000
188#define FAILED_WEIGHT 1
191#define RELEASED_PERIOD 10000
195#define RELEASED_MIN_WEIGHT 1000
203 unsigned int cumulative;
303 p = memchr(key,
'{', key_len);
309 q = memchr(p,
'}', key_len - (p - key));
310 if (!q || (q == p + 1))
goto all;
330 if (ret != 0)
return ret;
332 return CMP(a->
addr.inet.dst_port, b->
addr.inet.dst_port);
407 TALLOC_FREE(node->
pool);
448 redisReply *redirect)
455 if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
468 if ((
size_t)(q - p) >= (
size_t)redirect->len) {
473 key = strtoul(p, &q, 10);
486 if (
fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC,
true,
true) < 0) {
491 if (key_slot) *key_slot = key;
493 node_addr->inet.dst_ipaddr = ipaddr;
494 node_addr->inet.dst_port = port;
549# define SET_ADDR(_addr, _map) \
552 _ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
553 fr_assert(_ret == 0);\
554 _addr.inet.dst_port = _map->element[1]->integer; \
557# define SET_ADDR(_addr, _map) \
559 fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
560 _addr.inet.dst_port = _map->element[1]->integer; \
564#define SET_INACTIVE(_node) \
566 (_node)->is_active = false; \
567 (_node)->is_master = false; \
568 fr_rb_delete(cluster->used_nodes, _node); \
569 fr_fifo_push(cluster->free_nodes, _node); \
572#define SET_ACTIVE(_node) \
574 (_node)->is_active = true; \
575 fr_rb_insert(cluster->used_nodes, _node); \
576 fr_fifo_pop(cluster->free_nodes); \
577 active[(_node)->id] = true; \
578 rollback[r++] = (_node)->id; \
581 fr_assert(reply->type == REDIS_REPLY_ARRAY);
583 memset(&rollback, 0,
sizeof(rollback));
584 memset(active, 0,
sizeof(active));
585 memset(master, 0,
sizeof(master));
604 for (i = 0; i < reply->elements; i++) {
611 redisReply *map = reply->element[i];
613 memset(&tmpl_slot, 0,
sizeof(tmpl_slot));
618 active[found->
id] =
true;
619 goto reuse_master_node;
644 if (rcode < 0)
goto error;
657 master[found->
id] =
true;
666 for (j = 3; (j < map->elements); j++) {
670 active[found->
id] =
true;
675 if (!spare)
goto out_of_nodes;
684 tmpl_slot.
slave[slaves++] = found->
id;
695 for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
737 for (i = 1; i < cluster->
conf->max_nodes; i++) {
757 }
else if (master[i]) {
790 if (node->type != REDIS_REPLY_ARRAY) {
809 if (node->elements < 2) {
810 fr_strerror_printf(
"Cluster map %i node %i has incorrect number of elements, expected at least "
811 "2 got %zu", map_idx, node_idx, node->elements);
815 if (node->element[0]->type != REDIS_REPLY_STRING) {
816 fr_strerror_printf(
"Cluster map %i node %i ip address is wrong type, expected string got %s",
822 if (
fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC,
true,
true) < 0) {
826 if (node->element[1]->type != REDIS_REPLY_INTEGER) {
827 fr_strerror_printf(
"Cluster map %i node %i port is wrong type, expected integer got %s",
833 if (node->element[1]->integer < 0) {
835 map_idx, node_idx, node->element[1]->integer);
839 if (node->element[1]->integer > UINT16_MAX) {
841 "got %lli", map_idx, node_idx, node->element[1]->integer);
871 reply = redisCommand(conn->
handle,
"cluster slots");
880 if (reply && reply->type == REDIS_REPLY_ERROR) {
892 if (reply->type != REDIS_REPLY_ARRAY) {
893 fr_strerror_printf(
"Bad response to \"cluster slots\" command, expected array got %s",
901 if (reply->elements == 0) {
902 fr_strerror_printf(
"Empty response to \"cluster slots\" command (zero length array)");
909 for (i = 0; i < reply->elements; i++) {
913 map = reply->element[i];
914 if (map->type != REDIS_REPLY_ARRAY) {
922 if (map->elements < 3) {
923 fr_strerror_printf(
"Cluster map %zu has too few elements, expected at least 3, got %zu",
931 if (map->element[0]->type != REDIS_REPLY_INTEGER) {
932 fr_strerror_printf(
"Cluster map %zu key slot start is wrong type, expected integer got %s",
937 if (map->element[0]->integer < 0) {
938 fr_strerror_printf(
"Cluster map %zu key slot start is too low, expected >= 0 got %lli",
939 i, map->element[0]->integer);
943 if (map->element[0]->integer >
KEY_SLOTS) {
952 if (map->element[1]->type != REDIS_REPLY_INTEGER) {
953 fr_strerror_printf(
"Cluster map %zu key slot end is wrong type, expected integer got %s",
958 if (map->element[1]->integer < 0) {
960 i, map->element[1]->integer);
964 if (map->element[1]->integer >
KEY_SLOTS) {
970 if (map->element[1]->integer < map->element[0]->integer) {
972 "Start was %lli, end was %lli", i, map->element[0]->integer,
973 map->element[1]->integer);
985 for (j = 3; j < map->elements; j++) {
1032 ROPTIONAL(
RWARN,
WARN,
"Cluster was updated less than a second ago, ignoring remap request");
1060 for (i = 0; i < map->elements; i++) {
1061 redisReply *map_node = map->element[i];
1064 map_node->element[0]->integer,
1065 map_node->element[1]->integer);
1069 map_node->element[2]->element[0]->str,
1070 map_node->element[2]->element[1]->integer);
1071 for (j = 3; j < map_node->elements; j++) {
1073 map_node->element[j]->element[0]->str,
1074 map_node->element[j]->element[1]->integer);
1085 pthread_mutex_lock(&cluster->
mutex);
1087 pthread_mutex_unlock(&cluster->
mutex);
1092 pthread_mutex_unlock(&cluster->
mutex);
1098 pthread_mutex_unlock(&cluster->
mutex);
1126 memset(&find, 0,
sizeof(find));
1132 pthread_mutex_lock(&cluster->
mutex);
1140 pthread_mutex_unlock(&cluster->
mutex);
1152 pthread_mutex_unlock(&cluster->
mutex);
1157 pthread_mutex_unlock(&cluster->
mutex);
1165 pthread_mutex_unlock(&cluster->
mutex);
1180 pthread_mutex_lock(&cluster->
mutex);
1182 pthread_mutex_unlock(&cluster->
mutex);
1250 reply = redisCommand(conn->
handle,
"PING");
1258 if (reply->type != REDIS_REPLY_STATUS) {
1260 node->
id, node->
name, node->
addr.inet.dst_port,
1332 pthread_mutex_lock(&cluster->
mutex);
1337 if (live->
skip == node->
id)
continue;
1342 pthread_mutex_unlock(&cluster->
mutex);
1345 if (live->
next == 1)
goto no_alts;
1352 for (i = 0; (i < cluster->
conf->max_alt) && live->
next; i++) {
1355 int first, last, pivot;
1356 unsigned int find, cumulative = 0;
1360 for (j = 0; j < live->
next; j++) {
1365 live->
node[j].cumulative = (cumulative += weight);
1372 find = (
fr_rand() & (cumulative - 1));
1374 last = live->
next - 1;
1375 pivot = (first + last) / 2;
1377 while (first <= last) {
1378 if (live->
node[pivot].cumulative < find) {
1380 }
else if (live->
node[pivot].cumulative == find) {
1385 pivot = (first + last) / 2;
1390 if (first > last) pivot = last + 1;
1396 node = &cluster->
node[live->
node[pivot].id];
1403 node->
name, node->
addr.inet.dst_port);
1409 if (pivot == live->
next) {
1413 memcpy(&live->
node[pivot], &live->
node[live->
next - 1],
sizeof(live->
node[pivot]));
1472 redisContext *handle;
1473 redisReply *reply = NULL;
1476 DEBUG2(
"%s - [%i] Connecting to node %s:%i", log_prefix, node->
id, node->
name, node->
addr.inet.dst_port);
1479 if ((handle != NULL) && handle->err) {
1480 ERROR(
"%s - [%i] Connection failed: %s", log_prefix, node->
id, handle->errstr);
1483 }
else if (!handle) {
1484 ERROR(
"%s - [%i] Connection failed", log_prefix, node->
id);
1493#ifdef HAVE_REDIS_SSL
1494 if (node->
cluster->ssl_ctx != NULL) {
1495 fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->
cluster->ssl_ctx);
1497 fr_tls_strerror_printf(
"%s - [%i]", log_prefix, node->
id);
1498 ERROR(
"%s - [%i] Failed to allocate TLS session", log_prefix, node->
id);
1504 SSL_up_ref(tls_session->ssl);
1505 if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1506 ERROR(
"%s - [%i] Failed to initiate SSL: %s", log_prefix, node->
id, handle->errstr);
1507 SSL_free(tls_session->ssl);
1516 DEBUG3(
"%s - [%i] Executing: AUTH %s %s", log_prefix, node->
id,
1519 reply = redisCommand(handle,
"AUTH %s %s",
1523 DEBUG3(
"%s - [%i] Executing: AUTH %s", log_prefix, node->
id, node->
cluster->
conf->password);
1524 reply = redisCommand(handle,
"AUTH %s", node->
cluster->
conf->password);
1527 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix, node->
id, handle->errstr);
1534 switch (reply->type) {
1535 case REDIS_REPLY_STATUS:
1536 if (strcmp(reply->str,
"OK") != 0) {
1537 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix,
1538 node->
id, reply->str);
1544 case REDIS_REPLY_ERROR:
1545 ERROR(
"%s - [%i] Failed authenticating: %s", log_prefix, node->
id, reply->str);
1549 ERROR(
"%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->
id,
1556 DEBUG3(
"%s - [%i] Executing: SELECT %i", log_prefix, node->
id, node->
cluster->
conf->database);
1557 reply = redisCommand(handle,
"SELECT %i", node->
cluster->
conf->database);
1559 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1564 switch (reply->type) {
1565 case REDIS_REPLY_STATUS:
1566 if (strcmp(reply->str,
"OK") != 0) {
1567 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1574 case REDIS_REPLY_ERROR:
1575 ERROR(
"%s - [%i] Failed selecting database %i: %s", log_prefix, node->
id,
1580 ERROR(
"%s - [%i] Unexpected reply of type %s, to SELECT", log_prefix, node->
id,
1604 uint8_t const *key,
size_t key_len)
1608 if (!key || (key_len == 0)) {
1659 if (slave_num >= key_slot->
slave_num)
return NULL;
1661 return &cluster->
node[key_slot->
slave[slave_num]];
1674 if (!node)
return -1;
1676 memcpy(
out, &node->
addr.inet.dst_ipaddr,
sizeof(*
out));
1691 if (!node)
return -1;
1693 *
out = node->
addr.inet.dst_port;
1743 uint8_t const *key,
size_t key_len,
bool read_only)
1748 uint64_t used_nodes;
1754 memset(state, 0,
sizeof(*state));
1758 if (used_nodes == 0) {
1772 for (i = 0; i < key_slot->
slave_num; i++) {
1776 node = &cluster->
node[node_id];
1928 if (state->
retries++ >= cluster->
conf->max_retries) {
2013 if (
new == state->
node) {
2021 state->
node->
addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2077 .dst_ipaddr = node_addr->inet.dst_ipaddr,
2078 .dst_port = node_addr->inet.dst_port,
2082 pthread_mutex_lock(&cluster->
mutex);
2086 char buffer[INET6_ADDRSTRLEN];
2090 pthread_mutex_unlock(&cluster->
mutex);
2095 hostname, node_addr->inet.dst_port);
2102 pthread_mutex_unlock(&cluster->
mutex);
2107 pthread_mutex_unlock(&cluster->
mutex);
2119 pthread_mutex_unlock(&cluster->
mutex);
2121 *pool = found->
pool;
2153 found = talloc_zero_array(ctx,
fr_socket_t, in_use);
2159 pthread_mutex_lock(&cluster->
mutex);
2167 pthread_mutex_unlock(&cluster->
mutex);
2187 pthread_mutex_destroy(&cluster->
mutex);
2209 bool all_above =
true;
2211 pthread_mutex_lock(&cluster->
mutex);
2217 if (!conn)
continue;
2226 if (ret < 0)
continue;
2236 pthread_mutex_unlock(&cluster->
mutex);
2264 bool triggers_enabled,
2265 char const *log_prefix,
2266 char const *trigger_prefix,
2272 char const *cs_name1, *cs_name2;
2280 fr_assert(triggers_enabled || !trigger_prefix);
2294 if (!trigger_prefix) {
2297 cluster->
trigger_prefix = talloc_strdup(cluster, trigger_prefix);
2310 if (!cs_name2) cs_name2 = cs_name1;
2313 cluster->
log_prefix = talloc_strdup(cluster, log_prefix);
2326 if (
conf->use_tls) {
2327#ifdef HAVE_REDIS_SSL
2329 fr_tls_conf_t *tls_conf;
2336 tls_conf = fr_tls_conf_parse_client(tls_cs);
2343 cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf,
true);
2344 if (!cluster->ssl_ctx) {
2350 WARN(
"%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->
log_prefix);
2360 if (
conf->max_nodes == 0) {
2361 ERROR(
"%s - Minimum number of nodes allowed is 1", cluster->
log_prefix);
2373 cluster->module =
module;
2390 pthread_mutex_init(&cluster->
mutex, NULL);
2397 for (i = 1; i < (cluster->
conf->max_nodes + 1); i++) {
2431 ERROR(
"%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->
log_prefix);
2440 talloc_array_length(server) - 1, af,
true,
true) < 0) {
2452 WARN(
"%s - Skipping duplicate bootstrap server \"%s\"", cluster->
log_prefix, server);
2461 if (af == AF_UNSPEC) af = node->
addr.inet.dst_ipaddr.
af;
2473 WARN(
"%s - Can't contact bootstrap server \"%s\"", cluster->
log_prefix, server);
2480 if (!cluster->
conf->use_cluster_map) {
2492 DEBUG(
"%s - Cluster map consists of %zu key ranges", cluster->
log_prefix, map->elements);
2493 for (j = 0; j < map->elements; j++) {
2494 redisReply *map_node = map->element[j];
2497 map_node->element[0]->integer,
2498 map_node->element[1]->integer);
2500 map_node->element[2]->element[0]->str,
2501 map_node->element[2]->element[1]->integer);
2502 for (k = 3; k < map_node->elements; k++) {
2504 map_node->element[k]->element[0]->str,
2505 map_node->element[k]->element[1]->integer);
2522 PWARN(
"%s - Bootstrap server \"%s\" returned invalid data", cluster->
log_prefix, server);
2527 PWARN(
"%s - Can't contact bootstrap server \"%s\"", cluster->
log_prefix, server);
2548 ERROR(
"%s - Can't contact any bootstrap servers", cluster->
log_prefix);
static int const char char buffer[256]
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
char const * cf_section_name1(CONF_SECTION const *cs)
Return the first identifier of a CONF_SECTION.
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
bool remapping
True when cluster is being remapped.
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVED.
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, bool triggers_enabled, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
CONF_SECTION *fr_redis_conf_t * conf
Module configuration.
char const * log_prefix
What to prepend to log messages.
fr_pool_t * pool
Pool associated with this node.
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
#define CLOSED_WEIGHT
What weight to give to nodes that had a connection closed recently.
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
#define MAX_SLAVES
Maximum number of slaves associated with a keyslot.
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
struct cluster_nodes_live_t::@52 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
#define CLOSED_PERIOD
How recently the close must have occurred for us to care.
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
fr_rb_tree_t * used_nodes
Tree of used nodes.
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
#define KEY_SLOTS
Maximum number of keyslots (should not change).
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
char const * trigger_prefix
Trigger path.
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
fr_time_t last_updated
Last time the cluster mappings were updated.
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
fr_pair_list_t trigger_args
Arguments to pass to triggers.
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
uint8_t next
Next index in live.
size_t fr_redis_cluster_rcodes_table_len
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
#define FAILED_WEIGHT
What weight to give to nodes that had a spawn failure recently.
uint8_t master
R/W node (master) for this key slot.
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
uint8_t id
Node ID (index in node array).
bool triggers_enabled
Whether triggers are enabled.
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
fr_socket_t addr
Current node address.
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
bool is_master
Whether this node is a master.
#define FAILED_PERIOD
How recently the spawn failure must have occurred for us to care.
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
#define RELEASED_PERIOD
Period after which we don't care about when the last connection was released.
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
uint8_t slave_num
Number of slaves associated with this key slot.
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Live nodes data, used to perform weighted random selection of alternative nodes.
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Common functions for interacting with Redis cluster via Hiredis.
size_t key_len
Length of the key.
bool close_conn
Set by caller of fr_redis_cluster_state_next, to indicate that connection must be closed,...
uint32_t retries
How many times we've received TRYAGAIN.
uint32_t reconnects
How many connections we've tried in this pool.
uint8_t const * key
Key we performed hashing on.
fr_redis_cluster_rcode_t
Return values for internal functions.
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
uint32_t redirects
How many redirects have we followed.
fr_redis_cluster_node_t * node
Node we're communicating with.
uint32_t in_pool
How many available connections are there in the pool.
Redis connection sequence state.
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses IPv4/6 address + port, to fr_ipaddr_t and integer (port)
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two ip addresses.
#define REXDENT()
Exdent (unindent) R* messages by one level.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RPDEBUG2(fmt,...)
#define PDEBUG2(_fmt,...)
#define RPEDEBUG(fmt,...)
#define RINDENT()
Indent R* messages by one level.
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
int fr_pool_start(fr_pool_t *pool)
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
int fr_pool_start_num(fr_pool_t *pool)
Connection pool get start.
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
fr_time_t last_closed
Last time a connection was closed.
fr_time_t last_released
Last time a connection was released.
uint32_t num
Number of connections in the pool.
static char const * hostname(char *buf, size_t buflen, uint32_t ipaddr)
uint32_t fr_rand(void)
Return a 32-bit random number.
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black tree.
Iterator structure for in-order traversal of an rbtree.
The main red black tree structure.
fr_table_num_sorted_t const redis_rcodes[]
redisContext * handle
Hiredis context used when issuing commands.
fr_redis_cluster_node_t * node
Node this connection is to.
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx)
Print the response data in a useful treelike form.
#define REDIS_ERROR_MOVED_STR
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
uint32_t fr_redis_version_num(char const *version)
Convert version string into a 32bit unsigned integer for comparisons.
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
fr_table_num_sorted_t const redis_reply_types[]
#define REDIS_ERROR_ASK_STR
fr_redis_rcode_t
Codes are ordered inversely by priority.
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
@ REDIS_RCODE_SUCCESS
Operation was successful.
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Configuration parameters for a redis connection.
Connection handle, holding a redis context.
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
#define fr_time()
Allow us to arbitrarily manipulate time.
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a lexicographically sorted array of name to num mappings.
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to number of sec since the unix epoch (wallclock time)
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
#define fr_time_delta_ispos(_a)
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
#define fr_time_sub(_a, _b)
Subtract one time from another.
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
bool trigger_enabled(void)
Return whether triggers are enabled.
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Is a valuepair list empty.
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
int af
AF_INET, AF_INET6, or AF_UNIX.
Holds information necessary for binding or connecting to a socket.
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
#define fr_strerror_const(_msg)
#define fr_box_strvalue_len(_val, _len)
static size_t char ** out