The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
cluster.c
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 6be16bb474662819ea79fd7bde89a02a6025bd53 $
19 * @file cluster.c
20 * @brief Functions for interacting with a Redis cluster via Hiredis.
21 *
22 * @author Arran Cudbard-Bell
23 *
24 * @copyright 2015 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
25 * @copyright 2015 Network RADIUS (legal@networkradius.com)
26 * @copyright 2015 The FreeRADIUS server project
27 *
28 * Overview
29 * ========
30 *
31 * Read and understand the Redis cluster specification at http://redis.io/topics/cluster-spec first,
32 * otherwise the text below will not be useful.
33 *
34 * Using the cluster's public API
35 * ------------------------------
36 *
37 * The two functions used at runtime to issue commands are #fr_redis_cluster_state_init
38 * and #fr_redis_cluster_state_next.
39 *
40 * #fr_redis_cluster_state_init initialises the structure we use to track which node
41 * we're communicating with, and uses a key (string) to determine which node we should
42 * try and contact first.
43 *
44 * #fr_redis_cluster_state_next examines the result of the last command, and either
45 * gets a new connection, or errors out.
46 *
47 * In both cases the connection should not be released by the caller, only by
48 * #fr_redis_cluster_state_next.
49 *
50 * Below is the sequence of calls required to use the cluster:
51 *
52 * 1. During initialization allocate a cluster configuration structure with
53 * #fr_redis_cluster_alloc. This holds the cluster configuration and state.
54 * 2. Declare a #fr_redis_cluster_state_t in the function that needs to issue
55 * commands against the cluster.
56 * 3. Call #fr_redis_cluster_state_init with relevant arguments including a pointer
57 * to the #fr_redis_cluster_state_t struct, and a key/key_len used for initial
58 * lookup. For most commands the key is the value of the second argument.
59 * #fr_redis_cluster_state_init will then reserve and pass back a connection from
60 * the pool associated with the node that owns the key.
61 * 4. Use the connection that was passed back to issue a Redis command.
62 * 5. Use #fr_redis_command_status to translate the result of the command into
63 * a #fr_redis_rcode_t value.
64 * 6. Call #fr_redis_cluster_state_next with relevant arguments including a pointer
65 * to the #fr_redis_cluster_state_t struct and the #fr_redis_rcode_t value for the
66 * last command.
67 * 7. If #fr_redis_cluster_state_next returns 0, repeat steps 4-7. Otherwise
68 * stop and analyse the return value.
69 *
70 * See #fr_redis_cluster_state_init for example code.
71 *
72 * Structures
73 * ----------
74 *
75 * This code maintains a series of structures for efficient lookup and lockless operations.
76 *
77 * The important ones are:
78 * - An array of #fr_redis_cluster_node_t. These are pre-allocated on startup and are
79 * never added to, or removed from.
80 * - An #fr_fifo_t. This contains the queue of nodes that may be re-used.
81 * - An #fr_rb_tree_t. This contains a tree of nodes which are active. The tree is built on IP
82 * address and port.
83 *
84 * Each #fr_redis_cluster_key_slot_t contains a master ID, and an array of slave IDs. The IDs are array
85 * indexes in the fr_redis_cluster_t.node array. We use 8bit unsigned integers instead of
86 * pointers to save space. Using pointers, the key_slot[] array would need 784K, using IDs
87 * it uses 112K. Still not light on memory, but a bit more acceptable.
88 * Currently the key_slot array is shadowed by key_slot_pending, used to stage new key_slot
89 * mappings. This doubles the memory used. We may want to consider allocating key_slot_pending
90 * only during remappings and freeing it after.
91 *
92 * Mapping/Remapping the cluster
93 * -----------------------------
94 *
95 * On startup, and during cluster operation, a remap may be performed. A remap involves
96 * the following steps:
97 *
98 * 1. Executing the Redis 'cluster slots' command.
99 * 2. Validating the result of this command. We need to do extensive validation to
100 * avoid SEGV on invalid data, due to the way libhiredis presents the result.
101 * 3. Determining the intersection between nodes described in the result, and those already
102 * in our #fr_rb_tree_t.
103 * 4. Connecting to nodes that were in the result, but not in the tree.
104 * Note: If we fail to connect to any one of the masters, we count the map as invalid, roll
105 * back any newly connected nodes, and error out. Slave failure is OK.
106 * 5. Mapping keyslot ranges to nodes in the key_slot_pending array.
107 * 6. Verifying there are no holes in the ranges (if there are, we roll back and error out).
108 * 7. Applying the new keyslot ranges.
109 * 8. Removing nodes no longer used by the key slots, and adding them back to the free
110 * nodes queue.
111 *
112 * #cluster_map_get and #cluster_map_apply perform the operations described
113 * above. The get function issues the 'cluster slots' command and performs validation; the
114 * apply function processes and applies the map.
115 *
116 * Failing to apply a map is not a fatal error at runtime, and is only fatal on startup if
117 * pool.start > 0.
118 *
119 * The cluster client can continue to operate, albeit inefficiently, with a stale cluster map
120 * by following '-ASK' and '-MOVED' redirects.
121 *
122 * Remaps are limited to one per second. If any operation sets the remap_needed flag, or
123 * attempts a remap directly, the remap may be skipped if one occurred recently.
124 *
125 *
126 * Processing '-ASK' and '-MOVED' redirects
127 * ----------------------------------------
128 *
129 * The code treats '-ASK' (temporary redirect) and '-MOVED' (permanent redirect) responses
130 * similarly. If the node is known, a connection is reserved from its pool. If the node
131 * is not known, a new pool is established and a connection reserved.
132 *
133 * The difference between '-ASK' and '-MOVED' is that '-MOVED' attempts a cluster remap before
134 * following the redirect.
135 *
136 * The data from '-MOVED' responses is not used to alter the cluster map. That is only done
137 * on successful remap.
138 *
139 *
140 * Processing '-TRYAGAIN'
141 * ----------------------
142 *
143 * If the cluster is in a state of flux, a node may return '-TRYAGAIN' to indicate that we
144 * should attempt the operation again. The cluster spec says we should attempt the operation
145 * after some time. This time is configurable.
146 *
147 */
148
149#include <freeradius-devel/util/debug.h>
150#include <freeradius-devel/server/cf_parse.h>
151
152#include <freeradius-devel/util/fifo.h>
153#include <freeradius-devel/util/misc.h>
154#include <freeradius-devel/util/rand.h>
155
156#include "config.h"
157#include "base.h"
158#include "cluster.h"
159#include "crc16.h"
160
161#ifndef WITH_TLS
162# undef HAVE_REDIS_SSL
163#endif
164
165#ifdef HAVE_REDIS_SSL
166#include <freeradius-devel/tls/strerror.h>
167#include <hiredis/hiredis_ssl.h>
168#endif
169
170#define KEY_SLOTS 16384 //!< Maximum number of keyslots (should not change).
171
172#define MAX_SLAVES 5 //!< Maximum number of slaves associated
173 //!< with a keyslot.
174
175/*
176 * Periods and weights for live node selection
177 */
178#define CLOSED_PERIOD 10000 //!< How recently must the close have
179 //!< occurred for us to care.
180
181#define CLOSED_WEIGHT 1 //!< What weight to give to nodes that
182 //!< had a connection closed recently.
183
184#define FAILED_PERIOD 10000 //!< How recently must the spawn failure
185 //!< occurred for us to care.
186
187#define FAILED_WEIGHT 1 //!< What weight to give to nodes that
188 //!< had a spawn failure recently.
189
190#define RELEASED_PERIOD 10000 //!< Period after which we don't care
191 //!< about when the last connection was
192 //!< released.
193
194#define RELEASED_MIN_WEIGHT 1000 //!< Minimum weight to assign to node.
195
196/** Live nodes data, used to perform weighted random selection of alternative nodes
197 */
198typedef struct {
199 struct {
200 uint8_t id; //!< Node ID.
201 fr_pool_state_t const *pool_state; //!< Connection pool stats.
202 unsigned int cumulative; //!< Cumulative weight.
203 } node[UINT8_MAX - 1]; //!< Array of live node IDs (and weights).
204 uint8_t next; //!< Next index in live.
207
208/** A Redis cluster node
209 *
210 * Passed as opaque data to pools which open connection to nodes.
211 */
213 fr_rb_node_t rbnode; //!< Entry into the tree of redis nodes.
214
215 char name[INET6_ADDRSTRLEN]; //!< Buffer to hold IP string.
216 //!< text for debug messages.
217 uint8_t id; //!< Node ID (index in node array).
218
219 fr_socket_t addr; //!< Current node address.
220 fr_socket_t pending_addr; //!< New node address to be applied when the pool
221 //!< is reconnected.
222
223 fr_redis_cluster_t *cluster; //!< Common configuration (database number,
224 //!< password, etc..).
225 fr_pool_t *pool; //!< Pool associated with this node.
226 CONF_SECTION *pool_cs; //!< Pool configuration section associated with node.
227
228 bool is_active; //!< Whether this node is in the active node set.
229 bool is_master; //!< Whether this node is a master.
230 //!< This is needed for commands like 'KEYS', which
231 //!< we need to issue to every master in the cluster.
232};
233
234/** Indexes in the fr_redis_cluster_node_t array for a single key slot
235 *
236 * When dealing with 16K entries, space is a concern. It's significantly
237 * more memory efficient to use 8bit indexes than 64bit pointers for each
238 * of the key slot to node mappings.
239 */
241 uint8_t slave[MAX_SLAVES]; //!< R/O node (slave) for this key slot.
242 uint8_t slave_num; //!< Number of slaves associated with this key slot.
243 uint8_t master; //!< R/W node (master) for this key slot.
244};
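
The memory figures quoted in the file header (784K with pointers, 112K with IDs) can be reproduced with a little arithmetic. The sketch below is illustrative only: the `demo_` names are hypothetical and not part of the FreeRADIUS source, and the pointer-based entry size assumes 8-byte pointers with no padding.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_KEY_SLOTS  16384
#define DEMO_MAX_SLAVES 5

/* Same field layout as the key slot structure above: one byte per node reference. */
typedef struct {
	uint8_t slave[DEMO_MAX_SLAVES];
	uint8_t slave_num;
	uint8_t master;
} demo_id_slot_t;

/* Size in bytes of a table holding one entry per key slot. */
static size_t demo_table_bytes(size_t entry_size)
{
	return entry_size * DEMO_KEY_SLOTS;
}

/* Unpadded size of an entry that stores 64-bit pointers instead of IDs (assumed layout). */
static size_t demo_pointer_entry_size(void)
{
	return ((DEMO_MAX_SLAVES + 1) * 8) + 1;	/* 6 pointers of 8 bytes, plus slave_num */
}
```

With these definitions, `demo_table_bytes(sizeof(demo_id_slot_t))` is 16384 * 7 bytes (112K) and `demo_table_bytes(demo_pointer_entry_size())` is 16384 * 49 bytes (784K). A real compiler would likely pad the pointer-based entry to 56 bytes, making the gap larger still.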
245
246/** A redis cluster
247 *
248 * Holds all the structures and collections of nodes, to represent a Redis cluster.
249 */
251 char const *log_prefix; //!< What to prepend to log messages.
252 char const *trigger_prefix; //!< Trigger path.
253 fr_pair_list_t trigger_args; //!< Arguments to pass to triggers.
254 bool triggers_enabled; //!< Whether triggers are enabled.
255
256 bool remapping; //!< True when cluster is being remapped.
257 bool remap_needed; //!< Set true if at least one cluster node is definitely
258 //!< unreachable. Set false on successful remap.
259 fr_time_t last_updated; //!< Last time the cluster mappings were updated.
260 CONF_SECTION *module; //!< Module configuration.
261
262 fr_redis_conf_t *conf; //!< Base configuration data such as the database number
263 //!< and passwords.
264#ifdef HAVE_REDIS_SSL
265 SSL_CTX *ssl_ctx; //!< SSL context.
266#endif
267
268 fr_redis_cluster_node_t *node; //!< Structure containing a node id, its address and
269 //!< a pool of its connections.
270
271 fr_fifo_t *free_nodes; //!< Queue of free nodes (or nodes waiting to be reused).
272 fr_rb_tree_t *used_nodes; //!< Tree of used nodes.
273
274 fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]; //!< Lookup table of slots to pools.
276
277 pthread_mutex_t mutex; //!< Mutex to synchronise cluster operations.
278};
279
288
289/** Resolve key to key slot
290 *
291 * Identical to the example implementation, except it uses memchr which will
292 * be faster, and isn't so needlessly complex.
293 *
294 * @param[in] key to resolve.
295 * @param[in] key_len length of key.
296 * @return key slot index for the key.
297 */
298static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
299{
300 uint8_t *p, *q;
301
302 p = memchr(key, '{', key_len);
303 if (!p) {
304 all:
305 return fr_crc16_xmodem(key, key_len) & (KEY_SLOTS - 1);
306 }
307
308 q = memchr(p, '}', key_len - (p - key)); /* look for } after { */
309 if (!q || (q == p + 1)) goto all; /* no } or {}, hash everything */
310
311 p++; /* skip '{' */
312
313 return fr_crc16_xmodem(p, q - p) & (KEY_SLOTS - 1); /* hash stuff between { and } */
314}
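
The hash tag rule implemented by cluster_key_hash can be tried in isolation. The sketch below is a standalone approximation, not the FreeRADIUS code: `demo_crc16_xmodem` is a hypothetical bitwise stand-in for fr_crc16_xmodem (CRC16/XMODEM, polynomial 0x1021, initial value 0), and `demo_key_slot` mirrors the logic above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_KEY_SLOTS 16384

/* Bitwise CRC16/XMODEM: polynomial 0x1021, initial value 0, no reflection. */
static uint16_t demo_crc16_xmodem(uint8_t const *data, size_t len)
{
	uint16_t crc = 0;

	for (size_t i = 0; i < len; i++) {
		crc ^= (uint16_t)((uint16_t)data[i] << 8);
		for (int bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}
	return crc;
}

/* Hash only the text between the first '{' and the next '}', if that text is non-empty. */
static uint16_t demo_key_slot(char const *key, size_t key_len)
{
	char const *open, *close;

	open = memchr(key, '{', key_len);
	if (open) {
		close = memchr(open, '}', key_len - (size_t)(open - key));
		if (close && (close != open + 1)) {
			open++;		/* skip '{' */
			return demo_crc16_xmodem((uint8_t const *)open, (size_t)(close - open)) &
			       (DEMO_KEY_SLOTS - 1);
		}
	}

	/* No usable hash tag, hash the whole key */
	return demo_crc16_xmodem((uint8_t const *)key, key_len) & (DEMO_KEY_SLOTS - 1);
}
```

Keys that share a hash tag, such as `{user1000}.following` and `{user1000}.followers`, resolve to the same slot, which is what allows multi-key commands to target a single node.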
315
316/** Compare two redis nodes to check equality
317 *
318 * @param[in] one first node.
319 * @param[in] two second node.
320 * @return CMP(one, two)
321 */
322static int8_t _cluster_node_cmp(void const *one, void const *two)
323{
324 fr_redis_cluster_node_t const *a = one;
325 fr_redis_cluster_node_t const *b = two;
326 int ret;
327
328 ret = fr_ipaddr_cmp(&a->addr.inet.dst_ipaddr, &b->addr.inet.dst_ipaddr);
329 if (ret != 0) return ret;
330
331 return CMP(a->addr.inet.dst_port, b->addr.inet.dst_port);
332}
333
334/** Reconnect callback to apply new pool config
335 *
336 * @param[in] pool to apply new configuration to.
337 * @param[in] opaque data passed to the connection pool.
338 */
339static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
340{
342 fr_redis_cluster_node_t *node = opaque;
343
345 node->addr = node->pending_addr;
346
347 if (node->cluster->triggers_enabled) {
348 trigger_args_afrom_server(pool, &args, node->name, node->addr.inet.dst_port);
349 if (fr_pair_list_empty(&args)) return;
350
352 &node->cluster->trigger_args) >= 0);
353
355
357 }
358}
359
360/** Establish a connection to a cluster node
361 *
362 * @note Must be called with the cluster mutex locked.
363 * @note Configuration to use for the connection must be set in node->pending_addr, not node->cluster->conf.
364 *
365 * @param[in] cluster to search in.
366 * @param[in] node config.
367 * @return
368 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
369 * - FR_REDIS_CLUSTER_RCODE_FAILED if the operation failed.
370 */
372{
373 char const *p;
374
375 fr_assert(node->pending_addr.inet.dst_ipaddr.af);
376
377 /*
378 * Write out the IP address and Port in string form
379 */
380 p = inet_ntop(node->pending_addr.inet.dst_ipaddr.af, &node->pending_addr.inet.dst_ipaddr.addr,
381 node->name, sizeof(node->name));
383
384 /*
385 * Node has never been used before, needs a pool allocated for it.
386 */
387 if (!node->pool) {
388 char buffer[256];
390 CONF_SECTION *pool;
391
393 snprintf(buffer, sizeof(buffer), "%s [%i]", cluster->log_prefix, node->id);
394
395 pool = cf_section_find(cluster->module, "pool", NULL);
396 /*
397 * Dup so we can re-parse, and have unique CONF_DATA
398 */
399 node->pool_cs = cf_section_dup(cluster, NULL, pool, "pool", NULL, true);
400 node->addr = node->pending_addr;
401 node->pool = fr_pool_init(cluster, node->pool_cs, node,
403 if (!node->pool) {
404 error:
405 TALLOC_FREE(node->pool_cs);
406 TALLOC_FREE(node->pool);
408 }
410
411 if (trigger_enabled() && cluster->triggers_enabled) {
412 trigger_args_afrom_server(node->pool, &args, node->name, node->addr.inet.dst_port);
413 if (fr_pair_list_empty(&args)) goto error;
414
415 if (!fr_pair_list_empty(&cluster->trigger_args)) MEM(fr_pair_list_copy(cluster, &args, &cluster->trigger_args) >= 0);
416
418
420 }
421
422 if (fr_pool_start(node->pool) < 0) goto error;
423
425 }
426
427 /*
428 * Apply the new config to the possibly live pool
429 */
430 if (fr_pool_reconnect(node->pool, NULL) < 0) goto error;
431
433}
434
435/** Parse a -MOVED or -ASK redirect
436 *
437 * Converts the body of the -MOVED or -ASK error into an IPv4/6 address and port.
438 *
439 * @param[out] key_slot value extracted from redirect string (may be NULL).
440 * @param[out] node_addr Redis node ipaddr and port extracted from redirect string.
441 * @param[in] redirect to process.
442 * @return
443 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
444 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the server returned an invalid redirect.
445 */
447 redisReply *redirect)
448{
449 char *p, *q;
450 unsigned long key;
451 uint16_t port;
452 fr_ipaddr_t ipaddr;
453
454 if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
456 }
457
458 p = redirect->str;
459 if (strncmp(REDIS_ERROR_MOVED_STR, redirect->str, sizeof(REDIS_ERROR_MOVED_STR) - 1) == 0) {
460 q = p + sizeof(REDIS_ERROR_MOVED_STR); /* not a typo, skip space too */
461 } else if (strncmp(REDIS_ERROR_ASK_STR, redirect->str, sizeof(REDIS_ERROR_ASK_STR) - 1) == 0) {
462 q = p + sizeof(REDIS_ERROR_ASK_STR); /* not a typo, skip space too */
463 } else {
464 fr_strerror_const("No '-MOVED' or '-ASK' prefix");
466 }
467 if ((size_t)(q - p) >= (size_t)redirect->len) {
468 fr_strerror_const("Truncated");
470 }
471 p = q;
472 key = strtoul(p, &q, 10);
473 if (key >= KEY_SLOTS) {
474 fr_strerror_printf("Key %lu outside of redis slot range", key);
476 }
477 p = q;
478
479 if (*p != ' ') {
480 fr_strerror_const("Missing key/host separator");
482 }
483 p++; /* Skip the ' ' */
484
485 if (fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC, true, true) < 0) {
487 }
488 fr_assert(ipaddr.af);
489
490 if (key_slot) *key_slot = key;
491 if (node_addr) {
492 node_addr->inet.dst_ipaddr = ipaddr;
493 node_addr->inet.dst_port = port;
494 }
495
497}
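
For experimenting with the redirect format outside the server, the parsing steps above can be approximated with the C standard library alone. This is a simplified sketch under stated assumptions: `demo_parse_redirect` is a hypothetical helper, it handles only `host:port` forms without IPv6 brackets, and it rejects slot numbers of 16384 or more because valid slots run from 0 to 16383.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_KEY_SLOTS 16384

/*
 * Parse "MOVED <slot> <host>:<port>" or "ASK <slot> <host>:<port>".
 * Returns 0 on success, -1 on malformed input.
 */
static int demo_parse_redirect(char const *str, unsigned long *slot,
			       char *host, size_t host_len, uint16_t *port)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	port_num;
	size_t		len;

	if (strncmp(str, "MOVED ", 6) == 0) {
		p = str + 6;
	} else if (strncmp(str, "ASK ", 4) == 0) {
		p = str + 4;
	} else {
		return -1;			/* Not a redirect */
	}

	*slot = strtoul(p, &end, 10);
	if ((end == p) || (*slot >= DEMO_KEY_SLOTS) || (*end != ' ')) return -1;
	p = end + 1;				/* Skip the ' ' */

	colon = strrchr(p, ':');		/* Port follows the last ':' */
	if (!colon || (colon == p)) return -1;

	len = (size_t)(colon - p);
	if (len >= host_len) return -1;
	memcpy(host, p, len);
	host[len] = '\0';

	port_num = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port_num > UINT16_MAX)) return -1;
	*port = (uint16_t)port_num;

	return 0;
}
```

Given `MOVED 3999 127.0.0.1:6381`, the helper yields slot 3999, host `127.0.0.1` and port 6381. Input without a `MOVED` or `ASK` prefix, or with an out-of-range slot, is rejected.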
498
499/** Apply a cluster map received from a cluster node
500 *
501 * @note Errors may be retrieved with fr_strerror().
502 * @note Must be called with the cluster mutex held.
503 *
504 * Key slot range structure
505 @verbatim
506 [0] -> key slot range 0
507 [0] -> key_slot_start
508 [1] -> key_slot_end
509 [2] -> master_node
510 [0] -> master 0 ip (string)
511 [1] -> master 0 port (number)
512 [3..n] -> slave_node(s)
513 [1] -> key slot range 1
514 [0] -> key_slot_start
515 [1] -> key_slot_end
516 [2] -> master_node
517 [0] -> master 1 ip (string)
518 [1] -> master 1 port (number)
519 [3..n] -> slave_node(s)
520 [n] -> key slot range n
521 [0] -> key_slot_start
522 [1] -> key_slot_end
523 [2] -> master_node
524 [0] -> master n ip (string)
525 [1] -> master n port (number)
526 [3..n] -> slave_node(s)
527 @endverbatim
528 *
529 * @param[in,out] cluster to apply map to.
530 * @param[in] reply from #cluster_map_get.
531 * @return
532 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
533 * - FR_REDIS_CLUSTER_RCODE_FAILED on failure.
534 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
535 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the map didn't provide nodes for all keyslots.
536 */
538{
539 size_t i;
540 uint8_t r = 0;
541
543
544 uint8_t rollback[UINT8_MAX]; // Set of nodes to re-add to the queue on failure.
545 bool active[UINT8_MAX]; // Set of nodes active in the new cluster map.
546 bool master[UINT8_MAX]; // Master nodes.
547#ifndef NDEBUG
548# define SET_ADDR(_addr, _map) \
549do { \
550 int _ret; \
551 _ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
552 fr_assert(_ret == 0);\
553 _addr.inet.dst_port = _map->element[1]->integer; \
554} while (0)
555#else
556# define SET_ADDR(_addr, _map) \
557do { \
558 fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
559 _addr.inet.dst_port = _map->element[1]->integer; \
560} while (0)
561#endif
562
563#define SET_INACTIVE(_node) \
564do { \
565 (_node)->is_active = false; \
566 (_node)->is_master = false; \
567 fr_rb_delete(cluster->used_nodes, _node); \
568 fr_fifo_push(cluster->free_nodes, _node); \
569} while (0)
570
571#define SET_ACTIVE(_node) \
572do { \
573 (_node)->is_active = true; \
574 fr_rb_insert(cluster->used_nodes, _node); \
575 fr_fifo_pop(cluster->free_nodes); \
576 active[(_node)->id] = true; \
577 rollback[r++] = (_node)->id; \
578} while (0)
579
580 fr_assert(reply->type == REDIS_REPLY_ARRAY);
581
582 memset(&rollback, 0, sizeof(rollback));
583 memset(active, 0, sizeof(active));
584 memset(master, 0, sizeof(master));
585
586 cluster->remapping = true;
587
588 /*
589 * Must be cleared with the mutex held
590 */
591 memset(&cluster->key_slot_pending, 0, sizeof(cluster->key_slot_pending));
592
593 /*
594 * Insert new nodes and markup the keyslot indexes
595 * in our temporary keyslot_array.
596 *
597 * A map consists of an array with the following indexes:
598 * [0] -> key_slot_start
599 * [1] -> key_slot_end
600 * [2] -> master_node
601 * [3..n] -> slave_node(s)
602 */
603 for (i = 0; i < reply->elements; i++) {
604 size_t j;
605 long long int k;
606 int slaves = 0;
607 fr_redis_cluster_node_t *found, *spare;
608 fr_redis_cluster_node_t find = {}; /* Initialise unused fields to stop Coverity complaints */
610 redisReply *map = reply->element[i];
611
612 memset(&tmpl_slot, 0, sizeof(tmpl_slot));
613
614 SET_ADDR(find.addr, map->element[2]);
615 found = fr_rb_find(cluster->used_nodes, &find);
616 if (found) {
617 active[found->id] = true;
618 goto reuse_master_node;
619 }
620
621 /*
622 * Process the master
623 *
624 * A master node consists of an array with the following indexes:
625 * [0] -> node ip (as string)
626 * [1] -> node port
627 */
628 spare = fr_fifo_peek(cluster->free_nodes);
629 if (!spare) {
630 out_of_nodes:
631 fr_strerror_const("Reached maximum connected nodes");
633 error:
634 cluster->remapping = false;
635 cluster->last_updated = fr_time();
636 /* Re-insert new nodes back into the free_nodes queue */
637 for (i = 0; i < r; i++) SET_INACTIVE(&cluster->node[rollback[i]]);
638 return rcode;
639 }
640
641 spare->pending_addr = find.addr;
642 rcode = cluster_node_connect(cluster, spare);
643 if (rcode < 0) goto error;
644
645 /*
646 * Check to see if the node we just configured
647 * already exists in the tree. If it does we
648 * use that, else we add it to the array of
649 * nodes to rollback on failure.
650 */
651 SET_ACTIVE(spare);
652 found = spare;
653
654 reuse_master_node:
655 tmpl_slot.master = found->id;
656 master[found->id] = true; /* Mark this node as a master */
657
658 /*
659 * Process the slaves
660 *
661 * A slave node consists of an array with the following indexes:
662 * [0] -> node ip (as string)
663 * [1] -> node port
664 */
665 for (j = 3; (j < map->elements); j++) {
666 SET_ADDR(find.addr, map->element[j]);
667 found = fr_rb_find(cluster->used_nodes, &find);
668 if (found) {
669 active[found->id] = true;
670 goto next;
671 }
672
673 spare = fr_fifo_peek(cluster->free_nodes);
674 if (!spare) goto out_of_nodes;
675
676 spare->pending_addr = find.addr;
677 if (cluster_node_connect(cluster, spare) < 0) continue; /* Slave failure is non-fatal */
678
679 SET_ACTIVE(spare);
680 found = spare;
681
682 next:
683 tmpl_slot.slave[slaves++] = found->id;
684
685 /* Hit the maximum number of slaves we allow */
686 if (slaves >= MAX_SLAVES) break;
687 }
688 tmpl_slot.slave_num = slaves;
689
690 /*
691 * Copy our tmpl key slot to each of the key slots
692 * specified by the range for this map.
693 */
694 for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
695 memcpy(&cluster->key_slot_pending[k], &tmpl_slot,
696 sizeof(*(cluster->key_slot_pending)));
697 }
698 }
699
700 /*
701 * Check for holes in the pending key_slot array
702 *
703 * The cluster specification says that upon
704 * detecting a 'NULL' key_slot we should
705 * check again to see if the cluster error has
706 * been resolved, but seeing as we're in the
707 * middle of updating the cluster from very
708 * recent output of 'cluster slots' it's best to
709 * error out.
710 */
711 for (i = 0; i < KEY_SLOTS; i++) {
712 if (cluster->key_slot_pending[i].master == 0) {
713 fr_strerror_printf("Cluster is misconfigured, no node assigned for key %zu", i);
715 goto error;
716 }
717 }
718
719 /*
720 * We have connections/pools for all the nodes in
721 * the new map, apply it to the live cluster.
722 *
723 * Other workers may be using the key slot table,
724 * but that's ok. Nodes and pools are never freed,
725 * so the worst that will happen, is they'll hit
726 * the wrong node for the key, and get redirected.
727 */
728 memcpy(&cluster->key_slot, &cluster->key_slot_pending, sizeof(cluster->key_slot));
729
730 /*
731 * Anything not in the active set of nodes gets
732 * added back into the queue, to be re-used.
733 *
734 * We start at 1, as node 0 is reserved.
735 */
736 for (i = 1; i < cluster->conf->max_nodes; i++) {
737#ifndef NDEBUG
739
740 if (cluster->node[i].is_active) {
741 /* Sanity check for duplicates that are active */
742 found = fr_rb_find(cluster->used_nodes, &cluster->node[i]);
743 fr_assert(found);
744 fr_assert(found->is_active);
745 fr_assert(found->id == i);
746 }
747#endif
748
749 if (!active[i] && cluster->node[i].is_active) {
750 SET_INACTIVE(&cluster->node[i]); /* Sets is_master = false */
751
752 /*
753 * Only change the masters once we've successfully
754 * remapped the cluster.
755 */
756 } else if (master[i]) {
757 cluster->node[i].is_master = true;
758 }
759 }
760
761 cluster->remapping = false;
762 cluster->last_updated = fr_time();
763
764 /*
765 * Sanity checks
766 */
767 fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
769
771}
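
The staging and hole check performed by cluster_map_apply can be illustrated with a much smaller model. The sketch below is a simplified, hypothetical version: it tracks only a master ID per slot, treats ID 0 as "unassigned" (node 0 is reserved in the code above), and the `demo_` names are not part of FreeRADIUS.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_KEY_SLOTS 16384

typedef struct {
	uint16_t start;		/* First key slot in the range (inclusive). */
	uint16_t end;		/* Last key slot in the range (inclusive). */
	uint8_t  master;	/* Node ID of the master, never 0. */
} demo_range_t;

/*
 * Stage the ranges into pending[], then scan for unassigned slots.
 * Returns the index of the first hole, or -1 if every slot has a master.
 */
static int demo_first_hole(uint8_t pending[DEMO_KEY_SLOTS], demo_range_t const *ranges, size_t num)
{
	memset(pending, 0, DEMO_KEY_SLOTS);

	for (size_t i = 0; i < num; i++) {
		for (uint32_t k = ranges[i].start; (k <= ranges[i].end) && (k < DEMO_KEY_SLOTS); k++) {
			pending[k] = ranges[i].master;
		}
	}

	for (size_t i = 0; i < DEMO_KEY_SLOTS; i++) {
		if (pending[i] == 0) return (int)i;
	}

	return -1;
}
```

Only when the scan finds no hole would the pending table be copied over the live one. Staging into a separate array means a rejected map never disturbs lookups already in progress.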
772
773/** Validate a cluster map node entry
774 *
775 * @note Errors may be retrieved with fr_strerror().
776 * @note In a separate function, as it's called for both master and slave nodes.
777 *
778 * @param[in] node we're validating.
779 * @param[in] map_idx we're processing.
780 * @param[in] node_idx we're processing.
781 * @return
782 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
783 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
784 */
785static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
786{
787 fr_ipaddr_t ipaddr;
788
789 if (node->type != REDIS_REPLY_ARRAY) {
790 fr_strerror_printf("Cluster map %i node %i is wrong type, expected array got %s",
791 map_idx, node_idx,
792 fr_table_str_by_value(redis_reply_types, node->type, "<UNKNOWN>"));
794 }
795
796 /*
797 * As per the redis docs: https://redis.io/commands/cluster-slots
798 *
799 * Newer versions of Redis Cluster will output, for each Redis instance,
800 * not just the IP and port, but also the node ID as third element of the
801 * array. In future versions there could be more elements describing the
802 * node better. In general a client implementation should just rely on
803 * the fact that certain parameters are at fixed positions as specified,
804 * but more parameters may follow and should be ignored.
805 * Similarly a client library should try if possible to cope with the fact
806 * that older versions may just have the IP and port parameter.
807 */
808 if (node->elements < 2) {
809 fr_strerror_printf("Cluster map %i node %i has incorrect number of elements, expected at least "
810 "2 got %zu", map_idx, node_idx, node->elements);
812 }
813
814 if (node->element[0]->type != REDIS_REPLY_STRING) {
815 fr_strerror_printf("Cluster map %i node %i ip address is wrong type, expected string got %s",
816 map_idx, node_idx,
817 fr_table_str_by_value(redis_reply_types, node->element[0]->type, "<UNKNOWN>"));
819 }
820
821 if (fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC, true, true) < 0) {
823 }
824
825 if (node->element[1]->type != REDIS_REPLY_INTEGER) {
826 fr_strerror_printf("Cluster map %i node %i port is wrong type, expected integer got %s",
827 map_idx, node_idx,
828 fr_table_str_by_value(redis_reply_types, node->element[1]->type, "<UNKNOWN>"));
830 }
831
832 if (node->element[1]->integer < 0) {
833 fr_strerror_printf("Cluster map %i node %i port is too low, expected >= 0 got %lli",
834 map_idx, node_idx, node->element[1]->integer);
836 }
837
838 if (node->element[1]->integer > UINT16_MAX) {
839 fr_strerror_printf("Cluster map %i node %i port is too high, expected <= " STRINGIFY(UINT16_MAX)" "
840 "got %lli", map_idx, node_idx, node->element[1]->integer);
842 }
843
845}
846
847/** Learn a new cluster layout by querying the node that issued the -MOVED
848 *
849 * Also validates the response from the Redis cluster, so we can be sure that
850 * it's well formed, before doing more expensive operations.
851 *
852 * @note Errors may be retrieved with fr_strerror().
853 *
854 * @param[out] out Where to write cluster map.
855 * @param[in] conn to use for learning the new cluster map.
856 * @return
857 * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
858 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
859 * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the command resulted in an error.
860 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
861 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
862 */
864{
865 redisReply *reply;
866 size_t i = 0;
867
868 *out = NULL;
869
870 reply = redisCommand(conn->handle, "cluster slots");
871 switch (fr_redis_command_status(conn, reply)) {
873 fr_redis_reply_free(&reply);
874 fr_strerror_const("No connections available");
876
878 default:
879 if (reply && reply->type == REDIS_REPLY_ERROR) {
880 fr_strerror_printf("%.*s", (int)reply->len, reply->str);
881 fr_redis_reply_free(&reply);
883 }
884 fr_strerror_const("Unknown client error");
886
888 break;
889 }
890
891 if (reply->type != REDIS_REPLY_ARRAY) {
892 fr_strerror_printf("Bad response to \"cluster slots\" command, expected array got %s",
893 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
895 }
896
897 /*
898 * Clustering configured but no slots set
899 */
900 if (reply->elements == 0) {
901 fr_strerror_printf("Empty response to \"cluster slots\" command (zero length array)");
903 }
904
905 /*
906 * Validate the complete map set before returning.
907 */
908 for (i = 0; i < reply->elements; i++) {
909 size_t j;
910 redisReply *map;
911
912 map = reply->element[i];
913 if (map->type != REDIS_REPLY_ARRAY) {
914 fr_strerror_printf("Cluster map %zu is wrong type, expected array got %s",
915 i, fr_table_str_by_value(redis_reply_types, map->type, "<UNKNOWN>"));
916 error:
917 fr_redis_reply_free(&reply);
919 }
920
921 if (map->elements < 3) {
922 fr_strerror_printf("Cluster map %zu has too few elements, expected at least 3, got %zu",
923 i, map->elements);
924 goto error;
925 }
926
927 /*
928 * Key slot start
929 */
930 if (map->element[0]->type != REDIS_REPLY_INTEGER) {
931 fr_strerror_printf("Cluster map %zu key slot start is wrong type, expected integer got %s",
932 i, fr_table_str_by_value(redis_reply_types, map->element[0]->type, "<UNKNOWN>"));
933 goto error;
934 }
935
936 if (map->element[0]->integer < 0) {
937 fr_strerror_printf("Cluster map %zu key slot start is too low, expected >= 0 got %lli",
938 i, map->element[0]->integer);
939 goto error;
940 }
941
942 if (map->element[0]->integer >= KEY_SLOTS) {
943 fr_strerror_printf("Cluster map %zu key slot start is too high, expected < "
944 STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[0]->integer);
945 goto error;
946 }
947
948 /*
949 * Key slot end
950 */
951 if (map->element[1]->type != REDIS_REPLY_INTEGER) {
952 fr_strerror_printf("Cluster map %zu key slot end is wrong type, expected integer got %s",
953 i, fr_table_str_by_value(redis_reply_types, map->element[1]->type, "<UNKNOWN>"));
954 goto error;
955 }
956
957 if (map->element[1]->integer < 0) {
958 fr_strerror_printf("Cluster map %zu key slot end is too low, expected >= 0 got %lli",
959 i, map->element[1]->integer);
960 goto error;
961 }
962
963 if (map->element[1]->integer >= KEY_SLOTS) {
964 fr_strerror_printf("Cluster map %zu key slot end is too high, expected < "
965 STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[1]->integer);
966 goto error;
967 }
968
969 if (map->element[1]->integer < map->element[0]->integer) {
970 fr_strerror_printf("Cluster map %zu key slot start/end out of order. "
971 "Start was %lli, end was %lli", i, map->element[0]->integer,
972 map->element[1]->integer);
973 goto error;
974 }
975
976 /*
977 * Master node
978 */
979 if (cluster_map_node_validate(map->element[2], i, 0) < 0) goto error;
980
981 /*
982 * Slave nodes
983 */
984 for (j = 3; j < map->elements; j++) {
985 if (cluster_map_node_validate(map->element[j], i, j - 2) < 0) goto error;
986 }
987 }
988 *out = reply;
989
991}
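
/*
 * A minimal sketch of the key slot range checks performed above, gathered
 * into one helper. The name slot_range_check is hypothetical, and the
 * KEY_SLOTS value of 16384 is an assumption taken from the Redis cluster
 * specification. The comparisons mirror the listing, including its use of
 * '>' rather than '>=' for the upper bound.
 */
```c
#include <assert.h>
#include <stddef.h>

#define KEY_SLOTS 16384	/* Assumed: slot count from the Redis cluster spec */

/* Hypothetical helper: returns 0 if the range is acceptable, else -1 and a reason */
int slot_range_check(char const **err, long long start, long long end)
{
	assert(err != NULL);
	*err = NULL;

	if (start < 0)		{ *err = "key slot start is too low";	return -1; }
	if (start > KEY_SLOTS)	{ *err = "key slot start is too high";	return -1; }
	if (end < 0)		{ *err = "key slot end is too low";	return -1; }
	if (end > KEY_SLOTS)	{ *err = "key slot end is too high";	return -1; }
	if (end < start)	{ *err = "key slot start/end out of order"; return -1; }

	return 0;
}
```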
992
993/** Perform a runtime remap of the cluster
994 *
995 * @note Errors may be retrieved with fr_strerror().
996 * @note Must be called with the cluster mutex free.
997 *
998 * @param[in] request The current request.
999 * @param[in,out] cluster to remap.
1000 * @param[in] conn to use to query the cluster.
1001 * @return
1002 * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
1003 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1004 * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the 'cluster slots' command resulted in a protocol error.
1005 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1006 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1007 */
1009{
1010 fr_time_t now;
1011 redisReply *map;
1013 size_t i, j;
1014
1015 /*
1016 * If the cluster was remapped very recently, or is being
1017 * remapped, it's unlikely that it needs remapping again.
1018 */
1019 if (cluster->remapping) {
1020 in_progress:
1021 ROPTIONAL(RDEBUG2, DEBUG2, "Cluster remapping in progress, ignoring remap request");
1023 }
1024
1025 /*
1026 * The remap times are _our_ times, not the _request_ time.
1027 */
1028 now = fr_time();
1029 if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1030 too_soon:
1031 ROPTIONAL(RWARN, WARN, "Cluster was updated less than a second ago, ignoring remap request");
1033 }
1034
1035 ROPTIONAL(RINFO, INFO, "Initiating cluster remap");
1036
1037 /*
1038 * Get new cluster information
1039 */
1040 ret = cluster_map_get(&map, conn);
1041 switch (ret) {
1042 case FR_REDIS_CLUSTER_RCODE_BAD_INPUT: /* Validation error */
1043 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION: /* Connection error */
1044 case FR_REDIS_CLUSTER_RCODE_FAILED: /* Error issuing command */
1045 return ret;
1046
1047 case FR_REDIS_CLUSTER_RCODE_IGNORED: /* Clustering not enabled, or not supported */
1048 cluster->remap_needed = false;
1050
1051 case FR_REDIS_CLUSTER_RCODE_SUCCESS: /* Success */
1052 break;
1053 }
1054
1055 /*
1056 * Print the mapping we received
1057 */
1058 ROPTIONAL(RINFO, INFO, "Cluster map consists of %zu key ranges", map->elements);
1059 for (i = 0; i < map->elements; i++) {
1060 redisReply *map_node = map->element[i];
1061
1062 ROPTIONAL(RINFO, INFO, "%zu - keys %lli-%lli", i,
1063 map_node->element[0]->integer,
1064 map_node->element[1]->integer);
1065
1066 if (request) RINDENT();
1067 ROPTIONAL(RINFO, INFO, "master: %s:%lli",
1068 map_node->element[2]->element[0]->str,
1069 map_node->element[2]->element[1]->integer);
1070 for (j = 3; j < map_node->elements; j++) {
1071 ROPTIONAL(RINFO, INFO, "slave%zu: %s:%lli", j - 3,
1072 map_node->element[j]->element[0]->str,
1073 map_node->element[j]->element[1]->integer);
1074 }
1075 if (request) REXDENT();
1076 }
1077
1078 /*
1079 * Check again that the cluster isn't being
1080 * remapped, or was remapped too recently,
1081 * now we hold the mutex and the state of
1082 * those variables is synchronized.
1083 */
1084 pthread_mutex_lock(&cluster->mutex);
1085 if (cluster->remapping) {
1086 pthread_mutex_unlock(&cluster->mutex);
1087 fr_redis_reply_free(&map); /* Free the map */
1088 goto in_progress;
1089 }
1090 if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1091 pthread_mutex_unlock(&cluster->mutex);
1092 fr_redis_reply_free(&map); /* Free the map */
1093 goto too_soon;
1094 }
1095 ret = cluster_map_apply(cluster, map);
1096 if (ret == FR_REDIS_CLUSTER_RCODE_SUCCESS) cluster->remap_needed = false; /* Change on successful remap */
1097 pthread_mutex_unlock(&cluster->mutex);
1098
1099 fr_redis_reply_free(&map); /* Free the map */
1100 if (ret < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1101
1103}
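
/*
 * A minimal sketch of the check, work, re-check pattern used by the remap
 * function above: a cheap unlocked test filters most callers, the expensive
 * query runs without the lock, and the same conditions are tested again once
 * the mutex is held. toy_cluster_t, toy_remap and the generation field are
 * hypothetical stand-ins; updating last_updated inside the locked region is
 * an assumption about what applying the map does.
 */
```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

typedef struct {
	pthread_mutex_t	mutex;
	bool		remapping;	/* Another thread is applying a map */
	time_t		last_updated;	/* Second in which the map last changed */
	int		generation;	/* Stands in for the real key slot map */
} toy_cluster_t;

/* Returns 1 if the new map was applied, 0 if the request was ignored */
int toy_remap(toy_cluster_t *c, time_t now, int new_generation)
{
	int applied = 0;

	assert(c != NULL);

	/* Unlocked check: possibly stale, but avoids needless work */
	if (c->remapping || (now == c->last_updated)) return 0;

	/* ... fetching and validating the new map would happen here ... */

	/* Locked re-check: the state is now synchronised */
	pthread_mutex_lock(&c->mutex);
	if (!c->remapping && (now != c->last_updated)) {
		c->generation = new_generation;
		c->last_updated = now;
		applied = 1;
	}
	pthread_mutex_unlock(&c->mutex);

	return applied;
}
```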
1104
1105/** Retrieve or associate a node with the server indicated in the redirect
1106 *
1107 * @note Errors may be retrieved with fr_strerror().
1108 *
1109 * @param[out] out Where to write the node representing the redirect server.
1110 * @param[in] cluster to draw node from.
1111 * @param[in] reply Redis reply containing the redirect information.
1112 * @return
1113 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1114 * - FR_REDIS_CLUSTER_RCODE_FAILED no more nodes available.
1115 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1116 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1117 */
1119{
1120 fr_redis_cluster_node_t find, *found, *spare;
1121 fr_redis_conn_t *rconn;
1122
1123 uint16_t key;
1124
1125 memset(&find, 0, sizeof(find));
1126
1127 *out = NULL;
1128
1129 if (cluster_node_conf_from_redirect(&key, &find.addr, reply) < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1130
1131 pthread_mutex_lock(&cluster->mutex);
1132 /*
1133 * If we already have a pool for the
1134 * host we were redirected to, use that.
1135 */
1136 found = fr_rb_find(cluster->used_nodes, &find);
1137 if (found) {
1138 /* We have the new pool, don't need to hold the lock */
1139 pthread_mutex_unlock(&cluster->mutex);
1140 *out = found;
1142 }
1143
1144 /*
1145 * Otherwise grab a free node and try and connect
1146 * it to the server we were redirected to.
1147 */
1148 spare = fr_fifo_peek(cluster->free_nodes);
1149 if (!spare) {
1150 fr_strerror_const("Reached maximum connected nodes");
1151 pthread_mutex_unlock(&cluster->mutex);
1153 }
1154 spare->pending_addr = find.addr; /* Set the config to be applied */
1155 if (cluster_node_connect(cluster, spare) < 0) {
1156 pthread_mutex_unlock(&cluster->mutex);
1158 }
1159 fr_rb_insert(cluster->used_nodes, spare);
1160 fr_fifo_pop(cluster->free_nodes);
1161 found = spare;
1162
1163 /* We have the new pool, don't need to hold the lock */
1164 pthread_mutex_unlock(&cluster->mutex);
1165
1166 /*
1167 * Determine if we can establish a connection to
1168 * the new pool, to check if it's viable.
1169 */
1170 rconn = fr_pool_connection_get(found->pool, NULL);
1171 if (!rconn) {
1172 /*
1173 * To prevent repeated misconfigurations
1174 * using all free nodes, add the node
1175 * back to the spare queue if this
1176 * was the first connection attempt and
1177 * it failed.
1178 */
1179 pthread_mutex_lock(&cluster->mutex);
1180 fr_fifo_push(cluster->free_nodes, spare);
1181 pthread_mutex_unlock(&cluster->mutex);
1182
1183 fr_strerror_const("No connections available");
1185 }
1186 fr_pool_connection_release(found->pool, NULL, rconn);
1187 *out = found;
1188
1190}
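
/*
 * A minimal sketch of how a redirect such as "MOVED 3999 127.0.0.1:6381"
 * could be split into a key slot, host and port before a node is looked up.
 * This is not the parser used by the function above (which is not shown in
 * this listing); redirect_t and redirect_parse are hypothetical names, and
 * the message layout is the one given in the Redis cluster specification.
 */
```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
	int		moved;		/* 1 for MOVED, 0 for ASK */
	unsigned int	slot;		/* Key slot named in the redirect */
	char		host[64];	/* Host part of host:port */
	unsigned int	port;
} redirect_t;

/* Returns 0 on success, -1 if the message is not a well formed redirect */
int redirect_parse(redirect_t *out, char const *msg)
{
	char const	*p = msg, *colon;
	char		*end;
	unsigned long	v;
	size_t		host_len;

	assert(out != NULL && msg != NULL);

	if (strncmp(p, "MOVED ", 6) == 0) {
		out->moved = 1;
		p += 6;
	} else if (strncmp(p, "ASK ", 4) == 0) {
		out->moved = 0;
		p += 4;
	} else {
		return -1;
	}

	v = strtoul(p, &end, 10);			/* Key slot */
	if ((end == p) || (*end != ' ') || (v >= 16384)) return -1;
	out->slot = (unsigned int)v;
	p = end + 1;

	colon = strrchr(p, ':');			/* Last ':' separates the port */
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;
	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';

	p = colon + 1;
	v = strtoul(p, &end, 10);			/* Port */
	if ((end == p) || (*end != '\0') || (v == 0) || (v > 65535)) return -1;
	out->port = (unsigned int)v;

	return 0;
}
```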
1191
1192
1193/** Try to determine the health of a cluster node passively by examining its pool state
1194 *
1195 * Returns an integer value representing the likelihood that the pool is live.
1196 * Range is between 1 and 11,000.
1197 *
1198 * If a weight of 1 is returned, connections from the pool should be checked
1199 * (by pinging) before use.
1200 *
1201 * @param now The current time.
1202 * @param state of the connection pool.
1203 * @return
1204 * - 1 the pool is very likely to be bad.
1205 * - 2-11000 the pool is likely to be good, with a higher number
1206 * indicating higher probability of liveness.
1207 */
1209{
1210 /*
1211 * Failed spawn recently, probably bad
1212 */
1214
1215 /*
1216 * Closed recently, probably bad
1217 */
1219
1220 /*
1221 * Released too long ago, don't know
1222 */
1224
1225 /*
1226 * Released not long ago, might be ok.
1227 */
1229}
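
/*
 * A minimal sketch of a weighting function consistent with the documented
 * values: 1 for a recent spawn failure or close, 500 when the last release
 * is older than ten seconds, and a linear scale from 11,000 (released now)
 * down to 1,000 (released ten seconds ago). The function body is not shown
 * in this listing, so the millisecond inputs, the NEVER marker and the exact
 * formula are assumptions.
 */
```c
#include <assert.h>
#include <stdint.h>

#define NEVER		(-1)	/* Hypothetical marker: the event has not occurred */
#define WINDOW_MS	10000	/* Ten second window from the description */

/* Each argument is the elapsed time in milliseconds since the event, or NEVER */
int pool_health_weight(int64_t since_spawn_fail, int64_t since_closed, int64_t since_released)
{
	assert((since_spawn_fail >= NEVER) && (since_closed >= NEVER) && (since_released >= NEVER));

	/* Failed spawn recently, probably bad */
	if ((since_spawn_fail != NEVER) && (since_spawn_fail < WINDOW_MS)) return 1;

	/* Closed recently, probably bad */
	if ((since_closed != NEVER) && (since_closed < WINDOW_MS)) return 1;

	/* Released too long ago (or never), don't know */
	if ((since_released == NEVER) || (since_released > WINDOW_MS)) return 500;

	/* Released not long ago: 11000 at 0ms, falling to 1000 at 10000ms */
	return 11000 - (int)since_released;
}
```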
1230
1231/** Issue a ping request against a cluster node
1232 *
1233 * Establishes whether the connection to the node we have is live.
1234 *
1235 * @param request The current request.
1236 * @param node to ping.
1237 * @param conn the connection to ping on.
1238 * @return
1239 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if we got a bad response.
1240 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1241 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION on connection down.
1242 */
1244{
1245 redisReply *reply;
1246 fr_redis_rcode_t rcode;
1247
1248 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Executing command: PING", node->id);
1249 reply = redisCommand(conn->handle, "PING");
1250 rcode = fr_redis_command_status(conn, reply);
1251 if (rcode != REDIS_RCODE_SUCCESS) {
1252 ROPTIONAL(RPERROR, PERROR, "[%i] PING failed to %s:%i", node->id, node->name, node->addr.inet.dst_port);
1253 fr_redis_reply_free(&reply);
1255 }
1256
1257 if (reply->type != REDIS_REPLY_STATUS) {
1258 ROPTIONAL(RERROR, ERROR, "[%i] Bad PING response from %s:%i, expected status got %s",
1259 node->id, node->name, node->addr.inet.dst_port,
1260 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1261 fr_redis_reply_free(&reply);
1263 }
1264
1265 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Got response: %s", node->id, reply->str);
1266 fr_redis_reply_free(&reply);
1268}
1269
1270/** Attempt to find a live pool in the cluster
1271 *
1272 * The intent here is to find pools/nodes where a connection was released the shortest
1273 * time ago. Having a connection be released (vs closed) indicates that the pool is live.
1274 *
1275 * We don't want to have all workers try and grab a connection to this node however, as it
1276 * may still be dead (we don't know).
1277 *
1278 * So we use an inverse transform sample, to weight the nodes, based on time between now
1279 * and when the connection was released. Connections released closest to the current
1280 * time are given a higher weighting.
1281 *
1282 * Weight range is between 1 - 11,000.
1283 *
1284 * - If released > 10.0 seconds ago, information is not valid, weight 500.
1285 * - If closed < 10.0 seconds ago, it's a bad pool, weight 1.
1286 * - If spawn failed < 10.0 seconds ago, it's a bad pool, weight 1.
1287 * - If a connection was released 0.0 seconds ago, weight 11,000.
1288 * - If a connection was released 10.0 seconds ago, weight 1000.
1289 *
1290 * Using the above algorithm we use the experience of other workers using the cluster to
1291 * inform our alternative node selection.
1292 *
1293 * Suggestions on improving live node selection appreciated.
1294 *
1295 * Inverse transform sampling based roughly on the solution from this post:
1296 * http://stackoverflow.com/questions/17250568/randomly-choosing-from-a-list-with-weighted-probabilities
1297 *
1298 * Wikipedia page here:
1299 * https://en.wikipedia.org/wiki/Inverse_transform_sampling
1300 *
1301 * @note Must be called with the cluster mutex free.
1302 *
1303 * @param[out] live_node we found.
1304 * @param[out] live_conn to that node.
1305 * @param[in] request The current request (used for logging).
1306 * @param[in] cluster to search for live pools in.
1307 * @param[in] skip this node (it's bad).
1308 * @return 0 if a live node and connection were found, -1 otherwise.
1309 */
1311 request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
1312{
1313 uint32_t i;
1314
1316 fr_time_t now;
1319
1320 ROPTIONAL(RDEBUG2, DEBUG2, "Searching for live cluster nodes");
1321
1322 if (fr_rb_num_elements(cluster->used_nodes) == 1) {
1323 no_alts:
1324 ROPTIONAL(RERROR, ERROR, "No alternative nodes available");
1325 return -1;
1326 }
1327
1328 live = talloc_zero(NULL, cluster_nodes_live_t); /* Too big for stack */
1329 live->skip = skip->id;
1330
1331 pthread_mutex_lock(&cluster->mutex);
1332 for (node = fr_rb_iter_init_inorder(cluster->used_nodes, &iter);
1333 node;
1334 node = fr_rb_iter_next_inorder(cluster->used_nodes, &iter)) {
1335 fr_assert(node->pool);
1336 if (live->skip == node->id) continue; /* Skip dead nodes */
1337
1338 live->node[live->next].pool_state = fr_pool_state(node->pool);
1339 live->node[live->next++].id = node->id;
1340 }
1341 pthread_mutex_unlock(&cluster->mutex);
1342
1343 fr_assert(live->next); /* There should be at least one */
1344 if (live->next == 1) goto no_alts; /* Weird, but conceivable */
1345
1346 now = fr_time();
1347
1348 /*
1349 * Weighted random selection
1350 */
1351 for (i = 0; (i < cluster->conf->max_alt) && live->next; i++) {
1352 fr_redis_conn_t *conn;
1353 uint8_t j;
1354 int first, last, pivot; /* Must be signed for BS */
1355 unsigned int find, cumulative = 0;
1356
1357 ROPTIONAL(RDEBUG3, DEBUG3, "(Re)assigning node weights:");
1358 if (request) RINDENT();
1359 for (j = 0; j < live->next; j++) {
1360 int weight;
1361
1362 weight = cluster_node_pool_health(now, live->node[j].pool_state);
1363 ROPTIONAL(RDEBUG3, DEBUG3, "Node %i weight: %i", live->node[j].id, weight);
1364 live->node[j].cumulative = (cumulative += weight);
1365 }
1366 if (request) REXDENT();
1367
1368 /*
1369 * Select a node at random
1370 */
1371 find = (fr_rand() & (cumulative - 1)); /* Masked value between 0 and (cumulative - 1) */
1372 first = 0;
1373 last = live->next - 1;
1374 pivot = (first + last) / 2;
1375
1376 while (first <= last) {
1377 if (live->node[pivot].cumulative < find) {
1378 first = pivot + 1;
1379 } else if (live->node[pivot].cumulative == find) {
1380 break;
1381 } else {
1382 last = pivot - 1;
1383 }
1384 pivot = (first + last) / 2;
1385 }
1386 /*
1387 * Round up...
1388 */
1389 if (first > last) pivot = last + 1;
1390
1391 /*
1392 * Resolve the index to the actual node. We use IDs
1393 * to save memory...
1394 */
1395 node = &cluster->node[live->node[pivot].id];
1396 fr_assert(live->node[pivot].id == node->id);
1397
1398 ROPTIONAL(RDEBUG2, DEBUG2, "Selected node %i (using random value %i)", node->id, find);
1399 conn = fr_pool_connection_get(node->pool, request);
1400 if (!conn) {
1401 ROPTIONAL(RERROR, ERROR, "No connections available to node %i %s:%i", node->id,
1402 node->name, node->addr.inet.dst_port);
1403 next:
1404 /*
1405 * Remove the node we just discovered was bad
1406 * out of the set of nodes we're selecting over.
1407 */
1408 if (pivot == live->next) {
1409 live->next--;
1410 continue;
1411 }
1412 memcpy(&live->node[pivot], &live->node[live->next - 1], sizeof(live->node[pivot]));
1413 live->next--;
1414 continue;
1415 }
1416
1417 /*
1418 * PING! PONG?
1419 */
1420 switch (cluster_node_ping(request, node, conn)) {
1422 break;
1423
1425 fr_pool_connection_close(node->pool, request, conn);
1426 goto next;
1427
1428 default:
1429 fr_pool_connection_release(node->pool, request, conn);
1430 goto next;
1431 }
1432
1433 *live_node = node;
1434 *live_conn = conn;
1435 talloc_free(live);
1436
1437 return 0;
1438 }
1439
1440 ROPTIONAL(RERROR, ERROR, "Hit max alt limit %i, and no live connections found", cluster->conf->max_alt);
1441 talloc_free(live);
1442
1443 return -1;
1444}
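
/*
 * A minimal sketch of weighted selection over cumulative weights, the
 * technique described in the comment above the function. It is written as a
 * lower bound binary search rather than a copy of the loop in the listing,
 * and it expects the caller to supply a value drawn uniformly from
 * 0..(total - 1), for example with a modulo. pick_by_weight is a
 * hypothetical name.
 */
```c
#include <assert.h>
#include <stddef.h>

/*
 * cumulative[i] holds the sum of the weights of entries 0..i.
 * Returns the index of the first entry whose cumulative weight exceeds find,
 * so entry i is chosen with probability weight[i] / total.
 */
size_t pick_by_weight(unsigned int const *cumulative, size_t n, unsigned int find)
{
	size_t lo = 0, hi = n;

	assert((cumulative != NULL) && (n > 0));
	assert(find < cumulative[n - 1]);

	while (lo < hi) {
		size_t mid = lo + ((hi - lo) / 2);

		if (cumulative[mid] <= find) {
			lo = mid + 1;	/* Target lies to the right */
		} else {
			hi = mid;	/* mid may be the answer, keep it */
		}
	}

	return lo;
}
```
/*
 * With weights 1, 500 and 11000 the cumulative array is { 1, 501, 11501 }:
 * only the value 0 selects the first node, 1..500 select the second and
 * 501..11500 select the third.
 */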
1445
1446/** Callback for freeing a Redis connection
1447 *
1448 * @param[in] conn to free.
1449 * @return 0.
1450 */
1452{
1453 redisFree(conn->handle);
1454
1455 return 0;
1456}
1457
1458/** Create a new connection to a Redis node
1459 *
1460 * @param[in] ctx to allocate connection structure in. Will be freed at the same time as the pool.
1461 * @param[in] instance data of type #fr_redis_cluster_node_t. Holds parameters for establishing new connection.
1462 * @param[in] timeout The maximum time allowed to complete the connection.
1463 * @return
1464 * - New #fr_redis_conn_t on success.
1465 * - NULL on failure.
1466 */
1467void *fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
1468{
1469 fr_redis_cluster_node_t *node = instance;
1470 fr_redis_conn_t *conn = NULL;
1471 redisContext *handle;
1472 redisReply *reply = NULL;
1473 char const *log_prefix = node->cluster->log_prefix;
1474
1475 DEBUG2("%s - [%i] Connecting to node %s:%i", log_prefix, node->id, node->name, node->addr.inet.dst_port);
1476
1477 handle = redisConnectWithTimeout(node->name, node->addr.inet.dst_port, fr_time_delta_to_timeval(timeout));
1478 if ((handle != NULL) && handle->err) {
1479 ERROR("%s - [%i] Connection failed: %s", log_prefix, node->id, handle->errstr);
1480 redisFree(handle);
1481 return NULL;
1482 } else if (!handle) {
1483 ERROR("%s - [%i] Connection failed", log_prefix, node->id);
1484 return NULL;
1485 }
1486
1487 conn = talloc_zero(ctx, fr_redis_conn_t);
1488 conn->handle = handle;
1489 conn->node = node;
1490 talloc_set_destructor(conn, _cluster_conn_free);
1491
1492#ifdef HAVE_REDIS_SSL
1493 if (node->cluster->ssl_ctx != NULL) {
1494 fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->cluster->ssl_ctx);
1495 if (!tls_session) {
1496 fr_tls_strerror_printf("%s - [%i]", log_prefix, node->id);
1497 ERROR("%s - [%i] Failed to allocate TLS session", log_prefix, node->id);
1498 talloc_free(conn);
1499 return NULL;
1500 }
1501
1502 // redisInitiateSSL() takes ownership of SSL object on success
1503 SSL_up_ref(tls_session->ssl);
1504 if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1505 ERROR("%s - [%i] Failed to initiate SSL: %s", log_prefix, node->id, handle->errstr);
1506 SSL_free(tls_session->ssl);
1507 talloc_free(conn);
1508 return NULL;
1509 }
1510 }
1511#endif
1512
1513 if (node->cluster->conf->password) {
1514 if (node->cluster->conf->username) {
1515 DEBUG3("%s - [%i] Executing: AUTH %s %s", log_prefix, node->id,
1516 node->cluster->conf->username,
1517 node->cluster->conf->password);
1518 reply = redisCommand(handle, "AUTH %s %s",
1519 node->cluster->conf->username,
1520 node->cluster->conf->password);
1521 } else {
1522 DEBUG3("%s - [%i] Executing: AUTH %s", log_prefix, node->id, node->cluster->conf->password);
1523 reply = redisCommand(handle, "AUTH %s", node->cluster->conf->password);
1524 }
1525 if (!reply) {
1526 ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, handle->errstr);
1527 error:
1528 if (reply) fr_redis_reply_free(&reply);
1529 talloc_free(conn);
1530 return NULL;
1531 }
1532
1533 switch (reply->type) {
1534 case REDIS_REPLY_STATUS:
1535 if (strcmp(reply->str, "OK") != 0) {
1536 ERROR("%s - [%i] Failed authenticating: %s", log_prefix,
1537 node->id, reply->str);
1538 goto error;
1539 }
1540 fr_redis_reply_free(&reply);
1541 break; /* else it's OK */
1542
1543 case REDIS_REPLY_ERROR:
1544 ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, reply->str);
1545 goto error;
1546
1547 default:
1548 ERROR("%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->id,
1549 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1550 goto error;
1551 }
1552 }
1553
1554 if (node->cluster->conf->database) {
1555 DEBUG3("%s - [%i] Executing: SELECT %i", log_prefix, node->id, node->cluster->conf->database);
1556 reply = redisCommand(handle, "SELECT %i", node->cluster->conf->database);
1557 if (!reply) {
1558 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1559 node->cluster->conf->database, handle->errstr);
1560 goto error;
1561 }
1562
1563 switch (reply->type) {
1564 case REDIS_REPLY_STATUS:
1565 if (strcmp(reply->str, "OK") != 0) {
1566 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1567 node->cluster->conf->database, reply->str);
1568 goto error;
1569 }
1570 fr_redis_reply_free(&reply);
1571 break; /* else it's OK */
1572
1573 case REDIS_REPLY_ERROR:
1574 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1575 node->cluster->conf->database, reply->str);
1576 goto error;
1577
1578 default:
1579 ERROR("%s - [%i] Unexpected reply of type %s, to SELECT", log_prefix, node->id,
1580 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1581 goto error;
1582 }
1583 }
1584
1585 return conn;
1586}
1587
1588/** Implements the key slot selection scheme used by freeradius
1589 *
1590 * Follows the scheme in the clustering specification, with one difference:
1591 * if the key is NULL or zero length, a random key slot is chosen.
1592 *
1593 * If there's only a single node in the cluster, then we avoid the CRC16
1594 * and just use key slot 0.
1595 *
1596 * @param cluster to determine key slot for.
1597 * @param request The current request.
1598 * @param key the key to resolve.
1599 * @param key_len the length of the key.
1600 * @return pointer to key slot key resolves to.
1601 */
1603 uint8_t const *key, size_t key_len)
1604{
1606
1607 if (!key || (key_len == 0)) {
1608 key_slot = &cluster->key_slot[(uint16_t)(fr_rand() & (KEY_SLOTS - 1))];
1609 ROPTIONAL(RDEBUG2, DEBUG2, "Key rand() -> slot %zu", key_slot - cluster->key_slot);
1610
1611 return key_slot;
1612 }
1613
1614 /*
1615 * Avoid CRC16 if we're operating with one cluster node or
1616 * without clustering.
1617 */
1618 if (fr_rb_num_elements(cluster->used_nodes) > 1) {
1619 key_slot = &cluster->key_slot[cluster_key_hash(key, key_len)];
1620 ROPTIONAL(RDEBUG2, DEBUG2, "Key \"%pV\" -> slot %zu",
1621 fr_box_strvalue_len((char const *)key, key_len), key_slot - cluster->key_slot);
1622
1623 return key_slot;
1624 }
1625 ROPTIONAL(RDEBUG3, DEBUG3, "Single node available, skipping key selection");
1626
1627 return &cluster->key_slot[0];
1628}
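
/*
 * A minimal sketch of the key to slot mapping from the Redis cluster
 * specification: CRC16 (XMODEM variant, polynomial 0x1021) of the key, or of
 * the hash tag between the first '{' and the next '}' when that tag is not
 * empty, reduced to 16384 slots. The hash function called above is not shown
 * in this listing, so crc16_xmodem and toy_key_slot are hypothetical names
 * and may differ from it in detail.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define KEY_SLOTS 16384	/* Assumed: slot count from the Redis cluster spec */

/* Bitwise CRC16, XMODEM variant: initial value 0, polynomial 0x1021, no reflection */
uint16_t crc16_xmodem(uint8_t const *data, size_t len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < len; i++) {
		crc ^= (uint16_t)(data[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}

	return crc;
}

/* Resolve a key to a slot number, honouring a non-empty {hash tag} */
uint16_t toy_key_slot(uint8_t const *key, size_t len)
{
	size_t s, e;

	assert(key != NULL);

	for (s = 0; s < len; s++) if (key[s] == '{') break;
	if (s == len) return crc16_xmodem(key, len) & (KEY_SLOTS - 1);		/* No '{' */

	for (e = s + 1; e < len; e++) if (key[e] == '}') break;
	if ((e == len) || (e == s + 1)) return crc16_xmodem(key, len) & (KEY_SLOTS - 1);	/* No '}' or empty tag */

	return crc16_xmodem(key + s + 1, e - s - 1) & (KEY_SLOTS - 1);	/* Hash the tag only */
}
```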
1629
1630/** Return the master node that would be used for a particular key
1631 *
1632 * @param[in] cluster To resolve key in.
1633 * @param[in] key_slot to resolve to node.
1634 * @return
1635 * - The current master node.
1636 * - NULL if no master node is currently assigned to a particular key slot.
1637 */
1639 fr_redis_cluster_key_slot_t const *key_slot)
1640{
1641 return &cluster->node[key_slot->master];
1642}
1643
1644/** Return the slave node that would be used for a particular key
1645 *
1646 * @param[in] cluster To resolve key in.
1647 * @param[in] key_slot To resolve to node.
1648 * @param[in] slave_num 0..n.
1649 * @return
1650 * - A slave node.
1651 * - NULL if no slave node is assigned at the specified index for this key slot.
1652 *
1653 */
1655 fr_redis_cluster_key_slot_t const *key_slot,
1656 uint8_t slave_num)
1657{
1658 if (slave_num >= key_slot->slave_num) return NULL; /* No slave available */
1659
1660 return &cluster->node[key_slot->slave[slave_num]];
1661}
1662
1663/** Return the ipaddr of a particular node
1664 *
1665 * @param[out] out Ipaddr of the node.
1666 * @param[in] node to get ip address from.
1667 * @return
1668 * - 0 on success.
1669 * - -1 on failure (node is NULL).
1670 */
1672{
1673 if (!node) return -1;
1674
1675 memcpy(out, &node->addr.inet.dst_ipaddr, sizeof(*out));
1676
1677 return 0;
1678}
1679
1680/** Return the port of a particular node
1681 *
1682 * @param[out] out Port of the node.
1683 * @param[in] node to get port from.
1684 * @return
1685 * - 0 on success.
1686 * - -1 on failure (node is NULL).
1687 */
1689{
1690 if (!node) return -1;
1691
1692 *out = node->addr.inet.dst_port;
1693
1694 return 0;
1695}
1696
1697/** Resolve a key to a pool, and reserve a connection in that pool
1698 *
1699 * This should be used with #fr_redis_cluster_state_next, and #fr_redis_command_status, to
1700 * transparently locate the cluster node we need to perform the operation on.
1701 *
1702 * Example code below shows how this function is used in conjunction
1703 * with #fr_redis_cluster_state_next to follow redirects, and reconnect handles.
1704 *
1705 @code{.c}
1706 int s_ret;
1707 redis_conn_state state;
1708 fr_redis_conn_t *conn;
1709 redisReply *reply = NULL;
1710 fr_redis_rcode_t status;
1711
1712 for (s_ret = fr_redis_cluster_state_init(&state, &conn, cluster, request, key, key_len, false);
1713 s_ret == REDIS_RCODE_TRY_AGAIN;
1714 s_ret = fr_redis_cluster_state_next(&state, &conn, cluster, request, status, &reply)) {
1715 reply = redisCommand(conn->handle, "SET foo bar");
1716 status = fr_redis_command_status(conn, reply);
1717 }
1718 // Reply is freed if s_ret == REDIS_RCODE_TRY_AGAIN, but left in all other cases to allow error
1719 // processing, or extraction of results.
1720 fr_redis_reply_free(&reply);
1721 if (s_ret != REDIS_RCODE_SUCCESS) {
1722 // Error
1723 }
1724 // Success
1725 @endcode
1726 *
1727 * @param[out] state to track current pool and various counters, will be initialised.
1728 * @param[out] conn Where to write the reserved connection to.
1729 * @param[in] cluster of pools.
1730 * @param[in] request The current request.
1731 * @param[in] key to resolve to a cluster node/pool. If key is NULL or key_len is 0 a random
1732 * slot will be chosen.
1733 * @param[in] key_len Length of the key.
1734 * @param[in] read_only If true, will use random slave pool in preference to the master, falling
1735 * back to the master if no slaves are available.
1736 * @return
1737 * - REDIS_RCODE_TRY_AGAIN - try your command with this connection (provided via conn).
1738 * - REDIS_RCODE_RECONNECT - when no additional connections available.
1739 */
1741 fr_redis_cluster_t *cluster, request_t *request,
1742 uint8_t const *key, size_t key_len, bool read_only)
1743{
1745 fr_redis_cluster_key_slot_t const *key_slot;
1746 uint8_t first, i;
1747 uint64_t used_nodes;
1748
1749 fr_assert(cluster);
1750 fr_assert(state);
1751 fr_assert(conn);
1752
1753 memset(state, 0, sizeof(*state));
1754 *conn = NULL; /* Better safe than exploding */
1755
1756 used_nodes = fr_rb_num_elements(cluster->used_nodes);
1757 if (used_nodes == 0) {
1758 ROPTIONAL(REDEBUG, ERROR, "No nodes in cluster");
1759 return REDIS_RCODE_RECONNECT;
1760 }
1761
1762again:
1763 key_slot = fr_redis_cluster_slot_by_key(cluster, request, key, key_len);
1764
1765 /*
1766 * 1. Try each of the slaves for the key slot
1767 * 2. Fall through to trying the master, and a single alternate node.
1768 */
1769 if (read_only) {
1770 first = fr_rand() & key_slot->slave_num;
1771 for (i = 0; i < key_slot->slave_num; i++) {
1772 uint8_t node_id;
1773
1774 node_id = key_slot->slave[(first + i) % key_slot->slave_num];
1775 node = &cluster->node[node_id];
1776 *conn = fr_pool_connection_get(node->pool, request);
1777 if (!*conn) {
1778 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu slave %i)",
1779 node->id, key_slot - cluster->key_slot, (first + i) % key_slot->slave_num);
1780 cluster->remap_needed = true;
1781 continue; /* Continue until we find a live pool */
1782 }
1783
1784 goto finish;
1785 }
1786 /* Fall through to using key slot master or alternate */
1787 }
1788
1789 /*
1790 * 1. Try the master for the key slot
1791 * 2. If unavailable search for any pools with handles available
1792 * 3. If there are no pools, or we can't reserve a handle,
1793 * give up.
1794 */
1795 node = &cluster->node[key_slot->master];
1796 *conn = fr_pool_connection_get(node->pool, request);
1797 if (!*conn) {
1798 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu master)",
1799 node->id, key_slot - cluster->key_slot);
1800 cluster->remap_needed = true;
1801
1802 if (cluster_node_find_live(&node, conn, request, cluster, node) < 0) return REDIS_RCODE_RECONNECT;
1803 }
1804
1805finish:
1806 /*
1807 * Something set the remap_needed flag, and we have a live connection
1808 */
1809 if (cluster->remap_needed) {
1810 if (fr_redis_cluster_remap(request, cluster, *conn) == FR_REDIS_CLUSTER_RCODE_SUCCESS) {
1811 fr_pool_connection_release(node->pool, request, *conn);
1812 goto again; /* New map, try again */
1813 }
1814 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1815 }
1816
1817 state->node = node;
1818 state->key = key;
1819 state->key_len = key_len;
1820
1821 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
1822 state->node->id, state->node->name, state->node->addr.inet.dst_port);
1823
1824 return REDIS_RCODE_TRY_AGAIN;
1825}
1826
1827/** Get the next connection to attempt a command against
1828 *
1829 * Will process reconnect and redirect states performing the actions necessary.
1830 *
1831 * - May trigger a cluster remap on receiving a #REDIS_RCODE_MOVE status.
1832 * - May perform a temporary redirect on receiving a #REDIS_RCODE_ASK status.
1833 * - May reserve a new connection on receiving a #REDIS_RCODE_RECONNECT status.
1834 *
1835 * If a remap is in progress, has occurred within the last second, has recently failed,
1836 * or fails, the '-MOVE' will be treated as a temporary redirect (-ASK).
1837 *
1838 * This allows the server to be more responsive during remaps, as unless the worker has been
1839 * redirected to a node we don't currently have a pool for, it can grab a connection for the
1840 * node it was redirected to, and continue.
1841 *
1842 * @note Irrespective of return code, the connection passed via conn will be released.
1843 * A new connection to attempt the command on will be provided via conn.
1844 *
1845 * @note reply will be automatically freed and set to NULL if a new connection is provided;
1846 * in all other cases, the caller is responsible for freeing the reply.
1847 *
1848 * @param[in,out] state containing the current pool, and various counters which control
1849 * retries, and limit redirects.
1850 * @param[in,out] conn we received the '-ASK' or '-MOVE' redirect on. Will be replaced with a
1851 * connection in the new pool the key points to.
1852 * @param[in] request The current request.
1853 * @param[in] cluster of pools.
1854 * @param[in] status of the last command, must be #REDIS_RCODE_MOVE or #REDIS_RCODE_ASK.
1855 * @param[in] reply from last command. Freed if 0 is returned, else caller must free.
1856 * @return
1857 * - REDIS_RCODE_SUCCESS - on success.
1858 * - REDIS_RCODE_TRY_AGAIN - try new connection (provided via conn). Will free reply.
1859 * - REDIS_RCODE_ERROR - on failure or command error.
1860 * - REDIS_RCODE_RECONNECT - when no additional connections available.
1861 */
1863 fr_redis_cluster_t *cluster, request_t *request,
1864 fr_redis_rcode_t status, redisReply **reply)
1865{
1866 fr_assert(state && state->node && state->node->pool);
1867 fr_assert(conn && *conn);
1868
1869 if (*reply) fr_redis_reply_print(L_DBG_LVL_3, *reply, request, 0, status);
1870
1871 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] <<< Returned: %s",
1872 state->node->id, fr_table_str_by_value(redis_rcodes, status, "<UNKNOWN>"));
1873
1874 /*
1875 * Caller indicated we should close the connection
1876 */
1877 if (state->close_conn) {
1878 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Connection no longer viable, closing it", state->node->id);
1879 fr_pool_connection_close(state->node->pool, request, *conn);
1880 *conn = NULL;
1881 state->close_conn = false;
1882 }
1883
1884 /*
1885 * If we have a proven live connection, and something
1886 * has set the remap_needed flag, do that now before
1887 * releasing the connection.
1888 */
1889 if (cluster->remap_needed && *conn) switch(status) {
1890 case REDIS_RCODE_MOVE: /* We're going to remap anyway */
1891 case REDIS_RCODE_RECONNECT: /* The connection's dead */
1892 break;
1893
1894 default:
1895 /*
1896 * Remap the cluster. On success, will clear the
1897 * remap_needed flag.
1898 */
1899 if (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)
1900 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1901 }
1902
1903 /*
1904 * Check the result of the last redis command, and do
1905 * something appropriate.
1906 */
1907 switch (status) {
1909 fr_pool_connection_release(state->node->pool, request, *conn);
1910 *conn = NULL;
1911 return REDIS_RCODE_SUCCESS;
1912
1913 /*
1914 * Command error, not fixable.
1915 */
1917 case REDIS_RCODE_ERROR:
1918 ROPTIONAL(RPEDEBUG, PERROR, "[%i] Command failed", state->node->id);
1919 fr_pool_connection_release(state->node->pool, request, *conn);
1920 *conn = NULL;
1921 return REDIS_RCODE_ERROR;
1922
1923 /*
1924 * Cluster's unstable, try again.
1925 */
1927 if (state->retries++ >= cluster->conf->max_retries) {
1928 ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum retry attempts", state->node->id);
1929 fr_pool_connection_release(state->node->pool, request, *conn);
1930 *conn = NULL;
1931 return REDIS_RCODE_ERROR;
1932 }
1933
1934 if (!*conn) *conn = fr_pool_connection_get(state->node->pool, request);
1935
1936 if (fr_time_delta_ispos(cluster->conf->retry_delay)) nanosleep(&fr_time_delta_to_timespec(cluster->conf->retry_delay), NULL);
1937 goto try_again;
1938
1939 /*
1940 * Connection's dead, check to see if we can switch nodes,
1941 * or, failing that, reconnect the connection.
1942 */
1943 case REDIS_RCODE_RECONNECT:
1944 {
1945 fr_redis_cluster_key_slot_t const *key_slot;
1946
1947 ROPTIONAL(RPERROR, PERROR, "[%i] Failed communicating with %s:%i",
1948 state->node->id, state->node->name,
1949 state->node->addr.inet.dst_port);
1950
1951 fr_pool_connection_close(state->node->pool, request, *conn); /* He's dead jim */
1952
1953 if (state->reconnects++ > state->in_pool) {
1954 ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum reconnect attempts", state->node->id);
1955 cluster->remap_needed = true;
1956 return REDIS_RCODE_RECONNECT;
1957 }
1958
1959 /*
1960 * Refresh the key slot
1961 */
1962 key_slot = fr_redis_cluster_slot_by_key(cluster, request, state->key, state->key_len);
1963 state->node = &cluster->node[key_slot->master];
1964
1965 *conn = fr_pool_connection_get(state->node->pool, request);
1966 if (!*conn) {
1967 ROPTIONAL(REDEBUG, ERROR, "[%i] No connections available for %s:%i",
1968 state->node->id, state->node->name, state->node->addr.inet.dst_port);
1969 cluster->remap_needed = true;
1970
1971 if (cluster_node_find_live(&state->node, conn, request,
1972 cluster, state->node) < 0) return REDIS_RCODE_RECONNECT;
1973
1974 goto try_again;
1975 }
1976
1977 state->retries = 0;
1978 }
1979 goto try_again;
1980
1981 /*
1982 * -MOVED is treated identically to -ASK, except it may
1983 * trigger a cluster remap.
1984 */
1985 case REDIS_RCODE_MOVE:
1986 fr_assert(*reply);
1987
1988 if (*conn && (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)) {
1989 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1990 }
1991 FALL_THROUGH;
1992
1993 /*
1994 * -ASK: process a redirect.
1995 */
1996 case REDIS_RCODE_ASK:
1997 {
1998 fr_redis_cluster_node_t *new;
1999
2000 fr_pool_connection_release(state->node->pool, request, *conn); /* Always release the old connection */
2001
2002 if (!fr_cond_assert(*reply)) return REDIS_RCODE_ERROR;
2003
2004 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Processing redirect \"%s\"", state->node->id, (*reply)->str);
2005 if (state->redirects++ >= cluster->conf->max_redirects) {
2006 ROPTIONAL(REDEBUG, ERROR, "[%i] Reached max_redirects (%i)", state->node->id, state->redirects);
2007 return REDIS_RCODE_ERROR;
2008 }
2009
2010 switch (cluster_redirect(&new, cluster, *reply)) {
2011 case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2012 if (new == state->node) {
2013 ROPTIONAL(REDEBUG, ERROR, "[%i] %s:%i issued redirect to itself", state->node->id,
2014 state->node->name, state->node->addr.inet.dst_port);
2015 return REDIS_RCODE_ERROR;
2016 }
2017
2018 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Redirected from %s:%i to [%i] %s:%i",
2019 state->node->id, state->node->name,
2020 state->node->addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2021 state->node = new;
2022
2023 *conn = fr_pool_connection_get(state->node->pool, request);
2024 if (!*conn) return REDIS_RCODE_RECONNECT;
2025
2026 /*
2027 * Reset these counters, their scope is
2028 * a single node in the cluster.
2029 */
2030 state->reconnects = 0;
2031 state->retries = 0;
2032 state->in_pool = fr_pool_state(state->node->pool)->num;
2033 goto try_again;
2034
2035 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION:
2036 cluster->remap_needed = true;
2037 return REDIS_RCODE_RECONNECT;
2038
2039 default:
2040 return REDIS_RCODE_ERROR;
2041 }
2042 }
2043 }
2044
2045try_again:
2046 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
2047 state->node->id, state->node->name, state->node->addr.inet.dst_port);
2048
2049 fr_redis_reply_free(&*reply);
2050 *reply = NULL;
2051
2052 return REDIS_RCODE_TRY_AGAIN;
2053}
2054
2055/** Get the pool associated with a node in the cluster
2056 *
2057 * @note This is used for testing only. It's not ifdef'd out because
2058 * tests need to run against production builds too.
2059 *
2060 * @param[out] pool associated with the node.
2061 * @param[in] cluster to search for node in.
2062 * @param[in] node_addr to retrieve pool for. Specifies IP and port of node.
2063 * @param[in] create Establish a connection to the specified node if it
2064 * was previously unknown to the cluster client.
2065 * @return
2066 * - 0 on success.
2067 * - -1 if no such node exists.
2068 */
2069 int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster,
2070 fr_socket_t *node_addr, bool create)
2071{
2072 fr_redis_cluster_node_t find, *found;
2073
2074 find.addr = (fr_socket_t) {
2075 .inet = {
2076 .dst_ipaddr = node_addr->inet.dst_ipaddr,
2077 .dst_port = node_addr->inet.dst_port,
2078 }
2079 };
2080
2081 pthread_mutex_lock(&cluster->mutex);
2082 found = fr_rb_find(cluster->used_nodes, &find);
2083 if (!found) {
2084 fr_redis_cluster_node_t *spare;
2085 char buffer[INET6_ADDRSTRLEN];
2086 char const *hostname;
2087
2088 if (!create) {
2089 pthread_mutex_unlock(&cluster->mutex);
2090
2091 hostname = inet_ntop(node_addr->inet.dst_ipaddr.af, &node_addr->inet.dst_ipaddr.addr, buffer, sizeof(buffer));
2092 fr_assert(hostname); /* addr.ipaddr is probably corrupt */
2093 fr_strerror_printf("No existing node found with address %s, port %i",
2094 hostname, node_addr->inet.dst_port);
2095 return -1;
2096 }
2097
2098 spare = fr_fifo_peek(cluster->free_nodes);
2099 if (!spare) {
2100 fr_strerror_const("Reached maximum connected nodes");
2101 pthread_mutex_unlock(&cluster->mutex);
2102 return -1;
2103 }
2104 spare->pending_addr = find.addr; /* Set the config to be applied */
2105 if (cluster_node_connect(cluster, spare) < 0) {
2106 pthread_mutex_unlock(&cluster->mutex);
2107 return -1;
2108 }
2109 fr_rb_insert(cluster->used_nodes, spare);
2110 fr_fifo_pop(cluster->free_nodes);
2111 found = spare;
2112 }
2113 /*
2114 * Sanity checks
2115 */
2116 fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
2117 fr_fifo_num_elements(cluster->free_nodes));
2118 pthread_mutex_unlock(&cluster->mutex);
2119
2120 *pool = found->pool;
2121
2122 return 0;
2123}
2124
2125/** Return an array of IP addresses belonging to masters or slaves
2126 *
2127 * @note We return IP addresses as they're safe to use across cluster remaps.
2128 * @note Result array must be freed (talloc_free()) after use.
2129 *
2130 * @param[in] ctx to allocate array of IP addresses in.
2131 * @param[out] out Where to write the addresses of the nodes.
2132 * @param[in] cluster to search for nodes in.
2133 * @param[in] is_master If true, include the addresses of all the master nodes.
2134 * @param[in] is_slave If true, include the addresses of all the slave nodes.
2135 * @return the number of IP addresses written to out, or -1 if the array could not be allocated.
2136 */
2137 ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[],
2138 fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
2139{
2140 uint64_t in_use = fr_rb_num_elements(cluster->used_nodes);
2141 fr_redis_cluster_node_t *node;
2142 fr_rb_iter_inorder_t iter;
2143 uint8_t count;
2144 fr_socket_t *found;
2145
2146 if (in_use == 0) {
2147 *out = NULL;
2148 return 0;
2149 }
2150
2151 count = 0;
2152 found = talloc_zero_array(ctx, fr_socket_t, in_use);
2153 if (!found) {
2154 fr_strerror_const("Out of memory");
2155 return -1;
2156 }
2157
2158 pthread_mutex_lock(&cluster->mutex);
2159
2160 for (node = fr_rb_iter_init_inorder(cluster->used_nodes, &iter);
2161 node;
2162 node = fr_rb_iter_next_inorder(cluster->used_nodes, &iter)) {
2163 if ((is_master && node->is_master) || (is_slave && !node->is_master)) found[count++] = node->addr;
2164 }
2165
2166 pthread_mutex_unlock(&cluster->mutex);
2167
2168 if (count == 0) {
2169 *out = NULL;
2170 talloc_free(found);
2171 return 0;
2172 }
2173
2174 *out = found;
2175
2176 return count;
2177}
2178
2179/** Destroy mutex associated with cluster slots structure
2180 *
2181 * @param cluster being freed.
2182 * @return 0
2183 */
2184 static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
2185{
2186 pthread_mutex_destroy(&cluster->mutex);
2187
2188 return 0;
2189}
2190
2191/** Check if members of the cluster are above a certain version
2192 *
2193 * @param cluster to perform check on.
2194 * @param min_version that must be found on each node for the check to succeed.
2195 * Must be in the format @verbatim <major>.<minor>.<release> @endverbatim.
2196 * @return
2197 * - true if all contactable members are at or above min_version.
2198 * - false if at least one member is below min_version
2199 * (use #fr_strerror to retrieve node information).
2200 */
2201bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
2202{
2205 fr_redis_conn_t *conn;
2206 int ret;
2207 char buffer[40];
2208 bool all_above = true;
2209
2210 pthread_mutex_lock(&cluster->mutex);
2211
2212 for (node = fr_rb_iter_init_inorder(cluster->used_nodes, &iter);
2213 node;
2214 node = fr_rb_iter_next_inorder(cluster->used_nodes, &iter)) {
2215 conn = fr_pool_connection_get(node->pool, NULL);
2216 if (!conn) continue;
2217
2218 /*
2219 * We don't care if we can't get the version
2220 * as we don't want to prevent the server from
2221 * starting if start == 0.
2222 */
2223 ret = fr_redis_get_version(buffer, sizeof(buffer), conn);
2224 fr_pool_connection_release(node->pool, NULL, conn);
2225 if (ret < 0) continue;
2226
2227 if (fr_redis_version_num(buffer) < fr_redis_version_num(min_version)) {
2228 fr_strerror_printf("Redis node %s:%i (currently v%s) needs update to >= v%s",
2229 node->name, node->addr.inet.dst_port, buffer, min_version);
2230 all_above = false;
2231 break;
2232 }
2233 }
2234
2235 pthread_mutex_unlock(&cluster->mutex);
2236
2237 return all_above;
2238}
2239
2240/** Allocate and initialise a new cluster structure
2241 *
2242 * This holds all the data necessary to manage a pool of pools for a specific redis cluster.
2243 *
2244 * @note Will not error out unless cs.pool.start > 0. This is consistent with other pool based
2245 * modules/code.
2246 *
2247 * @param ctx to link the lifetime of the cluster structure to.
2248 * @param module Configuration section to search for 'server' conf pairs in.
2249 * @param conf Base redis server configuration. Cluster nodes share database
2250 * number and password.
2251 * @param log_prefix Custom log prefix. Defaults to @verbatim rlm_<module> (<instance>) @endverbatim.
2252 * @param trigger_prefix Custom trigger prefix. Defaults to @verbatim modules.<module>.pool @endverbatim.
2253 * @param trigger_args Argument pairs to pass to the trigger in addition to Connection-Pool-Server,
2254 * and Connection-Pool-Port (which are always set by the cluster code).
2255 * @return
2256 * - New #fr_redis_cluster_t on success.
2257 * - NULL on error.
2258 */
2259 fr_redis_cluster_t *fr_redis_cluster_alloc(TALLOC_CTX *ctx,
2260 CONF_SECTION *module,
2261 fr_redis_conf_t *conf,
2262 char const *log_prefix,
2263 char const *trigger_prefix,
2264 fr_pair_list_t *trigger_args)
2265{
2266 uint8_t i;
2267 uint16_t s;
2268
2269 char const *cs_name1, *cs_name2;
2270
2271 CONF_PAIR *cp;
2272 int af = AF_UNSPEC; /* AF of first server */
2273
2274 uint64_t num_nodes;
2275 fr_redis_cluster_t *cluster;
2276
2277 MEM(cluster = talloc_zero(NULL, fr_redis_cluster_t));
2279
2280 cs_name1 = cf_section_name1(module);
2281 cs_name2 = cf_section_name2(module);
2282
2283 cluster->triggers_enabled = conf->triggers;
2284 if (cluster->triggers_enabled) {
2285 /*
2286 * Setup trigger prefix
2287 */
2288 if (!trigger_prefix) {
2289 cluster->trigger_prefix = talloc_typed_asprintf(cluster, "modules.%s.pool", cs_name1);
2290 } else {
2291 cluster->trigger_prefix = talloc_strdup(cluster, trigger_prefix);
2292 }
2293
2294 /*
2295 * Duplicate the trigger arguments.
2296 */
2297 if (trigger_args) MEM(fr_pair_list_copy(cluster, &cluster->trigger_args, trigger_args) >= 0);
2298 }
2299
2300 /*
2301 * Setup log prefix
2302 */
2303 if (!log_prefix) {
2304 if (!cs_name2) cs_name2 = cs_name1;
2305 cluster->log_prefix = talloc_typed_asprintf(conf, "rlm_%s (%s)", cs_name1, cs_name2);
2306 } else {
2307 cluster->log_prefix = talloc_strdup(cluster, log_prefix);
2308 }
2309
2310 /*
2311 * Ensure we always have a pool section (even if it's empty)
2312 */
2313 if (!cf_section_find(module, "pool", NULL)) {
2314 (void) cf_section_alloc(module, module, "pool", NULL);
2315 }
2316
2317 /*
2318 * Parse TLS configuration
2319 */
2320 if (conf->use_tls) {
2321#ifdef HAVE_REDIS_SSL
2322 CONF_SECTION *tls_cs;
2323 fr_tls_conf_t *tls_conf;
2324
2325 tls_cs = cf_section_find(module, "tls", NULL);
2326 if (!tls_cs) {
2327 tls_cs = cf_section_alloc(module, module, "tls", NULL);
2328 }
2329
2330 tls_conf = fr_tls_conf_parse_client(tls_cs);
2331 if (!tls_conf) {
2332 ERROR("%s - Failed to parse TLS configuration", cluster->log_prefix);
2333 talloc_free(cluster);
2334 return NULL;
2335 }
2336
2337 cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf, true);
2338 if (!cluster->ssl_ctx) {
2339 ERROR("%s - Failed to allocate SSL context", cluster->log_prefix);
2340 talloc_free(cluster);
2341 return NULL;
2342 }
2343#else
2344 WARN("%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->log_prefix);
2345#endif
2346 }
2347
2348 if (conf->max_nodes == UINT8_MAX) {
2349 ERROR("%s - Maximum number of connected nodes allowed is %i", cluster->log_prefix, UINT8_MAX - 1);
2350 talloc_free(cluster);
2351 return NULL;
2352 }
2353
2354 if (conf->max_nodes == 0) {
2355 ERROR("%s - Minimum number of nodes allowed is 1", cluster->log_prefix);
2356 talloc_free(cluster);
2357 return NULL;
2358 }
2359
2360 cp = cf_pair_find(module, "server");
2361 if (!cp) {
2362 ERROR("%s - No servers configured", cluster->log_prefix);
2363 talloc_free(cluster);
2364 return NULL;
2365 }
2366
2367 cluster->module = module;
2368
2369 /*
2370 * Ensure the pool is freed at the same time as its
2371 * parent.
2372 *
2373 * We need to break the link between the cluster and
2374 * its parent context, as the two contexts may be
2375 * modified by multiple threads.
2376 */
2377 MEM(talloc_link_ctx(ctx, cluster) >= 0);
2378 MEM(cluster->node = talloc_zero_array(cluster, fr_redis_cluster_node_t, conf->max_nodes + 1));
2379 MEM(cluster->used_nodes = fr_rb_inline_alloc(cluster, fr_redis_cluster_node_t, rbnode, _cluster_node_cmp, NULL));
2380 MEM(cluster->free_nodes = fr_fifo_create(cluster, conf->max_nodes, NULL));
2381
2382 cluster->conf = conf;
2383
2384 pthread_mutex_init(&cluster->mutex, NULL);
2385 talloc_set_destructor(cluster, _fr_redis_cluster_free);
2386
2387 /*
2388 * Node id 0 is reserved, so we can detect misconfigured
2389 * clusters.
2390 */
2391 for (i = 1; i < (cluster->conf->max_nodes + 1); i++) {
2392 cluster->node[i].id = i;
2393 cluster->node[i].cluster = cluster;
2394
2395 /* Push them all into the queue */
2396 fr_fifo_push(cluster->free_nodes, &cluster->node[i]);
2397 }
2398
2399 /*
2400 * Don't connect to cluster nodes if we're just
2401 * checking the config.
2402 */
2403 if (check_config) return cluster;
2404
2405 /*
2406 * Populate the cluster with the bootstrap servers.
2407 *
2408 * If we fail getting a key_slot map here, then the
2409 * bootstrap servers are distributed evenly through
2410 * the key slots.
2411 *
2412 * This allows the server to start, and potentially,
2413 * a valid map to be applied, once the server starts
2414 * processing requests.
2415 */
2416 do {
2417 char const *server;
2418 fr_redis_cluster_node_t *node;
2419 fr_redis_conn_t *conn;
2420 redisReply *map;
2421 size_t j, k;
2422
2423 node = fr_fifo_peek(cluster->free_nodes);
2424 if (!node) {
2425 ERROR("%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->log_prefix);
2426
2427 error:
2428 talloc_free(cluster);
2429 return NULL;
2430 }
2431
2432 server = cf_pair_value(cp);
2433 if (fr_inet_pton_port(&node->pending_addr.inet.dst_ipaddr, &node->pending_addr.inet.dst_port, server,
2434 talloc_strlen(server), af, true, true) < 0) {
2435 PERROR("%s - Failed parsing server \"%s\"", cluster->log_prefix, server);
2436 goto error;
2437 }
2438 if (!node->pending_addr.inet.dst_port) node->pending_addr.inet.dst_port = conf->port;
2439
2440 if (cluster_node_connect(cluster, node) < 0) {
2441 WARN("%s - Connecting to %s:%i failed", cluster->log_prefix, node->name, node->pending_addr.inet.dst_port);
2442 continue;
2443 }
2444
2445 if (!fr_rb_insert(cluster->used_nodes, node)) {
2446 WARN("%s - Skipping duplicate bootstrap server \"%s\"", cluster->log_prefix, server);
2447 continue;
2448 }
2449 node->is_active = true;
2450 fr_fifo_pop(cluster->free_nodes);
2451
2452 /*
2453 * Prefer the same IPaddr family as the first node
2454 */
2455 if (af == AF_UNSPEC) af = node->addr.inet.dst_ipaddr.af;
2456
2457 /*
2458 * Only get cluster map config if required
2459 */
2460 if (fr_pool_start_num(node->pool) > 0) {
2461 /*
2462 * Fine to leave this node configured, if we do find
2463 * a live node, and it's not in the map, it'll be cleared out.
2464 */
2465 conn = fr_pool_connection_get(node->pool, NULL);
2466 if (!conn) {
2467 WARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2468 continue;
2469 }
2470 } else {
2471 break;
2472 }
2473
2474 if (!cluster->conf->use_cluster_map) {
2475 fr_pool_connection_release(node->pool, NULL, conn);
2476 continue;
2477 }
2478
2479 switch (cluster_map_get(&map, conn)) {
2480 /*
2481 * We got a valid map! See if we can apply it...
2482 */
2483 case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2484 fr_pool_connection_release(node->pool, NULL, conn);
2485
2486 DEBUG("%s - Cluster map consists of %zu key ranges", cluster->log_prefix, map->elements);
2487 for (j = 0; j < map->elements; j++) {
2488 redisReply *map_node = map->element[j];
2489
2490 DEBUG("%s - %zu - keys %lli-%lli", cluster->log_prefix, j,
2491 map_node->element[0]->integer,
2492 map_node->element[1]->integer);
2493 DEBUG("%s - master: %s:%lli", cluster->log_prefix,
2494 map_node->element[2]->element[0]->str,
2495 map_node->element[2]->element[1]->integer);
2496 for (k = 3; k < map_node->elements; k++) {
2497 DEBUG("%s - slave%zu: %s:%lli", cluster->log_prefix, k - 3,
2498 map_node->element[k]->element[0]->str,
2499 map_node->element[k]->element[1]->integer);
2500 }
2501 }
2502
2503 if (cluster_map_apply(cluster, map) < 0) {
2504 PWARN("%s: Applying cluster map failed", cluster->log_prefix);
2505 fr_redis_reply_free(&map);
2506 continue;
2507 }
2508 fr_redis_reply_free(&map);
2509
2510 return cluster;
2511
2512 /*
2513 * Unusable bootstrap node
2514 */
2515 case FR_REDIS_CLUSTER_RCODE_BAD_INPUT:
2516 PWARN("%s - Bootstrap server \"%s\" returned invalid data", cluster->log_prefix, server);
2517 fr_pool_connection_release(node->pool, NULL, conn);
2518 continue;
2519
2520 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION:
2521 PWARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2522 fr_pool_connection_close(node->pool, NULL, conn);
2523 continue;
2524
2525 /*
2526 * Clustering not enabled, or not supported,
2527 * by this node, skip it and check the others.
2528 */
2529 case FR_REDIS_CLUSTER_RCODE_FAILED:
2530 case FR_REDIS_CLUSTER_RCODE_IGNORED:
2531 PDEBUG2("%s - Bootstrap server \"%s\" returned", cluster->log_prefix, server);
2532 fr_pool_connection_release(node->pool, NULL, conn);
2533 break;
2534 }
2535 } while ((cp = cf_pair_find_next(module, cp, "server")));
2536
2537 /*
2538 * Catch pool.start != 0
2539 */
2540 num_nodes = fr_rb_num_elements(cluster->used_nodes);
2541 if (!num_nodes) {
2542 ERROR("%s - Can't contact any bootstrap servers", cluster->log_prefix);
2543 goto error;
2544 }
2545
2546 /*
2547 * We've failed to apply a valid cluster map.
2548 * Distribute the node(s) throughout the key_slots,
2549 * hopefully we'll get one when we start processing
2550 * requests.
2551 */
2552 for (s = 0; s < KEY_SLOTS; s++) cluster->key_slot[s].master = (s % (uint16_t) num_nodes) + 1;
2553
2554 return cluster;
2555}
static int const char char buffer[256]
Definition acutest.h:576
va_list args
Definition acutest.h:770
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:210
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:325
#define STRINGIFY(x)
Definition build.h:198
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define NUM_ELEMENTS(_t)
Definition build.h:340
bool check_config
Definition cf_file.c:61
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition cf_util.c:1433
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1184
char const * cf_section_name1(CONF_SECTION const *cs)
Return the first identifier of a CONF_SECTION.
Definition cf_util.c:1170
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
Definition cf_util.c:1419
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
Definition cf_util.c:927
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1574
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
Definition cf_util.h:143
bool remapping
True when cluster is being remapped.
Definition cluster.c:256
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVE.
Definition cluster.c:863
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
Definition cluster.c:1862
CONF_SECTION *fr_redis_conf_t * conf
< Module configuration.
Definition cluster.c:262
char const * log_prefix
What to prepend to log messages.
Definition cluster.c:251
fr_pool_t * pool
Pool associated with this node.
Definition cluster.c:225
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
Definition cluster.c:215
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
Definition cluster.c:2259
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
Definition cluster.c:228
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
Definition cluster.c:1654
#define CLOSED_WEIGHT
What weight to give to nodes that had a connection closed recently.
Definition cluster.c:181
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
Definition cluster.c:1638
#define MAX_SLAVES
Maximum number of slaves associated with a keyslot.
Definition cluster.c:172
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
Definition cluster.c:371
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
Definition cluster.c:785
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
Definition cluster.c:298
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
Definition cluster.c:1118
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
Definition cluster.c:446
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
Definition cluster.c:1310
#define CLOSED_PERIOD
How recently must the closed have occurred for us to care.
Definition cluster.c:178
struct cluster_nodes_live_t::@53 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
Definition cluster.c:275
fr_rb_tree_t * used_nodes
Tree of used nodes.
Definition cluster.c:272
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
Definition cluster.c:274
#define KEY_SLOTS
Maximum number of keyslots (should not change).
Definition cluster.c:170
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
Definition cluster.c:223
char const * trigger_prefix
Trigger path.
Definition cluster.c:252
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
Definition cluster.c:213
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
Definition cluster.c:257
fr_time_t last_updated
Last time the cluster mappings were updated.
Definition cluster.c:259
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
Definition cluster.c:2069
fr_pair_list_t trigger_args
Arguments to pass to triggers.
Definition cluster.c:253
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
Definition cluster.c:2137
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
Definition cluster.c:220
uint8_t next
Next index in live.
Definition cluster.c:204
size_t fr_redis_cluster_rcodes_table_len
Definition cluster.c:287
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
Definition cluster.c:339
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
Definition cluster.c:241
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
Definition cluster.c:1688
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
Definition cluster.c:2184
#define FAILED_WEIGHT
What weight to give to nodes that had a spawn failure recently.
Definition cluster.c:187
uint8_t master
R/W node (master) for this key slot.
Definition cluster.c:243
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
Definition cluster.c:1208
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
Definition cluster.c:1243
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
Definition cluster.c:1467
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
Definition cluster.c:1602
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
Definition cluster.c:322
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
Definition cluster.c:1008
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
Definition cluster.c:194
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
Definition cluster.c:537
uint8_t id
Node ID (index in node array).
Definition cluster.c:217
bool triggers_enabled
Whether triggers are enabled.
Definition cluster.c:254
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
Definition cluster.c:1740
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
Definition cluster.c:1451
fr_socket_t addr
Current node address.
Definition cluster.c:219
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
Definition cluster.c:280
bool is_master
Whether this node is a master.
Definition cluster.c:229
#define FAILED_PERIOD
How recently must the spawn failure occurred for us to care.
Definition cluster.c:184
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
Definition cluster.c:271
#define RELEASED_PERIOD
Period after which we don't care about when the last connection was released.
Definition cluster.c:190
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
Definition cluster.c:1671
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
Definition cluster.c:277
uint8_t slave_num
Number of slaves associated with this key slot.
Definition cluster.c:242
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
Definition cluster.c:268
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
Definition cluster.c:2201
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Definition cluster.c:226
Live nodes data, used to perform weighted random selection of alternative nodes.
Definition cluster.c:198
A redis cluster.
Definition cluster.c:250
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Definition cluster.c:240
A Redis cluster node.
Definition cluster.c:212
Common functions for interacting with Redis cluster via Hiredis.
size_t key_len
Length of the key.
Definition cluster.h:55
bool close_conn
Set by caller of fr_redis_cluster_state_next, to indicate that connection must be closed,...
Definition cluster.h:50
uint32_t retries
How many times we've received TRYAGAIN.
Definition cluster.h:60
uint32_t reconnects
How many connections we've tried in this pool.
Definition cluster.h:62
uint8_t const * key
Key we performed hashing on.
Definition cluster.h:54
fr_redis_cluster_rcode_t
Return values for internal functions.
Definition cluster.h:67
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
Definition cluster.h:68
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
Definition cluster.h:70
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
Definition cluster.h:69
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
Definition cluster.h:73
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
Definition cluster.h:71
uint32_t redirects
How many redirects have we followed.
Definition cluster.h:58
fr_redis_cluster_node_t * node
Node we're communicating with.
Definition cluster.h:57
uint32_t in_pool
How many available connections are there in the pool.
Definition cluster.h:61
Redis connection sequence state.
Definition cluster.h:49
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
Definition crc16.c:91
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define DEBUG(fmt,...)
Definition dhcpclient.c:38
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
Definition fifo.c:158
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
Definition fifo.c:170
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
Definition fifo.c:111
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
Definition fifo.c:135
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
Definition fifo.h:66
talloc_free(hp)
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
Definition inet.c:783
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses an IPv4/6 address and port into an fr_ipaddr_t and an integer (port).
Definition inet.c:944
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two IP addresses.
Definition inet.c:1353
int af
Address family.
Definition inet.h:63
IPv4/6 prefix.
#define PERROR(_fmt,...)
Definition log.h:228
#define REXDENT()
Exdent (unindent) R* messages by one level.
Definition log.h:455
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:540
#define RPDEBUG2(fmt,...)
Definition log.h:359
#define RDEBUG3(fmt,...)
Definition log.h:355
#define PDEBUG2(_fmt,...)
Definition log.h:272
#define RWARN(fmt,...)
Definition log.h:309
#define PWARN(_fmt,...)
Definition log.h:227
#define RERROR(fmt,...)
Definition log.h:310
#define RPERROR(fmt,...)
Definition log.h:314
#define RINFO(fmt,...)
Definition log.h:308
#define RPEDEBUG(fmt,...)
Definition log.h:388
#define RINDENT()
Indent R* messages by one level.
Definition log.h:442
#define fr_time()
Definition event.c:60
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
Definition log.h:69
unsigned short uint16_t
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
#define UINT8_MAX
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
Definition missing.c:447
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition pair.c:2330
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition pair.c:46
int fr_pool_start(fr_pool_t *pool)
Definition pool.c:1119
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition pool.c:1410
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition pool.c:1176
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition pool.c:1247
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition pool.c:970
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition pool.c:1540
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition pool.c:1395
int fr_pool_start_num(fr_pool_t *pool)
Get the 'start' value of the connection pool.
Definition pool.c:1196
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition pool.c:936
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition pool.c:1227
A connection pool.
Definition pool.c:85
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition pool.h:57
fr_time_t last_released
Last time a connection was released.
Definition pool.h:56
uint32_t num
Number of connections in the pool.
Definition pool.h:67
#define fr_assert(_expr)
Definition rad_assert.h:37
#define REDEBUG(fmt,...)
#define RDEBUG2(fmt,...)
#define DEBUG2(fmt,...)
#define WARN(fmt,...)
#define INFO(fmt,...)
Definition radict.c:63
static rs_t * conf
Definition radsniff.c:52
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition rand.c:104
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black tree.
Definition rb.h:269
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx, fr_redis_rcode_t status)
Print the response data in a useful treelike form.
Definition redis.c:142
fr_table_num_sorted_t const redis_rcodes[]
Definition redis.c:40
redisContext * handle
Hiredis context used when issuing commands.
Definition base.h:101
fr_redis_cluster_node_t * node
Node this connection is to.
Definition base.h:102
#define REDIS_ERROR_MOVED_STR
Definition base.h:46
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition base.h:64
uint32_t fr_redis_version_num(char const *version)
Convert a version string into a 32-bit unsigned integer for comparisons.
Definition redis.c:692
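To show why packing a version string into an integer makes comparisons simple, here is a minimal sketch of one possible encoding. It assumes a "major.minor.patch" string with each component in the range 0-255. The function name sketch_version_num and the bit layout are assumptions made for illustration; the encoding actually used by fr_redis_version_num is not shown in this listing and may differ.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: pack "major.minor.patch" into a single integer so that
 * ordinary integer comparison orders versions correctly.
 *
 * Returns 0 if the string cannot be parsed or a component is out of range.
 */
uint32_t sketch_version_num(char const *version)
{
	unsigned int major, minor, patch;

	if (!version) return 0;

	/* Require exactly three dot-separated unsigned components */
	if (sscanf(version, "%u.%u.%u", &major, &minor, &patch) != 3) return 0;

	/* Assumed limit: one byte per component */
	if ((major > 255) || (minor > 255) || (patch > 255)) return 0;

	return ((uint32_t)major << 16) | ((uint32_t)minor << 8) | (uint32_t)patch;
}
```

With this layout a caller could gate a feature with a check such as sketch_version_num(buf) >= sketch_version_num("3.0.0"), where buf holds a version string like the one written by fr_redis_get_version.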
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
Definition redis.c:71
fr_table_num_sorted_t const redis_reply_types[]
Definition redis.c:30
#define REDIS_ERROR_ASK_STR
Definition base.h:47
fr_redis_rcode_t
Codes are ordered inversely by priority.
Definition base.h:87
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
Definition base.h:91
@ REDIS_RCODE_SUCCESS
Operation was successful.
Definition base.h:88
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
Definition base.h:94
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
Definition base.h:90
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
Definition base.h:95
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
Definition base.h:93
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
Definition base.h:89
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Definition redis.c:642
Configuration parameters for a redis connection.
Definition base.h:109
Connection handle, holding a redis context.
Definition base.h:100
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a lexicographically sorted array of name to num mappings.
Definition table.h:49
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition talloc.c:545
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition talloc.c:167
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
static size_t talloc_strlen(char const *s)
Returns the length of a talloc array containing a string.
Definition talloc.h:136
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to the number of seconds since the Unix epoch (wallclock time).
Definition time.h:731
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
Definition time.h:666
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
Definition time.h:656
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
Definition trigger.c:425
bool trigger_enabled(void)
Return whether triggers are enabled.
Definition trigger.c:94
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Check whether a valuepair list is empty.
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
int af
AF_INET, AF_INET6, or AF_UNIX.
Definition socket.h:75
Holds information necessary for binding or connecting to a socket.
Definition socket.h:60
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const(_msg)
Definition strerror.h:223
#define fr_box_strvalue_len(_val, _len)
Definition value.h:309
static size_t char ** out
Definition value.h:1030