The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
cluster.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: b9168ca817664a51f6a310fa5655eee0fc006d05 $
 * @file cluster.c
 * @brief Functions for interacting with a Redis cluster via Hiredis.
 *
 * @author Arran Cudbard-Bell
 *
 * @copyright 2015 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
 * @copyright 2015 Network RADIUS (legal@networkradius.com)
 * @copyright 2015 The FreeRADIUS server project
 *
 * Overview
 * ========
 *
 * Read and understand the Redis cluster specification (http://redis.io/topics/cluster-spec)
 * first; otherwise the text below will not be useful.
 *
 * Using the cluster's public API
 * ------------------------------
 *
 * The two functions used at runtime to issue commands are #fr_redis_cluster_state_init
 * and #fr_redis_cluster_state_next.
 *
 * #fr_redis_cluster_state_init initialises the structure we use to track which node
 * we're communicating with, and uses a key (string) to determine which node we should
 * try to contact first.
 *
 * #fr_redis_cluster_state_next examines the result of the last command, and either
 * gets a new connection, or errors out.
 *
 * In both cases the connection should not be released by the caller, only by
 * #fr_redis_cluster_state_next.
 *
 * Below is the sequence of calls required to use the cluster:
 *
 * 1. During initialization allocate a cluster configuration structure with
 * #fr_redis_cluster_alloc. This holds the cluster configuration and state.
 * 2. Declare a #fr_redis_cluster_state_t in the function that needs to issue
 * commands against the cluster.
 * 3. Call #fr_redis_cluster_state_init with relevant arguments including a pointer
 * to the #fr_redis_cluster_state_t struct, and a key/key_len used for initial
 * lookup. For most commands the key is the value of the second argument.
 * #fr_redis_cluster_state_init will then reserve and pass back a connection from
 * the pool of the node responsible for the key.
 * 4. Use the connection that was passed back to issue a Redis command.
 * 5. Use #fr_redis_command_status to translate the result of the command into
 * a #fr_redis_rcode_t value.
 * 6. Call #fr_redis_cluster_state_next with relevant arguments including a pointer
 * to the #fr_redis_cluster_state_t struct and the #fr_redis_rcode_t value for the
 * last command.
 * 7. If #fr_redis_cluster_state_next returns 0, repeat steps 4-7. Otherwise
 * stop and analyse the return value.
 *
 * See #fr_redis_cluster_state_init for example code.
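 *
 * The control flow of steps 3-7 can be simulated in isolation. This is a minimal sketch
 * under stated assumptions: every name in it (mock_state_t, mock_state_init, mock_state_next,
 * mock_issue, run_command) is hypothetical and does not match the real API, connection
 * reservation/release is not modelled, and the retry cap is an assumption of the sketch.
 * It only shows that a return value of 0 from the "next" step means "issue the command again".
 *
```c
#include <assert.h>

// Hypothetical stand-ins; these do NOT match the real FreeRADIUS signatures.
typedef enum { RC_SUCCESS, RC_MOVED, RC_ASK, RC_TRY_AGAIN, RC_ERROR } mock_rcode_t;

typedef struct {
	int	node;		// Node the next command will be sent to.
	int	retries;	// Redirects/retries followed so far.
	int	retries_max;	// Give up once this is exceeded.
} mock_state_t;

// Step 3: choose the first node (the real code derives it from the key).
static void mock_state_init(mock_state_t *state, int first_node, int retries_max)
{
	state->node = first_node;
	state->retries = 0;
	state->retries_max = retries_max;
}

// Step 6: examine the last result.
// Returns 0 to repeat steps 4-7, 1 on success, -1 on a hard error.
static int mock_state_next(mock_state_t *state, mock_rcode_t rcode, int redirect_node)
{
	switch (rcode) {
	case RC_SUCCESS:
		return 1;

	case RC_MOVED:
	case RC_ASK:
		state->node = redirect_node;	// Follow the redirect.
		break;

	case RC_TRY_AGAIN:
		break;				// Same node, try again.

	default:
		return -1;
	}

	if (++state->retries > state->retries_max) return -1;
	return 0;
}

// Fake cluster: only node 2 owns the key, every other node redirects to node 2.
static mock_rcode_t mock_issue(int node, int *redirect_node)
{
	*redirect_node = 2;
	return (node == 2) ? RC_SUCCESS : RC_MOVED;
}

// Steps 3-7 as a loop. Returns the node that served the command, or -1.
static int run_command(int first_node)
{
	mock_state_t	state;
	int		ret, redirect_node;

	mock_state_init(&state, first_node, 3);
	do {
		mock_rcode_t rcode = mock_issue(state.node, &redirect_node);	// Steps 4-5.

		ret = mock_state_next(&state, rcode, redirect_node);		// Step 6.
	} while (ret == 0);							// Step 7.

	return (ret == 1) ? state.node : -1;
}
```
 *
 * Starting at the wrong node, run_command(0) follows one redirect and returns 2.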
 *
 * Structures
 * ----------
 *
 * This code maintains a series of structures for efficient lookup and lockless operations.
 *
 * The important ones are:
 * - An array of #fr_redis_cluster_node_t. These are pre-allocated on startup and are
 * never added to, or removed from.
 * - An #fr_fifo_t. This contains the queue of nodes that may be re-used.
 * - An #fr_rb_tree_t. This contains a tree of nodes which are active. The tree is built on IP
 * address and port.
 *
 * Each #fr_redis_cluster_key_slot_t contains a master ID, and an array of slave IDs. The IDs are
 * array indexes in the fr_redis_cluster_t.node array. We use 8-bit unsigned integers instead of
 * pointers to save space. Using pointers, the key_slot[] array would need 784K; using IDs
 * it uses 112K. Still not light on memory, but a bit more acceptable.
 * Currently the key_slot array is shadowed by key_slot_pending, used to stage new key_slot
 * mappings. This doubles the memory used. We may want to consider allocating key_slot_pending
 * only during remappings and freeing it after.
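 *
 * The 784K and 112K figures can be reproduced by counting bytes per key slot entry.
 * A sketch, assuming 8-byte pointers and ignoring compiler padding for the pointer layout:
 * key_slot_ids_t mirrors the fields of #fr_redis_cluster_key_slot_t, while the pointer
 * layout and both helper functions are hypothetical.
 *
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define KEY_SLOTS	16384
#define MAX_SLAVES	5

// Same fields as fr_redis_cluster_key_slot_t: 5 + 1 + 1 = 7 bytes.
typedef struct {
	uint8_t slave[MAX_SLAVES];
	uint8_t slave_num;
	uint8_t master;
} key_slot_ids_t;

// Bytes for one key slot table using 8-bit node IDs.
static size_t table_bytes_ids(void)
{
	return KEY_SLOTS * sizeof(key_slot_ids_t);
}

// Bytes for one table if master/slaves were pointers (hypothetical layout):
// (MAX_SLAVES + 1) pointers plus a 1-byte slave count, with no padding.
static size_t table_bytes_ptrs(size_t ptr_size)
{
	return KEY_SLOTS * ((MAX_SLAVES + 1) * ptr_size + 1);
}
```
 *
 * That is 7 bytes versus 49 bytes per slot. With key_slot_pending shadowing key_slot, the
 * ID-based tables total 224K. On a typical 64-bit ABI a real pointer-based struct would be
 * padded from 49 to 56 bytes, so 784K is a lower bound.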
 *
 * Mapping/Remapping the cluster
 * -----------------------------
 *
 * On startup, and during cluster operation, a remap may be performed. A remap involves
 * the following steps:
 *
 * 1. Executing the Redis 'cluster slots' command.
 * 2. Validating the result of this command. We need to do extensive validation to
 * avoid SEGV on invalid data, due to the way libhiredis presents the result.
 * 3. Determining the intersection between nodes described in the result, and those already
 * in our #fr_rb_tree_t.
 * 4. Connecting to nodes that were in the result, but not in the tree.
 * Note: If we can't connect to any one of the masters, we count the map as invalid, roll
 * back any newly connected nodes, and error out. Slave failure is OK.
 * 5. Mapping keyslot ranges to nodes in the key_slot_pending array.
 * 6. Verifying there are no holes in the ranges (if there are, we roll back and error out).
 * 7. Applying the new keyslot ranges.
 * 8. Removing nodes no longer used by the key slots, and adding them back to the free
 * nodes queue.
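 *
 * Steps 5 to 7 can be sketched with plain arrays: stage every range into a pending table,
 * reject the map if any slot is still unassigned, and only then copy it over the live table.
 * slot_range_t, sketch_map_apply and the two tables are hypothetical simplifications
 * (master IDs only, no slaves); as in this file, ID 0 means "unassigned".
 *
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KEY_SLOTS 16384

// Inclusive [start, end] key slot range served by one master ID.
typedef struct {
	uint16_t	start;
	uint16_t	end;
	uint8_t		master;
} slot_range_t;

static uint8_t key_slot_live[KEY_SLOTS];	// Table used for lookups.
static uint8_t key_slot_pending[KEY_SLOTS];	// Staging table.

// Returns 0 if the map was applied, -1 if it left a hole (live table untouched).
static int sketch_map_apply(slot_range_t const *ranges, size_t num)
{
	size_t i;

	memset(key_slot_pending, 0, sizeof(key_slot_pending));

	// Step 5: map each range onto the pending table.
	for (i = 0; i < num; i++) {
		uint32_t k;	// Wider than uint16_t so the loop always terminates.

		assert((ranges[i].start <= ranges[i].end) && (ranges[i].end < KEY_SLOTS));
		for (k = ranges[i].start; k <= ranges[i].end; k++) key_slot_pending[k] = ranges[i].master;
	}

	// Step 6: any slot still 0 is a hole, so reject the whole map.
	for (i = 0; i < KEY_SLOTS; i++) {
		if (key_slot_pending[i] == 0) return -1;
	}

	// Step 7: publish the new map in one copy.
	memcpy(key_slot_live, key_slot_pending, sizeof(key_slot_live));
	return 0;
}
```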
 *
 * #cluster_map_get and #cluster_map_apply perform the operations described
 * above. The get function issues the 'cluster slots' command and performs validation; the
 * apply function processes and applies the map.
 *
 * Failing to apply a map is not a fatal error at runtime, and is only fatal on startup if
 * pool.start > 0.
 *
 * The cluster client can continue to operate, albeit inefficiently, with a stale cluster map
 * by following '-ASK' and '-MOVED' redirects.
 *
 * Remaps are limited to one per second. If any operation sets the remap_needed flag, or
 * attempts a remap directly, the remap may be skipped if one occurred recently.
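 *
 * A one-remap-per-second limit needs only the time of the last remap. This sketch uses plain
 * millisecond integers in place of fr_time_t; remap_state_t, REMAP_INTERVAL_MS and
 * remap_allowed are hypothetical names, and a refused request is simply left for the caller
 * to retry later.
 *
```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define REMAP_INTERVAL_MS 1000	// One remap per second.

typedef struct {
	int64_t last_updated_ms;	// When the last remap happened.
} remap_state_t;

// Returns true, and records now_ms, if a remap may run now.
// Returns false if the previous remap was less than a second ago.
static bool remap_allowed(remap_state_t *state, int64_t now_ms)
{
	if ((now_ms - state->last_updated_ms) < REMAP_INTERVAL_MS) return false;
	state->last_updated_ms = now_ms;
	return true;
}
```
 *
 * Note that #cluster_map_apply sets last_updated on its error path as well as on success,
 * so a failed remap also counts toward the limit.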
 *
 *
 * Processing '-ASK' and '-MOVED' redirects
 * ----------------------------------------
 *
 * The code treats '-ASK' (temporary redirect) and '-MOVED' (permanent redirect) responses
 * similarly. If the node is known, then a connection is reserved from its pool; if the node
 * is not known, a new pool is established, and a connection reserved.
 *
 * The difference between '-ASK' and '-MOVED' is that '-MOVED' attempts a cluster remap before
 * following the redirect.
 *
 * The data from '-MOVED' responses is not used to alter the cluster map. That is only done
 * on successful remap.
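 *
 * The policy above can be sketched as follows: both redirect types reuse a known node or
 * claim a free one, and only MOVED triggers a remap attempt; the redirect itself never edits
 * the slot map. The node registry, the remap counter and all function names are hypothetical,
 * and the error strings are assumed to arrive without the leading '-'.
 *
```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define MAX_NODES 4

typedef enum { REDIRECT_NONE, REDIRECT_ASK, REDIRECT_MOVED } redirect_t;

// Hypothetical registry of known nodes, keyed on "host:port" text.
typedef struct {
	char	addr[64];
	bool	in_use;
} known_node_t;

static known_node_t	nodes[MAX_NODES];
static int		remaps;		// Remap attempts, to show only MOVED triggers one.

static redirect_t redirect_type(char const *err)
{
	if (strncmp(err, "MOVED ", 6) == 0) return REDIRECT_MOVED;
	if (strncmp(err, "ASK ", 4) == 0) return REDIRECT_ASK;
	return REDIRECT_NONE;
}

// Reuse a known node, or claim a free entry (standing in for "establish a new pool").
static int node_find_or_add(char const *addr)
{
	int i, free_idx = -1;

	for (i = 0; i < MAX_NODES; i++) {
		if (nodes[i].in_use && (strcmp(nodes[i].addr, addr) == 0)) return i;
		if (!nodes[i].in_use && (free_idx < 0)) free_idx = i;
	}
	if (free_idx < 0) return -1;

	snprintf(nodes[free_idx].addr, sizeof(nodes[free_idx].addr), "%s", addr);
	nodes[free_idx].in_use = true;
	return free_idx;
}

// Returns the index of the node to retry against, or -1.
static int follow_redirect(char const *err)
{
	redirect_t	type = redirect_type(err);
	char const	*addr = strrchr(err, ' ');	// "<host>:<port>" is the last field.

	if ((type == REDIRECT_NONE) || !addr) return -1;
	if (type == REDIRECT_MOVED) remaps++;		// Remap first; the slot map is not edited here.

	return node_find_or_add(addr + 1);
}
```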
 *
 *
 * Processing '-TRYAGAIN'
 * ----------------------
 *
 * If the cluster is in a state of flux, a node may return '-TRYAGAIN' to indicate that we
 * should attempt the operation again. The cluster spec says we should attempt the operation
 * after some time. This time is configurable.
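 *
 * A retry loop for '-TRYAGAIN' might look like the sketch below. retry_on_tryagain,
 * op_rcode_t and busy_then_ok are hypothetical; the delay and attempt limit stand in for
 * whatever this module actually reads from its configuration, and nanosleep() assumes a
 * POSIX system.
 *
```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <time.h>

typedef enum { OP_OK, OP_TRYAGAIN, OP_FAIL } op_rcode_t;
typedef op_rcode_t (*op_fn_t)(void *ctx);

// Run op until it stops answering OP_TRYAGAIN, sleeping delay_ms
// between attempts and making at most max_tries attempts in total.
static op_rcode_t retry_on_tryagain(op_fn_t op, void *ctx, unsigned int max_tries, long delay_ms)
{
	op_rcode_t	rcode = OP_FAIL;
	unsigned int	i;

	for (i = 0; i < max_tries; i++) {
		rcode = op(ctx);
		if (rcode != OP_TRYAGAIN) return rcode;

		if ((i + 1) < max_tries) {	// No point sleeping after the last attempt.
			struct timespec ts = { .tv_sec = delay_ms / 1000, .tv_nsec = (delay_ms % 1000) * 1000000L };

			nanosleep(&ts, NULL);
		}
	}
	return rcode;	// Still OP_TRYAGAIN (or OP_FAIL if max_tries was 0).
}

// Example operation: answers OP_TRYAGAIN while the counter is above zero, counting down.
static op_rcode_t busy_then_ok(void *ctx)
{
	int *left = ctx;

	if (*left > 0) {
		(*left)--;
		return OP_TRYAGAIN;
	}
	return OP_OK;
}
```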
 *
 */

#include <freeradius-devel/util/debug.h>
#include <freeradius-devel/server/cf_parse.h>

#include <freeradius-devel/util/fifo.h>
#include <freeradius-devel/util/misc.h>
#include <freeradius-devel/util/rand.h>
#include <freeradius-devel/util/time.h>

#include "config.h"
#include "base.h"
#include "cluster.h"
#include "crc16.h"

#ifndef WITH_TLS
#  undef HAVE_REDIS_SSL
#endif

#ifdef HAVE_REDIS_SSL
#include <freeradius-devel/tls/strerror.h>
#include <hiredis/hiredis_ssl.h>
#endif

#define KEY_SLOTS		16384		//!< Maximum number of keyslots (should not change).

#define MAX_SLAVES		5		//!< Maximum number of slaves associated
						//!< with a keyslot.

/*
 *	Periods and weights for live node selection
 */
#define CLOSED_PERIOD		10000		//!< How recently must the connection close have
						//!< occurred for us to care.

#define CLOSED_WEIGHT		1		//!< What weight to give to nodes that
						//!< had a connection closed recently.

#define FAILED_PERIOD		10000		//!< How recently must the spawn failure
						//!< have occurred for us to care.

#define FAILED_WEIGHT		1		//!< What weight to give to nodes that
						//!< had a spawn failure recently.

#define RELEASED_PERIOD		10000		//!< Period after which we don't care
						//!< about when the last connection was
						//!< released.

#define RELEASED_MIN_WEIGHT	1000		//!< Minimum weight to assign to node.

/** Live nodes data, used to perform weighted random selection of alternative nodes
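 *
 * Weighted random selection over cumulative weights can be done with a binary search:
 * entry i is chosen when the draw falls in [cumulative[i - 1], cumulative[i]). This is a
 * generic sketch, not the selection code from this file (which is outside this excerpt);
 * weighted_pick is a hypothetical name, and how each node's weight is derived from the
 * *_PERIOD and *_WEIGHT constants above is not shown.
 *
```c
#include <assert.h>
#include <stddef.h>

// cumulative[i] is the sum of the weights of entries 0..i, so it never decreases.
// draw must be in [0, cumulative[num - 1]).
// Returns the index of the first entry whose cumulative weight exceeds draw.
static size_t weighted_pick(unsigned int const *cumulative, size_t num, unsigned int draw)
{
	size_t lo = 0, hi;

	assert((num > 0) && (draw < cumulative[num - 1]));
	hi = num - 1;

	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;

		if (draw < cumulative[mid]) {
			hi = mid;
		} else {
			lo = mid + 1;
		}
	}
	return lo;
}
```
 *
 * The draw would come from a uniform random number reduced to the total weight (for example
 * fr_rand() % total, which carries a small modulo bias). An entry with zero weight has the
 * same cumulative value as its predecessor, so it can never be picked.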
 */
typedef struct {
	struct {
		uint8_t			id;		//!< Node ID.
		fr_pool_state_t const	*pool_state;	//!< Connection pool stats.
		unsigned int		cumulative;	//!< Cumulative weight.
	} node[UINT8_MAX - 1];				//!< Array of live node IDs (and weights).
	uint8_t			next;			//!< Next index in live.

/** A Redis cluster node
 *
 * Passed as opaque data to pools which open connections to nodes.
 */
	fr_rb_node_t		rbnode;			//!< Entry into the tree of redis nodes.

	char			name[INET6_ADDRSTRLEN];	//!< Buffer to hold IP string
							//!< text for debug messages.
	uint8_t			id;			//!< Node ID (index in node array).

	fr_socket_t		addr;			//!< Current node address.
	fr_socket_t		pending_addr;		//!< New node address to be applied when the pool
							//!< is reconnected.

	fr_redis_cluster_t	*cluster;		//!< Common configuration (database number,
							//!< password, etc.).
	fr_pool_t		*pool;			//!< Pool associated with this node.
	CONF_SECTION		*pool_cs;		//!< Pool configuration section associated with node.

	bool			is_active;		//!< Whether this node is in the active node set.
	bool			is_master;		//!< Whether this node is a master.
							//!< This is needed for commands like 'KEYS', which
							//!< we need to issue to every master in the cluster.
};

/** Indexes in the fr_redis_cluster_node_t array for a single key slot
 *
 * When dealing with 16K entries, space is a concern. It's significantly
 * more memory efficient to use 8-bit indexes than 64-bit pointers for each
 * of the key slot to node mappings.
 */
	uint8_t			slave[MAX_SLAVES];	//!< R/O nodes (slaves) for this key slot.
	uint8_t			slave_num;		//!< Number of slaves associated with this key slot.
	uint8_t			master;			//!< R/W node (master) for this key slot.
};

/** A redis cluster
 *
 * Holds all the structures and collections of nodes, to represent a Redis cluster.
 */
	char const		*log_prefix;		//!< What to prepend to log messages.
	char const		*trigger_prefix;	//!< Trigger path.
	fr_pair_list_t		trigger_args;		//!< Arguments to pass to triggers.
	bool			triggers_enabled;	//!< Whether triggers are enabled.

	bool			remapping;		//!< True when cluster is being remapped.
	bool			remap_needed;		//!< Set true if at least one cluster node is definitely
							//!< unreachable. Set false on successful remap.
	fr_time_t		last_updated;		//!< Last time the cluster mappings were updated.
	CONF_SECTION		*module;		//!< Module configuration.

	fr_redis_conf_t		*conf;			//!< Base configuration data such as the database number
							//!< and passwords.
#ifdef HAVE_REDIS_SSL
	SSL_CTX			*ssl_ctx;		//!< SSL context.
#endif

	fr_redis_cluster_node_t	*node;			//!< Structure containing a node id, its address and
							//!< a pool of its connections.

	fr_fifo_t		*free_nodes;		//!< Queue of free nodes (or nodes waiting to be reused).
	fr_rb_tree_t		*used_nodes;		//!< Tree of used nodes.

	fr_redis_cluster_key_slot_t	key_slot[KEY_SLOTS];		//!< Lookup table of slots to pools.
	fr_redis_cluster_key_slot_t	key_slot_pending[KEY_SLOTS];	//!< Staging copy of key_slot, built during a remap.

	pthread_mutex_t		mutex;			//!< Mutex to synchronise cluster operations.
};

	{ L("bad-input"),	FR_REDIS_CLUSTER_RCODE_BAD_INPUT },
	{ L("failed"),		FR_REDIS_CLUSTER_RCODE_FAILED },
	{ L("ignored"),		FR_REDIS_CLUSTER_RCODE_IGNORED },
	{ L("no-connection"),	FR_REDIS_CLUSTER_RCODE_NO_CONNECTION },
	{ L("success"),		FR_REDIS_CLUSTER_RCODE_SUCCESS }
};

/** Resolve key to key slot
 *
 * Identical to the example implementation in the Redis cluster spec, except it uses
 * memchr, which will be faster, and isn't so needlessly complex.
 *
 * @param[in] key to resolve.
 * @param[in] key_len length of key.
 * @return key slot index for the key.
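 *
 * The hash tag rule and the CRC can be checked in isolation with this self-contained sketch.
 * crc16_xmodem and key_slot are hypothetical stand-ins for fr_crc16_xmodem and this function;
 * the bitwise CRC is plain CRC-16/XMODEM (polynomial 0x1021, initial value 0), and 0x31C3 for
 * "123456789" is the check value quoted in the Redis cluster spec.
 *
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define KEY_SLOTS 16384

// Bitwise CRC-16/XMODEM: poly 0x1021, init 0, no reflection, no final XOR.
static uint16_t crc16_xmodem(uint8_t const *buf, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(buf[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

// Hash only the text between the first '{' and the next '}' when that text is
// non-empty; otherwise hash the whole key.
static uint16_t key_slot(char const *key, size_t key_len)
{
	char const *p, *q;

	assert(key != NULL);

	p = memchr(key, '{', key_len);
	if (p) {
		q = memchr(p + 1, '}', key_len - (size_t)(p + 1 - key));
		if (q && (q > p + 1)) return crc16_xmodem((uint8_t const *)(p + 1), (size_t)(q - p - 1)) & (KEY_SLOTS - 1);
	}
	return crc16_xmodem((uint8_t const *)key, key_len) & (KEY_SLOTS - 1);
}
```
 *
 * Keys sharing a hash tag, such as "{user1000}.following" and "user1000", land in the
 * same slot, which is what allows multi-key commands against them.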
 */
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
{
	uint8_t const *p, *q;

	p = memchr(key, '{', key_len);
	if (!p) {
	all:
		return fr_crc16_xmodem(key, key_len) & (KEY_SLOTS - 1);
	}

	q = memchr(p, '}', key_len - (p - key));	/* look for } after { */
	if (!q || (q == p + 1)) goto all;		/* no } or {}, hash everything */

	p++;						/* skip '{' */

	return fr_crc16_xmodem(p, q - p) & (KEY_SLOTS - 1);	/* hash stuff between { and } */
}

/** Compare two redis nodes, ordering them by IP address, then by port
 *
 * @param[in] one first node.
 * @param[in] two second node.
 * @return CMP(one, two)
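 *
 * The same two-level ordering (address first, then port) as a self-contained sketch.
 * node_key_t is hypothetical and keeps an IPv4 address as a host-order integer instead of
 * an fr_ipaddr_t; CMP3 is the common (a > b) - (a < b) idiom, and it is an assumption that
 * it behaves like the CMP macro used here.
 *
```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

// Simplified node key: IPv4 address as a host-order integer, plus port.
typedef struct {
	uint32_t	ip;
	uint16_t	port;
} node_key_t;

// Three-way compare yielding -1, 0 or 1, without the overflow risk of a - b.
#define CMP3(_a, _b) (((_a) > (_b)) - ((_a) < (_b)))

// Order by address first, then by port (usable with qsort()).
static int node_key_cmp(void const *one, void const *two)
{
	node_key_t const *a = one;
	node_key_t const *b = two;
	int ret;

	ret = CMP3(a->ip, b->ip);
	if (ret != 0) return ret;

	return CMP3(a->port, b->port);
}
```
 *
 * Because the result is a total ordering rather than a bare equality test, the same
 * function can key a red-black tree or sort an array.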
 */
static int8_t _cluster_node_cmp(void const *one, void const *two)
{
	fr_redis_cluster_node_t const *a = one;
	fr_redis_cluster_node_t const *b = two;
	int ret;

	ret = fr_ipaddr_cmp(&a->addr.inet.dst_ipaddr, &b->addr.inet.dst_ipaddr);
	if (ret != 0) return ret;

	return CMP(a->addr.inet.dst_port, b->addr.inet.dst_port);
}

/** Reconnect callback to apply new pool config
 *
 * @param[in] pool to apply new configuration to.
 * @param[in] opaque data passed to the connection pool.
 */
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
{
	fr_pair_list_t		args;
	fr_redis_cluster_node_t	*node = opaque;

	node->addr = node->pending_addr;

	if (node->cluster->triggers_enabled) {
		trigger_args_afrom_server(pool, &args, node->name, node->addr.inet.dst_port);
		if (fr_pair_list_empty(&args)) return;

		if (!fr_pair_list_empty(&node->cluster->trigger_args)) MEM(fr_pair_list_copy(node->cluster, &args,
											&node->cluster->trigger_args) >= 0);

	}
}

/** Establish a connection to a cluster node
 *
 * @note Must be called with the cluster mutex locked.
 * @note Configuration to use for the connection must be set in node->pending_addr, not node->cluster->conf.
 *
 * @param[in] cluster to search in.
 * @param[in] node config.
 * @return
 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 * - FR_REDIS_CLUSTER_RCODE_FAILED if the operation failed.
 */
{
	char const *p;

	fr_assert(node->pending_addr.inet.dst_ipaddr.af);

	/*
	 *	Write out the IP address and Port in string form
	 */
	p = inet_ntop(node->pending_addr.inet.dst_ipaddr.af, &node->pending_addr.inet.dst_ipaddr.addr,
		      node->name, sizeof(node->name));

	/*
	 *	Node has never been used before, needs a pool allocated for it.
	 */
	if (!node->pool) {
		char		buffer[256];
		fr_pair_list_t	args;
		CONF_SECTION	*pool;

		snprintf(buffer, sizeof(buffer), "%s [%i]", cluster->log_prefix, node->id);

		pool = cf_section_find(cluster->module, "pool", NULL);
		/*
		 *	Dup so we can re-parse, and have unique CONF_DATA
		 */
		node->pool_cs = cf_section_dup(cluster, NULL, pool, "pool", NULL, true);
		node->addr = node->pending_addr;
		node->pool = fr_pool_init(cluster, node->pool_cs, node,
		if (!node->pool) {
		error:
			TALLOC_FREE(node->pool_cs);
			TALLOC_FREE(node->pool);
			return FR_REDIS_CLUSTER_RCODE_FAILED;
		}

		if (trigger_enabled() && cluster->triggers_enabled) {
			trigger_args_afrom_server(node->pool, &args, node->name, node->addr.inet.dst_port);
			if (fr_pair_list_empty(&args)) goto error;

			if (!fr_pair_list_empty(&cluster->trigger_args)) MEM(fr_pair_list_copy(cluster, &args, &cluster->trigger_args) >= 0);

		}

		if (fr_pool_start(node->pool) < 0) goto error;

	}

	/*
	 *	Apply the new config to the possibly live pool
	 */
	if (fr_pool_reconnect(node->pool, NULL) < 0) goto error;

	return FR_REDIS_CLUSTER_RCODE_SUCCESS;
}

/** Parse a -MOVED or -ASK redirect
 *
 * Converts the body of the -MOVED or -ASK error into an IPv4/6 address and port.
 *
 * @param[out] key_slot value extracted from redirect string (may be NULL).
 * @param[out] node_addr Redis node ipaddr and port extracted from redirect string (may be NULL).
 * @param[in] redirect to process.
 * @return
 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the server returned an invalid redirect.
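 *
 * A standalone parser for the same "<slot> <host>:<port>" body, useful for seeing the checks
 * in isolation. redirect_parse is hypothetical: it copies the host out as text rather than
 * converting it with fr_inet_pton_port(), and it assumes the leading '-' has already been
 * stripped from the error string. "MOVED 3999 127.0.0.1:6381" is the example redirect
 * given in the Redis cluster spec.
 *
```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define KEY_SLOTS 16384

// Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
// Returns 0 on success, -1 on malformed input.
static int redirect_parse(char const *str, unsigned int *slot,
			  char *host, size_t host_len, unsigned int *port)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	val;
	size_t		n;

	if (strncmp(str, "MOVED ", 6) == 0) {
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		p = str + 4;
	} else {
		return -1;
	}

	// Key slot: valid slots are 0 .. KEY_SLOTS - 1.
	val = strtoul(p, &end, 10);
	if ((end == p) || (val >= KEY_SLOTS) || (*end != ' ')) return -1;
	*slot = (unsigned int)val;
	p = end + 1;

	// Split on the last ':' so IPv6 literals keep their own colons.
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	n = (size_t)(colon - p);
	if (n >= host_len) return -1;
	memcpy(host, p, n);
	host[n] = '\0';

	val = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (val > 65535)) return -1;
	*port = (unsigned int)val;

	return 0;
}
```
 *
 * Note the slot bound: 16384 itself is rejected, because key slots run from 0 to
 * KEY_SLOTS - 1.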
 */
							  redisReply *redirect)
{
	char		*p, *q;
	unsigned long	key;
	uint16_t	port;
	fr_ipaddr_t	ipaddr;

	if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	p = redirect->str;
	if (strncmp(REDIS_ERROR_MOVED_STR, redirect->str, sizeof(REDIS_ERROR_MOVED_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_MOVED_STR);	/* not a typo, skip space too */
	} else if (strncmp(REDIS_ERROR_ASK_STR, redirect->str, sizeof(REDIS_ERROR_ASK_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_ASK_STR);	/* not a typo, skip space too */
	} else {
		fr_strerror_const("No '-MOVED' or '-ASK' prefix");
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}
	if ((size_t)(q - p) >= (size_t)redirect->len) {
		fr_strerror_const("Truncated");
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}
	p = q;
	key = strtoul(p, &q, 10);
	if (key >= KEY_SLOTS) {
		fr_strerror_printf("Key %lu outside of redis slot range", key);
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}
	p = q;

	if (*p != ' ') {
		fr_strerror_const("Missing key/host separator");
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}
	p++;			/* Skip the ' ' */

	if (fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC, true, true) < 0) {
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}
	fr_assert(ipaddr.af);

	if (key_slot) *key_slot = key;
	if (node_addr) {
		node_addr->inet.dst_ipaddr = ipaddr;
		node_addr->inet.dst_port = port;
	}

	return FR_REDIS_CLUSTER_RCODE_SUCCESS;
}

/** Apply a cluster map received from a cluster node
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note Must be called with the cluster mutex held.
 *
 * Key slot range structure
 @verbatim
   [0] -> key slot range 0
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
       [0] -> master 0 ip (string)
       [1] -> master 0 port (number)
     [3..n] -> slave_node(s)
   [1] -> key slot range 1
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
       [0] -> master 1 ip (string)
       [1] -> master 1 port (number)
     [3..n] -> slave_node(s)
   [n] -> key slot range n
     [0] -> key_slot_start
     [1] -> key_slot_end
     [2] -> master_node
       [0] -> master n ip (string)
       [1] -> master n port (number)
     [3..n] -> slave_node(s)
 @endverbatim
 *
 * @param[in,out] cluster to apply map to.
 * @param[in] reply from #cluster_map_get.
 * @return
 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 * - FR_REDIS_CLUSTER_RCODE_FAILED on failure.
 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the map didn't provide nodes for all keyslots.
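 *
 * The rollback bookkeeping (take nodes from the free queue, remember each one, and return
 * them all if the map cannot be completed) can be sketched with a small ring buffer of node
 * IDs. id_fifo_t, fifo_push, fifo_pop and take_nodes are hypothetical; this file uses
 * #fr_fifo_t with the SET_ACTIVE/SET_INACTIVE macros, and it peeks before connecting and pops
 * only on success, which this sketch collapses into a single pop.
 *
```c
#include <assert.h>
#include <stdint.h>

#define MAX_NODES 8

// Ring buffer of free node IDs.
typedef struct {
	uint8_t		id[MAX_NODES];
	unsigned int	head;
	unsigned int	num;
} id_fifo_t;

static void fifo_push(id_fifo_t *f, uint8_t id)
{
	assert(f->num < MAX_NODES);
	f->id[(f->head + f->num) % MAX_NODES] = id;
	f->num++;
}

// Returns the oldest free ID, or -1 if the queue is empty.
static int fifo_pop(id_fifo_t *f)
{
	uint8_t id;

	if (f->num == 0) return -1;
	id = f->id[f->head];
	f->head = (f->head + 1) % MAX_NODES;
	f->num--;
	return id;
}

// Take `want` IDs. On success copy them to taken and return 0; if the
// queue runs dry, push every ID taken so far back and return -1.
static int take_nodes(id_fifo_t *free_nodes, unsigned int want, uint8_t *taken)
{
	uint8_t		rollback[MAX_NODES];
	unsigned int	r = 0, i;

	while (r < want) {
		int id = fifo_pop(free_nodes);

		if (id < 0) {
			for (i = 0; i < r; i++) fifo_push(free_nodes, rollback[i]);
			return -1;
		}
		rollback[r++] = (uint8_t)id;
	}

	for (i = 0; i < r; i++) taken[i] = rollback[i];
	return 0;
}
```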
 */
{
	size_t		i;
	uint8_t		r = 0;

	uint8_t		rollback[UINT8_MAX];	// Set of nodes to re-add to the queue on failure.
	bool		active[UINT8_MAX];	// Set of nodes active in the new cluster map.
	bool		master[UINT8_MAX];	// Master nodes.
#ifndef NDEBUG
#  define SET_ADDR(_addr, _map) \
do { \
	int _ret; \
	_ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	fr_assert(_ret == 0);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#else
#  define SET_ADDR(_addr, _map) \
do { \
	fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#endif

#define SET_INACTIVE(_node) \
do { \
	(_node)->is_active = false; \
	(_node)->is_master = false; \
	fr_rb_delete(cluster->used_nodes, _node); \
	fr_fifo_push(cluster->free_nodes, _node); \
} while (0)

#define SET_ACTIVE(_node) \
do { \
	(_node)->is_active = true; \
	fr_rb_insert(cluster->used_nodes, _node); \
	fr_fifo_pop(cluster->free_nodes); \
	active[(_node)->id] = true; \
	rollback[r++] = (_node)->id; \
} while (0)

	fr_assert(reply->type == REDIS_REPLY_ARRAY);

	memset(&rollback, 0, sizeof(rollback));
	memset(active, 0, sizeof(active));
	memset(master, 0, sizeof(master));

	cluster->remapping = true;

	/*
	 *	Must be cleared with the mutex held
	 */
	memset(&cluster->key_slot_pending, 0, sizeof(cluster->key_slot_pending));

	/*
	 *	Insert new nodes and mark up the keyslot indexes
	 *	in our temporary key_slot_pending array.
	 *
	 *	A map consists of an array with the following indexes:
	 *	  [0]    -> key_slot_start
	 *	  [1]    -> key_slot_end
	 *	  [2]    -> master_node
	 *	  [3..n] -> slave_node(s)
	 */
	for (i = 0; i < reply->elements; i++) {
		size_t				j;
		long long int			k;
		int				slaves = 0;
		fr_redis_cluster_node_t		*found, *spare;
		fr_redis_cluster_node_t		find = {};	/* Initialise unused fields to stop Coverity complaints */
		fr_redis_cluster_key_slot_t	tmpl_slot;
		redisReply			*map = reply->element[i];

		memset(&tmpl_slot, 0, sizeof(tmpl_slot));

		SET_ADDR(find.addr, map->element[2]);
		found = fr_rb_find(cluster->used_nodes, &find);
		if (found) {
			active[found->id] = true;
			goto reuse_master_node;
		}

		/*
		 *	Process the master
		 *
		 *	A master node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		spare = fr_fifo_peek(cluster->free_nodes);
		if (!spare) {
		out_of_nodes:
			fr_strerror_const("Reached maximum connected nodes");
		error:
			cluster->remapping = false;
			cluster->last_updated = fr_time();
			/* Re-insert new nodes back into the free_nodes queue */
			for (i = 0; i < r; i++) SET_INACTIVE(&cluster->node[rollback[i]]);
			return rcode;
		}

		spare->pending_addr = find.addr;
		rcode = cluster_node_connect(cluster, spare);
		if (rcode < 0) goto error;

		/*
		 *	Mark the node we just connected as active: insert
		 *	it into the tree of used nodes, and record it in
		 *	the rollback set so it's released again on failure.
		 */
		SET_ACTIVE(spare);
		found = spare;

	reuse_master_node:
		tmpl_slot.master = found->id;
		master[found->id] = true;	/* Mark this node as a master */

		/*
		 *	Process the slaves
		 *
		 *	A slave node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		for (j = 3; (j < map->elements); j++) {
			SET_ADDR(find.addr, map->element[j]);
			found = fr_rb_find(cluster->used_nodes, &find);
			if (found) {
				active[found->id] = true;
				goto next;
			}

			spare = fr_fifo_peek(cluster->free_nodes);
			if (!spare) goto out_of_nodes;

			spare->pending_addr = find.addr;
			if (cluster_node_connect(cluster, spare) < 0) continue;	/* Slave failure is non-fatal */

			SET_ACTIVE(spare);
			found = spare;

		next:
			tmpl_slot.slave[slaves++] = found->id;

			/* Hit the maximum number of slaves we allow */
			if (slaves >= MAX_SLAVES) break;
		}
		tmpl_slot.slave_num = slaves;

		/*
		 *	Copy our tmpl key slot to each of the key slots
		 *	specified by the range for this map.
		 */
		for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
			memcpy(&cluster->key_slot_pending[k], &tmpl_slot,
			       sizeof(*(cluster->key_slot_pending)));
		}
	}

	/*
	 *	Check for holes in the key_slot_pending array
	 *
	 *	The cluster specification says that upon
	 *	detecting a 'NULL' key_slot we should
	 *	check again to see if the cluster error has
	 *	been resolved, but seeing as we're in the
	 *	middle of updating the cluster from very
	 *	recent output of 'cluster slots' it's best to
	 *	error out.
	 */
	for (i = 0; i < KEY_SLOTS; i++) {
		if (cluster->key_slot_pending[i].master == 0) {
			fr_strerror_printf("Cluster is misconfigured, no node assigned for key slot %zu", i);
			rcode = FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
			goto error;
		}
	}

	/*
	 *	We have connections/pools for all the nodes in
	 *	the new map, apply it to the live cluster.
	 *
	 *	Other workers may be using the key slot table,
	 *	but that's ok. Nodes and pools are never freed,
	 *	so the worst that will happen is they'll hit
	 *	the wrong node for the key, and get redirected.
	 */
	memcpy(&cluster->key_slot, &cluster->key_slot_pending, sizeof(cluster->key_slot));

	/*
	 *	Anything not in the active set of nodes gets
	 *	added back into the queue, to be re-used.
	 *
	 *	We start at 1, as node 0 is reserved.
	 */
	for (i = 1; i < cluster->conf->max_nodes; i++) {
#ifndef NDEBUG
		fr_redis_cluster_node_t *found;

		if (cluster->node[i].is_active) {
			/* Sanity check for duplicates that are active */
			found = fr_rb_find(cluster->used_nodes, &cluster->node[i]);
			fr_assert(found);
			fr_assert(found->is_active);
			fr_assert(found->id == i);
		}
#endif

		if (!active[i] && cluster->node[i].is_active) {
			SET_INACTIVE(&cluster->node[i]);	/* Sets is_master = false */

		/*
		 *	Only change the masters once we've successfully
		 *	remapped the cluster.
		 */
		} else if (master[i]) {
			cluster->node[i].is_master = true;
		}
	}

	cluster->remapping = false;
	cluster->last_updated = fr_time();

	/*
	 *	Sanity checks
	 */
	fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
		  fr_fifo_num_elements(cluster->free_nodes));

	return FR_REDIS_CLUSTER_RCODE_SUCCESS;
}

/** Validate a cluster map node entry
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note In a separate function, as it's called for both master and slave nodes.
 *
 * @param[in] node we're validating.
 * @param[in] map_idx we're processing.
 * @param[in] node_idx we're processing.
 * @return
 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
 */
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
{
	fr_ipaddr_t ipaddr;

	if (node->type != REDIS_REPLY_ARRAY) {
		fr_strerror_printf("Cluster map %i node %i is wrong type, expected array got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->type, "<UNKNOWN>"));
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	/*
	 *	As per the redis docs: https://redis.io/commands/cluster-slots
	 *
	 *	Newer versions of Redis Cluster will output, for each Redis instance,
	 *	not just the IP and port, but also the node ID as third element of the
	 *	array. In future versions there could be more elements describing the
	 *	node better. In general a client implementation should just rely on
	 *	the fact that certain parameters are at fixed positions as specified,
	 *	but more parameters may follow and should be ignored.
	 *	Similarly a client library should try if possible to cope with the fact
	 *	that older versions may just have the IP and port parameter.
	 */
	if (node->elements < 2) {
		fr_strerror_printf("Cluster map %i node %i has incorrect number of elements, expected at least "
				   "2 got %zu", map_idx, node_idx, node->elements);
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	if (node->element[0]->type != REDIS_REPLY_STRING) {
		fr_strerror_printf("Cluster map %i node %i ip address is wrong type, expected string got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[0]->type, "<UNKNOWN>"));
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	if (fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC, true, true) < 0) {
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	if (node->element[1]->type != REDIS_REPLY_INTEGER) {
		fr_strerror_printf("Cluster map %i node %i port is wrong type, expected integer got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[1]->type, "<UNKNOWN>"));
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	if (node->element[1]->integer < 0) {
		fr_strerror_printf("Cluster map %i node %i port is too low, expected >= 0 got %lli",
				   map_idx, node_idx, node->element[1]->integer);
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	if (node->element[1]->integer > UINT16_MAX) {
		fr_strerror_printf("Cluster map %i node %i port is too high, expected <= " STRINGIFY(UINT16_MAX)" "
				   "got %lli", map_idx, node_idx, node->element[1]->integer);
		return FR_REDIS_CLUSTER_RCODE_BAD_INPUT;
	}

	return FR_REDIS_CLUSTER_RCODE_SUCCESS;
}

848 /** Learn a new cluster layout by querying the node that issued the -MOVE
849  *
850  * Also validates the response from the Redis cluster, so we can be sure that
851  * it's well formed, before doing more expensive operations.
852  *
853  * @note Errors may be retrieved with fr_strerror().
854  *
855  * @param[out] out Where to write cluster map.
856  * @param[in] conn to use for learning the new cluster map.
857  * @return
858  * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
859  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
860  * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the command resulted in an error.
861  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
862  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
863  */
865 {
866  redisReply *reply;
867  size_t i = 0;
868 
869  *out = NULL;
870 
871  reply = redisCommand(conn->handle, "cluster slots");
872  switch (fr_redis_command_status(conn, reply)) {
874  fr_redis_reply_free(&reply);
875  fr_strerror_const("No connections available");
877 
878  case REDIS_RCODE_ERROR:
879  default:
880  if (reply && reply->type == REDIS_REPLY_ERROR) {
881  fr_strerror_printf("%.*s", (int)reply->len, reply->str);
882  fr_redis_reply_free(&reply);
884  }
885  fr_strerror_const("Unknown client error");
887 
888  case REDIS_RCODE_SUCCESS:
889  break;
890  }
891 
892  if (reply->type != REDIS_REPLY_ARRAY) {
893  fr_strerror_printf("Bad response to \"cluster slots\" command, expected array got %s",
894  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
896  }
897 
898  /*
899  * Clustering configured but no slots set
900  */
901  if (reply->elements == 0) {
902  fr_strerror_printf("Empty response to \"cluster slots\" command (zero length array)");
904  }
905 
906  /*
907  * Validate the complete map set before returning.
908  */
909  for (i = 0; i < reply->elements; i++) {
910  size_t j;
911  redisReply *map;
912 
913  map = reply->element[i];
914  if (map->type != REDIS_REPLY_ARRAY) {
915  fr_strerror_printf("Cluster map %zu is wrong type, expected array got %s",
916  i, fr_table_str_by_value(redis_reply_types, map->type, "<UNKNOWN>"));
917  error:
918  fr_redis_reply_free(&reply);
920  }
921 
922  if (map->elements < 3) {
923  fr_strerror_printf("Cluster map %zu has too few elements, expected at least 3, got %zu",
924  i, map->elements);
925  goto error;
926  }
927 
928  /*
929  * Key slot start
930  */
931  if (map->element[0]->type != REDIS_REPLY_INTEGER) {
932  fr_strerror_printf("Cluster map %zu key slot start is wrong type, expected integer got %s",
933  i, fr_table_str_by_value(redis_reply_types, map->element[0]->type, "<UNKNOWN>"));
934  goto error;
935  }
936 
937  if (map->element[0]->integer < 0) {
938  fr_strerror_printf("Cluster map %zu key slot start is too low, expected >= 0 got %lli",
939  i, map->element[0]->integer);
940  goto error;
941  }
942 
943  if (map->element[0]->integer >= KEY_SLOTS) {
944  fr_strerror_printf("Cluster map %zu key slot start is too high, expected < "
945  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[0]->integer);
946  goto error;
947  }
948 
949  /*
950  * Key slot end
951  */
952  if (map->element[1]->type != REDIS_REPLY_INTEGER) {
953  fr_strerror_printf("Cluster map %zu key slot end is wrong type, expected integer got %s",
954  i, fr_table_str_by_value(redis_reply_types, map->element[1]->type, "<UNKNOWN>"));
955  goto error;
956  }
957 
958  if (map->element[1]->integer < 0) {
959  fr_strerror_printf("Cluster map %zu key slot end is too low, expected >= 0 got %lli",
960  i, map->element[1]->integer);
961  goto error;
962  }
963 
964  if (map->element[1]->integer >= KEY_SLOTS) {
965  fr_strerror_printf("Cluster map %zu key slot end is too high, expected < "
966  STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[1]->integer);
967  goto error;
968  }
969 
970  if (map->element[1]->integer < map->element[0]->integer) {
971  fr_strerror_printf("Cluster map %zu key slot start/end out of order. "
972  "Start was %lli, end was %lli", i, map->element[0]->integer,
973  map->element[1]->integer);
974  goto error;
975  }
976 
977  /*
978  * Master node
979  */
980  if (cluster_map_node_validate(map->element[2], i, 0) < 0) goto error;
981 
982  /*
983  * Slave nodes
984  */
985  for (j = 3; j < map->elements; j++) {
986  if (cluster_map_node_validate(map->element[j], i, j - 2) < 0) goto error;
987  }
988  }
989  *out = reply;
990 
992 }
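The validation loop above rejects any CLUSTER SLOTS entry that has fewer than three elements, whose start or end slot is not an integer inside the slot space, or whose start is greater than its end. The sketch below restates those rules as a pure function over a hypothetical, already-decoded `slot_range_t`. The real code inspects `redisReply` elements directly. The sketch assumes the Redis Cluster slot space of 16384 slots numbered 0 to 16383, so the upper bound is exclusive.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Redis Cluster has 16384 hash slots, numbered 0 to 16383 */
#define SKETCH_KEY_SLOTS 16384

/* Hypothetical, pre-decoded form of one CLUSTER SLOTS entry */
typedef struct {
	long long	start;		/* First slot covered by this entry */
	long long	end;		/* Last slot covered (inclusive) */
	size_t		node_count;	/* Master plus any replicas listed */
} slot_range_t;

static bool slot_in_range(long long slot)
{
	return (slot >= 0) && (slot < SKETCH_KEY_SLOTS);
}

/* Returns true only if the map is non-empty and every entry is valid */
static bool slot_ranges_valid(slot_range_t const *ranges, size_t count)
{
	size_t i;

	assert((ranges != NULL) || (count == 0));

	if (count == 0) return false;			/* Clustering on, but no slots assigned */

	for (i = 0; i < count; i++) {
		slot_range_t const *r = &ranges[i];

		if (r->node_count < 1) return false;	/* Every range needs a master */
		if (!slot_in_range(r->start) || !slot_in_range(r->end)) return false;
		if (r->end < r->start) return false;	/* Start/end out of order */
	}
	return true;
}
```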
993 
994 /** Perform a runtime remap of the cluster
995  *
996  * @note Errors may be retrieved with fr_strerror().
997  * @note Must be called with the cluster mutex free.
998  *
999  * @param[in] request The current request.
1000  * @param[in,out] cluster to remap.
1001  * @param[in] conn to use to query the cluster.
1002  * @return
1003  * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
1004  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1005  * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the 'cluster slots' command resulted in a protocol error.
1006  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1007  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1008  */
1010 {
1011  fr_time_t now;
1012  redisReply *map;
1014  size_t i, j;
1015 
1016  /*
1017  * If the cluster was remapped very recently, or is being
1018  * remapped, it's unlikely that it needs remapping again.
1019  */
1020  if (cluster->remapping) {
1021  in_progress:
1022  ROPTIONAL(RDEBUG2, DEBUG2, "Cluster remapping in progress, ignoring remap request");
1024  }
1025 
1026  /*
1027  * The remap times are _our_ times, not the _request_ time.
1028  */
1029  now = fr_time();
1030  if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1031  too_soon:
1032  ROPTIONAL(RWARN, WARN, "Cluster was updated less than a second ago, ignoring remap request");
1034  }
1035 
1036  ROPTIONAL(RINFO, INFO, "Initiating cluster remap");
1037 
1038  /*
1039  * Get new cluster information
1040  */
1041  ret = cluster_map_get(&map, conn);
1042  switch (ret) {
1043  case FR_REDIS_CLUSTER_RCODE_BAD_INPUT: /* Validation error */
1044  case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION: /* Connection error */
1045  case FR_REDIS_CLUSTER_RCODE_FAILED: /* Error issuing command */
1046  return ret;
1047 
1048  case FR_REDIS_CLUSTER_RCODE_IGNORED: /* Clustering not enabled, or not supported */
1049  cluster->remap_needed = false;
1051 
1052  case FR_REDIS_CLUSTER_RCODE_SUCCESS: /* Success */
1053  break;
1054  }
1055 
1056  /*
1057  * Print the mapping we received
1058  */
1059  ROPTIONAL(RINFO, INFO, "Cluster map consists of %zu key ranges", map->elements);
1060  for (i = 0; i < map->elements; i++) {
1061  redisReply *map_node = map->element[i];
1062 
1063  ROPTIONAL(RINFO, INFO, "%zu - keys %lli-%lli", i,
1064  map_node->element[0]->integer,
1065  map_node->element[1]->integer);
1066 
1067  if (request) RINDENT();
1068  ROPTIONAL(RINFO, INFO, "master: %s:%lli",
1069  map_node->element[2]->element[0]->str,
1070  map_node->element[2]->element[1]->integer);
1071  for (j = 3; j < map_node->elements; j++) {
1072  ROPTIONAL(RINFO, INFO, "slave%zu: %s:%lli", j - 3,
1073  map_node->element[j]->element[0]->str,
1074  map_node->element[j]->element[1]->integer);
1075  }
1076  if (request) REXDENT();
1077  }
1078 
1079  /*
1080  * Check again that the cluster isn't being
1081  * remapped, or was remapped too recently,
1082  * now that we hold the mutex and the state of
1083  * those variables is synchronized.
1084  */
1085  pthread_mutex_lock(&cluster->mutex);
1086  if (cluster->remapping) {
1087  pthread_mutex_unlock(&cluster->mutex);
1088  fr_redis_reply_free(&map); /* Free the map */
1089  goto in_progress;
1090  }
1091  if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1092  pthread_mutex_unlock(&cluster->mutex);
1093  fr_redis_reply_free(&map); /* Free the map */
1094  goto too_soon;
1095  }
1096  ret = cluster_map_apply(cluster, map);
1097  if (ret == FR_REDIS_CLUSTER_RCODE_SUCCESS) cluster->remap_needed = false; /* Only cleared on successful remap */
1098  pthread_mutex_unlock(&cluster->mutex);
1099 
1100  fr_redis_reply_free(&map); /* Free the map */
1101  if (ret < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1102 
1104 }
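fr_redis_cluster_remap() checks twice whether a remap is worthwhile. The first check runs without the lock, so that workers do not all issue CLUSTER SLOTS at the same moment. The second runs after taking the mutex, because another thread may have applied a new map while the query was in flight. Below is a minimal sketch of that double-checked pattern. It assumes a hypothetical `remap_gate_t` that tracks only the second in which the map was last applied. The real code also tracks a `remapping` flag and uses fr_time().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical subset of cluster state consulted before remapping */
typedef struct {
	pthread_mutex_t	mutex;		/* Protects last_updated */
	time_t		last_updated;	/* Second in which the current map was applied */
} remap_gate_t;

/* Fast pre-check, done before the CLUSTER SLOTS round trip.  As in the
 * original, this read is unsynchronised and is only a hint. */
static bool remap_worth_trying(remap_gate_t const *gate, time_t now)
{
	return gate->last_updated != now;
}

/* Authoritative re-check.  Returns true with the mutex held; the caller
 * then applies the new map and calls remap_finish().  Returns false, with
 * the mutex released, if another thread remapped in the meantime. */
static bool remap_begin(remap_gate_t *gate, time_t now)
{
	assert(gate != NULL);

	pthread_mutex_lock(&gate->mutex);
	if (!remap_worth_trying(gate, now)) {
		pthread_mutex_unlock(&gate->mutex);
		return false;
	}
	return true;
}

/* Record when the map was applied and release the mutex */
static void remap_finish(remap_gate_t *gate, time_t now)
{
	gate->last_updated = now;
	pthread_mutex_unlock(&gate->mutex);
}
```

A caller would run remap_worth_trying(), fetch the map, and then call remap_begin(). If remap_begin() returns false, the caller discards the fetched map, just as the listing frees `map` and jumps to `too_soon`.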
1105 
1106 /** Retrieve or associate a node with the server indicated in the redirect
1107  *
1108  * @note Errors may be retrieved with fr_strerror().
1109  *
1110  * @param[out] out Where to write the node representing the redirect server.
1111  * @param[in] cluster to draw node from.
1112  * @param[in] reply Redis reply containing the redirect information.
1113  * @return
1114  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1115  * - FR_REDIS_CLUSTER_RCODE_FAILED no more nodes available.
1116  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1117  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1118  */
1120 {
1121  fr_redis_cluster_node_t find, *found, *spare;
1122  fr_redis_conn_t *rconn;
1123 
1124  uint16_t key;
1125 
1126  memset(&find, 0, sizeof(find));
1127 
1128  *out = NULL;
1129 
1130  if (cluster_node_conf_from_redirect(&key, &find.addr, reply) < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1131 
1132  pthread_mutex_lock(&cluster->mutex);
1133  /*
1134  * If we already have a pool for the
1135  * host we were redirected to, use that.
1136  */
1137  found = fr_rb_find(cluster->used_nodes, &find);
1138  if (found) {
1139  /* We have the new pool, don't need to hold the lock */
1140  pthread_mutex_unlock(&cluster->mutex);
1141  *out = found;
1143  }
1144 
1145  /*
1146  * Otherwise grab a free node and try to connect
1147  * it to the server we were redirected to.
1148  */
1149  spare = fr_fifo_peek(cluster->free_nodes);
1150  if (!spare) {
1151  fr_strerror_const("Reached maximum connected nodes");
1152  pthread_mutex_unlock(&cluster->mutex);
1154  }
1155  spare->pending_addr = find.addr; /* Set the config to be applied */
1156  if (cluster_node_connect(cluster, spare) < 0) {
1157  pthread_mutex_unlock(&cluster->mutex);
1159  }
1160  fr_rb_insert(cluster->used_nodes, spare);
1161  fr_fifo_pop(cluster->free_nodes);
1162  found = spare;
1163 
1164  /* We have the new pool, don't need to hold the lock */
1165  pthread_mutex_unlock(&cluster->mutex);
1166 
1167  /*
1168  * Determine if we can establish a connection to
1169  * the new pool, to check if it's viable.
1170  */
1171  rconn = fr_pool_connection_get(found->pool, NULL);
1172  if (!rconn) {
1173  /*
1174  * To prevent repeated misconfigurations
1175  * using all free nodes, add the node
1176  * back to the spare queue if this
1177  * was the first connection attempt and
1178  * it failed.
1179  */
1180  pthread_mutex_lock(&cluster->mutex);
1181  fr_fifo_push(cluster->free_nodes, spare);
1182  pthread_mutex_unlock(&cluster->mutex);
1183 
1184  fr_strerror_const("No connections available");
1186  }
1187  fr_pool_connection_release(found->pool, NULL, rconn);
1188  *out = found;
1189 
1191 }
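cluster_redirect() relies on cluster_node_conf_from_redirect(), which is not part of this listing, to turn a redirect into a key slot and a server address. Redis reports redirects as error replies of the form `MOVED <slot> <host>:<port>` or `ASK <slot> <host>:<port>`. The following is a hypothetical standalone parser for that string. It is not FreeRADIUS's implementation, and `redirect_parse` and `redirect_t` are illustrative names. It splits host and port at the last colon so that IPv6 literals survive.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef enum { REDIRECT_NONE = 0, REDIRECT_MOVED, REDIRECT_ASK } redirect_type_t;

/* Hypothetical decoded redirect */
typedef struct {
	redirect_type_t	type;
	unsigned int	slot;
	char		host[256];
	unsigned int	port;
} redirect_t;

/* Parse "MOVED 3999 127.0.0.1:6381" or "ASK ..." (hiredis strips the leading '-').
 * Returns 0 on success, -1 if the string is not a well-formed redirect. */
static int redirect_parse(redirect_t *out, char const *err)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	slot, port;
	size_t		host_len;

	assert(out && err);
	memset(out, 0, sizeof(*out));

	if (strncmp(err, "MOVED ", 6) == 0) {
		out->type = REDIRECT_MOVED;
		p = err + 6;
	} else if (strncmp(err, "ASK ", 4) == 0) {
		out->type = REDIRECT_ASK;
		p = err + 4;
	} else {
		return -1;
	}

	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot > 16383)) return -1;
	p = end + 1;

	colon = strrchr(p, ':');			/* Last colon, so "::1:7000" works */
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port == 0) || (port > 65535)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = (unsigned int)slot;
	out->port = (unsigned int)port;
	return 0;
}
```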
1192 
1193 
1194 /** Try to determine the health of a cluster node passively by examining its pool state
1195  *
1196  * Returns an integer value representing the likelihood that the pool is live.
1197  * Range is between 1 and 11,000.
1198  *
1199  * If a weight of 1 is returned, connections from the pool should be checked
1200  * (by pinging) before use.
1201  *
1202  * @param now The current time.
1203  * @param state of the connection pool.
1204  * @return
1205  * - 1 the pool is very likely to be bad.
1206  * - 2-11000 the pool is likely to be good, with a higher number
1207  * indicating higher probability of liveness.
1208  */
1210 {
1211  /*
1212  * Failed spawn recently, probably bad
1213  */
1215 
1216  /*
1217  * Closed recently, probably bad
1218  */
1220 
1221  /*
1222  * Released too long ago, don't know
1223  */
1225 
1226  /*
1227  * Released not long ago, might be ok.
1228  */
1230 }
1231 
1232 /** Issue a ping request against a cluster node
1233  *
1234  * Establishes whether the connection to the node we have is live.
1235  *
1236  * @param request The current request.
1237  * @param node to ping.
1238  * @param conn the connection to ping on.
1239  * @return
1240  * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if we got a bad response.
1241  * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1242  * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION on connection down.
1243  */
1245 {
1246  redisReply *reply;
1247  fr_redis_rcode_t rcode;
1248 
1249  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Executing command: PING", node->id);
1250  reply = redisCommand(conn->handle, "PING");
1251  rcode = fr_redis_command_status(conn, reply);
1252  if (rcode != REDIS_RCODE_SUCCESS) {
1253  ROPTIONAL(RPERROR, PERROR, "[%i] PING failed to %s:%i", node->id, node->name, node->addr.inet.dst_port);
1254  fr_redis_reply_free(&reply);
1256  }
1257 
1258  if (reply->type != REDIS_REPLY_STATUS) {
1259  ROPTIONAL(RERROR, ERROR, "[%i] Bad PING response from %s:%i, expected status got %s",
1260  node->id, node->name, node->addr.inet.dst_port,
1261  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1262  fr_redis_reply_free(&reply);
1264  }
1265 
1266  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Got response: %s", node->id, reply->str);
1267  fr_redis_reply_free(&reply);
1269 }
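cluster_node_ping() accepts only a status reply to PING. hiredis derives reply types from the first byte of each RESP reply, and a healthy node answers PING with the simple string `+PONG`. The following sketch classifies raw RESP2 replies by that leading byte. `resp_type_t` and both helpers are hypothetical, and they work on the wire format rather than on hiredis's `redisReply`.

```c
#include <assert.h>
#include <string.h>

/* RESP2 reply categories, keyed by the first byte on the wire */
typedef enum {
	RESP_STATUS,	/* '+' simple string, hiredis REDIS_REPLY_STATUS */
	RESP_ERROR,	/* '-' error, hiredis REDIS_REPLY_ERROR */
	RESP_INTEGER,	/* ':' integer, hiredis REDIS_REPLY_INTEGER */
	RESP_BULK,	/* '$' bulk string ("$-1" is a nil reply) */
	RESP_ARRAY,	/* '*' array, hiredis REDIS_REPLY_ARRAY */
	RESP_UNKNOWN
} resp_type_t;

static resp_type_t resp_type(char const *wire)
{
	assert(wire != NULL);

	switch (wire[0]) {
	case '+': return RESP_STATUS;
	case '-': return RESP_ERROR;
	case ':': return RESP_INTEGER;
	case '$': return RESP_BULK;
	case '*': return RESP_ARRAY;
	default:  return RESP_UNKNOWN;
	}
}

/* A healthy node answers PING with the status reply "+PONG\r\n" */
static int resp_is_pong(char const *wire)
{
	return (resp_type(wire) == RESP_STATUS) && (strcmp(wire, "+PONG\r\n") == 0);
}
```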
1270 
1271 /** Attempt to find a live pool in the cluster
1272  *
1273  * The intent here is to find pools/nodes where a connection was released the shortest
1274  * time ago. Having a connection be released (vs closed) indicates that the pool is live.
1275  *
1276  * However, we don't want all workers to try to grab a connection to this node, as it
1277  * may still be dead (we don't know).
1278  *
1279  * So we use inverse transform sampling to pick a node, weighting each node by the time
1280  * between now and when a connection was last released. Connections released closest to the
1281  * current time are given a higher weighting.
1282  *
1283  * Weights range from 1 to 11,000.
1284  *
1285  * - If released > 10.0 seconds ago, information is not valid, weight 500.
1286  * - If closed < 10.0 seconds ago, it's a bad pool, weight 1.
1287  * - If spawn failed < 10.0 seconds ago, it's a bad pool, weight 1.
1288  * - If a connection was released 0.0 seconds ago, weight 11,000.
1289  * - If a connection was released 10.0 seconds ago, weight 1000.
1290  *
1291  * Using the above algorithm we use the experience of other workers using the cluster to
1292  * inform our alternative node selection.
1293  *
1294  * Suggestions on improving live node selection appreciated.
1295  *
1296  * Inverse transform sampling based roughly on the solution from this post:
1297  * http://stackoverflow.com/questions/17250568/randomly-choosing-from-a-list-with-weighted-probabilities
1298  *
1299  * Wikipedia page here:
1300  * https://en.wikipedia.org/wiki/Inverse_transform_sampling
1301  *
1302  * @note Must be called with the cluster mutex free.
1303  *
1304  * @param[out] live_node we found.
1305  * @param[out] live_conn to that node.
1306  * @param[in] request The current request (used for logging).
1307  * @param[in] cluster to search for live pools in.
1308  * @param[in] skip this node (it's bad).
1309  * @return 0 if a live node was found (written to live_node and live_conn), else -1.
1310  */
1312  request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
1313 {
1314  uint32_t i;
1315 
1316  cluster_nodes_live_t *live;
1317  fr_time_t now;
1320 
1321  ROPTIONAL(RDEBUG2, DEBUG2, "Searching for live cluster nodes");
1322 
1323  if (fr_rb_num_elements(cluster->used_nodes) == 1) {
1324  no_alts:
1325  ROPTIONAL(RERROR, ERROR, "No alternative nodes available");
1326  return -1;
1327  }
1328 
1329  live = talloc_zero(NULL, cluster_nodes_live_t); /* Too big for stack */
1330  live->skip = skip->id;
1331 
1332  pthread_mutex_lock(&cluster->mutex);
1333  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
1334  node;
1335  node = fr_rb_iter_next_inorder(&iter)) {
1336  fr_assert(node->pool);
1337  if (live->skip == node->id) continue; /* Skip dead nodes */
1338 
1339  live->node[live->next].pool_state = fr_pool_state(node->pool);
1340  live->node[live->next++].id = node->id;
1341  }
1342  pthread_mutex_unlock(&cluster->mutex);
1343 
1344  fr_assert(live->next); /* There should be at least one */
1345  if (live->next == 1) goto no_alts; /* Weird, but conceivable */
1346 
1347  now = fr_time();
1348 
1349  /*
1350  * Weighted random selection
1351  */
1352  for (i = 0; (i < cluster->conf->max_alt) && live->next; i++) {
1353  fr_redis_conn_t *conn;
1354  uint8_t j;
1355  int first, last, pivot; /* Must be signed for the binary search */
1356  unsigned int find, cumulative = 0;
1357 
1358  ROPTIONAL(RDEBUG3, DEBUG3, "(Re)assigning node weights:");
1359  if (request) RINDENT();
1360  for (j = 0; j < live->next; j++) {
1361  int weight;
1362 
1363  weight = cluster_node_pool_health(now, live->node[j].pool_state);
1364  ROPTIONAL(RDEBUG3, DEBUG3, "Node %i weight: %i", live->node[j].id, weight);
1365  live->node[j].cumulative = (cumulative += weight);
1366  }
1367  if (request) REXDENT();
1368 
1369  /*
1370  * Select a node at random
1371  */
1372  find = (fr_rand() % cumulative) + 1; /* Between 1 and total */
1373  first = 0;
1374  last = live->next - 1;
1375  pivot = (first + last) / 2;
1376 
1377  while (first <= last) {
1378  if (live->node[pivot].cumulative < find) {
1379  first = pivot + 1;
1380  } else if (live->node[pivot].cumulative == find) {
1381  break;
1382  } else {
1383  last = pivot - 1;
1384  }
1385  pivot = (first + last) / 2;
1386  }
1387  /*
1388  * Round up...
1389  */
1390  if (first > last) pivot = last + 1;
1391 
1392  /*
1393  * Resolve the index to the actual node. We use IDs
1394  * to save memory...
1395  */
1396  node = &cluster->node[live->node[pivot].id];
1397  fr_assert(live->node[pivot].id == node->id);
1398 
1399  ROPTIONAL(RDEBUG2, DEBUG2, "Selected node %i (using random value %u)", node->id, find);
1400  conn = fr_pool_connection_get(node->pool, request);
1401  if (!conn) {
1402  ROPTIONAL(RERROR, ERROR, "No connections available to node %i %s:%i", node->id,
1403  node->name, node->addr.inet.dst_port);
1404  next:
1405  /*
1406  * Remove the node we just discovered was bad
1407  * out of the set of nodes we're selecting over.
1408  */
1409  if (pivot == (live->next - 1)) {
1410  live->next--;
1411  continue;
1412  }
1413  memcpy(&live->node[pivot], &live->node[live->next - 1], sizeof(live->node[pivot]));
1414  live->next--;
1415  continue;
1416  }
1417 
1418  /*
1419  * PING! PONG?
1420  */
1421  switch (cluster_node_ping(request, node, conn)) {
1423  break;
1424 
1426  fr_pool_connection_close(node->pool, request, conn);
1427  goto next;
1428 
1429  default:
1430  fr_pool_connection_release(node->pool, request, conn);
1431  goto next;
1432  }
1433 
1434  *live_node = node;
1435  *live_conn = conn;
1436  talloc_free(live);
1437 
1438  return 0;
1439  }
1440 
1441  ROPTIONAL(RERROR, ERROR, "Hit max alt limit %i, and no live connections found", cluster->conf->max_alt);
1442  talloc_free(live);
1443 
1444  return -1;
1445 }
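The selection loop in cluster_node_find_live() is inverse transform sampling over discrete weights. It builds a running total, draws a value uniformly in [1, total], and takes the first node whose running total reaches that value. A sketch of that step is below. `weighted_pick` is a hypothetical name, and `rand_value` stands in for fr_rand(). The sketch reduces the random value with `%`, because masking with `total - 1` is only uniform when the total is a power of two. The sketch also replaces the signed pivot loop with an unsigned lower-bound search.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fill cumulative[] with running totals of weights[], then return the index
 * of the first entry whose running total is >= target, where target is
 * drawn from [1, total].  Index i is chosen with probability weights[i]/total
 * (up to a negligible modulo bias when total is much smaller than 2^32). */
static size_t weighted_pick(unsigned int const *weights, unsigned int *cumulative,
			    size_t n, uint32_t rand_value)
{
	unsigned int	total = 0, target;
	size_t		lo = 0, hi, i;

	assert(n > 0);

	for (i = 0; i < n; i++) {
		assert(weights[i] > 0);		/* Zero weights would break the search */
		total += weights[i];
		cumulative[i] = total;
	}

	target = (rand_value % total) + 1;	/* Uniform in 1..total */

	/* Lower-bound binary search over the strictly increasing totals */
	hi = n - 1;
	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;

		if (cumulative[mid] < target) {
			lo = mid + 1;
		} else {
			hi = mid;
		}
	}
	return lo;
}
```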
1446 
1447 /** Callback for freeing a Redis connection
1448  *
1449  * @param[in] conn to free.
1450  * @return 0.
1451  */
1453 {
1454  redisFree(conn->handle);
1455 
1456  return 0;
1457 }
1458 
1459 /** Create a new connection to a Redis node
1460  *
1461  * @param[in] ctx to allocate connection structure in. Will be freed at the same time as the pool.
1462  * @param[in] instance data of type #fr_redis_cluster_node_t. Holds parameters for establishing new connection.
1463  * @param[in] timeout The maximum time allowed to complete the connection.
1464  * @return
1465  * - New #fr_redis_conn_t on success.
1466  * - NULL on failure.
1467  */
1468 void *fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
1469 {
1470  fr_redis_cluster_node_t *node = instance;
1471  fr_redis_conn_t *conn = NULL;
1472  redisContext *handle;
1473  redisReply *reply = NULL;
1474  char const *log_prefix = node->cluster->log_prefix;
1475 
1476  DEBUG2("%s - [%i] Connecting to node %s:%i", log_prefix, node->id, node->name, node->addr.inet.dst_port);
1477 
1478  handle = redisConnectWithTimeout(node->name, node->addr.inet.dst_port, fr_time_delta_to_timeval(timeout));
1479  if ((handle != NULL) && handle->err) {
1480  ERROR("%s - [%i] Connection failed: %s", log_prefix, node->id, handle->errstr);
1481  redisFree(handle);
1482  return NULL;
1483  } else if (!handle) {
1484  ERROR("%s - [%i] Connection failed", log_prefix, node->id);
1485  return NULL;
1486  }
1487 
1488  conn = talloc_zero(ctx, fr_redis_conn_t);
1489  conn->handle = handle;
1490  conn->node = node;
1491  talloc_set_destructor(conn, _cluster_conn_free);
1492 
1493 #ifdef HAVE_REDIS_SSL
1494  if (node->cluster->ssl_ctx != NULL) {
1495  fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->cluster->ssl_ctx);
1496  if (!tls_session) {
1497  fr_tls_strerror_printf("%s - [%i]", log_prefix, node->id);
1498  ERROR("%s - [%i] Failed to allocate TLS session", log_prefix, node->id);
1499  talloc_free(conn);
1500  return NULL;
1501  }
1502 
1503  // redisInitiateSSL() takes ownership of SSL object on success
1504  SSL_up_ref(tls_session->ssl);
1505  if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1506  ERROR("%s - [%i] Failed to initiate SSL: %s", log_prefix, node->id, handle->errstr);
1507  SSL_free(tls_session->ssl);
1508  talloc_free(conn);
1509  return NULL;
1510  }
1511  }
1512 #endif
1513 
1514  if (node->cluster->conf->password) {
1515  if (node->cluster->conf->username) {
1516  DEBUG3("%s - [%i] Executing: AUTH %s %s", log_prefix, node->id,
1517  node->cluster->conf->username,
1518  node->cluster->conf->password);
1519  reply = redisCommand(handle, "AUTH %s %s",
1520  node->cluster->conf->username,
1521  node->cluster->conf->password);
1522  } else {
1523  DEBUG3("%s - [%i] Executing: AUTH %s", log_prefix, node->id, node->cluster->conf->password);
1524  reply = redisCommand(handle, "AUTH %s", node->cluster->conf->password);
1525  }
1526  if (!reply) {
1527  ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, handle->errstr);
1528  error:
1529  if (reply) fr_redis_reply_free(&reply);
1530  talloc_free(conn);
1531  return NULL;
1532  }
1533 
1534  switch (reply->type) {
1535  case REDIS_REPLY_STATUS:
1536  if (strcmp(reply->str, "OK") != 0) {
1537  ERROR("%s - [%i] Failed authenticating: %s", log_prefix,
1538  node->id, reply->str);
1539  goto error;
1540  }
1541  fr_redis_reply_free(&reply);
1542  break; /* else it's OK */
1543 
1544  case REDIS_REPLY_ERROR:
1545  ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, reply->str);
1546  goto error;
1547 
1548  default:
1549  ERROR("%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->id,
1550  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1551  goto error;
1552  }
1553  }
1554 
1555  if (node->cluster->conf->database) {
1556  DEBUG3("%s - [%i] Executing: SELECT %i", log_prefix, node->id, node->cluster->conf->database);
1557  reply = redisCommand(handle, "SELECT %i", node->cluster->conf->database);
1558  if (!reply) {
1559  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1560  node->cluster->conf->database, handle->errstr);
1561  goto error;
1562  }
1563 
1564  switch (reply->type) {
1565  case REDIS_REPLY_STATUS:
1566  if (strcmp(reply->str, "OK") != 0) {
1567  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1568  node->cluster->conf->database, reply->str);
1569  goto error;
1570  }
1571  fr_redis_reply_free(&reply);
1572  break; /* else it's OK */
1573 
1574  case REDIS_REPLY_ERROR:
1575  ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1576  node->cluster->conf->database, reply->str);
1577  goto error;
1578 
1579  default:
1580  ERROR("%s - [%i] Unexpected reply of type %s to SELECT", log_prefix, node->id,
1581  fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1582  goto error;
1583  }
1584  }
1585 
1586  return conn;
1587 }
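fr_redis_cluster_conn_create() sends AUTH and SELECT through redisCommand(). AUTH may include an optional username, which is the form used by Redis 6 ACLs. hiredis serialises each command as a RESP array of bulk strings. The sketch below shows that encoding for a fixed argument vector. `resp_encode` is hypothetical. hiredis performs this encoding internally and also exposes it through functions such as redisFormatCommand().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP array of bulk strings into buf (NUL terminated).
 * Returns the number of bytes written, excluding the NUL, or -1 if buf is too small. */
static int resp_encode(char *buf, size_t size, int argc, char const *const *argv)
{
	size_t	used;
	int	n, i;

	assert(buf && (argc > 0) && argv);

	n = snprintf(buf, size, "*%d\r\n", argc);		/* Array header */
	if ((n < 0) || ((size_t)n >= size)) return -1;
	used = (size_t)n;

	for (i = 0; i < argc; i++) {
		size_t len = strlen(argv[i]);

		n = snprintf(buf + used, size - used, "$%zu\r\n", len);	/* Bulk length */
		if ((n < 0) || ((size_t)n >= size - used)) return -1;
		used += (size_t)n;

		if (len + 2 >= size - used) return -1;		/* Payload, CRLF and NUL */
		memcpy(buf + used, argv[i], len);
		used += len;
		memcpy(buf + used, "\r\n", 2);
		used += 2;
	}
	buf[used] = '\0';
	return (int)used;
}
```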
1588 
1589 /** Implements the key slot selection scheme used by FreeRADIUS
1590  *
1591  * Like the scheme in the clustering specification, but with some differences:
1592  * if the key is NULL or zero length, a random key slot is chosen.
1593  *
1594  * If there's only a single node in the cluster, then we avoid the CRC16
1595  * and just use key slot 0.
1596  *
1597  * @param cluster to determine key slot for.
1598  * @param request The current request.
1599  * @param key the key to resolve.
1600  * @param key_len the length of the key.
1601  * @return pointer to key slot key resolves to.
1602  */
1604  uint8_t const *key, size_t key_len)
1605 {
1606  fr_redis_cluster_key_slot_t *key_slot;
1607 
1608  if (!key || (key_len == 0)) {
1609  key_slot = &cluster->key_slot[(uint16_t)(fr_rand() & (KEY_SLOTS - 1))];
1610  ROPTIONAL(RDEBUG2, DEBUG2, "Key rand() -> slot %zu", key_slot - cluster->key_slot);
1611 
1612  return key_slot;
1613  }
1614 
1615  /*
1616  * Avoid CRC16 if we're operating with one cluster node or
1617  * without clustering.
1618  */
1619  if (fr_rb_num_elements(cluster->used_nodes) > 1) {
1620  key_slot = &cluster->key_slot[cluster_key_hash(key, key_len)];
1621  ROPTIONAL(RDEBUG2, DEBUG2, "Key \"%pV\" -> slot %zu",
1622  fr_box_strvalue_len((char const *)key, key_len), key_slot - cluster->key_slot);
1623 
1624  return key_slot;
1625  }
1626  ROPTIONAL(RDEBUG3, DEBUG3, "Single node available, skipping key selection");
1627 
1628  return &cluster->key_slot[0];
1629 }
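The body of cluster_key_hash() is not in this listing. The Redis Cluster specification defines HASH_SLOT = CRC16(key) mod 16384, where CRC16 is the XMODEM variant. If the key contains a non-empty `{...}` hash tag, only the tag is hashed. The sketch below follows the specification. `crc16_xmodem` and `key_hash_slot` are hypothetical names, and the sketch is not necessarily identical to cluster_key_hash().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection */
static uint16_t crc16_xmodem(uint8_t const *buf, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(buf[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

/* Map a key to a slot in 0..16383, honouring {hash tags} */
static uint16_t key_hash_slot(uint8_t const *key, size_t key_len)
{
	size_t lbrace, rbrace;

	assert(key || (key_len == 0));

	for (lbrace = 0; (lbrace < key_len) && (key[lbrace] != '{'); lbrace++);
	if (lbrace < key_len) {
		for (rbrace = lbrace + 1; (rbrace < key_len) && (key[rbrace] != '}'); rbrace++);

		/* Use the tag only if a '}' follows and the tag is non-empty */
		if ((rbrace < key_len) && (rbrace > lbrace + 1)) {
			return crc16_xmodem(key + lbrace + 1, rbrace - lbrace - 1) & 16383;
		}
	}
	return crc16_xmodem(key, key_len) & 16383;
}
```

Hash tags are what let callers force related keys onto one node. For example, `{user1000}.following` and `{user1000}.followers` always share a slot.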
1630 
1631 /** Return the master node that would be used for a particular key
1632  *
1633  * @param[in] cluster To resolve key in.
1634  * @param[in] key_slot to resolve to node.
1635  * @return
1636  * - The current master node.
1637  * - NULL if no master node is currently assigned to a particular key slot.
1638  */
1640  fr_redis_cluster_key_slot_t const *key_slot)
1641 {
1642  return &cluster->node[key_slot->master];
1643 }
1644 
1645 /** Return the slave node that would be used for a particular key
1646  *
1647  * @param[in] cluster To resolve key in.
1648  * @param[in] key_slot To resolve to node.
1649  * @param[in] slave_num 0..n.
1650  * @return
1651  * - A slave node.
1652  * - NULL if no slave node is assigned at the specified index for this key slot.
1653  *
1654  */
1656  fr_redis_cluster_key_slot_t const *key_slot,
1657  uint8_t slave_num)
1658 {
1659  if (slave_num >= key_slot->slave_num) return NULL; /* No slave available */
1660 
1661  return &cluster->node[key_slot->slave[slave_num]];
1662 }
1663 
1664 /** Return the ipaddr of a particular node
1665  *
1666  * @param[out] out Ipaddr of the node.
1667  * @param[in] node to get ip address from.
1668  * @return
1669  * - 0 on success.
1670  * - -1 on failure (node is NULL).
1671  */
1673 {
1674  if (!node) return -1;
1675 
1676  memcpy(out, &node->addr.inet.dst_ipaddr, sizeof(*out));
1677 
1678  return 0;
1679 }
1680 
1681 /** Return the port of a particular node
1682  *
1683  * @param[out] out Port of the node.
1684  * @param[in] node to get port from.
1685  * @return
1686  * - 0 on success.
1687  * - -1 on failure (node is NULL).
1688  */
1690 {
1691  if (!node) return -1;
1692 
1693  *out = node->addr.inet.dst_port;
1694 
1695  return 0;
1696 }
1697 
1698 /** Resolve a key to a pool, and reserve a connection in that pool
1699  *
1700  * This should be used with #fr_redis_cluster_state_next, and #fr_redis_command_status, to
1701  * transparently locate the cluster node we need to perform the operation on.
1702  *
1703  * Example code below shows how this function is used in conjunction
1704  * with #fr_redis_cluster_state_next to follow redirects, and reconnect handles.
1705  *
1706  @code{.c}
1707  fr_redis_rcode_t s_ret;
1708  fr_redis_cluster_state_t state;
1709  fr_redis_conn_t *conn;
1710  redisReply *reply = NULL;
1711  fr_redis_rcode_t status;
1712 
1713  for (s_ret = fr_redis_cluster_state_init(&state, &conn, cluster, request, key, key_len, false);
1714  s_ret == REDIS_RCODE_TRY_AGAIN;
1715  s_ret = fr_redis_cluster_state_next(&state, &conn, cluster, request, status, &reply)) {
1716  reply = redisCommand(conn->handle, "SET foo bar");
1717  status = fr_redis_command_status(conn, reply);
1718  }
1719  // Reply is freed if s_ret == REDIS_RCODE_TRY_AGAIN, but left in all other cases to allow error
1720  // processing, or extraction of results.
1721  fr_redis_reply_free(&reply);
1722  if (s_ret != REDIS_RCODE_SUCCESS) {
1723  // Error
1724  }
1725  // Success
1726  @endcode
1727  *
1728  * @param[out] state to track current pool and various counters, will be initialised.
1729  * @param[out] conn Where to write the reserved connection to.
1730  * @param[in] cluster of pools.
1731  * @param[in] request The current request.
1732  * @param[in] key to resolve to a cluster node/pool. If key is NULL or key_len is 0, a random
1733  * slot will be chosen.
1734  * @param[in] key_len Length of the key.
1735  * @param[in] read_only If true, will use random slave pool in preference to the master, falling
1736  * back to the master if no slaves are available.
1737  * @return
1738  * - REDIS_RCODE_TRY_AGAIN - try your command with this connection (provided via conn).
1739  * - REDIS_RCODE_RECONNECT - when no additional connections available.
1740  */
1742  fr_redis_cluster_t *cluster, request_t *request,
1743  uint8_t const *key, size_t key_len, bool read_only)
1744 {
1746  fr_redis_cluster_key_slot_t const *key_slot;
1747  uint8_t first, i;
1748  uint64_t used_nodes;
1749 
1750  fr_assert(cluster);
1751  fr_assert(state);
1752  fr_assert(conn);
1753 
1754  memset(state, 0, sizeof(*state));
1755  *conn = NULL; /* Better safe than exploding */
1756 
1757  used_nodes = fr_rb_num_elements(cluster->used_nodes);
1758  if (used_nodes == 0) {
1759  ROPTIONAL(REDEBUG, ERROR, "No nodes in cluster");
1760  return REDIS_RCODE_RECONNECT;
1761  }
1762 
1763 again:
1764  key_slot = fr_redis_cluster_slot_by_key(cluster, request, key, key_len);
1765 
1766  /*
1767  * 1. Try each of the slaves for the key slot
1768  * 2. Fall through to trying the master, and a single alternate node.
1769  */
1770  if (read_only) {
1771  first = key_slot->slave_num ? (fr_rand() % key_slot->slave_num) : 0;
1772  for (i = 0; i < key_slot->slave_num; i++) {
1773  uint8_t node_id;
1774 
1775  node_id = key_slot->slave[(first + i) % key_slot->slave_num];
1776  node = &cluster->node[node_id];
1777  *conn = fr_pool_connection_get(node->pool, request);
1778  if (!*conn) {
1779  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu slave %i)",
1780  node->id, key_slot - cluster->key_slot, (first + i) % key_slot->slave_num);
1781  cluster->remap_needed = true;
1782  continue; /* Continue until we find a live pool */
1783  }
1784 
1785  goto finish;
1786  }
1787  /* Fall through to using key slot master or alternate */
1788  }
1789 
1790  /*
1791  * 1. Try the master for the key slot
1792  * 2. If unavailable search for any pools with handles available
1793  * 3. If there are no pools, or we can't reserve a handle,
1794  * give up.
1795  */
1796  node = &cluster->node[key_slot->master];
1797  *conn = fr_pool_connection_get(node->pool, request);
1798  if (!*conn) {
1799  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu master)",
1800  node->id, key_slot - cluster->key_slot);
1801  cluster->remap_needed = true;
1802 
1803  if (cluster_node_find_live(&node, conn, request, cluster, node) < 0) return REDIS_RCODE_RECONNECT;
1804  }
1805 
1806 finish:
1807  /*
1808  * Something set the remap_needed flag, and we have a live connection
1809  */
1810  if (cluster->remap_needed) {
1811  if (fr_redis_cluster_remap(request, cluster, *conn) == FR_REDIS_CLUSTER_RCODE_SUCCESS) {
1812  fr_pool_connection_release(node->pool, request, *conn);
1813  goto again; /* New map, try again */
1814  }
1815  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1816  }
1817 
1818  state->node = node;
1819  state->key = key;
1820  state->key_len = key_len;
1821 
1822  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
1823  state->node->id, state->node->name, state->node->addr.inet.dst_port);
1824 
1825  return REDIS_RCODE_TRY_AGAIN;
1826 }
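When `read_only` is set, fr_redis_cluster_state_init() tries each slave for the key slot starting at a random offset. It then falls back to the master and, failing that, to the alternate node search. The sketch below models only the slave rotation and master fallback. `pick_read_node` is hypothetical, and the `usable` array stands in for whether fr_pool_connection_get() would return a handle for a given node ID. A modulo start offset gives each slave an equal chance of being tried first.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Return the node ID to use for a read-only command, or -1 if neither
 * any slave nor the master is usable.  usable[] is indexed by node ID. */
static int pick_read_node(uint8_t master, uint8_t const *slaves, uint8_t slave_num,
			  uint32_t rand_value, bool const *usable)
{
	uint8_t first, i;

	assert(usable != NULL);

	if (slave_num > 0) {
		first = (uint8_t)(rand_value % slave_num);	/* Uniform start offset */
		for (i = 0; i < slave_num; i++) {
			uint8_t node_id = slaves[(first + i) % slave_num];

			if (usable[node_id]) return node_id;	/* First live slave wins */
		}
	}
	return usable[master] ? master : -1;			/* Fall back to the master */
}
```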
1827 
1828 /** Get the next connection to attempt a command against
1829  *
1830  * Will process reconnect and redirect states performing the actions necessary.
1831  *
1832  * - May trigger a cluster remap on receiving a #REDIS_RCODE_MOVE status.
1833  * - May perform a temporary redirect on receiving a #REDIS_RCODE_ASK status.
1834  * - May reserve a new connection on receiving a #REDIS_RCODE_RECONNECT status.
1835  *
1836  * If a remap is in progress, has occurred within the last second, has recently failed,
1837  * or fails, the '-MOVE' will be treated as a temporary redirect (-ASK).
1838  *
1839  * This allows the server to be more responsive during remaps, as unless the worker has been
1840  * redirected to a node we don't currently have a pool for, it can grab a connection for the
1841  * node it was redirected to, and continue.
1842  *
1843  * @note Irrespective of return code, the connection passed via conn will be released.
1844  * A new connection to attempt the command on will be provided via conn.
1845  *
1846  * @note reply will be automatically freed and set to NULL if a new connection is provided.
1847  * In all other cases, the caller is responsible for freeing the reply.
1848  *
1849  * @param[in,out] state containing the current pool, and various counters which control
1850  * retries, and limit redirects.
1851  * @param[in,out] conn we received the '-ASK' or '-MOVE' redirect on. Will be replaced with a
1852  * connection in the new pool the key points to.
1853  * @param[in] request The current request.
1854  * @param[in] cluster of pools.
1855  * @param[in] status of the last command, as returned by #fr_redis_command_status.
1856  * @param[in] reply from last command. Freed if #REDIS_RCODE_TRY_AGAIN is returned, else caller must free.
1857  * @return
1858  * - REDIS_RCODE_SUCCESS - on success.
1859  * - REDIS_RCODE_TRY_AGAIN - try new connection (provided via conn). Will free reply.
1860  * - REDIS_RCODE_ERROR - on failure or command error.
1861  * - REDIS_RCODE_RECONNECT - when no additional connections available.
1862  */
1864  fr_redis_cluster_t *cluster, request_t *request,
1865  fr_redis_rcode_t status, redisReply **reply)
1866 {
1867  fr_assert(state && state->node && state->node->pool);
1868  fr_assert(conn && *conn);
1869 
1870  if (*reply) fr_redis_reply_print(L_DBG_LVL_3, *reply, request, 0);
1871 
1872  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] <<< Returned: %s",
1873  state->node->id, fr_table_str_by_value(redis_rcodes, status, "<UNKNOWN>"));
1874 
1875  /*
1876  * Caller indicated we should close the connection
1877  */
1878  if (state->close_conn) {
1879  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Connection no longer viable, closing it", state->node->id);
1880  fr_pool_connection_close(state->node->pool, request, *conn);
1881  *conn = NULL;
1882  state->close_conn = false;
1883  }
1884 
1885  /*
1886  * If we have a proven live connection, and something
1887  * has set the remap_needed flag, do that now before
1888  * releasing the connection.
1889  */
1890  if (cluster->remap_needed && *conn) switch(status) {
1891  case REDIS_RCODE_MOVE: /* We're going to remap anyway */
1892  case REDIS_RCODE_RECONNECT: /* The connection's dead */
1893  break;
1894 
1895  default:
1896  /*
1897  * Remap the cluster. On success, will clear the
1898  * remap_needed flag.
1899  */
1900  if (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)
1901  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1902  }
1903 
1904  /*
1905  * Check the result of the last redis command, and do
1906  * something appropriate.
1907  */
1908  switch (status) {
1909  case REDIS_RCODE_SUCCESS:
1910  fr_pool_connection_release(state->node->pool, request, *conn);
1911  *conn = NULL;
1912  return REDIS_RCODE_SUCCESS;
1913 
1914  /*
1915  * Command error, not fixable.
1916  */
1917  case REDIS_RCODE_NO_SCRIPT:
1918  case REDIS_RCODE_ERROR:
1919  ROPTIONAL(RPEDEBUG, PERROR, "[%i] Command failed", state->node->id);
1920  fr_pool_connection_release(state->node->pool, request, *conn);
1921  *conn = NULL;
1922  return REDIS_RCODE_ERROR;
1923 
1924  /*
1925  * Cluster's unstable, try again.
1926  */
1927  case REDIS_RCODE_TRY_AGAIN:
1928  if (state->retries++ >= cluster->conf->max_retries) {
1929  ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum retry attempts", state->node->id);
1930  fr_pool_connection_release(state->node->pool, request, *conn);
1931  *conn = NULL;
1932  return REDIS_RCODE_ERROR;
1933  }
1934 
1935  if (!*conn) *conn = fr_pool_connection_get(state->node->pool, request);
1936 
1937  if (fr_time_delta_ispos(cluster->conf->retry_delay)) nanosleep(&fr_time_delta_to_timespec(cluster->conf->retry_delay), NULL);
1938  goto try_again;
1939 
1940  /*
1941  * Connection's dead, check to see if we can switch nodes,
1942  * or, failing that, reconnect the connection.
1943  */
1944  case REDIS_RCODE_RECONNECT:
1945  {
1946  fr_redis_cluster_key_slot_t const *key_slot;
1947 
1948  ROPTIONAL(RPERROR, PERROR, "[%i] Failed communicating with %s:%i",
1949  state->node->id, state->node->name,
1950  state->node->addr.inet.dst_port);
1951 
1952  fr_pool_connection_close(state->node->pool, request, *conn); /* He's dead jim */
1953 
1954  if (state->reconnects++ > state->in_pool) {
1955  ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum reconnect attempts", state->node->id);
1956  cluster->remap_needed = true;
1957  return REDIS_RCODE_RECONNECT;
1958  }
1959 
1960  /*
1961  * Refresh the key slot
1962  */
1963  key_slot = fr_redis_cluster_slot_by_key(cluster, request, state->key, state->key_len);
1964  state->node = &cluster->node[key_slot->master];
1965 
1966  *conn = fr_pool_connection_get(state->node->pool, request);
1967  if (!*conn) {
1968  ROPTIONAL(REDEBUG, ERROR, "[%i] No connections available for %s:%i",
1969  state->node->id, state->node->name, state->node->addr.inet.dst_port);
1970  cluster->remap_needed = true;
1971 
1972  if (cluster_node_find_live(&state->node, conn, request,
1973  cluster, state->node) < 0) return REDIS_RCODE_RECONNECT;
1974 
1975  goto try_again;
1976  }
1977 
1978  state->retries = 0;
1979  }
1980  goto try_again;
1981 
1982  /*
1983  * -MOVED is treated identically to -ASK, except it may
1984  * trigger a cluster remap.
1985  */
1986  case REDIS_RCODE_MOVE:
1987  fr_assert(*reply);
1988 
1989  if (*conn && (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)) {
1990  ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1991  }
1992  FALL_THROUGH;
1993 
1994  /*
1995  * -ASK: process a redirect.
1996  */
1997  case REDIS_RCODE_ASK:
1998  {
1999  fr_redis_cluster_node_t *new;
2000 
2001  fr_pool_connection_release(state->node->pool, request, *conn); /* Always release the old connection */
2002 
2003  if (!fr_cond_assert(*reply)) return REDIS_RCODE_ERROR;
2004 
2005  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Processing redirect \"%s\"", state->node->id, (*reply)->str);
2006  if (state->redirects++ >= cluster->conf->max_redirects) {
2007  ROPTIONAL(REDEBUG, ERROR, "[%i] Reached max_redirects (%i)", state->node->id, state->redirects);
2008  return REDIS_RCODE_ERROR;
2009  }
2010 
2011  switch (cluster_redirect(&new, cluster, *reply)) {
2012  case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2013  if (new == state->node) {
2014  ROPTIONAL(REDEBUG, ERROR, "[%i] %s:%i issued redirect to itself", state->node->id,
2015  state->node->name, state->node->addr.inet.dst_port);
2016  return REDIS_RCODE_ERROR;
2017  }
2018 
2019  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Redirected from %s:%i to [%i] %s:%i",
2020  state->node->id, state->node->name,
2021  state->node->addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2022  state->node = new;
2023 
2024  *conn = fr_pool_connection_get(state->node->pool, request);
2025  if (!*conn) return REDIS_RCODE_RECONNECT;
2026 
2027  /*
2028  * Reset these counters; their scope is
2029  * a single node in the cluster.
2030  */
2031  state->reconnects = 0;
2032  state->retries = 0;
2033  state->in_pool = fr_pool_state(state->node->pool)->num;
2034  goto try_again;
2035 
2037  cluster->remap_needed = true;
2038  return REDIS_RCODE_RECONNECT;
2039 
2040  default:
2041  return REDIS_RCODE_ERROR;
2042  }
2043  }
2044  }
2045 
2046 try_again:
2047  ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
2048  state->node->id, state->node->name, state->node->addr.inet.dst_port);
2049 
2050  fr_redis_reply_free(&*reply);
2051  *reply = NULL;
2052 
2053  return REDIS_RCODE_TRY_AGAIN;
2054 }
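The counter handling in fr_redis_cluster_state_next() can be modelled on its own. The block below is a simplified, self-contained sketch and not the FreeRADIUS implementation. The sketch_* enum and struct are hypothetical stand-ins for the REDIS_RCODE_* values and fr_redis_cluster_state_t. Pool handling, remapping, retry_delay and logging are left out.

The sketch shows three limits:
- -TRYAGAIN responses are bounded by max_retries.
- Dead connections are bounded by the number of connections in the current pool.
- Redirects are bounded by max_redirects.

Whenever a redirect moves the command to a different node, the per-node counters are reset.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the REDIS_RCODE_* values used above. */
typedef enum {
	SKETCH_SUCCESS,
	SKETCH_ERROR,
	SKETCH_TRY_AGAIN,
	SKETCH_RECONNECT,
	SKETCH_ASK,
	SKETCH_MOVED
} sketch_rcode_t;

/* Hypothetical per-command state, loosely mirroring fr_redis_cluster_state_t. */
typedef struct {
	uint32_t retries;	/* -TRYAGAIN responses seen on the current node */
	uint32_t reconnects;	/* dead connections replaced on the current node */
	uint32_t redirects;	/* -ASK/-MOVED redirects followed so far */
	uint32_t in_pool;	/* connections in the current node's pool */
} sketch_state_t;

/* Decide what to do after a command; SKETCH_TRY_AGAIN means "resend the command". */
static sketch_rcode_t sketch_next(sketch_state_t *state, sketch_rcode_t status,
				  uint32_t max_retries, uint32_t max_redirects)
{
	assert(state);

	switch (status) {
	case SKETCH_SUCCESS:
		return SKETCH_SUCCESS;

	case SKETCH_TRY_AGAIN:		/* cluster unstable, bounded by max_retries */
		if (state->retries++ >= max_retries) return SKETCH_ERROR;
		return SKETCH_TRY_AGAIN;

	case SKETCH_RECONNECT:		/* dead connection, bounded by the pool size */
		if (state->reconnects++ > state->in_pool) return SKETCH_RECONNECT;
		state->retries = 0;
		return SKETCH_TRY_AGAIN;

	case SKETCH_MOVED:		/* the real code may also trigger a remap here */
	case SKETCH_ASK:
		if (state->redirects++ >= max_redirects) return SKETCH_ERROR;
		state->retries = 0;	/* per-node counters restart on the new node */
		state->reconnects = 0;
		return SKETCH_TRY_AGAIN;

	default:
		return SKETCH_ERROR;
	}
}
```

A caller would keep resending the command for as long as SKETCH_TRY_AGAIN is returned. This is analogous to how REDIS_RCODE_TRY_AGAIN signals that a new connection has been provided via conn.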
2055 
2056 /** Get the pool associated with a node in the cluster
2057  *
2058  * @note This is used for testing only. It's not ifdef'd out because
2059  * tests need to run against production builds too.
2060  *
2061  * @param[out] pool associated with the node.
2062  * @param[in] cluster to search for node in.
2063  * @param[in] node_addr to retrieve pool for. Specifies IP and port of node.
2064  * @param[in] create Establish a connection to the specified node if it
2065  * was previously unknown to the cluster client.
2066  * @return
2067  * - 0 on success.
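2068  * - -1 if no such node exists and create is false, the maximum number of nodes has been reached, or connecting to the new node failed.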
2069  */
2070 int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster,
2071  fr_socket_t *node_addr, bool create)
2072 {
2073  fr_redis_cluster_node_t find, *found;
2074 
2075  find.addr = (fr_socket_t) {
2076  .inet = {
2077  .dst_ipaddr = node_addr->inet.dst_ipaddr,
2078  .dst_port = node_addr->inet.dst_port,
2079  }
2080  };
2081 
2082  pthread_mutex_lock(&cluster->mutex);
2083  found = fr_rb_find(cluster->used_nodes, &find);
2084  if (!found) {
2085  fr_redis_cluster_node_t *spare;
2086  char buffer[INET6_ADDRSTRLEN];
2087  char const *hostname;
2088 
2089  if (!create) {
2090  pthread_mutex_unlock(&cluster->mutex);
2091 
2092  hostname = inet_ntop(node_addr->inet.dst_ipaddr.af, &node_addr->inet.dst_ipaddr.addr, buffer, sizeof(buffer));
2093  fr_assert(hostname); /* addr.ipaddr is probably corrupt */
2094  fr_strerror_printf("No existing node found with address %s, port %i",
2095  hostname, node_addr->inet.dst_port);
2096  return -1;
2097  }
2098 
2099  spare = fr_fifo_peek(cluster->free_nodes);
2100  if (!spare) {
2101  fr_strerror_const("Reached maximum connected nodes");
2102  pthread_mutex_unlock(&cluster->mutex);
2103  return -1;
2104  }
2105  spare->pending_addr = find.addr; /* Set the config to be applied */
2106  if (cluster_node_connect(cluster, spare) < 0) {
2107  pthread_mutex_unlock(&cluster->mutex);
2108  return -1;
2109  }
2110  fr_rb_insert(cluster->used_nodes, spare);
2111  fr_fifo_pop(cluster->free_nodes);
2112  found = spare;
2113  }
2114  /*
2115  * Sanity checks
2116  */
2117  fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
2118  fr_fifo_num_elements(cluster->free_nodes));
2119  pthread_mutex_unlock(&cluster->mutex);
2120 
2121  *pool = found->pool;
2122 
2123  return 0;
2124 }
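fr_redis_cluster_pool_by_node_addr() does two things. It looks a node up in the tree of used nodes, and if the node is missing it claims a spare node from the free FIFO. It then asserts that used and free nodes together account for every node except the reserved node 0.

The block below is a minimal sketch of that bookkeeping under stated assumptions:
- A linear scan replaces the red-black tree.
- A fixed-size circular queue replaces fr_fifo_t.
- An "ip:port" string replaces fr_socket_t.
- There is no real connection and no mutex.

SKETCH_MAX_NODES and all sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_MAX_NODES 4	/* hypothetical max_nodes; node[0] stays reserved */

typedef struct {
	uint8_t	id;
	bool	used;
	char	addr[64];	/* hypothetical "ip:port" lookup key */
} sketch_node_t;

typedef struct {
	sketch_node_t	node[SKETCH_MAX_NODES + 1];
	uint8_t		free_q[SKETCH_MAX_NODES];	/* FIFO of free node ids */
	unsigned	free_head;
	unsigned	free_num;
	unsigned	used_num;
} sketch_cluster_t;

static void sketch_cluster_init(sketch_cluster_t *c)
{
	memset(c, 0, sizeof(*c));
	for (uint8_t i = 1; i <= SKETCH_MAX_NODES; i++) {
		c->node[i].id = i;
		c->free_q[c->free_num++] = i;	/* push every node onto the free queue */
	}
}

/* Return the id of the node with addr, optionally claiming a free node; 0 on failure. */
static uint8_t sketch_node_by_addr(sketch_cluster_t *c, char const *addr, bool create)
{
	sketch_node_t *spare;

	for (uint8_t i = 1; i <= SKETCH_MAX_NODES; i++) {	/* stands in for fr_rb_find() */
		if (c->node[i].used && (strcmp(c->node[i].addr, addr) == 0)) return i;
	}
	if (!create || (c->free_num == 0)) return 0;

	spare = &c->node[c->free_q[c->free_head]];		/* peek, like fr_fifo_peek() */
	snprintf(spare->addr, sizeof(spare->addr), "%s", addr);	/* the real code connects here */
	spare->used = true;
	c->free_head = (c->free_head + 1) % SKETCH_MAX_NODES;	/* pop only once claimed */
	c->free_num--;
	c->used_num++;

	/* Same invariant as the sanity check above: total - used == free. */
	assert(SKETCH_MAX_NODES - c->used_num == c->free_num);
	return spare->id;
}
```

Peeking before popping mirrors the original's order of operations. A node is only taken off the free queue once it has actually been set up.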
2125 
2126 /** Return an array of IP addresses belonging to masters or slaves
2127  *
2128  * @note We return IP addresses as they're safe to use across cluster remaps.
2129  * @note Result array must be freed (talloc_free()) after use.
2130  *
2131  * @param[in] ctx to allocate array of IP addresses in.
2132  * @param[out] out Where to write the addresses of the nodes.
2133  * @param[in] cluster to search for nodes in.
2134  * @param[in] is_master If true, include the addresses of all the master nodes.
2135  * @param[in] is_slave If true, include the addresses of all the slave nodes.
2136  * @return the number of IP addresses written to out, or -1 on allocation failure.
2137  */
2138 ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[],
2139  fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
2140 {
2141  uint64_t in_use = fr_rb_num_elements(cluster->used_nodes);
2144  uint8_t count;
2145  fr_socket_t *found;
2146 
2147  if (in_use == 0) {
2148  *out = NULL;
2149  return 0;
2150  }
2151 
2152  count = 0;
2153  found = talloc_zero_array(ctx, fr_socket_t, in_use);
2154  if (!found) {
2155  fr_strerror_const("Out of memory");
2156  return -1;
2157  }
2158 
2159  pthread_mutex_lock(&cluster->mutex);
2160 
2161  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2162  node;
2163  node = fr_rb_iter_next_inorder(&iter)) {
2164  if ((is_master && node->is_master) || (is_slave && !node->is_master)) found[count++] = node->addr;
2165  }
2166 
2167  pthread_mutex_unlock(&cluster->mutex);
2168 
2169  if (count == 0) {
2170  *out = NULL;
2171  talloc_free(found);
2172  return 0;
2173  }
2174 
2175  *out = found;
2176 
2177  return count;
2178 }
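The role filter in fr_redis_cluster_node_addr_by_role() can be shown on plain arrays. This sketch makes three substitutions:
- A hypothetical sketch_role_node_t record stands in for the node structure.
- calloc()/free() replace talloc.
- The return type is ptrdiff_t rather than the POSIX ssize_t, so the code stays within standard C.

As in the original, the output array is sized for the worst case (every node matches). It is released again if nothing matched.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, simplified node record. */
typedef struct {
	char const	*addr;
	bool		is_master;
} sketch_role_node_t;

/*
 * Copy the addresses of nodes matching the requested roles into a newly
 * allocated array. Returns the count (0, with *out == NULL, if none match),
 * or -1 on allocation failure. The caller frees *out with free().
 */
static ptrdiff_t sketch_addr_by_role(char const ***out, sketch_role_node_t const *nodes, size_t num,
				     bool is_master, bool is_slave)
{
	char const	**found;
	size_t		count = 0;

	assert(out);
	*out = NULL;
	if (num == 0) return 0;

	found = calloc(num, sizeof(*found));	/* worst case: every node matches */
	if (!found) return -1;

	for (size_t i = 0; i < num; i++) {
		if ((is_master && nodes[i].is_master) || (is_slave && !nodes[i].is_master)) {
			found[count++] = nodes[i].addr;
		}
	}

	if (count == 0) {			/* nothing matched, don't hand back an empty array */
		free(found);
		return 0;
	}

	*out = found;
	return (ptrdiff_t)count;
}
```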
2179 
2180 /** Destroy mutex associated with cluster slots structure
2181  *
2182  * @param cluster being freed.
2183  * @return 0
2184  */
2185 static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
2186 {
2187  pthread_mutex_destroy(&cluster->mutex);
2188 
2189  return 0;
2190 }
2191 
2192 /** Check if members of the cluster are at or above a certain version
2193  *
2194  * @param cluster to perform check on.
2195  * @param min_version that must be found on each node for the check to succeed.
2196  * Must be in the format @verbatim <major>.<minor>.<release> @endverbatim.
2197  * @return
2198  * - true if all contactable members are at or above min_version.
2199  * - false if at least one member is below the minimum version
2200  * (use #fr_strerror to retrieve node information).
2201  */
2202 bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
2203 {
2206  fr_redis_conn_t *conn;
2207  int ret;
2208  char buffer[40];
2209  bool all_above = true;
2210 
2211  pthread_mutex_lock(&cluster->mutex);
2212 
2213  for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2214  node;
2215  node = fr_rb_iter_next_inorder(&iter)) {
2216  conn = fr_pool_connection_get(node->pool, NULL);
2217  if (!conn) continue;
2218 
2219  /*
2220  * We don't care if we can't get the version
2221  * as we don't want to prevent the server from
2222  * starting if start == 0.
2223  */
2224  ret = fr_redis_get_version(buffer, sizeof(buffer), conn);
2225  fr_pool_connection_release(node->pool, NULL, conn);
2226  if (ret < 0) continue;
2227 
2228  if (fr_redis_version_num(buffer) < fr_redis_version_num(min_version)) {
2229  fr_strerror_printf("Redis node %s:%i (currently v%s) needs update to >= v%s",
2230  node->name, node->addr.inet.dst_port, buffer, min_version);
2231  all_above = false;
2232  break;
2233  }
2234  }
2235 
2236  pthread_mutex_unlock(&cluster->mutex);
2237 
2238  return all_above;
2239 }
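fr_redis_cluster_min_version() relies on fr_redis_version_num() turning a <major>.<minor>.<release> string into something that can be compared numerically. The encoding fr_redis_version_num() actually uses is not shown here. The packing below is an assumption chosen for illustration: major goes in the high bits, followed by one byte each for minor and release. The sketch_* names are hypothetical. Unlike the original, which skips nodes whose version can't be retrieved, this sketch works on an already-collected list of version strings.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical encoding of "<major>.<minor>.<release>"; returns 0 if unparseable. */
static uint32_t sketch_version_num(char const *version)
{
	unsigned int major, minor, release;

	assert(version);
	if (sscanf(version, "%u.%u.%u", &major, &minor, &release) != 3) return 0;
	if ((major > 0xffff) || (minor > 0xff) || (release > 0xff)) return 0;	/* fields must not overlap */

	return ((uint32_t)major << 16) | ((uint32_t)minor << 8) | (uint32_t)release;
}

/* True if every version in the list is >= min_version, mirroring the check above. */
static bool sketch_all_at_least(char const * const *versions, size_t num, char const *min_version)
{
	uint32_t min = sketch_version_num(min_version);

	for (size_t i = 0; i < num; i++) {
		if (sketch_version_num(versions[i]) < min) return false;
	}
	return true;
}
```

Packing the fields into one integer means a single `<` comparison orders versions correctly. For example, 6.0.0 compares above 5.255.255.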
2240 
2241 /** Allocate and initialise a new cluster structure
2242  *
2243  * This holds all the data necessary to manage a pool of pools for a specific redis cluster.
2244  *
2245  * @note Will not error out unless cs.pool.start > 0. This is consistent with other pool based
2246  * modules/code.
2247  *
2248  * @param ctx to link the lifetime of the cluster structure to.
2249  * @param module Configuration section to search for 'server' conf pairs in.
2250  * @param conf Base redis server configuration. Cluster nodes share database
2251  * number and password.
2252  * @param triggers_enabled Whether triggers should be enabled.
2253  * @param log_prefix Custom log prefix. Defaults to @verbatim rlm_<module> (<instance>) @endverbatim.
2254  * @param trigger_prefix Custom trigger prefix. Defaults to @verbatim modules.<module>.pool @endverbatim.
2255  * @param trigger_args Argument pairs to pass to the trigger in addition to Connection-Pool-Server
2256  * and Connection-Pool-Port (which are always set by the cluster code).
2257  * @return
2258  * - New #fr_redis_cluster_t on success.
2259  * - NULL on error.
2260  */
2261 fr_redis_cluster_t *fr_redis_cluster_alloc(TALLOC_CTX *ctx,
2262  CONF_SECTION *module,
2263  fr_redis_conf_t *conf,
2264  bool triggers_enabled,
2265  char const *log_prefix,
2266  char const *trigger_prefix,
2267  fr_pair_list_t *trigger_args)
2268 {
2269  uint8_t i;
2270  uint16_t s;
2271 
2272  char const *cs_name1, *cs_name2;
2273 
2274  CONF_PAIR *cp;
2275  int af = AF_UNSPEC; /* AF of first server */
2276 
2277  uint64_t num_nodes;
2278  fr_redis_cluster_t *cluster;
2279 
2280  fr_assert(triggers_enabled || !trigger_prefix);
2281  fr_assert(triggers_enabled || (!trigger_args || fr_pair_list_empty(trigger_args)));
2282 
2283  MEM(cluster = talloc_zero(NULL, fr_redis_cluster_t));
2284  fr_pair_list_init(&cluster->trigger_args);
2285 
2286  cs_name1 = cf_section_name1(module);
2287  cs_name2 = cf_section_name2(module);
2288 
2289  cluster->triggers_enabled = triggers_enabled;
2290  if (cluster->triggers_enabled) {
2291  /*
2292  * Setup trigger prefix
2293  */
2294  if (!trigger_prefix) {
2295  cluster->trigger_prefix = talloc_typed_asprintf(cluster, "modules.%s.pool", cs_name1);
2296  } else {
2297  cluster->trigger_prefix = talloc_strdup(cluster, trigger_prefix);
2298  }
2299 
2300  /*
2301  * Duplicate the trigger arguments.
2302  */
2303  if (trigger_args) MEM(fr_pair_list_copy(cluster, &cluster->trigger_args, trigger_args) >= 0);
2304  }
2305 
2306  /*
2307  * Setup log prefix
2308  */
2309  if (!log_prefix) {
2310  if (!cs_name2) cs_name2 = cs_name1;
2311  cluster->log_prefix = talloc_typed_asprintf(conf, "rlm_%s (%s)", cs_name1, cs_name2);
2312  } else {
2313  cluster->log_prefix = talloc_strdup(cluster, log_prefix);
2314  }
2315 
2316  /*
2317  * Ensure we always have a pool section (even if it's empty)
2318  */
2319  if (!cf_section_find(module, "pool", NULL)) {
2320  (void) cf_section_alloc(module, module, "pool", NULL);
2321  }
2322 
2323  /*
2324  * Parse TLS configuration
2325  */
2326  if (conf->use_tls) {
2327 #ifdef HAVE_REDIS_SSL
2328  CONF_SECTION *tls_cs;
2329  fr_tls_conf_t *tls_conf;
2330 
2331  tls_cs = cf_section_find(module, "tls", NULL);
2332  if (!tls_cs) {
2333  tls_cs = cf_section_alloc(module, module, "tls", NULL);
2334  }
2335 
2336  tls_conf = fr_tls_conf_parse_client(tls_cs);
2337  if (!tls_conf) {
2338  ERROR("%s - Failed to parse TLS configuration", cluster->log_prefix);
2339  talloc_free(cluster);
2340  return NULL;
2341  }
2342 
2343  cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf, true);
2344  if (!cluster->ssl_ctx) {
2345  ERROR("%s - Failed to allocate SSL context", cluster->log_prefix);
2346  talloc_free(cluster);
2347  return NULL;
2348  }
2349 #else
2350  WARN("%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->log_prefix);
2351 #endif
2352  }
2353 
2354  if (conf->max_nodes == UINT8_MAX) {
2355  ERROR("%s - Maximum number of connected nodes allowed is %i", cluster->log_prefix, UINT8_MAX - 1);
2356  talloc_free(cluster);
2357  return NULL;
2358  }
2359 
2360  if (conf->max_nodes == 0) {
2361  ERROR("%s - Minimum number of nodes allowed is 1", cluster->log_prefix);
2362  talloc_free(cluster);
2363  return NULL;
2364  }
2365 
2366  cp = cf_pair_find(module, "server");
2367  if (!cp) {
2368  ERROR("%s - No servers configured", cluster->log_prefix);
2369  talloc_free(cluster);
2370  return NULL;
2371  }
2372 
2373  cluster->module = module;
2374 
2375  /*
2376  * Ensure the cluster is freed at the same time as its
2377  * parent.
2378  *
2379  * We need to break the link between the cluster and
2380  * its parent context, as the two contexts may be
2381  * modified by multiple threads.
2382  */
2383  MEM(talloc_link_ctx(ctx, cluster) >= 0);
2384  MEM(cluster->node = talloc_zero_array(cluster, fr_redis_cluster_node_t, conf->max_nodes + 1));
2385  MEM(cluster->used_nodes = fr_rb_inline_alloc(cluster, fr_redis_cluster_node_t, rbnode, _cluster_node_cmp, NULL));
2386  MEM(cluster->free_nodes = fr_fifo_create(cluster, conf->max_nodes, NULL));
2387 
2388  cluster->conf = conf;
2389 
2390  pthread_mutex_init(&cluster->mutex, NULL);
2391  talloc_set_destructor(cluster, _fr_redis_cluster_free);
2392 
2393  /*
2394  * Node id 0 is reserved, so we can detect misconfigured
2395  * clusters.
2396  */
2397  for (i = 1; i < (cluster->conf->max_nodes + 1); i++) {
2398  cluster->node[i].id = i;
2399  cluster->node[i].cluster = cluster;
2400 
2401  /* Push them all into the queue */
2402  fr_fifo_push(cluster->free_nodes, &cluster->node[i]);
2403  }
2404 
2405  /*
2406  * Don't connect to cluster nodes if we're just
2407  * checking the config.
2408  */
2409  if (check_config) return cluster;
2410 
2411  /*
2412  * Populate the cluster with the bootstrap servers.
2413  *
2414  * If we fail getting a key_slot map here, then the
2415  * bootstrap servers are distributed evenly through
2416  * the key slots.
2417  *
2418  * This allows the server to start, and potentially,
2419  * a valid map to be applied, once the server starts
2420  * processing requests.
2421  */
2422  do {
2423  char const *server;
2424  fr_redis_cluster_node_t *node;
2425  fr_redis_conn_t *conn;
2426  redisReply *map;
2427  size_t j, k;
2428 
2429  node = fr_fifo_peek(cluster->free_nodes);
2430  if (!node) {
2431  ERROR("%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->log_prefix);
2432 
2433  error:
2434  talloc_free(cluster);
2435  return NULL;
2436  }
2437 
2438  server = cf_pair_value(cp);
2439  if (fr_inet_pton_port(&node->pending_addr.inet.dst_ipaddr, &node->pending_addr.inet.dst_port, server,
2440  talloc_array_length(server) - 1, af, true, true) < 0) {
2441  PERROR("%s - Failed parsing server \"%s\"", cluster->log_prefix, server);
2442  goto error;
2443  }
2444  if (!node->pending_addr.inet.dst_port) node->pending_addr.inet.dst_port = conf->port;
2445 
2446  if (cluster_node_connect(cluster, node) < 0) {
2447  WARN("%s - Connecting to %s:%i failed", cluster->log_prefix, node->name, node->pending_addr.inet.dst_port);
2448  continue;
2449  }
2450 
2451  if (!fr_rb_insert(cluster->used_nodes, node)) {
2452  WARN("%s - Skipping duplicate bootstrap server \"%s\"", cluster->log_prefix, server);
2453  continue;
2454  }
2455  node->is_active = true;
2456  fr_fifo_pop(cluster->free_nodes);
2457 
2458  /*
2459  * Prefer the same IPaddr family as the first node
2460  */
2461  if (af == AF_UNSPEC) af = node->addr.inet.dst_ipaddr.af;
2462 
2463  /*
2464  * Only get cluster map config if required
2465  */
2466  if (fr_pool_start_num(node->pool) > 0) {
2467  /*
2468  * Fine to leave this node configured, if we do find
2469  * a live node, and it's not in the map, it'll be cleared out.
2470  */
2471  conn = fr_pool_connection_get(node->pool, NULL);
2472  if (!conn) {
2473  WARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2474  continue;
2475  }
2476  } else {
2477  break;
2478  }
2479 
2480  if (!cluster->conf->use_cluster_map) {
2481  fr_pool_connection_release(node->pool, NULL, conn);
2482  continue;
2483  }
2484 
2485  switch (cluster_map_get(&map, conn)) {
2486  /*
2487  * We got a valid map! See if we can apply it...
2488  */
2489  case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2490  fr_pool_connection_release(node->pool, NULL, conn);
2491 
2492  DEBUG("%s - Cluster map consists of %zu key ranges", cluster->log_prefix, map->elements);
2493  for (j = 0; j < map->elements; j++) {
2494  redisReply *map_node = map->element[j];
2495 
2496  DEBUG("%s - %zu - keys %lli-%lli", cluster->log_prefix, j,
2497  map_node->element[0]->integer,
2498  map_node->element[1]->integer);
2499  DEBUG("%s - master: %s:%lli", cluster->log_prefix,
2500  map_node->element[2]->element[0]->str,
2501  map_node->element[2]->element[1]->integer);
2502  for (k = 3; k < map_node->elements; k++) {
2503  DEBUG("%s - slave%zu: %s:%lli", cluster->log_prefix, k - 3,
2504  map_node->element[k]->element[0]->str,
2505  map_node->element[k]->element[1]->integer);
2506  }
2507  }
2508 
2509  if (cluster_map_apply(cluster, map) < 0) {
2510  PWARN("%s - Applying cluster map failed", cluster->log_prefix);
2511  fr_redis_reply_free(&map);
2512  continue;
2513  }
2514  fr_redis_reply_free(&map);
2515 
2516  return cluster;
2517 
2518  /*
2519  * Unusable bootstrap node
2520  */
2522  PWARN("%s - Bootstrap server \"%s\" returned invalid data", cluster->log_prefix, server);
2523  fr_pool_connection_release(node->pool, NULL, conn);
2524  continue;
2525 
2527  PWARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2528  fr_pool_connection_close(node->pool, NULL, conn);
2529  continue;
2530 
2531  /*
2532  * Clustering not enabled, or not supported,
2533  * by this node, skip it and check the others.
2534  */
2537  PDEBUG2("%s - Bootstrap server \"%s\" returned", cluster->log_prefix, server);
2538  fr_pool_connection_release(node->pool, NULL, conn);
2539  break;
2540  }
2541  } while ((cp = cf_pair_find_next(module, cp, "server")));
2542 
2543  /*
2544  * Catch pool.start != 0 when none of the bootstrap servers could be contacted
2545  */
2546  num_nodes = fr_rb_num_elements(cluster->used_nodes);
2547  if (!num_nodes) {
2548  ERROR("%s - Can't contact any bootstrap servers", cluster->log_prefix);
2549  goto error;
2550  }
2551 
2552  /*
2553  * We've failed to apply a valid cluster map.
2554  * Distribute the node(s) throughout the key_slots,
2555  * hopefully we'll get one when we start processing
2556  * requests.
2557  */
2558  for (s = 0; s < KEY_SLOTS; s++) cluster->key_slot[s].master = (s % (uint16_t) num_nodes) + 1;
2559 
2560  return cluster;
2561 }
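When no cluster map can be applied, the bootstrap nodes are spread round-robin over the key slots. Assignment starts at node id 1 because id 0 is reserved. Below is a small sketch of that assignment. It assumes KEY_SLOTS is the 16384 hash slots defined by the Redis Cluster specification. sketch_distribute() and SKETCH_KEY_SLOTS are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_KEY_SLOTS 16384	/* Redis Cluster has 16384 hash slots */

/*
 * Round-robin the bootstrap nodes over every key slot. Node ids start at 1
 * because id 0 is reserved. slots[s] receives the master node id for slot s.
 */
static void sketch_distribute(uint8_t slots[SKETCH_KEY_SLOTS], uint16_t num_nodes)
{
	/* max_nodes must be below UINT8_MAX, so ids always fit in a uint8_t */
	assert((num_nodes > 0) && (num_nodes < UINT8_MAX));

	for (uint32_t s = 0; s < SKETCH_KEY_SLOTS; s++) slots[s] = (uint8_t)((s % num_nodes) + 1);
}
```

With three bootstrap nodes, the first slots map to nodes 1, 2 and 3 in turn, and slot 3 wraps back to node 1. Node 1 ends up with one more slot than the others because 16384 is not divisible by 3.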
static int const char char buffer[256]
Definition: acutest.h:574
va_list args
Definition: acutest.h:770
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define STRINGIFY(x)
Definition: build.h:195
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define NUM_ELEMENTS(_t)
Definition: build.h:335
bool check_config
Definition: cf_file.c:67
Configuration AVP similar to a fr_pair_t.
Definition: cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition: cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition: cf_util.c:1028
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
Definition: cf_util.c:1439
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1185
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition: cf_util.c:1594
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
Definition: cf_util.c:928
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition: cf_util.c:1453
char const * cf_section_name1(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1171
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
Definition: cf_util.h:140
bool remapping
True when cluster is being remapped.
Definition: cluster.c:257
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVE.
Definition: cluster.c:864
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
Definition: cluster.c:1863
char const * log_prefix
What to prepend to log messages.
Definition: cluster.c:252
fr_pool_t * pool
Pool associated with this node.
Definition: cluster.c:226
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
Definition: cluster.c:216
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
Definition: cluster.c:229
#define CLOSED_WEIGHT
What weight to give to nodes that.
Definition: cluster.c:182
#define MAX_SLAVES
Maximum number of slaves associated.
Definition: cluster.c:173
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
Definition: cluster.c:372
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
Definition: cluster.c:786
struct cluster_nodes_live_t::@52 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
Definition: cluster.c:299
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
Definition: cluster.c:1119
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
Definition: cluster.c:447
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
Definition: cluster.c:1311
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
Definition: cluster.c:1603
#define CLOSED_PERIOD
How recently must the closed have.
Definition: cluster.c:179
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
Definition: cluster.c:276
fr_rb_tree_t * used_nodes
Tree of used nodes.
Definition: cluster.c:273
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
Definition: cluster.c:275
#define KEY_SLOTS
Maximum number of keyslots (should not change).
Definition: cluster.c:171
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
Definition: cluster.c:224
char const * trigger_prefix
Trigger path.
Definition: cluster.c:253
fr_redis_conf_t * conf
Base configuration data such as the database number and passwords.
Definition: cluster.c:263
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
Definition: cluster.c:214
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
Definition: cluster.c:258
fr_time_t last_updated
Last time the cluster mappings were updated.
Definition: cluster.c:260
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
Definition: cluster.c:2070
fr_pair_list_t trigger_args
Arguments to pass to triggers.
Definition: cluster.c:254
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
Definition: cluster.c:2138
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
Definition: cluster.c:221
uint8_t next
Next index in live.
Definition: cluster.c:205
size_t fr_redis_cluster_rcodes_table_len
Definition: cluster.c:288
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
Definition: cluster.c:340
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
Definition: cluster.c:242
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
Definition: cluster.c:1689
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
Definition: cluster.c:2185
#define FAILED_WEIGHT
What weight to give to nodes that.
Definition: cluster.c:188
uint8_t master
R/W node (master) for this key slot.
Definition: cluster.c:244
CONF_SECTION * module
Module configuration.
Definition: cluster.c:261
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
Definition: cluster.c:1468
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
Definition: cluster.c:1209
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
Definition: cluster.c:1244
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
Definition: cluster.c:323
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
Definition: cluster.c:1009
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
Definition: cluster.c:195
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
Definition: cluster.c:538
uint8_t id
Node ID (index in node array).
Definition: cluster.c:218
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
Definition: cluster.c:1655
bool triggers_enabled
Whether triggers are enabled.
Definition: cluster.c:255
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
Definition: cluster.c:1741
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
Definition: cluster.c:1639
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, bool triggers_enabled, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
Definition: cluster.c:2261
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
Definition: cluster.c:1452
fr_socket_t addr
Current node address.
Definition: cluster.c:220
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
Definition: cluster.c:281
bool is_master
Whether this node is a master.
Definition: cluster.c:230
#define FAILED_PERIOD
How recently must the spawn failure.
Definition: cluster.c:185
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
Definition: cluster.c:272
#define RELEASED_PERIOD
Period after which we no longer care when the last connection was released.
Definition: cluster.c:191
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
Definition: cluster.c:1672
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
Definition: cluster.c:278
uint8_t slave_num
Number of slaves associated with this key slot.
Definition: cluster.c:243
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
Definition: cluster.c:269
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
Definition: cluster.c:2202
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Definition: cluster.c:227
Live node data, used to perform weighted random selection of alternative nodes.
Definition: cluster.c:199
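The live node data above is used to pick an alternative node at random, in proportion to a per-node weight. The sketch below shows generic cumulative-weight selection only. The names `live_node_sketch_t` and `weighted_pick` are hypothetical, the random input stands in for something like fr_rand(), and how cluster.c actually derives the weights (FAILED_PERIOD, RELEASED_PERIOD, RELEASED_MIN_WEIGHT) is not modelled here.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-node entry: node index plus its selection weight. */
typedef struct {
	uint8_t		id;	/* Index into the node array */
	uint32_t	weight;	/* Higher weight = more likely to be picked */
} live_node_sketch_t;

/* Pick one entry with probability weight / total_weight.
 * `r` is a random 32-bit number (e.g. from fr_rand()).
 * Returns the index into `nodes`, or -1 if no node has a positive weight. */
static int weighted_pick(live_node_sketch_t const *nodes, size_t count, uint32_t r)
{
	uint64_t total = 0, find;

	for (size_t i = 0; i < count; i++) total += nodes[i].weight;
	if (total == 0) return -1;

	find = r % total;	/* Point in [0, total) to land on (slight modulo bias) */

	/* Walk the cumulative weights until we pass the chosen point */
	for (size_t i = 0; i < count; i++) {
		if (find < nodes[i].weight) return (int)i;
		find -= nodes[i].weight;
	}
	return -1;	/* Not reached when total > 0 */
}
```

A node with weight 0 can never be selected, which is one way a recently failed node could be excluded from the alternatives.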
A Redis cluster.
Definition: cluster.c:251
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Definition: cluster.c:241
A Redis cluster node.
Definition: cluster.c:213
Common functions for interacting with a Redis cluster via Hiredis.
size_t key_len
Length of the key.
Definition: cluster.h:55
bool close_conn
Set by the caller of fr_redis_cluster_state_next to indicate that the connection must be closed,...
Definition: cluster.h:50
uint32_t retries
How many times we've received TRYAGAIN.
Definition: cluster.h:60
uint32_t reconnects
How many connections we've tried in this pool.
Definition: cluster.h:62
uint8_t const * key
Key we performed hashing on.
Definition: cluster.h:54
fr_redis_cluster_rcode_t
Return values for internal functions.
Definition: cluster.h:67
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
Definition: cluster.h:68
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
Definition: cluster.h:70
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
Definition: cluster.h:69
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
Definition: cluster.h:73
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
Definition: cluster.h:71
uint32_t redirects
How many redirects we have followed.
Definition: cluster.h:58
fr_redis_cluster_node_t * node
Node we're communicating with.
Definition: cluster.h:57
uint32_t in_pool
How many available connections are there in the pool.
Definition: cluster.h:61
Redis connection sequence state.
Definition: cluster.h:49
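The connection sequence state carries counters (redirects, retries, reconnects, in_pool) that presumably get checked against the configured limits (max_redirects, max_retries) when deciding whether another attempt is allowed. The sketch below is a hypothetical budget check under that assumption; the enum, struct and function names are invented for illustration, and the real decision logic in fr_redis_cluster_state_next is not reproduced here.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical outcomes of the last command, loosely modelled on REDIS_RCODE_*. */
typedef enum {
	SKETCH_SUCCESS,		/* Command succeeded */
	SKETCH_TRY_AGAIN,	/* -TRYAGAIN: retry after a delay */
	SKETCH_RECONNECT,	/* Connection error: try another connection */
	SKETCH_MOVE,		/* -MOVED/-ASK: follow the redirect */
	SKETCH_ERROR		/* Unrecoverable */
} sketch_rcode_t;

/* Hypothetical subset of the configured limits. */
typedef struct {
	uint32_t max_redirects, max_retries;
} sketch_limits_t;

/* Hypothetical subset of the per-request sequence state. */
typedef struct {
	uint32_t redirects, retries, reconnects, in_pool;
} sketch_state_t;

/* Bump the counter matching `rcode` and return true if another attempt is allowed. */
static bool sketch_should_retry(sketch_state_t *s, sketch_limits_t const *lim, sketch_rcode_t rcode)
{
	switch (rcode) {
	case SKETCH_TRY_AGAIN:
		return ++s->retries <= lim->max_retries;
	case SKETCH_MOVE:
		return ++s->redirects <= lim->max_redirects;
	case SKETCH_RECONNECT:
		/* Assumption: each connection available in the pool may be tried once */
		return ++s->reconnects <= s->in_pool;
	default:
		return false;	/* Success or hard error: stop */
	}
}
```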
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
Definition: crc16.c:91
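The Redis cluster specification maps every key to one of 16384 hash slots as CRC16(key) mod 16384, using the XMODEM variant of CRC16 (polynomial 0x1021, initial value 0). If the key contains a non-empty `{...}` hash tag, only the tag is hashed, so related keys land in the same slot. The bitwise sketch below illustrates that mapping; `crc16_xmodem_sketch` and `key_hash_slot` are hypothetical names, not the FreeRADIUS implementation, which may well be table driven.

```c
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC16/XMODEM: polynomial 0x1021, initial value 0x0000,
 * no input/output reflection, no final XOR. */
static uint16_t crc16_xmodem_sketch(uint8_t const *in, size_t in_len)
{
	uint16_t crc = 0;

	for (size_t i = 0; i < in_len; i++) {
		crc ^= (uint16_t)(in[i] << 8);
		for (int bit = 0; bit < 8; bit++) {
			if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
			else crc = (uint16_t)(crc << 1);
		}
	}
	return crc;
}

#define KEY_SLOTS 16384	/* Number of hash slots in a Redis cluster */

/* Hypothetical helper: map a key to its hash slot, honouring hash tags. */
static uint16_t key_hash_slot(uint8_t const *key, size_t key_len)
{
	size_t start, end;

	/* Find the first '{' */
	for (start = 0; start < key_len; start++) if (key[start] == '{') break;

	if (start < key_len) {
		/* Find the first '}' after it */
		for (end = start + 1; end < key_len; end++) if (key[end] == '}') break;

		/* Only hash the tag if '}' was found and the tag is non-empty */
		if ((end < key_len) && (end > start + 1)) {
			return crc16_xmodem_sketch(key + start + 1, end - start - 1) % KEY_SLOTS;
		}
	}
	return crc16_xmodem_sketch(key, key_len) % KEY_SLOTS;
}
```

For example, `{user1000}.following` and `{user1000}.followers` both hash only `user1000` and therefore share a slot.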
next
Definition: dcursor.h:178
fr_dcursor_iter_t iter
Definition: dcursor.h:147
#define fr_cond_assert(_x)
Calls panic_action if NDEBUG is not defined, else logs an error and evaluates to the value of _x.
Definition: debug.h:139
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
static fr_time_delta_t timeout
Definition: dhcpclient.c:54
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
Definition: fifo.c:158
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
Definition: fifo.c:170
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
Definition: fifo.c:135
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
Definition: fifo.c:111
Definition: fifo.c:30
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
Definition: fifo.h:66
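The fifo functions listed above (create, push, pop, peek, number of elements) back queues such as the cluster's free_nodes. The layout of fr_fifo_t is not shown in this index, so the following is only a hypothetical bounded ring buffer of pointers with the same operations; `fifo_sketch_t` and its functions are invented names.

```c
#include <stdlib.h>

/* Hypothetical bounded FIFO of pointers, backed by a ring buffer. */
typedef struct {
	unsigned int	num;		/* Elements currently queued */
	unsigned int	first;		/* Index of the oldest element */
	unsigned int	max;		/* Capacity */
	void		*data[];	/* Storage (flexible array member) */
} fifo_sketch_t;

static fifo_sketch_t *fifo_sketch_create(unsigned int max)
{
	fifo_sketch_t *fi;

	if (max == 0) return NULL;
	fi = calloc(1, sizeof(*fi) + sizeof(fi->data[0]) * max);
	if (fi) fi->max = max;
	return fi;
}

/* Returns 0 on success, -1 if the queue is full. */
static int fifo_sketch_push(fifo_sketch_t *fi, void *data)
{
	if (fi->num >= fi->max) return -1;
	fi->data[(fi->first + fi->num) % fi->max] = data;
	fi->num++;
	return 0;
}

/* Returns the oldest element without removing it, or NULL if empty. */
static void *fifo_sketch_peek(fifo_sketch_t *fi)
{
	return fi->num ? fi->data[fi->first] : NULL;
}

/* Removes and returns the oldest element, or NULL if empty. */
static void *fifo_sketch_pop(fifo_sketch_t *fi)
{
	void *data;

	if (fi->num == 0) return NULL;
	data = fi->data[fi->first];
	fi->first = (fi->first + 1) % fi->max;
	fi->num--;
	return data;
}
```

The element count is simply `fi->num`, and wrapping the indices modulo the capacity lets slots be reused without shifting data.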
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
Definition: inet.c:778
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses an IPv4/6 address and port into an fr_ipaddr_t and an integer (port).
Definition: inet.c:937
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two ip addresses.
Definition: inet.c:1346
int af
Address family.
Definition: inet.h:64
IPv4/6 prefix.
Definition: merged_model.c:272
#define PERROR(_fmt,...)
Definition: log.h:228
#define REXDENT()
Exdent (unindent) R* messages by one level.
Definition: log.h:443
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define RPDEBUG2(fmt,...)
Definition: log.h:347
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define PDEBUG2(_fmt,...)
Definition: log.h:272
#define RWARN(fmt,...)
Definition: log.h:297
#define PWARN(_fmt,...)
Definition: log.h:227
#define RERROR(fmt,...)
Definition: log.h:298
#define RPERROR(fmt,...)
Definition: log.h:302
#define RINFO(fmt,...)
Definition: log.h:296
#define RPEDEBUG(fmt,...)
Definition: log.h:376
#define RINDENT()
Indent R* messages by one level.
Definition: log.h:430
talloc_free(reap)
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
Definition: log.h:72
unsigned short uint16_t
Definition: merged_model.c:31
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
#define UINT8_MAX
Definition: merged_model.c:32
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
Definition: missing.c:443
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition: pair.c:2319
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition: pair.c:46
int fr_pool_start(fr_pool_t *pool)
Definition: pool.c:1116
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition: pool.c:1407
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition: pool.c:1173
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition: pool.c:1244
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition: pool.c:1392
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition: pool.c:1537
int fr_pool_start_num(fr_pool_t *pool)
Get the connection pool's 'start' value (the number of connections to spawn initially).
Definition: pool.c:1193
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition: pool.c:933
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition: pool.c:967
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition: pool.c:1224
A connection pool.
Definition: pool.c:87
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition: pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition: pool.h:57
fr_time_t last_released
Last time a connection was released.
Definition: pool.h:56
uint32_t num
Number of connections in the pool.
Definition: pool.h:67
#define REDEBUG(fmt,...)
Definition: radclient.h:52
#define RDEBUG2(fmt,...)
Definition: radclient.h:54
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
static rs_t * conf
Definition: radsniff.c:53
static char const * hostname(char *buf, size_t buflen, uint32_t ipaddr)
Definition: radwho.c:133
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition: rand.c:106
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition: rb.c:850
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:824
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree.
Definition: rb.h:271
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red black tree structure.
Definition: rb.h:73
uint8_t max_nodes
Maximum number of cluster nodes to connect to.
Definition: base.h:119
fr_table_num_sorted_t const redis_rcodes[]
Definition: redis.c:40
bool use_cluster_map
Whether to use the cluster map.
Definition: base.h:114
redisContext * handle
Hiredis context used when issuing commands.
Definition: base.h:101
char const * username
Username for ACLs.
Definition: base.h:116
uint32_t database
Database number on the Redis server.
Definition: base.h:112
fr_redis_cluster_node_t * node
Node this connection is to.
Definition: base.h:102
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx)
Print the response data in a useful treelike form.
Definition: redis.c:141
#define REDIS_ERROR_MOVED_STR
Definition: base.h:46
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition: base.h:64
uint32_t max_redirects
Maximum number of times we can be redirected.
Definition: base.h:120
uint32_t fr_redis_version_num(char const *version)
Convert version string into a 32bit unsigned integer for comparisons.
Definition: redis.c:688
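Packing a version string into a single integer lets versions be compared with ordinary integer operators, which is what a check like fr_redis_cluster_min_version needs ("3.2.10" sorts below "3.2.9" as a string but above it as a number). The exact encoding used by fr_redis_version_num is not shown here; this sketch assumes one byte per component (major << 16 | minor << 8 | patch), and `version_num_sketch` is a hypothetical name.

```c
#include <stdint.h>
#include <stdlib.h>

/* Parse one numeric component (0-255) that must be followed by `sep`.
 * Returns a pointer past the separator, or NULL on error. */
static char const *version_component(uint32_t *out, char const *p, char sep)
{
	char		*q;
	unsigned long	num;

	if ((*p < '0') || (*p > '9')) return NULL;	/* strtoul would skip spaces and signs */
	num = strtoul(p, &q, 10);
	if ((num > UINT8_MAX) || (*q != sep)) return NULL;
	*out = (uint32_t)num;
	return (sep == '\0') ? q : q + 1;
}

/* Pack "major.minor.patch" as major << 16 | minor << 8 | patch.
 * Returns 0 for malformed input (so "0.0.0" is indistinguishable from an error). */
static uint32_t version_num_sketch(char const *version)
{
	uint32_t	major, minor, patch;
	char const	*p = version;

	if (!(p = version_component(&major, p, '.'))) return 0;
	if (!(p = version_component(&minor, p, '.'))) return 0;
	if (!(p = version_component(&patch, p, '\0'))) return 0;

	return (major << 16) | (minor << 8) | patch;
}
```

A minimum-version check then reduces to testing `version_num_sketch(node_version) >= version_num_sketch(min_version)` for every node.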
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
Definition: redis.c:71
fr_table_num_sorted_t const redis_reply_types[]
Definition: redis.c:30
fr_time_delta_t retry_delay
How long to wait when we received a -TRYAGAIN message.
Definition: base.h:124
#define REDIS_ERROR_ASK_STR
Definition: base.h:47
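The Redis cluster specification defines two redirect errors of the form `MOVED 3999 127.0.0.1:6381` and `ASK 3999 127.0.0.1:6381`: MOVED means the slot has permanently moved (matching REDIS_RCODE_MOVE, "with remap"), while ASK is a one-off redirect (REDIS_RCODE_ASK). REDIS_ERROR_MOVED_STR and REDIS_ERROR_ASK_STR presumably hold those prefixes, but their values are not shown here. The parser below is a hypothetical sketch; it assumes the leading '-' has already been stripped from the error text, as hiredis does for error replies.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical parsed form of a cluster redirect error. */
typedef struct {
	bool		ask;		/* true for ASK (one-off), false for MOVED (remap) */
	unsigned int	slot;		/* Key slot the error refers to */
	char		host[64];	/* Address of the node that owns the slot */
	unsigned int	port;
} redirect_sketch_t;

/* Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 * Returns 0 on success, -1 if the string is not a well-formed redirect. */
static int redirect_parse(redirect_sketch_t *out, char const *err)
{
	char const	*p, *colon;
	char		*end;
	size_t		host_len;
	unsigned long	slot, port;

	if (strncmp(err, "MOVED ", 6) == 0) {
		out->ask = false;
		p = err + 6;
	} else if (strncmp(err, "ASK ", 4) == 0) {
		out->ask = true;
		p = err + 4;
	} else {
		return -1;
	}

	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot > 16383)) return -1;
	p = end + 1;

	/* Split on the last ':' so IPv6 addresses keep their colons */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port == 0) || (port > 65535)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = (unsigned int)slot;
	out->port = (unsigned int)port;
	return 0;
}
```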
fr_redis_rcode_t
Codes are ordered inversely by priority.
Definition: base.h:87
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
Definition: base.h:91
@ REDIS_RCODE_SUCCESS
Operation was successful.
Definition: base.h:88
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
Definition: base.h:94
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
Definition: base.h:90
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
Definition: base.h:95
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
Definition: base.h:93
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
Definition: base.h:89
char const * password
Password to authenticate to Redis.
Definition: base.h:117
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Definition: redis.c:638
uint32_t max_alt
Maximum alternative nodes to try.
Definition: base.h:123
uint32_t max_retries
Maximum number of times we attempt a command when receiving successive -TRYAGAIN messages.
Definition: base.h:121
Configuration parameters for a redis connection.
Definition: base.h:109
Connection handle, holding a redis context.
Definition: base.h:100
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:689
fr_assert(0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:772
An element in a lexicographically sorted array of name to num mappings.
Definition: table.h:49
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition: talloc.c:171
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition: talloc.c:492
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to the number of seconds since the Unix epoch (wallclock time).
Definition: time.h:731
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
Definition: time.h:666
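Since an fr_time_delta_t is a difference in time measured in nanoseconds, converting it to a timespec or timeval is a matter of splitting it into whole seconds and a remainder. A timeval is presumably what Hiredis connect timeouts need. The sketch below works on a plain int64_t nanosecond count rather than the real fr_time_delta_t type, and the function names are hypothetical; it normalises negative deltas so the sub-second field always stays non-negative.

```c
#include <stdint.h>
#include <time.h>
#include <sys/time.h>

#define NSEC_PER_SEC	INT64_C(1000000000)
#define NSEC_PER_USEC	INT64_C(1000)

/* Split a nanosecond delta into whole seconds and a remainder in [0, 1e9). */
static struct timespec delta_ns_to_timespec(int64_t delta)
{
	struct timespec	ts;
	int64_t		sec = delta / NSEC_PER_SEC;
	int64_t		rem = delta % NSEC_PER_SEC;

	if (rem < 0) {		/* C division truncates towards zero; normalise */
		rem += NSEC_PER_SEC;
		sec -= 1;
	}
	ts.tv_sec = (time_t)sec;
	ts.tv_nsec = (long)rem;
	return ts;
}

/* Same split for struct timeval, truncating to microsecond precision. */
static struct timeval delta_ns_to_timeval(int64_t delta)
{
	struct timespec	ts = delta_ns_to_timespec(delta);
	struct timeval	tv;

	tv.tv_sec = ts.tv_sec;
	tv.tv_usec = ts.tv_nsec / NSEC_PER_USEC;
	return tv;
}
```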
#define fr_time_delta_ispos(_a)
Definition: time.h:290
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
Definition: time.h:656
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition: time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
Definition: trigger.c:454
bool trigger_enabled(void)
Return whether triggers are enabled.
Definition: trigger.c:135
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Check whether a value pair list is empty.
Definition: pair_inline.c:125
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
Definition: pair_inline.c:113
int af
AF_INET, AF_INET6, or AF_UNIX.
Definition: socket.h:78
Holds information necessary for binding or connecting to a socket.
Definition: socket.h:63
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const(_msg)
Definition: strerror.h:223
#define fr_box_strvalue_len(_val, _len)
Definition: value.h:286
static size_t char ** out
Definition: value.h:997