36 #include <freeradius-devel/radiusd.h>
37 #include <freeradius-devel/rad_assert.h>
39 #include <cassandra.h>
67 pthread_mutex_t connect_mutex;
170 {
"any", CASS_CONSISTENCY_ANY },
171 {
"one", CASS_CONSISTENCY_ONE },
172 {
"two", CASS_CONSISTENCY_TWO },
173 {
"three", CASS_CONSISTENCY_THREE },
174 {
"quorum", CASS_CONSISTENCY_QUORUM },
175 {
"all", CASS_CONSISTENCY_ALL },
176 {
"each_quorum", CASS_CONSISTENCY_EACH_QUORUM },
177 {
"local_quorum", CASS_CONSISTENCY_LOCAL_QUORUM },
178 {
"local_one", CASS_CONSISTENCY_LOCAL_ONE },
183 {
"no", CASS_SSL_VERIFY_NONE },
184 {
"yes", CASS_SSL_VERIFY_PEER_CERT },
185 {
"identity", CASS_SSL_VERIFY_PEER_IDENTITY },
262 switch (message->severity) {
263 case CASS_LOG_CRITICAL:
266 ERROR(
"rlm_sql_cassandra: %s[%d] %s: %s",
267 message->file, message->line, message->function, message->message);
269 ERROR(
"rlm_sql_cassandra: %s", message->message);
275 WARN(
"rlm_sql_cassandra: %s[%d] %s: %s",
276 message->file, message->line, message->function, message->message);
278 WARN(
"rlm_sql_cassandra: %s", message->message);
283 case CASS_LOG_DISABLED:
284 case CASS_LOG_LAST_ENTRY:
286 INFO(
"rlm_sql_cassandra: %s[%d] %s: %s",
287 message->file, message->line, message->function, message->message);
289 INFO(
"rlm_sql_cassandra: %s", message->message);
297 DEBUG3(
"rlm_sql_cassandra: %s[%d] %s: %s",
298 message->file, message->line, message->function, message->message);
300 DEBUG2(
"rlm_sql_cassandra: %s", message->message);
317 talloc_free_children(conn->
log_ctx);
334 CC_HINT(format (printf, 2, 3));
339 talloc_free_children(conn->log_ctx);
342 conn->last_error.msg = talloc_vasprintf(conn->log_ctx, fmt, ap);
344 conn->last_error.type =
L_ERR;
349 if (config->
ssl) cass_ssl_free(config->
ssl);
353 #ifdef HAVE_PTHREAD_H
363 static bool version_done =
false;
365 bool do_latency_aware_routing =
false;
367 CassCluster *cluster;
371 #define DO_CASS_OPTION(_opt, _x) \
374 if ((_ret = (_x)) != CASS_OK) {\
375 ERROR("rlm_sql_cassandra: Error setting " _opt ": %s", cass_error_desc(_ret));\
376 return RLM_SQL_ERROR;\
383 INFO(
"rlm_sql_cassandra: Built against libcassandra version %d.%d.%d%s",
384 CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
389 cass_log_set_level(CASS_LOG_INFO);
394 #ifdef HAVE_PTHREAD_H
412 DEBUG4(
"rlm_sql_cassandra: Configuring driver's CassCluster structure");
413 cluster = driver->
cluster = cass_cluster_new();
434 if (consistency < 0) {
438 driver->
consistency = (CassConsistency)consistency;
448 cass_cluster_set_core_connections_per_host(driver->
cluster,
454 cass_cluster_set_max_connections_per_host(driver->
cluster,
469 cass_cluster_set_max_requests_per_flush(driver->
cluster,
475 cass_cluster_set_pending_requests_high_water_mark(driver->
cluster,
481 cass_cluster_set_pending_requests_high_water_mark(driver->
cluster,
487 cass_cluster_set_write_bytes_high_water_mark(driver->
cluster,
493 cass_cluster_set_write_bytes_low_water_mark(driver->
cluster,
504 cass_cluster_set_max_concurrent_requests_threshold(driver->
cluster,
510 cass_cluster_set_max_concurrent_creation(driver->
cluster, driver->
spawn_max));
518 if (delay) cass_cluster_set_reconnect_wait_time(driver->
cluster, delay);
527 cass_cluster_set_load_balance_dc_aware(driver->
cluster,
533 if (do_latency_aware_routing) {
534 cass_double_t exclusion_threshold;
535 uint64_t scale_ms, retry_period_ms, update_rate_ms;
540 scale_ms = (driver->
lar_scale.tv_sec * (uint64_t)1000) + (driver->
lar_scale.tv_usec / 1000);
547 cass_cluster_set_latency_aware_routing(driver->
cluster,
true);
550 cass_cluster_set_latency_aware_routing_settings(driver->
cluster,
564 ssl = driver->
ssl = cass_ssl_new();
571 if (verify_cert < 0) {
572 ERROR(
"rlm_sql_cassandra: Invalid certificate validation type \"%s\", "
576 cass_ssl_set_verify_flags(ssl, verify_cert);
579 DEBUG2(
"rlm_sql_cassandra: Enabling TLS");
594 cass_cluster_set_ssl(cluster, ssl);
597 driver->
session = cass_session_new();
607 DEBUG2(
"rlm_sql_cassandra: Socket destructor called, closing socket");
632 #ifdef HAVE_PTHREAD_H
642 DEBUG2(
"rlm_sql_cassandra: Connecting to Cassandra cluster");
644 ret = cass_future_error_code(future);
645 if (ret != CASS_OK) {
649 cass_future_error_message(future, &msg, &msg_len);
650 ERROR(
"rlm_sql_cassandra: Unable to connect: [%x] %s", (
int)ret, msg);
651 cass_future_free(future);
655 cass_future_free(future);
658 #ifdef HAVE_PTHREAD_H
662 conn->
log_ctx = talloc_pool(conn, 1024);
671 CassStatement *statement;
675 statement = cass_statement_new_n(query, talloc_array_length(query) - 1, 0);
678 future = cass_session_execute(conf->
session, statement);
679 cass_statement_free(statement);
681 ret = cass_future_error_code(future);
682 if (ret != CASS_OK) {
686 cass_future_error_message(future, &error, &len);
688 cass_future_free(future);
691 case CASS_ERROR_SERVER_SYNTAX_ERROR:
692 case CASS_ERROR_SERVER_INVALID_QUERY:
700 conn->
result = cass_future_get_result(future);
701 cass_future_free(future);
710 return conn->
result ? cass_result_column_count(conn->
result) : 0;
717 return conn->
result ? cass_result_row_count(conn->
result) : 0;
724 unsigned int fields, i;
730 MEM(names = talloc_array(handle,
char const *, fields));
732 for (i = 0; i < fields; i++) {
733 const char *col_name;
737 cass_result_column_name(conn->
result, i, &col_name, &col_name_len);
750 CassRow
const *cass_row;
754 #define RLM_CASS_ERR_DATA_RETRIVE(_t) \
756 char const *_col_name;\
757 size_t _col_name_len;\
759 if ((_ret = cass_result_column_name(conn->result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
760 _col_name = "<INVALID>";\
762 sql_set_last_error_printf(conn, "Failed to retrieve " _t " data at column %s (%d): %s", \
763 _col_name, i, cass_error_desc(_ret));\
764 TALLOC_FREE(handle->row);\
765 return RLM_SQL_ERROR;\
780 cass_row = cass_iterator_get_row(conn->
iterator);
786 talloc_free(handle->
row);
787 MEM(row = handle->
row = talloc_zero_array(handle,
char *, fields + 1));
789 for (i = 0; i < fields; i++) {
790 CassValue
const *value;
793 value = cass_row_get_column(cass_row, i);
795 if (cass_value_is_null(value) == cass_true)
continue;
797 type = cass_value_type(value);
799 case CASS_VALUE_TYPE_ASCII:
800 case CASS_VALUE_TYPE_TEXT:
801 case CASS_VALUE_TYPE_VARCHAR:
808 MEM(row[i] = talloc_array(row,
char, len + 1));
809 memcpy(row[i], str, len);
814 case CASS_VALUE_TYPE_BOOLEAN:
820 MEM(row[i] = talloc_zero_array(row,
char, 2));
821 row[i][0] = (bv == cass_false) ?
'0' :
'1';
825 case CASS_VALUE_TYPE_INT:
831 MEM(row[i] = talloc_asprintf(row,
"%"PRId32, (int32_t)i32v));
835 case CASS_VALUE_TYPE_TIMESTAMP:
836 case CASS_VALUE_TYPE_BIGINT:
842 MEM(row[i] = talloc_asprintf(row,
"%"PRId64, (int64_t)i64v));
846 case CASS_VALUE_TYPE_UUID:
847 case CASS_VALUE_TYPE_TIMEUUID:
852 MEM(row[i] = talloc_array(row,
char, CASS_UUID_STRING_LENGTH));
853 cass_uuid_string(uuid, row[i]);
859 const char *col_name;
862 if (cass_result_column_name(conn->
result, i, &col_name,
863 &col_name_len) != CASS_OK) col_name =
"<INVALID>";
866 "Failed to retrieve data at column %s (%d): Unsupported data type",
868 talloc_free(handle->
row);
882 if (handle->
row) TALLOC_FREE(handle->
row);
890 cass_result_free(conn->
result);
921 talloc_free_children(conn->
log_ctx);
942 .
name =
"rlm_sql_cassandra",
char const * lbdc_local_dc
The primary data center to try first.
#define PW_TYPE_FILE_INPUT
File matching value must exist, and must be readable.
rlm_sql_module_t rlm_sql_cassandra
uint32_t io_flush_requests_max
Maximum number of requests processed by an IO worker per flush.
General connection/server error.
#define pthread_mutex_init(_x, _y)
uint32_t spawn_threshold
Threshold for the maximum number of concurrent requests in-flight on a connection before creating a new connection.
Time value (struct timeval), only for config items.
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
Prototypes and functions for the SQL module.
uint32_t write_bytes_low
Low water mark for number of bytes outstanding on a connection.
log_type_t type
Type of log entry: L_ERR, L_WARN, L_INFO, L_DBG, etc.
CassResult const * result
Result from executing a query.
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
char const * tls_private_key_password
String to decrypt private key.
char const * msg
Log message.
struct timeval lar_scale
Weight given to older latencies when calculating the average latency of a node.
static int _sql_socket_destructor(rlm_sql_cassandra_conn_t *conn)
struct rlm_sql_cassandra_config rlm_sql_cassandra_config_t
Cassandra driver instance.
static CONF_PARSER load_balance_dc_aware_config[]
#define CONF_PARSER_TERMINATOR
char const * sql_server
Server to connect to.
#define DO_CASS_OPTION(_opt, _x)
static int sql_affected_rows(UNUSED rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
static int mod_instantiate(CONF_SECTION *conf, rlm_sql_config_t *config)
#define PW_TYPE_SECRET
Only print value if debug level >= 3.
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
char const * tls_private_key_file
Private key for the certificate we present to the server.
bool token_aware_routing
Whether to use token aware routing.
Cassandra driver instance.
#define PW_TYPE_SUBSECTION
static sql_rcode_t sql_free_result(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
bool done_connect_keyspace
Whether we've connected to a keyspace.
Defines a CONF_PAIR to C data type mapping.
static sql_rcode_t sql_fields(char const **out[], rlm_sql_handle_t *handle, rlm_sql_config_t *config)
uint32_t connections_per_host_max
Maximum number of connections to each server in each IO thread.
static sql_rcode_t sql_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config, char const *query)
Cassandra cluster connection.
uint32_t pending_requests_low
Sets the low water mark for the number of requests queued waiting for a connection in a connection pool.
char const * tls_certificate_file
Public certificate we present to the server.
struct rlm_sql_cassandra_conn rlm_sql_cassandra_conn_t
Cassandra cluster connection.
int fr_str2int(FR_NAME_NUMBER const *table, char const *name, int def)
char const * fr_syserror(int num)
Guaranteed thread-safe version of strerror.
#define pthread_mutex_unlock(_x)
static const FR_NAME_NUMBER verify_cert_table[]
static void sql_set_last_error_printf(rlm_sql_cassandra_conn_t *conn, char const *fmt,...) CC_HINT(format(printf
Replace the last error messages associated with the connection.
uint32_t spawn_max
The maximum number of connections that will be created concurrently.
static void sql_set_last_error(rlm_sql_cassandra_conn_t *conn, char const *message, size_t len)
Replace the last error messages associated with the connection.
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
uint32_t sql_port
Port to connect to.
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local DC hosts are available.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
int cf_section_parse(CONF_SECTION *, void *base, CONF_PARSER const *variables)
Parse a configuration section into user-supplied variables.
struct timeval spawn_retry_delay
Amount of time to wait before attempting to reconnect.
uint32_t connections_per_host
Number of connections to each server in each IO thread.
CassSsl * ssl
Connection's SSL context.
char const * consistency_str
Level of consistency required.
CassConsistency consistency
Level of consistency converted to a constant.
uint32_t protocol_version
The protocol version.
uint32_t write_bytes_high
High water mark for the number of bytes outstanding on a connection.
uint32_t lbdc_hosts_per_remote_dc
The number of hosts used in each remote DC if no hosts are available in the local DC.
static CONF_PARSER latency_aware_routing_config[]
static int rlm_sql_cass_instances
char const * sql_password
Login password to use.
void * driver
Where drivers should write a pointer to their configurations.
static int sql_num_rows(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
void * conn
Database specific connection handle.
CONF_SECTION * cf_section_sub_find(CONF_SECTION const *, char const *name)
Find a sub-section in a section.
CassCluster * cluster
Configuration of the cassandra cluster connection.
rlm_sql_row_t row
Row data from the last query.
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static sql_rcode_t sql_finish_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config)
static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t *config, struct timeval const *timeout)
bool tcp_nodelay
Disable the TCP Nagle algorithm.
static const FR_NAME_NUMBER consistency_levels[]
static int sql_num_fields(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
struct timeval lar_update_rate
The rate at which the best average latency is recomputed.
#define FR_CONF_OFFSET(_n, _t, _s, _f)
char const * sql_login
Login credentials to use.
TALLOC_CTX * log_ctx
Prevent unneeded memory allocation by keeping a permanent pool, to store log entries.
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
static int _mod_destructor(rlm_sql_cassandra_config_t *config)
CassIterator * iterator
Row set iterator.
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
static sql_rcode_t sql_fetch_row(rlm_sql_row_t *out, rlm_sql_handle_t *handle, rlm_sql_config_t *config)
static const CONF_PARSER driver_config[]
uint32_t query_timeout
How long to allow queries to run for.
uint32_t tcp_keepalive
How often to send TCP keepalives.
bool load_balance_round_robin
Enable round robin load balancing.
char const * sql_db
Database to run queries against.
sql_log_entry_t last_error
uint32_t io_queue_size
Size of the fixed-size queue that stores pending requests.
uint32_t pending_requests_high
Sets the high water mark for the number of requests queued waiting for a connection in a connection pool.
#define pthread_mutex_lock(_x)
struct timeval lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the current average latency exceeds the calculated threshold.
#define FR_TIMEVAL_TO_MS(_x)
String of printable characters.
#define FR_CONF_POINTER(_n, _t, _p)
static CONF_PARSER tls_config[]
uint32_t event_queue_size
Sets the size of the fixed-size queue that stores events.
#define pthread_mutex_destroy(_x)
uint32_t io_threads
Number of IO threads.
CassSession * session
Cluster's connection pool.
struct timeval lar_exclusion_threshold
How much worse the latency must be, compared to the average latency of the best performing node, before a node is penalized.