36 #define LOG_PREFIX "sql - cassandra"
38 #include <freeradius-devel/server/base.h>
39 #include <freeradius-devel/util/debug.h>
41 #ifdef HAVE_WDOCUMENTATION
45 #include <cassandra.h>
47 #ifdef HAVE_WDOCUMENTATION
55 bool done_connect_keyspace;
168 {
L(
"all"), CASS_CONSISTENCY_ALL },
169 {
L(
"any"), CASS_CONSISTENCY_ANY },
170 {
L(
"each_quorum"), CASS_CONSISTENCY_EACH_QUORUM },
171 {
L(
"local_one"), CASS_CONSISTENCY_LOCAL_ONE },
172 {
L(
"local_quorum"), CASS_CONSISTENCY_LOCAL_QUORUM },
173 {
L(
"one"), CASS_CONSISTENCY_ONE },
174 {
L(
"quorum"), CASS_CONSISTENCY_QUORUM },
175 {
L(
"three"), CASS_CONSISTENCY_THREE },
176 {
L(
"two"), CASS_CONSISTENCY_TWO }
181 {
L(
"identity"), CASS_SSL_VERIFY_PEER_IDENTITY },
182 {
L(
"no"), CASS_SSL_VERIFY_NONE },
183 {
L(
"yes"), CASS_SSL_VERIFY_PEER_CERT }
251 switch (message->severity) {
252 case CASS_LOG_CRITICAL:
255 ERROR(
"%s[%d] %s: %s",
256 message->file, message->line, message->function, message->message);
258 ERROR(
"%s", message->message);
264 WARN(
"%s[%d] %s: %s",
265 message->file, message->line, message->function, message->message);
267 WARN(
"%s", message->message);
272 case CASS_LOG_DISABLED:
273 case CASS_LOG_LAST_ENTRY:
275 INFO(
"%s[%d] %s: %s",
276 message->file, message->line, message->function, message->message);
278 INFO(
"%s", message->message);
287 message->file, message->line, message->function, message->message);
289 DEBUG2(
"%s", message->message);
318 CC_HINT(
format (printf, 3, 4));
325 cass_query_ctx->
error.
msg = talloc_vasprintf(ctx,
fmt, ap);
334 DEBUG2(
"Socket destructor called, closing socket");
353 .log_ctx = talloc_pool(c, 2048),
354 .poll_interval = 1000
364 if (!
inst->mutable->done_connect_keyspace) {
368 pthread_mutex_lock(&
inst->mutable->connect_mutex);
369 if (!
inst->mutable->done_connect_keyspace) {
376 DEBUG2(
"Connecting to Cassandra cluster");
377 future = cass_session_connect_keyspace(
inst->session,
inst->cluster,
config->sql_db);
378 ret = cass_future_error_code(future);
379 if (ret != CASS_OK) {
383 cass_future_error_message(future, &
msg, &msg_len);
384 ERROR(
"Unable to connect: [%x] %s", (
int)ret,
msg);
385 cass_future_free(future);
386 pthread_mutex_unlock(&
inst->mutable->connect_mutex);
390 cass_future_free(future);
391 inst->mutable->done_connect_keyspace =
true;
393 pthread_mutex_unlock(&
inst->mutable->connect_mutex);
412 CassStatement *statement;
422 switch (query_ctx->
status) {
425 statement = cass_statement_new_n(query_ctx->
query_str, talloc_array_length(query_ctx->
query_str) - 1, 0);
426 if (
inst->consistency_str) cass_statement_set_consistency(statement,
inst->consistency);
433 query_ctx->
uctx = cass_query_ctx;
439 cass_query->
future = cass_session_execute(
inst->session, statement);
447 query_ctx->
tconn = tconn;
465 bool handled =
false;
472 if (!cass_future_ready(next_query->
future))
goto next;
474 cass_query = next_query;
482 ret = cass_future_error_code(cass_query->
future);
483 if (ret != CASS_OK) {
487 cass_future_error_message(cass_query->
future, &error, &len);
491 case CASS_ERROR_SERVER_SYNTAX_ERROR:
492 case CASS_ERROR_SERVER_INVALID_QUERY:
500 cass_future_free(cass_query->
future);
506 cass_query_ctx->
result = cass_future_get_result(cass_query->
future);
507 cass_future_free(cass_query->
future);
540 ERROR(
"Unable to insert polling event");
576 ERROR(
"Unable to insert polling event");
586 ERROR(
"Unable to insert polling event");
603 if (!query_ctx->
treq)
return;
612 if (cass_query->
query_ctx == query_ctx) {
614 cass_future_free(cass_query->
future);
624 return cass_query_ctx->
result ? cass_result_row_count(cass_query_ctx->
result) : 0;
630 CassResult
const *result = cass_query_ctx->
result;
632 unsigned int fields, i;
635 fields = result ? cass_result_column_count(result) : 0;
638 MEM(
names = talloc_array(query_ctx,
char const *, fields));
640 for (i = 0; i < fields; i++) {
641 const char *col_name;
645 if (cass_result_column_name(result, i, &col_name, &col_name_len) != CASS_OK) {
646 col_name =
"<INVALID>";
660 CassRow
const *cass_row;
662 CassResult
const *result = cass_query_ctx->
result;
666 #define RLM_CASS_ERR_DATA_RETRIVE(_t) \
668 char const *_col_name;\
669 size_t _col_name_len;\
671 if ((_ret = cass_result_column_name(result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
672 _col_name = "<INVALID>";\
674 sql_set_query_error_printf(conn->log_ctx, cass_query_ctx, "Failed to retrieve " _t " data at column %s (%d): %s", \
675 _col_name, i, cass_error_desc(_ret));\
676 TALLOC_FREE(query_ctx->row);\
677 query_ctx->rcode = RLM_SQL_ERROR;\
693 TALLOC_FREE(query_ctx->
row);
695 if (!cass_iterator_next(conn->
iterator)) {
700 cass_row = cass_iterator_get_row(conn->
iterator);
701 fields = cass_result_column_count(result);
703 MEM(row = query_ctx->
row = talloc_zero_array(query_ctx,
char *, fields + 1));
705 for (i = 0; i < fields; i++) {
706 CassValue
const *
value;
709 value = cass_row_get_column(cass_row, i);
711 if (cass_value_is_null(
value) == cass_true)
continue;
715 case CASS_VALUE_TYPE_ASCII:
716 case CASS_VALUE_TYPE_TEXT:
717 case CASS_VALUE_TYPE_VARCHAR:
724 MEM(row[i] = talloc_array(row,
char, len + 1));
725 memcpy(row[i], str, len);
730 case CASS_VALUE_TYPE_BOOLEAN:
736 MEM(row[i] = talloc_zero_array(row,
char, 2));
737 row[i][0] = (bv == cass_false) ?
'0' :
'1';
741 case CASS_VALUE_TYPE_INT:
751 case CASS_VALUE_TYPE_TIMESTAMP:
752 case CASS_VALUE_TYPE_BIGINT:
762 case CASS_VALUE_TYPE_UUID:
763 case CASS_VALUE_TYPE_TIMEUUID:
768 MEM(row[i] = talloc_array(row,
char, CASS_UUID_STRING_LENGTH));
769 cass_uuid_string(uuid, row[i]);
775 const char *col_name;
778 if (cass_result_column_name(result, i, &col_name,
779 &col_name_len) != CASS_OK) col_name =
"<INVALID>";
782 "Failed to retrieve data at column %s (%d): Unsupported data type",
798 if (query_ctx->
row) TALLOC_FREE(query_ctx->
row);
805 if (query_ctx->
uctx) {
806 CassResult
const *result = query_ctx->
uctx;
807 cass_result_free(result);
808 query_ctx->
uctx = NULL;
819 if (cass_query_ctx->
error.
msg && (outlen >= 1)) {
854 if (
inst->ssl) cass_ssl_free(
inst->ssl);
855 if (
inst->session) cass_session_free(
inst->session);
856 if (
inst->cluster) cass_cluster_free(
inst->cluster);
858 pthread_mutex_destroy(&
inst->mutable->connect_mutex);
870 bool do_latency_aware_routing =
false;
871 CassCluster *cluster;
874 #define DO_CASS_OPTION(_opt, _x) \
877 if ((_ret = (_x)) != CASS_OK) {\
878 ERROR("Error setting " _opt ": %s", cass_error_desc(_ret));\
884 if ((ret = pthread_mutex_init(&
inst->mutable->connect_mutex, NULL)) < 0) {
897 DEBUG4(
"Configuring CassCluster structure");
898 cluster =
inst->cluster = cass_cluster_new();
899 if (!cluster)
return -1;
912 if (
config->sql_login &&
config->sql_password) cass_cluster_set_credentials(cluster,
config->sql_login,
918 if (
inst->consistency_str) {
922 if (consistency < 0) {
923 ERROR(
"Invalid consistency level \"%s\"",
inst->consistency_str);
926 inst->consistency = (CassConsistency)consistency;
929 if (
inst->protocol_version) {
931 cass_cluster_set_protocol_version(
inst->cluster,
inst->protocol_version));
934 if (
inst->connections_per_host) {
936 cass_cluster_set_core_connections_per_host(
inst->cluster,
937 inst->connections_per_host));
940 if (
inst->event_queue_size) {
942 cass_cluster_set_num_threads_io(
inst->cluster,
inst->event_queue_size));
945 if (
inst->io_queue_size) {
947 cass_cluster_set_num_threads_io(
inst->cluster,
inst->io_queue_size));
950 if (
inst->io_threads) {
954 if (
inst->load_balance_round_robin) cass_cluster_set_load_balance_round_robin(
inst->cluster);
956 cass_cluster_set_token_aware_routing(
inst->cluster,
inst->token_aware_routing);
958 if (
inst->lbdc_local_dc) {
960 cass_cluster_set_load_balance_dc_aware(
inst->cluster,
962 inst->lbdc_hosts_per_remote_dc,
963 inst->lbdc_allow_remote_dcs_for_local_cl));
966 if (do_latency_aware_routing) {
968 cass_cluster_set_latency_aware_routing(
inst->cluster,
true);
971 cass_cluster_set_latency_aware_routing_settings(
inst->cluster,
972 (cass_double_t)
inst->lar_exclusion_threshold,
976 inst->lar_min_measured);
979 if (
inst->tcp_keepalive) cass_cluster_set_tcp_keepalive(
inst->cluster,
true,
inst->tcp_keepalive);
980 cass_cluster_set_tcp_nodelay(
inst->cluster,
inst->tcp_nodelay);
985 ssl =
inst->ssl = cass_ssl_new();
988 if (
inst->tls_verify_cert_str) {
992 if (verify_cert < 0) {
993 ERROR(
"Invalid certificate validation type \"%s\", "
994 "must be one of 'yes', 'no', 'identity'",
inst->tls_verify_cert_str);
997 cass_ssl_set_verify_flags(ssl, verify_cert);
1002 if (
inst->tls_ca_file) {
1006 if (
inst->tls_certificate_file) {
1007 DO_CASS_OPTION(
"certificate_file", cass_ssl_set_cert(ssl,
inst->tls_certificate_file));
1010 if (
inst->tls_private_key_file) {
1011 DO_CASS_OPTION(
"private_key", cass_ssl_set_private_key(ssl,
inst->tls_private_key_file,
1012 inst->tls_private_key_password));
1015 cass_cluster_set_ssl(cluster, ssl);
1018 inst->session = cass_session_new();
1019 if (!
inst->session)
return -1;
1026 INFO(
"Built against libcassandra version %d.%d.%d%s",
1027 CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
1032 cass_log_set_level(CASS_LOG_INFO);
1042 .name =
"sql_cassandra",
1051 .sql_query_resume = sql_query_resume,
1052 .sql_select_query_resume = sql_query_resume,
1061 .uses_trunks =
true,
1063 .connection_alloc = sql_trunk_connection_alloc,
1064 .connection_notify = sql_trunk_connection_notify,
1065 .request_mux = sql_trunk_request_mux,
1066 .request_cancel = sql_request_cancel,
unlang_action_t
Returned by unlang_op_t calls, determine the next action of the interpreter.
static int const char * fmt
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_FLAGS(_name, _flags, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
@ CONF_FLAG_OK_MISSING
OK if it's missing.
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
@ CONNECTION_STATE_FAILED
Connection has failed.
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
fr_dcursor_eval_t void const * uctx
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Head of a doubly linked list.
Entry in a doubly linked list.
#define fr_event_timer_in(...)
void unlang_interpret_mark_runnable(request_t *request)
Mark a request as resumable.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Stores all information relating to an event list.
module_instance_t * mi
Module instance to detach.
module_instance_t * mi
Instance of the module being instantiated.
Temporary structure to hold arguments for detach calls.
Temporary structure to hold arguments for instantiation calls.
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
static const conf_parser_t config[]
rlm_rcode_t
Return codes indicating the result of the module call.
static int instantiate(module_inst_ctx_t const *mctx)
Prototypes and functions for the SQL module.
char const * msg
Log message.
fr_log_type_t type
Type of log entry L_ERR, L_WARN, L_INFO, L_DBG etc.
fr_sql_query_status_t status
Status of the query.
trunk_connection_t * tconn
Trunk connection this query is being run on.
#define RLM_SQL_MULTI_QUERY_CONN
Can support multiple queries on a single connection.
void * uctx
Driver specific data.
char const * query_str
Query string to run.
request_t * request
Request this query relates to.
sql_rcode_t
Action to take at end of an SQL query.
@ RLM_SQL_QUERY_INVALID
Query syntax error.
@ RLM_SQL_ERROR
General connection/server error.
@ RLM_SQL_NO_MORE_ROWS
No more rows available.
fr_time_delta_t query_timeout
How long to allow queries to run for.
rlm_sql_row_t row
Row data from the last query.
sql_rcode_t rcode
Result code.
trunk_request_t * treq
Trunk request for this query.
@ SQL_QUERY_SUBMITTED
Submitted for execution.
@ SQL_QUERY_PREPARED
Ready to submit.
static int mod_detach(module_detach_ctx_t const *mctx)
static int mod_load(void)
static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt,...))
Store the last error associated with a query, using a format string.
static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
#define DO_CASS_OPTION(_opt, _x)
fr_event_timer_t const * write_ev
Polling event for sending queries.
char const * lbdc_local_dc
The primary data center to try first.
uint poll_count
How many consecutive polls had no available results.
sql_log_entry_t error
Most recent Cassandra error message for this query.
fr_dlist_head_t queries
Outstanding queries on this connection.
uint32_t connections_per_host
Number of connections to each server in each IO thread.
static void sql_trunk_connection_write_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
uint32_t protocol_version
The protocol version.
fr_event_timer_t const * read_ev
Polling event for reading query results.
fr_time_delta_t lar_update_rate
The rate at which the best average latency is recomputed.
CassConsistency consistency
Level of consistency converted to a constant.
DIAG_OFF(strict-prototypes) typedef struct
static conf_parser_t load_balance_dc_aware_config[]
CassSsl * ssl
Connection's SSL context.
static conf_parser_t latency_aware_routing_config[]
CassSession * session
Cluster's connection pool.
static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
uint32_t lbdc_hosts_per_remote_dc
The number of host used in each remote DC if no hosts are available in the local dc.
static conf_parser_t tls_config[]
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local.
static int sql_num_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
static void sql_set_query_error(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *message, size_t len)
Store the last error associated with a query.
static int sql_affected_rows(UNUSED fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
CassFuture * future
Future produced when submitting query.
CassCluster * cluster
Configuration of the cassandra cluster connection.
bool tcp_nodelay
Disable TCP naggle algorithm.
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, fr_sql_query_t *query_ctx)
static void _sql_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
static fr_table_num_sorted_t const consistency_levels[]
char const * tls_certificate_file
Public certificate we present to the server.
uint32_t event_queue_size
Sets the size of the the fixed size queue that stores events.
double lar_exclusion_threshold
How much worse the latency me be, compared to the average latency of the best performing node before ...
char const * tls_private_key_password
String to decrypt private key.
uint poll_interval
Interval between read polling.
static const conf_parser_t driver_config[]
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
CassResult const * result
Cassandra result handle.
static size_t consistency_levels_len
fr_sql_query_t * query_ctx
SQL query ctx.
static sql_rcode_t sql_finish_query(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config)
bool load_balance_round_robin
Enable round robin load balancing.
char const * consistency_str
Level of consistency required.
uint32_t io_queue_size
Size of the the fixed size queue that stores pending requests.
fr_dlist_t entry
Entry in list of outstanding queries.
fr_time_delta_t lar_scale
Weight given to older latencies when calculating the average latency of a node.
static size_t verify_cert_table_len
static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
static fr_table_num_sorted_t const verify_cert_table[]
connection_t * conn
Generic connection structure for managing this handle.
rlm_sql_config_t const * config
SQL instance config.
bool token_aware_routing
Whether to use token aware routing.
uint32_t tcp_keepalive
How often to send TCP keepalives.
rlm_sql_cassandra_mutable_t
CassIterator * iterator
Row set iterator.
char const * tls_private_key_file
Private key for the certificate we present to the server.
static void sql_trunk_connection_read_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
TALLOC_CTX * log_ctx
Prevent unneeded memory allocation by keeping a permanent pool, to store log entries.
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
uint32_t io_threads
Number of IO threads.
static int mod_instantiate(module_inst_ctx_t const *mctx)
rlm_sql_cassandra_t const * inst
Module instance for this connection.
fr_time_delta_t lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the curr...
rlm_sql_driver_t rlm_sql_cassandra
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
Driver specific data to attach to the query ctx.
Structure for tracking outstanding queries.
Cassandra cluster connection.
Cassandra driver instance.
static void sql_request_fail(request_t *request, void *preq, UNUSED void *rctx, UNUSED trunk_request_state_t state, UNUSED void *uctx)
Macros to reduce boilerplate in trunk SQL drivers.
#define SQL_TRUNK_CONNECTION_ALLOC
Allocate an SQL trunk connection.
CONF_SECTION * conf
Module's instance configuration.
void * data
Module's instance data.
module_instance_t const * parent
Parent module's instance (if any).
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
eap_aka_sim_process_conf_t * inst
fr_aka_sim_id_type_t type
module_t common
Common fields for all loadable modules.
module_instance_t * driver_submodule
Driver's submodule.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
An element in a lexicographically sorted array of name to num mappings.
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
#define talloc_get_type_abort_const
static int talloc_const_free(void const *ptr)
Free const'd memory.
static const char * names[8]
#define fr_time_delta_ispos(_a)
static int64_t fr_time_delta_to_usec(fr_time_delta_t delta)
static fr_time_delta_t fr_time_delta_from_usec(int64_t usec)
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Associates request queues with a connection.
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
trunk_cancel_reason_t
Reasons for a request being cancelled.
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
static fr_event_list_t * el
int format(printf, 5, 0))
static size_t char ** out