/*
 *   This program is free software; you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation; either version 2 of the License, or (at
 *   your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program; if not, write to the Free Software
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: b9168ca817664a51f6a310fa5655eee0fc006d05 $
 * @file cluster.c
 * @brief Functions for interacting with Redis cluster via Hiredis.
 *
 * @author Arran Cudbard-Bell
 *
 * @copyright 2015 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
 * @copyright 2015 Network RADIUS (legal@networkradius.com)
 * @copyright 2015 The FreeRADIUS server project
 *
 * Overview
 * ========
 *
 * Read and understand http://redis.io/topics/cluster-spec first, otherwise the text
 * below will not be useful.
 *
 * Using the cluster's public API
 * ------------------------------
 *
 * The two functions used at runtime to issue commands are #fr_redis_cluster_state_init
 * and #fr_redis_cluster_state_next.
 *
 * #fr_redis_cluster_state_init initialises the structure we use to track which node
 * we're communicating with, and uses a key (string) to determine which node we should
 * try to contact first.
 *
 * #fr_redis_cluster_state_next examines the result of the last command, and either
 * gets a new connection, or errors out.
 *
 * In both cases the connection should not be released by the caller, only by
 * #fr_redis_cluster_state_next.
 *
 * Below is the sequence of calls required to use the cluster:
 *
 * 1. During initialization allocate a cluster configuration structure with
 *    #fr_redis_cluster_alloc. This holds the cluster configuration and state.
 * 2. Declare a #fr_redis_cluster_state_t in the function that needs to issue
 *    commands against the cluster.
 * 3. Call #fr_redis_cluster_state_init with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct, and a key/key_len used for initial
 *    lookup. For most commands the key is the value of the second argument.
 *    #fr_redis_cluster_state_init will then reserve and pass back a connection from
 *    the pool of the node associated with the key.
 * 4. Use the connection that was passed back to issue a Redis command.
 * 5. Use #fr_redis_command_status to translate the result of the command into
 *    a #fr_redis_rcode_t value.
 * 6. Call #fr_redis_cluster_state_next with relevant arguments including a pointer
 *    to the #fr_redis_cluster_state_t struct and the #fr_redis_rcode_t value for the
 *    last command.
 * 7. If #fr_redis_cluster_state_next returns 0, repeat steps 4-7. Otherwise
 *    stop and analyse the return value.
 *
 * See #fr_redis_cluster_state_init for example code.
 *
 * Structures
 * ----------
 *
 * This code maintains a series of structures for efficient lookup and lockless operations.
 *
 * The important ones are:
 * - An array of #fr_redis_cluster_node_t. These are pre-allocated on startup and are
 *   never added to, or removed from.
 * - An #fr_fifo_t. This contains the queue of nodes that may be re-used.
 * - An #fr_rb_tree_t. This contains a tree of nodes which are active. The tree is built on IP
 *   address and port.
 *
 * Each key slot entry contains a master ID, and an array of slave IDs. The IDs are array
 * indexes in the fr_redis_cluster_t.node array. We use 8bit unsigned integers instead of
 * pointers to save space. Using pointers, the key_slot[] array would need 784K, using IDs
 * it uses 112K. Still not light on memory, but a bit more acceptable.
 * Currently the key_slot array is shadowed by key_slot_pending, used to stage new key_slot
 * mappings. This doubles the memory used. We may want to consider allocating key_slot_pending
 * only during remappings and freeing it after.
 *
 * Mapping/Remapping the cluster
 * -----------------------------
 *
 * On startup, and during cluster operation, a remap may be performed. A remap involves
 * the following steps:
 *
 * 1. Executing the Redis 'cluster slots' command.
 * 2. Validating the result of this command. We need to do extensive validation to
 *    avoid SEGV on invalid data, due to the way libhiredis presents the result.
 * 3. Determining the intersection between nodes described in the result, and those already
 *    in our #fr_rb_tree_t.
 * 4. Connecting to nodes that were in the result, but not in the tree.
 *    Note: If we can't connect to any of the masters, we count the map as invalid, roll
 *    back any newly connected nodes, and error out. Slave failure is OK.
 * 5. Mapping keyslot ranges to nodes in the key_slot_pending array.
 * 6. Verifying there are no holes in the ranges (if there are, we roll back and error out).
 * 7. Applying the new keyslot ranges.
 * 8. Removing nodes no longer used by the key slots, and adding them back to the free
 *    nodes queue.
 *
 * #cluster_map_get and #cluster_map_apply perform the operations described
 * above. The get function issues the 'cluster slots' command and performs validation, the
 * apply function processes and applies the map.
 *
 * Failing to apply a map is not a fatal error at runtime, and is only fatal on startup if
 * pool.start > 0.
 *
 * The cluster client can continue to operate, albeit inefficiently, with a stale cluster map
 * by following '-ASK' and '-MOVED' redirects.
 *
 * Remaps are limited to one per second. If any operation sets the remap_needed flag, or
 * attempts a remap directly, the remap may be skipped if one occurred recently.
 *
 *
 * Processing '-ASK' and '-MOVED' redirects
 * ----------------------------------------
 *
 * The code treats '-ASK' (temporary redirect) and '-MOVED' (permanent redirect) responses
 * similarly. If the node is known, a connection is reserved from its pool. If the node
 * is not known, a new pool is established, and a connection reserved.
 *
 * The difference between '-ASK' and '-MOVED' is that '-MOVED' attempts a cluster remap before
 * following the redirect.
 *
 * The data from '-MOVED' responses is not used to alter the cluster map. That is only done
 * on successful remap.
 *
 *
 * Processing '-TRYAGAIN'
 * ----------------------
 *
 * If the cluster is in a state of flux, a node may return '-TRYAGAIN' to indicate that we
 * should attempt the operation again. The cluster spec says we should attempt the operation
 * after some time. This time is configurable.
 *
 */

#include <freeradius-devel/util/debug.h>
#include <freeradius-devel/server/cf_parse.h>

#include <freeradius-devel/util/fifo.h>
#include <freeradius-devel/util/misc.h>
#include <freeradius-devel/util/rand.h>
#include <freeradius-devel/util/time.h>

#include "config.h"
#include "base.h"
#include "cluster.h"
#include "crc16.h"

#ifndef WITH_TLS
#  undef HAVE_REDIS_SSL
#endif

#ifdef HAVE_REDIS_SSL
#include <freeradius-devel/tls/strerror.h>
#include <hiredis/hiredis_ssl.h>
#endif

#define KEY_SLOTS		16384			//!< Maximum number of keyslots (should not change).

#define MAX_SLAVES		5			//!< Maximum number of slaves associated
							//!< with a keyslot.

/*
 *	Periods and weights for live node selection
 */
#define CLOSED_PERIOD		10000			//!< How recently the close must have
							//!< occurred for us to care.

#define CLOSED_WEIGHT		1			//!< What weight to give to nodes that
							//!< had a connection closed recently.

#define FAILED_PERIOD		10000			//!< How recently the spawn failure must
							//!< have occurred for us to care.

#define FAILED_WEIGHT		1			//!< What weight to give to nodes that
							//!< had a spawn failure recently.

#define RELEASED_PERIOD		10000			//!< Period after which we don't care
							//!< about when the last connection was
							//!< released.

#define RELEASED_MIN_WEIGHT	1000			//!< Minimum weight to assign to node.
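
/*
 *	Illustration only, not part of the FreeRADIUS sources.  The selection
 *	code that consumes these weights is not shown in this listing, so the
 *	sketch below is an assumption about how a "cumulative weight" table can
 *	drive weighted random selection.  All example_* names are hypothetical.
 *	Each node stores the running total of weights; a pivot drawn from
 *	[1, total] selects the first node whose cumulative weight reaches it.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
	uint8_t		id;		/* Node ID (hypothetical) */
	unsigned int	cumulative;	/* Sum of weights up to and including this node */
} example_live_node_t;

/* Append a node to the table, recording the running total of weights */
static size_t example_live_add(example_live_node_t *live, size_t num, uint8_t id, unsigned int weight)
{
	unsigned int prev = num ? live[num - 1].cumulative : 0;

	live[num].id = id;
	live[num].cumulative = prev + weight;

	return num + 1;
}

/* Binary search for the first node whose cumulative weight is >= pivot */
static uint8_t example_live_pick(example_live_node_t const *live, size_t num, unsigned int pivot)
{
	size_t lo, hi;

	assert(num > 0);
	assert((pivot >= 1) && (pivot <= live[num - 1].cumulative));

	lo = 0;
	hi = num - 1;
	while (lo < hi) {
		size_t mid = lo + ((hi - lo) / 2);

		if (live[mid].cumulative < pivot) {
			lo = mid + 1;
		} else {
			hi = mid;
		}
	}

	return live[lo].id;
}
```
/*
 *	In use, the pivot would come from a random number generator reduced to
 *	the range [1, total].  A node with weight 1000 is then chosen roughly
 *	1000 times as often as a node with weight 1.
 */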

/** Live nodes data, used to perform weighted random selection of alternative nodes
 */
typedef struct {
	struct {
		uint8_t			id;		//!< Node ID.
		fr_pool_state_t const	*pool_state;	//!< Connection pool stats.
		unsigned int		cumulative;	//!< Cumulative weight.
	} node[UINT8_MAX - 1];				//!< Array of live node IDs (and weights).
	uint8_t next;					//!< Next index in live.

/** A Redis cluster node
 *
 * Passed as opaque data to pools which open connections to nodes.
 */
	fr_rb_node_t		rbnode;			//!< Entry into the tree of redis nodes.

	char			name[INET6_ADDRSTRLEN];	//!< Buffer to hold the IP string, used as
							//!< text for debug messages.
	uint8_t			id;			//!< Node ID (index in node array).

	fr_socket_t		addr;			//!< Current node address.
	fr_socket_t		pending_addr;		//!< New node address to be applied when the pool
							//!< is reconnected.

	fr_redis_cluster_t	*cluster;		//!< Common configuration (database number,
							//!< password, etc.).
	fr_pool_t		*pool;			//!< Pool associated with this node.
	CONF_SECTION		*pool_cs;		//!< Pool configuration section associated with node.

	bool			is_active;		//!< Whether this node is in the active node set.
	bool			is_master;		//!< Whether this node is a master.
							//!< This is needed for commands like 'KEYS', which
							//!< we need to issue to every master in the cluster.
};

/** Indexes in the fr_redis_cluster_node_t array for a single key slot
 *
 * When dealing with 16K entries, space is a concern. It's significantly
 * more memory efficient to use 8bit indexes than 64bit pointers for each
 * of the key slot to node mappings.
 */
	uint8_t			slave[MAX_SLAVES];	//!< R/O node (slave) for this key slot.
	uint8_t			slave_num;		//!< Number of slaves associated with this key slot.
	uint8_t			master;			//!< R/W node (master) for this key slot.
};
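
/*
 *	Illustration only, not part of the FreeRADIUS sources.  This sketch
 *	checks the memory arithmetic behind the 8bit index design: 16384 key
 *	slots at 7 bytes each is 112K.  The 784K figure for pointers is
 *	reproduced here on the assumption (not stated in the source) that it
 *	counts six 64bit pointers plus one count byte per slot, ignoring
 *	padding.  All example_* names are hypothetical.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define EXAMPLE_KEY_SLOTS	16384
#define EXAMPLE_MAX_SLAVES	5

/* Same layout as the key slot structure: IDs are indexes into a node array */
typedef struct {
	uint8_t	slave[EXAMPLE_MAX_SLAVES];
	uint8_t	slave_num;
	uint8_t	master;
} example_slot_ids_t;

/* Bytes used per slot if each node reference were a pointer (padding ignored) */
static size_t example_pointer_slot_bytes(size_t pointer_size)
{
	assert(pointer_size > 0);

	/* Five slave pointers, one master pointer, one count byte */
	return ((EXAMPLE_MAX_SLAVES + 1) * pointer_size) + 1;
}

/* Bytes used by a table with one entry per key slot */
static size_t example_table_bytes(size_t per_slot)
{
	return per_slot * EXAMPLE_KEY_SLOTS;
}
```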

/** A redis cluster
 *
 * Holds all the structures and collections of nodes, to represent a Redis cluster.
 */
	char const		*log_prefix;		//!< What to prepend to log messages.
	char const		*trigger_prefix;	//!< Trigger path.
	fr_pair_list_t		trigger_args;		//!< Arguments to pass to triggers.
	bool			triggers_enabled;	//!< Whether triggers are enabled.

	bool			remapping;		//!< True when cluster is being remapped.
	bool			remap_needed;		//!< Set true if at least one cluster node is definitely
							//!< unreachable. Set false on successful remap.
	fr_time_t		last_updated;		//!< Last time the cluster mappings were updated.
	CONF_SECTION		*module;		//!< Module configuration.

	fr_redis_conf_t		*conf;			//!< Base configuration data such as the database number
							//!< and passwords.
#ifdef HAVE_REDIS_SSL
	SSL_CTX			*ssl_ctx;		//!< SSL context.
#endif

	fr_redis_cluster_node_t	*node;			//!< Structure containing a node id, its address and
							//!< a pool of its connections.

	fr_fifo_t		*free_nodes;		//!< Queue of free nodes (or nodes waiting to be reused).
	fr_rb_tree_t		*used_nodes;		//!< Tree of used nodes.

	fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS];	//!< Lookup table of slots to pools.

	pthread_mutex_t		mutex;			//!< Mutex to synchronise cluster operations.
};

/** Resolve key to key slot
 *
 * Identical to the example implementation, except it uses memchr, which should
 * be faster, and is less complex.
 *
 * @param[in] key	to resolve.
 * @param[in] key_len	length of key.
 * @return key slot index for the key.
 */
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
{
	uint8_t *p, *q;

	p = memchr(key, '{', key_len);
	if (!p) {
	all:
		return fr_crc16_xmodem(key, key_len) & (KEY_SLOTS - 1);
	}

	q = memchr(p, '}', key_len - (p - key));		/* look for } after { */
	if (!q || (q == p + 1)) goto all;			/* no } or {}, hash everything */

	p++;							/* skip '{' */

	return fr_crc16_xmodem(p, q - p) & (KEY_SLOTS - 1);	/* hash stuff between { and } */
}
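
/*
 *	Illustration only, not part of the FreeRADIUS sources.  A standalone
 *	sketch of the same key slot calculation, using a plain bitwise
 *	CRC16/XMODEM in place of fr_crc16_xmodem (whose implementation is not
 *	shown here).  All example_* names are hypothetical.  It shows that only
 *	the text between the first '{' and the next '}' is hashed, so keys
 *	sharing a hash tag land in the same slot.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define EXAMPLE_KEY_SLOTS 16384

/* Bitwise CRC16/XMODEM: polynomial 0x1021, initial value 0, no reflection */
static uint16_t example_crc16_xmodem(uint8_t const *in, size_t in_len)
{
	uint16_t	crc = 0;
	size_t		i;
	int		bit;

	for (i = 0; i < in_len; i++) {
		crc ^= (uint16_t)((uint16_t)in[i] << 8);
		for (bit = 0; bit < 8; bit++) {
			crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
		}
	}

	return crc;
}

/* Map a NUL terminated key to a slot, honouring a non-empty {hash tag} */
static uint16_t example_key_slot(char const *key)
{
	uint8_t const	*start = (uint8_t const *)key;
	uint8_t const	*p, *q;
	size_t		len;

	assert(key != NULL);
	len = strlen(key);

	p = memchr(start, '{', len);
	if (p) {
		q = memchr(p, '}', len - (size_t)(p - start));
		if (q && (q != p + 1)) {
			/* Hash only the bytes between '{' and '}' */
			return example_crc16_xmodem(p + 1, (size_t)(q - (p + 1))) & (EXAMPLE_KEY_SLOTS - 1);
		}
	}

	/* No tag, or an empty "{}": hash the whole key */
	return example_crc16_xmodem(start, len) & (EXAMPLE_KEY_SLOTS - 1);
}
```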

/** Compare two redis nodes by IP address, then by port
 *
 * @param[in] one	first node.
 * @param[in] two	second node.
 * @return CMP(one, two)
 */
static int8_t _cluster_node_cmp(void const *one, void const *two)
{
	fr_redis_cluster_node_t const *a = one;
	fr_redis_cluster_node_t const *b = two;
	int ret;

	ret = fr_ipaddr_cmp(&a->addr.inet.dst_ipaddr, &b->addr.inet.dst_ipaddr);
	if (ret != 0) return ret;

	return CMP(a->addr.inet.dst_port, b->addr.inet.dst_port);
}
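
/*
 *	Illustration only, not part of the FreeRADIUS sources.  A standalone
 *	sketch of the same two stage ordering: address first, port as the tie
 *	breaker.  EXAMPLE_CMP is a stand-in three-way comparison and is an
 *	assumption about what CMP does; example_addr_t is a hypothetical,
 *	IPv4-only replacement for fr_socket_t.
 */
```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Three-way comparison yielding -1, 0 or +1 */
#define EXAMPLE_CMP(_a, _b) (((_a) > (_b)) - ((_a) < (_b)))

typedef struct {
	uint8_t		ip[4];		/* IPv4 address in network byte order */
	uint16_t	port;
} example_addr_t;

static int8_t example_addr_cmp(example_addr_t const *a, example_addr_t const *b)
{
	int ret;

	assert(a && b);

	/* Order by address first, normalising memcmp's result to -1, 0 or +1 */
	ret = memcmp(a->ip, b->ip, sizeof(a->ip));
	if (ret != 0) return (int8_t)EXAMPLE_CMP(ret, 0);

	/* Same address, so the port decides */
	return (int8_t)EXAMPLE_CMP(a->port, b->port);
}
```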

/** Reconnect callback to apply new pool config
 *
 * @param[in] pool	to apply new configuration to.
 * @param[in] opaque	data passed to the connection pool.
 */
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
{
	fr_redis_cluster_node_t	*node = opaque;

	node->addr = node->pending_addr;

	if (node->cluster->triggers_enabled) {
		trigger_args_afrom_server(pool, &args, node->name, node->addr.inet.dst_port);
		if (fr_pair_list_empty(&args)) return;

					       &node->cluster->trigger_args) >= 0);

	}
}

/** Establish a connection to a cluster node
 *
 * @note Must be called with the cluster mutex locked.
 * @note Configuration to use for the connection must be set in node->pending_addr, not node->cluster->conf.
 *
 * @param[in] cluster	to search in.
 * @param[in] node	config.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_FAILED if the operation failed.
 */
{
	char const *p;

	fr_assert(node->pending_addr.inet.dst_ipaddr.af);

	/*
	 *	Write out the IP address and Port in string form
	 */
	p = inet_ntop(node->pending_addr.inet.dst_ipaddr.af, &node->pending_addr.inet.dst_ipaddr.addr,
		      node->name, sizeof(node->name));

	/*
	 *	Node has never been used before, needs a pool allocated for it.
	 */
	if (!node->pool) {
		char		buffer[256];
		CONF_SECTION	*pool;

		snprintf(buffer, sizeof(buffer), "%s [%i]", cluster->log_prefix, node->id);

		pool = cf_section_find(cluster->module, "pool", NULL);
		/*
		 *	Dup so we can re-parse, and have unique CONF_DATA
		 */
		node->pool_cs = cf_section_dup(cluster, NULL, pool, "pool", NULL, true);
		node->addr = node->pending_addr;
		node->pool = fr_pool_init(cluster, node->pool_cs, node,
		if (!node->pool) {
		error:
			TALLOC_FREE(node->pool_cs);
			TALLOC_FREE(node->pool);
		}

		if (trigger_enabled() && cluster->triggers_enabled) {
			trigger_args_afrom_server(node->pool, &args, node->name, node->addr.inet.dst_port);
			if (fr_pair_list_empty(&args)) goto error;

			if (!fr_pair_list_empty(&cluster->trigger_args)) MEM(fr_pair_list_copy(cluster, &args, &cluster->trigger_args) >= 0);

		}

		if (fr_pool_start(node->pool) < 0) goto error;

	}

	/*
	 *	Apply the new config to the possibly live pool
	 */
	if (fr_pool_reconnect(node->pool, NULL) < 0) goto error;

}

/** Parse a -MOVED or -ASK redirect
 *
 * Converts the body of the -MOVED or -ASK error into an IPv4/6 address and port.
 *
 * @param[out] key_slot		value extracted from redirect string (may be NULL).
 * @param[out] node_addr	Redis node ipaddr and port extracted from redirect string.
 * @param[in] redirect		to process.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the server returned an invalid redirect.
 */
							  redisReply *redirect)
{
	char		*p, *q;
	unsigned long	key;
	uint16_t	port;
	fr_ipaddr_t	ipaddr;

	if (!redirect || (redirect->type != REDIS_REPLY_ERROR)) {
	}

	p = redirect->str;
	if (strncmp(REDIS_ERROR_MOVED_STR, redirect->str, sizeof(REDIS_ERROR_MOVED_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_MOVED_STR);	/* not a typo, skip space too */
	} else if (strncmp(REDIS_ERROR_ASK_STR, redirect->str, sizeof(REDIS_ERROR_ASK_STR) - 1) == 0) {
		q = p + sizeof(REDIS_ERROR_ASK_STR);	/* not a typo, skip space too */
	} else {
		fr_strerror_const("No '-MOVED' or '-ASK' prefix");
	}
	if ((size_t)(q - p) >= (size_t)redirect->len) {
		fr_strerror_const("Truncated");
	}
	p = q;
	key = strtoul(p, &q, 10);
	if (key >= KEY_SLOTS) {		/* Valid key slots are 0..(KEY_SLOTS - 1) */
		fr_strerror_printf("Key %lu outside of redis slot range", key);
	}
	p = q;

	if (*p != ' ') {
		fr_strerror_const("Missing key/host separator");
	}
	p++;			/* Skip the ' ' */

	if (fr_inet_pton_port(&ipaddr, &port, p, redirect->len - (p - redirect->str), AF_UNSPEC, true, true) < 0) {
	}
	fr_assert(ipaddr.af);

	if (key_slot) *key_slot = key;
	if (node_addr) {
		node_addr->inet.dst_ipaddr = ipaddr;
		node_addr->inet.dst_port = port;
	}

}
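
/*
 *	Illustration only, not part of the FreeRADIUS sources.  A standalone
 *	sketch of parsing a redirect of the form "MOVED <slot> <host>:<port>"
 *	or "ASK <slot> <host>:<port>" with the C standard library.  It assumes
 *	the error text starts with the keyword and has no leading '-'.  The
 *	example_* names and the 64 byte host buffer are hypothetical; the real
 *	code uses fr_inet_pton_port to convert the address.
 */
```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
	unsigned long	slot;		/* Key slot named in the redirect */
	char		host[64];	/* Host part, NUL terminated */
	uint16_t	port;
} example_redirect_t;

/* Returns 0 on success, -1 if the redirect is malformed */
static int example_redirect_parse(example_redirect_t *out, char const *msg)
{
	char const	*p, *colon;
	char		*end;
	unsigned long	slot, port;
	size_t		host_len;

	assert(out && msg);

	if (strncmp(msg, "MOVED ", 6) == 0) {
		p = msg + 6;
	} else if (strncmp(msg, "ASK ", 4) == 0) {
		p = msg + 4;
	} else {
		return -1;
	}

	/* Slot number, followed by exactly one space */
	slot = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (slot >= 16384)) return -1;
	p = end + 1;

	/* Split on the last ':' so hosts containing colons still work */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	host_len = (size_t)(colon - p);
	if (host_len >= sizeof(out->host)) return -1;

	port = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (port > UINT16_MAX)) return -1;

	memcpy(out->host, p, host_len);
	out->host[host_len] = '\0';
	out->slot = slot;
	out->port = (uint16_t)port;

	return 0;
}
```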

/** Apply a cluster map received from a cluster node
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note Must be called with the cluster mutex held.
 *
 * Key slot range structure
 @verbatim
   [0] -> key slot range 0
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master 0 ip (string)
           [1] -> master 0 port (number)
       [3..n] -> slave_node(s)
   [1] -> key slot range 1
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master 1 ip (string)
           [1] -> master 1 port (number)
       [3..n] -> slave_node(s)
   [n] -> key slot range n
       [0] -> key_slot_start
       [1] -> key_slot_end
       [2] -> master_node
           [0] -> master n ip (string)
           [1] -> master n port (number)
       [3..n] -> slave_node(s)
 @endverbatim
 *
 * @param[in,out] cluster	to apply map to.
 * @param[in] reply		from #cluster_map_get.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_FAILED on failure.
 *	- FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT if the map didn't provide nodes for all keyslots.
 */
{
	size_t		i;
	uint8_t		r = 0;


	uint8_t		rollback[UINT8_MAX];		// Set of nodes to re-add to the queue on failure.
	bool		active[UINT8_MAX];		// Set of nodes active in the new cluster map.
	bool		master[UINT8_MAX];		// Master nodes.
#ifndef NDEBUG
#  define SET_ADDR(_addr, _map) \
do { \
	int _ret; \
	_ret = fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	fr_assert(_ret == 0);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#else
#  define SET_ADDR(_addr, _map) \
do { \
	fr_inet_pton(&_addr.inet.dst_ipaddr, _map->element[0]->str, _map->element[0]->len, AF_UNSPEC, true, true);\
	_addr.inet.dst_port = _map->element[1]->integer; \
} while (0)
#endif

#define SET_INACTIVE(_node) \
do { \
	(_node)->is_active = false; \
	(_node)->is_master = false; \
	fr_rb_delete(cluster->used_nodes, _node); \
	fr_fifo_push(cluster->free_nodes, _node); \
} while (0)

#define SET_ACTIVE(_node) \
do { \
	(_node)->is_active = true; \
	fr_rb_insert(cluster->used_nodes, _node); \
	fr_fifo_pop(cluster->free_nodes); \
	active[(_node)->id] = true; \
	rollback[r++] = (_node)->id; \
} while (0)

	fr_assert(reply->type == REDIS_REPLY_ARRAY);

	memset(&rollback, 0, sizeof(rollback));
	memset(active, 0, sizeof(active));
	memset(master, 0, sizeof(master));

	cluster->remapping = true;

	/*
	 *	Must be cleared with the mutex held
	 */
	memset(&cluster->key_slot_pending, 0, sizeof(cluster->key_slot_pending));

	/*
	 *	Insert new nodes and mark up the keyslot indexes
	 *	in our temporary keyslot array.
	 *
	 *	A map consists of an array with the following indexes:
	 *	  [0] -> key_slot_start
	 *	  [1] -> key_slot_end
	 *	  [2] -> master_node
	 *	  [3..n] -> slave_node(s)
	 */
	for (i = 0; i < reply->elements; i++) {
		size_t			j;
		long long int		k;
		int			slaves = 0;
		fr_redis_cluster_node_t	*found, *spare;
		fr_redis_cluster_node_t	find = {};	/* Initialise unused fields to stop Coverity complaints */
		redisReply		*map = reply->element[i];

		memset(&tmpl_slot, 0, sizeof(tmpl_slot));

		SET_ADDR(find.addr, map->element[2]);
		found = fr_rb_find(cluster->used_nodes, &find);
		if (found) {
			active[found->id] = true;
			goto reuse_master_node;
		}

		/*
		 *	Process the master
		 *
		 *	A master node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		spare = fr_fifo_peek(cluster->free_nodes);
		if (!spare) {
		out_of_nodes:
			fr_strerror_const("Reached maximum connected nodes");
		error:
			cluster->remapping = false;
			cluster->last_updated = fr_time();
			/* Re-insert new nodes back into the free_nodes queue */
			for (i = 0; i < r; i++) SET_INACTIVE(&cluster->node[rollback[i]]);
			return rcode;
		}

		spare->pending_addr = find.addr;
		rcode = cluster_node_connect(cluster, spare);
		if (rcode < 0) goto error;

		/*
		 *	The node was not found in the tree above,
		 *	so mark the spare as active, and add it to
		 *	the array of nodes to roll back on failure.
		 */
		SET_ACTIVE(spare);
		found = spare;

	reuse_master_node:
		tmpl_slot.master = found->id;
		master[found->id] = true;		/* Mark this node as a master */

		/*
		 *	Process the slaves
		 *
		 *	A slave node consists of an array with the following indexes:
		 *	  [0] -> node ip (as string)
		 *	  [1] -> node port
		 */
		for (j = 3; (j < map->elements); j++) {
			SET_ADDR(find.addr, map->element[j]);
			found = fr_rb_find(cluster->used_nodes, &find);
			if (found) {
				active[found->id] = true;
				goto next;
			}

			spare = fr_fifo_peek(cluster->free_nodes);
			if (!spare) goto out_of_nodes;

			spare->pending_addr = find.addr;
			if (cluster_node_connect(cluster, spare) < 0) continue;	/* Slave failure is non-fatal */

			SET_ACTIVE(spare);
			found = spare;

		next:
			tmpl_slot.slave[slaves++] = found->id;

			/* Hit the maximum number of slaves we allow */
			if (slaves >= MAX_SLAVES) break;
		}
		tmpl_slot.slave_num = slaves;

		/*
		 *	Copy our tmpl key slot to each of the key slots
		 *	specified by the range for this map.
		 */
		for (k = map->element[0]->integer; k <= map->element[1]->integer; k++) {
			memcpy(&cluster->key_slot_pending[k], &tmpl_slot,
			       sizeof(*(cluster->key_slot_pending)));
		}
	}

	/*
	 *	Check for holes in the pending key_slot array
	 *
	 *	The cluster specification says that upon
	 *	detecting a 'NULL' key_slot we should
	 *	check again to see if the cluster error has
	 *	been resolved, but seeing as we're in the
	 *	middle of updating the cluster from very
	 *	recent output of 'cluster slots' it's best to
	 *	error out.
	 */
	for (i = 0; i < KEY_SLOTS; i++) {
		if (cluster->key_slot_pending[i].master == 0) {
			fr_strerror_printf("Cluster is misconfigured, no node assigned for key %zu", i);
			goto error;
		}
	}

	/*
	 *	We have connections/pools for all the nodes in
	 *	the new map, apply it to the live cluster.
	 *
	 *	Other workers may be using the key slot table,
	 *	but that's ok. Nodes and pools are never freed,
	 *	so the worst that will happen is they'll hit
	 *	the wrong node for the key, and get redirected.
	 */
	memcpy(&cluster->key_slot, &cluster->key_slot_pending, sizeof(cluster->key_slot));

	/*
	 *	Anything not in the active set of nodes gets
	 *	added back into the queue, to be re-used.
	 *
	 *	We start at 1, as node 0 is reserved.
	 */
	for (i = 1; i < cluster->conf->max_nodes; i++) {
#ifndef NDEBUG

		if (cluster->node[i].is_active) {
			/* Sanity check for duplicates that are active */
			found = fr_rb_find(cluster->used_nodes, &cluster->node[i]);
			fr_assert(found);
			fr_assert(found->is_active);
			fr_assert(found->id == i);
		}
#endif

		if (!active[i] && cluster->node[i].is_active) {
			SET_INACTIVE(&cluster->node[i]);	/* Sets is_master = false */

		/*
		 *	Only change the masters once we've successfully
		 *	remapped the cluster.
		 */
		} else if (master[i]) {
			cluster->node[i].is_master = true;
		}
	}

	cluster->remapping = false;
	cluster->last_updated = fr_time();

	/*
	 *	Sanity checks
	 */
	fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==

}

/** Validate a cluster map node entry
 *
 * @note Errors may be retrieved with fr_strerror().
 * @note In a separate function, as it's called for both master and slave nodes.
 *
 * @param[in] node	we're validating.
 * @param[in] map_idx	we're processing.
 * @param[in] node_idx	we're processing.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
 */
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
{
	fr_ipaddr_t ipaddr;

	/*
	 *	Report the type of the node itself.  Its elements cannot
	 *	be dereferenced safely when it is not an array.
	 */
	if (node->type != REDIS_REPLY_ARRAY) {
		fr_strerror_printf("Cluster map %i node %i is wrong type, expected array got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->type, "<UNKNOWN>"));
	}

	/*
	 *  As per the redis docs: https://redis.io/commands/cluster-slots
	 *
	 *  Newer versions of Redis Cluster will output, for each Redis instance,
	 *  not just the IP and port, but also the node ID as third element of the
	 *  array. In future versions there could be more elements describing the
	 *  node better. In general a client implementation should just rely on
	 *  the fact that certain parameters are at fixed positions as specified,
	 *  but more parameters may follow and should be ignored.
	 *  Similarly a client library should try if possible to cope with the fact
	 *  that older versions may just have the IP and port parameter.
	 */
	if (node->elements < 2) {
		fr_strerror_printf("Cluster map %i node %i has incorrect number of elements, expected at least "
				   "2 got %zu", map_idx, node_idx, node->elements);
	}

	if (node->element[0]->type != REDIS_REPLY_STRING) {
		fr_strerror_printf("Cluster map %i node %i ip address is wrong type, expected string got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[0]->type, "<UNKNOWN>"));
	}

	if (fr_inet_pton(&ipaddr, node->element[0]->str, node->element[0]->len, AF_UNSPEC, true, true) < 0) {
	}

	if (node->element[1]->type != REDIS_REPLY_INTEGER) {
		fr_strerror_printf("Cluster map %i node %i port is wrong type, expected integer got %s",
				   map_idx, node_idx,
				   fr_table_str_by_value(redis_reply_types, node->element[1]->type, "<UNKNOWN>"));
	}

	if (node->element[1]->integer < 0) {
		fr_strerror_printf("Cluster map %i node %i port is too low, expected >= 0 got %lli",
				   map_idx, node_idx, node->element[1]->integer);
	}

	if (node->element[1]->integer > UINT16_MAX) {
		fr_strerror_printf("Cluster map %i node %i port is too high, expected <= " STRINGIFY(UINT16_MAX)" "
				   "got %lli", map_idx, node_idx, node->element[1]->integer);
	}

}

/** Learn a new cluster layout by querying the node that issued the -MOVED
 *
 * Also validates the response from the Redis cluster, so we can be sure that
 * it's well formed, before doing more expensive operations.
 *
 * @note Errors may be retrieved with fr_strerror().
 *
 * @param[out] out	Where to write cluster map.
 * @param[in] conn	to use for learning the new cluster map.
 * @return
 *	- FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
 *	- FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
 *	- FR_REDIS_CLUSTER_RCODE_FAILED if issuing the command resulted in an error.
 *	- FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
 *	- FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
 */
{
	redisReply	*reply;
	size_t		i = 0;

	*out = NULL;

	reply = redisCommand(conn->handle, "cluster slots");
	switch (fr_redis_command_status(conn, reply)) {
		fr_redis_reply_free(&reply);
		fr_strerror_const("No connections available");

	default:
		if (reply && reply->type == REDIS_REPLY_ERROR) {
			fr_strerror_printf("%.*s", (int)reply->len, reply->str);
			fr_redis_reply_free(&reply);
		}
		fr_strerror_const("Unknown client error");

		break;
	}

	if (reply->type != REDIS_REPLY_ARRAY) {
		fr_strerror_printf("Bad response to \"cluster slots\" command, expected array got %s",
				   fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
	}

	/*
	 *	Clustering configured but no slots set
	 */
	if (reply->elements == 0) {
		fr_strerror_printf("Empty response to \"cluster slots\" command (zero length array)");
	}

	/*
	 *	Validate the complete map set before returning.
	 */
	for (i = 0; i < reply->elements; i++) {
		size_t		j;
		redisReply	*map;

		map = reply->element[i];
		if (map->type != REDIS_REPLY_ARRAY) {
			fr_strerror_printf("Cluster map %zu is wrong type, expected array got %s",
					   i, fr_table_str_by_value(redis_reply_types, map->type, "<UNKNOWN>"));
		error:
			fr_redis_reply_free(&reply);
		}

		if (map->elements < 3) {
			fr_strerror_printf("Cluster map %zu has too few elements, expected at least 3, got %zu",
					   i, map->elements);
			goto error;
		}

		/*
		 *	Key slot start
		 */
		if (map->element[0]->type != REDIS_REPLY_INTEGER) {
			fr_strerror_printf("Cluster map %zu key slot start is wrong type, expected integer got %s",
					   i, fr_table_str_by_value(redis_reply_types, map->element[0]->type, "<UNKNOWN>"));
			goto error;
		}

		if (map->element[0]->integer < 0) {
			fr_strerror_printf("Cluster map %zu key slot start is too low, expected >= 0 got %lli",
					   i, map->element[0]->integer);
			goto error;
		}

		/*
		 *	Valid key slots are 0..(KEY_SLOTS - 1).  The value is
		 *	later used as an index into an array of KEY_SLOTS entries.
		 */
		if (map->element[0]->integer >= KEY_SLOTS) {
			fr_strerror_printf("Cluster map %zu key slot start is too high, expected < "
					   STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[0]->integer);
			goto error;
		}

		/*
		 *	Key slot end
		 */
		if (map->element[1]->type != REDIS_REPLY_INTEGER) {
			fr_strerror_printf("Cluster map %zu key slot end is wrong type, expected integer got %s",
					   i, fr_table_str_by_value(redis_reply_types, map->element[1]->type, "<UNKNOWN>"));
			goto error;
		}

		if (map->element[1]->integer < 0) {
			fr_strerror_printf("Cluster map %zu key slot end is too low, expected >= 0 got %lli",
					   i, map->element[1]->integer);
			goto error;
		}

		if (map->element[1]->integer >= KEY_SLOTS) {
			fr_strerror_printf("Cluster map %zu key slot end is too high, expected < "
					   STRINGIFY(KEY_SLOTS) " got %lli", i, map->element[1]->integer);
			goto error;
		}

		if (map->element[1]->integer < map->element[0]->integer) {
			fr_strerror_printf("Cluster map %zu key slot start/end out of order.  "
					   "Start was %lli, end was %lli", i, map->element[0]->integer,
					   map->element[1]->integer);
			goto error;
		}

		/*
		 *	Master node
		 */
		if (cluster_map_node_validate(map->element[2], i, 0) < 0) goto error;

		/*
		 *	Slave nodes
		 */
		for (j = 3; j < map->elements; j++) {
			if (cluster_map_node_validate(map->element[j], i, j - 2) < 0) goto error;
		}
	}
	*out = reply;

}

994/** Perform a runtime remap of the cluster
995 *
996 * @note Errors may be retrieved with fr_strerror().
997 * @note Must be called with the cluster mutex free.
998 *
999 * @param[in] request The current request.
1000 * @param[in,out] cluster to remap.
1001 * @param[in] conn to use to query the cluster.
1002 * @return
1003 * - FR_REDIS_CLUSTER_RCODE_IGNORED if 'cluster slots' returned an error (indicating clustering not supported).
1004 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1005 * - FR_REDIS_CLUSTER_RCODE_FAILED if issuing the 'cluster slots' command resulted in a protocol error.
1006 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1007 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1008 */
1010{
1011 fr_time_t now;
1012 redisReply *map;
1014 size_t i, j;
1015
1016 /*
1017 * If the cluster was remapped very recently, or is being
1018 * remapped, it's unlikely that it needs remapping again.
1019 */
1020 if (cluster->remapping) {
1021 in_progress:
1022 ROPTIONAL(RDEBUG2, DEBUG2, "Cluster remapping in progress, ignoring remap request");
1024 }
1025
1026 /*
1027 * The remap times are _our_ times, not the _request_ time.
1028 */
1029 now = fr_time();
1030 if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1031 too_soon:
1032 ROPTIONAL(RWARN, WARN, "Cluster was updated less than a second ago, ignoring remap request");
1034 }
1035
1036 ROPTIONAL(RINFO, INFO, "Initiating cluster remap");
1037
1038 /*
1039 * Get new cluster information
1040 */
1041 ret = cluster_map_get(&map, conn);
1042 switch (ret) {
1043 case FR_REDIS_CLUSTER_RCODE_BAD_INPUT: /* Validation error */
1044 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION: /* Connection error */
1045 case FR_REDIS_CLUSTER_RCODE_FAILED: /* Error issuing command */
1046 return ret;
1047
1048 case FR_REDIS_CLUSTER_RCODE_IGNORED: /* Clustering not enabled, or not supported */
1049 cluster->remap_needed = false;
1051
1052 case FR_REDIS_CLUSTER_RCODE_SUCCESS: /* Success */
1053 break;
1054 }
1055
1056 /*
1057 * Print the mapping we received
1058 */
1059 ROPTIONAL(RINFO, INFO, "Cluster map consists of %zu key ranges", map->elements);
1060 for (i = 0; i < map->elements; i++) {
1061 redisReply *map_node = map->element[i];
1062
1063 ROPTIONAL(RINFO, INFO, "%zu - keys %lli-%lli", i,
1064 map_node->element[0]->integer,
1065 map_node->element[1]->integer);
1066
1067 if (request) RINDENT();
1068 ROPTIONAL(RINFO, INFO, "master: %s:%lli",
1069 map_node->element[2]->element[0]->str,
1070 map_node->element[2]->element[1]->integer);
1071 for (j = 3; j < map_node->elements; j++) {
1072 ROPTIONAL(RINFO, INFO, "slave%zu: %s:%lli", j - 3,
1073 map_node->element[j]->element[0]->str,
1074 map_node->element[j]->element[1]->integer);
1075 }
1076 if (request) REXDENT();
1077 }
1078
1079 /*
1080 * Check again that the cluster isn't being
1081 * remapped, or was remapped too recently,
1082 * now we hold the mutex and the state of
1083 * those variables is synchronized.
1084 */
1085 pthread_mutex_lock(&cluster->mutex);
1086 if (cluster->remapping) {
1087 pthread_mutex_unlock(&cluster->mutex);
1088 fr_redis_reply_free(&map); /* Free the map */
1089 goto in_progress;
1090 }
1091 if (fr_time_to_sec(now) == fr_time_to_sec(cluster->last_updated)) {
1092 pthread_mutex_unlock(&cluster->mutex);
1093 fr_redis_reply_free(&map); /* Free the map */
1094 goto too_soon;
1095 }
1096 ret = cluster_map_apply(cluster, map);
1097 if (ret == FR_REDIS_CLUSTER_RCODE_SUCCESS) cluster->remap_needed = false; /* Change on successful remap */
1098 pthread_mutex_unlock(&cluster->mutex);
1099
1100 fr_redis_reply_free(&map); /* Free the map */
1101 if (ret < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1102
1104}
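/*
 * Illustrative sketch only, not part of the FreeRADIUS sources. The remap
 * function above checks the remapping flag and the last update time twice:
 * once without the lock to skip work cheaply, and again under the mutex before
 * applying the map. The standalone example below shows that pattern in
 * isolation. sketch_cluster_t, sketch_remap() and the generation counter are
 * hypothetical names, and the example assumes that applying a map is what
 * records the last update time.
 */

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical stand-in for the cluster state that the mutex protects */
typedef struct {
	pthread_mutex_t	mutex;
	bool		remapping;	/* Another thread is applying a map */
	time_t		last_updated;	/* Second in which the last map was applied */
	int		generation;	/* Number of maps applied so far */
} sketch_cluster_t;

/* Returns true if a new map was applied, false if the request was ignored */
static bool sketch_remap(sketch_cluster_t *cluster, time_t now)
{
	assert(cluster != NULL);

	/* Cheap, unlocked check. The answer is only advisory. */
	if (cluster->remapping || (now == cluster->last_updated)) return false;

	/* ... the expensive fetch and validation of the new map would go here ... */

	/* Authoritative check, repeated now that we hold the mutex */
	pthread_mutex_lock(&cluster->mutex);
	if (cluster->remapping || (now == cluster->last_updated)) {
		pthread_mutex_unlock(&cluster->mutex);
		return false;
	}
	cluster->generation++;		/* Stands in for applying the new map */
	cluster->last_updated = now;
	pthread_mutex_unlock(&cluster->mutex);

	return true;
}
```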
1105
1106/** Retrieve or associate a node with the server indicated in the redirect
1107 *
1108 * @note Errors may be retrieved with fr_strerror().
1109 *
1110 * @param[out] out Where to write the node representing the redirect server.
1111 * @param[in] cluster to draw node from.
1112 * @param[in] reply Redis reply containing the redirect information.
1113 * @return
1114 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1115 * - FR_REDIS_CLUSTER_RCODE_FAILED no more nodes available.
1116 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION connection failure.
1117 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT on validation failure (bad data returned from Redis).
1118 */
1120{
1121 fr_redis_cluster_node_t find, *found, *spare;
1122 fr_redis_conn_t *rconn;
1123
1124 uint16_t key;
1125
1126 memset(&find, 0, sizeof(find));
1127
1128 *out = NULL;
1129
1130 if (cluster_node_conf_from_redirect(&key, &find.addr, reply) < 0) return FR_REDIS_CLUSTER_RCODE_FAILED;
1131
1132 pthread_mutex_lock(&cluster->mutex);
1133 /*
1134 * If we already have a pool for the
1135 * host we were redirected to, use that.
1136 */
1137 found = fr_rb_find(cluster->used_nodes, &find);
1138 if (found) {
1139 /* We have the new pool, don't need to hold the lock */
1140 pthread_mutex_unlock(&cluster->mutex);
1141 *out = found;
1143 }
1144
1145 /*
1146 * Otherwise grab a free node and try to connect
1147 * it to the server we were redirected to.
1148 */
1149 spare = fr_fifo_peek(cluster->free_nodes);
1150 if (!spare) {
1151 fr_strerror_const("Reached maximum connected nodes");
1152 pthread_mutex_unlock(&cluster->mutex);
1154 }
1155 spare->pending_addr = find.addr; /* Set the config to be applied */
1156 if (cluster_node_connect(cluster, spare) < 0) {
1157 pthread_mutex_unlock(&cluster->mutex);
1159 }
1160 fr_rb_insert(cluster->used_nodes, spare);
1161 fr_fifo_pop(cluster->free_nodes);
1162 found = spare;
1163
1164 /* We have the new pool, don't need to hold the lock */
1165 pthread_mutex_unlock(&cluster->mutex);
1166
1167 /*
1168 * Determine if we can establish a connection to
1169 * the new pool, to check if it's viable.
1170 */
1171 rconn = fr_pool_connection_get(found->pool, NULL);
1172 if (!rconn) {
1173 /*
1174 * To prevent repeated misconfigurations
1175 * using all free nodes, add the node
1176 * back to the spare queue if this
1177 * was the first connection attempt and
1178 * it failed.
1179 */
1180 pthread_mutex_lock(&cluster->mutex);
1181 fr_fifo_push(cluster->free_nodes, spare);
1182 pthread_mutex_unlock(&cluster->mutex);
1183
1184 fr_strerror_const("No connections available");
1186 }
1187 fr_pool_connection_release(found->pool, NULL, rconn);
1188 *out = found;
1189
1191}
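/*
 * Illustrative sketch only, not part of the FreeRADIUS sources. The function
 * above relies on a helper to extract the key slot and target address from a
 * redirect reply. The example below shows one way such an error string could
 * be parsed, assuming the "MOVED <slot> <host>:<port>" and
 * "ASK <slot> <host>:<port>" forms described in the Redis cluster
 * specification, without a leading '-'. sketch_parse_redirect() is a
 * hypothetical name, and the real helper may differ in both interface and
 * validation.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_KEY_SLOTS 16384	/* Number of key slots in a Redis cluster */

/* Returns 0 on success, -1 if msg is not a well formed redirect */
static int sketch_parse_redirect(uint16_t *slot, char *host, size_t host_len,
				 uint16_t *port, char const *msg)
{
	char const	*p = msg, *colon;
	char		*end;
	unsigned long	v;
	size_t		len;

	assert(slot && host && port && msg);

	if (strncmp(p, "MOVED ", 6) == 0) {
		p += 6;
	} else if (strncmp(p, "ASK ", 4) == 0) {
		p += 4;
	} else {
		return -1;
	}

	/* Key slot, must be followed by a single space */
	v = strtoul(p, &end, 10);
	if ((end == p) || (*end != ' ') || (v >= SKETCH_KEY_SLOTS)) return -1;
	*slot = (uint16_t)v;
	p = end + 1;

	/* Split on the last ':' so hosts containing colons still work */
	colon = strrchr(p, ':');
	if (!colon || (colon == p)) return -1;
	len = (size_t)(colon - p);
	if (len >= host_len) return -1;
	memcpy(host, p, len);
	host[len] = '\0';

	/* Port, must consume the rest of the string */
	v = strtoul(colon + 1, &end, 10);
	if ((end == colon + 1) || (*end != '\0') || (v == 0) || (v > 65535)) return -1;
	*port = (uint16_t)v;

	return 0;
}
```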
1192
1193
1194/** Try to determine the health of a cluster node passively by examining its pool state
1195 *
1196 * Returns an integer value representing the likelihood that the pool is live.
1197 * Range is between 1 and 11,000.
1198 *
1199 * If a weight of 1 is returned, connections from the pool should be checked
1200 * (by pinging) before use.
1201 *
1202 * @param now The current time.
1203 * @param state of the connection pool.
1204 * @return
1205 * - 1 the pool is very likely to be bad.
1206 * - 2-11000 the pool is likely to be good, with a higher number
1207 * indicating higher probability of liveness.
1208 */
1210{
1211 /*
1212 * Failed spawn recently, probably bad
1213 */
1215
1216 /*
1217 * Closed recently, probably bad
1218 */
1220
1221 /*
1222 * Released too long ago, don't know
1223 */
1225
1226 /*
1227 * Released not long ago, might be ok.
1228 */
1230}
1231
1232/** Issue a ping request against a cluster node
1233 *
1234 * Establishes whether the connection to the node we have is live.
1235 *
1236 * @param request The current request.
1237 * @param node to ping.
1238 * @param conn the connection to ping on.
1239 * @return
1240 * - FR_REDIS_CLUSTER_RCODE_BAD_INPUT if we got a bad response.
1241 * - FR_REDIS_CLUSTER_RCODE_SUCCESS on success.
1242 * - FR_REDIS_CLUSTER_RCODE_NO_CONNECTION on connection down.
1243 */
1245{
1246 redisReply *reply;
1247 fr_redis_rcode_t rcode;
1248
1249 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Executing command: PING", node->id);
1250 reply = redisCommand(conn->handle, "PING");
1251 rcode = fr_redis_command_status(conn, reply);
1252 if (rcode != REDIS_RCODE_SUCCESS) {
1253 ROPTIONAL(RPERROR, PERROR, "[%i] PING failed to %s:%i", node->id, node->name, node->addr.inet.dst_port);
1254 fr_redis_reply_free(&reply);
1256 }
1257
1258 if (reply->type != REDIS_REPLY_STATUS) {
1259 ROPTIONAL(RERROR, ERROR, "[%i] Bad PING response from %s:%i, expected status got %s",
1260 node->id, node->name, node->addr.inet.dst_port,
1261 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1262 fr_redis_reply_free(&reply);
1264 }
1265
1266 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Got response: %s", node->id, reply->str);
1267 fr_redis_reply_free(&reply);
1269}
1270
1271/** Attempt to find a live pool in the cluster
1272 *
1273 * The intent here is to find pools/nodes where a connection was released the shortest
1274 * time ago. Having a connection be released (vs closed) indicates that the pool is live.
1275 *
1276 * We don't want to have all workers try and grab a connection to this node however, as it
1277 * may still be dead (we don't know).
1278 *
1279 * So we use an inverse transform sample, to weight the nodes, based on time between now
1280 * and when the connection was released. Connections released closest to the current
1281 * time are given a higher weighting.
1282 *
1283 * Weight range is between 1 - 11,000.
1284 *
1285 * - If released > 10.0 seconds ago, information is not valid, weight 500.
1286 * - If closed < 10.0 seconds ago, it's a bad pool, weight 1.
1287 * - If spawn failed < 10.0 seconds ago, it's a bad pool, weight 1.
1288 * - If a connection was released 0.0 seconds ago, weight 11,000.
1289 * - If a connection was released 10.0 seconds ago, weight 1000.
1290 *
1291 * Using the above algorithm we use the experience of other workers using the cluster to
1292 * inform our alternative node selection.
1293 *
1294 * Suggestions on improving live node selection appreciated.
1295 *
1296 * Inverse transform sampling based roughly on the solution from this post:
1297 * http://stackoverflow.com/questions/17250568/randomly-choosing-from-a-list-with-weighted-probabilities
1298 *
1299 * Wikipedia page here:
1300 * https://en.wikipedia.org/wiki/Inverse_transform_sampling
1301 *
1302 * @note Must be called with the cluster mutex free.
1303 *
1304 * @param[out] live_node we found.
1305 * @param[out] live_conn to that node.
1306 * @param[in] request The current request (used for logging).
1307 * @param[in] cluster to search for live pools in.
1308 * @param[in] skip this node (it's bad).
1309 * @return 0 if a live node and connection were found, -1 if no alternative node was available.
1310 */
1312 request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
1313{
1314 uint32_t i;
1315
1317 fr_time_t now;
1320
1321 ROPTIONAL(RDEBUG2, DEBUG2, "Searching for live cluster nodes");
1322
1323 if (fr_rb_num_elements(cluster->used_nodes) == 1) {
1324 no_alts:
1325 ROPTIONAL(RERROR, ERROR, "No alternative nodes available");
1326 return -1;
1327 }
1328
1329 live = talloc_zero(NULL, cluster_nodes_live_t); /* Too big for stack */
1330 live->skip = skip->id;
1331
1332 pthread_mutex_lock(&cluster->mutex);
1333 for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
1334 node;
1335 node = fr_rb_iter_next_inorder(&iter)) {
1336 fr_assert(node->pool);
1337 if (live->skip == node->id) continue; /* Skip dead nodes */
1338
1339 live->node[live->next].pool_state = fr_pool_state(node->pool);
1340 live->node[live->next++].id = node->id;
1341 }
1342 pthread_mutex_unlock(&cluster->mutex);
1343
1344 fr_assert(live->next); /* There should be at least one */
1345 if (live->next == 1) goto no_alts; /* Weird, but conceivable */
1346
1347 now = fr_time();
1348
1349 /*
1350 * Weighted random selection
1351 */
1352 for (i = 0; (i < cluster->conf->max_alt) && live->next; i++) {
1353 fr_redis_conn_t *conn;
1354 uint8_t j;
1355 int first, last, pivot; /* Must be signed for BS */
1356 unsigned int find, cumulative = 0;
1357
1358 ROPTIONAL(RDEBUG3, DEBUG3, "(Re)assigning node weights:");
1359 if (request) RINDENT();
1360 for (j = 0; j < live->next; j++) {
1361 int weight;
1362
1363 weight = cluster_node_pool_health(now, live->node[j].pool_state);
1364 ROPTIONAL(RDEBUG3, DEBUG3, "Node %i weight: %i", live->node[j].id, weight);
1365 live->node[j].cumulative = (cumulative += weight);
1366 }
1367 if (request) REXDENT();
1368
1369 /*
1370 * Select a node at random
1371 */
1372 find = (fr_rand() & (cumulative - 1)); /* Masked to 0..(total - 1), uniform only if total is a power of two */
1373 first = 0;
1374 last = live->next - 1;
1375 pivot = (first + last) / 2;
1376
1377 while (first <= last) {
1378 if (live->node[pivot].cumulative < find) {
1379 first = pivot + 1;
1380 } else if (live->node[pivot].cumulative == find) {
1381 break;
1382 } else {
1383 last = pivot - 1;
1384 }
1385 pivot = (first + last) / 2;
1386 }
1387 /*
1388 * Round up...
1389 */
1390 if (first > last) pivot = last + 1;
1391
1392 /*
1393 * Resolve the index to the actual node. We use IDs
1394 * to save memory...
1395 */
1396 node = &cluster->node[live->node[pivot].id];
1397 fr_assert(live->node[pivot].id == node->id);
1398
1399 ROPTIONAL(RDEBUG2, DEBUG2, "Selected node %i (using random value %i)", node->id, find);
1400 conn = fr_pool_connection_get(node->pool, request);
1401 if (!conn) {
1402 ROPTIONAL(RERROR, ERROR, "No connections available to node %i %s:%i", node->id,
1403 node->name, node->addr.inet.dst_port);
1404 next:
1405 /*
1406 * Remove the node we just discovered was bad
1407 * out of the set of nodes we're selecting over.
1408 */
1409 if (pivot == live->next) {
1410 live->next--;
1411 continue;
1412 }
1413 memcpy(&live->node[pivot], &live->node[live->next - 1], sizeof(live->node[pivot]));
1414 live->next--;
1415 continue;
1416 }
1417
1418 /*
1419 * PING! PONG?
1420 */
1421 switch (cluster_node_ping(request, node, conn)) {
1423 break;
1424
1426 fr_pool_connection_close(node->pool, request, conn);
1427 goto next;
1428
1429 default:
1430 fr_pool_connection_release(node->pool, request, conn);
1431 goto next;
1432 }
1433
1434 *live_node = node;
1435 *live_conn = conn;
1436 talloc_free(live);
1437
1438 return 0;
1439 }
1440
1441 ROPTIONAL(RERROR, ERROR, "Hit max alt limit %i, and no live connections found", cluster->conf->max_alt);
1442 talloc_free(live);
1443
1444 return -1;
1445}
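/*
 * Illustrative sketch only, not part of the FreeRADIUS sources. The selection
 * loop above builds a running total of node weights and then binary searches
 * for the node whose cumulative weight covers a random value. The standalone
 * example below shows the same idea. sketch_weighted_select() and
 * SKETCH_MAX_NODES are hypothetical names. Note that it draws the value with
 * a modulo (random % total) + 1 rather than the bit mask used in the listing,
 * which keeps the draw within 1..total for any total at the cost of a slight
 * modulo bias.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_NODES 16	/* Hypothetical upper bound on candidate nodes */

/* Returns the index of the selected entry in weights[] */
static size_t sketch_weighted_select(unsigned int const *weights, size_t n, uint32_t random)
{
	unsigned int	cumulative[SKETCH_MAX_NODES];
	unsigned int	total = 0, find;
	size_t		i, lo, hi;

	assert(weights && (n > 0) && (n <= SKETCH_MAX_NODES));

	/* Running total, entry i covers (cumulative[i - 1], cumulative[i]] */
	for (i = 0; i < n; i++) cumulative[i] = (total += weights[i]);
	assert(total > 0);

	/* Value between 1 and total inclusive */
	find = (random % total) + 1;

	/* Lower bound search: first entry with cumulative[i] >= find */
	lo = 0;
	hi = n - 1;
	while (lo < hi) {
		size_t mid = lo + ((hi - lo) / 2);

		if (cumulative[mid] < find) {
			lo = mid + 1;
		} else {
			hi = mid;
		}
	}

	return lo;
}
```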
1446
1447/** Callback for freeing a Redis connection
1448 *
1449 * @param[in] conn to free.
1450 * @return 0.
1451 */
1453{
1454 redisFree(conn->handle);
1455
1456 return 0;
1457}
1458
1459/** Create a new connection to a Redis node
1460 *
1461 * @param[in] ctx to allocate connection structure in. Will be freed at the same time as the pool.
1462 * @param[in] instance data of type #fr_redis_cluster_node_t. Holds parameters for establishing new connection.
1463 * @param[in] timeout The maximum time allowed to complete the connection.
1464 * @return
1465 * - New #fr_redis_conn_t on success.
1466 * - NULL on failure.
1467 */
1468void *fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
1469{
1470 fr_redis_cluster_node_t *node = instance;
1471 fr_redis_conn_t *conn = NULL;
1472 redisContext *handle;
1473 redisReply *reply = NULL;
1474 char const *log_prefix = node->cluster->log_prefix;
1475
1476 DEBUG2("%s - [%i] Connecting to node %s:%i", log_prefix, node->id, node->name, node->addr.inet.dst_port);
1477
1478 handle = redisConnectWithTimeout(node->name, node->addr.inet.dst_port, fr_time_delta_to_timeval(timeout));
1479 if ((handle != NULL) && handle->err) {
1480 ERROR("%s - [%i] Connection failed: %s", log_prefix, node->id, handle->errstr);
1481 redisFree(handle);
1482 return NULL;
1483 } else if (!handle) {
1484 ERROR("%s - [%i] Connection failed", log_prefix, node->id);
1485 return NULL;
1486 }
1487
1488 conn = talloc_zero(ctx, fr_redis_conn_t);
1489 conn->handle = handle;
1490 conn->node = node;
1491 talloc_set_destructor(conn, _cluster_conn_free);
1492
1493#ifdef HAVE_REDIS_SSL
1494 if (node->cluster->ssl_ctx != NULL) {
1495 fr_tls_session_t *tls_session = fr_tls_session_alloc_client(conn, node->cluster->ssl_ctx);
1496 if (!tls_session) {
1497 fr_tls_strerror_printf("%s - [%i]", log_prefix, node->id);
1498 ERROR("%s - [%i] Failed to allocate TLS session", log_prefix, node->id);
1499 talloc_free(conn);
1500 return NULL;
1501 }
1502
1503 // redisInitiateSSL() takes ownership of SSL object on success
1504 SSL_up_ref(tls_session->ssl);
1505 if (redisInitiateSSL(handle, tls_session->ssl) != REDIS_OK) {
1506 ERROR("%s - [%i] Failed to initiate SSL: %s", log_prefix, node->id, handle->errstr);
1507 SSL_free(tls_session->ssl);
1508 talloc_free(conn);
1509 return NULL;
1510 }
1511 }
1512#endif
1513
1514 if (node->cluster->conf->password) {
1515 if (node->cluster->conf->username) {
1516 DEBUG3("%s - [%i] Executing: AUTH %s %s", log_prefix, node->id,
1517 node->cluster->conf->username,
1518 node->cluster->conf->password);
1519 reply = redisCommand(handle, "AUTH %s %s",
1520 node->cluster->conf->username,
1521 node->cluster->conf->password);
1522 } else {
1523 DEBUG3("%s - [%i] Executing: AUTH %s", log_prefix, node->id, node->cluster->conf->password);
1524 reply = redisCommand(handle, "AUTH %s", node->cluster->conf->password);
1525 }
1526 if (!reply) {
1527 ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, handle->errstr);
1528 error:
1529 if (reply) fr_redis_reply_free(&reply);
1530 talloc_free(conn);
1531 return NULL;
1532 }
1533
1534 switch (reply->type) {
1535 case REDIS_REPLY_STATUS:
1536 if (strcmp(reply->str, "OK") != 0) {
1537 ERROR("%s - [%i] Failed authenticating: %s", log_prefix,
1538 node->id, reply->str);
1539 goto error;
1540 }
1541 fr_redis_reply_free(&reply);
1542 break; /* else it's OK */
1543
1544 case REDIS_REPLY_ERROR:
1545 ERROR("%s - [%i] Failed authenticating: %s", log_prefix, node->id, reply->str);
1546 goto error;
1547
1548 default:
1549 ERROR("%s - [%i] Unexpected reply of type %s to AUTH", log_prefix, node->id,
1550 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1551 goto error;
1552 }
1553 }
1554
1555 if (node->cluster->conf->database) {
1556 DEBUG3("%s - [%i] Executing: SELECT %i", log_prefix, node->id, node->cluster->conf->database);
1557 reply = redisCommand(handle, "SELECT %i", node->cluster->conf->database);
1558 if (!reply) {
1559 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1560 node->cluster->conf->database, handle->errstr);
1561 goto error;
1562 }
1563
1564 switch (reply->type) {
1565 case REDIS_REPLY_STATUS:
1566 if (strcmp(reply->str, "OK") != 0) {
1567 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1568 node->cluster->conf->database, reply->str);
1569 goto error;
1570 }
1571 fr_redis_reply_free(&reply);
1572 break; /* else it's OK */
1573
1574 case REDIS_REPLY_ERROR:
1575 ERROR("%s - [%i] Failed selecting database %i: %s", log_prefix, node->id,
1576 node->cluster->conf->database, reply->str);
1577 goto error;
1578
1579 default:
1580 ERROR("%s - [%i] Unexpected reply of type %s, to SELECT", log_prefix, node->id,
1581 fr_table_str_by_value(redis_reply_types, reply->type, "<UNKNOWN>"));
1582 goto error;
1583 }
1584 }
1585
1586 return conn;
1587}
1588
1589/** Implements the key slot selection scheme used by FreeRADIUS
1590 *
1591 * Like the scheme in the clustering specification, but with some differences:
1592 * if the key is NULL or zero length, then a random key slot is chosen.
1593 *
1594 * If there's only a single node in the cluster, then we avoid the CRC16
1595 * and just use key slot 0.
1596 *
1597 * @param cluster to determine key slot for.
1598 * @param request The current request.
1599 * @param key the key to resolve.
1600 * @param key_len the length of the key.
1601 * @return pointer to the key slot the key resolves to.
1602 */
1604 uint8_t const *key, size_t key_len)
1605{
1607
1608 if (!key || (key_len == 0)) {
1609 key_slot = &cluster->key_slot[(uint16_t)(fr_rand() & (KEY_SLOTS - 1))];
1610 ROPTIONAL(RDEBUG2, DEBUG2, "Key rand() -> slot %zu", key_slot - cluster->key_slot);
1611
1612 return key_slot;
1613 }
1614
1615 /*
1616 * Avoid CRC16 if we're operating with one cluster node or
1617 * without clustering.
1618 */
1619 if (fr_rb_num_elements(cluster->used_nodes) > 1) {
1620 key_slot = &cluster->key_slot[cluster_key_hash(key, key_len)];
1621 ROPTIONAL(RDEBUG2, DEBUG2, "Key \"%pV\" -> slot %zu",
1622 fr_box_strvalue_len((char const *)key, key_len), key_slot - cluster->key_slot);
1623
1624 return key_slot;
1625 }
1626 ROPTIONAL(RDEBUG3, DEBUG3, "Single node available, skipping key selection");
1627
1628 return &cluster->key_slot[0];
1629}
1630
1631/** Return the master node that would be used for a particular key
1632 *
1633 * @param[in] cluster To resolve key in.
1634 * @param[in] key_slot to resolve to node.
1635 * @return
1636 * - The current master node.
1637 * - NULL if no master node is currently assigned to a particular key slot.
1638 */
1640 fr_redis_cluster_key_slot_t const *key_slot)
1641{
1642 return &cluster->node[key_slot->master];
1643}
1644
1645/** Return the slave node that would be used for a particular key
1646 *
1647 * @param[in] cluster To resolve key in.
1648 * @param[in] key_slot To resolve to node.
1649 * @param[in] slave_num 0..n.
1650 * @return
1651 * - A slave node.
1652 * - NULL if no slave node is assigned to the key slot at index slave_num.
1653 *
1654 */
1656 fr_redis_cluster_key_slot_t const *key_slot,
1657 uint8_t slave_num)
1658{
1659 if (slave_num >= key_slot->slave_num) return NULL; /* No slave available */
1660
1661 return &cluster->node[key_slot->slave[slave_num]];
1662}
1663
1664/** Return the ipaddr of a particular node
1665 *
1666 * @param[out] out Ipaddr of the node.
1667 * @param[in] node to get ip address from.
1668 * @return
1669 * - 0 on success.
1670 * - -1 on failure (node is NULL).
1671 */
1673{
1674 if (!node) return -1;
1675
1676 memcpy(out, &node->addr.inet.dst_ipaddr, sizeof(*out));
1677
1678 return 0;
1679}
1680
1681/** Return the port of a particular node
1682 *
1683 * @param[out] out Port of the node.
1684 * @param[in] node to get port from.
1685 * @return
1686 * - 0 on success.
1687 * - -1 on failure (node is NULL).
1688 */
1690{
1691 if (!node) return -1;
1692
1693 *out = node->addr.inet.dst_port;
1694
1695 return 0;
1696}
1697
1698/** Resolve a key to a pool, and reserve a connection in that pool
1699 *
1700 * This should be used with #fr_redis_cluster_state_next, and #fr_redis_command_status, to
1701 * transparently locate the cluster node we need to perform the operation on.
1702 *
1703 * Example code below shows how this function is used in conjunction
1704 * with #fr_redis_cluster_state_next to follow redirects, and reconnect handles.
1705 *
1706 @code{.c}
1707 int s_ret;
1708 redis_conn_state state;
1709 fr_redis_conn_t *conn;
1710 redisReply *reply = NULL;
1711 fr_redis_rcode_t status;
1712
1713 for (s_ret = fr_redis_cluster_state_init(&state, &conn, cluster, request, key, key_len, false);
1714 s_ret == REDIS_RCODE_TRY_AGAIN;
1715 s_ret = fr_redis_cluster_state_next(&state, &conn, cluster, request, status, &reply)) {
1716 reply = redisCommand(conn->handle, "SET foo bar");
1717 status = fr_redis_command_status(conn, reply);
1718 }
1719 // Reply is freed if s_ret == REDIS_RCODE_TRY_AGAIN, but left in all other cases to allow error
1720 // processing, or extraction of results.
1721 fr_redis_reply_free(&reply);
1722 if (s_ret != REDIS_RCODE_SUCCESS) {
1723 // Error
1724 }
1725 // Success
1726 @endcode
1727 *
1728 * @param[out] state to track current pool and various counters, will be initialised.
1729 * @param[out] conn Where to write the reserved connection to.
1730 * @param[in] cluster of pools.
1731 * @param[in] request The current request.
1732 * @param[in] key to resolve to a cluster node/pool. If key is NULL or key_len is 0, a random
1733 * slot will be chosen.
1734 * @param[in] key_len Length of the key.
1735 * @param[in] read_only If true, will use random slave pool in preference to the master, falling
1736 * back to the master if no slaves are available.
1737 * @return
1738 * - REDIS_RCODE_TRY_AGAIN - try your command with this connection (provided via conn).
1739 * - REDIS_RCODE_RECONNECT - when no additional connections available.
1740 */
1742 fr_redis_cluster_t *cluster, request_t *request,
1743 uint8_t const *key, size_t key_len, bool read_only)
1744{
1746 fr_redis_cluster_key_slot_t const *key_slot;
1747 uint8_t first, i;
1748 uint64_t used_nodes;
1749
1750 fr_assert(cluster);
1751 fr_assert(state);
1752 fr_assert(conn);
1753
1754 memset(state, 0, sizeof(*state));
1755 *conn = NULL; /* Better safe than exploding */
1756
1757 used_nodes = fr_rb_num_elements(cluster->used_nodes);
1758 if (used_nodes == 0) {
1759 ROPTIONAL(REDEBUG, ERROR, "No nodes in cluster");
1760 return REDIS_RCODE_RECONNECT;
1761 }
1762
1763again:
1764 key_slot = fr_redis_cluster_slot_by_key(cluster, request, key, key_len);
1765
1766 /*
1767 * 1. Try each of the slaves for the key slot
1768 * 2. Fall through to trying the master, and a single alternate node.
1769 */
1770 if (read_only) {
1771 first = fr_rand() & key_slot->slave_num;
1772 for (i = 0; i < key_slot->slave_num; i++) {
1773 uint8_t node_id;
1774
1775 node_id = key_slot->slave[(first + i) % key_slot->slave_num];
1776 node = &cluster->node[node_id];
1777 *conn = fr_pool_connection_get(node->pool, request);
1778 if (!*conn) {
1779 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu slave %i)",
1780 node->id, key_slot - cluster->key_slot, (first + i) % key_slot->slave_num);
1781 cluster->remap_needed = true;
1782 continue; /* Continue until we find a live pool */
1783 }
1784
1785 goto finish;
1786 }
1787 /* Fall through to using key slot master or alternate */
1788 }
1789
1790 /*
1791 * 1. Try the master for the key slot
1792 * 2. If unavailable search for any pools with handles available
1793 * 3. If there are no pools, or we can't reserve a handle,
1794 * give up.
1795 */
1796 node = &cluster->node[key_slot->master];
1797 *conn = fr_pool_connection_get(node->pool, request);
1798 if (!*conn) {
1799 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] No connections available (key slot %zu master)",
1800 node->id, key_slot - cluster->key_slot);
1801 cluster->remap_needed = true;
1802
1803 if (cluster_node_find_live(&node, conn, request, cluster, node) < 0) return REDIS_RCODE_RECONNECT;
1804 }
1805
1806finish:
1807 /*
1808 * Something set the remap_needed flag, and we have a live connection
1809 */
1810 if (cluster->remap_needed) {
1811 if (fr_redis_cluster_remap(request, cluster, *conn) == FR_REDIS_CLUSTER_RCODE_SUCCESS) {
1812 fr_pool_connection_release(node->pool, request, *conn);
1813 goto again; /* New map, try again */
1814 }
1815 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1816 }
1817
1818 state->node = node;
1819 state->key = key;
1820 state->key_len = key_len;
1821
1822 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
1823 state->node->id, state->node->name, state->node->addr.inet.dst_port);
1824
1825 return REDIS_RCODE_TRY_AGAIN;
1826}
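/*
 * Illustrative sketch only, not part of the FreeRADIUS sources. For read-only
 * operations the function above starts at a random slave and then walks the
 * remaining slaves in order, wrapping around with a modulo. The example below
 * shows that visiting order on its own. sketch_rotation() is a hypothetical
 * name, and it picks the starting offset with a modulo, which is an
 * assumption made for clarity rather than a copy of the listing.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fills order[] with the indexes to try, returns how many were written */
static size_t sketch_rotation(uint8_t *order, uint8_t count, uint32_t random)
{
	uint8_t first, i;

	assert(order != NULL);

	if (count == 0) return 0;	/* No slaves, the caller falls back to the master */

	first = (uint8_t)(random % count);
	for (i = 0; i < count; i++) order[i] = (uint8_t)((first + i) % count);

	return count;
}
```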
1827
1828/** Get the next connection to attempt a command against
1829 *
1830 * Will process reconnect and redirect states performing the actions necessary.
1831 *
1832 * - May trigger a cluster remap on receiving a #REDIS_RCODE_MOVE status.
1833 * - May perform a temporary redirect on receiving a #REDIS_RCODE_ASK status.
1834 * - May reserve a new connection on receiving a #REDIS_RCODE_RECONNECT status.
1835 *
1836 * If a remap is in progress, has occurred within the last second, has recently failed,
1837 * or fails, the '-MOVE' will be treated as a temporary redirect (-ASK).
1838 *
1839 * This allows the server to be more responsive during remaps, as unless the worker has been
1840 * redirected to a node we don't currently have a pool for, it can grab a connection for the
1841 * node it was redirected to, and continue.
1842 *
1843 * @note Irrespective of return code, the connection passed via conn will be released.
1844 * A new connection to attempt the command on will be provided via conn.
1845 *
1846 * @note reply will be automatically freed and set to NULL if a new connection is provided;
1847 * in all other cases, the caller is responsible for freeing the reply.
1848 *
1849 * @param[in,out] state containing the current pool, and various counters which control
1850 * retries, and limit redirects.
1851 * @param[in,out] conn we received the '-ASK' or '-MOVE' redirect on. Will be replaced with a
1852 * connection in the new pool the key points to.
1853 * @param[in] request The current request.
1854 * @param[in] cluster of pools.
1855 * @param[in] status of the last command, must be #REDIS_RCODE_MOVE or #REDIS_RCODE_ASK.
1856 * @param[in] reply from last command. Freed if 0 is returned, else caller must free.
1857 * @return
1858 * - REDIS_RCODE_SUCCESS - on success.
1859 * - REDIS_RCODE_TRY_AGAIN - try new connection (provided via conn). Will free reply.
1860 * - REDIS_RCODE_ERROR - on failure or command error.
1861 * - REDIS_RCODE_RECONNECT - when no additional connections available.
1862 */
1864 fr_redis_cluster_t *cluster, request_t *request,
1865 fr_redis_rcode_t status, redisReply **reply)
1866{
1867 fr_assert(state && state->node && state->node->pool);
1868 fr_assert(conn && *conn);
1869
1870 if (*reply) fr_redis_reply_print(L_DBG_LVL_3, *reply, request, 0);
1871
1872 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] <<< Returned: %s",
1873 state->node->id, fr_table_str_by_value(redis_rcodes, status, "<UNKNOWN>"));
1874
1875 /*
1876 * Caller indicated we should close the connection
1877 */
1878 if (state->close_conn) {
1879 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Connection no longer viable, closing it", state->node->id);
1880 fr_pool_connection_close(state->node->pool, request, *conn);
1881 *conn = NULL;
1882 state->close_conn = false;
1883 }
1884
1885 /*
1886 * If we have a proven live connection, and something
1887 * has set the remap_needed flag, do that now before
1888 * releasing the connection.
1889 */
1890 if (cluster->remap_needed && *conn) switch(status) {
1891 case REDIS_RCODE_MOVE: /* We're going to remap anyway */
1892 case REDIS_RCODE_RECONNECT: /* The connection's dead */
1893 break;
1894
1895 default:
1896 /*
1897 * Remap the cluster. On success, will clear the
1898 * remap_needed flag.
1899 */
1900 if (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)
1901 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1902 }
1903
1904 /*
1905 * Check the result of the last redis command, and do
1906 * something appropriate.
1907 */
1908 switch (status) {
1910 fr_pool_connection_release(state->node->pool, request, *conn);
1911 *conn = NULL;
1912 return REDIS_RCODE_SUCCESS;
1913
1914 /*
1915 * Command error, not fixable.
1916 */
1918 case REDIS_RCODE_ERROR:
1919 ROPTIONAL(RPEDEBUG, PERROR, "[%i] Command failed", state->node->id);
1920 fr_pool_connection_release(state->node->pool, request, *conn);
1921 *conn = NULL;
1922 return REDIS_RCODE_ERROR;
1923
1924 /*
1925 * Cluster's unstable, try again.
1926 */
1928 if (state->retries++ >= cluster->conf->max_retries) {
1929 ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum retry attempts", state->node->id);
1930 fr_pool_connection_release(state->node->pool, request, *conn);
1931 *conn = NULL;
1932 return REDIS_RCODE_ERROR;
1933 }
1934
1935 if (!*conn) *conn = fr_pool_connection_get(state->node->pool, request);
1936
1937 if (fr_time_delta_ispos(cluster->conf->retry_delay)) nanosleep(&fr_time_delta_to_timespec(cluster->conf->retry_delay), NULL);
1938 goto try_again;
1939
1940 /*
1941 * Connection's dead, check to see if we can switch nodes,
1942 * or, failing that, reconnect the connection.
1943 */
1944 case REDIS_RCODE_RECONNECT:
1945 {
1946 fr_redis_cluster_key_slot_t const *key_slot;
1947
1948 ROPTIONAL(RPERROR, PERROR, "[%i] Failed communicating with %s:%i",
1949 state->node->id, state->node->name,
1950 state->node->addr.inet.dst_port);
1951
1952 fr_pool_connection_close(state->node->pool, request, *conn); /* He's dead jim */
1953
1954 if (state->reconnects++ > state->in_pool) {
1955 ROPTIONAL(REDEBUG, ERROR, "[%i] Hit maximum reconnect attempts", state->node->id);
1956 cluster->remap_needed = true;
1957 return REDIS_RCODE_RECONNECT;
1958 }
1959
1960 /*
1961 * Refresh the key slot
1962 */
1963 key_slot = fr_redis_cluster_slot_by_key(cluster, request, state->key, state->key_len);
1964 state->node = &cluster->node[key_slot->master];
1965
1966 *conn = fr_pool_connection_get(state->node->pool, request);
1967 if (!*conn) {
1968 ROPTIONAL(REDEBUG, ERROR, "[%i] No connections available for %s:%i",
1969 state->node->id, state->node->name, state->node->addr.inet.dst_port);
1970 cluster->remap_needed = true;
1971
1972 if (cluster_node_find_live(&state->node, conn, request,
1973 cluster, state->node) < 0) return REDIS_RCODE_RECONNECT;
1974
1975 goto try_again;
1976 }
1977
1978 state->retries = 0;
1979 }
1980 goto try_again;
1981
1982 /*
1983 * -MOVE is treated identically to -ASK, except it may
1984 * trigger a cluster remap.
1985 */
1986 case REDIS_RCODE_MOVE:
1987 fr_assert(*reply);
1988
1989 if (*conn && (fr_redis_cluster_remap(request, cluster, *conn) != FR_REDIS_CLUSTER_RCODE_SUCCESS)) {
1990 ROPTIONAL(RPDEBUG2, PDEBUG2, "%s", "");
1991 }
1992 FALL_THROUGH;
1993
1994 /*
1995 * -ASK process a redirect.
1996 */
1997 case REDIS_RCODE_ASK:
1998 {
1999 fr_redis_cluster_node_t *new;
2000
2001 fr_pool_connection_release(state->node->pool, request, *conn); /* Always release the old connection */
2002
2003 if (!fr_cond_assert(*reply)) return REDIS_RCODE_ERROR;
2004
2005 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Processing redirect \"%s\"", state->node->id, (*reply)->str);
2006 if (state->redirects++ >= cluster->conf->max_redirects) {
2007 ROPTIONAL(REDEBUG, ERROR, "[%i] Reached max_redirects (%i)", state->node->id, state->redirects);
2008 return REDIS_RCODE_ERROR;
2009 }
2010
2011 switch (cluster_redirect(&new, cluster, *reply)) {
2012 case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2013 if (new == state->node) {
2014 ROPTIONAL(REDEBUG, ERROR, "[%i] %s:%i issued redirect to itself", state->node->id,
2015 state->node->name, state->node->addr.inet.dst_port);
2016 return REDIS_RCODE_ERROR;
2017 }
2018
2019 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] Redirected from %s:%i to [%i] %s:%i",
2020 state->node->id, state->node->name,
2021 state->node->addr.inet.dst_port, new->id, new->name, new->addr.inet.dst_port);
2022 state->node = new;
2023
2024 *conn = fr_pool_connection_get(state->node->pool, request);
2025 if (!*conn) return REDIS_RCODE_RECONNECT;
2026
2027 /*
2028 * Reset these counters, their scope is
2029 * a single node in the cluster.
2030 */
2031 state->reconnects = 0;
2032 state->retries = 0;
2033 state->in_pool = fr_pool_state(state->node->pool)->num;
2034 goto try_again;
2035
2036 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION:
2037 cluster->remap_needed = true;
2038 return REDIS_RCODE_RECONNECT;
2039
2040 default:
2041 return REDIS_RCODE_ERROR;
2042 }
2043 }
2044 }
2045
2046try_again:
2047 ROPTIONAL(RDEBUG2, DEBUG2, "[%i] >>> Sending command(s) to %s:%i",
2048 state->node->id, state->node->name, state->node->addr.inet.dst_port);
2049
2050 fr_redis_reply_free(&*reply);
2051 *reply = NULL;
2052
2053 return REDIS_RCODE_TRY_AGAIN;
2054}
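The status handling above can be modelled in isolation. What follows is a minimal sketch under stated assumptions, not the FreeRADIUS implementation: every `sketch_*` name and the reduced `sketch_rcode_t` enum are hypothetical, and connection pools, remapping and reconnects are left out. It only illustrates how the retry and redirect counters bound the number of attempts, using the same post-increment comparisons as the function above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced set of result codes (not the real fr_redis_rcode_t) */
typedef enum {
	SKETCH_RCODE_SUCCESS,
	SKETCH_RCODE_ERROR,
	SKETCH_RCODE_TRY_AGAIN,
	SKETCH_RCODE_ASK
} sketch_rcode_t;

/* Hypothetical per-command state: counters plus their configured limits */
typedef struct {
	unsigned int retries;
	unsigned int redirects;
	unsigned int max_retries;
	unsigned int max_redirects;
} sketch_state_t;

/* Decide what to do after one command attempt returned 'status' */
static sketch_rcode_t sketch_state_next(sketch_state_t *state, sketch_rcode_t status)
{
	switch (status) {
	case SKETCH_RCODE_SUCCESS:
		return SKETCH_RCODE_SUCCESS;

	case SKETCH_RCODE_TRY_AGAIN:	/* Cluster unstable, retry on the same node */
		if (state->retries++ >= state->max_retries) return SKETCH_RCODE_ERROR;
		return SKETCH_RCODE_TRY_AGAIN;

	case SKETCH_RCODE_ASK:		/* Redirected to another node */
		if (state->redirects++ >= state->max_redirects) return SKETCH_RCODE_ERROR;
		state->retries = 0;	/* Retry counter is scoped to a single node */
		return SKETCH_RCODE_TRY_AGAIN;

	default:
		return SKETCH_RCODE_ERROR;
	}
}

/* Replay a scripted sequence of command results, counting attempts made */
static sketch_rcode_t sketch_run(sketch_state_t *state, sketch_rcode_t const *script, size_t len,
				 unsigned int *attempts)
{
	sketch_rcode_t status = SKETCH_RCODE_TRY_AGAIN;
	size_t i;

	assert(state && script && attempts);

	*attempts = 0;
	for (i = 0; (i < len) && (status == SKETCH_RCODE_TRY_AGAIN); i++) {
		(*attempts)++;
		status = sketch_state_next(state, script[i]);
	}
	return status;
}
```

In this model a caller keeps issuing the command for as long as the result is `SKETCH_RCODE_TRY_AGAIN`, which mirrors how the real function returns `REDIS_RCODE_TRY_AGAIN` from its `try_again` label.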
2055
2056/** Get the pool associated with a node in the cluster
2057 *
2058 * @note This is used for testing only. It's not ifdef'd out because
2059 * tests need to run against production builds too.
2060 *
2061 * @param[out] pool associated with the node.
2062 * @param[in] cluster to search for node in.
2063 * @param[in] node_addr to retrieve pool for. Specifies IP and port of node.
2064 * @param[in] create Establish a connection to the specified node if it
2065 * was previously unknown to the cluster client.
2066 * @return
2067 * - 0 on success.
2068 * - -1 if no such node exists, or if create is set and a new node could not be connected.
2069 */
2070int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster,
2071 fr_socket_t *node_addr, bool create)
2072{
2073 fr_redis_cluster_node_t find, *found;
2074
2075 find.addr = (fr_socket_t) {
2076 .inet = {
2077 .dst_ipaddr = node_addr->inet.dst_ipaddr,
2078 .dst_port = node_addr->inet.dst_port,
2079 }
2080 };
2081
2082 pthread_mutex_lock(&cluster->mutex);
2083 found = fr_rb_find(cluster->used_nodes, &find);
2084 if (!found) {
2085 fr_redis_cluster_node_t *spare;
2086 char buffer[INET6_ADDRSTRLEN];
2087 char const *hostname;
2088
2089 if (!create) {
2090 pthread_mutex_unlock(&cluster->mutex);
2091
2092 hostname = inet_ntop(node_addr->inet.dst_ipaddr.af, &node_addr->inet.dst_ipaddr.addr, buffer, sizeof(buffer));
2093 fr_assert(hostname); /* addr.ipaddr is probably corrupt */
2094 fr_strerror_printf("No existing node found with address %s, port %i",
2095 hostname, node_addr->inet.dst_port);
2096 return -1;
2097 }
2098
2099 spare = fr_fifo_peek(cluster->free_nodes);
2100 if (!spare) {
2101 fr_strerror_const("Reached maximum connected nodes");
2102 pthread_mutex_unlock(&cluster->mutex);
2103 return -1;
2104 }
2105 spare->pending_addr = find.addr; /* Set the config to be applied */
2106 if (cluster_node_connect(cluster, spare) < 0) {
2107 pthread_mutex_unlock(&cluster->mutex);
2108 return -1;
2109 }
2110 fr_rb_insert(cluster->used_nodes, spare);
2111 fr_fifo_pop(cluster->free_nodes);
2112 found = spare;
2113 }
2114 /*
2115 * Sanity checks
2116 */
2117 fr_assert(((talloc_array_length(cluster->node) - 1) - fr_rb_num_elements(cluster->used_nodes)) ==
2118 fr_fifo_num_elements(cluster->free_nodes));
2119 pthread_mutex_unlock(&cluster->mutex);
2120
2121 *pool = found->pool;
2122
2123 return 0;
2124}
2125
2126/** Return an array of IP addresses belonging to masters or slaves
2127 *
2128 * @note We return IP addresses as they're safe to use across cluster remaps.
2129 * @note Result array must be freed (talloc_free()) after use.
2130 *
2131 * @param[in] ctx to allocate array of IP addresses in.
2132 * @param[out] out Where to write the addresses of the nodes.
2133 * @param[in] cluster to search for nodes in.
2134 * @param[in] is_master If true, include the addresses of all the master nodes.
2135 * @param[in] is_slave If true, include the addresses of all the slave nodes.
2136 * @return the number of IP addresses written to out, or -1 if memory allocation failed.
2137 */
2138ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[],
2139 fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
2140{
2141 uint64_t in_use = fr_rb_num_elements(cluster->used_nodes);
2142 fr_redis_cluster_node_t const *node;
2143 fr_rb_iter_inorder_t iter;
2144 uint8_t count;
2145 fr_socket_t *found;
2146
2147 if (in_use == 0) {
2148 *out = NULL;
2149 return 0;
2150 }
2151
2152 count = 0;
2153 found = talloc_zero_array(ctx, fr_socket_t, in_use);
2154 if (!found) {
2155 fr_strerror_const("Out of memory");
2156 return -1;
2157 }
2158
2159 pthread_mutex_lock(&cluster->mutex);
2160
2161 for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2162 node;
2163 node = fr_rb_iter_next_inorder(&iter)) {
2164 if ((is_master && node->is_master) || (is_slave && !node->is_master)) found[count++] = node->addr;
2165 }
2166
2167 pthread_mutex_unlock(&cluster->mutex);
2168
2169 if (count == 0) {
2170 *out = NULL;
2171 talloc_free(found);
2172 return 0;
2173 }
2174
2175 *out = found;
2176
2177 return count;
2178}
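The role filter used in the loop above can be shown on its own. This is a small sketch with hypothetical names (`sketch_node_t`, `sketch_filter_by_role`) and plain integer IDs in place of `fr_socket_t` addresses; it leaves out the mutex and the talloc allocation and only demonstrates how the two boolean flags combine.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a cluster node: an ID and its role */
typedef struct {
	int	id;
	bool	is_master;
} sketch_node_t;

/*
 * Copy the IDs of nodes matching the requested roles into out[].
 * out[] must have room for at least 'len' entries.
 * Returns the number of IDs written.
 */
static size_t sketch_filter_by_role(int out[], sketch_node_t const nodes[], size_t len,
				    bool want_master, bool want_slave)
{
	size_t i, count = 0;

	for (i = 0; i < len; i++) {
		/* Same predicate as the loop in the function above */
		if ((want_master && nodes[i].is_master) ||
		    (want_slave && !nodes[i].is_master)) out[count++] = nodes[i].id;
	}

	return count;
}
```

Setting both flags selects every node and clearing both selects none, which corresponds to the real function's `count == 0` branch that frees the array and sets `*out` to NULL.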
2179
2180/** Destroy mutex associated with cluster slots structure
2181 *
2182 * @param cluster being freed.
2183 * @return 0
2184 */
2185static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
2186{
2187 pthread_mutex_destroy(&cluster->mutex);
2188
2189 return 0;
2190}
2191
2192/** Check if members of the cluster are above a certain version
2193 *
2194 * @param cluster to perform check on.
2195 * @param min_version that must be found on each node for the check to succeed.
2196 * Must be in the format @verbatim <major>.<minor>.<release> @endverbatim.
2197 * @return
2198 * - true if all contactable members are above min_version.
2199 * - false if at least one member is not above the minimum version
2200 * (use #fr_strerror to retrieve node information).
2201 */
2202bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
2203{
2204 fr_redis_cluster_node_t const *node;
2205 fr_rb_iter_inorder_t iter;
2206 fr_redis_conn_t *conn;
2207 int ret;
2208 char buffer[40];
2209 bool all_above = true;
2210
2211 pthread_mutex_lock(&cluster->mutex);
2212
2213 for (node = fr_rb_iter_init_inorder(&iter, cluster->used_nodes);
2214 node;
2215 node = fr_rb_iter_next_inorder(&iter)) {
2216 conn = fr_pool_connection_get(node->pool, NULL);
2217 if (!conn) continue;
2218
2219 /*
2220 * We don't care if we can't get the version
2221 * as we don't want to prevent the server from
2222 * starting if start == 0.
2223 */
2224 ret = fr_redis_get_version(buffer, sizeof(buffer), conn);
2225 fr_pool_connection_release(node->pool, NULL, conn);
2226 if (ret < 0) continue;
2227
2228 if (fr_redis_version_num(buffer) < fr_redis_version_num(min_version)) {
2229 fr_strerror_printf("Redis node %s:%i (currently v%s) needs update to >= v%s",
2230 node->name, node->addr.inet.dst_port, buffer, min_version);
2231 all_above = false;
2232 break;
2233 }
2234 }
2235
2236 pthread_mutex_unlock(&cluster->mutex);
2237
2238 return all_above;
2239}
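The comparison above relies on turning a `<major>.<minor>.<release>` string into a number that orders correctly. How `fr_redis_version_num` does this is not shown in this file, so the following is only an illustrative sketch: `sketch_version_num` and its bit layout are assumptions made for the example.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: pack "<major>.<minor>.<release>" into one integer
 * so that a plain numeric comparison orders versions correctly.
 * Returns 0 if the string does not contain three dotted numbers.
 */
static uint64_t sketch_version_num(char const *version)
{
	unsigned int major = 0, minor = 0, release = 0;

	if (sscanf(version, "%u.%u.%u", &major, &minor, &release) != 3) return 0;

	/* minor and release are limited to 16 bits each in this sketch */
	return ((uint64_t)major << 32) |
	       ((uint64_t)(minor & 0xffff) << 16) |
	       (uint64_t)(release & 0xffff);
}
```

Packing the components avoids the trap of comparing version strings lexically, where "10.0.0" would sort before "9.9.9".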
2240
2241/** Allocate and initialise a new cluster structure
2242 *
2243 * This holds all the data necessary to manage a pool of pools for a specific redis cluster.
2244 *
2245 * @note Will not error out unless cs.pool.start > 0. This is consistent with other pool based
2246 * modules/code.
2247 *
2248 * @param ctx to link the lifetime of the cluster structure to.
2249 * @param module Configuration section to search for 'server' conf pairs in.
2250 * @param conf Base redis server configuration. Cluster nodes share database
2251 * number and password.
2252 * @param triggers_enabled Whether triggers should be enabled.
2253 * @param log_prefix Custom log prefix. Defaults to @verbatim rlm_<module> (<instance>) @endverbatim.
2254 * @param trigger_prefix Custom trigger prefix. Defaults to @verbatim modules.<module>.pool @endverbatim.
2255 * @param trigger_args Argument pairs to pass to the trigger in addition to Connection-Pool-Server,
2256 * and Connection-Pool-Port (which are always set by the cluster code).
2257 * @return
2258 * - New #fr_redis_cluster_t on success.
2259 * - NULL on error.
2260 */
2261fr_redis_cluster_t *fr_redis_cluster_alloc(TALLOC_CTX *ctx,
2262 CONF_SECTION *module,
2263 fr_redis_conf_t *conf,
2264 bool triggers_enabled,
2265 char const *log_prefix,
2266 char const *trigger_prefix,
2267 fr_pair_list_t *trigger_args)
2268{
2269 uint8_t i;
2270 uint16_t s;
2271
2272 char const *cs_name1, *cs_name2;
2273
2274 CONF_PAIR *cp;
2275 int af = AF_UNSPEC; /* AF of first server */
2276
2277 uint64_t num_nodes;
2278 fr_redis_cluster_t *cluster;
2279
2280 fr_assert(triggers_enabled || !trigger_prefix);
2281 fr_assert(triggers_enabled || (!trigger_args || fr_pair_list_empty(trigger_args)));
2282
2283 MEM(cluster = talloc_zero(NULL, fr_redis_cluster_t));
2285
2286 cs_name1 = cf_section_name1(module);
2287 cs_name2 = cf_section_name2(module);
2288
2289 cluster->triggers_enabled = triggers_enabled;
2290 if (cluster->triggers_enabled) {
2291 /*
2292 * Setup trigger prefix
2293 */
2294 if (!trigger_prefix) {
2295 cluster->trigger_prefix = talloc_typed_asprintf(cluster, "modules.%s.pool", cs_name1);
2296 } else {
2297 cluster->trigger_prefix = talloc_strdup(cluster, trigger_prefix);
2298 }
2299
2300 /*
2301 * Duplicate the trigger arguments.
2302 */
2303 if (trigger_args) MEM(fr_pair_list_copy(cluster, &cluster->trigger_args, trigger_args) >= 0);
2304 }
2305
2306 /*
2307 * Setup log prefix
2308 */
2309 if (!log_prefix) {
2310 if (!cs_name2) cs_name2 = cs_name1;
2311 cluster->log_prefix = talloc_typed_asprintf(conf, "rlm_%s (%s)", cs_name1, cs_name2);
2312 } else {
2313 cluster->log_prefix = talloc_strdup(cluster, log_prefix);
2314 }
2315
2316 /*
2317 * Ensure we always have a pool section (even if it's empty)
2318 */
2319 if (!cf_section_find(module, "pool", NULL)) {
2320 (void) cf_section_alloc(module, module, "pool", NULL);
2321 }
2322
2323 /*
2324 * Parse TLS configuration
2325 */
2326 if (conf->use_tls) {
2327#ifdef HAVE_REDIS_SSL
2328 CONF_SECTION *tls_cs;
2329 fr_tls_conf_t *tls_conf;
2330
2331 tls_cs = cf_section_find(module, "tls", NULL);
2332 if (!tls_cs) {
2333 tls_cs = cf_section_alloc(module, module, "tls", NULL);
2334 }
2335
2336 tls_conf = fr_tls_conf_parse_client(tls_cs);
2337 if (!tls_conf) {
2338 ERROR("%s - Failed to parse TLS configuration", cluster->log_prefix);
2339 talloc_free(cluster);
2340 return NULL;
2341 }
2342
2343 cluster->ssl_ctx = fr_tls_ctx_alloc(tls_conf, true);
2344 if (!cluster->ssl_ctx) {
2345 ERROR("%s - Failed to allocate SSL context", cluster->log_prefix);
2346 talloc_free(cluster);
2347 return NULL;
2348 }
2349#else
2350 WARN("%s - No redis SSL support, ignoring \"use_tls = yes\"", cluster->log_prefix);
2351#endif
2352 }
2353
2354 if (conf->max_nodes == UINT8_MAX) {
2355 ERROR("%s - Maximum number of connected nodes allowed is %i", cluster->log_prefix, UINT8_MAX - 1);
2356 talloc_free(cluster);
2357 return NULL;
2358 }
2359
2360 if (conf->max_nodes == 0) {
2361 ERROR("%s - Minimum number of nodes allowed is 1", cluster->log_prefix);
2362 talloc_free(cluster);
2363 return NULL;
2364 }
2365
2366 cp = cf_pair_find(module, "server");
2367 if (!cp) {
2368 ERROR("%s - No servers configured", cluster->log_prefix);
2369 talloc_free(cluster);
2370 return NULL;
2371 }
2372
2373 cluster->module = module;
2374
2375 /*
2376 * Ensure the pool is freed at the same time as its
2377 * parent.
2378 *
2379 * We need to break the link between the cluster and
2380 * its parent context, as the two contexts may be
2381 * modified by multiple threads.
2382 */
2383 MEM(talloc_link_ctx(ctx, cluster) >= 0);
2384 MEM(cluster->node = talloc_zero_array(cluster, fr_redis_cluster_node_t, conf->max_nodes + 1));
2385 MEM(cluster->used_nodes = fr_rb_inline_alloc(cluster, fr_redis_cluster_node_t, rbnode, _cluster_node_cmp, NULL));
2386 MEM(cluster->free_nodes = fr_fifo_create(cluster, conf->max_nodes, NULL));
2387
2388 cluster->conf = conf;
2389
2390 pthread_mutex_init(&cluster->mutex, NULL);
2391 talloc_set_destructor(cluster, _fr_redis_cluster_free);
2392
2393 /*
2394 * Node id 0 is reserved, so we can detect misconfigured
2395 * clusters.
2396 */
2397 for (i = 1; i < (cluster->conf->max_nodes + 1); i++) {
2398 cluster->node[i].id = i;
2399 cluster->node[i].cluster = cluster;
2400
2401 /* Push them all into the queue */
2402 fr_fifo_push(cluster->free_nodes, &cluster->node[i]);
2403 }
2404
2405 /*
2406 * Don't connect to cluster nodes if we're just
2407 * checking the config.
2408 */
2409 if (check_config) return cluster;
2410
2411 /*
2412 * Populate the cluster with the bootstrap servers.
2413 *
2414 * If we fail getting a key_slot map here, then the
2415 * bootstrap servers are distributed evenly through
2416 * the key slots.
2417 *
2418 * This allows the server to start, and potentially,
2419 * a valid map to be applied, once the server starts
2420 * processing requests.
2421 */
2422 do {
2423 char const *server;
2424 fr_redis_cluster_node_t *node;
2425 fr_redis_conn_t *conn;
2426 redisReply *map;
2427 size_t j, k;
2428
2429 node = fr_fifo_peek(cluster->free_nodes);
2430 if (!node) {
2431 ERROR("%s - Number of bootstrap servers exceeds 'max_nodes'", cluster->log_prefix);
2432
2433 error:
2434 talloc_free(cluster);
2435 return NULL;
2436 }
2437
2438 server = cf_pair_value(cp);
2439 if (fr_inet_pton_port(&node->pending_addr.inet.dst_ipaddr, &node->pending_addr.inet.dst_port, server,
2440 talloc_array_length(server) - 1, af, true, true) < 0) {
2441 PERROR("%s - Failed parsing server \"%s\"", cluster->log_prefix, server);
2442 goto error;
2443 }
2444 if (!node->pending_addr.inet.dst_port) node->pending_addr.inet.dst_port = conf->port;
2445
2446 if (cluster_node_connect(cluster, node) < 0) {
2447 WARN("%s - Connecting to %s:%i failed", cluster->log_prefix, node->name, node->pending_addr.inet.dst_port);
2448 continue;
2449 }
2450
2451 if (!fr_rb_insert(cluster->used_nodes, node)) {
2452 WARN("%s - Skipping duplicate bootstrap server \"%s\"", cluster->log_prefix, server);
2453 continue;
2454 }
2455 node->is_active = true;
2456 fr_fifo_pop(cluster->free_nodes);
2457
2458 /*
2459 * Prefer the same IPaddr family as the first node
2460 */
2461 if (af == AF_UNSPEC) af = node->addr.inet.dst_ipaddr.af;
2462
2463 /*
2464 * Only get cluster map config if required
2465 */
2466 if (fr_pool_start_num(node->pool) > 0) {
2467 /*
2468 * Fine to leave this node configured, if we do find
2469 * a live node, and it's not in the map, it'll be cleared out.
2470 */
2471 conn = fr_pool_connection_get(node->pool, NULL);
2472 if (!conn) {
2473 WARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2474 continue;
2475 }
2476 } else {
2477 break;
2478 }
2479
2480 if (!cluster->conf->use_cluster_map) {
2481 fr_pool_connection_release(node->pool, NULL, conn);
2482 continue;
2483 }
2484
2485 switch (cluster_map_get(&map, conn)) {
2486 /*
2487 * We got a valid map! See if we can apply it...
2488 */
2489 case FR_REDIS_CLUSTER_RCODE_SUCCESS:
2490 fr_pool_connection_release(node->pool, NULL, conn);
2491
2492 DEBUG("%s - Cluster map consists of %zu key ranges", cluster->log_prefix, map->elements);
2493 for (j = 0; j < map->elements; j++) {
2494 redisReply *map_node = map->element[j];
2495
2496 DEBUG("%s - %zu - keys %lli-%lli", cluster->log_prefix, j,
2497 map_node->element[0]->integer,
2498 map_node->element[1]->integer);
2499 DEBUG("%s - master: %s:%lli", cluster->log_prefix,
2500 map_node->element[2]->element[0]->str,
2501 map_node->element[2]->element[1]->integer);
2502 for (k = 3; k < map_node->elements; k++) {
2503 DEBUG("%s - slave%zu: %s:%lli", cluster->log_prefix, k - 3,
2504 map_node->element[k]->element[0]->str,
2505 map_node->element[k]->element[1]->integer);
2506 }
2507 }
2508
2509 if (cluster_map_apply(cluster, map) < 0) {
2510 PWARN("%s: Applying cluster map failed", cluster->log_prefix);
2511 fr_redis_reply_free(&map);
2512 continue;
2513 }
2514 fr_redis_reply_free(&map);
2515
2516 return cluster;
2517
2518 /*
2519 * Unusable bootstrap node
2520 */
2521 case FR_REDIS_CLUSTER_RCODE_BAD_INPUT:
2522 PWARN("%s - Bootstrap server \"%s\" returned invalid data", cluster->log_prefix, server);
2523 fr_pool_connection_release(node->pool, NULL, conn);
2524 continue;
2525
2526 case FR_REDIS_CLUSTER_RCODE_NO_CONNECTION:
2527 PWARN("%s - Can't contact bootstrap server \"%s\"", cluster->log_prefix, server);
2528 fr_pool_connection_close(node->pool, NULL, conn);
2529 continue;
2530
2531 /*
2532 * Clustering not enabled, or not supported,
2533 * by this node, skip it and check the others.
2534 */
2535 case FR_REDIS_CLUSTER_RCODE_FAILED:
2536 case FR_REDIS_CLUSTER_RCODE_IGNORED:
2537 PDEBUG2("%s - Bootstrap server \"%s\" returned", cluster->log_prefix, server);
2538 fr_pool_connection_release(node->pool, NULL, conn);
2539 break;
2540 }
2541 } while ((cp = cf_pair_find_next(module, cp, "server")));
2542
2543 /*
2544 * Catch pool.start != 0
2545 */
2546 num_nodes = fr_rb_num_elements(cluster->used_nodes);
2547 if (!num_nodes) {
2548 ERROR("%s - Can't contact any bootstrap servers", cluster->log_prefix);
2549 goto error;
2550 }
2551
2552 /*
2553 * We've failed to apply a valid cluster map.
2554 * Distribute the node(s) throughout the key_slots,
2555 * hopefully we'll get one when we start processing
2556 * requests.
2557 */
2558 for (s = 0; s < KEY_SLOTS; s++) cluster->key_slot[s].master = (s % (uint16_t) num_nodes) + 1;
2559
2560 return cluster;
2561}
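The fallback at the end of the function above spreads the key slots round-robin over the connected nodes when no cluster map could be applied. A minimal sketch of that arithmetic follows. The names are hypothetical, and `SKETCH_KEY_SLOTS` is assumed to be 16384, the slot count given in the Redis cluster specification; node ID 0 stays reserved, as in the real code.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_KEY_SLOTS 16384	/* Assumed: slot count from the Redis cluster spec */

/*
 * Assign each key slot a master node ID in the range 1..num_nodes,
 * cycling through the nodes in order.  ID 0 is never assigned.
 */
static void sketch_distribute_slots(uint8_t slot_master[SKETCH_KEY_SLOTS], uint8_t num_nodes)
{
	uint16_t s;

	assert(num_nodes > 0);	/* Modulo by zero is undefined */

	for (s = 0; s < SKETCH_KEY_SLOTS; s++) slot_master[s] = (uint8_t)((s % num_nodes) + 1);
}
```

With three nodes the slots map to IDs 1, 2, 3, 1, 2, 3 and so on, so each bootstrap server receives roughly a third of the key space until a real map is learned at runtime.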
static int const char char buffer[256]
Definition acutest.h:576
va_list args
Definition acutest.h:770
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:322
#define STRINGIFY(x)
Definition build.h:197
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define NUM_ELEMENTS(_t)
Definition build.h:337
bool check_config
Definition cf_file.c:67
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition cf_util.c:1453
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1185
char const * cf_section_name1(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1171
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1028
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
Definition cf_util.c:1439
CONF_SECTION * cf_section_dup(TALLOC_CTX *ctx, CONF_SECTION *parent, CONF_SECTION const *cs, char const *name1, char const *name2, bool copy_meta)
Duplicate a configuration section.
Definition cf_util.c:928
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1594
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
Definition cf_util.h:140
bool remapping
True when cluster is being remapped.
Definition cluster.c:257
static fr_redis_cluster_rcode_t cluster_map_get(redisReply **out, fr_redis_conn_t *conn)
Learn a new cluster layout by querying the node that issued the -MOVE.
Definition cluster.c:864
fr_redis_cluster_t * fr_redis_cluster_alloc(TALLOC_CTX *ctx, CONF_SECTION *module, fr_redis_conf_t *conf, bool triggers_enabled, char const *log_prefix, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Allocate and initialise a new cluster structure.
Definition cluster.c:2261
fr_redis_rcode_t fr_redis_cluster_state_next(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, fr_redis_rcode_t status, redisReply **reply)
Get the next connection to attempt a command against.
Definition cluster.c:1863
CONF_SECTION *fr_redis_conf_t * conf
< Module configuration.
Definition cluster.c:263
char const * log_prefix
What to prepend to log messages.
Definition cluster.c:252
fr_pool_t * pool
Pool associated with this node.
Definition cluster.c:226
char name[INET6_ADDRSTRLEN]
Buffer to hold IP string.
Definition cluster.c:216
#define SET_ACTIVE(_node)
bool is_active
Whether this node is in the active node set.
Definition cluster.c:229
fr_redis_cluster_node_t const * fr_redis_cluster_slave(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot, uint8_t slave_num)
Return the slave node that would be used for a particular key.
Definition cluster.c:1655
#define CLOSED_WEIGHT
What weight to give to nodes that had a connection closed recently.
Definition cluster.c:182
fr_redis_cluster_node_t const * fr_redis_cluster_master(fr_redis_cluster_t *cluster, fr_redis_cluster_key_slot_t const *key_slot)
Return the master node that would be used for a particular key.
Definition cluster.c:1639
#define MAX_SLAVES
Maximum number of slaves associated with a keyslot.
Definition cluster.c:173
static fr_redis_cluster_rcode_t cluster_node_connect(fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *node)
Establish a connection to a cluster node.
Definition cluster.c:372
static int cluster_map_node_validate(redisReply *node, int map_idx, int node_idx)
Validate a cluster map node entry.
Definition cluster.c:786
struct cluster_nodes_live_t::@52 node[UINT8_MAX - 1]
Array of live node IDs (and weights).
#define SET_ADDR(_addr, _map)
static uint16_t cluster_key_hash(uint8_t const *key, size_t key_len)
Resolve key to key slot.
Definition cluster.c:299
static fr_redis_cluster_rcode_t cluster_redirect(fr_redis_cluster_node_t **out, fr_redis_cluster_t *cluster, redisReply *reply)
Retrieve or associate a node with the server indicated in the redirect.
Definition cluster.c:1119
static fr_redis_cluster_rcode_t cluster_node_conf_from_redirect(uint16_t *key_slot, fr_socket_t *node_addr, redisReply *redirect)
Parse a -MOVED or -ASK redirect.
Definition cluster.c:447
static int cluster_node_find_live(fr_redis_cluster_node_t **live_node, fr_redis_conn_t **live_conn, request_t *request, fr_redis_cluster_t *cluster, fr_redis_cluster_node_t *skip)
Attempt to find a live pool in the cluster.
Definition cluster.c:1311
#define CLOSED_PERIOD
How recently must the closed have occurred for us to care.
Definition cluster.c:179
fr_redis_cluster_key_slot_t key_slot_pending[KEY_SLOTS]
Pending key slot table.
Definition cluster.c:276
fr_rb_tree_t * used_nodes
Tree of used nodes.
Definition cluster.c:273
fr_redis_cluster_key_slot_t key_slot[KEY_SLOTS]
Lookup table of slots to pools.
Definition cluster.c:275
#define KEY_SLOTS
Maximum number of keyslots (should not change).
Definition cluster.c:171
fr_redis_cluster_t * cluster
Common configuration (database number, password, etc..).
Definition cluster.c:224
char const * trigger_prefix
Trigger path.
Definition cluster.c:253
fr_rb_node_t rbnode
Entry into the tree of redis nodes.
Definition cluster.c:214
bool remap_needed
Set true if at least one cluster node is definitely unreachable.
Definition cluster.c:258
fr_time_t last_updated
Last time the cluster mappings were updated.
Definition cluster.c:260
#define SET_INACTIVE(_node)
int fr_redis_cluster_pool_by_node_addr(fr_pool_t **pool, fr_redis_cluster_t *cluster, fr_socket_t *node_addr, bool create)
Get the pool associated with a node in the cluster.
Definition cluster.c:2070
fr_pair_list_t trigger_args
Arguments to pass to triggers.
Definition cluster.c:254
ssize_t fr_redis_cluster_node_addr_by_role(TALLOC_CTX *ctx, fr_socket_t *out[], fr_redis_cluster_t *cluster, bool is_master, bool is_slave)
Return an array of IP addresses belonging to masters or slaves.
Definition cluster.c:2138
fr_socket_t pending_addr
New node address to be applied when the pool is reconnected.
Definition cluster.c:221
uint8_t next
Next index in live.
Definition cluster.c:205
size_t fr_redis_cluster_rcodes_table_len
Definition cluster.c:288
static void _cluster_node_conf_apply(fr_pool_t *pool, void *opaque)
Reconnect callback to apply new pool config.
Definition cluster.c:340
uint8_t slave[MAX_SLAVES]
R/O node (slave) for this key slot.
Definition cluster.c:242
int fr_redis_cluster_port(uint16_t *out, fr_redis_cluster_node_t const *node)
Return the port of a particular node.
Definition cluster.c:1689
static int _fr_redis_cluster_free(fr_redis_cluster_t *cluster)
Destroy mutex associated with cluster slots structure.
Definition cluster.c:2185
#define FAILED_WEIGHT
What weight to give to nodes that had a spawn failure recently.
Definition cluster.c:188
uint8_t master
R/W node (master) for this key slot.
Definition cluster.c:244
static int cluster_node_pool_health(fr_time_t now, fr_pool_state_t const *state)
Try to determine the health of a cluster node passively by examining its pool state.
Definition cluster.c:1209
static fr_redis_cluster_rcode_t cluster_node_ping(request_t *request, fr_redis_cluster_node_t *node, fr_redis_conn_t *conn)
Issue a ping request against a cluster node.
Definition cluster.c:1244
void * fr_redis_cluster_conn_create(TALLOC_CTX *ctx, void *instance, fr_time_delta_t timeout)
Create a new connection to a Redis node.
Definition cluster.c:1468
fr_redis_cluster_key_slot_t const * fr_redis_cluster_slot_by_key(fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len)
Implements the key slot selection scheme used by freeradius.
Definition cluster.c:1603
static int8_t _cluster_node_cmp(void const *one, void const *two)
Compare two redis nodes to check equality.
Definition cluster.c:323
fr_redis_cluster_rcode_t fr_redis_cluster_remap(request_t *request, fr_redis_cluster_t *cluster, fr_redis_conn_t *conn)
Perform a runtime remap of the cluster.
Definition cluster.c:1009
#define RELEASED_MIN_WEIGHT
Minimum weight to assign to node.
Definition cluster.c:195
static fr_redis_cluster_rcode_t cluster_map_apply(fr_redis_cluster_t *cluster, redisReply *reply)
Apply a cluster map received from a cluster node.
Definition cluster.c:538
uint8_t id
Node ID (index in node array).
Definition cluster.c:218
bool triggers_enabled
Whether triggers are enabled.
Definition cluster.c:255
fr_redis_rcode_t fr_redis_cluster_state_init(fr_redis_cluster_state_t *state, fr_redis_conn_t **conn, fr_redis_cluster_t *cluster, request_t *request, uint8_t const *key, size_t key_len, bool read_only)
Resolve a key to a pool, and reserve a connection in that pool.
Definition cluster.c:1741
static int _cluster_conn_free(fr_redis_conn_t *conn)
Callback for freeing a Redis connection.
Definition cluster.c:1452
fr_socket_t addr
Current node address.
Definition cluster.c:220
fr_table_num_sorted_t const fr_redis_cluster_rcodes_table[]
Definition cluster.c:281
bool is_master
Whether this node is a master.
Definition cluster.c:230
#define FAILED_PERIOD
How recently must the spawn failure occurred for us to care.
Definition cluster.c:185
fr_fifo_t * free_nodes
Queue of free nodes (or nodes waiting to be reused).
Definition cluster.c:272
#define RELEASED_PERIOD
Period after which we don't care about when the last connection was released.
Definition cluster.c:191
int fr_redis_cluster_ipaddr(fr_ipaddr_t *out, fr_redis_cluster_node_t const *node)
Return the ipaddr of a particular node.
Definition cluster.c:1672
pthread_mutex_t mutex
Mutex to synchronise cluster operations.
Definition cluster.c:278
uint8_t slave_num
Number of slaves associated with this key slot.
Definition cluster.c:243
fr_redis_cluster_node_t * node
Structure containing a node id, its address and a pool of its connections.
Definition cluster.c:269
bool fr_redis_cluster_min_version(fr_redis_cluster_t *cluster, char const *min_version)
Check if members of the cluster are above a certain version.
Definition cluster.c:2202
CONF_SECTION * pool_cs
Pool configuration section associated with node.
Definition cluster.c:227
Live nodes data, used to perform weighted random selection of alternative nodes.
Definition cluster.c:199
A redis cluster.
Definition cluster.c:251
Indexes in the fr_redis_cluster_node_t array for a single key slot.
Definition cluster.c:241
A Redis cluster node.
Definition cluster.c:213
Common functions for interacting with Redis cluster via Hiredis.
size_t key_len
Length of the key.
Definition cluster.h:55
bool close_conn
Set by caller of fr_redis_cluster_state_next, to indicate that connection must be closed,...
Definition cluster.h:50
uint32_t retries
How many times we've received TRYAGAIN.
Definition cluster.h:60
uint32_t reconnects
How many connections we've tried in this pool.
Definition cluster.h:62
uint8_t const * key
Key we performed hashing on.
Definition cluster.h:54
fr_redis_cluster_rcode_t
Return values for internal functions.
Definition cluster.h:67
@ FR_REDIS_CLUSTER_RCODE_IGNORED
Operation ignored.
Definition cluster.h:68
@ FR_REDIS_CLUSTER_RCODE_FAILED
Operation failed.
Definition cluster.h:70
@ FR_REDIS_CLUSTER_RCODE_SUCCESS
Operation completed successfully.
Definition cluster.h:69
@ FR_REDIS_CLUSTER_RCODE_BAD_INPUT
Validation error.
Definition cluster.h:73
@ FR_REDIS_CLUSTER_RCODE_NO_CONNECTION
Operation failed because we couldn't find a live connection.
Definition cluster.h:71
uint32_t redirects
How many redirects have we followed.
Definition cluster.h:58
fr_redis_cluster_node_t * node
Node we're communicating with.
Definition cluster.h:57
uint32_t in_pool
How many available connections are there in the pool.
Definition cluster.h:61
Redis connection sequence state.
Definition cluster.h:49
uint16_t fr_crc16_xmodem(uint8_t const *in, size_t in_len)
CRC16 implementation according to CCITT standards.
Definition crc16.c:91
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
void * fr_fifo_peek(fr_fifo_t *fi)
Examine the next element that would be popped.
Definition fifo.c:158
unsigned int fr_fifo_num_elements(fr_fifo_t *fi)
Return the number of elements in the fifo queue.
Definition fifo.c:170
int fr_fifo_push(fr_fifo_t *fi, void *data)
Push data onto the fifo.
Definition fifo.c:111
void * fr_fifo_pop(fr_fifo_t *fi)
Pop data off of the fifo.
Definition fifo.c:135
#define fr_fifo_create(_ctx, _max_entries, _node_free)
Creates a fifo.
Definition fifo.h:66
int fr_inet_pton(fr_ipaddr_t *out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Simple wrapper to decide whether an IP value is v4 or v6 and call the appropriate parser.
Definition inet.c:778
int fr_inet_pton_port(fr_ipaddr_t *out, uint16_t *port_out, char const *value, ssize_t inlen, int af, bool resolve, bool mask)
Parses IPv4/6 address + port, to fr_ipaddr_t and integer (port)
Definition inet.c:937
int8_t fr_ipaddr_cmp(fr_ipaddr_t const *a, fr_ipaddr_t const *b)
Compare two ip addresses.
Definition inet.c:1346
int af
Address family.
Definition inet.h:64
IPv4/6 prefix.
#define PERROR(_fmt,...)
Definition log.h:228
#define REXDENT()
Exdent (unindent) R* messages by one level.
Definition log.h:443
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define RPDEBUG2(fmt,...)
Definition log.h:347
#define RDEBUG3(fmt,...)
Definition log.h:343
#define PDEBUG2(_fmt,...)
Definition log.h:272
#define RWARN(fmt,...)
Definition log.h:297
#define PWARN(_fmt,...)
Definition log.h:227
#define RERROR(fmt,...)
Definition log.h:298
#define RPERROR(fmt,...)
Definition log.h:302
#define RINFO(fmt,...)
Definition log.h:296
#define RPEDEBUG(fmt,...)
Definition log.h:376
#define RINDENT()
Indent R* messages by one level.
Definition log.h:430
@ L_DBG_LVL_3
3rd highest priority debug messages (-xxx | -Xx).
Definition log.h:72
unsigned short uint16_t
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
#define UINT8_MAX
char const * inet_ntop(int af, void const *src, char *dst, size_t cnt)
Definition missing.c:443
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition pair.c:2319
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition pair.c:46
int fr_pool_start(fr_pool_t *pool)
Definition pool.c:1116
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition pool.c:1407
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition pool.c:1173
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition pool.c:1244
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition pool.c:967
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition pool.c:1537
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition pool.c:1392
int fr_pool_start_num(fr_pool_t *pool)
Get the connection start value configured for the pool.
Definition pool.c:1193
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition pool.c:933
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition pool.c:1224
A connection pool.
Definition pool.c:87
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition pool.h:57
fr_time_t last_released
Last time a connection was released.
Definition pool.h:56
uint32_t num
Number of connections in the pool.
Definition pool.h:67
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
Definition radclient.h:52
#define RDEBUG2(fmt,...)
Definition radclient.h:54
#define DEBUG2(fmt,...)
Definition radclient.h:43
#define WARN(fmt,...)
Definition radclient.h:47
#define INFO(fmt,...)
Definition radict.c:54
static rs_t * conf
Definition radsniff.c:53
static char const * hostname(char *buf, size_t buflen, uint32_t ipaddr)
Definition radwho.c:133
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition rand.c:105
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition rb.c:824
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black tree.
Definition rb.h:271
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
fr_table_num_sorted_t const redis_rcodes[]
Definition redis.c:40
redisContext * handle
Hiredis context used when issuing commands.
Definition base.h:101
fr_redis_cluster_node_t * node
Node this connection is to.
Definition base.h:102
void fr_redis_reply_print(fr_log_lvl_t lvl, redisReply *reply, request_t *request, int idx)
Print the response data in a useful treelike form.
Definition redis.c:141
#define REDIS_ERROR_MOVED_STR
Definition base.h:46
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition base.h:64
uint32_t fr_redis_version_num(char const *version)
Convert version string into a 32bit unsigned integer for comparisons.
Definition redis.c:688
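One way to make dotted version strings comparable as integers is to pack each component into its own byte. The sketch below illustrates the idea only: the function name example_version_num, the exact bit layout, the requirement for exactly three components, and the use of 0 as the error value are all assumptions, and may differ from what fr_redis_version_num does.

```c
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical helper, not the FreeRADIUS implementation.
 * Packs "major.minor.patch" as (major << 16) | (minor << 8) | patch.
 * Returns 0 if the string is malformed or a component exceeds 255.
 */
static uint32_t example_version_num(char const *version)
{
	uint32_t	ret = 0;
	char const	*p = version;

	for (int i = 0; i < 3; i++) {
		char		*end;
		unsigned long	num;

		/* Each component must start with a digit (no sign, no space) */
		if ((*p < '0') || (*p > '9')) return 0;

		num = strtoul(p, &end, 10);
		if (num > UINT8_MAX) return 0;

		ret = (ret << 8) | (uint32_t)num;

		if (i < 2) {
			/* The first two components must be followed by a dot */
			if (*end != '.') return 0;
			p = end + 1;
		} else if (*end != '\0') {
			/* Nothing may follow the third component */
			return 0;
		}
	}

	return ret;
}
```

With this layout, plain integer comparison orders versions correctly, so a caller could check for a minimum server version with something like example_version_num(found) >= example_version_num("3.0.2").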
fr_redis_rcode_t fr_redis_command_status(fr_redis_conn_t *conn, redisReply *reply)
Check the reply for errors.
Definition redis.c:71
fr_table_num_sorted_t const redis_reply_types[]
Definition redis.c:30
#define REDIS_ERROR_ASK_STR
Definition base.h:47
fr_redis_rcode_t
Codes are ordered inversely by priority.
Definition base.h:87
@ REDIS_RCODE_RECONNECT
Transitory error, caller should retry the operation with a new connection.
Definition base.h:91
@ REDIS_RCODE_SUCCESS
Operation was successful.
Definition base.h:88
@ REDIS_RCODE_MOVE
Attempt operation on an alternative node with remap.
Definition base.h:94
@ REDIS_RCODE_TRY_AGAIN
Try the operation again.
Definition base.h:90
@ REDIS_RCODE_NO_SCRIPT
Script doesn't exist.
Definition base.h:95
@ REDIS_RCODE_ASK
Attempt operation on an alternative node.
Definition base.h:93
@ REDIS_RCODE_ERROR
Unrecoverable library/server error.
Definition base.h:89
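The MOVE and ASK codes above correspond to Redis cluster redirection errors, which the cluster specification formats as "MOVED <slot> <host>:<port>" and "ASK <slot> <host>:<port>". The following is a minimal sketch of parsing such an error string. The type example_redirect_t and the function example_parse_redirect are hypothetical and are not part of the FreeRADIUS API. The host is split from the port at the last colon, and an empty host is accepted as-is.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical result structure, for illustration only */
typedef struct {
	int		is_ask;		/* 1 for ASK, 0 for MOVED */
	unsigned int	slot;		/* Hash slot, 0-16383 */
	char		host[64];	/* Target node address */
	unsigned int	port;		/* Target node port */
} example_redirect_t;

/*
 * Hypothetical helper: parse a redirection error string.
 * Returns 0 on success, -1 if the string is not a valid redirect.
 */
static int example_parse_redirect(char const *err, example_redirect_t *out)
{
	char const	*p;
	char		target[80];
	char		*colon;
	size_t		host_len;

	if (strncmp(err, "MOVED ", 6) == 0) {
		out->is_ask = 0;
		p = err + 6;
	} else if (strncmp(err, "ASK ", 4) == 0) {
		out->is_ask = 1;
		p = err + 4;
	} else {
		return -1;
	}

	/* Slot number, then a single "host:port" token */
	if (sscanf(p, "%u %79s", &out->slot, target) != 2) return -1;
	if (out->slot > 16383) return -1;

	/* Split on the last colon so addresses containing colons still work */
	colon = strrchr(target, ':');
	if (!colon) return -1;

	host_len = (size_t)(colon - target);
	if (host_len >= sizeof(out->host)) return -1;
	memcpy(out->host, target, host_len);
	out->host[host_len] = '\0';

	if (sscanf(colon + 1, "%u", &out->port) != 1) return -1;
	if ((out->port == 0) || (out->port > 65535)) return -1;

	return 0;
}
```

Per the cluster specification, a MOVED redirect means the slot has a new permanent owner, so the client should update its slot map. An ASK redirect applies to the next query only, which must be preceded by an ASKING command on the target node.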
fr_redis_rcode_t fr_redis_get_version(char *out, size_t out_len, fr_redis_conn_t *conn)
Get the version of Redis running on the remote server.
Definition redis.c:638
Configuration parameters for a redis connection.
Definition base.h:109
Connection handle, holding a redis context.
Definition base.h:100
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a lexicographically sorted array of name to num mappings.
Definition table.h:49
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition talloc.c:492
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition talloc.c:171
static int64_t fr_time_to_sec(fr_time_t when)
Convert an fr_time_t (internal time) to the number of seconds since the Unix epoch (wallclock time).
Definition time.h:731
#define fr_time_delta_to_timespec(_delta)
Convert a delta to a timespec.
Definition time.h:666
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_delta_to_timeval(_delta)
Convert a delta to a timeval.
Definition time.h:656
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
void trigger_args_afrom_server(TALLOC_CTX *ctx, fr_pair_list_t *list, char const *server, uint16_t port)
Create trigger arguments to describe the server the pool connects to.
Definition trigger.c:454
bool trigger_enabled(void)
Return whether triggers are enabled.
Definition trigger.c:135
Master include file to access all functions and structures in the library.
bool fr_pair_list_empty(fr_pair_list_t const *list)
Check whether a valuepair list is empty.
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
int af
AF_INET, AF_INET6, or AF_UNIX.
Definition socket.h:78
Holds information necessary for binding or connecting to a socket.
Definition socket.h:63
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const(_msg)
Definition strerror.h:223
#define fr_box_strvalue_len(_val, _len)
Definition value.h:286