The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
cluster.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 0f86002214f06e4b2e32226734906322318e31b3 $
 * @file cluster.c
 * @brief Functions for interacting with a Redis cluster via Hiredis.
 *
 * @author Arran Cudbard-Bell
 *
 * @copyright 2015 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
 * @copyright 2015 Network RADIUS (legal@networkradius.com)
 * @copyright 2015 The FreeRADIUS server project
 *
 * Overview
 * ========
 *
 * Read and understand the Redis cluster specification (http://redis.io/topics/cluster-spec)
 * first, otherwise the text below will not be useful.
 *
 * Using the cluster's public API
 * ------------------------------
 *
 * The two functions used at runtime to issue commands are #fr_redis_cluster_state_init
 * and #fr_redis_cluster_state_next.
 *
 * #fr_redis_cluster_state_init initialises the structure we use to track which node
 * we're communicating with, and uses a key (string) to determine which node we should
 * try to contact first.
 *
 * #fr_redis_cluster_state_next examines the result of the last command, and either
 * gets a new connection, or errors out.
 *
 * In both cases the connection must not be released by the caller, only by
 * #fr_redis_cluster_state_next.
 *
 * Below is the sequence of calls required to use the cluster:
 *
 * 1. During initialization allocate a cluster configuration structure with
 *    #fr_redis_cluster_alloc. This holds the cluster configuration and state.
 * 2. Declare a #fr_redis_cluster_state_t in the function that needs to issue
 *    commands against the cluster.
 * 3. Call #fr_redis_cluster_state_init with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct, and a key/key_len used for initial
 *    lookup. For most commands the key is the value of the second argument.
 *    #fr_redis_cluster_state_init will then reserve and pass back a connection from
 *    the pool of the node responsible for the key.
 * 4. Use the connection that was passed back to issue a Redis command.
 * 5. Use #fr_redis_command_status to translate the result of the command into
 *    a #fr_redis_rcode_t value.
 * 6. Call #fr_redis_cluster_state_next with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct and the #fr_redis_rcode_t value for the
 *    last command.
 * 7. If #fr_redis_cluster_state_next returns 0, repeat steps 4-7. Otherwise
 *    stop and analyse the return value.
 *
 * See #fr_redis_cluster_state_init for example code.
 *
 * Structures
 * ----------
 *
 * This code maintains a series of structures for efficient lookup and lockless operations.
 *
 * The important ones are:
 * - An array of #fr_redis_cluster_node_t. These are pre-allocated on startup and are
 *   never added to, or removed from.
 * - A #fr_fifo_t. This contains the queue of nodes that may be re-used.
 * - A #fr_rb_tree_t. This contains a tree of nodes which are active. The tree is built on IP
 *   address and port.
 *
 * Each key slot entry (#fr_redis_cluster_key_slot_t) contains a master ID, and an array of
 * slave IDs. The IDs are array indexes in the fr_redis_cluster_t.node array. We use 8bit
 * unsigned integers instead of pointers to save space. Using pointers, the key_slot array
 * would need 784K; using IDs it uses 112K. Still not light on memory, but a bit more acceptable.
 * Currently the key_slot array is shadowed by key_slot_pending, used to stage new key_slot
 * mappings. This doubles the memory used. We may want to consider allocating key_slot_pending
 * only during remappings and freeing it after.
 *
 * Mapping/Remapping the cluster
 * -----------------------------
 *
 * On startup, and during cluster operation, a remap may be performed. A remap involves
 * the following steps:
 *
 * 1. Executing the Redis 'cluster slots' command.
 * 2. Validating the result of this command. We need to do extensive validation to
 *    avoid SEGV on invalid data, due to the way libhiredis presents the result.
 * 3. Determining the intersection between nodes described in the result, and those already
 *    in our #fr_rb_tree_t.
 * 4. Connecting to nodes that were in the result, but not in the tree.
 *    Note: If we can't connect to one of the masters, we count the map as invalid, roll
 *    back any newly connected nodes, and error out. Slave failure is OK.
 * 5. Mapping keyslot ranges to nodes in the key_slot_pending array.
 * 6. Verifying there are no holes in the ranges (if there are, we roll back and error out).
 * 7. Applying the new keyslot ranges.
 * 8. Removing nodes no longer used by the key slots, and adding them back to the free
 *    nodes queue.
 *
 * #cluster_map_get and #cluster_map_apply perform the operations described
 * above. The get function issues the 'cluster slots' command and performs validation; the
 * apply function processes and applies the map.
 *
 * Failing to apply a map is not a fatal error at runtime, and is only fatal on startup if
 * pool.start > 0.
 *
 * The cluster client can continue to operate, albeit inefficiently, with a stale cluster map
 * by following '-ASK' and '-MOVED' redirects.
 *
 * Remaps are limited to one per second. If any operation sets the remap_needed flag, or
 * attempts a remap directly, the remap may be skipped if one occurred recently.
 *
 *
 * Processing '-ASK' and '-MOVED' redirects
 * ----------------------------------------
 *
 * The code treats '-ASK' (temporary redirect) and '-MOVED' (permanent redirect) responses
 * similarly. If the node is known, a connection is reserved from its pool. If the node
 * is not known, a new pool is established, and a connection reserved.
 *
 * The difference between '-ASK' and '-MOVED' is that '-MOVED' attempts a cluster remap before
 * following the redirect.
 *
 * The data from '-MOVED' responses is not used to alter the cluster map. That is only done
 * on successful remap.
 *
 *
 * Processing '-TRYAGAIN'
 * ----------------------
 *
 * If the cluster is in a state of flux, a node may return '-TRYAGAIN' to indicate that we
 * should attempt the operation again. The cluster spec says we should retry the operation
 * after some time. This time is configurable.
 *
 */

#include <freeradius-devel/util/debug.h>
#include <freeradius-devel/server/cf_parse.h>

#include <freeradius-devel/util/fifo.h>
#include <freeradius-devel/util/misc.h>
#include <freeradius-devel/util/rand.h>
#include <freeradius-devel/util/time.h>

#include "config.h"
#include "base.h"
#include "cluster.h"
#include "crc16.h"

#ifdef HAVE_REDIS_SSL
#include <freeradius-devel/tls/strerror.h>
#include <hiredis/hiredis_ssl.h>
#endif

#define KEY_SLOTS		16384	//!< Maximum number of keyslots (should not change).

#define MAX_SLAVES		5	//!< Maximum number of slaves associated
					//!< with a keyslot.

/*
 *	Periods and weights for live node selection
 */
#define CLOSED_PERIOD		10000	//!< How recently the connection must have been
					//!< closed for us to care.

#define CLOSED_WEIGHT		1	//!< What weight to give to nodes that
					//!< had a connection closed recently.

#define FAILED_PERIOD		10000	//!< How recently the spawn failure must have
					//!< occurred for us to care.

#define FAILED_WEIGHT		1	//!< What weight to give to nodes that
					//!< had a spawn failure recently.

#define RELEASED_PERIOD		10000	//!< Period after which we don't care
					//!< about when the last connection was
					//!< released.

#define RELEASED_MIN_WEIGHT	1000	//!< Minimum weight to assign to node.

/** Live nodes data, used to perform weighted random selection of alternative nodes
 */
typedef struct {
	struct {
		uint8_t			id;		//!< Node ID.
		fr_pool_state_t const	*pool_state;	//!< Connection pool stats.
		unsigned int		cumulative;	//!< Cumulative weight.
	} node[UINT8_MAX - 1];				//!< Array of live node IDs (and weights).
	uint8_t			next;			//!< Next index in live.

/** A Redis cluster node
 *
 * Passed as opaque data to pools which open connections to nodes.
 */
	fr_rb_node_t		rbnode;			//!< Entry into the tree of redis nodes.

	char			name[INET6_ADDRSTRLEN];	//!< Buffer to hold the IP address as
							//!< text for debug messages.
	uint8_t			id;			//!< Node ID (index in node array).

	fr_socket_t		addr;			//!< Current node address.
	fr_socket_t		pending_addr;		//!< New node address to be applied when the pool
							//!< is reconnected.

	fr_redis_cluster_t	*cluster;		//!< Common configuration (database number,
							//!< password, etc.).
	fr_pool_t		*pool;			//!< Pool associated with this node.
	CONF_SECTION		*pool_cs;		//!< Pool configuration section associated with node.

	bool			is_active;		//!< Whether this node is in the active node set.
	bool			is_master;		//!< Whether this node is a master.
							//!< This is needed for commands like 'KEYS', which
							//!< we need to issue to every master in the cluster.
};

/** Indexes in the fr_redis_cluster_node_t array for a single key slot
 *
 * When dealing with 16K entries, space is a concern. It's significantly
 * more memory efficient to use 8bit indexes than 64bit pointers for each
 * of the key slot to node mappings.
 */
	uint8_t		slave[MAX_SLAVES];	//!< R/O node (slave) for this key slot.
	uint8_t		slave_num;		//!< Number of slaves associated with this key slot.
	uint8_t		master;			//!< R/W node (master) for this key slot.
};

/** A redis cluster
 *
 * Holds all the structures and collections of nodes, to represent a Redis cluster.
 */
	char const		*log_prefix;		//!< What to prepend to log messages.
	char const		*trigger_prefix;	//!< Trigger path.
	fr_pair_list_t		trigger_args;		//!< Arguments to pass to triggers.
	bool			triggers_enabled;	//!< Whether triggers are enabled.

	bool			remapping;		//!< True when cluster is being remapped.
	bool			remap_needed;		//!< Set true if at least one cluster node is definitely
							//!< unreachable. Set false on successful remap.
	fr_time_t		last_updated;		//!< Last time the cluster mappings were updated.
	CONF_SECTION		*module;		//!< Module configuration.

	fr_redis_conf_t		*conf;			//!< Base configuration data such as the database number
							//!< and passwords.
#ifdef HAVE_REDIS_SSL
	SSL_CTX			*ssl_ctx;		//!< SSL context.
#endif

	fr_redis_cluster_node_t	*node;			//!< Structure containing a node id, its address and
							//!< a pool of its connections.

	fr_fifo_t		*free_nodes;		//!< Queue of free nodes (or nodes waiting to be reused).
	fr_rb_tree_t		*used_nodes;		//!< Tree of used nodes.

	fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS];	//!< Lookup table of slots to pools.

	pthread_mutex_t		mutex;			//!< Mutex to synchronise cluster operations.
};

	{ L("bad-input"),	FR_REDIS_CLUSTER_RCODE_BAD_INPUT },
	{ L("failed"),		FR_REDIS_CLUSTER_RCODE_FAILED },
	{ L("ignored"),		FR_REDIS_CLUSTER_RCODE_IGNORED },
	{ L("no-connection"),	FR_REDIS_CLUSTER_RCODE_NO_CONNECTION },
	{ L("success"),		FR_REDIS_CLUSTER_RCODE_SUCCESS }
};

/** Resolve key to key slot
 *
 * Equivalent to the example implementation in the Redis cluster spec, except it
 * uses memchr(), which should be faster, and is simpler.
 *
 * @param[in] key to resolve.
 * @param[in] key_len length of key.
 * @return key slot index for the key.
 */
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
{
	uint8_t *p, *q;

	p = memchr(key, '{', key_len);
	if (!p) {
	all:
		return fr_crc16_xmodem(key, key_len) & (KEY_SLOTS - 1);
	}

	q = memchr(p, '}', key_len - (p - key));	/* look for } after { */
	if (!q || (q == p + 1)) goto all;		/* no } or {}, hash everything */

	p++;						/* skip '{' */

	return fr_crc16_xmodem(p, q - p) & (KEY_SLOTS - 1);	/* hash stuff between { and } */
}

/** Compare two redis nodes to check equality
 *
 * @param[in] one first node.
 * @param[in] two second node.
 * @return CMP(one, two)
 */
static int8_t _cluster_node_cmp(void const *one, void const *two)
{
	fr_redis_cluster_node_t const *a = one;
	fr_redis_cluster_node_t const *b = two;
	int ret;

	ret = fr_ipaddr_cmp(&a->addr.inet.dst_ipaddr, &b->addr.inet.dst_ipaddr);
	if (ret != 0) return ret;

	return CMP(a->addr.inet.dst_port, b->addr.inet.dst_port);
}

/** Reconnect callback to apply new pool config
 *
 * @param[in] pool to apply new configuration to.
 * @param[in] opaque data passed to the connection pool.
 */
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
{
	fr_redis_cluster_node_t *node = opaque;

	node->addr = node->pending_addr;

	if (node->cluster->triggers_enabled) {
		trigger_args_afrom_server(pool, &args, node->name, node->addr.inet.dst_port);
		if (fr_pair_list_empty(&args)) return;

				&node->cluster->trigger_args) >= 0);


	}
}

/** Establish a connection to a cluster node
 *
 * @note Must be called with the cluster mutex locked.
 * @note Configuration to use for the connection must be set in node->pending_addr, not node->cluster->conf.
 *
 * @param[in] cluster to search in.
 * @param[in] node config.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_FAILED if the operation failed.
 */
{
	char const *p;

	fr_assert(node->pending_addr.inet.dst_ipaddr.af);

	/*
	 *	Write out the IP address and Port in string form
	 */
	p = inet_ntop(node->pending_addr.inet.dst_ipaddr.af, &node->pending_addr.inet.dst_ipaddr.addr,
		      node->name, sizeof(node->name));

	/*
	 *	Node has never been used before, needs a pool allocated for it.
	 */
	if (!node->pool) {
		char		buffer[256];
		CONF_SECTION	*pool;

		snprintf(buffer, sizeof(buffer), "%s [%i]", cluster->log_prefix, node->id);

		pool = cf_section_find(cluster->module, "pool", NULL);
		/*
		 *	Dup so we can re-parse, and have unique CONF_DATA
		 */
		node->pool_cs = cf_section_dup(cluster, NULL, pool, "pool", NULL, true);
		node->addr = node->pending_addr;
		node->pool = fr_pool_init(cluster, node->pool_cs, node,
		if (!node->pool) {
		error:
			TALLOC_FREE(node->pool_cs);
			TALLOC_FREE(node->pool);
		}

		if (trigger_enabled() && cluster->triggers_enabled) {
			trigger_args_afrom_server(node->pool, &args, node->name, node->addr.inet.dst_port);
			if (fr_pair_list_empty(&args)) goto error;

			if (!fr_pair_list_empty(&cluster->trigger_args)) MEM(fr_pair_list_copy(cluster, &args, &cluster->trigger_args) >= 0);


		}

		if (fr_pool_start(node->pool) < 0) goto error;

	}

	/*
	 *	Apply the new config to the possibly live pool
	 */
	if (fr_pool_reconnect(node->pool, NULL) < 0) goto error;

}

/** Parse a -MOVED or -ASK redirect
 *
 * Converts the body of the -MOVED or -ASK error into an IPv4/6 address and port.
 *
 * @param[out] key_slot value extracted from redirect string (may be NULL).
 * @param[out] node_addr Redis node ipaddr and port extracted from redirect string.
 * @param[in] redirect to process.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the server returned an invalid redirect.
 */
				 redisReply *redirect)
{
	char		*p, *q;
	unsigned long	key;
	uint16_t	port;
	fr_ipaddr_t	ipaddr;

	if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
	}

	p = redirect->str;
	if (strncmp(REDIS_ERROR_MOVED_STR, redirect->str, sizeof(REDIS_ERROR_MOVED_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_MOVED_STR);	/* not a typo, skip space too */
	} else if (strncmp(REDIS_ERROR_ASK_STR, redirect->str, sizeof(REDIS_ERROR_ASK_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_ASK_STR);	/* not a typo, skip space too */
	} else {
		fr_strerror_const("No '-MOVED' or '-ASK' prefix");
	}
	if ((size_t)(q - p) >= (size_t)redirect->len) {
		fr_strerror_const("Truncated");
	}
	p = q;
	key = strtoul(p, &q, 10);
	if (key >= KEY_SLOTS) {	/* valid key slots are 0 .. KEY_SLOTS - 1 */
		fr_strerror_printf("Key slot %lu outside of redis slot range", key);
	}
	p = q;

	if (*p != ' ') {
		fr_strerror_const("Missing key/host separator");
	}
	p++;	/* Skip the ' ' */

	if (fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC, true, true) < 0) {
	}
	fr_assert(ipaddr.af);

	if (key_slot) *key_slot = key;
	if (node_addr) {
		node_addr->inet.dst_ipaddr = ipaddr;
		node_addr->inet.dst_port = port;
	}

}

/** Apply a cluster map received from a cluster node
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note Must be called with the cluster mutex held.
 *
 * Key slot range structure
 @verbatim
 [0] -> key slot range 0
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
         [0] -> master 0 ip (string)
         [1] -> master 0 port (number)
     [3..n] -> slave_node(s)
 [1] -> key slot range 1
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
         [0] -> master 1 ip (string)
         [1] -> master 1 port (number)
     [3..n] -> slave_node(s)
 [n] -> key slot range n
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
         [0] -> master n ip (string)
         [1] -> master n port (number)
     [3..n] -> slave_node(s)
 @endverbatim
 *
 * @param[in,out] cluster to apply map to.
 * @param[in] reply from #cluster_map_get.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_FAILED on failure.
 *	- FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the map didn't provide nodes for all keyslots.
 */
{
	size_t		i;
	uint8_t		r = 0;


	uint8_t		rollback[UINT8_MAX];	// Set of nodes to re-add to the queue on failure.
	bool		active[UINT8_MAX];	// Set of nodes active in the new cluster map.
	bool		master[UINT8_MAX];	// Master nodes.
#ifndef NDEBUG
#  define SET_ADDR(_addr, _map) \
do { \
	int _ret; \
	_ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	fr_assert(_ret == 0);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#else
#  define SET_ADDR(_addr, _map) \
do { \
	fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#endif

#define SET_INACTIVE(_node) \
do { \
	(_node)->is_active = false; \
	(_node)->is_master = false; \
	fr_rb_delete(cluster->used_nodes, _node); \
	fr_fifo_push(cluster->free_nodes, _node); \
} while (0)

#define SET_ACTIVE(_node) \
do { \
	(_node)->is_active = true; \
	fr_rb_insert(cluster->used_nodes, _node); \
	fr_fifo_pop(cluster->free_nodes); \
	active[(_node)->id] = true; \
	rollback[r++] = (_node)->id; \
} while (0)

	fr_assert(reply->type == REDIS_REPLY_ARRAY);

	memset(&rollback, 0, sizeof(rollback));
	memset(active, 0, sizeof(active));
	memset(master, 0, sizeof(master));

	cluster->remapping = true;

	/*
	 *	Must be cleared with the mutex held
	 */
	memset(&cluster->key_slot_pending, 0, sizeof(cluster->key_slot_pending));

	/*
	 *	Insert new nodes and mark up the keyslot indexes
	 *	in our temporary keyslot_array.
	 *
	 *	A map consists of an array with the following indexes:
	 *	  [0]    -> key_slot_start
	 *	  [1]    -> key_slot_end
	 *	  [2]    -> master_node
	 *	  [3..n] -> slave_node(s)
	 */
	for (i = 0; i < reply->elements; i++) {
		size_t				j;
		long long int			k;
		int				slaves = 0;
		fr_redis_cluster_node_t		*found, *spare;
		fr_redis_cluster_node_t		find = {};	/* Initialise unused fields to stop Coverity complaints */
		fr_redis_cluster_key_slot_t	tmpl_slot;
		redisReply			*map = reply->element[i];

		memset(&tmpl_slot, 0, sizeof(tmpl_slot));

		SET_ADDR(find.addr, map->element[2]);
		found = fr_rb_find(cluster->used_nodes, &find);
		if (found) {
			active[found->id] = true;
			goto reuse_master_node;
		}

		/*
		 *	Process the master
		 *
		 *	A master node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		spare = fr_fifo_peek(cluster->free_nodes);
		if (!spare) {
		out_of_nodes:
			fr_strerror_const("Reached maximum connected nodes");
		error:
			cluster->remapping = false;
			cluster->last_updated = fr_time();
			/* Re-insert new nodes back into the free_nodes queue */
			for (i = 0; i < r; i++) SET_INACTIVE(&cluster->node[rollback[i]]);
			return rcode;
		}

		spare->pending_addr = find.addr;
		rcode = cluster_node_connect(cluster, spare);
		if (rcode < 0) goto error;

		/*
		 *	Check to see if the node we just configured
		 *	already exists in the tree. If it does we
		 *	use that, else we add it to the array of
		 *	nodes to rollback on failure.
		 */
		SET_ACTIVE(spare);
		found = spare;

	reuse_master_node:
		tmpl_slot.master = found->id;
		master[found->id] = true;	/* Mark this node as a master */

		/*
		 *	Process the slaves
		 *
		 *	A slave node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		for (j = 3; (j < map->elements); j++) {
			SET_ADDR(find.addr, map->element[j]);
			found = fr_rb_find(cluster->used_nodes, &find);
			if (found) {
				active[found->id] = true;
				goto next;
			}

			spare = fr_fifo_peek(cluster->free_nodes);
			if (!spare) goto out_of_nodes;

			spare->pending_addr = find.addr;
			if (cluster_node_connect(cluster, spare) < 0) continue;	/* Slave failure is non-fatal */

			SET_ACTIVE(spare);
			found = spare;

		next:
			tmpl_slot.slave[slaves++] = found->id;

			/* Hit the maximum number of slaves we allow */
			if (slaves >= MAX_SLAVES) break;
		}
		tmpl_slot.slave_num = slaves;

		/*
		 *	Copy our tmpl key slot to each of the key slots
		 *	specified by the range for this map.
		 */
		for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
			memcpy(&cluster->key_slot_pending[k], &tmpl_slot,
			       sizeof(*(cluster->key_slot_pending)));
		}
	}

	/*
	 *	Check for holes in the key_slot_pending array
	 *
	 *	The cluster specification says that upon
	 *	detecting a 'NULL' key_slot we should
	 *	check again to see if the cluster error has
	 *	been resolved, but seeing as we're in the
	 *	middle of updating the cluster from very
	 *	recent output of 'cluster slots' it's best to
	 *	error out.
	 */
	for (i = 0; i < KEY_SLOTS; i++) {
		if (cluster->key_slot_pending[i].master == 0) {
			fr_strerror_printf("Cluster is misconfigured, no node assigned for key slot %zu", i);
			goto error;
		}
	}

	/*
	 *	We have connections/pools for all the nodes in
	 *	the new map, apply it to the live cluster.
	 *
	 *	Other workers may be using the key slot table,
	 *	but that's ok. Nodes and pools are never freed,
	 *	so the worst that will happen is they'll hit
	 *	the wrong node for the key, and get redirected.
	 */
	memcpy(&cluster->key_slot, &cluster->key_slot_pending, sizeof(cluster->key_slot));

	/*
	 *	Anything not in the active set of nodes gets
	 *	added back into the queue, to be re-used.
	 *
	 *	We start at 1, as node 0 is reserved.
	 */
	for (i = 1; i < cluster->conf->max_nodes; i++) {
#ifndef NDEBUG

		if (cluster->node[i].is_active) {
			/* Sanity check for duplicates that are active */
			found = fr_rb_find(cluster->used_nodes, &cluster->node[i]);
			fr_assert(found);
			fr_assert(found->is_active);
			fr_assert(found->id == i);
		}
#endif

		if (!active[i] && cluster->node[i].is_active) {
			SET_INACTIVE(&cluster->node[i]);	/* Sets is_master = false */

		/*
		 *	Only change the masters once we've successfully
		 *	remapped the cluster.
		 */
		} else if (master[i]) {
			cluster->node[i].is_master = true;
		}
	}

	cluster->remapping = false;
	cluster->last_updated = fr_time();

	/*
	 *	Sanity checks
	 */
	fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
		  fr_fifo_num_elements(cluster->free_nodes));

}

/** Validate a cluster map node entry
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note In a separate function, as it's called for both master and slave nodes.
 *
 * @param[in] node we're validating.
 * @param[in] map_idx we're processing.
 * @param[in] node_idx we're processing.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
 */
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
{
	fr_ipaddr_t ipaddr;

	if (node->type != REDIS_REPLY_ARRAY) {
		/* node is not an array here, so node->element must not be dereferenced */
		fr_strerror_printf("Cluster map %i node %i is wrong type, expected array got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->type, "<UNKNOWN>"));
	}

	/*
	 *	As per the redis docs: https://redis.io/commands/cluster-slots
	 *
	 *	Newer versions of Redis Cluster will output, for each Redis instance,
	 *	not just the IP and port, but also the node ID as third element of the
	 *	array. In future versions there could be more elements describing the
	 *	node better. In general a client implementation should just rely on
	 *	the fact that certain parameters are at fixed positions as specified,
	 *	but more parameters may follow and should be ignored.
	 *	Similarly a client library should try if possible to cope with the fact
	 *	that older versions may just have the IP and port parameter.
	 */
	if (node->elements < 2) {
		fr_strerror_printf("Cluster map %i node %i has incorrect number of elements, expected at least "
				   "2 got %zu", map_idx, node_idx, node->elements);
	}

	if (node->element[0]->type != REDIS_REPLY_STRING) {
		fr_strerror_printf("Cluster map %i node %i ip address is wrong type, expected string got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[0]->type, "<UNKNOWN>"));
	}

	if (fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC, true, true) < 0) {
	}

	if (node->element[1]->type != REDIS_REPLY_INTEGER) {
		fr_strerror_printf("Cluster map %i node %i port is wrong type, expected integer got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[1]->type, "<UNKNOWN>"));
	}

	if (node->element[1]->integer < 0) {
		fr_strerror_printf("Cluster map %i node %i port is too low, expected >= 0 got %lli",
				   map_idx, node_idx, node->element[1]->integer);
	}

	if (node->element[1]->integer > UINT16_MAX) {
		fr_strerror_printf("Cluster map %i node %i port is too high, expected <= " STRINGIFY(UINT16_MAX)" "
				   "got %lli", map_idx, node_idx, node->element[1]->integer);
	}

}

844 /** Learn a new cluster layout by querying the node that issued the -MOVE
845  *
846  * Also validates the response from the Redis cluster, so we can be sure that
847  * it's well formed, before doing more expensive operations.
848  *
849  * @note Errors may be retrieved with fr_strerror().
850  *
851  * @param[out] out Where to write cluster map.
852  * @param[in] conn to use for learning the new cluster map.
853  * @return
854  * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
855  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
856  * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the command resulted in an error.
857  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
858  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
859  */
861 {
862  redisReply *reply;
863  size_t i = 0;
864 
865  *out = NULL;
866 
867  reply = redisCommand(conn->handle, "cluster slots");
868  switch (fr_redis_command_status(conn, reply)) {
870  fr_redis_reply_free(&reply);
871  fr_strerror_const("No connections available");
873 
874  case REDIS_RCODE_ERROR:
875  default:
876  if (reply && reply->type == REDIS_REPLY_ERROR) {
877  fr_strerror_printf("%.*s", (int)reply->len, reply->str);
878  fr_redis_reply_free(&reply);
880  }
881  fr_strerror_const("Unknown client error");
883 
884  case REDIS_RCODE_SUCCESS:
885  break;
886  }
887 
888  if (reply->type != REDIS_REPLY_ARRAY) {
889  fr_strerror_printf("Bad response to \"cluster slots\" command, expected array got %s",
890  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
892  }
893 
894  /*
895  * Clustering configured but no slots set
896  */
897  if (reply->elements == 0) {
898  fr_strerror_printf("Empty response to \"cluster slots\" command (zero length array)");
900  }
901 
902  /*
903  * Validate the complete map set before returning.
904  */
905  for (i = 0; i < reply->elements; i++) {
906  size_t j;
907  redisReply *map;
908 
909  map = reply->element[i];
910  if (map->type != REDIS_REPLY_ARRAY) {
911  fr_strerror_printf("Cluster map %zu is wrong type, expected array got %s",
912  i, fr_table_str_by_value(redis_reply_types, map->type, "<UNKNOWN>"));
913  error:
914  fr_redis_reply_free(&reply);
916  }
917 
918  if (map->elements < 3) {
919  fr_strerror_printf("Cluster map %zu has too few elements, expected at least 3, got %zu",
920  i, map->elements);
921  goto error;
922  }
923 
924  /*
925  * Key slot start
926  */
927  if (map->element[0]->type != REDIS_REPLY_INTEGER) {
928  fr_strerror_printf("Cluster map %zu key slot start is wrong type, expected integer got %s",
929  i, fr_table_str_by_value(redis_reply_types, map->element[0]->type, "<UNKNOWN>"));
930  goto error;
931  }
932 
933  if (map->element[0]->integer < 0) {
934  fr_strerror_printf("Cluster map %zu key slot start is too low, expected >= 0 got %lli",
935  i, map->element[0]->integer);
936  goto error;
937  }
938 
939  if (map->element[0]->integer >= KEY_SLOTS) {
940  fr_strerror_printf("Cluster map %zu key slot start is too high, expected < "
941  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[0]->integer);
942  goto error;
943  }
944 
945  /*
946  * Key slot end
947  */
948  if (map->element[1]->type != REDIS_REPLY_INTEGER) {
949  fr_strerror_printf("Cluster map %zu key slot end is wrong type, expected integer got %s",
950  i, fr_table_str_by_value(redis_reply_types, map->element[1]->type, "<UNKNOWN>"));
951  goto error;
952  }
953 
954  if (map->element[1]->integer < 0) {
955  fr_strerror_printf("Cluster map %zu key slot end is too low, expected >= 0 got %lli",
956  i, map->element[1]->integer);
957  goto error;
958  }
959 
960  if (map->element[1]->integer >= KEY_SLOTS) {
961  fr_strerror_printf("Cluster map %zu key slot end is too high, expected < "
962  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[1]->integer);
963  goto error;
964  }
965 
966  if (map->element[1]->integer < map->element[0]->integer) {
967  fr_strerror_printf("Cluster map %zu key slot start/end out of order. "
968  "Start was %lli, end was %lli", i, map->element[0]->integer,
969  map->element[1]->integer);
970  goto error;
971  }
972 
973  /*
974  * Master node
975  */
976  if (cluster_map_node_validate(map->element[2], i, 0) < 0) goto error;
977 
978  /*
979  * Slave nodes
980  */
981  for (j = 3; j < map->elements; j++) {
982  if (cluster_map_node_validate(map->element[j], i, j - 2) < 0) goto error;
983  }
984  }
985  *out = reply;
986 
988 }
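The validation loop above reduces to a few range checks per `cluster slots` entry. The following is a minimal standalone sketch of those checks, assuming a hypothetical flattened `slot_range_t` in place of the nested `redisReply` arrays. The per-node address checks done by `cluster_map_node_validate()` are not reproduced. Redis Cluster slot numbers run from 0 to 16383.

```c
#include <assert.h>
#include <stddef.h>

#define KEY_SLOTS 16384	/* Redis Cluster uses hash slots 0..16383 */

/*
 *	Hypothetical flattened view of one "CLUSTER SLOTS" entry:
 *	[start, end, master, replica...]
 */
typedef struct {
	long long	start;		/* element[0] */
	long long	end;		/* element[1] */
	size_t		nodes;		/* Entries from element[2] onwards (master first) */
} slot_range_t;

/*
 *	Returns n if every range is valid, otherwise the index
 *	of the first invalid range.
 */
static size_t slot_map_check(slot_range_t const *map, size_t n)
{
	size_t i;

	assert(map || (n == 0));

	for (i = 0; i < n; i++) {
		if (map[i].nodes < 1) break;					/* Need at least a master */
		if ((map[i].start < 0) || (map[i].start >= KEY_SLOTS)) break;
		if ((map[i].end < 0) || (map[i].end >= KEY_SLOTS)) break;
		if (map[i].end < map[i].start) break;				/* Out of order */
	}

	return i;
}
```

Like the loop above, this sketch does not check for overlapping ranges or for slots that no range covers.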
989 
990 /** Perform a runtime remap of the cluster
991  *
992  * @note Errors may be retrieved with fr_strerror().
993  * @note Must be called with the cluster mutex free.
994  *
995  * @param[in] request The current request.
996  * @param[in,out] cluster to remap.
997  * @param[in] conn to use to query the cluster.
998  * @return
999  * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
1000  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1001  * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the 'cluster slots' command resulted in a protocol error.
1002  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1003  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1004  */
1006 {
1007  fr_time_t now;
1008  redisReply *map;
1010  size_t i, j;
1011 
1012  /*
1013  * If the cluster was remapped very recently, or is being
1014  * remapped, it's unlikely that it needs remapping again.
1015  */
1016  if (cluster->remapping) {
1017  in_progress:
1018  ROPTIONAL(RDEBUG2, DEBUG2, "Cluster remapping in progress, ignoring remap request");
1020  }
1021 
1022  /*
1023  * The remap times are _our_ times, not the _request_ time.
1024  */
1025  now = fr_time();
1026  if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1027  too_soon:
1028  ROPTIONAL(RWARN, WARN, "Cluster was updated less than a second ago, ignoring remap request");
1030  }
1031 
1032  ROPTIONAL(RINFO, INFO, "Initiating cluster remap");
1033 
1034  /*
1035  * Get new cluster information
1036  */
1037  ret = cluster_map_get(&map, conn);
1038  switch (ret) {
1039  case FR_REDIS_CLUSTER_RCODE_BAD_INPUT: /* Validation error */
1040  case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION: /* Connection error */
1041  case FR_REDIS_CLUSTER_RCODE_FAILED: /* Error issuing command */
1042  return ret;
1043 
1044  case FR_REDIS_CLUSTER_RCODE_IGNORED: /* Clustering not enabled, or not supported */
1045  cluster->remap_needed = false;
1047 
1048  case FR_REDIS_CLUSTER_RCODE_SUCCESS: /* Success */
1049  break;
1050  }
1051 
1052  /*
1053  * Print the mapping we received
1054  */
1055  ROPTIONAL(RINFO, INFO, "Cluster map consists of %zu key ranges", map->elements);
1056  for (i = 0; i < map->elements; i++) {
1057  redisReply *map_node = map->element[i];
1058 
1059  ROPTIONAL(RINFO, INFO, "%zu - keys %lli-%lli", i,
1060  map_node->element[0]->integer,
1061  map_node->element[1]->integer);
1062 
1063  if (request) RINDENT();
1064  ROPTIONAL(RINFO, INFO, "master: %s:%lli",
1065  map_node->element[2]->element[0]->str,
1066  map_node->element[2]->element[1]->integer);
1067  for (j = 3; j < map_node->elements; j++) {
1068  ROPTIONAL(RINFO, INFO, "slave%zu: %s:%lli", j - 3,
1069  map_node->element[j]->element[0]->str,
1070  map_node->element[j]->element[1]->integer);
1071  }
1072  if (request) REXDENT();
1073  }
1074 
1075  /*
1076  * Check again that the cluster isn't being
1077  * remapped, or was remapped too recently,
1078  * now that we hold the mutex and the state of
1079  * those variables is synchronized.
1080  */
1081  pthread_mutex_lock(&cluster->mutex);
1082  if (cluster->remapping) {
1083  pthread_mutex_unlock(&cluster->mutex);
1084  fr_redis_reply_free(&map); /* Free the map */
1085  goto in_progress;
1086  }
1087  if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1088  pthread_mutex_unlock(&cluster->mutex);
1089  fr_redis_reply_free(&map); /* Free the map */
1090  goto too_soon;
1091  }
1092  ret = cluster_map_apply(cluster, map);
1093  if (ret == FR_REDIS_CLUSTER_RCODE_SUCCESS) cluster->remap_needed = false; /* Change on successful remap */
1094  pthread_mutex_unlock(&cluster->mutex);
1095 
1096  fr_redis_reply_free(&map); /* Free the map */
1097  if (ret < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1098 
1100 }
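The remap function above checks its guards twice: once without the lock, so most callers give up before issuing `cluster slots`, and again under `cluster->mutex` before applying the new map. A minimal sketch of that double-checked gate follows, assuming a hypothetical `remap_gate_t` and plain `time_t` seconds in place of `fr_time_t`. Because both checks compare whole seconds, "updated less than a second ago" really means "updated in the same wall-clock second". As in the listing, the pre-check reads shared fields without the lock; a stricter version would use C11 atomics for those reads.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical, cut-down cluster state holding only what the gate needs */
typedef struct {
	pthread_mutex_t	mutex;
	bool		remapping;	/* A remap is currently being applied */
	time_t		last_updated;	/* Second in which the map was last applied */
} remap_gate_t;

/*
 *	Cheap unlocked pre-check, done before querying the cluster.
 *	A stale answer only costs one wasted query.
 */
static bool remap_worth_trying(remap_gate_t const *g, time_t now)
{
	return !g->remapping && (now != g->last_updated);
}

/*
 *	Authoritative re-check under the mutex, after the new map
 *	has been fetched.  Returns true if this caller applied it.
 */
static bool remap_commit(remap_gate_t *g, time_t now)
{
	bool applied = false;

	assert(g);

	pthread_mutex_lock(&g->mutex);
	if (!g->remapping && (now != g->last_updated)) {
		/* ...apply the new slot map here, still holding the mutex... */
		g->last_updated = now;
		applied = true;
	}
	pthread_mutex_unlock(&g->mutex);

	return applied;
}
```

A second worker that fetched a map in the same second loses the race in `remap_commit()` and simply discards its copy, which is what the `goto too_soon` path does above.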
1101 
1102 /** Retrieve or associate a node with the server indicated in the redirect
1103  *
1104  * @note Errors may be retrieved with fr_strerror().
1105  *
1106  * @param[out] out Where to write the node representing the redirect server.
1107  * @param[in] cluster to draw node from.
1108  * @param[in] reply Redis reply containing the redirect information.
1109  * @return
1110  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1111  * - FR_REDIS_CLUSTER_RCODE_FAILED no more nodes available.
1112  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1113  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1114  */
1116 {
1117  fr_redis_cluster_node_t find, *found, *spare;
1118  fr_redis_conn_t *rconn;
1119 
1120  uint16_t key;
1121 
1122  memset(&find, 0, sizeof(find));
1123 
1124  *out = NULL;
1125 
1126  if (cluster_node_conf_from_redirect(&key, &find.addr, reply) < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1127 
1128  pthread_mutex_lock(&cluster->mutex);
1129  /*
1130  * If we already have a pool for the
1131  * host we were redirected to, use that.
1132  */
1133  found = fr_rb_find(cluster->used_nodes, &find);
1134  if (found) {
1135  /* We have the new pool, don't need to hold the lock */
1136  pthread_mutex_unlock(&cluster->mutex);
1137  *out = found;
1139  }
1140 
1141  /*
1142  * Otherwise grab a free node and try to connect
1143  * it to the server we were redirected to.
1144  */
1145  spare = fr_fifo_peek(cluster->free_nodes);
1146  if (!spare) {
1147  fr_strerror_const("Reached maximum connected nodes");
1148  pthread_mutex_unlock(&cluster->mutex);
1150  }
1151  spare->pending_addr = find.addr; /* Set the config to be applied */
1152  if (cluster_node_connect(cluster, spare) < 0) {
1153  pthread_mutex_unlock(&cluster->mutex);
1155  }
1156  fr_rb_insert(cluster->used_nodes, spare);
1157  fr_fifo_pop(cluster->free_nodes);
1158  found = spare;
1159 
1160  /* We have the new pool, don't need to hold the lock */
1161  pthread_mutex_unlock(&cluster->mutex);
1162 
1163  /*
1164  * Determine if we can establish a connection to
1165  * the new pool, to check if it's viable.
1166  */
1167  rconn = fr_pool_connection_get(found->pool, NULL);
1168  if (!rconn) {
1169  /*
1170  * To prevent repeated misconfigurations
1171  * from consuming all the free nodes, put
1172  * the node back on the spare queue if
1173  * this was the first connection attempt
1174  * and it failed.
1175  */
1176  pthread_mutex_lock(&cluster->mutex);
1177  fr_fifo_push(cluster->free_nodes, spare);
1178  pthread_mutex_unlock(&cluster->mutex);
1179 
1180  fr_strerror_const("No connections available");
1182  }
1183  fr_pool_connection_release(found->pool, NULL, rconn);
1184  *out = found;
1185 
1187 }
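`cluster_node_conf_from_redirect()` is not part of this listing. For reference, Redis Cluster redirect errors have the form `MOVED <slot> <host>:<port>` or `ASK <slot> <host>:<port>` (hiredis strips the leading `-` from error replies). The following is a minimal parser sketch with a hypothetical `redirect_t` standing in for the node address FreeRADIUS fills in; it is not the FreeRADIUS implementation and does no name resolution.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parsed redirect; not a FreeRADIUS type */
typedef struct {
	bool		ask;		/* true: one-off ASK redirect, false: permanent MOVED */
	uint16_t	slot;		/* Key slot the redirect applies to */
	char		host[64];	/* Address as sent by the server, unresolved */
	uint16_t	port;
} redirect_t;

/*
 *	Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 *	Returns 0 on success, -1 on malformed input.
 */
static int redirect_parse(redirect_t *out, char const *str)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	slot, port;
	size_t		host_len;

	assert(out && str);

	if (strncmp(str, "MOVED ", 6) == 0) {
		out->ask = false;
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		out->ask = true;
		p = str + 4;
	} else {
		return -1;
	}

	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot >= 16384)) return -1;
	p = end + 1;

	/* Split on the last ':' so IPv6 literals keep their own colons */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port == 0) || (port > 65535)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = (uint16_t)slot;
	out->port = (uint16_t)port;

	return 0;
}
```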
1188 
1189 
1190 /** Try to determine the health of a cluster node passively by examining its pool state
1191  *
1192  * Returns an integer value representing the likelihood that the pool is live.
1193  * Range is between 1 and 11,000.
1194  *
1195  * If a weight of 1 is returned, connections from the pool should be checked
1196  * (by pinging) before use.
1197  *
1198  * @param now The current time.
1199  * @param state of the connection pool.
1200  * @return
1201  * - 1 the pool is very likely to be bad.
1202  * - 2-11000 the pool is likely to be good, with a higher number
1203  * indicating higher probability of liveness.
1204  */
1206 {
1207  /*
1208  * Failed spawn recently, probably bad
1209  */
1211 
1212  /*
1213  * Closed recently, probably bad
1214  */
1216 
1217  /*
1218  * Released too long ago, don't know
1219  */
1221 
1222  /*
1223  * Released not long ago, might be ok.
1224  */
1226 }
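Only the comments of the pool health function survive in this listing. The sketch below reproduces the weights documented for live node selection (1 after a recent failed spawn or close, 500 when the last release is older than 10 s, 11,000 falling to 1,000 over the 10 s after a release). The linear fall-off, the 10 s window, the nanosecond timestamps and the `pool_times_t` type are all assumptions, not the FreeRADIUS code.

```c
#include <assert.h>
#include <stdint.h>

#define NSEC_PER_SEC	1000000000LL
#define HEALTH_PERIOD	(10 * NSEC_PER_SEC)	/* Assumed 10s window, per the comments */

/* Hypothetical pool timestamps in nanoseconds; 0 means "never happened" */
typedef struct {
	int64_t		last_failed;	/* Last failed connection spawn */
	int64_t		last_closed;	/* Last connection closed */
	int64_t		last_released;	/* Last connection released back to the pool */
} pool_times_t;

static int pool_health_sketch(int64_t now, pool_times_t const *s)
{
	int64_t since;

	assert(s);

	/* Failed spawn or close within the window: probably bad */
	if (s->last_failed && ((now - s->last_failed) < HEALTH_PERIOD)) return 1;
	if (s->last_closed && ((now - s->last_closed) < HEALTH_PERIOD)) return 1;

	/* Never released, or released too long ago: don't know */
	if (!s->last_released) return 500;
	since = now - s->last_released;
	if (since > HEALTH_PERIOD) return 500;
	if (since < 0) since = 0;	/* Clock went backwards */

	/* Released recently: 11000 at 0s, falling linearly to 1000 at 10s */
	return (int)(11000 - ((since * 10000) / HEALTH_PERIOD));
}
```

The documented weights step from 1,000 at exactly 10 s down to 500 just after it; the sketch keeps that step rather than smoothing it.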
1227 
1228 /** Issue a ping request against a cluster node
1229  *
1230  * Establishes whether the connection to the node we have is live.
1231  *
1232  * @param request The current request.
1233  * @param node to ping.
1234  * @param conn the connection to ping on.
1235  * @return
1236  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if we got a bad response.
1237  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1238  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION on connection down.
1239  */
1241 {
1242  redisReply *reply;
1243  fr_redis_rcode_t rcode;
1244 
1245  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Executing command: PING", node->id);
1246  reply = redisCommand(conn->handle, "PING");
1247  rcode = fr_redis_command_status(conn, reply);
1248  if (rcode != REDIS_RCODE_SUCCESS) {
1249  ROPTIONAL(RPERROR, PERROR, "[%i] PING failed to %s:%i", node->id, node->name, node->addr.inet.dst_port);
1250  fr_redis_reply_free(&reply);
1252  }
1253 
1254  if (reply->type != REDIS_REPLY_STATUS) {
1255  ROPTIONAL(RERROR, ERROR, "[%i] Bad PING response from %s:%i, expected status got %s",
1256  node->id, node->name, node->addr.inet.dst_port,
1257  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1258  fr_redis_reply_free(&reply);
1260  }
1261 
1262  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Got response: %s", node->id, reply->str);
1263  fr_redis_reply_free(&reply);
1265 }
1266 
1267 /** Attempt to find a live pool in the cluster
1268  *
1269  * The intent here is to find pools/nodes where a connection was released the shortest
1270  * time ago. Having a connection be released (vs closed) indicates that the pool is live.
1271  *
1272  * We don't want all workers to try to grab a connection to this node, however, as it
1273  * may still be dead (we don't know).
1274  *
1275  * So we use inverse transform sampling to weight the nodes, based on the time between now
1276  * and when the connection was released. Connections released closest to the current
1277  * time are given a higher weighting.
1278  *
1279  * Weight range is 1 to 11,000.
1280  *
1281  * - If released > 10.0 seconds ago, the information is stale, weight 500.
1282  * - If closed < 10.0 seconds ago, it's a bad pool, weight 1.
1283  * - If spawn failed < 10.0 seconds ago, it's a bad pool, weight 1.
1284  * - If a connection was released 0.0 seconds ago, weight 11,000.
1285  * - If a connection was released 10.0 seconds ago, weight 1000.
1286  *
1287  * Using the above algorithm we use the experience of other workers using the cluster to
1288  * inform our alternative node selection.
1289  *
1290  * Suggestions on improving live node selection appreciated.
1291  *
1292  * Inverse transform sampling based roughly on the solution from this post:
1293  * http://stackoverflow.com/questions/17250568/randomly-choosing-from-a-list-with-weighted-probabilities
1294  *
1295  * Wikipedia page here:
1296  * https://en.wikipedia.org/wiki/Inverse_transform_sampling
1297  *
1298  * @note Must be called with the cluster mutex free.
1299  *
1300  * @param[out] live_node we found.
1301  * @param[out] live_conn to that node.
1302  * @param[in] request The current request (used for logging).
1303  * @param[in] cluster to search for live pools in.
1304  * @param[in] skip this node (it's bad).
1305  * @return 0 if a live node and connection were found, -1 if no alternative live node was found.
1306  */
1308  request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
1309 {
1310  uint32_t i;
1311 
1312  cluster_nodes_live_t *live;
1313  fr_time_t now;
1314  fr_rb_iter_inorder_t iter;
1316 
1317  ROPTIONAL(RDEBUG2, DEBUG2, "Searching for live cluster nodes");
1318 
1319  if (fr_rb_num_elements(cluster->used_nodes) == 1) {
1320  no_alts:
1321  ROPTIONAL(RERROR, ERROR, "No alternative nodes available");
1322  return -1;
1323  }
1324 
1325  live = talloc_zero(NULL, cluster_nodes_live_t); /* Too big for stack */
1326  live->skip = skip->id;
1327 
1328  pthread_mutex_lock(&cluster->mutex);
1329  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
1330  node;
1331  node = fr_rb_iter_next_inorder(&iter)) {
1332  fr_assert(node->pool);
1333  if (live->skip == node->id) continue; /* Skip dead nodes */
1334 
1335  live->node[live->next].pool_state = fr_pool_state(node->pool);
1336  live->node[live->next++].id = node->id;
1337  }
1338  pthread_mutex_unlock(&cluster->mutex);
1339 
1340  fr_assert(live->next); /* There should be at least one */
1341  if (live->next == 1) { talloc_free(live); goto no_alts; } /* Weird, but conceivable */
1342 
1343  now = fr_time();
1344 
1345  /*
1346  * Weighted random selection
1347  */
1348  for (i = 0; (i < cluster->conf->max_alt) && live->next; i++) {
1349  fr_redis_conn_t *conn;
1350  uint8_t j;
1351  int first, last, pivot; /* Must be signed for BS */
1352  unsigned int find, cumulative = 0;
1353 
1354  ROPTIONAL(RDEBUG3, DEBUG3, "(Re)assigning node weights:");
1355  if (request) RINDENT();
1356  for (j = 0; j < live->next; j++) {
1357  int weight;
1358 
1359  weight = cluster_node_pool_health(now, live->node[j].pool_state);
1360  ROPTIONAL(RDEBUG3, DEBUG3, "Node %i weight: %i", live->node[j].id, weight);
1361  live->node[j].cumulative = (cumulative += weight);
1362  }
1363  if (request) REXDENT();
1364 
1365  /*
1366  * Select a node at random
1367  */
1368  find = (fr_rand() % cumulative) + 1; /* Between 1 and total */
1369  first = 0;
1370  last = live->next - 1;
1371  pivot = (first + last) / 2;
1372 
1373  while (first <= last) {
1374  if (live->node[pivot].cumulative < find) {
1375  first = pivot + 1;
1376  } else if (live->node[pivot].cumulative == find) {
1377  break;
1378  } else {
1379  last = pivot - 1;
1380  }
1381  pivot = (first + last) / 2;
1382  }
1383  /*
1384  * Round up...
1385  */
1386  if (first > last) pivot = last + 1;
1387 
1388  /*
1389  * Resolve the index to the actual node. We use IDs
1390  * to save memory...
1391  */
1392  node = &cluster->node[live->node[pivot].id];
1393  fr_assert(live->node[pivot].id == node->id);
1394 
1395  ROPTIONAL(RDEBUG2, DEBUG2, "Selected node %i (using random value %i)", node->id, find);
1396  conn = fr_pool_connection_get(node->pool, request);
1397  if (!conn) {
1398  ROPTIONAL(RERROR, ERROR, "No connections available to node %i %s:%i", node->id,
1399  node->name, node->addr.inet.dst_port);
1400  next:
1401  /*
1402  * Remove the node we just discovered was bad
1403  * out of the set of nodes we're selecting over.
1404  */
1405  if (pivot == (live->next - 1)) {
1406  live->next--;
1407  continue;
1408  }
1409  memcpy(&live->node[pivot], &live->node[live->next - 1], sizeof(live->node[pivot]));
1410  live->next--;
1411  continue;
1412  }
1413 
1414  /*
1415  * PING! PONG?
1416  */
1417  switch (cluster_node_ping(request, node, conn)) {
1419  break;
1420 
1422  fr_pool_connection_close(node->pool, request, conn);
1423  goto next;
1424 
1425  default:
1426  fr_pool_connection_release(node->pool, request, conn);
1427  goto next;
1428  }
1429 
1430  *live_node = node;
1431  *live_conn = conn;
1432  talloc_free(live);
1433 
1434  return 0;
1435  }
1436 
1437  ROPTIONAL(RERROR, ERROR, "Hit max alt limit %i, and no live connections found", cluster->conf->max_alt);
1438  talloc_free(live);
1439 
1440  return -1;
1441 }
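The selection loop above is inverse transform sampling over running totals of the node weights. Here is a minimal standalone sketch of the same idea with hypothetical helper names. Drawing `find` as `(r % total) + 1` rather than with a bit mask keeps the draw uniform for totals that are not powers of two, and searching a half-open range with unsigned indexes avoids needing signed bounds.

```c
#include <assert.h>
#include <stddef.h>

/* Build running totals of the weights; returns the grand total */
static unsigned int cumulative_weights(unsigned int const *weight, unsigned int *cumulative, size_t n)
{
	unsigned int total = 0;
	size_t i;

	for (i = 0; i < n; i++) cumulative[i] = (total += weight[i]);

	return total;
}

/*
 *	Given find drawn uniformly from [1, total], return the first
 *	index whose running total is >= find.  Index i is therefore
 *	chosen with probability weight[i] / total.
 */
static size_t weighted_pick(unsigned int const *cumulative, size_t n, unsigned int find)
{
	size_t lo = 0, hi = n;		/* Search the half-open range [lo, hi) */

	assert(n > 0);
	assert((find >= 1) && (find <= cumulative[n - 1]));

	while (lo < hi) {
		size_t mid = lo + ((hi - lo) / 2);

		if (cumulative[mid] < find) {
			lo = mid + 1;
		} else {
			hi = mid;
		}
	}

	return lo;
}
```

With weights of 1, 11,000 and 500, a node that recently closed a connection (weight 1) is picked roughly once in 11,501 draws, while the recently released node dominates.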
1442 
1443 /** Callback for freeing a Redis connection
1444  *
1445  * @param[in] conn to free.
1446  * @return 0.
1447  */
1449 {
1450  redisFree(conn->handle);
1451 
1452  return 0;
1453 }
1454 
1455 /** Create a new connection to a Redis node
1456  *
1457  * @param[in] ctx to allocate connection structure in. Will be freed at the same time as the pool.
1458  * @param[in] instance data of type #fr_redis_cluster_node_t. Holds parameters for establishing new connection.
1459  * @param[in] timeout The maximum time allowed to complete the connection.
1460  * @return
1461  * - New #fr_redis_conn_t on success.
1462  * - NULL on failure.
1463  */
1464 void *fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
1465 {
1466  fr_redis_cluster_node_t *node = instance;
1467  fr_redis_conn_t *conn = NULL;
1468  redisContext *handle;
1469  redisReply *reply = NULL;
1470  char const *log_prefix = node->cluster->log_prefix;
1471 
1472  DEBUG2("%s - [%i] Connecting to node %s:%i", log_prefix, node->id, node->name, node->addr.inet.dst_port);
1473 
1474  handle = redisConnectWithTimeout(node->name, node->addr.inet.dst_port, fr_time_delta_to_timeval(timeout));
1475  if ((handle != NULL) && handle->err) {
1476  ERROR("%s - [%i] Connection failed: %s", log_prefix, node->id, handle->errstr);
1477  redisFree(handle);
1478  return NULL;
1479  } else if (!handle) {
1480  ERROR("%s - [%i] Connection failed", log_prefix, node->id);
1481  return NULL;
1482  }
1483 
1484  conn = talloc_zero(ctx, fr_redis_conn_t);
1485  conn->handle = handle;
1486  conn->node = node;
1487  talloc_set_destructor(conn, _cluster_conn_free);
1488 
1489 #ifdef HAVE_REDIS_SSL
1490  if (node->cluster->ssl_ctx != NULL) {
1491  fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->cluster->ssl_ctx);
1492  if (!tls_session) {
1493  fr_tls_strerror_printf("%s - [%i]", log_prefix, node->id);
1494  ERROR("%s - [%i] Failed to allocate TLS session", log_prefix, node->id);
1495  talloc_free(conn);
1496  return NULL;
1497  }
1498 
1499  // redisInitiateSSL() takes ownership of SSL object on success
1500  SSL_up_ref(tls_session->ssl);
1501  if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1502  ERROR("%s - [%i] Failed to initiate SSL: %s", log_prefix, node->id, handle->errstr);
1503  SSL_free(tls_session->ssl);
1504  talloc_free(conn);
1505  return NULL;
1506  }
1507  }
1508 #endif
1509 
1510  if (node->cluster->conf->password) {
1511  if (node->cluster->conf->username) {
1512  DEBUG3("%s - [%i] Executing: AUTH %s %s", log_prefix, node->id,
1513  node->cluster->conf->username,
1514  node->cluster->conf->password);
1515  reply = redisCommand(handle, "AUTH %s %s",
1516  node->cluster->conf->username,
1517  node->cluster->conf->password);
1518  } else {
1519  DEBUG3("%s - [%i] Executing: AUTH %s", log_prefix, node->id, node->cluster->conf->password);
1520  reply = redisCommand(handle, "AUTH %s", node->cluster->conf->password);
1521  }
1522  if (!reply) {
1523  ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, handle->errstr);
1524  error:
1525  if (reply) fr_redis_reply_free(&reply);
1526  talloc_free(conn);
1527  return NULL;
1528  }
1529 
1530  switch (reply->type) {
1531  case REDIS_REPLY_STATUS:
1532  if (strcmp(reply->str, "OK") != 0) {
1533  ERROR("%s - [%i] Failed authenticating: %s", log_prefix,
1534  node->id, reply->str);
1535  goto error;
1536  }
1537  fr_redis_reply_free(&reply);
1538  break; /* else it's OK */
1539 
1540  case REDIS_REPLY_ERROR:
1541  ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, reply->str);
1542  goto error;
1543 
1544  default:
1545  ERROR("%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->id,
1546  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1547  goto error;
1548  }
1549  }
1550 
1551  if (node->cluster->conf->database) {
1552  DEBUG3("%s - [%i] Executing: SELECT %i", log_prefix, node->id, node->cluster->conf->database);
1553  reply = redisCommand(handle, "SELECT %i", node->cluster->conf->database);
1554  if (!reply) {
1555  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1556  node->cluster->conf->database, handle->errstr);
1557  goto error;
1558  }
1559 
1560  switch (reply->type) {
1561  case REDIS_REPLY_STATUS:
1562  if (strcmp(reply->str, "OK") != 0) {
1563  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1564  node->cluster->conf->database, reply->str);
1565  goto error;
1566  }
1567  fr_redis_reply_free(&reply);
1568  break; /* else it's OK */
1569 
1570  case REDIS_REPLY_ERROR:
1571  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1572  node->cluster->conf->database, reply->str);
1573  goto error;
1574 
1575  default:
1576  ERROR("%s - [%i] Unexpected reply of type %s to SELECT", log_prefix, node->id,
1577  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1578  goto error;
1579  }
1580  }
1581 
1582  return conn;
1583 }
1584 
1585 /** Implements the key slot selection scheme used by FreeRADIUS
1586  *
1587  * This is the scheme from the clustering specification, with some differences:
1588  * if the key is NULL or zero length, a random key slot is chosen.
1589  *
1590  * If there's only a single node in the cluster, then we avoid the CRC16
1591  * and just use key slot 0.
1592  *
1593  * @param cluster to determine key slot for.
1594  * @param request The current request.
1595  * @param key the key to resolve.
1596  * @param key_len the length of the key.
1597  * @return pointer to the key slot the key resolves to.
1598  */
1600  uint8_t const *key, size_t key_len)
1601 {
1602  fr_redis_cluster_key_slot_t *key_slot;
1603 
1604  if (!key || (key_len == 0)) {
1605  key_slot = &cluster->key_slot[(uint16_t)(fr_rand() & (KEY_SLOTS - 1))];
1606  ROPTIONAL(RDEBUG2, DEBUG2, "Key rand() -> slot %zu", key_slot - cluster->key_slot);
1607 
1608  return key_slot;
1609  }
1610 
1611  /*
1612  * Avoid CRC16 if we're operating with one cluster node or
1613  * without clustering.
1614  */
1615  if (fr_rb_num_elements(cluster->used_nodes) > 1) {
1616  key_slot = &cluster->key_slot[cluster_key_hash(key, key_len)];
1617  ROPTIONAL(RDEBUG2, DEBUG2, "Key \"%pV\" -> slot %zu",
1618  fr_box_strvalue_len((char const *)key, key_len), key_slot - cluster->key_slot);
1619 
1620  return key_slot;
1621  }
1622  ROPTIONAL(RDEBUG3, DEBUG3, "Single node available, skipping key selection");
1623 
1624  return &cluster->key_slot[0];
1625 }
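`cluster_key_hash()` is not shown in this listing. Assuming it follows the Redis Cluster specification, a key maps to `CRC16(key) mod 16384` using the XMODEM variant of CRC16, and if the key contains a non-empty `{...}` hash tag only the tag body is hashed. A minimal sketch with hypothetical function names follows; the FreeRADIUS-specific behaviour (random slot for an empty key, slot 0 for a single node) is handled by the function above and is not repeated here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KEY_SLOTS 16384

/* CRC16 (XMODEM variant): poly 0x1021, initial value 0, no reflection */
static uint16_t crc16_xmodem(uint8_t const *p, size_t len)
{
	uint16_t crc = 0;

	while (len--) {
		crc ^= (uint16_t)(*p++ << 8);
		for (int bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}

	return crc;
}

/*
 *	If the key contains "{...}" with a non-empty body, only the part
 *	between the first '{' and the next '}' is hashed.
 */
static uint16_t key_slot_sketch(uint8_t const *key, size_t len)
{
	size_t open = 0, close;

	assert(key || (len == 0));

	while ((open < len) && (key[open] != '{')) open++;
	if (open < len) {
		close = open + 1;
		while ((close < len) && (key[close] != '}')) close++;
		if ((close < len) && (close > open + 1)) {
			return crc16_xmodem(key + open + 1, close - open - 1) & (KEY_SLOTS - 1);
		}
	}

	return crc16_xmodem(key, len) & (KEY_SLOTS - 1);
}

static uint16_t key_slot_str(char const *key)
{
	return key_slot_sketch((uint8_t const *)key, strlen(key));
}
```

Hash tags let related keys such as `{user1000}.following` and `{user1000}.followers` land in the same slot, so multi-key commands on them reach a single node.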
1626 
1627 /** Return the master node that would be used for a particular key
1628  *
1629  * @param[in] cluster To resolve key in.
1630  * @param[in] key_slot to resolve to node.
1631  * @return
1632  * - The current master node.
1633  * - NULL if no master node is currently assigned to a particular key slot.
1634  */
1636  fr_redis_cluster_key_slot_t const *key_slot)
1637 {
1638  return &cluster->node[key_slot->master];
1639 }
1640 
1641 /** Return the slave node that would be used for a particular key
1642  *
1643  * @param[in] cluster To resolve key in.
1644  * @param[in] key_slot To resolve to node.
1645  * @param[in] slave_num 0..n.
1646  * @return
1647  * - A slave node.
1648  * - NULL if no slave node is assigned at slave_num for this key slot.
1649  *
1650  */
1652  fr_redis_cluster_key_slot_t const *key_slot,
1653  uint8_t slave_num)
1654 {
1655  if (slave_num >= key_slot->slave_num) return NULL; /* No slave available */
1656 
1657  return &cluster->node[key_slot->slave[slave_num]];
1658 }
1659 
1660 /** Return the ipaddr of a particular node
1661  *
1662  * @param[out] out Ipaddr of the node.
1663  * @param[in] node to get ip address from.
1664  * @return
1665  * - 0 on success.
1666  * - -1 on failure (node is NULL).
1667  */
1669 {
1670  if (!node) return -1;
1671 
1672  memcpy(out, &node->addr.inet.dst_ipaddr, sizeof(*out));
1673 
1674  return 0;
1675 }
1676 
1677 /** Return the port of a particular node
1678  *
1679  * @param[out] out Port of the node.
1680  * @param[in] node to get port from.
1681  * @return
1682  * - 0 on success.
1683  * - -1 on failure (node is NULL).
1684  */
1686 {
1687  if (!node) return -1;
1688 
1689  *out = node->addr.inet.dst_port;
1690 
1691  return 0;
1692 }
1693 
1694 /** Resolve a key to a pool, and reserve a connection in that pool
1695  *
1696  * This should be used with #fr_redis_cluster_state_next, and #fr_redis_command_status, to
1697  * transparently locate the cluster node we need to perform the operation on.
1698  *
1699  * Example code below shows how this function is used in conjunction
1700  * with #fr_redis_cluster_state_next to follow redirects, and reconnect handles.
1701  *
1702  @code{.c}
1703  int s_ret;
1704  fr_redis_cluster_state_t state;
1705  fr_redis_conn_t *conn;
1706  redisReply *reply = NULL;
1707  fr_redis_rcode_t status;
1708 
1709  for (s_ret = fr_redis_cluster_state_init(&state, &conn, cluster, request, key, key_len, false);
1710  s_ret == REDIS_RCODE_TRY_AGAIN;
1711  s_ret = fr_redis_cluster_state_next(&state, &conn, cluster, request, status, &reply)) {
1712  reply = redisCommand(conn->handle, "SET foo bar");
1713  status = fr_redis_command_status(conn, reply);
1714  }
1715  // reply is freed if s_ret == REDIS_RCODE_TRY_AGAIN, but left in all other cases to allow error
1716  // processing, or extraction of results.
1717  fr_redis_reply_free(&reply);
1718  if (s_ret != REDIS_RCODE_SUCCESS) {
1719  // Error
1720  }
1721  // Success
1722  @endcode
1723  *
1724  * @param[out] state to track current pool and various counters, will be initialised.
1725  * @param[out] conn Where to write the reserved connection to.
1726  * @param[in] cluster of pools.
1727  * @param[in] request The current request.
1728  * @param[in] key to resolve to a cluster node/pool. If key is NULL or key_len is 0, a random
1729  * slot will be chosen.
1730  * @param[in] key_len Length of the key.
1731  * @param[in] read_only If true, will use random slave pool in preference to the master, falling
1732  * back to the master if no slaves are available.
1733  * @return
1734  * - REDIS_RCODE_TRY_AGAIN - try your command with this connection (provided via conn).
1735  * - REDIS_RCODE_RECONNECT - when no additional connections available.
1736  */
1738  fr_redis_cluster_t *cluster, request_t *request,
1739  uint8_t const *key, size_t key_len, bool read_only)
1740 {
1742  fr_redis_cluster_key_slot_t const *key_slot;
1743  uint8_t first, i;
1744  uint64_t used_nodes;
1745 
1746  fr_assert(cluster);
1747  fr_assert(state);
1748  fr_assert(conn);
1749 
1750  memset(state, 0, sizeof(*state));
1751  *conn = NULL; /* Better safe than exploding */
1752 
1753  used_nodes = fr_rb_num_elements(cluster->used_nodes);
1754  if (used_nodes == 0) {
1755  ROPTIONAL(REDEBUG, ERROR, "No nodes in cluster");
1756  return REDIS_RCODE_RECONNECT;
1757  }
1758 
1759 again:
1760  key_slot = fr_redis_cluster_slot_by_key(cluster, request, key, key_len);
1761 
1762  /*
1763  * 1. Try each of the slaves for the key slot
1764  * 2. Fall through to trying the master, and a single alternate node.
1765  */
1766  if (read_only) {
1767  first = key_slot->slave_num ? (uint8_t)(fr_rand() % key_slot->slave_num) : 0;
1768  for (i = 0; i < key_slot->slave_num; i++) {
1769  uint8_t node_id;
1770 
1771  node_id = key_slot->slave[(first + i) % key_slot->slave_num];
1772  node = &cluster->node[node_id];
1773  *conn = fr_pool_connection_get(node->pool, request);
1774  if (!*conn) {
1775  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu slave %i)",
1776  node->id, key_slot - cluster->key_slot, (first + i) % key_slot->slave_num);
1777  cluster->remap_needed = true;
1778  continue; /* Continue until we find a live pool */
1779  }
1780 
1781  goto finish;
1782  }
1783  /* Fall through to using key slot master or alternate */
1784  }
1785 
1786  /*
1787  * 1. Try the master for the key slot
1788  * 2. If unavailable search for any pools with handles available
1789  * 3. If there are no pools, or we can't reserve a handle,
1790  * give up.
1791  */
1792  node = &cluster->node[key_slot->master];
1793  *conn = fr_pool_connection_get(node->pool, request);
1794  if (!*conn) {
1795  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu master)",
1796  node->id, key_slot - cluster->key_slot);
1797  cluster->remap_needed = true;
1798 
1799  if (cluster_node_find_live(&node, conn, request, cluster, node) < 0) return REDIS_RCODE_RECONNECT;
1800  }
1801 
1802 finish:
1803  /*
1804  * Something set the remap_needed flag, and we have a live connection
1805  */
1806  if (cluster->remap_needed) {
1807  if (fr_redis_cluster_remap(request, cluster, *conn) == FR_REDIS_CLUSTER_RCODE_SUCCESS) {
1808  fr_pool_connection_release(node->pool, request, *conn);
1809  goto again; /* New map, try again */
1810  }
1811  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1812  }
1813 
1814  state->node = node;
1815  state->key = key;
1816  state->key_len = key_len;
1817 
1818  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
1819  state->node->id, state->node->name, state->node->addr.inet.dst_port);
1820 
1821  return REDIS_RCODE_TRY_AGAIN;
1822 }
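In the read-only path above, each slave for the key slot is tried once, starting from a random one and wrapping round. A minimal sketch of that visiting order with a hypothetical helper: taking the offset as `r % n` gives every slave a chance of being tried first, whereas a bit mask such as `r & n` does not (with n = 2 it only yields 0 or 2, both of which map to slave 0).

```c
#include <assert.h>
#include <stdint.h>

/*
 *	Fill order[0..n-1] with the slave indexes to try, starting at
 *	offset r % n and wrapping round.
 */
static void slave_try_order(uint8_t n, uint32_t r, uint8_t *order)
{
	uint8_t first, i;

	assert(order || (n == 0));
	if (n == 0) return;		/* No slaves: caller falls back to the master */

	first = (uint8_t)(r % n);
	for (i = 0; i < n; i++) order[i] = (uint8_t)((first + i) % n);
}
```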
1823 
1824 /** Get the next connection to attempt a command against
1825  *
1826  * Will process reconnect and redirect states performing the actions necessary.
1827  *
1828  * - May trigger a cluster remap on receiving a #REDIS_RCODE_MOVE status.
1829  * - May perform a temporary redirect on receiving a #REDIS_RCODE_ASK status.
1830  * - May reserve a new connection on receiving a #REDIS_RCODE_RECONNECT status.
1831  *
1832  * If a remap is in progress, has occurred within the last second, has recently failed,
1833  * or fails, the '-MOVED' will be treated as a temporary redirect (-ASK).
1834  *
1835  * This allows the server to be more responsive during remaps: unless the worker has been
1836  * redirected to a node we don't currently have a pool for, it can grab a connection to the
1837  * node it was redirected to and continue.
1838  *
1839  * @note Irrespective of return code, the connection passed via conn will be released;
1840  * a new connection to attempt the command on, if any, will be provided via conn.
1841  *
1842  * @note reply will be automatically freed and set to NULL if a new connection is provided;
1843  * in all other cases, the caller is responsible for freeing the reply.
1844  *
1845  * @param[in,out] state containing the current pool, and various counters which control
1846  * retries, and limit redirects.
1847  * @param[in,out] conn the last command was issued on (e.g. the one we received an '-ASK' or
1848  * '-MOVED' redirect on). Will be replaced with a connection in the new pool the key points to.
1849  * @param[in] request The current request.
1850  * @param[in] cluster of pools.
1851  * @param[in] status of the last command, as returned by #fr_redis_command_status.
1852  * @param[in] reply from last command. Freed if #REDIS_RCODE_TRY_AGAIN is returned, else caller must free.
1853  * @return
1854  * - REDIS_RCODE_SUCCESS - on success.
1855  * - REDIS_RCODE_TRY_AGAIN - try new connection (provided via conn). Will free reply.
1856  * - REDIS_RCODE_ERROR - on failure or command error.
1857  * - REDIS_RCODE_RECONNECT - when no additional connections available.
1858  */
1860  fr_redis_cluster_t *cluster, request_t *request,
1861  fr_redis_rcode_t status, redisReply **reply)
1862 {
1863  fr_assert(state && state->node && state->node->pool);
1864  fr_assert(conn && *conn);
1865 
1866  if (*reply) fr_redis_reply_print(L_DBG_LVL_3, *reply, request, 0);
1867 
1868  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] <<< Returned: %s",
1869  state->node->id, fr_table_str_by_value(redis_rcodes, status, "<UNKNOWN>"));
1870 
1871  /*
1872  * Caller indicated we should close the connection
1873  */
1874  if (state->close_conn) {
1875  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Connection no longer viable, closing it", state->node->id);
1876  fr_pool_connection_close(state->node->pool, request, *conn);
1877  *conn = NULL;
1878  state->close_conn = false;
1879  }
1880 
1881  /*
1882  * If we have a proven live connection, and something
1883  * has set the remap_needed flag, do that now before
1884  * releasing the connection.
1885  */
1886  if (cluster->remap_needed && *conn) switch(status) {
1887  case REDIS_RCODE_MOVE: /* We're going to remap anyway */
1888  case REDIS_RCODE_RECONNECT: /* The connection's dead */
1889  break;
1890 
1891  default:
1892  /*
1893  * Remap the cluster. On success, will clear the
1894  * remap_needed flag.
1895  */
1896  if (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)
1897  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1898  }
1899 
1900  /*
1901  * Check the result of the last redis command, and do
1902  * something appropriate.
1903  */
1904  switch (status) {
1905  case REDIS_RCODE_SUCCESS:
1906  fr_pool_connection_release(state->node->pool, request, *conn);
1907  *conn = NULL;
1908  return REDIS_RCODE_SUCCESS;
1909 
1910  /*
1911  * Command error, not fixable.
1912  */
1913  case REDIS_RCODE_NO_SCRIPT:
1914  case REDIS_RCODE_ERROR:
1915  ROPTIONAL(RPEDEBUG, PERROR, "[%i] Command failed", state->node->id);
1916  fr_pool_connection_release(state->node->pool, request, *conn);
1917  *conn = NULL;
1918  return REDIS_RCODE_ERROR;
1919 
1920  /*
1921  * Cluster's unstable, try again.
1922  */
1923  case REDIS_RCODE_TRY_AGAIN:
1924  if (state->retries++ >= cluster->conf->max_retries) {
1925  ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum retry attempts", state->node->id);
1926  fr_pool_connection_release(state->node->pool, request, *conn);
1927  *conn = NULL;
1928  return REDIS_RCODE_ERROR;
1929  }
1930 
1931  if (!*conn) *conn = fr_pool_connection_get(state->node->pool, request);
1932 
1933  if (fr_time_delta_ispos(cluster->conf->retry_delay)) nanosleep(&fr_time_delta_to_timespec(cluster->conf->retry_delay), NULL);
1934  goto try_again;
1935 
1936  /*
1937  * Connection's dead, check to see if we can switch nodes,
1938  * or, failing that, reconnect the connection.
1939  */
1940  case REDIS_RCODE_RECONNECT:
1941  {
1942  fr_redis_cluster_key_slot_t const *key_slot;
1943 
1944  ROPTIONAL(RPERROR, PERROR, "[%i] Failed communicating with %s:%i",
1945  state->node->id, state->node->name,
1946  state->node->addr.inet.dst_port);
1947 
1948  fr_pool_connection_close(state->node->pool, request, *conn); /* He's dead jim */
1949 
1950  if (state->reconnects++ > state->in_pool) {
1951  ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum reconnect attempts", state->node->id);
1952  cluster->remap_needed = true;
1953  return REDIS_RCODE_RECONNECT;
1954  }
1955 
1956  /*
1957  * Refresh the key slot
1958  */
1959  key_slot = fr_redis_cluster_slot_by_key(cluster, request, state->key, state->key_len);
1960  state->node = &cluster->node[key_slot->master];
1961 
1962  *conn = fr_pool_connection_get(state->node->pool, request);
1963  if (!*conn) {
1964  ROPTIONAL(REDEBUG, ERROR, "[%i] No connections available for %s:%i",
1965  state->node->id, state->node->name, state->node->addr.inet.dst_port);
1966  cluster->remap_needed = true;
1967 
1968  if (cluster_node_find_live(&state->node, conn, request,
1969  cluster, state->node) < 0) return REDIS_RCODE_RECONNECT;
1970 
1971  goto try_again;
1972  }
1973 
1974  state->retries = 0;
1975  }
1976  goto try_again;
1977 
1978  /*
1979  * -MOVE is treated identically to -ASK, except it may
1980  * trigger a cluster remap.
1981  */
1982  case REDIS_RCODE_MOVE:
1983  fr_assert(*reply);
1984 
1985  if (*conn && (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)) {
1986  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1987  }
1988  FALL_THROUGH;
1989 
1990  /*
1991  * -ASK: process a redirect.
1992  */
1993  case REDIS_RCODE_ASK:
1994  {
1995  fr_redis_cluster_node_t *new;
1996 
1997  fr_pool_connection_release(state->node->pool, request, *conn); /* Always release the old connection */
1998 
1999  if (!fr_cond_assert(*reply)) return REDIS_RCODE_ERROR;
2000 
2001  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Processing redirect \"%s\"", state->node->id, (*reply)->str);
2002  if (state->redirects++ >= cluster->conf->max_redirects) {
2003  ROPTIONAL(REDEBUG, ERROR, "[%i] Reached max_redirects (%i)", state->node->id, state->redirects);
2004  return REDIS_RCODE_ERROR;
2005  }
2006 
2007  switch (cluster_redirect(&new, cluster, *reply)) {
2008  case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2009  if (new == state->node) {
2010  ROPTIONAL(REDEBUG, ERROR, "[%i] %s:%i issued redirect to itself", state->node->id,
2011  state->node->name, state->node->addr.inet.dst_port);
2012  return REDIS_RCODE_ERROR;
2013  }
2014 
2015  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Redirected from %s:%i to [%i] %s:%i",
2016  state->node->id, state->node->name,
2017  state->node->addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2018  state->node = new;
2019 
2020  *conn = fr_pool_connection_get(state->node->pool, request);
2021  if (!*conn) return REDIS_RCODE_RECONNECT;
2022 
2023  /*
2024  * Reset these counters, their scope is
2025  * a single node in the cluster.
2026  */
2027  state->reconnects = 0;
2028  state->retries = 0;
2029  state->in_pool = fr_pool_state(state->node->pool)->num;
2030  goto try_again;
2031 
2033  cluster->remap_needed = true;
2034  return REDIS_RCODE_RECONNECT;
2035 
2036  default:
2037  return REDIS_RCODE_ERROR;
2038  }
2039  }
2040  }
2041 
2042 try_again:
2043  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
2044  state->node->id, state->node->name, state->node->addr.inet.dst_port);
2045 
2046  fr_redis_reply_free(reply);
2047  *reply = NULL;
2048 
2049  return REDIS_RCODE_TRY_AGAIN;
2050 }
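The -ASK and -MOVE branches above pass the raw redirect reply to cluster_redirect(). According to the Redis cluster specification, the error text carries the key slot and the target endpoint, for example `MOVED 3999 127.0.0.1:6381`. hiredis strips the leading '-', so that is what (*reply)->str holds. What follows is a minimal standalone sketch of parsing that string, under those assumptions. `redirect_t` and `redirect_parse()` are hypothetical names, not FreeRADIUS symbols. FreeRADIUS has its own parser, cluster_node_conf_from_redirect(), defined earlier in this file.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parse result; not a FreeRADIUS type */
typedef struct {
	int		is_ask;		/* 1 for ASK (temporary), 0 for MOVED (permanent) */
	uint16_t	slot;		/* Key slot the redirect refers to (0-16383) */
	char		host[64];	/* Target host, exactly as sent by the server */
	uint16_t	port;		/* Target port */
} redirect_t;

/* Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 * Returns 0 on success, -1 on malformed input. */
static int redirect_parse(redirect_t *out, char const *str)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	slot, port;
	size_t		host_len;

	assert(out && str);

	if (strncmp(str, "MOVED ", 6) == 0) {
		out->is_ask = 0;
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		out->is_ask = 1;
		p = str + 4;
	} else {
		return -1;
	}

	/* Slot number, which must be followed by a single space */
	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot > 16383)) return -1;
	p = end + 1;

	/* Split on the last ':' so unbracketed IPv6 literals (::1) stay intact */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port == 0) || (port > 65535)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = (uint16_t)slot;
	out->port = (uint16_t)port;
	return 0;
}
```

Splitting on the last ':' rather than the first keeps addresses such as `::1:7000` parseable. Slots above 16383 and a port of 0 are rejected as malformed.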
2051 
2052 /** Get the pool associated with a node in the cluster
2053  *
2054  * @note This is used for testing only. It's not ifdef'd out because
2055  * tests need to run against production builds too.
2056  *
2057  * @param[out] pool associated with the node.
2058  * @param[in] cluster to search for node in.
2059  * @param[in] node_addr to retrieve pool for. Specifies IP and port of node.
2060  * @param[in] create Establish a connection to the specified node if it
2061  * was previously unknown to the cluster client.
2062  * @return
2063  * - 0 on success.
2064  * - -1 if no such node exists and create is false, or if a new node could not be allocated or connected.
2065  */
2066 int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster,
2067  fr_socket_t *node_addr, bool create)
2068 {
2069  fr_redis_cluster_node_t find, *found;
2070 
2071  find.addr = (fr_socket_t) {
2072  .inet = {
2073  .dst_ipaddr = node_addr->inet.dst_ipaddr,
2074  .dst_port = node_addr->inet.dst_port,
2075  }
2076  };
2077 
2078  pthread_mutex_lock(&cluster->mutex);
2079  found = fr_rb_find(cluster->used_nodes, &find);
2080  if (!found) {
2081  fr_redis_cluster_node_t *spare;
2082  char buffer[INET6_ADDRSTRLEN];
2083  char const *hostname;
2084 
2085  if (!create) {
2086  pthread_mutex_unlock(&cluster->mutex);
2087 
2088  hostname = inet_ntop(node_addr->inet.dst_ipaddr.af, &node_addr->inet.dst_ipaddr.addr, buffer, sizeof(buffer));
2089  fr_assert(hostname); /* addr.ipaddr is probably corrupt */
2090  fr_strerror_printf("No existing node found with address %s, port %i",
2091  hostname, node_addr->inet.dst_port);
2092  return -1;
2093  }
2094 
2095  spare = fr_fifo_peek(cluster->free_nodes);
2096  if (!spare) {
2097  fr_strerror_const("Reached maximum connected nodes");
2098  pthread_mutex_unlock(&cluster->mutex);
2099  return -1;
2100  }
2101  spare->pending_addr = find.addr; /* Set the config to be applied */
2102  if (cluster_node_connect(cluster, spare) < 0) {
2103  pthread_mutex_unlock(&cluster->mutex);
2104  return -1;
2105  }
2106  fr_rb_insert(cluster->used_nodes, spare);
2107  fr_fifo_pop(cluster->free_nodes);
2108  found = spare;
2109  }
2110  /*
2111  * Sanity checks
2112  */
2113  fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
2114  fr_fifo_num_elements(cluster->free_nodes));
2115  pthread_mutex_unlock(&cluster->mutex);
2116 
2117  *pool = found->pool;
2118 
2119  return 0;
2120 }
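fr_redis_cluster_pool_by_node_addr() takes unused nodes from the free_nodes FIFO and records them in the used_nodes tree. Its sanity check asserts that the number of free nodes equals the node array length, minus the reserved id 0, minus the number of used nodes. Below is a simplified, single-threaded sketch of that bookkeeping, with these assumptions:
- a linear scan stands in for the red-black tree;
- a small ring buffer stands in for fr_fifo_t;
- there is no mutex and no real connection.

All names (`node_t`, `node_set_t`, `nodes_find()`, `MAX_NODES`) are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define MAX_NODES 4	/* Hypothetical stand-in for conf->max_nodes */

typedef struct {
	uint8_t		id;		/* Index in the node array; 0 is reserved */
	uint32_t	addr;		/* Simplified node address */
	int		in_use;		/* Non-zero once taken from the free queue */
} node_t;

typedef struct {
	node_t		node[MAX_NODES + 1];	/* Element 0 is never used */
	uint8_t		free_q[MAX_NODES];	/* Ring buffer of free node ids */
	unsigned	free_head;		/* Index of the next free id */
	unsigned	free_num;		/* Number of ids in free_q */
	unsigned	used_num;		/* Number of nodes in use */
} node_set_t;

static void nodes_init(node_set_t *ns)
{
	memset(ns, 0, sizeof(*ns));
	for (uint8_t i = 1; i <= MAX_NODES; i++) {
		ns->node[i].id = i;
		ns->free_q[ns->free_num++] = i;	/* Queue every usable node */
	}
}

/* Return the node with addr. If there isn't one and create is set, claim a
 * free node for it. Returns NULL if not found, or if no free nodes remain. */
static node_t *nodes_find(node_set_t *ns, uint32_t addr, int create)
{
	node_t *spare;

	for (unsigned i = 1; i <= MAX_NODES; i++) {
		if (ns->node[i].in_use && (ns->node[i].addr == addr)) return &ns->node[i];
	}
	if (!create || (ns->free_num == 0)) return NULL;

	spare = &ns->node[ns->free_q[ns->free_head]];	/* Peek... */
	spare->addr = addr;
	spare->in_use = 1;
	ns->free_head = (ns->free_head + 1) % MAX_NODES;	/* ...then pop */
	ns->free_num--;
	ns->used_num++;

	/* Same invariant as the sanity check: free == (array length - 1) - used */
	assert(ns->free_num == (unsigned)MAX_NODES - ns->used_num);
	return spare;
}
```

In the listing, the spare node is only popped from the FIFO after cluster_node_connect() succeeds, so a failed connection leaves the free queue untouched. This sketch has no connect step, so it pops immediately.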
2121 
2122 /** Return an array of IP addresses belonging to masters or slaves
2123  *
2124  * @note We return IP addresses as they're safe to use across cluster remaps.
2125  * @note Result array must be freed (talloc_free()) after use.
2126  *
2127  * @param[in] ctx to allocate array of IP addresses in.
2128  * @param[out] out Where to write the addresses of the nodes.
2129  * @param[in] cluster to search for nodes in.
2130  * @param[in] is_master If true, include the addresses of all the master nodes.
2131  * @param[in] is_slave If true, include the addresses of all the slave nodes.
2132  * @return the number of IP addresses written to out, or -1 on error.
2133  */
2134 ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[],
2135  fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
2136 {
2137  uint64_t in_use = fr_rb_num_elements(cluster->used_nodes);
2138  fr_rb_iter_inorder_t iter;
2139  fr_redis_cluster_node_t *node;
2140  uint8_t count;
2141  fr_socket_t *found;
2142 
2143  if (in_use == 0) {
2144  *out = NULL;
2145  return 0;
2146  }
2147 
2148  count = 0;
2149  found = talloc_zero_array(ctx, fr_socket_t, in_use);
2150  if (!found) {
2151  fr_strerror_const("Out of memory");
2152  return -1;
2153  }
2154 
2155  pthread_mutex_lock(&cluster->mutex);
2156 
2157  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2158  node;
2159  node = fr_rb_iter_next_inorder(&iter)) {
2160  if ((is_master && node->is_master) || (is_slave && !node->is_master)) found[count++] = node->addr;
2161  }
2162 
2163  pthread_mutex_unlock(&cluster->mutex);
2164 
2165  if (count == 0) {
2166  *out = NULL;
2167  talloc_free(found);
2168  return 0;
2169  }
2170 
2171  *out = found;
2172 
2173  return count;
2174 }
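fr_redis_cluster_node_addr_by_role() copies node addresses rather than node pointers because, per its @note, addresses stay valid across remaps. The role test treats is_master and is_slave as independent switches, so passing both returns every node and passing neither returns none. Here is a standalone sketch of just that filter. `role_node_t` and `addrs_by_role()` are hypothetical, and a `uint32_t` stands in for fr_socket_t.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
	uint32_t	addr;		/* Simplified node address */
	int		is_master;	/* Non-zero for masters, zero for slaves */
} role_node_t;

/* Copy the addresses of nodes matching the requested roles into out.
 * out must have room for n entries. Returns the number written. */
static size_t addrs_by_role(uint32_t *out, role_node_t const *nodes, size_t n,
			    int want_master, int want_slave)
{
	size_t count = 0;

	assert(out || (n == 0));
	for (size_t i = 0; i < n; i++) {
		if ((want_master && nodes[i].is_master) ||
		    (want_slave && !nodes[i].is_master)) out[count++] = nodes[i].addr;
	}
	return count;
}
```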
2175 
2176 /** Destroy mutex associated with cluster slots structure
2177  *
2178  * @param cluster being freed.
2179  * @return 0
2180  */
2181 static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
2182 {
2183  pthread_mutex_destroy(&cluster->mutex);
2184 
2185  return 0;
2186 }
2187 
2188 /** Check if members of the cluster are at or above a certain version
2189  *
2190  * @param cluster to perform check on.
2191  * @param min_version that must be found on each node for the check to succeed.
2192  * Must be in the format @verbatim <major>.<minor>.<release> @endverbatim.
2193  * @return
2194  * - true if all contactable members are at or above min_version.
2195  * - false if at least one member is below min_version
2196  * (use #fr_strerror to retrieve node information).
2197  */
2198 bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
2199 {
2200  fr_redis_cluster_node_t *node;
2201  fr_rb_iter_inorder_t iter;
2202  fr_redis_conn_t *conn;
2203  int ret;
2204  char buffer[40];
2205  bool all_above = true;
2206 
2207  pthread_mutex_lock(&cluster->mutex);
2208 
2209  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2210  node;
2211  node = fr_rb_iter_next_inorder(&iter)) {
2212  conn = fr_pool_connection_get(node->pool, NULL);
2213  if (!conn) continue;
2214 
2215  /*
2216  * We don't care if we can't get the version
2217  * as we don't want to prevent the server from
2218  * starting if start == 0.
2219  */
2220  ret = fr_redis_get_version(buffer, sizeof(buffer), conn);
2221  fr_pool_connection_release(node->pool, NULL, conn);
2222  if (ret < 0) continue;
2223 
2224  if (fr_redis_version_num(buffer) < fr_redis_version_num(min_version)) {
2225  fr_strerror_printf("Redis node %s:%i (currently v%s) needs update to >= v%s",
2226  node->name, node->addr.inet.dst_port, buffer, min_version);
2227  all_above = false;
2228  break;
2229  }
2230  }
2231 
2232  pthread_mutex_unlock(&cluster->mutex);
2233 
2234  return all_above;
2235 }
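fr_redis_cluster_min_version() compares versions numerically through fr_redis_version_num(), whose body is not part of this listing. One plausible encoding packs `<major>.<minor>.<release>` into a single integer so that `<` orders versions correctly. That encoding is an assumption, shown here purely for illustration. `version_num()` and `version_ok()` are hypothetical, and the 8-bit-per-component limit is this sketch's own choice.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical: pack "<major>.<minor>.<release>" into one comparable integer.
 * Each component must fit in 8 bits; trailing text after <release> is ignored.
 * Returns 0 (lowest possible version) if the string cannot be parsed. */
static uint32_t version_num(char const *str)
{
	unsigned long	part[3];
	char		*end;

	assert(str);
	for (int i = 0; i < 3; i++) {
		part[i] = strtoul(str, &end, 10);
		if ((end == str) || (part[i] > 255)) return 0;
		if (i < 2) {
			if (*end != '.') return 0;
			str = end + 1;
		}
	}
	return (uint32_t)((part[0] << 16) | (part[1] << 8) | part[2]);
}

/* Mirrors the check above: a node passes if its version is >= min_version */
static int version_ok(char const *have, char const *min_version)
{
	return version_num(have) >= version_num(min_version);
}
```

Comparing packed integers rather than strings is what makes 10.0.0 sort above 9.9.9. A plain strcmp() would order them the other way round.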
2236 
2237 /** Allocate and initialise a new cluster structure
2238  *
2239  * This holds all the data necessary to manage a pool of pools for a specific redis cluster.
2240  *
2241  * @note Will not error out unless cs.pool.start > 0. This is consistent with other pool based
2242  * modules/code.
2243  *
2244  * @param ctx to link the lifetime of the cluster structure to.
2245  * @param module Configuration section to search for 'server' conf pairs in.
2246  * @param conf Base redis server configuration. Cluster nodes share database
2247  * number and password.
2248  * @param triggers_enabled Whether triggers should be enabled.
2249  * @param log_prefix Custom log prefix. Defaults to @verbatim rlm_<module> (<instance>) @endverbatim.
2250  * @param trigger_prefix Custom trigger prefix. Defaults to @verbatim modules.<module>.pool @endverbatim.
2251  * @param trigger_args Argument pairs to pass to the trigger in addition to Connection-Pool-Server,
2252  * and Connection-Pool-Port (which are always set by the cluster code).
2253  * @return
2254  * - New #fr_redis_cluster_t on success.
2255  * - NULL on error.
2256  */
2257 fr_redis_cluster_t *fr_redis_cluster_alloc(TALLOC_CTX *ctx,
2258  CONF_SECTION *module,
2259  fr_redis_conf_t *conf,
2260  bool triggers_enabled,
2261  char const *log_prefix,
2262  char const *trigger_prefix,
2263  fr_pair_list_t *trigger_args)
2264 {
2265  uint8_t i;
2266  uint16_t s;
2267 
2268  char const *cs_name1, *cs_name2;
2269 
2270  CONF_PAIR *cp;
2271  int af = AF_UNSPEC; /* AF of first server */
2272 
2273  uint64_t num_nodes;
2274  fr_redis_cluster_t *cluster;
2275 
2276  fr_assert(triggers_enabled || !trigger_prefix);
2277  fr_assert(triggers_enabled || (!trigger_args || fr_pair_list_empty(trigger_args)));
2278 
2279  MEM(cluster = talloc_zero(NULL, fr_redis_cluster_t));
2280  fr_pair_list_init(&cluster->trigger_args);
2281 
2282  cs_name1 = cf_section_name1(module);
2283  cs_name2 = cf_section_name2(module);
2284 
2285  cluster->triggers_enabled = triggers_enabled;
2286  if (cluster->triggers_enabled) {
2287  /*
2288  * Setup trigger prefix
2289  */
2290  if (!trigger_prefix) {
2291  cluster->trigger_prefix = talloc_typed_asprintf(cluster, "modules.%s.pool", cs_name1);
2292  } else {
2293  cluster->trigger_prefix = talloc_strdup(cluster, trigger_prefix);
2294  }
2295 
2296  /*
2297  * Duplicate the trigger arguments.
2298  */
2299  if (trigger_args) MEM(fr_pair_list_copy(cluster, &cluster->trigger_args, trigger_args) >= 0);
2300  }
2301 
2302  /*
2303  * Setup log prefix
2304  */
2305  if (!log_prefix) {
2306  if (!cs_name2) cs_name2 = cs_name1;
2307  cluster->log_prefix = talloc_typed_asprintf(cluster, "rlm_%s (%s)", cs_name1, cs_name2);
2308  } else {
2309  cluster->log_prefix = talloc_strdup(cluster, log_prefix);
2310  }
2311 
2312  /*
2313  * Ensure we always have a pool section (even if it's empty)
2314  */
2315  if (!cf_section_find(module, "pool", NULL)) {
2316  (void) cf_section_alloc(module, module, "pool", NULL);
2317  }
2318 
2319  /*
2320  * Parse TLS configuration
2321  */
2322  if (conf->use_tls) {
2323 #ifdef HAVE_REDIS_SSL
2324  CONF_SECTION *tls_cs;
2325  fr_tls_conf_t *tls_conf;
2326 
2327  tls_cs = cf_section_find(module, "tls", NULL);
2328  if (!tls_cs) {
2329  tls_cs = cf_section_alloc(module, module, "tls", NULL);
2330  }
2331 
2332  tls_conf = fr_tls_conf_parse_client(tls_cs);
2333  if (!tls_conf) {
2334  ERROR("%s - Failed to parse TLS configuration", cluster->log_prefix);
2335  talloc_free(cluster);
2336  return NULL;
2337  }
2338 
2339  cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf, true);
2340  if (!cluster->ssl_ctx) {
2341  ERROR("%s - Failed to allocate SSL context", cluster->log_prefix);
2342  talloc_free(cluster);
2343  return NULL;
2344  }
2345 #else
2346  WARN("%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->log_prefix);
2347 #endif
2348  }
2349 
2350  if (conf->max_nodes == UINT8_MAX) {
2351  ERROR("%s - Maximum number of connected nodes allowed is %i", cluster->log_prefix, UINT8_MAX - 1);
2352  talloc_free(cluster);
2353  return NULL;
2354  }
2355 
2356  if (conf->max_nodes == 0) {
2357  ERROR("%s - Minimum number of nodes allowed is 1", cluster->log_prefix);
2358  talloc_free(cluster);
2359  return NULL;
2360  }
2361 
2362  cp = cf_pair_find(module, "server");
2363  if (!cp) {
2364  ERROR("%s - No servers configured", cluster->log_prefix);
2365  talloc_free(cluster);
2366  return NULL;
2367  }
2368 
2369  cluster->module = module;
2370 
2371  /*
2372  * Ensure the cluster is freed at the same time as its
2373  * parent.
2374  *
2375  * We need to break the link between the cluster and
2376  * its parent context, as the two contexts may be
2377  * modified by multiple threads.
2378  */
2379  MEM(talloc_link_ctx(ctx, cluster) >= 0);
2380  MEM(cluster->node = talloc_zero_array(cluster, fr_redis_cluster_node_t, conf->max_nodes + 1));
2381  MEM(cluster->used_nodes = fr_rb_inline_alloc(cluster, fr_redis_cluster_node_t, rbnode, _cluster_node_cmp, NULL));
2382  MEM(cluster->free_nodes = fr_fifo_create(cluster, conf->max_nodes, NULL));
2383 
2384  cluster->conf = conf;
2385 
2386  pthread_mutex_init(&cluster->mutex, NULL);
2387  talloc_set_destructor(cluster, _fr_redis_cluster_free);
2388 
2389  /*
2390  * Node id 0 is reserved, so we can detect misconfigured
2391  * clusters.
2392  */
2393  for (i = 1; i < (cluster->conf->max_nodes + 1); i++) {
2394  cluster->node[i].id = i;
2395  cluster->node[i].cluster = cluster;
2396 
2397  /* Push them all into the queue */
2398  fr_fifo_push(cluster->free_nodes, &cluster->node[i]);
2399  }
2400 
2401  /*
2402  * Don't connect to cluster nodes if we're just
2403  * checking the config.
2404  */
2405  if (check_config) return cluster;
2406 
2407  /*
2408  * Populate the cluster with the bootstrap servers.
2409  *
2410  * If we fail getting a key_slot map here, then the
2411  * bootstrap servers are distributed evenly through
2412  * the key slots.
2413  *
2414  * This allows the server to start, and potentially,
2415  * a valid map to be applied, once the server starts
2416  * processing requests.
2417  */
2418  do {
2419  char const *server;
2420  fr_redis_cluster_node_t *node;
2421  fr_redis_conn_t *conn;
2422  redisReply *map;
2423  size_t j, k;
2424 
2425  node = fr_fifo_peek(cluster->free_nodes);
2426  if (!node) {
2427  ERROR("%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->log_prefix);
2428 
2429  error:
2430  talloc_free(cluster);
2431  return NULL;
2432  }
2433 
2434  server = cf_pair_value(cp);
2435  if (fr_inet_pton_port(&node->pending_addr.inet.dst_ipaddr, &node->pending_addr.inet.dst_port, server,
2436  talloc_array_length(server) - 1, af, true, true) < 0) {
2437  PERROR("%s - Failed parsing server \"%s\"", cluster->log_prefix, server);
2438  goto error;
2439  }
2440  if (!node->pending_addr.inet.dst_port) node->pending_addr.inet.dst_port = conf->port;
2441 
2442  if (cluster_node_connect(cluster, node) < 0) {
2443  WARN("%s - Connecting to %s:%i failed", cluster->log_prefix, node->name, node->pending_addr.inet.dst_port);
2444  continue;
2445  }
2446 
2447  if (!fr_rb_insert(cluster->used_nodes, node)) {
2448  WARN("%s - Skipping duplicate bootstrap server \"%s\"", cluster->log_prefix, server);
2449  continue;
2450  }
2451  node->is_active = true;
2452  fr_fifo_pop(cluster->free_nodes);
2453 
2454  /*
2455  * Prefer the same IPaddr family as the first node
2456  */
2457  if (af == AF_UNSPEC) af = node->addr.inet.dst_ipaddr.af;
2458 
2459  /*
2460  * Only get cluster map config if required
2461  */
2462  if (fr_pool_start_num(node->pool) > 0) {
2463  /*
2464  * Fine to leave this node configured, if we do find
2465  * a live node, and it's not in the map, it'll be cleared out.
2466  */
2467  conn = fr_pool_connection_get(node->pool, NULL);
2468  if (!conn) {
2469  WARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2470  continue;
2471  }
2472  } else {
2473  break;
2474  }
2475 
2476  if (!cluster->conf->use_cluster_map) {
2477  fr_pool_connection_release(node->pool, NULL, conn);
2478  continue;
2479  }
2480 
2481  switch (cluster_map_get(&map, conn)) {
2482  /*
2483  * We got a valid map! See if we can apply it...
2484  */
2485  case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2486  fr_pool_connection_release(node->pool, NULL, conn);
2487 
2488  DEBUG("%s - Cluster map consists of %zu key ranges", cluster->log_prefix, map->elements);
2489  for (j = 0; j < map->elements; j++) {
2490  redisReply *map_node = map->element[j];
2491 
2492  DEBUG("%s - %zu - keys %lli-%lli", cluster->log_prefix, j,
2493  map_node->element[0]->integer,
2494  map_node->element[1]->integer);
2495  DEBUG("%s - master: %s:%lli", cluster->log_prefix,
2496  map_node->element[2]->element[0]->str,
2497  map_node->element[2]->element[1]->integer);
2498  for (k = 3; k < map_node->elements; k++) {
2499  DEBUG("%s - slave%zu: %s:%lli", cluster->log_prefix, k - 3,
2500  map_node->element[k]->element[0]->str,
2501  map_node->element[k]->element[1]->integer);
2502  }
2503  }
2504 
2505  if (cluster_map_apply(cluster, map) < 0) {
2506  PWARN("%s - Applying cluster map failed", cluster->log_prefix);
2507  fr_redis_reply_free(&map);
2508  continue;
2509  }
2510  fr_redis_reply_free(&map);
2511 
2512  return cluster;
2513 
2514  /*
2515  * Unusable bootstrap node
2516  */
2518  PWARN("%s - Bootstrap server \"%s\" returned invalid data", cluster->log_prefix, server);
2519  fr_pool_connection_release(node->pool, NULL, conn);
2520  continue;
2521 
2523  PWARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2524  fr_pool_connection_close(node->pool, NULL, conn);
2525  continue;
2526 
2527  /*
2528  * Clustering not enabled, or not supported,
2529  * by this node, skip it and check the others.
2530  */
2533  PDEBUG2("%s - Bootstrap server \"%s\" returned", cluster->log_prefix, server);
2534  fr_pool_connection_release(node->pool, NULL, conn);
2535  break;
2536  }
2537  } while ((cp = cf_pair_find_next(module, cp, "server")));
2538 
2539  /*
2540  * Catch pool.start != 0
2541  */
2542  num_nodes = fr_rb_num_elements(cluster->used_nodes);
2543  if (!num_nodes) {
2544  ERROR("%s - Can't contact any bootstrap servers", cluster->log_prefix);
2545  goto error;
2546  }
2547 
2548  /*
2549  * We've failed to apply a valid cluster map.
2550  * Distribute the node(s) throughout the key_slots,
2551  * hopefully we'll get one when we start processing
2552  * requests.
2553  */
2554  for (s = 0; s < KEY_SLOTS; s++) cluster->key_slot[s].master = (s % (uint16_t) num_nodes) + 1;
2555 
2556  return cluster;
2557 }
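When no bootstrap server supplies a usable map, fr_redis_cluster_alloc() spreads the connected nodes round-robin over all KEY_SLOTS entries. Node ids start at 1 because id 0 is reserved. The sketch below pairs that fallback table with the key-to-slot function from the Redis cluster specification: CRC16-XMODEM of the key (or of its non-empty {hash tag}), modulo 16384. Together they show which node a key would land on before a real map is learned. The sketch is standalone and hypothetical: `crc16_xmodem()`, `key_slot()`, `fallback_map()` and `fallback_master[]` are not FreeRADIUS symbols. FreeRADIUS's own lookup, fr_redis_cluster_slot_by_key(), may differ in detail.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KEY_SLOTS 16384	/* Fixed by the Redis cluster specification */

/* CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection */
static uint16_t crc16_xmodem(uint8_t const *buf, size_t len)
{
	uint16_t crc = 0;

	for (size_t i = 0; i < len; i++) {
		crc ^= (uint16_t)(buf[i] << 8);
		for (int bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

/* Map a key to a slot. If the key has a '{' followed later by a '}' with at
 * least one character between them, only that substring is hashed. */
static uint16_t key_slot(char const *key)
{
	size_t		len = strlen(key);
	char const	*lbrace = memchr(key, '{', len);

	if (lbrace) {
		size_t		rest = len - (size_t)(lbrace - key) - 1;
		char const	*rbrace = memchr(lbrace + 1, '}', rest);

		if (rbrace && (rbrace > lbrace + 1)) {
			return crc16_xmodem((uint8_t const *)(lbrace + 1),
					    (size_t)(rbrace - lbrace - 1)) % KEY_SLOTS;
		}
	}
	return crc16_xmodem((uint8_t const *)key, len) % KEY_SLOTS;
}

/* Fallback table: node ids 1..num_nodes assigned round-robin (id 0 is reserved) */
static uint8_t fallback_master[KEY_SLOTS];

static void fallback_map(uint8_t num_nodes)
{
	assert(num_nodes > 0);
	for (uint32_t s = 0; s < KEY_SLOTS; s++) {
		fallback_master[s] = (uint8_t)((s % num_nodes) + 1);
	}
}
```

With three bootstrap nodes, slots 0, 1, 2, 3 map to nodes 1, 2, 3, 1, and so on. A key such as `{user1000}.following` is routed to `fallback_master[key_slot(key)]` until a real cluster map is applied at runtime.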
static int const char char buffer[256]
Definition: acutest.h:574
va_list args
Definition: acutest.h:770
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define STRINGIFY(x)
Definition: build.h:195
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define NUM_ELEMENTS(_t)
Definition: build.h:335
bool check_config
Definition: cf_file.c:66
Configuration AVP similar to a fr_pair_t.
Definition: cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition: cf_priv.h:89
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition: cf_util.c:970
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
Definition: cf_util.c:1356
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1126
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition: cf_util.c:1511
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
Definition: cf_util.c:894
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition: cf_util.c:1370
char const * cf_section_name1(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1112
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
Definition: cf_util.h:137
bool remapping
True when cluster is being remapped.
Definition: cluster.c:253
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVE.
Definition: cluster.c:860
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
Definition: cluster.c:1859
char const * log_prefix
What to prepend to log messages.
Definition: cluster.c:248
fr_pool_t * pool
Pool associated with this node.
Definition: cluster.c:222
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
Definition: cluster.c:212
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
Definition: cluster.c:225
#define CLOSED_WEIGHT
What weight to give to nodes that.
Definition: cluster.c:178
#define MAX_SLAVES
Maximum number of slaves associated.
Definition: cluster.c:169
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
Definition: cluster.c:368
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
Definition: cluster.c:782
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
Definition: cluster.c:295
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
Definition: cluster.c:1115
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
Definition: cluster.c:443
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
Definition: cluster.c:1307
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
Definition: cluster.c:1599
#define CLOSED_PERIOD
How recently must the closed have.
Definition: cluster.c:175
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
Definition: cluster.c:272
struct cluster_nodes_live_t::@51 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
fr_rb_tree_t * used_nodes
Tree of used nodes.
Definition: cluster.c:269
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
Definition: cluster.c:271
#define KEY_SLOTS
Maximum number of keyslots (should not change).
Definition: cluster.c:167
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
Definition: cluster.c:220
char const * trigger_prefix
Trigger path.
Definition: cluster.c:249
fr_redis_conf_t * conf
Base configuration data such as the database number and passwords.
Definition: cluster.c:259
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
Definition: cluster.c:210
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
Definition: cluster.c:254
fr_time_t last_updated
Last time the cluster mappings were updated.
Definition: cluster.c:256
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
Definition: cluster.c:2066
fr_pair_list_t trigger_args
Arguments to pass to triggers.
Definition: cluster.c:250
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
Definition: cluster.c:2134
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
Definition: cluster.c:217
uint8_t next
Next index in live.
Definition: cluster.c:201
size_t fr_redis_cluster_rcodes_table_len
Definition: cluster.c:284
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
Definition: cluster.c:336
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
Definition: cluster.c:238
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
Definition: cluster.c:1685
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
Definition: cluster.c:2181
#define FAILED_WEIGHT
What weight to give to nodes that.
Definition: cluster.c:184
uint8_t master
R/W node (master) for this key slot.
Definition: cluster.c:240
CONF_SECTION * module
Module configuration.
Definition: cluster.c:257
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
Definition: cluster.c:1464
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
Definition: cluster.c:1205
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
Definition: cluster.c:1240
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
Definition: cluster.c:319
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
Definition: cluster.c:1005
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
Definition: cluster.c:191
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
Definition: cluster.c:534
uint8_t id
Node ID (index in node array).
Definition: cluster.c:214
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
Definition: cluster.c:1651
bool triggers_enabled
Whether triggers are enabled.
Definition: cluster.c:251
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
Definition: cluster.c:1737
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
Definition: cluster.c:1635
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, bool triggers_enabled, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
Definition: cluster.c:2257
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
Definition: cluster.c:1448
fr_socket_t addr
Current node address.
Definition: cluster.c:216
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
Definition: cluster.c:277
bool is_master
Whether this node is a master.
Definition: cluster.c:226
#define FAILED_PERIOD
How recently must the spawn failure.
Definition: cluster.c:181
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
Definition: cluster.c:268
#define RELEASED_PERIOD
Period after which we no longer care when the last connection was released.
Definition: cluster.c:187
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
Definition: cluster.c:1668
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
Definition: cluster.c:274
uint8_t slave_num
Number of slaves associated with this key slot.
Definition: cluster.c:239
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
Definition: cluster.c:265
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
Definition: cluster.c:2198
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Definition: cluster.c:223
Live nodes data, used to perform weighted random selection of alternative nodes.
Definition: cluster.c:195
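The live nodes entry above mentions weighted random selection of alternative nodes. The following is a minimal sketch of one common way to do that (a cumulative-weight scan), assuming each candidate carries an integer weight. The `live_node_t` type and `pick_weighted()` function are hypothetical, and the sketch does not model how FreeRADIUS derives weights (for example from `FAILED_PERIOD`, `RELEASED_PERIOD` or `RELEASED_MIN_WEIGHT`).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical candidate entry: node index plus a relative weight. */
typedef struct {
	uint8_t		id;		/* Node ID (index in node array) */
	uint32_t	weight;		/* Relative selection weight, 0 = never pick */
} live_node_t;

/*
 * Pick an entry with probability proportional to its weight.
 * 'rnd' is a uniformly distributed 32-bit random value (for example one
 * obtained from fr_rand()). Returns the chosen array index, or -1 if the
 * array is empty or every weight is 0.
 */
static int pick_weighted(live_node_t const *nodes, size_t num, uint32_t rnd)
{
	uint64_t total = 0, target, cumulative = 0;

	assert(nodes || num == 0);

	for (size_t i = 0; i < num; i++) total += nodes[i].weight;
	if (total == 0) return -1;

	target = rnd % total;	/* Small modulo bias accepted for brevity */
	for (size_t i = 0; i < num; i++) {
		cumulative += nodes[i].weight;
		if (target < cumulative) return (int)i;
	}
	return -1;		/* Not reached, because target < total */
}
```

Entries with weight 0 are never chosen, so a caller can exclude a node simply by zeroing its weight rather than rebuilding the array.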
A redis cluster.
Definition: cluster.c:247
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Definition: cluster.c:237
A Redis cluster node.
Definition: cluster.c:209
Common functions for interacting with Redis cluster via Hiredis.
size_t key_len
Length of the key.
Definition: cluster.h:55
bool close_conn
Set by caller of fr_redis_cluster_state_next, to indicate that connection must be closed,...
Definition: cluster.h:50
uint32_t retries
How many times we've received TRYAGAIN.
Definition: cluster.h:60
uint32_t reconnects
How many connections we've tried in this pool.
Definition: cluster.h:62
uint8_t const * key
Key we performed hashing on.
Definition: cluster.h:54
fr_redis_cluster_rcode_t
Return values for internal functions.
Definition: cluster.h:67
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
Definition: cluster.h:68
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
Definition: cluster.h:70
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
Definition: cluster.h:69
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
Definition: cluster.h:73
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
Definition: cluster.h:71
uint32_t redirects
How many redirects we have followed.
Definition: cluster.h:58
fr_redis_cluster_node_t * node
Node we're communicating with.
Definition: cluster.h:57
uint32_t in_pool
How many connections are available in the pool.
Definition: cluster.h:61
Redis connection sequence state.
Definition: cluster.h:49
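The sequence state above tracks retries, redirects and reconnects while a command is retried. The following is a hypothetical sketch of how such counters might drive the decision about what to do after each reply. The enum values only mirror the `REDIS_RCODE_*` names, and `next_action()`, `seq_state_t` and the rule that reconnects stop once `in_pool` connections have been tried are all assumptions, not FreeRADIUS's actual logic.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical result codes, named after the REDIS_RCODE_* values. */
typedef enum {
	RC_SUCCESS, RC_ERROR, RC_TRY_AGAIN, RC_RECONNECT, RC_ASK, RC_MOVE
} rc_t;

/* What the caller should do next. */
typedef enum {
	NEXT_DONE,		/* Command succeeded, stop */
	NEXT_FAIL,		/* Give up and report an error */
	NEXT_RETRY,		/* Wait (e.g. retry_delay), then resend to the same node */
	NEXT_RECONNECT,		/* Resend using another connection from the pool */
	NEXT_REDIRECT		/* Resend to the node named in the ASK/MOVED reply */
} next_t;

/* Hypothetical counters, named after fr_redis_cluster_state_t fields. */
typedef struct {
	uint32_t	retries;	/* TRYAGAIN replies seen */
	uint32_t	redirects;	/* ASK/MOVED redirects followed */
	uint32_t	reconnects;	/* Connections tried in this pool */
	uint32_t	in_pool;	/* Connections available when we started */
} seq_state_t;

static next_t next_action(seq_state_t *s, uint32_t max_retries,
			  uint32_t max_redirects, rc_t rc)
{
	assert(s);

	switch (rc) {
	case RC_SUCCESS:
		return NEXT_DONE;

	case RC_TRY_AGAIN:
		if (++s->retries > max_retries) return NEXT_FAIL;
		return NEXT_RETRY;

	case RC_RECONNECT:
		/* Assumption: stop once every pooled connection has been tried */
		if (++s->reconnects > s->in_pool) return NEXT_FAIL;
		return NEXT_RECONNECT;

	case RC_ASK:
	case RC_MOVE:	/* A real MOVED handler would also remap the cluster */
		if (++s->redirects > max_redirects) return NEXT_FAIL;
		return NEXT_REDIRECT;

	case RC_ERROR:
	default:
		return NEXT_FAIL;
	}
}
```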
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
Definition: crc16.c:91
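The Redis cluster specification maps every key to one of 16384 hash slots using CRC16 (the XMODEM variant) modulo 16384; if the key contains a non-empty `{...}` hash tag, only the tag is hashed. The following is a minimal sketch of that mapping, assuming it matches how the cluster code uses `fr_crc16_xmodem()`. The `crc16_xmodem()` and `key_slot()` functions are hypothetical stand-ins written from the spec, not FreeRADIUS's implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC-16/XMODEM: polynomial 0x1021, initial value 0x0000,
 * no reflection, no final XOR. */
static uint16_t crc16_xmodem(uint8_t const *in, size_t in_len)
{
	uint16_t crc = 0;

	for (size_t i = 0; i < in_len; i++) {
		crc = (uint16_t)(crc ^ (in[i] << 8));
		for (int bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
					     : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

#define KEY_SLOTS 16384		/* Number of hash slots in a Redis cluster */

/* Map a key to its hash slot, honouring {hash tags}. */
static uint16_t key_slot(uint8_t const *key, size_t key_len)
{
	uint8_t const *lbrace, *rbrace;

	assert(key || key_len == 0);
	if (key_len == 0) return 0;	/* CRC16 of an empty key is 0 */

	lbrace = memchr(key, '{', key_len);
	if (lbrace) {
		size_t after = key_len - (size_t)(lbrace - key) - 1;	/* Bytes after '{' */

		rbrace = after ? memchr(lbrace + 1, '}', after) : NULL;
		if (rbrace && rbrace > lbrace + 1) {	/* Non-empty tag: hash only the tag */
			key = lbrace + 1;
			key_len = (size_t)(rbrace - key);
		}
	}
	return (uint16_t)(crc16_xmodem(key, key_len) % KEY_SLOTS);
}
```

Because only the tag is hashed, keys such as `{user1000}.following` and `{user1000}.followers` land in the same slot and therefore on the same master.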
#define fr_cond_assert(_x)
Calls panic_action if NDEBUG is not defined; otherwise logs an error and evaluates to the value of _x.
Definition: debug.h:137
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
static fr_time_delta_t timeout
Definition: dhcpclient.c:54
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
Definition: fifo.c:158
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
Definition: fifo.c:170
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
Definition: fifo.c:135
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
Definition: fifo.c:111
Definition: fifo.c:30
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
Definition: fifo.h:66
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
Definition: inet.c:764
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses an IPv4/6 address and port into an fr_ipaddr_t and an integer (port).
Definition: inet.c:923
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two ip addresses.
Definition: inet.c:1332
int af
Address family.
Definition: inet.h:64
IPv4/6 prefix.
Definition: merged_model.c:272
#define PERROR(_fmt,...)
Definition: log.h:228
#define REXDENT()
Exdent (unindent) R* messages by one level.
Definition: log.h:443
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define RPDEBUG2(fmt,...)
Definition: log.h:347
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define PDEBUG2(_fmt,...)
Definition: log.h:272
#define RWARN(fmt,...)
Definition: log.h:297
#define PWARN(_fmt,...)
Definition: log.h:227
#define RERROR(fmt,...)
Definition: log.h:298
#define RPERROR(fmt,...)
Definition: log.h:302
#define RINFO(fmt,...)
Definition: log.h:296
#define RPEDEBUG(fmt,...)
Definition: log.h:376
#define RINDENT()
Indent R* messages by one level.
Definition: log.h:430
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
Definition: log.h:72
unsigned short uint16_t
Definition: merged_model.c:31
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
#define UINT8_MAX
Definition: merged_model.c:32
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
Definition: missing.c:443
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition: pair.c:2316
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition: pair.c:46
int fr_pool_start(fr_pool_t *pool)
Definition: pool.c:1114
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition: pool.c:1405
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition: pool.c:1171
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition: pool.c:1242
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition: pool.c:1390
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition: pool.c:1535
int fr_pool_start_num(fr_pool_t *pool)
Return the number of connections the pool is configured to start with.
Definition: pool.c:1191
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition: pool.c:931
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition: pool.c:965
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition: pool.c:1222
A connection pool.
Definition: pool.c:85
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition: pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition: pool.h:57
fr_time_t last_released
Last time a connection was released.
Definition: pool.h:56
uint32_t num
Number of connections in the pool.
Definition: pool.h:67
#define REDEBUG(fmt,...)
Definition: radclient.h:52
#define RDEBUG2(fmt,...)
Definition: radclient.h:54
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
static rs_t * conf
Definition: radsniff.c:53
static char const * hostname(char *buf, size_t buflen, uint32_t ipaddr)
Definition: radwho.c:133
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition: rand.c:106
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition: rb.c:775
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition: rb.c:844
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition: rb.c:624
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:818
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition: rb.c:576
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree.
Definition: rb.h:271
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red-black tree structure.
Definition: rb.h:73
uint8_t max_nodes
Maximum number of cluster nodes to connect to.
Definition: base.h:119
fr_table_num_sorted_t const redis_rcodes[]
Definition: redis.c:40
bool use_cluster_map
Whether to use the cluster map.
Definition: base.h:114
redisContext * handle
Hiredis context used when issuing commands.
Definition: base.h:101
char const * username
Username for ACLs.
Definition: base.h:116
uint32_t database
Database number on the Redis server.
Definition: base.h:112
fr_redis_cluster_node_t * node
Node this connection is to.
Definition: base.h:102
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx)
Print the response data in a useful treelike form.
Definition: redis.c:141
#define REDIS_ERROR_MOVED_STR
Definition: base.h:46
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition: base.h:64
uint32_t max_redirects
Maximum number of times we can be redirected.
Definition: base.h:120
uint32_t fr_redis_version_num(char const *version)
Convert a version string into a 32-bit unsigned integer for comparisons.
Definition: redis.c:688
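Packing a dotted version string into a single integer lets version checks such as `fr_redis_cluster_min_version()` become plain integer comparisons. The following is a minimal sketch under an assumed encoding (major in bits 24 to 31, minor in bits 16 to 23, patch in bits 0 to 15); the actual bit layout used by `fr_redis_version_num()` is not shown here, and `version_num()` is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical encoding: major << 24 | minor << 16 | patch.
 * Missing components count as 0, and each component is clamped to its
 * field width so that ordering is preserved.
 */
static uint32_t version_num(char const *version)
{
	unsigned long		parts[3] = { 0, 0, 0 };
	unsigned long const	max[3] = { 0xff, 0xff, 0xffff };
	char const		*p = version;
	char			*end;

	assert(version);

	for (int i = 0; i < 3; i++) {
		parts[i] = strtoul(p, &end, 10);
		if (end == p) break;			/* No digits, stop parsing */
		if (parts[i] > max[i]) parts[i] = max[i];
		if (*end != '.') break;			/* No further components */
		p = end + 1;
	}

	return ((uint32_t)parts[0] << 24) | ((uint32_t)parts[1] << 16) | (uint32_t)parts[2];
}
```

A minimum-version check is then `version_num(server_version) >= version_num(min_version)`.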
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
Definition: redis.c:71
fr_table_num_sorted_t const redis_reply_types[]
Definition: redis.c:30
fr_time_delta_t retry_delay
How long to wait when we received a -TRYAGAIN message.
Definition: base.h:124
#define REDIS_ERROR_ASK_STR
Definition: base.h:47
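When a key's slot lives on a different node, a cluster node replies with an error such as `-MOVED 3999 127.0.0.1:6381` or `-ASK 3999 127.0.0.1:6381` (per the Redis cluster specification), which map onto `REDIS_RCODE_MOVE` and `REDIS_RCODE_ASK`. The following is a minimal sketch of parsing that error text, assuming it arrives without the leading `-` and that the `REDIS_ERROR_MOVED_STR`/`REDIS_ERROR_ASK_STR` prefixes are `MOVED` and `ASK` (their values are not shown here). `parse_redirect()` and `redirect_t` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef enum { REDIRECT_NONE = 0, REDIRECT_MOVED, REDIRECT_ASK } redirect_t;

/*
 * Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 * The host is everything between the slot and the last ':', so bare
 * IPv6 addresses such as "::1" are kept intact.
 */
static redirect_t parse_redirect(char const *err, uint16_t *slot,
				 char *host, size_t host_len, uint16_t *port)
{
	redirect_t	type;
	char const	*p, *colon;
	char		*end;
	unsigned long	num;
	size_t		len;

	assert(err && slot && host && port && host_len > 0);

	if (strncmp(err, "MOVED ", 6) == 0) {
		type = REDIRECT_MOVED;
		p = err + 6;
	} else if (strncmp(err, "ASK ", 4) == 0) {
		type = REDIRECT_ASK;
		p = err + 4;
	} else {
		return REDIRECT_NONE;
	}

	num = strtoul(p, &end, 10);				/* Slot number */
	if (end == p || *end != ' ' || num >= 16384) return REDIRECT_NONE;
	*slot = (uint16_t)num;
	p = end + 1;

	colon = strrchr(p, ':');				/* Host/port separator */
	if (!colon || colon == p) return REDIRECT_NONE;
	len = (size_t)(colon - p);
	if (len >= host_len) return REDIRECT_NONE;		/* Host buffer too small */
	memcpy(host, p, len);
	host[len] = '\0';

	num = strtoul(colon + 1, &end, 10);			/* Port */
	if (end == colon + 1 || *end != '\0' || num == 0 || num > 65535) return REDIRECT_NONE;
	*port = (uint16_t)num;

	return type;
}
```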
fr_redis_rcode_t
Codes are ordered inversely by priority.
Definition: base.h:87
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
Definition: base.h:91
@ REDIS_RCODE_SUCCESS
Operation was successful.
Definition: base.h:88
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
Definition: base.h:94
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
Definition: base.h:90
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
Definition: base.h:95
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
Definition: base.h:93
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
Definition: base.h:89
char const * password
Password used to authenticate to Redis.
Definition: base.h:117
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Definition: redis.c:638
uint32_t max_alt
Maximum alternative nodes to try.
Definition: base.h:123
uint32_t max_retries
Maximum number of times we attempt a command when receiving successive -TRYAGAIN messages.
Definition: base.h:121
Configuration parameters for a redis connection.
Definition: base.h:109
Connection handle, holding a redis context.
Definition: base.h:100
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:689
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:253
An element in a lexicographically sorted array of name to num mappings.
Definition: table.h:45
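Keeping a name-to-number table lexicographically sorted by name allows name lookups by binary search, while number-to-name lookups (what `fr_table_str_by_value` provides) still need a scan because the array is not ordered by value. The following is a minimal sketch of that idea. The `name_num_t` layout, the table contents and both lookup functions are hypothetical and the numeric values are purely illustrative; the real `fr_table_num_sorted_t` may be laid out differently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical simplified element layout. */
typedef struct {
	char const	*name;
	int		value;
} name_num_t;

/* Must be sorted by 'name' in strcmp() order for bsearch() to work.
 * The values are illustrative only. */
static name_num_t const rcode_table[] = {
	{ "failed",	2 },
	{ "ignored",	0 },
	{ "success",	1 },
};
#define RCODE_TABLE_LEN (sizeof(rcode_table) / sizeof(rcode_table[0]))

static int cmp_name(void const *key, void const *elem)
{
	return strcmp((char const *)key, ((name_num_t const *)elem)->name);
}

/* Name -> value: O(log n) binary search, possible because of the ordering. */
static int table_value_by_str(char const *name, int def)
{
	name_num_t const *found;

	assert(name);
	found = bsearch(name, rcode_table, RCODE_TABLE_LEN, sizeof(rcode_table[0]), cmp_name);
	return found ? found->value : def;
}

/* Value -> name: the array is not ordered by value, so scan linearly. */
static char const *table_str_by_value(int value, char const *def)
{
	for (size_t i = 0; i < RCODE_TABLE_LEN; i++) {
		if (rcode_table[i].value == value) return rcode_table[i].name;
	}
	return def;
}
```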
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition: talloc.c:167
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition: talloc.c:380
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to the number of seconds since the Unix epoch (wallclock time).
Definition: time.h:729
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
Definition: time.h:664
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
Definition: time.h:654
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition: time.h:635
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
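Since a time delta is a count of nanoseconds, converting it for APIs that take a `struct timeval` or `struct timespec` (for example when waiting `retry_delay` after a `-TRYAGAIN`) is a matter of splitting off whole seconds. The following is a minimal sketch assuming the delta is a plain `int64_t` nanosecond count. The helper names are hypothetical, and their rounding (floor for the split, truncation for milliseconds) may differ from `fr_time_delta_to_timeval()`, `fr_time_delta_to_timespec()` and `fr_time_delta_to_msec()`.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/time.h>
#include <time.h>

#define NSEC_PER_SEC	INT64_C(1000000000)
#define NSEC_PER_USEC	INT64_C(1000)

/* Split nanoseconds into whole seconds and a non-negative remainder,
 * using floor division so negative deltas normalise correctly. */
static void split_ns(int64_t ns, int64_t *sec, int64_t *rem_ns)
{
	*sec = ns / NSEC_PER_SEC;
	*rem_ns = ns % NSEC_PER_SEC;
	if (*rem_ns < 0) {		/* C division truncates towards zero */
		*rem_ns += NSEC_PER_SEC;
		*sec -= 1;
	}
}

static struct timeval delta_ns_to_timeval(int64_t ns)
{
	int64_t sec, rem;

	split_ns(ns, &sec, &rem);
	return (struct timeval){ .tv_sec = (time_t)sec, .tv_usec = rem / NSEC_PER_USEC };
}

static struct timespec delta_ns_to_timespec(int64_t ns)
{
	int64_t sec, rem;

	split_ns(ns, &sec, &rem);
	return (struct timespec){ .tv_sec = (time_t)sec, .tv_nsec = (long)rem };
}

static int64_t delta_ns_to_msec(int64_t ns)
{
	return ns / (NSEC_PER_SEC / 1000);	/* Truncates towards zero */
}
```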
"server local" time.
Definition: time.h:69
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
Definition: trigger.c:501
bool trigger_enabled(void)
Return whether triggers are enabled.
Definition: trigger.c:182
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Check whether a value pair list is empty.
Definition: pair_inline.c:125
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a value pair list.
Definition: pair_inline.c:113
int af
AF_INET, AF_INET6, or AF_UNIX.
Definition: socket.h:78
Holds information necessary for binding or connecting to a socket.
Definition: socket.h:63
#define fr_strerror_printf(_fmt,...)
Log to the thread-local error buffer.
Definition: strerror.h:64
#define fr_strerror_const(_msg)
Definition: strerror.h:223
#define fr_box_strvalue_len(_val, _len)
Definition: value.h:279
static size_t char ** out
Definition: value.h:984