All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
rlm_sql_cassandra.c
Go to the documentation of this file.
1 /**
2  * This is free and unencumbered software released into the public domain.
3  *
4  * Anyone is free to copy, modify, publish, use, compile, sell, or
5  * distribute this software, either in source code form or as a compiled
6  * binary, for any purpose, commercial or non-commercial, and by any
7  * means.
8  *
9  * In jurisdictions that recognize copyright laws, the author or authors
10  * of this software dedicate any and all copyright interest in the
11  * software to the public domain. We make this dedication for the benefit
12  * of the public at large and to the detriment of our heirs and
13  * successors. We intend this dedication to be an overt act of
14  * relinquishment in perpetuity of all present and future rights to this
15  * software under copyright law.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20  * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21  * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23  * OTHER DEALINGS IN THE SOFTWARE.
24  *
25  * For more information, please refer to <http://unlicense.org>
26  */
27 
28 /**
29  * $Id: 907f73a71f076e5484a1be1adcd3e9ad5dd6e5cf $
30  * @file rlm_sql_cassandra.c
31  * @brief Cassandra SQL driver
32  *
33  * @author Linnaea Von Lavia <le.concorde.4590@gmail.com>
34  * @author Arran Cudbard-Bell <a.cudbardb@freeradius.org>
35  */
36 #include <freeradius-devel/radiusd.h>
37 #include <freeradius-devel/rad_assert.h>
38 
39 #include <cassandra.h>
40 
41 #include "rlm_sql.h"
42 
43 static int rlm_sql_cass_instances = 0;
44 
45 /** Cassandra cluster connection
46  *
47  */
48 typedef struct rlm_sql_cassandra_conn {
49  CassResult const *result; //!< Result from executing a query.
50  CassIterator *iterator; //!< Row set iterator.
51 
52  TALLOC_CTX *log_ctx; //!< Prevent unneeded memory allocation by keeping a
53  //!< permanent pool, to store log entries.
56 
57 /** Cassandra driver instance
58  *
59  */
60 typedef struct rlm_sql_cassandra_config {
61  CassCluster *cluster; //!< Configuration of the cassandra cluster connection.
62  CassSession *session; //!< Cluster's connection pool.
63  CassSsl *ssl; //!< Connection's SSL context.
64  bool done_connect_keyspace; //!< Whether we've connected to a keyspace.
65 
66 #ifdef HAVE_PTHREAD_H
67  pthread_mutex_t connect_mutex; //!< Mutex to prevent multiple connections attempting
68  //!< to connect a keyspace concurrently.
69 #endif
70 
71  /*
72  * Configuration options
73  */
74  char const *consistency_str; //!< Level of consistency required.
75  CassConsistency consistency; //!< Level of consistency converted to a constant.
76 
77  uint32_t protocol_version; //!< The protocol version.
78 
79  uint32_t connections_per_host; //!< Number of connections to each server in each
80  //!< IO thread.
81  uint32_t connections_per_host_max; //!< Maximum number of connections to each server
82  //!< in each IO threads.
83  uint32_t io_threads; //!< Number of IO threads.
84 
85  uint32_t io_queue_size; //!< Size of the the fixed size queue that stores
86  //!< pending requests.
87 
88  uint32_t io_flush_requests_max; //!< Maximum number of requests processed by an
89  //!< IO worker per flush.
90 
91  uint32_t pending_requests_high; //!< Sets the high water mark for the number of requests
92  //!< queued waiting for a connection in a connection
93  //!< pool. Disables writes to a host on an IO worker
94  //!< if the number of requests queued exceed this value.
95 
96  uint32_t pending_requests_low; //!< Sets the low water mark for the number of requests
97  //!< queued waiting for a connection in a connection
98  //!< pool. After exceeding high water mark requests,
99  //!< writes to a host will only resume once the number
100  //!< of requests fall below this value.
101 
102  uint32_t write_bytes_high; //!< High water mark for the number of bytes
103  //!< outstanding on a connection. Disables writes to
104  //!< a connection if the number of bytes queued exceed
105  //!< this value.
106 
107  uint32_t write_bytes_low; //!< Low water mark for number of bytes outstanding on
108  //!< a connection. After exceeding high water mark
109  //!< bytes, writes will only resume once the number of
110  //!< bytes fall below this value.
111 
112  uint32_t event_queue_size; //!< Sets the size of the the fixed size queue
113  //!< that stores events.
114 
115  uint32_t spawn_threshold; //!< Threshold for the maximum number of concurrent
116  //!< requests in-flight on a connection before creating
117  //!< a new connection.
118  uint32_t spawn_max; //!< The maximum number of connections that
119  //!< will be created concurrently.
120 
121  struct timeval spawn_retry_delay; //!< Amount of time to wait before attempting
122  //!< to reconnect.
123 
124  bool load_balance_round_robin; //!< Enable round robin load balancing.
125 
126  bool token_aware_routing; //!< Whether to use token aware routing.
127 
128  char const *lbdc_local_dc; //!< The primary data center to try first.
129  uint32_t lbdc_hosts_per_remote_dc; //!< The number of host used in each remote DC if
130  //!< no hosts are available in the local dc
131 
132  bool lbdc_allow_remote_dcs_for_local_cl; //!< Allows remote hosts to be used if no local
133  //!< dc hosts are available and the consistency level
134  //!< is LOCAL_ONE or LOCAL_QUORUM.
135 
136  struct timeval lar_exclusion_threshold; //!< How much worse the latency me be, compared to
137  //!< the average latency of the best performing node
138  //!< before it's penalized.
139  //!< This gets mangled to a double.
140 
141  struct timeval lar_scale; //!< Weight given to older latencies when calculating
142  //!< the average latency of a node. A bigger scale will
143  //!< give more weight to older latency measurements.
144 
145  struct timeval lar_retry_period; //!< The amount of time a node is penalized by the
146  //!< policy before being given a second chance when
147  //!< the current average latency exceeds the calculated
148  //!< threshold
149  //!< (exclusion_threshold * best_average_latency).
150 
151  struct timeval lar_update_rate; //!< The rate at which the best average latency is
152  //!< recomputed.
153  uint64_t lar_min_measured; //!< The minimum number of measurements per-host
154  //!< required to be considered by the policy.
155 
156  uint32_t tcp_keepalive; //!< How often to send TCP keepalives.
157  bool tcp_nodelay; //!< Disable TCP naggle algorithm.
158 
159  char const *tls_ca_file; //!< Path to the CA used to validate the server's
160  //!< certificate.
161  char const *tls_certificate_file; //!< Public certificate we present to the server.
162  char const *tls_private_key_file; //!< Private key for the certificate we present to the
163  //!< server.
164  char const *tls_private_key_password; //!< String to decrypt private key.
165  char const *tls_verify_cert_str; //!< Whether we validate the cert provided by the
166  //!< server.
168 
170  { "any", CASS_CONSISTENCY_ANY },
171  { "one", CASS_CONSISTENCY_ONE },
172  { "two", CASS_CONSISTENCY_TWO },
173  { "three", CASS_CONSISTENCY_THREE },
174  { "quorum", CASS_CONSISTENCY_QUORUM },
175  { "all", CASS_CONSISTENCY_ALL },
176  { "each_quorum", CASS_CONSISTENCY_EACH_QUORUM },
177  { "local_quorum", CASS_CONSISTENCY_LOCAL_QUORUM },
178  { "local_one", CASS_CONSISTENCY_LOCAL_ONE },
179  { NULL, 0 }
180 };
181 
183  { "no", CASS_SSL_VERIFY_NONE },
184  { "yes", CASS_SSL_VERIFY_PEER_CERT },
185  { "identity", CASS_SSL_VERIFY_PEER_IDENTITY },
186  { NULL, 0 }
187 };
188 
190  { FR_CONF_OFFSET("local_dc", PW_TYPE_STRING, rlm_sql_cassandra_config_t, lbdc_local_dc) },
191  { FR_CONF_OFFSET("hosts_per_remote_dc", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, lbdc_hosts_per_remote_dc), .dflt = "0" },
192  { FR_CONF_OFFSET("allow_remote_dcs_for_local_cl", PW_TYPE_BOOLEAN, rlm_sql_cassandra_config_t, lbdc_allow_remote_dcs_for_local_cl), .dflt = "no" }
193 };
194 
196  { FR_CONF_OFFSET("exclusion_threshold", PW_TYPE_TIMEVAL, rlm_sql_cassandra_config_t, lar_exclusion_threshold), .dflt = "2.0" },
197  { FR_CONF_OFFSET("scale", PW_TYPE_TIMEVAL, rlm_sql_cassandra_config_t, lar_scale), .dflt = "0.1" },
198  { FR_CONF_OFFSET("retry_period", PW_TYPE_TIMEVAL, rlm_sql_cassandra_config_t, lar_retry_period), .dflt = "10" },
199  { FR_CONF_OFFSET("update_rate", PW_TYPE_TIMEVAL, rlm_sql_cassandra_config_t, lar_update_rate), .dflt = "0.1" },
200  { FR_CONF_OFFSET("min_measured", PW_TYPE_INTEGER64, rlm_sql_cassandra_config_t, lar_min_measured), .dflt = "50" }
201 };
202 
204  { FR_CONF_OFFSET("ca_file", PW_TYPE_FILE_INPUT, rlm_sql_cassandra_config_t, tls_ca_file) },
205  { FR_CONF_OFFSET("certificate_file", PW_TYPE_FILE_INPUT, rlm_sql_cassandra_config_t, tls_certificate_file) },
206  { FR_CONF_OFFSET("private_key_file", PW_TYPE_FILE_INPUT, rlm_sql_cassandra_config_t, tls_private_key_file) },
207  { FR_CONF_OFFSET("private_key_password", PW_TYPE_STRING | PW_TYPE_SECRET, rlm_sql_cassandra_config_t, tls_private_key_password) },
208 
209  { FR_CONF_OFFSET("verify_cert", PW_TYPE_STRING, rlm_sql_cassandra_config_t, tls_verify_cert_str) },
211 };
212 
213 static const CONF_PARSER driver_config[] = {
214  { FR_CONF_OFFSET("consistency", PW_TYPE_STRING, rlm_sql_cassandra_config_t, consistency_str), .dflt = "quorum" },
215 
216  { FR_CONF_OFFSET("protocol_version", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, protocol_version) },
217 
218  { FR_CONF_OFFSET("connections_per_host", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, connections_per_host) },
219  { FR_CONF_OFFSET("connections_per_host_max", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, connections_per_host_max) },
220 
221  { FR_CONF_OFFSET("io_threads", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, io_threads) },
222  { FR_CONF_OFFSET("io_queue_size", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, io_queue_size) },
223  { FR_CONF_OFFSET("io_flush_requests_max", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, io_flush_requests_max) },
224 
225  { FR_CONF_OFFSET("pending_requests_high", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, pending_requests_high) },
226  { FR_CONF_OFFSET("pending_requests_low", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, pending_requests_low) },
227  { FR_CONF_OFFSET("write_bytes_high", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, write_bytes_high) },
228  { FR_CONF_OFFSET("write_bytes_low", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, write_bytes_low) },
229 
230  { FR_CONF_OFFSET("event_queue_size", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, event_queue_size) },
231 
232  { FR_CONF_OFFSET("spawn_threshold", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, spawn_threshold) },
233  { FR_CONF_OFFSET("spawn_max", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, spawn_max) },
234  { FR_CONF_OFFSET("spawn_retry_delay", PW_TYPE_TIMEVAL, rlm_sql_cassandra_config_t, spawn_retry_delay) },
235 
236  { FR_CONF_POINTER("load_balance_dc_aware", PW_TYPE_SUBSECTION, NULL), .dflt = (void const *) load_balance_dc_aware_config },
237  { FR_CONF_OFFSET("load_balance_round_robin", PW_TYPE_BOOLEAN, rlm_sql_cassandra_config_t, load_balance_round_robin), .dflt = "no" },
238 
239  { FR_CONF_OFFSET("token_aware_routing", PW_TYPE_BOOLEAN, rlm_sql_cassandra_config_t, token_aware_routing), .dflt = "yes" },
240  { FR_CONF_POINTER("latency_aware_routing", PW_TYPE_SUBSECTION, NULL), .dflt = (void const *) latency_aware_routing_config },
241 
242  { FR_CONF_OFFSET("tcp_keepalive", PW_TYPE_INTEGER, rlm_sql_cassandra_config_t, tcp_keepalive) },
243  { FR_CONF_OFFSET("tcp_nodelay", PW_TYPE_BOOLEAN, rlm_sql_cassandra_config_t, tcp_nodelay), .dflt = "no" },
244 
245  { FR_CONF_POINTER("tls", PW_TYPE_SUBSECTION, NULL), .dflt = (void const *) tls_config },
247 };
248 
249 /** Log callback for libcassandra
250  *
251  * libcassandra seems to use this to log global events in the library, other messages
252  * like query errors are not logged here, and should be retrieved with functions like
253  * cass_future_error_message();
254  *
255  * Messages here do not need to be made available via sql_error.
256  *
257  * @param message Contains the log message and information about its source.
258  * @param data user data (not used).
259  */
260 static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
261 {
262  switch (message->severity) {
263  case CASS_LOG_CRITICAL:
264  case CASS_LOG_ERROR:
265  if (DEBUG_ENABLED3) {
266  ERROR("rlm_sql_cassandra: %s[%d] %s: %s",
267  message->file, message->line, message->function, message->message);
268  } else {
269  ERROR("rlm_sql_cassandra: %s", message->message);
270  }
271  return;
272 
273  case CASS_LOG_WARN:
274  if (DEBUG_ENABLED3) {
275  WARN("rlm_sql_cassandra: %s[%d] %s: %s",
276  message->file, message->line, message->function, message->message);
277  } else {
278  WARN("rlm_sql_cassandra: %s", message->message);
279  }
280  return;
281 
282  case CASS_LOG_INFO:
283  case CASS_LOG_DISABLED:
284  case CASS_LOG_LAST_ENTRY:
285  if (DEBUG_ENABLED3) {
286  INFO("rlm_sql_cassandra: %s[%d] %s: %s",
287  message->file, message->line, message->function, message->message);
288  } else {
289  INFO("rlm_sql_cassandra: %s", message->message);
290  }
291  return;
292 
293  case CASS_LOG_DEBUG:
294  case CASS_LOG_TRACE:
295  default:
296  if (DEBUG_ENABLED3) {
297  DEBUG3("rlm_sql_cassandra: %s[%d] %s: %s",
298  message->file, message->line, message->function, message->message);
299  } else {
300  DEBUG2("rlm_sql_cassandra: %s", message->message);
301  }
302  return;
303  }
304 }
305 
306 /** Replace the last error messages associated with the connection
307  *
308  * This could be modified in future to maintain a circular buffer of log entries,
309  * but it's not required for now.
310  *
311  * @param conn to replace log message in.
312  * @param message from libcassandra.
313  * @param len of message.
314  */
315 static void sql_set_last_error(rlm_sql_cassandra_conn_t *conn, char const *message, size_t len)
316 {
317  talloc_free_children(conn->log_ctx);
318 
319  conn->last_error.msg = fr_asprint(conn->log_ctx, message, len, '\0');
320  conn->last_error.type = L_ERR;
321 }
322 
323 
324 /** Replace the last error messages associated with the connection
325  *
326  * This could be modified in future to maintain a circular buffer of log entries,
327  * but it's not required for now.
328  *
329  * @param conn to replace log message in.
330  * @param fmt of message.
331  * @param ... args.
332  */
333 static void sql_set_last_error_printf(rlm_sql_cassandra_conn_t *conn, char const *fmt, ...)
334  CC_HINT(format (printf, 2, 3));
335 static void sql_set_last_error_printf(rlm_sql_cassandra_conn_t *conn, char const *fmt, ...)
336 {
337  va_list ap;
338 
339  talloc_free_children(conn->log_ctx);
340 
341  va_start(ap, fmt);
342  conn->last_error.msg = talloc_vasprintf(conn->log_ctx, fmt, ap);
343  va_end(ap);
344  conn->last_error.type = L_ERR;
345 }
346 
348 {
349  if (config->ssl) cass_ssl_free(config->ssl);
350  if (config->session) cass_session_free(config->session); /* also synchronously closes the session */
351  if (config->cluster) cass_cluster_free(config->cluster);
352 
353 #ifdef HAVE_PTHREAD_H
354  pthread_mutex_destroy(&config->connect_mutex);
355 #endif
356  if (--rlm_sql_cass_instances == 0) cass_log_cleanup(); /* must be last call to libcassandra */
357 
358  return 0;
359 }
360 
362 {
363  static bool version_done = false;
364  bool do_tls = false;
365  bool do_latency_aware_routing = false;
366 
367  CassCluster *cluster;
368 
370 
371 #define DO_CASS_OPTION(_opt, _x) \
372 do {\
373  CassError _ret;\
374  if ((_ret = (_x)) != CASS_OK) {\
375  ERROR("rlm_sql_cassandra: Error setting " _opt ": %s", cass_error_desc(_ret));\
376  return RLM_SQL_ERROR;\
377  }\
378 } while (0)
379 
380  if (!version_done) {
381  version_done = true;
382 
383  INFO("rlm_sql_cassandra: Built against libcassandra version %d.%d.%d%s",
384  CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
385 
386  /*
387  * Setup logging callbacks (only needs to be done once)
388  */
389  cass_log_set_level(CASS_LOG_INFO);
390  cass_log_set_callback(_rlm_sql_cassandra_log, NULL);
391  }
392 
393  MEM(driver = config->driver = talloc_zero(config, rlm_sql_cassandra_config_t));
394 #ifdef HAVE_PTHREAD_H
395  if (pthread_mutex_init(&driver->connect_mutex, NULL) < 0) {
396  ERROR("Failed initializing mutex: %s", fr_syserror(errno));
397  TALLOC_FREE(driver);
398  return -1;
399  }
400 #endif
401  talloc_set_destructor(driver, _mod_destructor);
402 
403  /*
404  * This has to be done before we call cf_section_parse
405  * as it sets default values, and creates the section.
406  */
407  if (cf_section_sub_find(conf, "tls")) do_tls = true;
408  if (cf_section_sub_find(conf, "latency_aware_routing")) do_latency_aware_routing = true;
409 
410  if (cf_section_parse(conf, driver, driver_config) < 0) return -1;
411 
412  DEBUG4("rlm_sql_cassandra: Configuring driver's CassCluster structure");
413  cluster = driver->cluster = cass_cluster_new();
414  if (!cluster) return RLM_SQL_ERROR;
415 
416  /*
417  * Parameters inherited from the top level SQL module config
418  */
419  DO_CASS_OPTION("sql_server", cass_cluster_set_contact_points(cluster, config->sql_server));
420  if (config->sql_port) DO_CASS_OPTION("sql_port", cass_cluster_set_port(cluster, config->sql_port));
421  /* Can't fail */
422  if (config->query_timeout) cass_cluster_set_request_timeout(cluster, config->query_timeout * 1000);
423  /* Can't fail */
424  if (config->sql_login && config->sql_password) cass_cluster_set_credentials(cluster, config->sql_login,
425  config->sql_password);
426 
427  /*
428  * Driver specific parameters
429  */
430  if (driver->consistency_str) {
431  int consistency;
432 
433  consistency = fr_str2int(consistency_levels, driver->consistency_str, -1);
434  if (consistency < 0) {
435  ERROR("rlm_sql_cassandra: Invalid consistency level \"%s\"", driver->consistency_str);
436  return -1;
437  }
438  driver->consistency = (CassConsistency)consistency;
439  }
440 
441  if (driver->protocol_version) {
442  DO_CASS_OPTION("protocol_version",
443  cass_cluster_set_protocol_version(driver->cluster, driver->protocol_version));
444  }
445 
446  if (driver->connections_per_host) {
447  DO_CASS_OPTION("connections_per_host",
448  cass_cluster_set_core_connections_per_host(driver->cluster,
449  driver->connections_per_host));
450  }
451 
452  if (driver->connections_per_host_max) {
453  DO_CASS_OPTION("connections_per_host_max",
454  cass_cluster_set_max_connections_per_host(driver->cluster,
455  driver->connections_per_host_max));
456  }
457 
458  if (driver->io_threads) {
459  DO_CASS_OPTION("io_threads", cass_cluster_set_num_threads_io(driver->cluster, driver->io_threads));
460  }
461 
462  if (driver->io_queue_size) {
463  DO_CASS_OPTION("io_queue_size",
464  cass_cluster_set_num_threads_io(driver->cluster, driver->io_queue_size));
465  }
466 
467  if (driver->io_flush_requests_max) {
468  DO_CASS_OPTION("io_flush_requests_max",
469  cass_cluster_set_max_requests_per_flush(driver->cluster,
470  driver->io_flush_requests_max));
471  }
472 
473  if (driver->pending_requests_high) {
474  DO_CASS_OPTION("pending_requests_high",
475  cass_cluster_set_pending_requests_high_water_mark(driver->cluster,
476  driver->pending_requests_high));
477  }
478 
479  if (driver->pending_requests_low) {
480  DO_CASS_OPTION("pending_requests_low",
481  cass_cluster_set_pending_requests_high_water_mark(driver->cluster,
482  driver->pending_requests_low));
483  }
484 
485  if (driver->write_bytes_high) {
486  DO_CASS_OPTION("write_bytes_high",
487  cass_cluster_set_write_bytes_high_water_mark(driver->cluster,
488  driver->write_bytes_high));
489  }
490 
491  if (driver->write_bytes_low) {
492  DO_CASS_OPTION("write_bytes_low",
493  cass_cluster_set_write_bytes_low_water_mark(driver->cluster,
494  driver->write_bytes_low));
495  }
496 
497  if (driver->event_queue_size) {
498  DO_CASS_OPTION("event_queue_size",
499  cass_cluster_set_num_threads_io(driver->cluster, driver->event_queue_size));
500  }
501 
502  if (driver->spawn_threshold) {
503  DO_CASS_OPTION("spawn_threshold",
504  cass_cluster_set_max_concurrent_requests_threshold(driver->cluster,
505  driver->spawn_threshold));
506  }
507 
508  if (driver->spawn_max) {
509  DO_CASS_OPTION("spawn_max",
510  cass_cluster_set_max_concurrent_creation(driver->cluster, driver->spawn_max));
511  }
512 
513  {
514  uint32_t delay;
515 
516  delay = (driver->spawn_retry_delay.tv_sec * (uint64_t)1000) +
517  (driver->spawn_retry_delay.tv_usec / 1000);
518  if (delay) cass_cluster_set_reconnect_wait_time(driver->cluster, delay);
519  }
520 
521  if (driver->load_balance_round_robin) cass_cluster_set_load_balance_round_robin(driver->cluster);
522 
523  cass_cluster_set_token_aware_routing(driver->cluster, driver->token_aware_routing);
524 
525  if (driver->lbdc_local_dc) {
526  DO_CASS_OPTION("load_balance_dc_aware",
527  cass_cluster_set_load_balance_dc_aware(driver->cluster,
528  driver->lbdc_local_dc,
529  driver->lbdc_hosts_per_remote_dc,
531  }
532 
533  if (do_latency_aware_routing) {
534  cass_double_t exclusion_threshold;
535  uint64_t scale_ms, retry_period_ms, update_rate_ms;
536 
537  exclusion_threshold = driver->lar_exclusion_threshold.tv_sec +
538  (driver->lar_exclusion_threshold.tv_usec / 1000000);
539 
540  scale_ms = (driver->lar_scale.tv_sec * (uint64_t)1000) + (driver->lar_scale.tv_usec / 1000);
541  retry_period_ms = (driver->lar_retry_period.tv_sec * (uint64_t)1000) +
542  (driver->lar_retry_period.tv_usec / 1000);
543  update_rate_ms = (driver->lar_update_rate.tv_sec * (uint64_t)1000) +
544  (driver->lar_update_rate.tv_usec / 1000);
545 
546  /* Can't fail */
547  cass_cluster_set_latency_aware_routing(driver->cluster, true);
548 
549  /* Can't fail */
550  cass_cluster_set_latency_aware_routing_settings(driver->cluster,
551  exclusion_threshold,
552  scale_ms,
553  retry_period_ms,
554  update_rate_ms,
555  driver->lar_min_measured);
556  }
557 
558  if (driver->tcp_keepalive) cass_cluster_set_tcp_keepalive(driver->cluster, true, driver->tcp_keepalive);
559  cass_cluster_set_tcp_nodelay(driver->cluster, driver->tcp_nodelay);
560 
561  if (do_tls) {
562  CassSsl *ssl;
563 
564  ssl = driver->ssl = cass_ssl_new();
565  if (!ssl) return RLM_SQL_ERROR;
566 
567  if (driver->tls_verify_cert_str) {
568  int verify_cert;
569 
570  verify_cert = fr_str2int(verify_cert_table, driver->tls_verify_cert_str, -1);
571  if (verify_cert < 0) {
572  ERROR("rlm_sql_cassandra: Invalid certificate validation type \"%s\", "
573  "must be one of 'yes', 'no', 'identity'", driver->tls_verify_cert_str);
574  return -1;
575  }
576  cass_ssl_set_verify_flags(ssl, verify_cert);
577  }
578 
579  DEBUG2("rlm_sql_cassandra: Enabling TLS");
580 
581  if (driver->tls_ca_file) {
582  DO_CASS_OPTION("ca_file", cass_ssl_add_trusted_cert(ssl, driver->tls_ca_file));
583  }
584 
585  if (driver->tls_certificate_file) {
586  DO_CASS_OPTION("certificate_file", cass_ssl_set_cert(ssl, driver->tls_certificate_file));
587  }
588 
589  if (driver->tls_private_key_file) {
590  DO_CASS_OPTION("private_key", cass_ssl_set_private_key(ssl, driver->tls_private_key_file,
591  driver->tls_private_key_password));
592  }
593 
594  cass_cluster_set_ssl(cluster, ssl);
595  }
596 
597  driver->session = cass_session_new();
598  if (!driver->session) return RLM_SQL_ERROR;
599 
601 
602  return 0;
603 }
604 
606 {
607  DEBUG2("rlm_sql_cassandra: Socket destructor called, closing socket");
608 
609  if (conn->iterator) cass_iterator_free(conn->iterator);
610  if (conn->result) cass_result_free(conn->result);
611 
612  return 0;
613 }
614 
615 static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t *config, struct timeval const *timeout)
616 {
618  rlm_sql_cassandra_config_t *driver = config->driver;
619 
620  MEM(conn = handle->conn = talloc_zero(handle, rlm_sql_cassandra_conn_t));
621  talloc_set_destructor(conn, _sql_socket_destructor);
622 
623  /*
624  * We do this one inside sql_socket_init, to allow pool.start = 0 to
625  * work as expected (allow the server to start if Cassandra is
626  * unavailable).
627  */
628  if (!driver->done_connect_keyspace) {
629  CassFuture *future;
630  CassError ret;
631 
632 #ifdef HAVE_PTHREAD_H
633  pthread_mutex_lock(&driver->connect_mutex);
634 #endif
635  if (!driver->done_connect_keyspace) {
636  /*
637  * Easier to do this here instead of mod_instantiate
638  * as we don't have a pointer to the pool.
639  */
640  cass_cluster_set_connect_timeout(driver->cluster, FR_TIMEVAL_TO_MS(timeout));
641 
642  DEBUG2("rlm_sql_cassandra: Connecting to Cassandra cluster");
643  future = cass_session_connect_keyspace(driver->session, driver->cluster, config->sql_db);
644  ret = cass_future_error_code(future);
645  if (ret != CASS_OK) {
646  const char *msg;
647  size_t msg_len;
648 
649  cass_future_error_message(future, &msg, &msg_len);
650  ERROR("rlm_sql_cassandra: Unable to connect: [%x] %s", (int)ret, msg);
651  cass_future_free(future);
652 
653  return RLM_SQL_ERROR;
654  }
655  cass_future_free(future);
656  driver->done_connect_keyspace = true;
657  }
658 #ifdef HAVE_PTHREAD_H
659  pthread_mutex_unlock(&driver->connect_mutex);
660 #endif
661  }
662  conn->log_ctx = talloc_pool(conn, 1024); /* Pre-allocate some memory for log messages */
663 
664  return RLM_SQL_OK;
665 }
666 
667 static sql_rcode_t sql_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config, char const *query)
668 {
669  rlm_sql_cassandra_conn_t *conn = handle->conn;
671  CassStatement *statement;
672  CassFuture *future;
673  CassError ret;
674 
675  statement = cass_statement_new_n(query, talloc_array_length(query) - 1, 0);
676  if (conf->consistency_str) cass_statement_set_consistency(statement, conf->consistency);
677 
678  future = cass_session_execute(conf->session, statement);
679  cass_statement_free(statement);
680 
681  ret = cass_future_error_code(future);
682  if (ret != CASS_OK) {
683  char const *error;
684  size_t len;
685 
686  cass_future_error_message(future, &error, &len);
687  sql_set_last_error(conn, error, len);
688  cass_future_free(future);
689 
690  switch (ret) {
691  case CASS_ERROR_SERVER_SYNTAX_ERROR:
692  case CASS_ERROR_SERVER_INVALID_QUERY:
693  return RLM_SQL_QUERY_INVALID;
694 
695  default:
696  return RLM_SQL_ERROR;
697  }
698  }
699 
700  conn->result = cass_future_get_result(future);
701  cass_future_free(future);
702 
703  return RLM_SQL_OK;
704 }
705 
707 {
708  rlm_sql_cassandra_conn_t *conn = handle->conn;
709 
710  return conn->result ? cass_result_column_count(conn->result) : 0;
711 }
712 
714 {
715  rlm_sql_cassandra_conn_t *conn = handle->conn;
716 
717  return conn->result ? cass_result_row_count(conn->result) : 0;
718 }
719 
720 static sql_rcode_t sql_fields(char const **out[], rlm_sql_handle_t *handle, rlm_sql_config_t *config)
721 {
722  rlm_sql_cassandra_conn_t *conn = handle->conn;
723 
724  unsigned int fields, i;
725  char const **names;
726 
727  fields = sql_num_fields(handle, config);
728  if (fields == 0) return RLM_SQL_ERROR;
729 
730  MEM(names = talloc_array(handle, char const *, fields));
731 
732  for (i = 0; i < fields; i++) {
733  const char *col_name;
734  size_t col_name_len;
735 
736  /* Writes out a pointer to a buffer in the result */
737  cass_result_column_name(conn->result, i, &col_name, &col_name_len);
738  names[i] = col_name;
739  }
740 
741  *out = names;
742 
743  return RLM_SQL_OK;
744 }
745 
747 {
748 
749  rlm_sql_cassandra_conn_t *conn = handle->conn;
750  CassRow const *cass_row;
751  int fields, i;
752  char **row;
753 
754 #define RLM_CASS_ERR_DATA_RETRIVE(_t) \
755 do {\
756  char const *_col_name;\
757  size_t _col_name_len;\
758  CassError _ret;\
759  if ((_ret = cass_result_column_name(conn->result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
760  _col_name = "<INVALID>";\
761  }\
762  sql_set_last_error_printf(conn, "Failed to retrieve " _t " data at column %s (%d): %s", \
763  _col_name, i, cass_error_desc(_ret));\
764  TALLOC_FREE(handle->row);\
765  return RLM_SQL_ERROR;\
766 } while(0)
767 
768  if (!conn->result) return RLM_SQL_OK; /* no result */
769 
770  *out = NULL;
771 
772  /*
773  * Start of the result set, initialise the iterator.
774  */
775  if (!conn->iterator) conn->iterator = cass_iterator_from_result(conn->result);
776  if (!conn->iterator) return RLM_SQL_OK; /* no result */
777 
778  if (!cass_iterator_next(conn->iterator)) return RLM_SQL_OK; /* no more rows */
779 
780  cass_row = cass_iterator_get_row(conn->iterator); /* this shouldn't fail ? */
781  fields = sql_num_fields(handle, config); /* get the number of fields... */
782 
783  /*
784  * Free the previous result (also gets called on finish_query)
785  */
786  talloc_free(handle->row);
787  MEM(row = handle->row = talloc_zero_array(handle, char *, fields + 1));
788 
789  for (i = 0; i < fields; i++) {
790  CassValue const *value;
791  CassValueType type;
792 
793  value = cass_row_get_column(cass_row, i);
794 
795  if (cass_value_is_null(value) == cass_true) continue;
796 
797  type = cass_value_type(value);
798  switch (type) {
799  case CASS_VALUE_TYPE_ASCII:
800  case CASS_VALUE_TYPE_TEXT:
801  case CASS_VALUE_TYPE_VARCHAR:
802  {
803  const char *str;
804  size_t len;
805 
806  if (cass_value_get_string(value, &str, &len) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("string");
807 
808  MEM(row[i] = talloc_array(row, char, len + 1));
809  memcpy(row[i], str, len);
810  row[i][len] = '\0';
811  }
812  break;
813 
814  case CASS_VALUE_TYPE_BOOLEAN:
815  {
816  cass_bool_t bv;
817 
818  if (cass_value_get_bool(value, &bv) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("bool");
819 
820  MEM(row[i] = talloc_zero_array(row, char, 2));
821  row[i][0] = (bv == cass_false) ? '0' : '1';
822  }
823  break;
824 
825  case CASS_VALUE_TYPE_INT:
826  {
827  cass_int32_t i32v;
828 
829  if (cass_value_get_int32(value, &i32v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int32");
830 
831  MEM(row[i] = talloc_asprintf(row, "%"PRId32, (int32_t)i32v));
832  }
833  break;
834 
835  case CASS_VALUE_TYPE_TIMESTAMP:
836  case CASS_VALUE_TYPE_BIGINT:
837  {
838  cass_int64_t i64v;
839 
840  if (cass_value_get_int64(value, &i64v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int64");
841 
842  MEM(row[i] = talloc_asprintf(row, "%"PRId64, (int64_t)i64v));
843  }
844  break;
845 
846  case CASS_VALUE_TYPE_UUID:
847  case CASS_VALUE_TYPE_TIMEUUID:
848  {
849  CassUuid uuid;
850 
851  if (cass_value_get_uuid(value, &uuid) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("UUID");
852  MEM(row[i] = talloc_array(row, char, CASS_UUID_STRING_LENGTH));
853  cass_uuid_string(uuid, row[i]);
854  }
855  break;
856 
857  default:
858  {
859  const char *col_name;
860  size_t col_name_len;
861 
862  if (cass_result_column_name(conn->result, i, &col_name,
863  &col_name_len) != CASS_OK) col_name = "<INVALID>";
864 
866  "Failed to retrieve data at column %s (%d): Unsupported data type",
867  col_name, i);
868  talloc_free(handle->row);
869  return RLM_SQL_ERROR;
870  }
871  }
872  }
873  *out = row;
874 
875  return RLM_SQL_OK;
876 }
877 
879 {
880  rlm_sql_cassandra_conn_t *conn = handle->conn;
881 
882  if (handle->row) TALLOC_FREE(handle->row);
883 
884  if (conn->iterator) {
885  cass_iterator_free(conn->iterator);
886  conn->iterator = NULL;
887  }
888 
889  if (conn->result) {
890  cass_result_free(conn->result);
891  conn->result = NULL;
892  }
893 
894  return RLM_SQL_OK;
895 }
896 
897 static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen,
898  rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
899 {
900  rlm_sql_cassandra_conn_t *conn = handle->conn;
901 
902  if (conn->last_error.msg && (outlen >= 1)) {
903  out[0].msg = conn->last_error.msg;
904  out[0].type = conn->last_error.type;
905  conn->last_error.msg = NULL;
906 
907  return 1;
908  }
909 
910  return 0;
911 }
912 
914 {
915  rlm_sql_cassandra_conn_t *conn = handle->conn;
916 
917  /*
918  * Clear our local log buffer, and free any messages which weren't
919  * reparented (so we don't leak memory).
920  */
921  talloc_free_children(conn->log_ctx);
922  memset(&conn->last_error, 0, sizeof(conn->last_error));
923 
924  return sql_free_result(handle, config);
925 }
926 
927 /*
928  * The cassandra model is different, as it's distributed, and does
929  * upserts instead of inserts...
930  *
931  * There's a good article on it here:
932  * http://planetcassandra.org/blog/how-to-do-an-upsert-in-cassandra/
933  */
935 {
936  return 1;
937 }
938 
939 /* Exported to rlm_sql */
941 rlm_sql_module_t rlm_sql_cassandra = {
942  .name = "rlm_sql_cassandra",
943  .mod_instantiate = mod_instantiate,
944  .sql_socket_init = sql_socket_init,
945  .sql_query = sql_query,
946  .sql_select_query = sql_query,
947  .sql_num_fields = sql_num_fields,
948  .sql_num_rows = sql_num_rows,
949  .sql_affected_rows = sql_affected_rows,
950  .sql_fields = sql_fields,
951  .sql_fetch_row = sql_fetch_row,
952  .sql_free_result = sql_free_result,
953  .sql_error = sql_error,
954  .sql_finish_query = sql_finish_query,
955  .sql_finish_select_query = sql_finish_query
956 };
char const * lbdc_local_dc
The primary data center to try first.
#define PW_TYPE_FILE_INPUT
File matching value must exist, and must be readable.
Definition: conffile.h:204
rlm_sql_module_t rlm_sql_cassandra
uint32_t io_flush_requests_max
Maximum number of requests processed by an IO worker per flush.
General connection/server error.
Definition: rlm_sql.h:46
#define pthread_mutex_init(_x, _y)
Definition: rlm_eap.h:75
uint32_t spawn_threshold
Threshold for the maximum number of concurrent requests in-flight on a connection before creating a n...
#define DEBUG3(fmt,...)
Definition: log.h:177
Time value (struct timeval), only for config items.
Definition: radius.h:55
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
Prototypes and functions for the SQL module.
char ** rlm_sql_row_t
Definition: rlm_sql.h:59
uint32_t write_bytes_low
Low water mark for number of bytes outstanding on a connection.
log_type_t type
Type of log entry L_ERR, L_WARN, L_INFO, L_DBG etc..
Definition: rlm_sql.h:62
CassResult const * result
Result from executing a query.
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
Definition: log.h:171
char const * tls_private_key_password
String to decrypt private key.
#define MEM(x)
Definition: radiusd.h:396
#define INFO(fmt,...)
Definition: log.h:143
char const * msg
Log message.
Definition: rlm_sql.h:63
struct timeval lar_scale
Weight given to older latencies when calculating the average latency of a node.
#define CC_HINT(_x)
Definition: build.h:71
static int _sql_socket_destructor(rlm_sql_cassandra_conn_t *conn)
#define UNUSED
Definition: libradius.h:134
struct rlm_sql_cassandra_config rlm_sql_cassandra_config_t
Cassandra driver instance.
static CONF_PARSER load_balance_dc_aware_config[]
Error message.
Definition: log.h:36
#define CONF_PARSER_TERMINATOR
Definition: conffile.h:289
char const * sql_server
Server to connect to.
Definition: rlm_sql.h:85
#define DO_CASS_OPTION(_opt, _x)
static int sql_affected_rows(UNUSED rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
static int mod_instantiate(CONF_SECTION *conf, rlm_sql_config_t *config)
#define PW_TYPE_SECRET
Only print value if debug level >= 3.
Definition: conffile.h:202
static float timeout
Definition: radclient.c:43
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
char const * tls_private_key_file
Private key for the certificate we present to the server.
bool token_aware_routing
Whether to use token aware routing.
Cassandra driver instance.
#define PW_TYPE_SUBSECTION
Definition: conffile.h:188
static sql_rcode_t sql_free_result(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
bool done_connect_keyspace
Whether we've connected to a keyspace.
Defines a CONF_PAIR to C data type mapping.
Definition: conffile.h:267
static sql_rcode_t sql_fields(char const **out[], rlm_sql_handle_t *handle, rlm_sql_config_t *config)
uint32_t connections_per_host_max
Maximum number of connections to each server in each IO threads.
static sql_rcode_t sql_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config, char const *query)
Cassandra cluster connection.
uint32_t pending_requests_low
Sets the low water mark for the number of requests queued waiting for a connection in a connection po...
char const * tls_certificate_file
Public certificate we present to the server.
struct rlm_sql_cassandra_conn rlm_sql_cassandra_conn_t
Cassandra cluster connection.
char const * name
Definition: rlm_sql.h:191
int fr_str2int(FR_NAME_NUMBER const *table, char const *name, int def)
Definition: token.c:451
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: log.c:238
#define pthread_mutex_unlock(_x)
Definition: rlm_eap.h:78
static const FR_NAME_NUMBER verify_cert_table[]
static void sql_set_last_error_printf(rlm_sql_cassandra_conn_t *conn, char const *fmt,...) CC_HINT(format(printf
Replace the last error messages associated with the connection.
uint32_t spawn_max
The maximum number of connections that will be created concurrently.
sql_rcode_t
Definition: rlm_sql.h:44
static void sql_set_last_error(rlm_sql_cassandra_conn_t *conn, char const *message, size_t len)
Replace the last error messages associated with the connection.
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
uint32_t sql_port
Port to connect to.
Definition: rlm_sql.h:86
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
#define DEBUG2(fmt,...)
Definition: log.h:176
int cf_section_parse(CONF_SECTION *, void *base, CONF_PARSER const *variables)
Parse a configuration section into user-supplied variables.
Definition: conffile.c:2234
Definition: rlm_sql.h:61
struct timeval spawn_retry_delay
Amount of time to wait before attempting to reconnect.
uint32_t connections_per_host
Number of connections to each server in each IO thread.
CassSsl * ssl
Connection's SSL context.
char const * consistency_str
Level of consistency required.
CassConsistency consistency
Level of consistency converted to a constant.
Success.
Definition: rlm_sql.h:47
uint32_t protocol_version
The protocol version.
uint32_t write_bytes_high
High water mark for the number of bytes outstanding on a connection.
uint32_t lbdc_hosts_per_remote_dc
The number of host used in each remote DC if no hosts are available in the local dc.
A truth value.
Definition: radius.h:56
static CONF_PARSER latency_aware_routing_config[]
static int rlm_sql_cass_instances
char const * sql_password
Login password to use.
Definition: rlm_sql.h:88
void * driver
Where drivers should write a pointer to their configurations.
Definition: rlm_sql.h:135
32 Bit unsigned integer.
Definition: radius.h:34
static int sql_num_rows(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
static rs_t * conf
Definition: radsniff.c:46
void * conn
Database specific connection handle.
Definition: rlm_sql.h:153
CONF_SECTION * cf_section_sub_find(CONF_SECTION const *, char const *name)
Find a sub-section in a section.
Definition: conffile.c:3708
#define DEBUG4(fmt,...)
Definition: log.h:178
CassCluster * cluster
Configuration of the cassandra cluster connection.
64 Bit unsigned integer.
Definition: radius.h:51
rlm_sql_row_t row
Row data from the last query.
Definition: rlm_sql.h:154
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static sql_rcode_t sql_finish_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config)
static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t *config, struct timeval const *timeout)
bool tcp_nodelay
Disable TCP naggle algorithm.
uint8_t data[]
Definition: eap_pwd.h:625
static const FR_NAME_NUMBER consistency_levels[]
static int sql_num_fields(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config)
struct timeval lar_update_rate
The rate at which the best average latency is recomputed.
#define FR_CONF_OFFSET(_n, _t, _s, _f)
Definition: conffile.h:168
char const * sql_login
Login credentials to use.
Definition: rlm_sql.h:87
TALLOC_CTX * log_ctx
Prevent unneeded memory allocation by keeping a permanent pool, to store log entries.
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
Definition: print.c:390
Query syntax error.
Definition: rlm_sql.h:45
static int _mod_destructor(rlm_sql_cassandra_config_t *config)
CassIterator * iterator
Row set iterator.
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
static sql_rcode_t sql_fetch_row(rlm_sql_row_t *out, rlm_sql_handle_t *handle, rlm_sql_config_t *config)
#define WARN(fmt,...)
Definition: log.h:144
static const CONF_PARSER driver_config[]
uint32_t query_timeout
How long to allow queries to run for.
Definition: rlm_sql.h:130
uint32_t tcp_keepalive
How often to send TCP keepalives.
bool load_balance_round_robin
Enable round robin load balancing.
char const * sql_db
Database to run queries against.
Definition: rlm_sql.h:89
sql_log_entry_t last_error
uint32_t io_queue_size
Size of the the fixed size queue that stores pending requests.
uint32_t pending_requests_high
Sets the high water mark for the number of requests queued waiting for a connection in a connection p...
#define pthread_mutex_lock(_x)
Definition: rlm_eap.h:77
struct timeval lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the curr...
#define FR_TIMEVAL_TO_MS(_x)
Definition: conffile.h:235
String of printable characters.
Definition: radius.h:33
#define FR_CONF_POINTER(_n, _t, _p)
Definition: conffile.h:172
static CONF_PARSER tls_config[]
uint32_t event_queue_size
Sets the size of the the fixed size queue that stores events.
#define pthread_mutex_destroy(_x)
Definition: rlm_eap.h:76
uint32_t io_threads
Number of IO threads.
CassSession * session
Cluster's connection pool.
#define ERROR(fmt,...)
Definition: log.h:145
struct timeval lar_exclusion_threshold
How much worse the latency me be, compared to the average latency of the best performing node before ...