36 #define LOG_PREFIX "sql - cassandra"
38 #include <freeradius-devel/server/base.h>
39 #include <freeradius-devel/util/debug.h>
41 #ifdef HAVE_WDOCUMENTATION
45 #include <cassandra.h>
47 #ifdef HAVE_WDOCUMENTATION
57 CassResult
const *result;
58 CassIterator *iterator;
177 {
L(
"all"), CASS_CONSISTENCY_ALL },
178 {
L(
"any"), CASS_CONSISTENCY_ANY },
179 {
L(
"each_quorum"), CASS_CONSISTENCY_EACH_QUORUM },
180 {
L(
"local_one"), CASS_CONSISTENCY_LOCAL_ONE },
181 {
L(
"local_quorum"), CASS_CONSISTENCY_LOCAL_QUORUM },
182 {
L(
"one"), CASS_CONSISTENCY_ONE },
183 {
L(
"quorum"), CASS_CONSISTENCY_QUORUM },
184 {
L(
"three"), CASS_CONSISTENCY_THREE },
185 {
L(
"two"), CASS_CONSISTENCY_TWO }
190 {
L(
"identity"), CASS_SSL_VERIFY_PEER_IDENTITY },
191 {
L(
"no"), CASS_SSL_VERIFY_NONE },
192 {
L(
"yes"), CASS_SSL_VERIFY_PEER_CERT }
232 #if (CASS_VERSION_MAJOR >= 2 && CASS_VERSION_MINOR >= 10)
291 switch (message->severity) {
292 case CASS_LOG_CRITICAL:
295 ERROR(
"%s[%d] %s: %s",
296 message->file, message->line, message->function, message->message);
298 ERROR(
"%s", message->message);
304 WARN(
"%s[%d] %s: %s",
305 message->file, message->line, message->function, message->message);
307 WARN(
"%s", message->message);
312 case CASS_LOG_DISABLED:
313 case CASS_LOG_LAST_ENTRY:
315 INFO(
"%s[%d] %s: %s",
316 message->file, message->line, message->function, message->message);
318 INFO(
"%s", message->message);
327 message->file, message->line, message->function, message->message);
329 DEBUG2(
"%s", message->message);
346 talloc_free_children(conn->log_ctx);
348 conn->last_error.msg =
fr_asprint(conn->log_ctx, message, len,
'\0');
349 conn->last_error.type =
L_ERR;
363 CC_HINT(
format (printf, 2, 3));
368 talloc_free_children(conn->log_ctx);
371 conn->last_error.msg = talloc_vasprintf(conn->log_ctx,
fmt, ap);
373 conn->last_error.type =
L_ERR;
378 DEBUG2(
"Socket destructor called, closing socket");
380 if (conn->iterator) cass_iterator_free(conn->iterator);
381 if (conn->result) cass_result_free(conn->result);
399 if (!
inst->done_connect_keyspace) {
403 pthread_mutex_lock(&
inst->connect_mutex);
404 if (!
inst->done_connect_keyspace) {
411 DEBUG2(
"Connecting to Cassandra cluster");
412 future = cass_session_connect_keyspace(
inst->session,
inst->cluster,
config->sql_db);
413 ret = cass_future_error_code(future);
414 if (ret != CASS_OK) {
418 cass_future_error_message(future, &
msg, &msg_len);
419 ERROR(
"Unable to connect: [%x] %s", (
int)ret,
msg);
420 cass_future_free(future);
421 pthread_mutex_unlock(&
inst->connect_mutex);
425 cass_future_free(future);
426 inst->done_connect_keyspace =
true;
428 pthread_mutex_unlock(&
inst->connect_mutex);
430 conn->log_ctx = talloc_pool(conn, 1024);
440 CassStatement *statement;
444 statement = cass_statement_new_n(query, talloc_array_length(query) - 1, 0);
445 if (
inst->consistency_str) cass_statement_set_consistency(statement,
inst->consistency);
447 future = cass_session_execute(
inst->session, statement);
448 cass_statement_free(statement);
450 ret = cass_future_error_code(future);
451 if (ret != CASS_OK) {
455 cass_future_error_message(future, &error, &len);
457 cass_future_free(future);
460 case CASS_ERROR_SERVER_SYNTAX_ERROR:
461 case CASS_ERROR_SERVER_INVALID_QUERY:
469 conn->result = cass_future_get_result(future);
470 cass_future_free(future);
479 return conn->result ? cass_result_column_count(conn->result) : 0;
486 return conn->result ? cass_result_row_count(conn->result) : 0;
493 unsigned int fields, i;
499 MEM(
names = talloc_array(handle,
char const *, fields));
501 for (i = 0; i < fields; i++) {
502 const char *col_name;
506 if (cass_result_column_name(conn->result, i, &col_name, &col_name_len) != CASS_OK) {
507 col_name =
"<INVALID>";
521 CassRow
const *cass_row;
525 #define RLM_CASS_ERR_DATA_RETRIVE(_t) \
527 char const *_col_name;\
528 size_t _col_name_len;\
530 if ((_ret = cass_result_column_name(conn->result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
531 _col_name = "<INVALID>";\
533 sql_set_last_error_printf(conn, "Failed to retrieve " _t " data at column %s (%d): %s", \
534 _col_name, i, cass_error_desc(_ret));\
535 TALLOC_FREE(handle->row);\
536 return RLM_SQL_ERROR;\
546 if (!conn->iterator) conn->iterator = cass_iterator_from_result(conn->result);
551 cass_row = cass_iterator_get_row(conn->iterator);
558 MEM(row = handle->
row = talloc_zero_array(handle,
char *, fields + 1));
560 for (i = 0; i < fields; i++) {
561 CassValue
const *
value;
564 value = cass_row_get_column(cass_row, i);
566 if (cass_value_is_null(
value) == cass_true)
continue;
570 case CASS_VALUE_TYPE_ASCII:
571 case CASS_VALUE_TYPE_TEXT:
572 case CASS_VALUE_TYPE_VARCHAR:
579 MEM(row[i] = talloc_array(row,
char, len + 1));
580 memcpy(row[i], str, len);
585 case CASS_VALUE_TYPE_BOOLEAN:
591 MEM(row[i] = talloc_zero_array(row,
char, 2));
592 row[i][0] = (bv == cass_false) ?
'0' :
'1';
596 case CASS_VALUE_TYPE_INT:
606 case CASS_VALUE_TYPE_TIMESTAMP:
607 case CASS_VALUE_TYPE_BIGINT:
617 case CASS_VALUE_TYPE_UUID:
618 case CASS_VALUE_TYPE_TIMEUUID:
623 MEM(row[i] = talloc_array(row,
char, CASS_UUID_STRING_LENGTH));
624 cass_uuid_string(uuid, row[i]);
630 const char *col_name;
633 if (cass_result_column_name(conn->result, i, &col_name,
634 &col_name_len) != CASS_OK) col_name =
"<INVALID>";
637 "Failed to retrieve data at column %s (%d): Unsupported data type",
653 if (handle->
row) TALLOC_FREE(handle->
row);
655 if (conn->iterator) {
656 cass_iterator_free(conn->iterator);
657 conn->iterator = NULL;
661 cass_result_free(conn->result);
673 if (conn->last_error.msg && (outlen >= 1)) {
674 out[0].msg = conn->last_error.msg;
675 out[0].type = conn->last_error.type;
676 conn->last_error.msg = NULL;
692 talloc_free_children(conn->log_ctx);
693 memset(&conn->last_error, 0,
sizeof(conn->last_error));
714 if (
inst->ssl) cass_ssl_free(
inst->ssl);
715 if (
inst->session) cass_session_free(
inst->session);
716 if (
inst->cluster) cass_cluster_free(
inst->cluster);
718 pthread_mutex_destroy(&
inst->connect_mutex);
729 bool do_latency_aware_routing =
false;
730 CassCluster *cluster;
733 #define DO_CASS_OPTION(_opt, _x) \
736 if ((_ret = (_x)) != CASS_OK) {\
737 ERROR("Error setting " _opt ": %s", cass_error_desc(_ret));\
738 return RLM_SQL_ERROR;\
742 if ((ret = pthread_mutex_init(&
inst->connect_mutex, NULL)) < 0) {
755 DEBUG4(
"Configuring CassCluster structure");
756 cluster =
inst->cluster = cass_cluster_new();
770 if (
config->sql_login &&
config->sql_password) cass_cluster_set_credentials(cluster,
config->sql_login,
776 if (
inst->consistency_str) {
780 if (consistency < 0) {
781 ERROR(
"Invalid consistency level \"%s\"",
inst->consistency_str);
784 inst->consistency = (CassConsistency)consistency;
787 if (
inst->protocol_version) {
789 cass_cluster_set_protocol_version(
inst->cluster,
inst->protocol_version));
792 if (
inst->connections_per_host) {
794 cass_cluster_set_core_connections_per_host(
inst->cluster,
795 inst->connections_per_host));
801 #if (CASS_VERSION_MAJOR <= 2 && CASS_VERSION_MINOR < 10)
802 if (
inst->connections_per_host_max) {
804 cass_cluster_set_max_connections_per_host(
inst->cluster,
805 inst->connections_per_host_max));
808 if (
inst->io_flush_requests_max) {
810 cass_cluster_set_max_requests_per_flush(
inst->cluster,
811 inst->io_flush_requests_max));
814 if (
inst->pending_requests_high) {
816 cass_cluster_set_pending_requests_high_water_mark(
inst->cluster,
817 inst->pending_requests_high));
820 if (
inst->pending_requests_low) {
822 cass_cluster_set_pending_requests_high_water_mark(
inst->cluster,
823 inst->pending_requests_low));
826 if (
inst->write_bytes_high) {
828 cass_cluster_set_write_bytes_high_water_mark(
inst->cluster,
829 inst->write_bytes_high));
832 if (
inst->write_bytes_low) {
834 cass_cluster_set_write_bytes_low_water_mark(
inst->cluster,
835 inst->write_bytes_low));
838 if (
inst->spawn_threshold) {
840 cass_cluster_set_max_concurrent_requests_threshold(
inst->cluster,
841 inst->spawn_threshold));
844 if (
inst->spawn_max) {
846 cass_cluster_set_max_concurrent_creation(
inst->cluster,
inst->spawn_max));
849 if (
inst->spawn_retry_delay_is_set) {
854 if (
inst->event_queue_size) {
856 cass_cluster_set_num_threads_io(
inst->cluster,
inst->event_queue_size));
859 if (
inst->io_queue_size) {
861 cass_cluster_set_num_threads_io(
inst->cluster,
inst->io_queue_size));
864 if (
inst->io_threads) {
868 if (
inst->load_balance_round_robin) cass_cluster_set_load_balance_round_robin(
inst->cluster);
870 cass_cluster_set_token_aware_routing(
inst->cluster,
inst->token_aware_routing);
872 if (
inst->lbdc_local_dc) {
874 cass_cluster_set_load_balance_dc_aware(
inst->cluster,
876 inst->lbdc_hosts_per_remote_dc,
877 inst->lbdc_allow_remote_dcs_for_local_cl));
880 if (do_latency_aware_routing) {
882 cass_cluster_set_latency_aware_routing(
inst->cluster,
true);
885 cass_cluster_set_latency_aware_routing_settings(
inst->cluster,
886 (cass_double_t)
inst->lar_exclusion_threshold,
890 inst->lar_min_measured);
893 if (
inst->tcp_keepalive) cass_cluster_set_tcp_keepalive(
inst->cluster,
true,
inst->tcp_keepalive);
894 cass_cluster_set_tcp_nodelay(
inst->cluster,
inst->tcp_nodelay);
899 ssl =
inst->ssl = cass_ssl_new();
902 if (
inst->tls_verify_cert_str) {
906 if (verify_cert < 0) {
907 ERROR(
"Invalid certificate validation type \"%s\", "
908 "must be one of 'yes', 'no', 'identity'",
inst->tls_verify_cert_str);
911 cass_ssl_set_verify_flags(ssl, verify_cert);
916 if (
inst->tls_ca_file) {
920 if (
inst->tls_certificate_file) {
921 DO_CASS_OPTION(
"certificate_file", cass_ssl_set_cert(ssl,
inst->tls_certificate_file));
924 if (
inst->tls_private_key_file) {
925 DO_CASS_OPTION(
"private_key", cass_ssl_set_private_key(ssl,
inst->tls_private_key_file,
926 inst->tls_private_key_password));
929 cass_cluster_set_ssl(cluster, ssl);
932 inst->session = cass_session_new();
943 #if (CASS_VERSION_MAJOR <= 2 && CASS_VERSION_MINOR <= 0)
950 INFO(
"Built against libcassandra version %d.%d.%d%s",
951 CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
956 cass_log_set_level(CASS_LOG_INFO);
966 .name =
"sql_cassandra",
static int const char * fmt
#define L(_str)
Helper for initialising arrays of string literals.
#define CONF_PARSER_TERMINATOR
#define FR_CONF_DEPRECATED(_name, _struct, _field)
conf_parser_t entry which raises an error if a matching CONF_PAIR is found
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_IS_SET(_name, _type, _flags, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct,...
#define FR_CONF_OFFSET_FLAGS(_name, _flags, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
static fr_time_delta_t timeout
void *_CONST data
Module instance's parsed configuration.
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
CONF_SECTION *_CONST conf
Module's instance configuration.
dl_module_inst_t const * inst
dl_module_inst_t const *_CONST parent
Parent module's instance (if any).
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
dl_module_inst_t const * inst
Dynamic loader API handle for the module.
Temporary structure to hold arguments for instantiation calls.
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
static const conf_parser_t config[]
static int instantiate(module_inst_ctx_t const *mctx)
Prototypes and functions for the SQL module.
rlm_sql_t const * inst
The rlm_sql instance this connection belongs to.
void * conn
Database specific connection handle.
sql_rcode_t
Action to take at end of an SQL query.
@ RLM_SQL_QUERY_INVALID
Query syntax error.
@ RLM_SQL_ERROR
General connection/server error.
@ RLM_SQL_NO_MORE_ROWS
No more rows available.
rlm_sql_row_t row
Row data from the last query.
static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t const *config, fr_time_delta_t timeout)
static int mod_detach(module_detach_ctx_t const *mctx)
static int mod_load(void)
static sql_rcode_t sql_fetch_row(rlm_sql_row_t *out, rlm_sql_handle_t *handle, rlm_sql_config_t const *config)
uint32_t connections_per_host_max
Maximum number of connections to each server in each IO thread.
#define DO_CASS_OPTION(_opt, _x)
char const * lbdc_local_dc
The primary data center to try first.
uint32_t connections_per_host
Number of connections to each server in each IO thread.
bool spawn_retry_delay_is_set
uint32_t spawn_threshold
Threshold for the maximum number of concurrent requests in-flight on a connection before creating a n...
uint32_t protocol_version
The protocol version.
fr_time_delta_t lar_update_rate
The rate at which the best average latency is recomputed.
CassConsistency consistency
Level of consistency converted to a constant.
DIAG_OFF(strict-prototypes) typedef struct
Cassandra cluster connection.
static conf_parser_t load_balance_dc_aware_config[]
static void sql_set_last_error(rlm_sql_cassandra_conn_t *conn, char const *message, size_t len)
Replace the last error messages associated with the connection.
uint32_t pending_requests_high
Sets the high water mark for the number of requests queued waiting for a connection in a connection p...
uint32_t pending_requests_low
Sets the low water mark for the number of requests queued waiting for a connection in a connection po...
static sql_rcode_t sql_finish_query(rlm_sql_handle_t *handle, rlm_sql_config_t const *config)
CassSsl * ssl
Connection's SSL context.
static conf_parser_t latency_aware_routing_config[]
CassSession * session
Cluster's connection pool.
static int sql_affected_rows(UNUSED rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config)
uint32_t io_flush_requests_max
Maximum number of requests processed by an IO worker per flush.
pthread_mutex_t connect_mutex
Mutex to prevent multiple connections attempting to connect a keyspace concurrently.
static void sql_set_last_error_printf(rlm_sql_cassandra_conn_t *conn, char const *fmt,...))
Replace the last error messages associated with the connection.
uint32_t lbdc_hosts_per_remote_dc
The number of hosts used in each remote DC if no hosts are available in the local DC.
static conf_parser_t tls_config[]
fr_time_delta_t spawn_retry_delay
Amount of time to wait before attempting to reconnect.
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local.
static sql_rcode_t sql_free_result(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config)
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
CassCluster * cluster
Configuration of the cassandra cluster connection.
bool tcp_nodelay
Disable TCP Nagle algorithm.
bool done_connect_keyspace
Whether we've connected to a keyspace.
uint32_t write_bytes_high
High water mark for the number of bytes outstanding on a connection.
static void mod_unload(void)
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static fr_table_num_sorted_t const consistency_levels[]
char const * tls_certificate_file
Public certificate we present to the server.
uint32_t event_queue_size
Sets the size of the fixed size queue that stores events.
double lar_exclusion_threshold
How much worse the latency may be, compared to the average latency of the best performing node before ...
char const * tls_private_key_password
String to decrypt private key.
static int sql_num_rows(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config)
static sql_rcode_t sql_fields(char const **out[], rlm_sql_handle_t *handle, rlm_sql_config_t const *config)
static const conf_parser_t driver_config[]
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
static size_t consistency_levels_len
bool load_balance_round_robin
Enable round robin load balancing.
char const * consistency_str
Level of consistency required.
uint32_t io_queue_size
Size of the fixed size queue that stores pending requests.
fr_time_delta_t lar_scale
Weight given to older latencies when calculating the average latency of a node.
static sql_rcode_t sql_query(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config, char const *query)
static size_t verify_cert_table_len
static fr_table_num_sorted_t const verify_cert_table[]
uint32_t write_bytes_low
Low water mark for number of bytes outstanding on a connection.
bool token_aware_routing
Whether to use token aware routing.
uint32_t tcp_keepalive
How often to send TCP keepalives.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config)
static int sql_num_fields(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t const *config)
char const * tls_private_key_file
Private key for the certificate we present to the server.
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
uint32_t io_threads
Number of IO threads.
static int mod_instantiate(module_inst_ctx_t const *mctx)
uint32_t spawn_max
The maximum number of connections that will be created concurrently.
fr_time_delta_t lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the curr...
rlm_sql_driver_t rlm_sql_cassandra
static int _sql_socket_destructor(rlm_sql_cassandra_conn_t *conn)
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
Cassandra driver instance.
dl_module_inst_t * dl_inst
Structure containing the module's instance data, configuration, and dl handle.
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
eap_aka_sim_process_conf_t * inst
fr_aka_sim_id_type_t type
module_t common
Common fields for all loadable modules.
module_instance_t * driver_submodule
Driver's submodule.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
An element in a lexicographically sorted array of name to num mappings.
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
static const char * names[8]
#define fr_time_delta_ispos(_a)
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
int format(printf, 5, 0))
static size_t char ** out