cluster.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: c17c030266a0ab847a77dbe588e3cc73e9d7b2e2 $
 * @file cluster.c
 * @brief Functions for interacting with a Redis cluster via Hiredis.
 *
 * @author Arran Cudbard-Bell
 *
 * @copyright 2015 Arran Cudbard-Bell <a.cudbardb@freeradius.org>
 * @copyright 2015 Network RADIUS <info@networkradius.com>
 * @copyright 2015 The FreeRADIUS server project
 *
 * Overview
 * ========
 *
 * Read and understand the Redis cluster specification (http://redis.io/topics/cluster-spec)
 * first, otherwise the text below will not be useful.
 *
 * Using the cluster's public API
 * ------------------------------
 *
 * The two functions used at runtime to issue commands are #fr_redis_cluster_state_init
 * and #fr_redis_cluster_state_next.
 *
 * #fr_redis_cluster_state_init initialises the structure we use to track which node
 * we're communicating with, and uses a key (string) to determine which node we should
 * try to contact first.
 *
 * #fr_redis_cluster_state_next examines the result of the last command, and either
 * gets a new connection, or errors out.
 *
 * In both cases the connection should not be released by the caller, only by
 * #fr_redis_cluster_state_next.
 *
 * Below is the sequence of calls required to use the cluster:
 *
 * 1. During initialization allocate a cluster configuration structure with
 *    #fr_redis_cluster_alloc. This holds the cluster configuration and state.
 * 2. Declare a #fr_redis_cluster_state_t in the function that needs to issue
 *    commands against the cluster.
 * 3. Call #fr_redis_cluster_state_init with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct, and a key/key_len used for initial
 *    lookup. For most commands the key is the value of the second argument.
 *    #fr_redis_cluster_state_init will then reserve and pass back a connection from
 *    the pool of the node responsible for the key.
 * 4. Use the connection that was passed back to issue a Redis command.
 * 5. Use #fr_redis_command_status to translate the result of the command into
 *    a #fr_redis_rcode_t value.
 * 6. Call #fr_redis_cluster_state_next with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct and the #fr_redis_rcode_t value for the
 *    last command.
 * 7. If #fr_redis_cluster_state_next returns 0, repeat steps 4-7. Otherwise
 *    stop and analyse the return value.
 *
 * See #fr_redis_cluster_state_init for example code.
 *
 * Structures
 * ----------
 *
 * This code maintains a series of structures for efficient lookup and lockless operations.
 *
 * The important ones are:
 * - An array of #cluster_node_t. These are pre-allocated on startup and are
 *   never added to, or removed from.
 * - An #fr_fifo_t. This contains the queue of nodes that may be re-used.
 * - An #rbtree_t. This contains a tree of nodes which are active. The tree is built on IP
 *   address and port.
 *
 * Each #cluster_key_slot_t contains a master ID, and an array of slave IDs. The IDs are array
 * indexes in the fr_redis_cluster_t.node array. We use 8bit unsigned integers instead of
 * pointers to save space. Using pointers, the key_slot array would need 784K; using IDs
 * it uses 112K. Still not light on memory, but a bit more acceptable.
 * Currently the key_slot array is shadowed by key_slot_pending, used to stage new key_slot
 * mappings. This doubles the memory used. We may want to consider allocating key_slot_pending
 * only during remappings and freeing it after.
 *
 * Mapping/Remapping the cluster
 * -----------------------------
 *
 * On startup, and during cluster operation, a remap may be performed. A remap involves
 * the following steps:
 *
 * 1. Executing the Redis 'cluster slots' command.
 * 2. Validating the result of this command. We need to do extensive validation to
 *    avoid SEGV on invalid data, due to the way libhiredis presents the result.
 * 3. Determining the intersection between nodes described in the result, and those already
 *    in our #rbtree_t.
 * 4. Connecting to nodes that were in the result, but not in the tree.
 *    Note: If we fail to connect to any one of the masters, we count the map as invalid,
 *    roll back any newly connected nodes, and error out. Slave failure is OK.
 * 5. Mapping keyslot ranges to nodes in the key_slot_pending array.
 * 6. Verifying there are no holes in the ranges (if there are, we roll back and error out).
 * 7. Applying the new keyslot ranges.
 * 8. Removing nodes no longer used by the key slots, and adding them back to the free
 *    nodes queue.
 *
 * #cluster_map_get and #cluster_map_apply perform the operations described
 * above. The get function issues the 'cluster slots' command and performs validation; the
 * apply function processes and applies the map.
 *
 * Failing to apply a map is not a fatal error at runtime, and is only fatal on startup if
 * pool.start > 0.
 *
 * The cluster client can continue to operate, albeit inefficiently, with a stale cluster map
 * by following '-ASK' and '-MOVED' redirects.
 *
 * Remaps are limited to one per second. If any operation sets the remap_needed flag, or
 * attempts a remap directly, the remap may be skipped if one occurred recently.
 *
 *
 * Processing '-ASK' and '-MOVED' redirects
 * ----------------------------------------
 *
 * The code treats '-ASK' (temporary redirect) and '-MOVED' (permanent redirect) responses
 * similarly. If the node is known, a connection is reserved from its pool; if the node
 * is not known, a new pool is established, and a connection reserved.
 *
 * The difference between '-ASK' and '-MOVED' is that '-MOVED' attempts a cluster remap before
 * following the redirect.
 *
 * The data from '-MOVED' responses is not used to alter the cluster map. That is only done
 * on successful remap.
 *
 *
 * Processing '-TRYAGAIN'
 * ----------------------
 *
 * If the cluster is in a state of flux, a node may return '-TRYAGAIN' to indicate that we
 * should attempt the operation again. The cluster spec says we should attempt the operation
 * after some time. This time is configurable.
 *
 */
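The seven-step sequence above is essentially a retry loop. The following is a toy simulation of that control flow only, not the FreeRADIUS API. `sk_state_init`, `sk_state_next`, `sk_fake_command`, the three-node fake cluster, and the `SK_MAX_REDIRECTS` cap are all hypothetical stand-ins for fr_redis_cluster_state_init, fr_redis_cluster_state_next and a real Redis command. The sketch also mirrors the documented difference between redirects: '-MOVED' attempts a remap before the redirect is followed, and '-ASK' does not.

```c
#include <assert.h>

/* Hypothetical result codes, standing in for fr_redis_rcode_t values. */
typedef enum { SK_SUCCESS, SK_ASK, SK_MOVED, SK_ERROR } sk_rcode_t;

/* Hypothetical per-command state, standing in for fr_redis_cluster_state_t. */
typedef struct {
	int node;	/* Node the next command is sent to. */
	int redirects;	/* Redirects followed so far. */
	int remaps;	/* Remaps attempted (only -MOVED triggers one). */
} sk_state_t;

#define SK_MAX_REDIRECTS 5	/* Hypothetical cap, so a redirect loop can't spin forever. */

/* Step 3: pick the first node (a real client would derive it from the key's slot). */
static void sk_state_init(sk_state_t *state, int first_node)
{
	state->node = first_node;
	state->redirects = 0;
	state->remaps = 0;
}

/*
 * Step 6: examine the last result.
 * Returns 0 if the caller should repeat steps 4-7, 1 when done, -1 on error.
 */
static int sk_state_next(sk_state_t *state, sk_rcode_t rcode, int redirect_node)
{
	switch (rcode) {
	case SK_SUCCESS:
		return 1;

	case SK_MOVED:
		state->remaps++;	/* -MOVED attempts a remap first... */
		/* FALL-THROUGH */

	case SK_ASK:			/* ...then both follow the redirect */
		if (++state->redirects > SK_MAX_REDIRECTS) return -1;
		state->node = redirect_node;
		return 0;

	default:
		return -1;
	}
}

/* Hypothetical fake cluster: node 0 says MOVED to 1, node 1 says ASK to 2, node 2 owns the key. */
static sk_rcode_t sk_fake_command(int node, int *redirect_node)
{
	switch (node) {
	case 0: *redirect_node = 1; return SK_MOVED;
	case 1: *redirect_node = 2; return SK_ASK;
	case 2: return SK_SUCCESS;
	default: return SK_ERROR;
	}
}

/* Steps 3-7 as a loop; returns the final sk_state_next() result. */
static int sk_run(sk_state_t *state, int first_node)
{
	int ret;

	sk_state_init(state, first_node);
	do {
		int redirect_node = -1;
		sk_rcode_t rcode = sk_fake_command(state->node, &redirect_node);	/* Steps 4-5 */

		ret = sk_state_next(state, rcode, redirect_node);			/* Step 6 */
	} while (ret == 0);								/* Step 7 */

	return ret;
}
```

Starting at node 0, `sk_run` follows one '-MOVED' redirect and one '-ASK' redirect, then succeeds on node 2. The real state machine also handles pools, '-TRYAGAIN' delays and connection errors, none of which are modelled here.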
#include "redis.h"
#include "cluster.h"
#include "crc16.h"
#include <freeradius-devel/rad_assert.h>

#ifndef HAVE_PTHREAD_H
/*
 *	This is easier than ifdef's throughout the code.
 */
#  define pthread_mutex_init(_x, _y)
#  define pthread_mutex_destroy(_x)
#  define pthread_mutex_lock(_x)
#  define pthread_mutex_unlock(_x)
#endif

#define KEY_SLOTS		16384	//!< Maximum number of keyslots (should not change).

#define MAX_SLAVES		5	//!< Maximum number of slaves associated
					//!< with a keyslot.

/*
 *	Periods and weights for live node selection
 */
#define CLOSED_PERIOD		10000	//!< How recently must the close have
					//!< occurred for us to care.

#define CLOSED_WEIGHT		1	//!< What weight to give to nodes that
					//!< had a connection closed recently.

#define FAILED_PERIOD		10000	//!< How recently must the spawn failure
					//!< have occurred for us to care.

#define FAILED_WEIGHT		1	//!< What weight to give to nodes that
					//!< had a spawn failure recently.

#define RELEASED_PERIOD		10000	//!< Period after which we don't care
					//!< about when the last connection was
					//!< released.

#define RELEASED_MIN_WEIGHT	1000	//!< Minimum weight to assign to node.

/** Return values for internal functions
 */
typedef enum {
	CLUSTER_OP_IGNORED = 1,		//!< Operation ignored.
	CLUSTER_OP_SUCCESS = 0,		//!< Operation completed successfully.
	CLUSTER_OP_FAILED = -1,		//!< Operation failed.
	CLUSTER_OP_NO_CONNECTION = -2,	//!< Operation failed because we couldn't find
					//!< a live connection.
	CLUSTER_OP_BAD_INPUT = -3	//!< Validation error.
} cluster_rcode_t;

/** Live nodes data, used to perform weighted random selection of alternative nodes
 */
typedef struct cluster_nodes_live {
	struct {
		uint8_t				id;		//!< Node ID.
		fr_connection_pool_state_t const *pool_state;	//!< Connection pool stats.
		unsigned int			cumulative;	//!< Cumulative weight.
	} node[UINT8_MAX - 1];				//!< Array of live node IDs (and weights).
	uint8_t		next;				//!< Next index in live.
	uint8_t		skip;
} cluster_nodes_live_t;

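The `cumulative` field supports weighted random selection. Add each live node's weight to a running total, then draw a uniform random number in [0, total). That number falls into exactly one node's interval, so a node with weight w is picked with probability w / total. The selection code itself is not part of this excerpt, so the following is a minimal sketch that assumes a binary search over the cumulative totals. `sketch_live_node_t`, its `weight` member and the example weights are hypothetical. The random value `r` is passed in so that the result is deterministic.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified live-node entry (no pool state). */
typedef struct {
	uint8_t		id;
	unsigned int	weight;		/* Assumed per-node weight. */
	unsigned int	cumulative;	/* Running total of weights up to and including this node. */
} sketch_live_node_t;

/* Fill in the cumulative weights and return the total. */
static unsigned int sketch_live_accumulate(sketch_live_node_t *nodes, size_t num)
{
	unsigned int	total = 0;
	size_t		i;

	for (i = 0; i < num; i++) {
		total += nodes[i].weight;
		nodes[i].cumulative = total;
	}
	return total;
}

/*
 * Return the index of the first entry whose cumulative weight is > r,
 * where r is uniform in [0, total). That entry's interval contains r.
 */
static size_t sketch_live_pick(sketch_live_node_t const *nodes, size_t num, unsigned int r)
{
	size_t lo = 0, hi = num;	/* Search in [lo, hi). */

	assert((num > 0) && (r < nodes[num - 1].cumulative));

	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;

		if (nodes[mid].cumulative > r) {
			hi = mid;
		} else {
			lo = mid + 1;
		}
	}
	return lo;
}
```

In use, `r` would come from a random number generator reduced modulo the total. The binary search keeps each pick at O(log n), but a linear scan would do as well for at most 254 nodes.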
/** Configuration for a single node
 */
typedef struct cluster_node_conf {
	fr_ipaddr_t	ipaddr;		//!< IP Address of Redis cluster node.
	uint16_t	port;		//!< Port of Redis cluster node.
} cluster_node_addr_t;

/** A Redis cluster node
 *
 * Passed as opaque data to pools which open connections to nodes.
 */
typedef struct fr_redis_cluster_node {
	char			name[INET6_ADDRSTRLEN];	//!< Buffer to hold IP + port
							//!< text for debug messages.
	bool			active;		//!< Whether this node is in the active node set.
	uint8_t			id;		//!< Node ID (index in node array).

	cluster_node_addr_t	addr;		//!< Current node address.
	cluster_node_addr_t	pending_addr;	//!< New node address to be applied when the pool
						//!< is reconnected.

	fr_redis_conf_t		*conf;		//!< Common configuration (database number,
						//!< password, etc.).
	fr_connection_pool_t	*pool;		//!< Pool associated with this node.
} cluster_node_t;

/** Indexes in the cluster_node_t array for a single key slot
 *
 * When dealing with 16K entries, space is a concern. It's significantly
 * more memory efficient to use 8bit indexes than 64bit pointers for each
 * of the key slot to node mappings.
 */
typedef struct cluster_key_slot {
	uint8_t		slave[MAX_SLAVES];	//!< R/O nodes (slaves) for this key slot.
	uint8_t		slave_num;		//!< Number of slaves associated with this key slot.
	uint8_t		master;			//!< R/W node (master) for this key slot.
} cluster_key_slot_t;

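Here is a quick check of the memory figures quoted in the overview. With 8-bit IDs, `cluster_key_slot_t` is 7 bytes (five slave IDs, a count and a master ID). One 16384-entry table is therefore 114688 bytes (112 KiB), and `key_slot` plus `key_slot_pending` together take 224 KiB.

The pointer-based struct below is hypothetical and exists for comparison only. Six 8-byte pointers plus a count byte make 49 bytes unpadded, and 49 × 16384 = 802816 bytes, which is the 784K figure. With natural alignment on a typical 64-bit (LP64) target, however, the compiler pads that struct to 56 bytes, giving 896 KiB per table.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_KEY_SLOTS	16384
#define SKETCH_MAX_SLAVES	5

/* Same layout as cluster_key_slot_t: every member is a uint8_t, so there is no padding. */
typedef struct {
	uint8_t	slave[SKETCH_MAX_SLAVES];
	uint8_t	slave_num;
	uint8_t	master;
} sketch_slot_ids_t;

/* Hypothetical pointer-based alternative, for comparison only. */
typedef struct {
	void	*slave[SKETCH_MAX_SLAVES];
	uint8_t	slave_num;
	void	*master;		/* Aligned to 8 bytes on LP64, so padding precedes it. */
} sketch_slot_ptrs_t;

/* Bytes needed for one full key slot table of each kind. */
static size_t sketch_table_ids(void)
{
	return sizeof(sketch_slot_ids_t) * SKETCH_KEY_SLOTS;
}

static size_t sketch_table_ptrs(void)
{
	return sizeof(sketch_slot_ptrs_t) * SKETCH_KEY_SLOTS;
}
```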
/** A Redis cluster
 *
 * Holds all the structures and collections of nodes used to represent a Redis cluster.
 */
struct fr_redis_cluster {
	bool			remapping;	//!< True when cluster is being remapped.
	bool			remap_needed;	//!< Set true if at least one cluster node is definitely
						//!< unreachable. Set false on successful remap.
	time_t			last_updated;	//!< Last time the cluster mappings were updated.
	CONF_SECTION		*module;	//!< Module configuration.

	fr_redis_conf_t		*conf;		//!< Base configuration data such as the database number
						//!< and passwords.

	cluster_node_t		*node;		//!< Array of nodes, each containing a node id, its
						//!< address and a pool of its connections.

	fr_fifo_t		*free_nodes;	//!< Queue of free nodes (or nodes waiting to be reused).
	rbtree_t		*used_nodes;	//!< Tree of used nodes.

	cluster_key_slot_t	key_slot[KEY_SLOTS];		//!< Lookup table of slots to pools.
	cluster_key_slot_t	key_slot_pending[KEY_SLOTS];	//!< Pending key slot table.

#ifdef HAVE_PTHREAD_H
	pthread_mutex_t		mutex;		//!< Mutex to synchronise cluster operations.
#endif
};
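Nodes are never freed. Instead they are recycled between the `free_nodes` queue and the `used_nodes` tree, and node 0 is never handed out: cluster_map_apply starts its scans at 1 and treats a master ID of 0 as "no node". The following sketch models just the queue side as a fixed-size ring buffer of 8-bit node IDs. It is an illustrative assumption, not the implementation of fr_fifo_t; `sketch_fifo_t`, its functions and `SKETCH_MAX_NODES` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_NODES 8	/* Hypothetical; node 0 is reserved, as in cluster_map_apply(). */

/* Hypothetical ring-buffer FIFO of node IDs, standing in for fr_fifo_t. */
typedef struct {
	uint8_t	id[SKETCH_MAX_NODES];
	size_t	head;		/* Index of the oldest entry. */
	size_t	num;		/* Number of queued IDs. */
} sketch_fifo_t;

/* Queue a node ID (e.g. a node made inactive, or rolled back after a failed remap). */
static void sketch_fifo_push(sketch_fifo_t *f, uint8_t id)
{
	assert(f->num < SKETCH_MAX_NODES);
	f->id[(f->head + f->num) % SKETCH_MAX_NODES] = id;
	f->num++;
}

/* Return the oldest ID without removing it, or 0 (the reserved "no node" ID) if empty. */
static uint8_t sketch_fifo_peek(sketch_fifo_t const *f)
{
	return f->num ? f->id[f->head] : 0;
}

/* Remove and return the oldest ID (e.g. once a spare node has connected successfully). */
static uint8_t sketch_fifo_pop(sketch_fifo_t *f)
{
	uint8_t id = sketch_fifo_peek(f);

	if (f->num) {
		f->head = (f->head + 1) % SKETCH_MAX_NODES;
		f->num--;
	}
	return id;
}

/* Every node except reserved node 0 starts out free. */
static void sketch_fifo_init(sketch_fifo_t *f)
{
	uint8_t i;

	f->head = 0;
	f->num = 0;
	for (i = 1; i < SKETCH_MAX_NODES; i++) sketch_fifo_push(f, i);
}
```

Peeking before popping mirrors the pattern in cluster_map_apply, where a spare node is taken off the queue only after its connection succeeds.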


/** Resolve key to key slot
 *
 * Equivalent to the example implementation in the Redis cluster specification,
 * except it uses memchr, which will be faster, and isn't so needlessly complex.
 *
 * @param[in] key to resolve.
 * @param[in] key_len length of key.
 * @return key slot index for the key.
 */
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
{
	uint8_t const *p, *q;

	p = memchr(key, '{', key_len);
	if (!p) {
	all:
		return fr_crc16_xmodem(key, key_len) & (KEY_SLOTS - 1);
	}

	p++;	/* skip '{' */

	/*
	 *	Only look for the first '}' *after* the first '{'
	 */
	q = memchr(p, '}', key_len - (p - key));
	if (!q || (q == p)) goto all;	/* no '}' after '{', or '{}', hash everything */

	return fr_crc16_xmodem(p, q - p) & (KEY_SLOTS - 1);	/* hash stuff between { and } */
}
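For reference, here is a self-contained version of the key-to-slot mapping. The Redis cluster specification names CRC16-XMODEM (polynomial 0x1021, initial value 0, no reflection, no final XOR), whose standard check value for "123456789" is 0x31C3. `sketch_crc16_xmodem` and `sketch_key_slot` are hypothetical names. The bitwise CRC loop is chosen for clarity; a production implementation such as fr_crc16_xmodem may well use a lookup table instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_KEY_SLOTS 16384

/* Bitwise CRC16-XMODEM: poly 0x1021, init 0, MSB first, no final XOR. */
static uint16_t sketch_crc16_xmodem(uint8_t const *in, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(in[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

/* Hash-tag rules as in cluster_key_hash(): hash between the first '{' and the next '}' if non-empty. */
static uint16_t sketch_key_slot(char const *key)
{
	uint8_t const	*k = (uint8_t const *)key;
	size_t		len = strlen(key);
	uint8_t const	*p, *q;

	p = memchr(k, '{', len);
	if (p) {
		p++;						/* skip '{' */
		q = memchr(p, '}', len - (size_t)(p - k));
		if (q && (q != p)) return sketch_crc16_xmodem(p, (size_t)(q - p)) & (SKETCH_KEY_SLOTS - 1);
	}
	return sketch_crc16_xmodem(k, len) & (SKETCH_KEY_SLOTS - 1);	/* no usable tag: whole key */
}
```

With these rules, "{user1000}.following" and "{user1000}.followers" share a slot. By contrast, "foo{}{bar}" is hashed in full because its first hash tag is empty.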

/** Compare two redis nodes to check equality
 *
 * @param[in] a first node.
 * @param[in] b second node.
 * @return
 *	- 0 if nodes are equal.
 *	- > 0 if a sorts after b (compared by IP address, then port).
 *	- < 0 if a sorts before b.
 */
static int _cluster_node_cmp(void const *a, void const *b)
{
	int ret;

	cluster_node_t const *my_a = a;
	cluster_node_t const *my_b = b;

	ret = fr_ipaddr_cmp(&my_a->addr.ipaddr, &my_b->addr.ipaddr);
	if (ret != 0) return ret;

	if (my_a->addr.port < my_b->addr.port) return -1;
	if (my_a->addr.port > my_b->addr.port) return +1;

	return 0;
}

/** Reconnect callback to apply new pool config
 *
 * @param[in] opaque data passed to the connection pool.
 */
static void _cluster_node_conf_apply(void *opaque)
{
	cluster_node_t *node = opaque;
	node->addr = node->pending_addr;
}

/** Establish a connection to a cluster node
 *
 * @note Must be called with the cluster mutex locked.
 * @note Configuration to use for the connection must be set in node->pending_addr, not node->conf.
 *
 * @param[in] cluster the node belongs to.
 * @param[in] node to connect (with pending_addr set).
 * @return
 *	- CLUSTER_OP_SUCCESS on success.
 *	- CLUSTER_OP_FAILED if the operation failed.
 */
static cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, cluster_node_t *node)
{
	char const *p;


	/*
	 *	Write out the IP address and Port in string form
	 */
		      node->name, sizeof(node->name));
#ifndef NDEBUG
	rad_assert(p);
#else
	UNUSED_VAR(p);
#endif

	/*
	 *	Node has never been used before, needs a pool allocated for it.
	 */
	if (!node->pool) {
		char buffer[256];

		snprintf(buffer, sizeof(buffer), "%s [%i]", cluster->conf->prefix, node->id);

		node->addr = node->pending_addr;
		node->pool = fr_connection_pool_init(cluster, cf_section_sub_find(cluster->module, "pool"), node,
						     fr_redis_cluster_conn_create, NULL, buffer, NULL);
		if (!node->pool) return CLUSTER_OP_FAILED;
		return CLUSTER_OP_SUCCESS;
	}

	/*
	 *	Apply the new config to the possibly live pool
	 */
	if (fr_connection_pool_reconnect(node->pool) < 0) return CLUSTER_OP_FAILED;

	return CLUSTER_OP_SUCCESS;
}

/** Parse a -MOVED or -ASK redirect
 *
 * Converts the body of the -MOVED or -ASK error into an IPv4/6 address and port.
 *
 * @param[out] key_slot value extracted from redirect string (may be NULL).
 * @param[out] node_addr Redis node ipaddr and port extracted from redirect string.
 * @param[in] redirect to process.
 * @return
 *	- CLUSTER_OP_SUCCESS on success.
 *	- CLUSTER_OP_BAD_INPUT if the server returned an invalid redirect.
 */
						       redisReply *redirect)
{
	char		*p, *q;
	unsigned long	key;
	uint16_t	port;
	fr_ipaddr_t	ipaddr;

	rad_assert(redirect && (redirect->type == REDIS_REPLY_ERROR));

	p = redirect->str;
	if (strncmp(REDIS_ERROR_MOVED_STR, redirect->str, sizeof(REDIS_ERROR_MOVED_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_MOVED_STR);	/* not a typo, skip space too */
	} else if (strncmp(REDIS_ERROR_ASK_STR, redirect->str, sizeof(REDIS_ERROR_ASK_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_ASK_STR);	/* not a typo, skip space too */
	} else {
		fr_strerror_printf("No '-MOVED' or '-ASK' prefix");
		return CLUSTER_OP_BAD_INPUT;
	}
	if ((q - p) >= redirect->len) {
		fr_strerror_printf("Truncated");
		return CLUSTER_OP_BAD_INPUT;
	}
	p = q;
	key = strtoul(p, &q, 10);
	if (key >= KEY_SLOTS) {		/* valid slots are 0 .. KEY_SLOTS - 1 */
		fr_strerror_printf("Key slot %lu outside of Redis slot range", key);
		return CLUSTER_OP_BAD_INPUT;
	}
	p = q;

	if (*p != ' ') {
		fr_strerror_printf("Missing key/host separator");
		return CLUSTER_OP_BAD_INPUT;
	}
	p++;	/* Skip the ' ' */

	if (fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC, false, true) < 0) {
		return CLUSTER_OP_BAD_INPUT;
	}
	rad_assert(ipaddr.af);

	if (key_slot) *key_slot = key;
	if (node_addr) {
		node_addr->ipaddr = ipaddr;
		node_addr->port = port;
	}

	return CLUSTER_OP_SUCCESS;
}
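A redirect reply body has the form `MOVED <slot> <host>:<port>` or `ASK <slot> <host>:<port>`; hiredis strips the leading '-' from error replies. The following standalone parser sketches the same steps using only the C standard library. It splits host and port at the last ':' and keeps the host as text. The function above, by contrast, uses fr_inet_pton_port to produce an IPv4/IPv6 address. `sketch_redirect_t` and `sketch_parse_redirect` are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_KEY_SLOTS 16384

typedef struct {
	unsigned long	slot;
	char		host[64];
	uint16_t	port;
	int		is_ask;		/* 1 for ASK, 0 for MOVED. */
} sketch_redirect_t;

/* Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>". Returns 0 on success, -1 on bad input. */
static int sketch_parse_redirect(sketch_redirect_t *out, char const *str)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	port;
	size_t		host_len;

	if (strncmp(str, "MOVED ", 6) == 0) {
		out->is_ask = 0;
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		out->is_ask = 1;
		p = str + 4;
	} else {
		return -1;
	}

	/* Slot number: must be present, in 0 .. 16383, and followed by a space. */
	out->slot = strtoul(p, &end, 10);
	if ((end == p) || (out->slot >= SKETCH_KEY_SLOTS) || (*end != ' ')) return -1;
	p = end + 1;

	/* Host is everything before the last ':', port everything after it. */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;
	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port > UINT16_MAX)) return -1;
	out->port = (uint16_t)port;

	return 0;
}
```

The sketch rejects slot 16384 and above, because valid slots run from 0 to 16383.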

/** Apply a cluster map received from a cluster node
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note Must be called with the cluster mutex held.
 *
 * Key slot range structure
 @verbatim
   [0] -> key slot range 0
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master 0 ip (string)
           [1] -> master 0 port (number)
       [3..n] -> slave_node(s)
   [1] -> key slot range 1
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master 1 ip (string)
           [1] -> master 1 port (number)
       [3..n] -> slave_node(s)
   [n] -> key slot range n
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master n ip (string)
           [1] -> master n port (number)
       [3..n] -> slave_node(s)
 @endverbatim
 *
 * @param[in,out] cluster to apply map to.
 * @param[in] reply from #cluster_map_get.
 * @return
 *	- CLUSTER_OP_SUCCESS on success.
 *	- CLUSTER_OP_FAILED on failure.
 *	- CLUSTER_OP_NO_CONNECTION connection failure.
 *	- CLUSTER_OP_BAD_INPUT if the map didn't provide nodes for all keyslots.
 */
static cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
{
	size_t		i;
	uint8_t		r = 0;

	cluster_rcode_t	rcode;

	uint8_t		rollback[UINT8_MAX];	// Set of nodes to re-add to the queue on failure.
	bool		active[UINT8_MAX];	// Set of nodes active in the new cluster map.

#ifndef NDEBUG
#  define SET_ADDR(_addr, _map) \
do { \
	int _ret; \
	_ret = fr_inet_pton(&_addr.ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, false, true);\
	rad_assert(_ret == 0);\
	_addr.port = _map->element[1]->integer; \
} while (0)
#else
#  define SET_ADDR(_addr, _map) \
do { \
	fr_inet_pton(&_addr.ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, false, true);\
	_addr.port = _map->element[1]->integer; \
} while (0)
#endif

#define SET_INACTIVE(_node) \
do { \
	(_node)->active = false; \
	rbtree_deletebydata(cluster->used_nodes, _node); \
	fr_fifo_push(cluster->free_nodes, _node); \
} while (0)

#define SET_ACTIVE(_node) \
do { \
	(_node)->active = true; \
	rbtree_insert(cluster->used_nodes, _node); \
	fr_fifo_pop(cluster->free_nodes); \
	active[(_node)->id] = true; \
	rollback[r++] = (_node)->id; \
} while (0)

	rad_assert(reply->type == REDIS_REPLY_ARRAY);

	memset(&rollback, 0, sizeof(rollback));
	memset(active, 0, sizeof(active));

	cluster->remapping = true;

	/*
	 *	Must be cleared with the mutex held
	 */
	memset(&cluster->key_slot_pending, 0, sizeof(cluster->key_slot_pending));

	/*
	 *	Insert new nodes and mark up the keyslot indexes
	 *	in our temporary key_slot_pending array.
	 *
	 *	A map consists of an array with the following indexes:
	 *	  [0]    -> key_slot_start
	 *	  [1]    -> key_slot_end
	 *	  [2]    -> master_node
	 *	  [3..n] -> slave_node(s)
	 */
	for (i = 0; i < reply->elements; i++) {
		size_t			j;
		long long int		k;
		int			slaves = 0;
		cluster_node_t		*found, *spare;
		cluster_node_t		find;
		cluster_key_slot_t	tmpl_slot;
		redisReply		*map = reply->element[i];

		memset(&tmpl_slot, 0, sizeof(tmpl_slot));

		SET_ADDR(find.addr, map->element[2]);
		found = rbtree_finddata(cluster->used_nodes, &find);
		if (found) {
			active[found->id] = true;
			goto skip_master;
		}

		/*
		 *	Process the master
		 *
		 *	A master node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		spare = fr_fifo_peek(cluster->free_nodes);
		if (!spare) {
		out_of_nodes:
			fr_strerror_printf("Reached maximum connected nodes");
			rcode = CLUSTER_OP_FAILED;
		error:
			cluster->remapping = false;
			cluster->last_updated = time(NULL);
			/* Re-insert new nodes back into the free_nodes queue */
			for (i = 0; i < r; i++) SET_INACTIVE(&cluster->node[rollback[i]]);
			return rcode;
		}

		spare->pending_addr = find.addr;
		rcode = cluster_node_connect(cluster, spare);
		if (rcode < 0) goto error;

		/*
		 *	The node wasn't in the tree (checked above),
		 *	so mark it active, and add it to the array
		 *	of nodes to roll back on failure.
		 */
		SET_ACTIVE(spare);
		found = spare;

	skip_master:
		tmpl_slot.master = found->id;

		/*
		 *	Process the slaves
		 *
		 *	A slave node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		for (j = 3; (j < map->elements); j++) {
			SET_ADDR(find.addr, map->element[j]);
			found = rbtree_finddata(cluster->used_nodes, &find);
			if (found) {
				active[found->id] = true;
				goto next;
			}

			spare = fr_fifo_peek(cluster->free_nodes);
			if (!spare) goto out_of_nodes;

			spare->pending_addr = find.addr;
			if (cluster_node_connect(cluster, spare) < 0) continue;	/* Slave failure is non-fatal */

			SET_ACTIVE(spare);
			found = spare;

		next:
			tmpl_slot.slave[slaves++] = found->id;

			/* Hit the maximum number of slaves we allow */
			if (slaves >= MAX_SLAVES) break;
		}
		tmpl_slot.slave_num = slaves;

		/*
		 *	Copy our tmpl key slot to each of the key slots
		 *	specified by the range for this map.
		 */
		for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
			memcpy(&cluster->key_slot_pending[k], &tmpl_slot,
			       sizeof(*(cluster->key_slot_pending)));
		}
	}

	/*
	 *	Check for holes in the key_slot_pending array
	 *
	 *	The cluster specification says that upon
	 *	detecting a 'NULL' key_slot we should
	 *	check again to see if the cluster error has
	 *	been resolved, but seeing as we're in the
	 *	middle of updating the cluster from very
	 *	recent output of 'cluster slots' it's best to
	 *	error out.
	 */
	for (i = 0; i < KEY_SLOTS; i++) {
		if (cluster->key_slot_pending[i].master == 0) {
			fr_strerror_printf("Cluster is misconfigured, no node assigned for key slot %zu", i);
			rcode = CLUSTER_OP_BAD_INPUT;
			goto error;
		}
	}

	/*
	 *	We have connections/pools for all the nodes in
	 *	the new map, apply it to the live cluster.
	 *
	 *	Other workers may be using the key slot table,
	 *	but that's ok. Nodes and pools are never freed,
	 *	so the worst that can happen is that they'll hit
	 *	the wrong node for the key, and get redirected.
	 */
	memcpy(&cluster->key_slot, &cluster->key_slot_pending, sizeof(cluster->key_slot));

	/*
	 *	Anything not in the active set of nodes gets
	 *	added back into the queue, to be re-used.
	 *
	 *	We start at 1, as node 0 is reserved.
	 */
	for (i = 1; i < cluster->conf->max_nodes; i++) {
#ifndef NDEBUG
		cluster_node_t *found;

		if (cluster->node[i].active) {
			/* Sanity check for duplicates that are active */
			found = rbtree_finddata(cluster->used_nodes, &cluster->node[i]);
			rad_assert(found);
			rad_assert(found->active);
			rad_assert(found->id == i);
		}
#endif

		if (!active[i] && cluster->node[i].active) SET_INACTIVE(&cluster->node[i]);
	}

	cluster->remapping = false;
	cluster->last_updated = time(NULL);

	/*
	 *	Sanity checks
	 */
	rad_assert(((talloc_array_length(cluster->node) - 1) - rbtree_num_elements(cluster->used_nodes)) ==
		   fr_fifo_num_elements(cluster->free_nodes));

	return CLUSTER_OP_SUCCESS;
}
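Steps 5 and 6 of a remap amount to filling a staging table from the slot ranges, then scanning it for unassigned entries before anything touches the live table. The sketch below isolates that logic under a few simplifications. It stores only a master ID per slot, whereas the real table stores a cluster_key_slot_t, and it assumes the ranges have already been validated. It uses the same convention that ID 0 means "unassigned". `sketch_range_t` and `sketch_apply_ranges` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_KEY_SLOTS 16384

/* Hypothetical simplified map entry: an inclusive slot range and its master's node ID. */
typedef struct {
	long long	start;
	long long	end;		/* Inclusive, as in 'cluster slots' output. */
	uint8_t		master;		/* Node ID; 0 is reserved to mean "unassigned". */
} sketch_range_t;

/*
 * Fill the pending table from the ranges, then check that every slot has a master.
 * Returns -1 and sets *hole to the first unassigned slot if there is a hole,
 * or returns 0 otherwise. Assumes 0 <= start <= end < SKETCH_KEY_SLOTS for every range.
 */
static int sketch_apply_ranges(uint8_t pending[SKETCH_KEY_SLOTS],
			       sketch_range_t const *ranges, size_t num, long *hole)
{
	size_t		i;
	long long	k;

	memset(pending, 0, SKETCH_KEY_SLOTS);

	for (i = 0; i < num; i++) {
		for (k = ranges[i].start; k <= ranges[i].end; k++) pending[k] = ranges[i].master;
	}

	for (k = 0; k < SKETCH_KEY_SLOTS; k++) {
		if (pending[k] == 0) {
			*hole = (long)k;
			return -1;
		}
	}
	return 0;
}
```

With the common three-master layout (0-5460, 5461-10922, 10923-16383) the check passes. If the middle range is dropped, the sketch reports slot 5461 as the first hole; at that point cluster_map_apply would roll back and leave the live table untouched.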

/** Validate a cluster map node entry
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note In a separate function, as it's called for both master and slave nodes.
 *
 * @param[in] node we're validating.
 * @param[in] map_idx we're processing.
 * @param[in] node_idx we're processing.
 * @return
 *	- CLUSTER_OP_SUCCESS on success.
 *	- CLUSTER_OP_BAD_INPUT on validation failure (bad data returned from Redis).
 */
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
{
	fr_ipaddr_t ipaddr;

	if (node->type != REDIS_REPLY_ARRAY) {
		fr_strerror_printf("Cluster map %i node %i is wrong type, expected array got %s",
				   map_idx, node_idx,
				   fr_int2str(redis_reply_types, node->type, "<UNKNOWN>"));
		return CLUSTER_OP_BAD_INPUT;
	}

	if (node->elements != 2) {
		fr_strerror_printf("Cluster map %i node %i has incorrect number of elements, expected 2 got %zu",
				   map_idx, node_idx, node->elements);
		return CLUSTER_OP_BAD_INPUT;
	}

	if (node->element[0]->type != REDIS_REPLY_STRING) {
		fr_strerror_printf("Cluster map %i node %i ip address is wrong type, expected string got %s",
				   map_idx, node_idx,
				   fr_int2str(redis_reply_types, node->element[0]->type, "<UNKNOWN>"));
		return CLUSTER_OP_BAD_INPUT;
	}

	if (fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC, false, true) < 0) {
		return CLUSTER_OP_BAD_INPUT;
	}

	if (node->element[1]->type != REDIS_REPLY_INTEGER) {
		fr_strerror_printf("Cluster map %i node %i port is wrong type, expected integer got %s",
				   map_idx, node_idx,
				   fr_int2str(redis_reply_types, node->element[1]->type, "<UNKNOWN>"));
		return CLUSTER_OP_BAD_INPUT;
	}

	if (node->element[1]->integer < 0) {
		fr_strerror_printf("Cluster map %i node %i port is too low, expected >= 0 got %lli",
				   map_idx, node_idx, node->element[1]->integer);
		return CLUSTER_OP_BAD_INPUT;
	}

	if (node->element[1]->integer > UINT16_MAX) {
		fr_strerror_printf("Cluster map %i node %i port is too high, expected <= " STRINGIFY(UINT16_MAX)" "
				   "got %lli", map_idx, node_idx, node->element[1]->integer);
		return CLUSTER_OP_BAD_INPUT;
	}

	return CLUSTER_OP_SUCCESS;
}

/** Learn a new cluster layout by querying the node that issued the -MOVED
 *
 * Also validates the response from the Redis cluster, so we can be sure that
 * it's well formed, before doing more expensive operations.
 *
 * @note Errors may be retrieved with fr_strerror().
 *
 * @param[out] out Where to write cluster map.
 * @param[in] conn to use for learning the new cluster map.
 * @return
 *	- CLUSTER_OP_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
 *	- CLUSTER_OP_SUCCESS on success.
 *	- CLUSTER_OP_FAILED if issuing the command resulted in an error.
 *	- CLUSTER_OP_NO_CONNECTION connection failure.
 *	- CLUSTER_OP_BAD_INPUT on validation failure (bad data returned from Redis).
 */
static cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
{
	redisReply	*reply;
	size_t		i = 0;

	*out = NULL;

	reply = redisCommand(conn->handle, "cluster slots");
	switch (fr_redis_command_status(conn, reply)) {
		fr_redis_reply_free(reply);
		fr_strerror_printf("No connections available");

	case REDIS_RCODE_ERROR:
	default:
		if (reply && reply->type == REDIS_REPLY_ERROR) {
			/* Copy the error text out before the reply is freed */
			fr_strerror_printf("%.*s", (int)reply->len, reply->str);
			fr_redis_reply_free(reply);
			return CLUSTER_OP_IGNORED;
		}
		fr_strerror_printf("Unknown client error");
		return CLUSTER_OP_FAILED;

	case REDIS_RCODE_SUCCESS:
		break;
	}

	if (reply->type != REDIS_REPLY_ARRAY) {
		fr_strerror_printf("Bad response to \"cluster slots\" command, expected array got %s",
				   fr_int2str(redis_reply_types, reply->type, "<UNKNOWN>"));
		fr_redis_reply_free(reply);
		return CLUSTER_OP_BAD_INPUT;
	}

	/*
	 *	Clustering configured but no slots set
	 */
	if (reply->elements == 0) {
		fr_strerror_printf("Empty response to \"cluster slots\" command (zero length array)");
		fr_redis_reply_free(reply);
		return CLUSTER_OP_BAD_INPUT;
	}

837  /*
838  * Validate the complete map set before returning.
839  */
840  for (i = 0; i < reply->elements; i++) {
841  size_t j;
842  redisReply *map;
843 
844  map = reply->element[i];
845  if (map->type != REDIS_REPLY_ARRAY) {
846  fr_strerror_printf("Cluster map %zu is wrong type, expected array got %s",
847  i, fr_int2str(redis_reply_types, map->type, "<UNKNOWN>"));
848  error:
849  fr_redis_reply_free(reply);
850  return CLUSTER_OP_BAD_INPUT;
851  }
852 
853  if (map->elements < 3) {
854  fr_strerror_printf("Cluster map %zu has too few elements, expected at least 3, got %zu",
855  i, map->elements);
856  goto error;
857  }
858 
859  /*
860  * Key slot start
861  */
862  if (map->element[0]->type != REDIS_REPLY_INTEGER) {
863  fr_strerror_printf("Cluster map %zu key slot start is wrong type, expected integer got %s",
864  i, fr_int2str(redis_reply_types, map->element[0]->type, "<UNKNOWN>"));
865  goto error;
866  }
867 
868  if (map->element[0]->integer < 0) {
869  fr_strerror_printf("Cluster map %zu key slot start is too low, expected >= 0 got %lli",
870  i, map->element[0]->integer);
871  goto error;
872  }
873 
874  if (map->element[0]->integer > KEY_SLOTS) {
875  fr_strerror_printf("Cluster map %zu key slot start is too high, expected <= "
876  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[0]->integer);
877  goto error;
878  }
879 
880  /*
881  * Key slot end
882  */
883  if (map->element[1]->type != REDIS_REPLY_INTEGER) {
884  fr_strerror_printf("Cluster map %zu key slot end is wrong type, expected integer got %s",
885  i, fr_int2str(redis_reply_types, map->element[1]->type, "<UNKNOWN>"));
886  goto error;
887  }
888 
889  if (map->element[1]->integer < 0) {
890  fr_strerror_printf("Cluster map %zu key slot end is too low, expected >= 0 got %lli",
891  i, map->element[1]->integer);
892  goto error;
893  }
894 
895  if (map->element[1]->integer > KEY_SLOTS) {
896  fr_strerror_printf("Cluster map %zu key slot end is too high, expected <= "
897  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[1]->integer);
898  goto error;
899  }
900 
901  if (map->element[1]->integer < map->element[0]->integer) {
902  fr_strerror_printf("Cluster map %zu key slot start/end out of order. "
903  "Start was %lli, end was %lli", i, map->element[0]->integer,
904  map->element[1]->integer);
905  goto error;
906  }
907 
908  /*
909  * Master node
910  */
911  if (cluster_map_node_validate(map->element[2], i, 0) < 0) goto error;
912 
913  /*
914  * Slave nodes
915  */
916  for (j = 3; j < map->elements; j++) {
917  if (cluster_map_node_validate(map->element[j], i, j - 2) < 0) goto error;
918  }
919  }
920  *out = reply;
921 
922  return CLUSTER_OP_SUCCESS;
923 }
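Each entry in the CLUSTER SLOTS reply validated above is an array of [start slot, end slot, master, replica, ...], where each node is an array beginning [ip, port]. Both slot bounds are inclusive and, per the Redis cluster specification, must lie in 0..16383. The helper below is a hypothetical standalone sketch of just the range checks. It assumes KEY_SLOTS is 16384 and is not part of cluster.c.

```c
#include <stdbool.h>

#define KEY_SLOTS 16384	/* Redis cluster hash slots are numbered 0 .. 16383 (assumed value) */

/*
 * Hypothetical helper: check the [start, end] slot range from one
 * CLUSTER SLOTS entry. Both ends are inclusive, so the highest valid
 * slot is KEY_SLOTS - 1.
 */
bool slot_range_valid(long long start, long long end)
{
	if ((start < 0) || (start >= KEY_SLOTS)) return false;	/* start out of range */
	if ((end < 0) || (end >= KEY_SLOTS)) return false;	/* end out of range */

	return start <= end;					/* range must be ordered */
}
```

Because both bounds are inclusive, a check of the form `> KEY_SLOTS` would accept slot 16384, which is one past the end of a table with KEY_SLOTS entries.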
924 
925 /** Perform a runtime remap of the cluster
926  *
927  * @note Errors may be retrieved with fr_strerror().
928  * @note Must be called with the cluster mutex free.
929  *
930  * @param[in] request The current request.
931  * @param[in,out] cluster to remap.
932  * @param[in] conn to use to query the cluster.
933  * @return
934  * - CLUSTER_OP_IGNORED if 'cluster slots' returned an error (indicating clustering is not supported), or if the cluster is being remapped, or was remapped less than a second ago.
935  * - CLUSTER_OP_SUCCESS on success.
936  * - CLUSTER_OP_FAILED if issuing the 'cluster slots' command resulted in a protocol error.
937  * - CLUSTER_OP_NO_CONNECTION connection failure.
938  * - CLUSTER_OP_BAD_INPUT on validation failure (bad data returned from Redis).
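939  */
940 static cluster_rcode_t cluster_remap(REQUEST *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
941 {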
942  time_t now;
943  redisReply *map;
944  cluster_rcode_t ret;
945  size_t i, j;
946 
947  /*
948  * If the cluster was remapped very recently, or is being
949  * remapped, it's unlikely that it needs remapping again.
950  */
951  if (cluster->remapping) {
952  in_progress:
953  RDEBUG("Cluster remapping in progress, ignoring remap request");
954  return CLUSTER_OP_IGNORED;
955  }
956 
957  now = time(NULL);
958  if (now == cluster->last_updated) {
959  too_soon:
960  RDEBUG("Cluster was updated less than a second ago, ignoring remap request");
961  return CLUSTER_OP_IGNORED;
962  }
963 
964  RINFO("Initiating cluster remap");
965 
966  /*
967  * Get new cluster information
968  */
969  ret = cluster_map_get(&map, conn);
970  switch (ret) {
971  case CLUSTER_OP_BAD_INPUT: /* Validation error */
972  case CLUSTER_OP_NO_CONNECTION: /* Connection error */
973  case CLUSTER_OP_FAILED: /* Error issuing command */
974  return ret;
975 
976  case CLUSTER_OP_IGNORED: /* Clustering not enabled, or not supported */
977  cluster->remap_needed = false;
978  return CLUSTER_OP_IGNORED;
979 
980  case CLUSTER_OP_SUCCESS: /* Success */
981  break;
982  }
983 
984  /*
985  * Print the mapping we received
986  */
987  RINFO("Cluster map consists of %zu key ranges", map->elements);
988  for (i = 0; i < map->elements; i++) {
989  redisReply *map_node = map->element[i];
990 
991  RINFO("%zu - keys %lli-%lli", i,
992  map_node->element[0]->integer,
993  map_node->element[1]->integer);
994 
995  RINDENT();
996  RINFO("master: %s:%lli",
997  map_node->element[2]->element[0]->str,
998  map_node->element[2]->element[1]->integer);
999  for (j = 3; j < map_node->elements; j++) {
1000  RINFO("slave%zu: %s:%lli", j - 3,
1001  map_node->element[j]->element[0]->str,
1002  map_node->element[j]->element[1]->integer);
1003  }
1004  REXDENT();
1005  }
1006 
1007  /*
1008  * Now we hold the mutex, and the state of those
1009  * variables is synchronized, check again that the
1010  * cluster isn't being remapped, and wasn't remapped
1011  * too recently.
1012  */
1013  pthread_mutex_lock(&cluster->mutex);
1014  if (cluster->remapping || (now == cluster->last_updated)) {
1015  bool busy = cluster->remapping;
1016 
1017  pthread_mutex_unlock(&cluster->mutex);
1018  fr_redis_reply_free(map); /* Don't leak the map we just fetched */
1019  if (busy) goto in_progress;
1020  goto too_soon;
1021  }
1022  ret = cluster_map_apply(cluster, map);
1023  if (ret == CLUSTER_OP_SUCCESS) cluster->remap_needed = false; /* Only clear the flag if the remap succeeded */
1024  pthread_mutex_unlock(&cluster->mutex);
1025 
1026  fr_redis_reply_free(map); /* Free the map */
1027  if (ret < 0) return CLUSTER_OP_FAILED;
1028 
1029  return CLUSTER_OP_SUCCESS;
1030 }
1031 
1032 /** Retrieve or associate a node with the server indicated in the redirect
1033  *
1034  * @note Errors may be retrieved with fr_strerror().
1035  *
1036  * @param[out] out Where to write the node representing the redirect server.
1037  * @param[in] cluster to draw node from.
1038  * @param[in] reply Redis reply containing the redirect information.
1039  * @return
1040  * - CLUSTER_OP_SUCCESS on success.
1041  * - CLUSTER_OP_FAILED if the redirect couldn't be parsed, or no more nodes are available.
1042  * - CLUSTER_OP_NO_CONNECTION connection failure.
1043  * - CLUSTER_OP_BAD_INPUT on validation failure (bad data returned from Redis).
1044  */
1045 static cluster_rcode_t cluster_redirect(cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
1046 {
1047  cluster_node_t find, *found, *spare;
1048  fr_redis_conn_t *rconn;
1049 
1050  uint16_t key;
1051 
1052  memset(&find, 0, sizeof(find));
1053 
1054  *out = NULL;
1055 
1056  if (cluster_node_conf_from_redirect(&key, &find.addr, reply) < 0) return CLUSTER_OP_FAILED;
1057 
1058  pthread_mutex_lock(&cluster->mutex);
1059  /*
1060  * If we already have a pool for the
1061  * host we were redirected to, use that.
1062  */
1063  found = rbtree_finddata(cluster->used_nodes, &find);
1064  if (found) {
1065  /* We have the new pool, don't need to hold the lock */
1066  pthread_mutex_unlock(&cluster->mutex);
1067  *out = found;
1068  return CLUSTER_OP_SUCCESS;
1069  }
1070 
1071  /*
1072  * Otherwise, grab a free node and try to connect
1073  * it to the server we were redirected to.
1074  */
1075  spare = fr_fifo_peek(cluster->free_nodes);
1076  if (!spare) {
1077  fr_strerror_printf("Reached maximum connected nodes");
1078  pthread_mutex_unlock(&cluster->mutex);
1079  return CLUSTER_OP_FAILED;
1080  }
1081  spare->pending_addr = find.addr; /* Set the config to be applied */
1082  if (cluster_node_connect(cluster, spare) < 0) {
1083  pthread_mutex_unlock(&cluster->mutex);
1084  return CLUSTER_OP_NO_CONNECTION;
1085  }
1086  rbtree_insert(cluster->used_nodes, spare);
1087  fr_fifo_pop(cluster->free_nodes);
1088  found = spare;
1089 
1090  /* We have the new pool, don't need to hold the lock */
1091  pthread_mutex_unlock(&cluster->mutex);
1092 
1093  /*
1094  * Determine if we can establish a connection to
1095  * the new pool, to check if it's viable.
1096  */
1097  rconn = fr_connection_get(found->pool);
1098  if (!rconn) {
1099  /*
1100  * To prevent repeated misconfigurations
1101  * using all free nodes, add the node
1102  * back to the spare queue if this
1103  * was the first connection attempt and
1104  * it failed.
1105  */
1106  pthread_mutex_lock(&cluster->mutex);
1107  fr_fifo_push(cluster->free_nodes, spare);
1108  pthread_mutex_unlock(&cluster->mutex);
1109 
1110  fr_strerror_printf("No connections available");
1111  return CLUSTER_OP_NO_CONNECTION;
1112  }
1113  fr_connection_release(found->pool, rconn);
1114  *out = found;
1115 
1116  return CLUSTER_OP_SUCCESS;
1117 }
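hiredis surfaces a redirect as a REDIS_REPLY_ERROR whose str looks like "MOVED 3999 127.0.0.1:6381" or "ASK 3999 127.0.0.1:6381". The leading '-' of the RESP error line is not part of str. cluster_node_conf_from_redirect() does the real parsing. The function below is only a hypothetical, self-contained sketch of that step, assuming the "<type> <slot> <host>:<port>" layout from the Redis cluster specification. It splits on the last ':' so that IPv6 literals survive.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parsed form of a MOVED/ASK redirect */
typedef struct {
	bool		ask;		/* true for ASK (one-off), false for MOVED (permanent) */
	uint16_t	slot;
	char		host[64];
	uint16_t	port;
} redirect_t;

/* Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>". Returns 0 on success, -1 on error. */
int parse_redirect(redirect_t *out, char const *str)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	slot, port;
	size_t		host_len;

	if (strncmp(str, "MOVED ", 6) == 0) {
		out->ask = false;
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		out->ask = true;
		p = str + 4;
	} else {
		return -1;			/* not a redirect */
	}

	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot >= 16384)) return -1;
	p = end + 1;

	colon = strrchr(p, ':');		/* last ':', so "::1:7000" keeps host "::1" */
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port == 0) || (port > 65535)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = (uint16_t)slot;
	out->port = (uint16_t)port;

	return 0;
}
```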
1118 
1119 /** Walk all used pools, adding them to the live node list
1120  *
1121  * @param[in,out] context The #cluster_nodes_live_t list to add the node to.
1122  * @param[in] data node to add (unless it's the node we're skipping).
1123  * @return
1124  * - 0 always, so that every node in the tree is visited.
1125  *
1126  */
1127 static int _cluster_pool_walk(void *context, void *data)
1128 {
1129  cluster_nodes_live_t *live = context;
1130  cluster_node_t *node = data;
1131 
1132  rad_assert(node->pool);
1133 
1134  if (live->skip == node->id) return 0; /* Skip the dead node */
1135 
1136  live->node[live->next].pool_state = fr_connection_pool_state(node->pool);
1137  live->node[live->next++].id = node->id;
1138 
1139  return 0;
1140 }
1141 
1142 /** Try to determine the health of a cluster node passively by examining its pool state
1143  *
1144  * Returns an integer value representing the likelihood that the pool is live.
1145  * Range is between 1 and 11,000.
1146  *
1147  * If a weight of 1 is returned, connections from the pool should be checked
1148  * (by pinging) before use.
1149  *
1150  * @param now The current time.
1151  * @param state of the connection pool.
1152  * @return
1153  * - 1 the pool is very likely to be bad.
1154  * - 2-11000 the pool is likely to be good, with a higher number
1155  * indicating higher probability of liveness.
1156  */
1157 static int cluster_node_pool_health(struct timeval const *now, fr_connection_pool_state_t const *state)
1158 {
1159  struct timeval diff;
1160  uint64_t diff_ms;
1161 
1162  /*
1163  * Failed spawn recently, probably bad
1164  */
1165  if ((((time_t)now->tv_sec - state->last_failed) * 1000) < FAILED_PERIOD) return FAILED_WEIGHT;
1166 
1167  /*
1168  * Closed recently, probably bad
1169  */
1170  fr_timeval_subtract(&diff, now, &state->last_closed);
1171  diff_ms = FR_TIMEVAL_TO_MS(&diff);
1172  if (diff_ms < CLOSED_PERIOD) return CLOSED_WEIGHT;
1173 
1174  /*
1175  * Released too long ago, don't know
1176  */
1177  fr_timeval_subtract(&diff, now, &state->last_released);
1178  diff_ms = FR_TIMEVAL_TO_MS(&diff);
1179  if (diff_ms > RELEASED_PERIOD) return RELEASED_MIN_WEIGHT;
1180 
1181  /*
1182  * Released not long ago, might be ok.
1183  */
1184  return RELEASED_MIN_WEIGHT + (RELEASED_PERIOD - diff_ms);
1185 }
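The weighting rules can be sketched in isolation. The function below is hypothetical. It takes elapsed times in milliseconds instead of a fr_connection_pool_state_t. Its constants are assumptions derived from the weights documented for cluster_node_find_live(): 10 second windows, weight 1 for bad pools, and 1,000 to 11,000 for recently released ones. They are not the real FAILED_PERIOD, CLOSED_PERIOD and RELEASED_* values.

```c
#include <stdint.h>

/* Assumed constants, taken from the documented weights rather than the real macros */
#define PERIOD_MS	10000u	/* window in which an event counts as "recent" */
#define BAD_WEIGHT	1u	/* pool recently failed to spawn, or closed a connection */
#define MIN_GOOD_WEIGHT	1000u	/* released too long ago to tell */

unsigned int pool_weight(uint64_t ms_since_failed, uint64_t ms_since_closed, uint64_t ms_since_released)
{
	if (ms_since_failed < PERIOD_MS) return BAD_WEIGHT;		/* spawn failed recently */
	if (ms_since_closed < PERIOD_MS) return BAD_WEIGHT;		/* connection closed recently */
	if (ms_since_released > PERIOD_MS) return MIN_GOOD_WEIGHT;	/* stale information */

	/* Linear: 11,000 at 0 ms, falling to 1,000 at PERIOD_MS */
	return MIN_GOOD_WEIGHT + (unsigned int)(PERIOD_MS - ms_since_released);
}
```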
1186 
1187 /** Issue a ping request against a cluster node
1188  *
1189  * Establishes whether the connection to the node we have is live.
1190  *
1191  * @param request The current request.
1192  * @param node to ping.
1193  * @param conn the connection to ping on.
1194  * @return
1195  * - CLUSTER_OP_BAD_INPUT if we got a bad response.
1196  * - CLUSTER_OP_SUCCESS on success.
1197  * - CLUSTER_OP_NO_CONNECTION on connection down.
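1198  */
1199 static cluster_rcode_t cluster_node_ping(REQUEST *request, cluster_node_t *node, fr_redis_conn_t *conn)
1200 {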
1201  redisReply *reply;
1202  fr_redis_rcode_t rcode;
1203 
1204  RDEBUG2("[%i] Executing command: PING", node->id);
1205  reply = redisCommand(conn->handle, "PING");
1206  rcode = fr_redis_command_status(conn, reply);
1207  if (rcode != REDIS_RCODE_SUCCESS) {
1208  RERROR("[%i] PING failed to %s:%i: %s", node->id, node->name,
1209  node->addr.port, fr_strerror());
1210  fr_redis_reply_free(reply);
1211  return CLUSTER_OP_NO_CONNECTION;
1212  }
1213 
1214  if (reply->type != REDIS_REPLY_STATUS) {
1215  RERROR("[%i] Bad PING response from %s:%i, expected status got %s",
1216  node->id, node->name, node->addr.port,
1217  fr_int2str(redis_reply_types, reply->type, "<UNKNOWN>"));
1218  fr_redis_reply_free(reply);
1219  return CLUSTER_OP_BAD_INPUT;
1220  }
1221 
1222  RDEBUG2("[%i] Got response: %s", node->id, reply->str);
1223  fr_redis_reply_free(reply);
1224  return CLUSTER_OP_SUCCESS;
1225 }
1226 
1227 /** Attempt to find a live pool in the cluster
1228  *
1229  * The intent here is to find pools/nodes where a connection was released the shortest
1230  * time ago. Having a connection be released (vs closed) indicates that the pool is live.
1231  *
1232  * However, we don't want all workers to try to grab a connection to this node, as it
1233  * may still be dead (we don't know).
1234  *
1235  * So we use inverse transform sampling to choose between the nodes, weighting each one by
1236  * the time between now and when one of its connections was last released. Nodes whose
1237  * connections were released closest to the current time are given a higher weighting.
1238  *
1239  * Weight range is between 1 - 11,000.
1240  *
1241  * - If released > 10.0 seconds ago, the information is stale, weight 1000.
1242  * - If closed < 10.0 seconds ago, it's a bad pool, weight 1.
1243  * - If spawn failed < 10.0 seconds ago, it's a bad pool, weight 1.
1244  * - If a connection was released 0.0 seconds ago, weight 11,000.
1245  * - If a connection was released 10.0 seconds ago, weight 1000.
1246  *
1247  * Using the above algorithm we use the experience of other workers using the cluster to
1248  * inform our alternative node selection.
1249  *
1250  * Suggestions on improving live node selection appreciated.
1251  *
1252  * Inverse transform sampling based roughly on the solution from this post:
1253  * http://stackoverflow.com/questions/17250568/randomly-choosing-from-a-list-with-weighted-probabilities
1254  *
1255  * Wikipedia page here:
1256  * https://en.wikipedia.org/wiki/Inverse_transform_sampling
1257  *
1258  * @note Must be called with the cluster mutex free.
1259  *
1260  * @param[out] live_node we found.
1261  * @param[out] live_conn to that node.
1262  * @param[in] request The current request (used for logging).
1263  * @param[in] cluster to search for live pools in.
1264  * @param[in] skip this node (it's bad).
1265  * @return 0 if a live node was found, else -1.
1266  */
1267 static int cluster_node_find_live(cluster_node_t **live_node, fr_redis_conn_t **live_conn,
1268  REQUEST *request, fr_redis_cluster_t *cluster, cluster_node_t *skip)
1269 {
1270  uint32_t i;
1271 
1272  cluster_nodes_live_t *live;
1273  struct timeval now;
1274 
1275  RDEBUG2("Searching for live cluster nodes");
1276 
1277  if (rbtree_num_elements(cluster->used_nodes) == 1) {
1278  no_alts:
1279  RERROR("No alternative nodes available");
1280  return -1;
1281  }
1282 
1283  live = talloc_zero(NULL, cluster_nodes_live_t); /* Too big for stack */
1284  live->skip = skip->id;
1285 
1286  pthread_mutex_lock(&cluster->mutex);
1287  rbtree_walk(cluster->used_nodes, RBTREE_IN_ORDER, _cluster_pool_walk, live);
1288  pthread_mutex_unlock(&cluster->mutex);
1289 
1290  rad_assert(live->next); /* There should be at least one */
1291  if (live->next == 1) { talloc_free(live); goto no_alts; } /* Weird, but conceivable */
1292 
1293  gettimeofday(&now, NULL);
1294 
1295  /*
1296  * Weighted random selection
1297  */
1298  for (i = 0; (i < cluster->conf->max_alt) && live->next; i++) {
1299  fr_redis_conn_t *conn;
1300  cluster_node_t *node;
1301  uint8_t j;
1302  int first, last, pivot; /* Must be signed for the binary search */
1303  unsigned int find, cumulative = 0;
1304 
1305  RDEBUG3("(Re)assigning node weights:");
1306  RINDENT();
1307  for (j = 0; j < live->next; j++) {
1308  int weight;
1309 
1310  weight = cluster_node_pool_health(&now, live->node[j].pool_state);
1311  RDEBUG3("Node %i weight: %i", live->node[j].id, weight);
1312  live->node[j].cumulative = (cumulative += weight);
1313  }
1314  REXDENT();
1315 
1316  /*
1317  * Select a node at random
1318  */
1319  find = (fr_rand() % cumulative) + 1; /* Between 1 and total */
1320  first = 0;
1321  last = live->next - 1;
1322  pivot = (first + last) / 2;
1323 
1324  while (first <= last) {
1325  if (live->node[pivot].cumulative < find) {
1326  first = pivot + 1;
1327  } else if (live->node[pivot].cumulative == find) {
1328  break;
1329  } else {
1330  last = pivot - 1;
1331  }
1332  pivot = (first + last) / 2;
1333  }
1334  /*
1335  * Round up...
1336  */
1337  if (first > last) pivot = last + 1;
1338 
1339  /*
1340  * Resolve the index to the actual node. We use IDs
1341  * to save memory...
1342  */
1343  node = &cluster->node[live->node[pivot].id];
1344  rad_assert(live->node[pivot].id == node->id);
1345 
1346  RDEBUG2("Selected node %i (using random value %u)", node->id, find);
1347  conn = fr_connection_get(node->pool);
1348  if (!conn) {
1349  RERROR("No connections available to node %i %s:%i", node->id,
1350  node->name, node->addr.port);
1351  next:
1352  /*
1353  * Remove the node we just discovered was bad
1354  * out of the set of nodes we're selecting over.
1355  */
1356  if (pivot == (live->next - 1)) { /* Last entry, nothing to move */
1357  live->next--;
1358  continue;
1359  }
1360  memcpy(&live->node[pivot], &live->node[live->next - 1], sizeof(live->node[pivot]));
1361  live->next--;
1362  continue;
1363  }
1364 
1365  /*
1366  * PING! PONG?
1367  */
1368  switch (cluster_node_ping(request, node, conn)) {
1369  case CLUSTER_OP_SUCCESS:
1370  break;
1371 
1372  case CLUSTER_OP_NO_CONNECTION:
1373  fr_connection_close(node->pool, conn);
1374  goto next;
1375 
1376  default:
1377  fr_connection_release(node->pool, conn);
1378  goto next;
1379  }
1380 
1381  *live_node = node;
1382  *live_conn = conn;
1383  talloc_free(live);
1384 
1385  return 0;
1386  }
1387 
1388  RERROR("Hit max alt limit %i, and no live connections found", cluster->conf->max_alt);
1389  talloc_free(live);
1390 
1391  return -1;
1392 }
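The selection step above is inverse transform sampling over a discrete distribution. It builds running totals of the weights, draws a value uniformly in [1, total], and binary searches for the first running total that reaches it. The sketch below is a hypothetical standalone version. It uses a half-open lower-bound search rather than the signed first/last loop above, which yields the same index. Reducing r with % adds a slight modulo bias, which is negligible when the total is much smaller than the range of r.

```c
#include <stddef.h>

/*
 * Hypothetical sketch of the weighted selection in cluster_node_find_live().
 * cumulative[] holds running totals of the node weights. Every weight is >= 1,
 * so the totals strictly increase. r is a uniformly distributed random value,
 * e.g. from fr_rand(). Assumes n >= 1.
 *
 * Returns the index i with cumulative[i - 1] < find <= cumulative[i], so
 * node i is chosen with probability weight[i] / total.
 */
size_t weighted_pick(unsigned int const *cumulative, size_t n, unsigned int r)
{
	unsigned int	find = (r % cumulative[n - 1]) + 1;	/* uniform in [1, total] */
	size_t		first = 0, last = n;			/* search the range [first, last) */

	/* Lower bound: the first index whose running total reaches 'find' */
	while (first < last) {
		size_t mid = first + (last - first) / 2;

		if (cumulative[mid] < find) {
			first = mid + 1;
		} else {
			last = mid;
		}
	}

	return first;
}
```

With weights {1, 10, 100} (running totals {1, 11, 111}), feeding every r from 0 to 110 selects the three nodes 1, 10 and 100 times respectively.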
1393 
1394 /** Callback for freeing a Redis connection
1395  *
1396  * @param[in] conn to free.
1397  * @return 0.
1398  */
1399 static int _cluster_conn_free(fr_redis_conn_t *conn)
1400 {
1401  redisFree(conn->handle);
1402 
1403  return 0;
1404 }
1405 
1406 /** Create a new connection to a Redis node
1407  *
1408  * @param[in] ctx to allocate connection structure in. Will be freed at the same time as the pool.
1409  * @param[in] instance data of type #cluster_node_t. Holds parameters for establishing new connection.
1410  * @param[in] timeout The maximum time allowed to complete the connection.
1411  * @return
1412  * - New #fr_redis_conn_t on success.
1413  * - NULL on failure.
1414  */
1415 void *fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, struct timeval const *timeout)
1416 {
1417  cluster_node_t *node = instance;
1418  fr_redis_conn_t *conn = NULL;
1419  redisContext *handle;
1420  redisReply *reply = NULL;
1421 
1422  DEBUG2("%s [%i]: Connecting node to %s:%i", node->conf->prefix, node->id, node->name, node->addr.port);
1423 
1424  handle = redisConnectWithTimeout(node->name, node->addr.port, *timeout);
1425  if ((handle != NULL) && handle->err) {
1426  ERROR("%s [%i]: Connection failed: %s", node->conf->prefix, node->id, handle->errstr);
1427  redisFree(handle);
1428  return NULL;
1429  } else if (!handle) {
1430  ERROR("%s [%i]: Connection failed", node->conf->prefix, node->id);
1431  return NULL;
1432  }
1433 
1434  if (node->conf->password) {
1435  DEBUG3("%s [%i]: Executing: AUTH %s", node->conf->prefix, node->id, node->conf->password);
1436  reply = redisCommand(handle, "AUTH %s", node->conf->password);
1437  if (!reply) {
1438  ERROR("%s [%i]: Failed authenticating: %s", node->conf->prefix, node->id, handle->errstr);
1439  error:
1440  if (reply) fr_redis_reply_free(reply);
1441  redisFree(handle);
1442  return NULL;
1443  }
1444 
1445  switch (reply->type) {
1446  case REDIS_REPLY_STATUS:
1447  if (strcmp(reply->str, "OK") != 0) {
1448  ERROR("%s [%i]: Failed authenticating: %s", node->conf->prefix,
1449  node->id, reply->str);
1450  goto error;
1451  }
1452  fr_redis_reply_free(reply);
1453  break; /* else it's OK */
1454 
1455  case REDIS_REPLY_ERROR:
1456  ERROR("%s [%i]: Failed authenticating: %s", node->conf->prefix, node->id, reply->str);
1457  goto error;
1458 
1459  default:
1460  ERROR("%s [%i]: Unexpected reply of type %s to AUTH", node->conf->prefix, node->id,
1461  fr_int2str(redis_reply_types, reply->type, "<UNKNOWN>"));
1462  goto error;
1463  }
1464  }
1465 
1466  if (node->conf->database) {
1467  DEBUG3("%s [%i]: Executing: SELECT %i", node->conf->prefix, node->id, node->conf->database);
1468  reply = redisCommand(handle, "SELECT %i", node->conf->database);
1469  if (!reply) {
1470  ERROR("%s [%i]: Failed selecting database %i: %s", node->conf->prefix, node->id,
1471  node->conf->database, handle->errstr);
1472  goto error;
1473  }
1474 
1475  switch (reply->type) {
1476  case REDIS_REPLY_STATUS:
1477  if (strcmp(reply->str, "OK") != 0) {
1478  ERROR("%s [%i]: Failed selecting database %i: %s", node->conf->prefix, node->id,
1479  node->conf->database, reply->str);
1480  goto error;
1481  }
1482  fr_redis_reply_free(reply);
1483  break; /* else it's OK */
1484 
1485  case REDIS_REPLY_ERROR:
1486  ERROR("%s [%i]: Failed selecting database %i: %s", node->conf->prefix, node->id,
1487  node->conf->database, reply->str);
1488  goto error;
1489 
1490  default:
1491  ERROR("%s [%i]: Unexpected reply of type %s, to SELECT", node->conf->prefix, node->id,
1492  fr_int2str(redis_reply_types, reply->type, "<UNKNOWN>"));
1493  goto error;
1494  }
1495  }
1496 
1497  conn = talloc_zero(ctx, fr_redis_conn_t);
1498  conn->handle = handle;
1499  talloc_set_destructor(conn, _cluster_conn_free);
1500 
1501  return conn;
1502 }
1503 
1504 /** Implements the key slot selection scheme used by FreeRADIUS
1505  *
1506  * Like the scheme in the cluster specification, but with some differences:
1507  * if the key is NULL or zero length, a random key slot is chosen.
1508  *
1509  * If there's only a single node in the cluster, then we avoid the CRC16
1510  * and just use key slot 0.
1511  *
1512  * @param cluster to determine key slot for.
1513  * @param request The current request.
1514  * @param key the key to resolve.
1515  * @param key_len the length of the key.
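1516  * @return pointer to the key slot the key resolves to.
1517  */
1518 static cluster_key_slot_t *cluster_slot_by_key(fr_redis_cluster_t *cluster, REQUEST *request,
1519  uint8_t const *key, size_t key_len)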
1520 {
1521  cluster_key_slot_t *key_slot;
1522 
1523  if (!key || (key_len == 0)) {
1524  key_slot = &cluster->key_slot[(uint16_t)(fr_rand() & (KEY_SLOTS - 1))];
1525  RDEBUG2("Key rand() -> slot %zu", key_slot - cluster->key_slot);
1526 
1527  return key_slot;
1528  }
1529 
1530  /*
1531  * Avoid CRC16 if we're operating with one cluster node or
1532  * without clustering.
1533  */
1534  if (rbtree_num_elements(cluster->used_nodes) > 1) {
1535  key_slot = &cluster->key_slot[cluster_key_hash(key, key_len)];
1536  if (RDEBUG_ENABLED2) {
1537  char *p;
1538 
1539  p = fr_asprint(request, (char const *)key, key_len, '"');
1540  RDEBUG2("Key \"%s\" -> slot %zu", p, key_slot - cluster->key_slot);
1541  talloc_free(p);
1542  }
1543 
1544  return key_slot;
1545  }
1546  RDEBUG3("Single node available, skipping key selection");
1547 
1548  return &cluster->key_slot[0];
1549 }
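For comparison, the Redis cluster specification defines HASH_SLOT = CRC16(key) mod 16384, using the CRC16-CCITT (XMODEM) variant. When the key contains a '{' followed later by a '}' with at least one character between them (a "hash tag"), only that substring is hashed. We assume cluster_key_hash() implements the equivalent, but its body isn't shown here, so the sketch below is written from the spec. Both function names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

/* CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection */
uint16_t crc16_xmodem(uint8_t const *buf, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(buf[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}

	return crc;
}

/* Map a key to one of 16384 slots, honouring {hash tags} as the cluster spec describes */
uint16_t key_hash_slot(uint8_t const *key, size_t key_len)
{
	size_t s, e;

	for (s = 0; s < key_len; s++) if (key[s] == '{') break;
	if (s == key_len) return crc16_xmodem(key, key_len) & 16383;	/* no '{': hash whole key */

	for (e = s + 1; e < key_len; e++) if (key[e] == '}') break;
	if ((e == key_len) || (e == s + 1)) {				/* no '}', or empty "{}" */
		return crc16_xmodem(key, key_len) & 16383;
	}

	return crc16_xmodem(key + s + 1, e - s - 1) & 16383;		/* hash only the tag */
}
```

Hash tags let callers force related keys onto the same node. For example, "{user1000}.following" and "{user1000}.followers" map to the same slot.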
1550 
1551 /** Resolve a key to a pool, and reserve a connection in that pool
1552  *
1553  * This should be used with #fr_redis_cluster_state_next, and #fr_redis_command_status, to
1554  * transparently locate the cluster node we need to perform the operation on.
1555  *
1556  * Example code below shows how this function is used in conjunction
1557  * with #fr_redis_cluster_state_next to follow redirects, and reconnect handles.
1558  *
1559  @code{.c}
1560  fr_redis_rcode_t s_ret;
1561  fr_redis_cluster_state_t state;
1562  fr_redis_conn_t *conn;
1563  redisReply *reply = NULL;
1564  fr_redis_rcode_t status;
1565 
1566  for (s_ret = fr_redis_cluster_state_init(&state, &conn, cluster, request, key, key_len, false);
1567  s_ret == REDIS_RCODE_TRY_AGAIN;
1568  s_ret = fr_redis_cluster_state_next(&state, &conn, cluster, request, status, &reply)) {
1569  reply = redisCommand(conn->handle, "SET foo bar");
1570  status = fr_redis_command_status(conn, reply);
1571  }
1572  // reply is freed if s_ret == REDIS_RCODE_TRY_AGAIN, but left in all other cases to allow error
1573  // processing, or extraction of results.
1574  fr_redis_reply_free(reply);
1575  if (s_ret != REDIS_RCODE_SUCCESS) {
1576  // Error
1577  }
1578  // Success
1579  @endcode
1580  *
1581  * @param[out] state to track current pool and various counters, will be initialised.
1582  * @param[out] conn Where to write the reserved connection to.
1583  * @param[in] cluster of pools.
1584  * @param[in] request The current request.
1585  * @param[in] key to resolve to a cluster node/pool. If key is NULL or key_len is 0, a random
1586  * slot will be chosen.
1587  * @param[in] key_len Length of the key.
1588  * @param[in] read_only If true, will use random slave pool in preference to the master, falling
1589  * back to the master if no slaves are available.
1590  * @return
1591  * - REDIS_RCODE_TRY_AGAIN - try your command with this connection (provided via conn).
1592  * - REDIS_RCODE_RECONNECT - when no additional connections available.
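1593  */
1594 fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn,
1595  fr_redis_cluster_t *cluster, REQUEST *request,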
1596  uint8_t const *key, size_t key_len, bool read_only)
1597 {
1598  cluster_node_t *node;
1599  cluster_key_slot_t *key_slot;
1600  uint8_t first, i;
1601  int used_nodes;
1602 
1603  rad_assert(cluster);
1604  rad_assert(state);
1605  rad_assert(conn);
1606 
1607  memset(state, 0, sizeof(*state));
1608 
1609  used_nodes = rbtree_num_elements(cluster->used_nodes);
1610  if (used_nodes == 0) {
1611  REDEBUG("No nodes in cluster");
1612  return REDIS_RCODE_RECONNECT;
1613  }
1614 
1615 again:
1616  key_slot = cluster_slot_by_key(cluster, request, key, key_len);
1617 
1618  /*
1619  * 1. Try each of the slaves for the key slot
1620  * 2. Fall through to trying the master, and a single alternate node.
1621  */
1622  if (read_only) {
1623  first = key_slot->slave_num ? (fr_rand() % key_slot->slave_num) : 0;
1624  for (i = 0; i < key_slot->slave_num; i++) {
1625  uint8_t node_id;
1626 
1627  node_id = key_slot->slave[(first + i) % key_slot->slave_num];
1628  node = &cluster->node[node_id];
1629  *conn = fr_connection_get(node->pool);
1630  if (!*conn) {
1631  RDEBUG2("[%i] No connections available (key slot %zu slave %i)",
1632  node->id, key_slot - cluster->key_slot, (first + i) % key_slot->slave_num);
1633  cluster->remap_needed = true;
1634  continue; /* Continue until we find a live pool */
1635  }
1636 
1637  goto finish;
1638  }
1639  /* Fall through to using key slot master or alternate */
1640  }
1641 
1642  /*
1643  * 1. Try the master for the key slot
1644  * 2. If unavailable search for any pools with handles available
1645  * 3. If there are no pools, or we can't reserve a handle,
1646  * give up.
1647  */
1648  node = &cluster->node[key_slot->master];
1649  *conn = fr_connection_get(node->pool);
1650  if (!*conn) {
1651  RDEBUG2("[%i] No connections available (key slot %zu master)",
1652  node->id, key_slot - cluster->key_slot);
1653  cluster->remap_needed = true;
1654 
1655  if (cluster_node_find_live(&node, conn, request, cluster, node) < 0) return REDIS_RCODE_RECONNECT;
1656  }
1657 
1658 finish:
1659  /*
1660  * Something set the remap_needed flag, and we have a live connection
1661  */
1662  if (cluster->remap_needed) {
1663  if (cluster_remap(request, cluster, *conn) == CLUSTER_OP_SUCCESS) {
1664  fr_connection_release(node->pool, *conn);
1665  goto again; /* New map, try again */
1666  }
1667  RDEBUG2("%s", fr_strerror());
1668  }
1669 
1670  state->node = node;
1671  state->key = key;
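1672  state->key_len = key_len;
1673  state->in_pool = fr_connection_pool_state(node->pool)->num; /* Reconnect budget, as after a redirect */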
1674  RDEBUG2("[%i] >>> Sending command(s) to %s:%i", state->node->id, state->node->name, state->node->addr.port);
1675 
1676  return REDIS_RCODE_TRY_AGAIN;
1677 }
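The read_only path above starts at a random slave, tries each slave once in round-robin order, then falls back to the master. The hypothetical helper below sketches that order, with up[id] standing in for fr_connection_get() succeeding on node id. It reduces the random value with %, because masking with & slave_num does not give a uniform starting point for most slave counts. For example, with two slaves it always starts at the first one.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper mirroring the read_only selection order.
 * Returns the chosen node id, or -1 if neither a slave nor the
 * master is available.
 */
int pick_read_node(uint8_t const *slave, uint8_t slave_num, uint8_t master,
		   bool const *up, uint32_t rand_value)
{
	uint8_t first, i;

	if (slave_num > 0) {				/* avoid % 0 when there are no slaves */
		first = (uint8_t)(rand_value % slave_num);	/* uniform starting slave */
		for (i = 0; i < slave_num; i++) {
			uint8_t id = slave[(first + i) % slave_num];

			if (up[id]) return id;		/* first slave with a free connection */
		}
	}

	return up[master] ? master : -1;		/* fall back to the master */
}
```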
1678 
1679 /** Get the next connection to attempt a command against
1680  *
1681  * Will process reconnect and redirect states performing the actions necessary.
1682  *
1683  * - May trigger a cluster remap on receiving a #REDIS_RCODE_MOVE status.
1684  * - May perform a temporary redirect on receiving a #REDIS_RCODE_ASK status.
1685  * - May reserve a new connection on receiving a #REDIS_RCODE_RECONNECT status.
1686  *
1687  * If a remap is in progress, has occurred within the last second, has recently failed,
1688  * or fails, the '-MOVED' will be treated as a temporary redirect (-ASK).
1689  *
1690  * This allows the server to be more responsive during remaps, as unless the worker has been
1691  * redirected to a node we don't currently have a pool for, it can grab a connection for the
1692  * node it was redirected to, and continue.
1693  *
1694  * @note Irrespective of return code, the connection passed via conn will be released.
1695  * A new connection to attempt the command on will be provided via conn.
1696  *
1697  * @note reply will be automatically freed and set to NULL if a new connection is provided.
1698  * In all other cases, the caller is responsible for freeing the reply.
1699  *
1700  * @param[in,out] state containing the current pool, and various counters which control
1701  * retries, and limit redirects.
1702  * @param[in,out] conn the last command was issued on, e.g. the one we received an '-ASK' or
1703  * '-MOVED' redirect on. Will be replaced with a connection in the pool to retry on.
1704  * @param[in] cluster of pools.
1705  * @param[in] request The current request.
1706  * @param[in] status of the last command, as returned by #fr_redis_command_status.
1707  * @param[in] reply from last command. Freed if REDIS_RCODE_TRY_AGAIN is returned, else caller must free.
1708  * @return
1709  * - REDIS_RCODE_SUCCESS - on success.
1710  * - REDIS_RCODE_TRY_AGAIN - try new connection (provided via conn). Will free reply.
1711  * - REDIS_RCODE_ERROR - on failure or command error.
1712  * - REDIS_RCODE_RECONNECT - when no additional connections available.
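1713  */
1714 fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn,
1715  fr_redis_cluster_t *cluster, REQUEST *request,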
1716  fr_redis_rcode_t status, redisReply **reply)
1717 {
1718  rad_assert(state && state->node && state->node->pool);
1719  rad_assert(conn && *conn);
1720 
1721  if (*reply) fr_redis_reply_print(L_DBG_LVL_3, *reply, request, 0);
1722 
1723  RDEBUG2("[%i] <<< Returned: %s", state->node->id, fr_int2str(redis_rcodes, status, "<UNKNOWN>"));
1724 
1725  /*
1726  * Caller indicated we should close the connection
1727  */
1728  if (state->close_conn) {
1729  RDEBUG2("[%i] Connection no longer viable, closing it", state->node->id);
1730  fr_connection_close(state->node->pool, *conn);
1731  *conn = NULL;
1732  state->close_conn = false;
1733  }
1734 
1735  /*
1736  * If we have a proven live connection, and something
1737  * has set the remap_needed flag, do that now before
1738  * releasing the connection.
1739  */
1740  if (cluster->remap_needed && *conn) switch(status) {
1741  case REDIS_RCODE_MOVE: /* We're going to remap anyway */
1742  case REDIS_RCODE_RECONNECT: /* The connection's dead */
1743  break;
1744 
1745  default:
1746  /*
1747  * Remap the cluster. On success, will clear the
1748  * remap_needed flag.
1749  */
1750  if (cluster_remap(request, cluster, *conn) != CLUSTER_OP_SUCCESS) RDEBUG2("%s", fr_strerror());
1751  }
1752 
1753  /*
1754  * Check the result of the last redis command, and do
1755  * something appropriate.
1756  */
1757  switch (status) {
1758  case REDIS_RCODE_SUCCESS:
1759  fr_connection_release(state->node->pool, *conn);
1760  *conn = NULL;
1761  return REDIS_RCODE_SUCCESS;
1762 
1763  /*
1764  * Command error, not fixable.
1765  */
1766  case REDIS_RCODE_NO_SCRIPT:
1767  case REDIS_RCODE_ERROR:
1768  REDEBUG("[%i] Command failed: %s", state->node->id, fr_strerror());
1769  fr_connection_release(state->node->pool, *conn);
1770  *conn = NULL;
1771  return REDIS_RCODE_ERROR;
1772 
1773  /*
1774  * Cluster's unstable, try again.
1775  */
1776  case REDIS_RCODE_TRY_AGAIN:
1777  if (state->retries++ >= cluster->conf->max_retries) {
1778  REDEBUG("[%i] Hit maximum retry attempts", state->node->id);
1779  fr_connection_release(state->node->pool, *conn);
1780  *conn = NULL;
1781  return REDIS_RCODE_ERROR;
1782  }
1783 
1784  if (!*conn && !(*conn = fr_connection_get(state->node->pool))) return REDIS_RCODE_RECONNECT; /* Can't retry without a connection */
1785 
1786  if (FR_TIMEVAL_TO_MS(&cluster->conf->retry_delay)) {
1787  struct timespec ts;
1788 
1789  ts.tv_sec = cluster->conf->retry_delay.tv_sec;
1790  ts.tv_nsec = cluster->conf->retry_delay.tv_usec * 1000;
1791  nanosleep(&ts, NULL);
1792  }
1793  goto try_again;
1794 
1795  /*
1796  * Connection's dead, check to see if we can switch nodes,
1797  * or, failing that, reconnect the connection.
1798  */
1799  case REDIS_RCODE_RECONNECT:
1800  {
1801  cluster_key_slot_t *key_slot;
1802 
1803  RERROR("[%i] Failed communicating with %s:%i: %s", state->node->id, state->node->name,
1804  state->node->addr.port, fr_strerror());
1805 
1806  fr_connection_close(state->node->pool, *conn); /* He's dead jim */
1807 
1808  if (state->reconnects++ > state->in_pool) {
1809  REDEBUG("[%i] Hit maximum reconnect attempts", state->node->id);
1810  cluster->remap_needed = true;
1811  return REDIS_RCODE_RECONNECT;
1812  }
1813 
1814  /*
1815  * Refresh the key slot
1816  */
1817  key_slot = cluster_slot_by_key(cluster, request, state->key, state->key_len);
1818  state->node = &cluster->node[key_slot->master];
1819 
1820  *conn = fr_connection_get(state->node->pool);
1821  if (!*conn) {
1822  REDEBUG("[%i] No connections available for %s:%i", state->node->id, state->node->name,
1823  state->node->addr.port);
1824  cluster->remap_needed = true;
1825  return REDIS_RCODE_RECONNECT;
1826  }
1827 
1828  state->retries = 0;
1829  }
1830  goto try_again;
1831 
1832  /*
1833  * -MOVED is treated identically to -ASK, except it may
1834  * trigger a cluster remap.
1835  */
1836  case REDIS_RCODE_MOVE:
1837  rad_assert(*reply);
1838 
1839  if (*conn && (cluster_remap(request, cluster, *conn) != CLUSTER_OP_SUCCESS)) {
1840  RDEBUG2("%s", fr_strerror());
1841  }
1842  /* FALL-THROUGH */
1843 
1844  /*
1845  * -ASK: process a redirect.
1846  */
1847  case REDIS_RCODE_ASK:
1848  {
1849  cluster_node_t *new;
1850 
1851  rad_assert(*reply);
1852 
1853  fr_connection_release(state->node->pool, *conn); /* Always release the old connection */
1854 
1855  RDEBUG("[%i] Processing redirect \"%s\"", state->node->id, (*reply)->str);
1856  if (state->redirects++ >= cluster->conf->max_redirects) {
1857  REDEBUG("[%i] Reached max_redirects (%i)", state->node->id, state->redirects);
1858  return REDIS_RCODE_ERROR;
1859  }
1860 
1861  switch (cluster_redirect(&new, cluster, *reply)) {
1862  case CLUSTER_OP_SUCCESS:
1863  if (new == state->node) {
1864  REDEBUG("[%i] %s:%i issued redirect to itself", state->node->id,
1865  state->node->name, state->node->addr.port);
1866  return REDIS_RCODE_ERROR;
1867  }
1868 
1869  RDEBUG("[%i] Redirected from %s:%i to [%i] %s:%i", state->node->id, state->node->name,
1870  state->node->addr.port, new->id, new->name, new->addr.port);
1871  state->node = new;
1872 
1873  *conn = fr_connection_get(state->node->pool);
1874  if (!*conn) return REDIS_RCODE_RECONNECT;
1875 
1876  /*
1877  * Reset these counters, their scope is
1878  * a single node in the cluster.
1879  */
1880  state->reconnects = 0;
1881  state->retries = 0;
1882  state->in_pool = fr_connection_pool_state(state->node->pool)->num;
1883  goto try_again;
1884 
1886  cluster->remap_needed = true;
1887  return REDIS_RCODE_RECONNECT;
1888 
1889  default:
1890  return REDIS_RCODE_ERROR;
1891  }
1892  }
1893  }
1894 
1895 try_again:
1896  RDEBUG2("[%i] >>> Sending command(s) to %s:%i", state->node->id, state->node->name, state->node->addr.port);
1897 
1898  fr_redis_reply_free(*reply);
1899  *reply = NULL;
1900 
1901  return REDIS_RCODE_TRY_AGAIN;
1902 }
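Both redirect branches above hand the raw error reply to cluster_redirect(), which has to extract a key slot and a node address from it. The Redis cluster specification defines the two redirect errors as `-MOVED <slot> <ip>:<port>` and `-ASK <slot> <ip>:<port>`. The following is a minimal standalone sketch of that parsing step, not this file's own parser: `redirect_t` and `redirect_parse()` are hypothetical names, and the input is assumed to be the error text with the leading `-` already removed (which is how hiredis exposes error text in `reply->str`).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parsed form of a redirect; not a FreeRADIUS type */
typedef struct {
	bool		ask;		/* true for ASK, false for MOVED */
	uint16_t	slot;		/* key slot, 0..16383 */
	char		host[64];	/* node address as text, without the port */
	uint16_t	port;		/* node port */
} redirect_t;

/*
 * Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 * Returns 0 on success, -1 on malformed input.
 */
static int redirect_parse(redirect_t *out, char const *str)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	num;
	size_t		host_len;

	assert(out && str);

	if (strncmp(str, "MOVED ", 6) == 0) {
		out->ask = false;
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		out->ask = true;
		p = str + 4;
	} else {
		return -1;			/* not a redirect */
	}

	/* Key slot: must be followed by a single space and fit in 16384 slots */
	num = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (num >= 16384)) return -1;
	out->slot = (uint16_t) num;
	p = end + 1;

	/* Split on the last ':' so IPv6 literals keep their own colons */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;
	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';

	/* Port: digits only, through to the end of the string */
	num = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (num == 0) || (num > 65535)) return -1;
	out->port = (uint16_t) num;

	return 0;
}
```

Splitting on the last `:` keeps addresses such as `::1:6379` intact, and the slot bound matches the cluster's 16384 key slots.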

/** Get the pool associated with a node in the cluster
 *
 * @note This is used for testing only. It's not ifdef'd out because
 *	tests need to run against production builds too.
 *
 * @param[out] pool associated with the node.
 * @param[in] cluster to search for node in.
 * @param[in] ipaddr of node.
 * @param[in] port of node.
 * @param[in] create Establish a connection to the specified node if it
 *	was previously unknown to the cluster client.
 * @return
 *	- 0 on success.
 *	- -1 if no such node exists (and create is false), no free node
 *	  slots remain, or connecting to the new node failed.
 */
int fr_redis_cluster_pool_by_node_addr(fr_connection_pool_t **pool, fr_redis_cluster_t *cluster,
				       fr_ipaddr_t *ipaddr, uint16_t port, bool create)
{
	cluster_node_t find, *found;

	find.addr.ipaddr = *ipaddr;
	find.addr.port = port;

	pthread_mutex_lock(&cluster->mutex);
	found = rbtree_finddata(cluster->used_nodes, &find);
	if (!found) {
		cluster_node_t	*spare;
		char		buffer[INET6_ADDRSTRLEN];
		char const	*hostname;

		if (!create) {
			pthread_mutex_unlock(&cluster->mutex);

			hostname = inet_ntop(ipaddr->af, &ipaddr->ipaddr, buffer, sizeof(buffer));
			rad_assert(hostname);	/* addr.ipaddr is probably corrupt */
			fr_strerror_printf("No existing node found with address %s, port %i", hostname, port);
			return -1;
		}

		spare = fr_fifo_peek(cluster->free_nodes);
		if (!spare) {
			fr_strerror_printf("Reached maximum connected nodes");
			pthread_mutex_unlock(&cluster->mutex);
			return -1;
		}
		spare->pending_addr = find.addr;	/* Set the config to be applied */
		if (cluster_node_connect(cluster, spare) < 0) {
			pthread_mutex_unlock(&cluster->mutex);
			return -1;
		}
		rbtree_insert(cluster->used_nodes, spare);
		fr_fifo_pop(cluster->free_nodes);
		found = spare;
	}
	/*
	 *	Sanity checks
	 */
	rad_assert(((talloc_array_length(cluster->node) - 1) - rbtree_num_elements(cluster->used_nodes)) ==
		   fr_fifo_num_elements(cluster->free_nodes));
	pthread_mutex_unlock(&cluster->mutex);

	*pool = found->pool;

	return 0;
}
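fr_redis_cluster_pool_by_node_addr() follows a lookup-or-claim pattern: search the used-node tree, otherwise peek at the head of the free-node queue, configure that node, and pop it only once it is usable. Afterwards the node array (whose entry 0 is reserved) must still satisfy `(length - 1) - used == free`. The sketch below reproduces only that bookkeeping, under simplifying assumptions: a linear scan replaces the rbtree, a ring buffer replaces the FIFO, there is no locking, and the "connect" step just records the address. All type and function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_NODES 4	/* hypothetical stand-in for conf->max_nodes */

typedef struct {
	uint8_t		id;	/* index into the node array; 0 is reserved */
	uint32_t	addr;	/* simplified IPv4 address */
	uint16_t	port;
	bool		used;
} node_t;

typedef struct {
	node_t		node[MAX_NODES + 1];	/* entry 0 is never handed out */
	node_t		*free_q[MAX_NODES];	/* ring buffer acting as the free FIFO */
	size_t		free_head;
	size_t		free_num;
	size_t		used_num;
} cluster_t;

static void cluster_init(cluster_t *c)
{
	size_t i;

	memset(c, 0, sizeof(*c));
	for (i = 1; i <= MAX_NODES; i++) {
		c->node[i].id = (uint8_t) i;
		c->free_q[c->free_num++] = &c->node[i];	/* push every usable node */
	}
}

/* Return the node for addr:port, claiming a free one if create is set; NULL if absent or full */
static node_t *node_by_addr(cluster_t *c, uint32_t addr, uint16_t port, bool create)
{
	node_t	*spare;
	size_t	i;

	for (i = 1; i <= MAX_NODES; i++) {
		if (c->node[i].used && (c->node[i].addr == addr) && (c->node[i].port == port)) return &c->node[i];
	}
	if (!create || (c->free_num == 0)) return NULL;

	spare = c->free_q[c->free_head];	/* peek at the head of the FIFO */
	spare->addr = addr;			/* "connect": here we only record the address */
	spare->port = port;
	spare->used = true;

	c->free_head = (c->free_head + 1) % MAX_NODES;	/* pop only after setup succeeded */
	c->free_num--;
	c->used_num++;

	/* Same invariant as the sanity check: (array length - 1) - used == free */
	assert((MAX_NODES + 1 - 1) - c->used_num == c->free_num);
	return spare;
}
```

Peeking before popping means a failed setup leaves the node at the head of the queue for the next caller, which is the same reason the original only calls fr_fifo_pop() after cluster_node_connect() succeeds.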

#ifdef HAVE_PTHREAD_H
/** Destroy mutex associated with cluster slots structure
 *
 * @param cluster being freed.
 * @return 0
 */
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
{
	pthread_mutex_destroy(&cluster->mutex);

	return 0;
}
#endif

/** Walk all used pools checking their versions
 *
 * @param context The minimum version string (min_version) to check against.
 * @param data node to check.
 * @return
 *	- 0 continue walking (node is new enough, or its version couldn't be determined).
 *	- -1 node is older than min_version; stop walking (details via #fr_strerror).
 */
static int _cluster_version_walk(void *context, void *data)
{
	char const	*min_version = context;
	cluster_node_t	*node = data;
	fr_redis_conn_t	*conn;
	int		ret;
	char		buffer[40];

	conn = fr_connection_get(node->pool);
	if (!conn) return 0;

	/*
	 *	We don't care if we can't get the version
	 *	as we don't want to prevent the server from
	 *	starting if start == 0.
	 */
	ret = fr_redis_get_version(buffer, sizeof(buffer), conn);
	fr_connection_release(node->pool, conn);
	if (ret < 0) return 0;

	if (fr_redis_version_num(buffer) < fr_redis_version_num(min_version)) {
		fr_strerror_printf("Redis node %s:%i (currently v%s) needs update to >= v%s",
				   node->name, node->addr.port, buffer, min_version);
		return -1;
	}

	return 0;
}
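The walker compares versions through fr_redis_version_num() rather than with strcmp(), because string order would put 3.10.0 before 3.9.9. How fr_redis_version_num() packs the three components is not shown in this section, so the sketch below assumes one plausible layout (major in the top 8 bits, minor in the next 8, release in the low 16); `version_num()` and `version_ok()` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical encoding: major in bits 24-31, minor in bits 16-23,
 * release in bits 0-15. Returns 0 if the string is malformed.
 */
static uint32_t version_num(char const *version)
{
	unsigned long	part[3];
	char const	*p = version;
	char		*end;
	int		i;

	assert(version != NULL);

	for (i = 0; i < 3; i++) {
		part[i] = strtoul(p, &end, 10);
		if (end == p) return 0;			/* no digits */
		if (i < 2) {
			if (*end != '.') return 0;	/* expect <major>.<minor>.<release> */
			p = end + 1;
		} else if (*end != '\0') {
			return 0;			/* trailing garbage */
		}
	}
	if ((part[0] > UINT8_MAX) || (part[1] > UINT8_MAX) || (part[2] > UINT16_MAX)) return 0;

	return ((uint32_t) part[0] << 24) | ((uint32_t) part[1] << 16) | (uint32_t) part[2];
}

/* Mirrors the walker's test: a node passes when it is not older than the minimum */
static int version_ok(char const *node_version, char const *min_version)
{
	return version_num(node_version) >= version_num(min_version);
}
```

Because each component occupies its own bit field, a single unsigned comparison orders versions numerically component by component.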

/** Check if members of the cluster are at or above a certain version
 *
 * @param cluster to perform check on.
 * @param min_version that must be found on each node for the check to succeed.
 *	Must be in the format @verbatim <major>.<minor>.<release> @endverbatim.
 * @return
 *	- true if all contactable members are running min_version or later.
 *	- false if at least one member is below min_version
 *	  (use #fr_strerror to retrieve node information).
 */
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
{
	int	ret;
	char	*p;

	memcpy(&p, &min_version, sizeof(p));	/* Cast away const to pass as walker context */

	pthread_mutex_lock(&cluster->mutex);
	ret = rbtree_walk(cluster->used_nodes, RBTREE_IN_ORDER, _cluster_version_walk, p);
	pthread_mutex_unlock(&cluster->mutex);

	return ret < 0 ? false : true;
}

/** Allocate and initialise a new cluster structure
 *
 * This holds all the data necessary to manage a pool of pools for a specific redis cluster.
 *
 * @note Will not error out unless cs.pool.start > 0. This is consistent with other pool based
 *	modules/code.
 *
 * @param ctx to link the lifetime of the cluster structure to.
 * @param module Configuration section to search for 'server' conf pairs in.
 * @param conf Base redis server configuration. Cluster nodes share database number and password.
 * @return
 *	- New #fr_redis_cluster_t on success.
 *	- NULL on error.
 */
fr_redis_cluster_t *fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf)
{
	uint8_t			i;
	uint16_t		s;

	CONF_SECTION		*mycs;
	char const		*cs_name1, *cs_name2;

	CONF_PAIR		*cp;
	int			af = AF_UNSPEC;		/* AF of first server */

	int			num_nodes;
	fr_redis_cluster_t	*cluster;

	cluster = talloc_zero(NULL, fr_redis_cluster_t);
	if (!cluster) {
		ERROR("%s: Out of memory", conf->prefix);
		return NULL;
	}

	if (!conf->prefix) {
		cs_name1 = cf_section_name1(module);
		cs_name2 = cf_section_name2(module);
		if (!cs_name2) cs_name2 = cs_name1;
		conf->prefix = talloc_asprintf(conf, "rlm_%s (%s)", cs_name1, cs_name2);
	}

	/*
	 *	Ensure we always have a pool section (even if it's empty)
	 */
	mycs = cf_section_sub_find(module, "pool");
	if (!mycs) {
		mycs = cf_section_alloc(module, "pool", NULL);
		cf_section_add(module, mycs);
	}

	if (conf->max_nodes == UINT8_MAX) {
		ERROR("%s: Maximum number of connected nodes allowed is %i", conf->prefix, UINT8_MAX - 1);
		talloc_free(cluster);
		return NULL;
	}

	if (conf->max_nodes == 0) {
		ERROR("%s: Minimum number of nodes allowed is 1", conf->prefix);
		talloc_free(cluster);
		return NULL;
	}

	cp = cf_pair_find(module, "server");
	if (!cp) {
		ERROR("%s: No servers configured", conf->prefix);
		talloc_free(cluster);
		return NULL;
	}

	cluster->module = module;

	/*
	 *	Ensure the pool is freed at the same time as its
	 *	parent.
	 *
	 *	We need to break the link between the cluster and
	 *	its parent context, as the two contexts may be
	 *	modified by multiple threads.
	 */
	if (fr_talloc_link_ctx(ctx, cluster) < 0) {
	oom:
		ERROR("%s: Out of memory", conf->prefix);

	error:
		talloc_free(cluster);
		return NULL;
	}

	cluster->node = talloc_zero_array(cluster, cluster_node_t, conf->max_nodes + 1);
	if (!cluster->node) goto oom;

	cluster->used_nodes = rbtree_create(cluster, _cluster_node_cmp, NULL, 0);
	cluster->free_nodes = fr_fifo_create(cluster, conf->max_nodes, NULL);
	cluster->conf = conf;

#ifdef HAVE_PTHREAD_H
	pthread_mutex_init(&cluster->mutex, NULL);
	talloc_set_destructor(cluster, _fr_redis_cluster_free);
#endif

	/*
	 *	Node id 0 is reserved, so we can detect misconfigured
	 *	clusters.
	 */
	for (i = 1; i < (cluster->conf->max_nodes + 1); i++) {
		cluster->node[i].id = i;
		cluster->node[i].conf = conf;

		/* Push them all into the queue */
		fr_fifo_push(cluster->free_nodes, &cluster->node[i]);
	}

	/*
	 *	Populate the cluster with the bootstrap servers.
	 *
	 *	If we fail to get a key_slot map here, then the
	 *	bootstrap servers are distributed evenly through
	 *	the key slots.
	 *
	 *	This allows the server to start, and potentially,
	 *	a valid map to be applied, once the server starts
	 *	processing requests.
	 */
	do {
		char const		*server;
		cluster_node_t		*node;
		fr_redis_conn_t		*conn;
		redisReply		*map;
		size_t			j, k;

		node = fr_fifo_peek(cluster->free_nodes);
		if (!node) {
			ERROR("%s: Number of bootstrap servers exceeds 'max_nodes'", conf->prefix);
			goto error;
		}

		server = cf_pair_value(cp);
		if (fr_inet_pton_port(&node->pending_addr.ipaddr, &node->pending_addr.port, server,
				      talloc_array_length(server) - 1, af, true, true) < 0) {
			ERROR("%s: Failed parsing server \"%s\": %s", conf->prefix, server, fr_strerror());
			goto error;
		}
		if (!node->pending_addr.port) node->pending_addr.port = conf->port;

		if (cluster_node_connect(cluster, node) < 0) {
			WARN("%s: Connecting to %s:%i failed", conf->prefix, node->name, node->pending_addr.port);
			continue;
		}

		if (!rbtree_insert(cluster->used_nodes, node)) {
			WARN("%s: Skipping duplicate bootstrap server \"%s\"", conf->prefix, server);
			continue;
		}
		node->active = true;
		fr_fifo_pop(cluster->free_nodes);

		/*
		 *	Prefer the same IPaddr family as the first node
		 */
		if (af == AF_UNSPEC) af = node->addr.ipaddr.af;

		/*
		 *	Fine to leave this node configured; if we do find
		 *	a live node and it's not in the map, it'll be cleared out.
		 */
		conn = fr_connection_get(node->pool);
		if (!conn) {
			WARN("%s: Can't contact bootstrap server \"%s\"", conf->prefix, server);
			continue;
		}

		switch (cluster_map_get(&map, conn)) {
		/*
		 *	We got a valid map! See if we can apply it...
		 */
		case CLUSTER_OP_SUCCESS:
			fr_connection_release(node->pool, conn);

			INFO("%s: Cluster map consists of %zu key ranges", conf->prefix, map->elements);
			for (j = 0; j < map->elements; j++) {
				redisReply *map_node = map->element[j];

				INFO("%s: %zu - keys %lli-%lli", conf->prefix, j,
				     map_node->element[0]->integer,
				     map_node->element[1]->integer);
				INFO("%s: master: %s:%lli", conf->prefix,
				     map_node->element[2]->element[0]->str,
				     map_node->element[2]->element[1]->integer);
				for (k = 3; k < map_node->elements; k++) {
					INFO("%s: slave%zu: %s:%lli", conf->prefix, k - 3,
					     map_node->element[k]->element[0]->str,
					     map_node->element[k]->element[1]->integer);
				}
			}

			if (cluster_map_apply(cluster, map) < 0) {
				WARN("%s: Applying cluster map failed: %s", conf->prefix, fr_strerror());
				fr_redis_reply_free(map);
				continue;
			}
			fr_redis_reply_free(map);

			return cluster;

		/*
		 *	Unusable bootstrap node
		 */
		case CLUSTER_OP_BAD_INPUT:
			WARN("%s: Bootstrap server \"%s\" returned invalid data: %s",
			     conf->prefix, server, fr_strerror());
			fr_connection_release(node->pool, conn);
			continue;

		case CLUSTER_OP_NO_CONNECTION:
			WARN("%s: Can't contact bootstrap server \"%s\": %s",
			     conf->prefix, server, fr_strerror());
			fr_connection_close(node->pool, conn);
			continue;

		/*
		 *	Clustering not enabled, or not supported,
		 *	by this node; skip it and check the others.
		 */
		case CLUSTER_OP_FAILED:
		case CLUSTER_OP_IGNORED:
			DEBUG2("%s: Bootstrap server \"%s\" returned: %s",
			       conf->prefix, server, fr_strerror());
			fr_connection_release(node->pool, conn);
			break;
		}
	} while ((cp = cf_pair_find_next(module, cp, "server")));

	/*
	 *	Catch pool.start != 0
	 */
	num_nodes = rbtree_num_elements(cluster->used_nodes);
	if (!num_nodes) {
		ERROR("%s: Can't contact any bootstrap servers", conf->prefix);
		goto error;
	}

	/*
	 *	We've failed to apply a valid cluster map.
	 *	Distribute the node(s) throughout the key_slots;
	 *	hopefully we'll get a map when we start processing
	 *	requests.
	 */
	for (s = 0; s < KEY_SLOTS; s++) cluster->key_slot[s].master = (s % (uint16_t) num_nodes) + 1;

	return cluster;
}
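When no bootstrap server returns a usable map, fr_redis_cluster_alloc() falls back to spreading the connected node IDs round-robin across all KEY_SLOTS entries, as `(s % num_nodes) + 1`. A key reaches that table through its key slot, which the Redis cluster specification defines as the CRC16 (XMODEM variant) of the key modulo 16384, hashing only the contents of the first non-empty `{...}` hash tag when one is present. The sketch below implements the specification's rule next to the fallback loop; `crc16_xmodem()`, `key_slot()` and `fallback_distribute()` are hypothetical stand-ins, and the file's own hashing routine (reached via cluster_slot_by_key()) is not part of this section, so it may differ in detail.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KEY_SLOTS 16384	/* fixed by the Redis cluster specification */

/* CRC16-XMODEM: polynomial 0x1021, initial value 0, no reflection, bitwise */
static uint16_t crc16_xmodem(uint8_t const *in, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(in[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

/* Hash only the text between the first '{' and the next '}', if that text is non-empty */
static uint16_t key_slot(uint8_t const *key, size_t key_len)
{
	uint8_t const	*open, *close;

	open = memchr(key, '{', key_len);
	if (open) {
		size_t rest = key_len - (size_t)(open - key) - 1;

		close = memchr(open + 1, '}', rest);
		if (close && (close > open + 1)) {
			return crc16_xmodem(open + 1, (size_t)(close - open - 1)) % KEY_SLOTS;
		}
	}
	return crc16_xmodem(key, key_len) % KEY_SLOTS;
}

/* Fallback table used when no cluster map could be applied: node ids 1..num_nodes */
static void fallback_distribute(uint8_t master[KEY_SLOTS], uint16_t num_nodes)
{
	uint16_t s;

	assert(num_nodes > 0);
	for (s = 0; s < KEY_SLOTS; s++) master[s] = (uint8_t)((s % num_nodes) + 1);
}
```

With three bootstrap nodes the fallback gives node 1 5462 slots and nodes 2 and 3 5461 each, since 16384 = 3 * 5461 + 1; the result is only a placeholder until a real map is applied.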