The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
rlm_sql_cassandra.c
Go to the documentation of this file.
1 /**
2  * This is free and unencumbered software released into the public domain.
3  *
4  * Anyone is free to copy, modify, publish, use, compile, sell, or
5  * distribute this software, either in source code form or as a compiled
6  * binary, for any purpose, commercial or non-commercial, and by any
7  * means.
8  *
9  * In jurisdictions that recognize copyright laws, the author or authors
10  * of this software dedicate any and all copyright interest in the
11  * software to the public domain. We make this dedication for the benefit
12  * of the public at large and to the detriment of our heirs and
13  * successors. We intend this dedication to be an overt act of
14  * relinquishment in perpetuity of all present and future rights to this
15  * software under copyright law.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20  * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21  * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23  * OTHER DEALINGS IN THE SOFTWARE.
24  *
25  * For more information, please refer to <http://unlicense.org>
26  */
27 
28 /**
29  * $Id: d544f2fe79d4529d712097c97f868aa476ea3aa1 $
30  * @file rlm_sql_cassandra.c
31  * @brief Cassandra SQL driver
32  *
33  * @author Linnaea Von Lavia (le.concorde.4590@gmail.com)
34  * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
35  */
36 #define LOG_PREFIX "sql - cassandra"
37 
38 #include <freeradius-devel/server/base.h>
39 #include <freeradius-devel/util/debug.h>
40 
41 #ifdef HAVE_WDOCUMENTATION
42 DIAG_OFF(documentation)
43 #endif
44 DIAG_OFF(strict-prototypes) /* Seen with homebrew cassandra-cpp-driver 2.15.3 */
45 #include <cassandra.h>
46 DIAG_ON(strict-prototypes)
47 #ifdef HAVE_WDOCUMENTATION
48 DIAG_ON(documentation)
49 #endif
50 
51 #include "rlm_sql.h"
52 #include "rlm_sql_trunk.h"
53 
54 typedef struct {
55  bool done_connect_keyspace; //!< Whether we've connected to a keyspace.
56  pthread_mutex_t connect_mutex; //!< Mutex to prevent multiple connections attempting
57  ///< to connect to a keyspace concurrently.
59 
60 /** Cassandra driver instance
61  *
62  */
63 typedef struct {
64  CassCluster *cluster; //!< Configuration of the cassandra cluster connection.
65  CassSession *session; //!< Cluster's connection pool.
66  CassSsl *ssl; //!< Connection's SSL context.
67  rlm_sql_cassandra_mutable_t *mutable; //!< Instance data which needs to change post instantiation.
68 
69  /*
70  * Configuration options
71  */
72  char const *consistency_str; //!< Level of consistency required.
73  CassConsistency consistency; //!< Level of consistency converted to a constant.
74 
75  uint32_t protocol_version; //!< The protocol version.
76 
77  uint32_t connections_per_host; //!< Number of connections to each server in each
78  //!< IO thread.
79  uint32_t io_threads; //!< Number of IO threads.
80 
81  uint32_t io_queue_size; //!< Size of the the fixed size queue that stores
82  //!< pending requests.
83 
84  uint32_t event_queue_size; //!< Sets the size of the the fixed size queue
85  //!< that stores events.
86 
87  bool load_balance_round_robin; //!< Enable round robin load balancing.
88 
89  bool token_aware_routing; //!< Whether to use token aware routing.
90 
91  char const *lbdc_local_dc; //!< The primary data center to try first.
92  uint32_t lbdc_hosts_per_remote_dc; //!< The number of host used in each remote DC if
93  //!< no hosts are available in the local dc
94 
95  bool lbdc_allow_remote_dcs_for_local_cl; //!< Allows remote hosts to be used if no local
96  //!< dc hosts are available and the consistency level
97  //!< is LOCAL_ONE or LOCAL_QUORUM.
98 
99  double lar_exclusion_threshold; //!< How much worse the latency me be, compared to
100  //!< the average latency of the best performing node
101  //!< before it's penalized.
102  //!< This gets mangled to a double.
103 
104  fr_time_delta_t lar_scale; //!< Weight given to older latencies when calculating
105  //!< the average latency of a node. A bigger scale will
106  //!< give more weight to older latency measurements.
107 
108  fr_time_delta_t lar_retry_period; //!< The amount of time a node is penalized by the
109  //!< policy before being given a second chance when
110  //!< the current average latency exceeds the calculated
111  //!< threshold
112  //!< (exclusion_threshold * best_average_latency).
113 
114  fr_time_delta_t lar_update_rate; //!< The rate at which the best average latency is
115  //!< recomputed.
116  uint64_t lar_min_measured; //!< The minimum number of measurements per-host
117  //!< required to be considered by the policy.
118 
119  uint32_t tcp_keepalive; //!< How often to send TCP keepalives.
120  bool tcp_nodelay; //!< Disable TCP naggle algorithm.
121 
122  char const *tls_ca_file; //!< Path to the CA used to validate the server's
123  //!< certificate.
124  char const *tls_certificate_file; //!< Public certificate we present to the server.
125  char const *tls_private_key_file; //!< Private key for the certificate we present to the
126  //!< server.
127  char const *tls_private_key_password; //!< String to decrypt private key.
128  char const *tls_verify_cert_str; //!< Whether we validate the cert provided by the
129  //!< server.
131 
132 /** Cassandra cluster connection
133  *
134  */
135 typedef struct {
136  connection_t *conn; //!< Generic connection structure for managing this handle.
137  rlm_sql_cassandra_t const *inst; //!< Module instance for this connection.
138  rlm_sql_config_t const *config; //!< SQL instance config
139  CassIterator *iterator; //!< Row set iterator.
140 
141  TALLOC_CTX *log_ctx; //!< Prevent unneeded memory allocation by keeping a
142  //!< permanent pool, to store log entries.
143  fr_dlist_head_t queries; //!< Outstanding queries on this connection.
144  fr_event_timer_t const *read_ev; //!< Polling event for reading query results.
145  fr_event_timer_t const *write_ev; //!< Polling event for sending queries.
146  uint poll_interval; //!< Interval between read polling.
147  uint poll_count; //!< How many consecutive polls had no available results.
149 
150 /** Structure for tracking outstanding queries
151  *
152  */
153 typedef struct {
154  fr_dlist_t entry; //!< Entry in list of outstanding queries.
155  fr_sql_query_t *query_ctx; //!< SQL query ctx.
156  CassFuture *future; //!< Future produced when submitting query.
158 
159 /** Driver specific data to attach to the query ctx
160  *
161  */
162 typedef struct {
163  CassResult const *result; //!< Cassandra result handle
164  sql_log_entry_t error; //!< Most recent Cassandra error message for this query.
166 
168  { L("all"), CASS_CONSISTENCY_ALL },
169  { L("any"), CASS_CONSISTENCY_ANY },
170  { L("each_quorum"), CASS_CONSISTENCY_EACH_QUORUM },
171  { L("local_one"), CASS_CONSISTENCY_LOCAL_ONE },
172  { L("local_quorum"), CASS_CONSISTENCY_LOCAL_QUORUM },
173  { L("one"), CASS_CONSISTENCY_ONE },
174  { L("quorum"), CASS_CONSISTENCY_QUORUM },
175  { L("three"), CASS_CONSISTENCY_THREE },
176  { L("two"), CASS_CONSISTENCY_TWO }
177 };
179 
181  { L("identity"), CASS_SSL_VERIFY_PEER_IDENTITY },
182  { L("no"), CASS_SSL_VERIFY_NONE },
183  { L("yes"), CASS_SSL_VERIFY_PEER_CERT }
184 };
186 
188  { FR_CONF_OFFSET("local_dc", rlm_sql_cassandra_t, lbdc_local_dc) },
189  { FR_CONF_OFFSET("hosts_per_remote_dc", rlm_sql_cassandra_t, lbdc_hosts_per_remote_dc), .dflt = "0" },
190  { FR_CONF_OFFSET("allow_remote_dcs_for_local_cl", rlm_sql_cassandra_t, lbdc_allow_remote_dcs_for_local_cl), .dflt = "no" },
192 };
193 
195  { FR_CONF_OFFSET("exclusion_threshold", rlm_sql_cassandra_t, lar_exclusion_threshold), .dflt = "2.0" },
196  { FR_CONF_OFFSET("scale", rlm_sql_cassandra_t, lar_scale), .dflt = "0.1" },
197  { FR_CONF_OFFSET("retry_period", rlm_sql_cassandra_t, lar_retry_period), .dflt = "10" },
198  { FR_CONF_OFFSET("update_rate", rlm_sql_cassandra_t, lar_update_rate), .dflt = "0.1" },
199  { FR_CONF_OFFSET("min_measured", rlm_sql_cassandra_t, lar_min_measured), .dflt = "50" },
201 };
202 
204  { FR_CONF_OFFSET_FLAGS("ca_file", CONF_FLAG_FILE_INPUT, rlm_sql_cassandra_t, tls_ca_file) },
205  { FR_CONF_OFFSET_FLAGS("certificate_file", CONF_FLAG_FILE_INPUT, rlm_sql_cassandra_t, tls_certificate_file) },
206  { FR_CONF_OFFSET_FLAGS("private_key_file", CONF_FLAG_FILE_INPUT, rlm_sql_cassandra_t, tls_private_key_file) },
207  { FR_CONF_OFFSET_FLAGS("private_key_password", CONF_FLAG_SECRET, rlm_sql_cassandra_t, tls_private_key_password) },
208 
209  { FR_CONF_OFFSET("verify_cert", rlm_sql_cassandra_t, tls_verify_cert_str) },
211 };
212 
213 static const conf_parser_t driver_config[] = {
214  { FR_CONF_OFFSET("consistency", rlm_sql_cassandra_t, consistency_str), .dflt = "quorum" },
215 
216  { FR_CONF_OFFSET("protocol_version", rlm_sql_cassandra_t, protocol_version) },
217 
218  { FR_CONF_OFFSET("connections_per_host", rlm_sql_cassandra_t, connections_per_host) },
219 
220  { FR_CONF_OFFSET("io_threads", rlm_sql_cassandra_t, io_threads) },
221  { FR_CONF_OFFSET("io_queue_size", rlm_sql_cassandra_t, io_queue_size) },
222 
223  { FR_CONF_OFFSET("event_queue_size", rlm_sql_cassandra_t, event_queue_size) },
224 
225  { FR_CONF_POINTER("load_balance_dc_aware", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) load_balance_dc_aware_config },
226  { FR_CONF_OFFSET("load_balance_round_robin", rlm_sql_cassandra_t, load_balance_round_robin), .dflt = "no" },
227 
228  { FR_CONF_OFFSET("token_aware_routing", rlm_sql_cassandra_t, token_aware_routing), .dflt = "yes" },
229  { FR_CONF_POINTER("latency_aware_routing", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) latency_aware_routing_config },
230 
231  { FR_CONF_OFFSET("tcp_keepalive", rlm_sql_cassandra_t, tcp_keepalive) },
232  { FR_CONF_OFFSET("tcp_nodelay", rlm_sql_cassandra_t, tcp_nodelay), .dflt = "no" },
233 
234  { FR_CONF_POINTER("tls", 0, CONF_FLAG_SUBSECTION | CONF_FLAG_OK_MISSING, NULL), .subcs = (void const *) tls_config },
236 };
237 
238 /** Log callback for libcassandra
239  *
240  * libcassandra seems to use this to log global events in the library, other messages
241  * like query errors are not logged here, and should be retrieved with functions like
242  * cass_future_error_message();
243  *
244  * Messages here do not need to be made available via sql_error.
245  *
246  * @param message Contains the log message and information about its source.
247  * @param data user data (not used).
248  */
249 static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
250 {
251  switch (message->severity) {
252  case CASS_LOG_CRITICAL:
253  case CASS_LOG_ERROR:
254  if (DEBUG_ENABLED3) {
255  ERROR("%s[%d] %s: %s",
256  message->file, message->line, message->function, message->message);
257  } else {
258  ERROR("%s", message->message);
259  }
260  return;
261 
262  case CASS_LOG_WARN:
263  if (DEBUG_ENABLED3) {
264  WARN("%s[%d] %s: %s",
265  message->file, message->line, message->function, message->message);
266  } else {
267  WARN("%s", message->message);
268  }
269  return;
270 
271  case CASS_LOG_INFO:
272  case CASS_LOG_DISABLED:
273  case CASS_LOG_LAST_ENTRY:
274  if (DEBUG_ENABLED3) {
275  INFO("%s[%d] %s: %s",
276  message->file, message->line, message->function, message->message);
277  } else {
278  INFO("%s", message->message);
279  }
280  return;
281 
282  case CASS_LOG_DEBUG:
283  case CASS_LOG_TRACE:
284  default:
285  if (DEBUG_ENABLED3) {
286  DEBUG3("%s[%d] %s: %s",
287  message->file, message->line, message->function, message->message);
288  } else {
289  DEBUG2("%s", message->message);
290  }
291  return;
292  }
293 }
294 
295 /** Store the last error associated with a query
296  *
297  * @param ctx to allocate message in
298  * @param cass_query_ctx cassandra query context to write error message into.
299  * @param message from libcassandra.
300  * @param len of message.
301  */
302 static void sql_set_query_error(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *message, size_t len)
303 {
304  talloc_const_free(cass_query_ctx->error.msg);
305  cass_query_ctx->error.msg = fr_asprint(ctx, message, len, '\0');
306  cass_query_ctx->error.type = L_ERR;
307 }
308 
309 
310 /** Store the last error associated with a query, using a format string
311  *
312  * @param ctx to allocate messate in.
313  * @param cass_query_ctx to replace log message in.
314  * @param fmt of message.
315  * @param ... args.
316  */
317 static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt, ...)
318  CC_HINT(format (printf, 3, 4));
319 static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt, ...)
320 {
321  va_list ap;
322 
323  talloc_const_free(cass_query_ctx->error.msg);
324  va_start(ap, fmt);
325  cass_query_ctx->error.msg = talloc_vasprintf(ctx, fmt, ap);
326  va_end(ap);
327  cass_query_ctx->error.type = L_ERR;
328 }
329 
331 {
332  rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(h, rlm_sql_cassandra_conn_t);
333 
334  DEBUG2("Socket destructor called, closing socket");
335 
336  if (conn->iterator) cass_iterator_free(conn->iterator);
337  talloc_free(h);
338 }
339 
340 CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
341 static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx)
342 {
344  rlm_sql_config_t const *config = &sql->config;
345  rlm_sql_cassandra_t *inst = talloc_get_type_abort(sql->driver_submodule->data, rlm_sql_cassandra_t);
347 
348  MEM(c = talloc_zero(conn, rlm_sql_cassandra_conn_t));
349  *c = (rlm_sql_cassandra_conn_t) {
350  .conn = conn,
351  .inst = inst,
352  .config = config,
353  .log_ctx = talloc_pool(c, 2048), /* Pre-allocate some memory for log messages */
354  .poll_interval = 1000
355  };
356 
357  *h = c;
358 
359  /*
360  * We do this one inside sql_socket_init, to allow pool.start = 0 to
361  * work as expected (allow the server to start if Cassandra is
362  * unavailable).
363  */
364  if (!inst->mutable->done_connect_keyspace) {
365  CassFuture *future;
366  CassError ret;
367 
368  pthread_mutex_lock(&inst->mutable->connect_mutex);
369  if (!inst->mutable->done_connect_keyspace) {
370  /*
371  * Easier to do this here instead of mod_instantiate
372  * as we don't have a pointer to the pool.
373  */
374  cass_cluster_set_connect_timeout(inst->cluster, fr_time_delta_to_msec(config->trunk_conf.conn_conf->connection_timeout));
375 
376  DEBUG2("Connecting to Cassandra cluster");
377  future = cass_session_connect_keyspace(inst->session, inst->cluster, config->sql_db);
378  ret = cass_future_error_code(future);
379  if (ret != CASS_OK) {
380  const char *msg;
381  size_t msg_len;
382 
383  cass_future_error_message(future, &msg, &msg_len);
384  ERROR("Unable to connect: [%x] %s", (int)ret, msg);
385  cass_future_free(future);
386  pthread_mutex_unlock(&inst->mutable->connect_mutex);
387 
389  }
390  cass_future_free(future);
391  inst->mutable->done_connect_keyspace = true;
392  }
393  pthread_mutex_unlock(&inst->mutable->connect_mutex);
394  }
395 
397 
399 }
400 
402 
403 CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
404 static void sql_trunk_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
405  connection_t *conn, UNUSED void *uctx)
406 {
407  rlm_sql_cassandra_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
408  rlm_sql_cassandra_t const *inst = sql_conn->inst;
409  request_t *request;
410  trunk_request_t *treq;
411  fr_sql_query_t *query_ctx;
412  CassStatement *statement;
413  cassandra_query_t *cass_query;
414  cassandra_query_ctx_t *cass_query_ctx;
415 
416  while (trunk_connection_pop_request(&treq, tconn) == 0) {
417  if (!treq) return;
418 
419  query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t);
420  request = query_ctx->request;
421 
422  switch (query_ctx->status) {
423  case SQL_QUERY_PREPARED:
424  ROPTIONAL(RDEBUG2, DEBUG2, "Executing query: %s", query_ctx->query_str);
425  statement = cass_statement_new_n(query_ctx->query_str, talloc_array_length(query_ctx->query_str) - 1, 0);
426  if (inst->consistency_str) cass_statement_set_consistency(statement, inst->consistency);
427 
428  /*
429  * Allocate tracking and driver specific structures.
430  */
431  MEM(cass_query = talloc_zero(conn, cassandra_query_t));
432  MEM(cass_query_ctx = talloc_zero(query_ctx, cassandra_query_ctx_t));
433  query_ctx->uctx = cass_query_ctx;
434 
435  /*
436  * Executing the query returns a future which will later be polled for its result.
437  * Failures are not visible at this point.
438  */
439  cass_query->future = cass_session_execute(inst->session, statement);
440  cass_query->query_ctx = query_ctx;
441 
442  /*
443  * Insert the tracking structure into the list of outstanding queries.
444  */
445  fr_dlist_insert_tail(&sql_conn->queries, cass_query);
446  query_ctx->status = SQL_QUERY_SUBMITTED;
447  query_ctx->tconn = tconn;
449  return;
450 
451  default:
452  return;
453  }
454  }
455 }
456 
458 {
459  rlm_sql_cassandra_conn_t *c = talloc_get_type_abort(uctx, rlm_sql_cassandra_conn_t);
460  cassandra_query_t *cass_query, *next_query = NULL;
461  fr_sql_query_t *query_ctx;
462  cassandra_query_ctx_t *cass_query_ctx;
463  CassError ret;
464  request_t *request;
465  bool handled = false;
466 
467  next_query = fr_dlist_head(&c->queries);
468  while (next_query) {
469  /*
470  * If the future is not ready, move to the next.
471  */
472  if (!cass_future_ready(next_query->future)) goto next;
473 
474  cass_query = next_query;
475  next_query = fr_dlist_remove(&c->queries, cass_query);
476 
477  query_ctx = cass_query->query_ctx;
478  request = query_ctx->request;
479  handled = true;
480  cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
481 
482  ret = cass_future_error_code(cass_query->future);
483  if (ret != CASS_OK) {
484  char const *error;
485  size_t len;
486 
487  cass_future_error_message(cass_query->future, &error, &len);
488  sql_set_query_error(c->log_ctx, cass_query_ctx, error, len);
489 
490  switch (ret) {
491  case CASS_ERROR_SERVER_SYNTAX_ERROR:
492  case CASS_ERROR_SERVER_INVALID_QUERY:
493  query_ctx->rcode = RLM_SQL_QUERY_INVALID;
494  break;
495 
496  default:
497  query_ctx->rcode = RLM_SQL_ERROR;
498  }
499  trunk_request_signal_fail(query_ctx->treq);
500  cass_future_free(cass_query->future);
501  talloc_free(cass_query);
502  goto next;
503  }
504 
505  query_ctx->rcode = RLM_SQL_OK;
506  cass_query_ctx->result = cass_future_get_result(cass_query->future);
507  cass_future_free(cass_query->future);
508  if (request) unlang_interpret_mark_runnable(request);
509  talloc_free(cass_query);
510  next:
511  next_query = fr_dlist_next(&c->queries, next_query);
512  }
513 
514  /*
515  * Adjust poll interval. The aim is to return results as fast as
516  * possible, but without over polling.
517  */
518  if (!handled) {
519  /*
520  * Nothing was ready - increase the interval
521  */
523  c->poll_count ++;
524  } else {
525  /*
526  * Results were immediately available - decrease the interval
527  */
528  if (c->poll_count == 0) {
529  c->poll_interval /= 2;
530  }
531  c->poll_count = 0;
532  }
533 
534  /*
535  * There are still outstanding queries, add another polling event
536  */
537  if (fr_dlist_num_elements(&c->queries)) {
540  ERROR("Unable to insert polling event");
541  }
542  }
543 }
544 
546 {
547  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
548 
550 }
551 
552 /*
553  * While libcassandra supports installing callbacks to handle futures being set,
554  * in testing, attempting to retrieve the error status of the future from within
555  * the callback locked up. So we have to resort to polling.
556  *
557  * This "notify" callback sets up the appropriate polling events.
558  */
559 CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
560 static void sql_trunk_connection_notify(UNUSED trunk_connection_t *tconn, connection_t *conn, UNUSED fr_event_list_t *el,
561  trunk_connection_event_t notify_on, UNUSED void *uctx)
562 {
563  rlm_sql_cassandra_conn_t *c = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
564 
565  switch (notify_on) {
569  return;
570 
573  if (fr_dlist_num_elements(&c->queries)) {
576  ERROR("Unable to insert polling event");
577  }
578  }
579  if (notify_on == TRUNK_CONN_EVENT_READ) return;
580 
581  FALL_THROUGH;
582 
585  sql_trunk_connection_write_poll, tconn) < 0) {
586  ERROR("Unable to insert polling event");
587  }
588  return;
589  }
590 }
591 
594 
595 CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
596 static void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason,
597  UNUSED void *uctx)
598 {
599  fr_sql_query_t *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t);
600  rlm_sql_cassandra_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
601  cassandra_query_t *cass_query = NULL;
602 
603  if (!query_ctx->treq) return;
604  if (reason != TRUNK_CANCEL_REASON_SIGNAL) return;
605 
606  /*
607  * There is no query cancellation for Cassandra.
608  * So, remove the query from the list of outstanding queries so
609  * if it does return, we don't do anything.
610  */
611  while ((cass_query = fr_dlist_next(&sql_conn->queries, cass_query))) {
612  if (cass_query->query_ctx == query_ctx) {
613  fr_dlist_remove(&sql_conn->queries, cass_query);
614  cass_future_free(cass_query->future);
615  return;
616  }
617  }
618 }
619 
621 {
622  cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
623 
624  return cass_query_ctx->result ? cass_result_row_count(cass_query_ctx->result) : 0;
625 }
626 
627 static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
628 {
629  cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
630  CassResult const *result = cass_query_ctx->result;
631 
632  unsigned int fields, i;
633  char const **names;
634 
635  fields = result ? cass_result_column_count(result) : 0;
636  if (fields == 0) return RLM_SQL_ERROR;
637 
638  MEM(names = talloc_array(query_ctx, char const *, fields));
639 
640  for (i = 0; i < fields; i++) {
641  const char *col_name;
642  size_t col_name_len;
643 
644  /* Writes out a pointer to a buffer in the result */
645  if (cass_result_column_name(result, i, &col_name, &col_name_len) != CASS_OK) {
646  col_name = "<INVALID>";
647  }
648  names[i] = col_name;
649  }
650 
651  *out = names;
652 
653  return RLM_SQL_OK;
654 }
655 
656 static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
657 {
658  fr_sql_query_t *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
659  rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_cassandra_conn_t);
660  CassRow const *cass_row;
661  cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
662  CassResult const *result = cass_query_ctx->result;
663  int fields, i;
664  char **row;
665 
666 #define RLM_CASS_ERR_DATA_RETRIVE(_t) \
667 do {\
668  char const *_col_name;\
669  size_t _col_name_len;\
670  CassError _ret;\
671  if ((_ret = cass_result_column_name(result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
672  _col_name = "<INVALID>";\
673  }\
674  sql_set_query_error_printf(conn->log_ctx, cass_query_ctx, "Failed to retrieve " _t " data at column %s (%d): %s", \
675  _col_name, i, cass_error_desc(_ret));\
676  TALLOC_FREE(query_ctx->row);\
677  query_ctx->rcode = RLM_SQL_ERROR;\
678  RETURN_MODULE_FAIL;\
679 } while(0)
680 
681  query_ctx->rcode = RLM_SQL_OK;
682  if (!result) RETURN_MODULE_OK; /* no result */
683 
684  /*
685  * Start of the result set, initialise the iterator.
686  */
687  if (!conn->iterator) conn->iterator = cass_iterator_from_result(result);
688  if (!conn->iterator) RETURN_MODULE_OK; /* no result */
689 
690  /*
691  * Free the previous result (also gets called on finish_query)
692  */
693  TALLOC_FREE(query_ctx->row);
694 
695  if (!cass_iterator_next(conn->iterator)) {
696  query_ctx->rcode = RLM_SQL_NO_MORE_ROWS; /* no more rows */
698  }
699 
700  cass_row = cass_iterator_get_row(conn->iterator); /* this shouldn't fail ? */
701  fields = cass_result_column_count(result); /* get the number of fields... */
702 
703  MEM(row = query_ctx->row = talloc_zero_array(query_ctx, char *, fields + 1));
704 
705  for (i = 0; i < fields; i++) {
706  CassValue const *value;
707  CassValueType type;
708 
709  value = cass_row_get_column(cass_row, i);
710 
711  if (cass_value_is_null(value) == cass_true) continue;
712 
713  type = cass_value_type(value);
714  switch (type) {
715  case CASS_VALUE_TYPE_ASCII:
716  case CASS_VALUE_TYPE_TEXT:
717  case CASS_VALUE_TYPE_VARCHAR:
718  {
719  const char *str;
720  size_t len;
721 
722  if (cass_value_get_string(value, &str, &len) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("string");
723 
724  MEM(row[i] = talloc_array(row, char, len + 1));
725  memcpy(row[i], str, len);
726  row[i][len] = '\0';
727  }
728  break;
729 
730  case CASS_VALUE_TYPE_BOOLEAN:
731  {
732  cass_bool_t bv;
733 
734  if (cass_value_get_bool(value, &bv) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("bool");
735 
736  MEM(row[i] = talloc_zero_array(row, char, 2));
737  row[i][0] = (bv == cass_false) ? '0' : '1';
738  }
739  break;
740 
741  case CASS_VALUE_TYPE_INT:
742  {
743  cass_int32_t i32v;
744 
745  if (cass_value_get_int32(value, &i32v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int32");
746 
747  MEM(row[i] = talloc_typed_asprintf(row, "%"PRId32, (int32_t)i32v));
748  }
749  break;
750 
751  case CASS_VALUE_TYPE_TIMESTAMP:
752  case CASS_VALUE_TYPE_BIGINT:
753  {
754  cass_int64_t i64v;
755 
756  if (cass_value_get_int64(value, &i64v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int64");
757 
758  MEM(row[i] = talloc_typed_asprintf(row, "%"PRId64, (int64_t)i64v));
759  }
760  break;
761 
762  case CASS_VALUE_TYPE_UUID:
763  case CASS_VALUE_TYPE_TIMEUUID:
764  {
765  CassUuid uuid;
766 
767  if (cass_value_get_uuid(value, &uuid) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("UUID");
768  MEM(row[i] = talloc_array(row, char, CASS_UUID_STRING_LENGTH));
769  cass_uuid_string(uuid, row[i]);
770  }
771  break;
772 
773  default:
774  {
775  const char *col_name;
776  size_t col_name_len;
777 
778  if (cass_result_column_name(result, i, &col_name,
779  &col_name_len) != CASS_OK) col_name = "<INVALID>";
780 
781  sql_set_query_error_printf(conn->log_ctx, cass_query_ctx,
782  "Failed to retrieve data at column %s (%d): Unsupported data type",
783  col_name, i);
784  talloc_free(query_ctx->row);
785  query_ctx->rcode = RLM_SQL_ERROR;
787  }
788  }
789  }
790 
792 }
793 
795 {
796  rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_cassandra_conn_t);
797 
798  if (query_ctx->row) TALLOC_FREE(query_ctx->row);
799 
800  if (conn->iterator) {
801  cass_iterator_free(conn->iterator);
802  conn->iterator = NULL;
803  }
804 
805  if (query_ctx->uctx) {
806  CassResult const *result = query_ctx->uctx;
807  cass_result_free(result);
808  query_ctx->uctx = NULL;
809  }
810 
811  return RLM_SQL_OK;
812 }
813 
814 static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen,
815  fr_sql_query_t *query_ctx)
816 {
817  cassandra_query_ctx_t *cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
818 
819  if (cass_query_ctx->error.msg && (outlen >= 1)) {
820  out[0].msg = cass_query_ctx->error.msg;
821  out[0].type = cass_query_ctx->error.type;
822 
823  return 1;
824  }
825 
826  return 0;
827 }
828 
830 {
831  cassandra_query_ctx_t *cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
832 
833  talloc_const_free(cass_query_ctx->error.msg);
834 
835  return sql_free_result(query_ctx, config);
836 }
837 
838 /*
839  * The cassandra model is different, as it's distributed, and does
840  * upserts instead of inserts...
841  *
842  * There's a good article on it here:
843  * http://planetcassandra.org/blog/how-to-do-an-upsert-in-cassandra/
844  */
846 {
847  return 1;
848 }
849 
850 static int mod_detach(module_detach_ctx_t const *mctx)
851 {
852  rlm_sql_cassandra_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_sql_cassandra_t);
853 
854  if (inst->ssl) cass_ssl_free(inst->ssl);
855  if (inst->session) cass_session_free(inst->session); /* also synchronously closes the session */
856  if (inst->cluster) cass_cluster_free(inst->cluster);
857 
858  pthread_mutex_destroy(&inst->mutable->connect_mutex);
859  talloc_free(inst->mutable);
860 
861  return 0;
862 }
863 
864 static int mod_instantiate(module_inst_ctx_t const *mctx)
865 {
866  rlm_sql_t const *parent = talloc_get_type_abort(mctx->mi->parent->data, rlm_sql_t);
867  rlm_sql_config_t const *config = &parent->config;
868  rlm_sql_cassandra_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_sql_cassandra_t);
869  bool do_tls = false;
870  bool do_latency_aware_routing = false;
871  CassCluster *cluster;
872  int ret;
873 
874 #define DO_CASS_OPTION(_opt, _x) \
875 do {\
876  CassError _ret;\
877  if ((_ret = (_x)) != CASS_OK) {\
878  ERROR("Error setting " _opt ": %s", cass_error_desc(_ret));\
879  return -1;\
880  }\
881 } while (0)
882 
883  MEM(inst->mutable = talloc_zero(NULL, rlm_sql_cassandra_mutable_t));
884  if ((ret = pthread_mutex_init(&inst->mutable->connect_mutex, NULL)) < 0) {
885  ERROR("Failed initializing mutex: %s", fr_syserror(ret));
886  TALLOC_FREE(inst);
887  return -1;
888  }
889 
890  /*
891  * This has to be done before we call cf_section_parse
892  * as it sets default values, and creates the section.
893  */
894  if (cf_section_find(mctx->mi->conf, "tls", NULL)) do_tls = true;
895  if (cf_section_find(mctx->mi->conf, "latency_aware_routing", NULL)) do_latency_aware_routing = true;
896 
897  DEBUG4("Configuring CassCluster structure");
898  cluster = inst->cluster = cass_cluster_new();
899  if (!cluster) return -1;
900 
901  /*
902  * Parameters inherited from the top level SQL module config
903  */
904  DO_CASS_OPTION("sql_server", cass_cluster_set_contact_points(cluster, config->sql_server));
905  if (config->sql_port) DO_CASS_OPTION("sql_port", cass_cluster_set_port(cluster, config->sql_port));
906  /* Can't fail */
907  if (fr_time_delta_ispos(config->query_timeout)) {
908  cass_cluster_set_request_timeout(cluster, fr_time_delta_to_msec(config->query_timeout));
909  }
910 
911  /* Can't fail */
912  if (config->sql_login && config->sql_password) cass_cluster_set_credentials(cluster, config->sql_login,
913  config->sql_password);
914 
915  /*
916  * inst specific parameters
917  */
918  if (inst->consistency_str) {
919  int consistency;
920 
921  consistency = fr_table_value_by_str(consistency_levels, inst->consistency_str, -1);
922  if (consistency < 0) {
923  ERROR("Invalid consistency level \"%s\"", inst->consistency_str);
924  return -1;
925  }
926  inst->consistency = (CassConsistency)consistency;
927  }
928 
929  if (inst->protocol_version) {
930  DO_CASS_OPTION("protocol_version",
931  cass_cluster_set_protocol_version(inst->cluster, inst->protocol_version));
932  }
933 
934  if (inst->connections_per_host) {
935  DO_CASS_OPTION("connections_per_host",
936  cass_cluster_set_core_connections_per_host(inst->cluster,
937  inst->connections_per_host));
938  }
939 
940  if (inst->event_queue_size) {
941  DO_CASS_OPTION("event_queue_size",
942  cass_cluster_set_num_threads_io(inst->cluster, inst->event_queue_size));
943  }
944 
945  if (inst->io_queue_size) {
946  DO_CASS_OPTION("io_queue_size",
947  cass_cluster_set_num_threads_io(inst->cluster, inst->io_queue_size));
948  }
949 
950  if (inst->io_threads) {
951  DO_CASS_OPTION("io_threads", cass_cluster_set_num_threads_io(inst->cluster, inst->io_threads));
952  }
953 
954  if (inst->load_balance_round_robin) cass_cluster_set_load_balance_round_robin(inst->cluster);
955 
956  cass_cluster_set_token_aware_routing(inst->cluster, inst->token_aware_routing);
957 
958  if (inst->lbdc_local_dc) {
959  DO_CASS_OPTION("load_balance_dc_aware",
960  cass_cluster_set_load_balance_dc_aware(inst->cluster,
961  inst->lbdc_local_dc,
962  inst->lbdc_hosts_per_remote_dc,
963  inst->lbdc_allow_remote_dcs_for_local_cl));
964  }
965 
966  if (do_latency_aware_routing) {
967  /* Can't fail */
968  cass_cluster_set_latency_aware_routing(inst->cluster, true);
969 
970  /* Can't fail */
971  cass_cluster_set_latency_aware_routing_settings(inst->cluster,
972  (cass_double_t)inst->lar_exclusion_threshold,
973  fr_time_delta_to_msec(inst->lar_scale),
974  fr_time_delta_to_msec(inst->lar_retry_period),
975  fr_time_delta_to_msec(inst->lar_update_rate),
976  inst->lar_min_measured);
977  }
978 
979  if (inst->tcp_keepalive) cass_cluster_set_tcp_keepalive(inst->cluster, true, inst->tcp_keepalive);
980  cass_cluster_set_tcp_nodelay(inst->cluster, inst->tcp_nodelay);
981 
982  if (do_tls) {
983  CassSsl *ssl;
984 
985  ssl = inst->ssl = cass_ssl_new();
986  if (!ssl) return -1;
987 
988  if (inst->tls_verify_cert_str) {
989  int verify_cert;
990 
991  verify_cert = fr_table_value_by_str(verify_cert_table, inst->tls_verify_cert_str, -1);
992  if (verify_cert < 0) {
993  ERROR("Invalid certificate validation type \"%s\", "
994  "must be one of 'yes', 'no', 'identity'", inst->tls_verify_cert_str);
995  return -1;
996  }
997  cass_ssl_set_verify_flags(ssl, verify_cert);
998  }
999 
1000  DEBUG2("Enabling TLS");
1001 
1002  if (inst->tls_ca_file) {
1003  DO_CASS_OPTION("ca_file", cass_ssl_add_trusted_cert(ssl, inst->tls_ca_file));
1004  }
1005 
1006  if (inst->tls_certificate_file) {
1007  DO_CASS_OPTION("certificate_file", cass_ssl_set_cert(ssl, inst->tls_certificate_file));
1008  }
1009 
1010  if (inst->tls_private_key_file) {
1011  DO_CASS_OPTION("private_key", cass_ssl_set_private_key(ssl, inst->tls_private_key_file,
1012  inst->tls_private_key_password));
1013  }
1014 
1015  cass_cluster_set_ssl(cluster, ssl);
1016  }
1017 
1018  inst->session = cass_session_new();
1019  if (!inst->session) return -1;
1020 
1021  return 0;
1022 }
1023 
1024 static int mod_load(void)
1025 {
1026  INFO("Built against libcassandra version %d.%d.%d%s",
1027  CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
1028 
1029  /*
1030  * Setup logging callbacks (only needs to be done once)
1031  */
1032  cass_log_set_level(CASS_LOG_INFO);
1033  cass_log_set_callback(_rlm_sql_cassandra_log, NULL);
1034 
1035  return 0;
1036 }
1037 
1038 /* Exported to rlm_sql */
1041  .common = {
1042  .name = "sql_cassandra",
1043  .magic = MODULE_MAGIC_INIT,
1044  .inst_size = sizeof(rlm_sql_cassandra_t),
1045  .onload = mod_load,
1046  .config = driver_config,
1048  .detach = mod_detach
1049  },
1050  .flags = RLM_SQL_MULTI_QUERY_CONN,
1051  .sql_query_resume = sql_query_resume,
1052  .sql_select_query_resume = sql_query_resume,
1053  .sql_num_rows = sql_num_rows,
1054  .sql_affected_rows = sql_affected_rows,
1055  .sql_fields = sql_fields,
1056  .sql_fetch_row = sql_fetch_row,
1057  .sql_free_result = sql_free_result,
1058  .sql_error = sql_error,
1059  .sql_finish_query = sql_finish_query,
1060  .sql_finish_select_query = sql_finish_query,
1061  .uses_trunks = true,
1062  .trunk_io_funcs = {
1063  .connection_alloc = sql_trunk_connection_alloc,
1064  .connection_notify = sql_trunk_connection_notify,
1065  .request_mux = sql_trunk_request_mux,
1066  .request_cancel = sql_request_cancel,
1067  .request_fail = sql_request_fail
1068  }
1069 };
unlang_action_t
Returned by unlang_op_t calls, determine the next action of the interpreter.
Definition: action.h:35
va_end(args)
log_entry msg
Definition: acutest.h:794
static int const char * fmt
Definition: acutest.h:573
va_start(args, fmt)
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
Definition: build.h:320
#define DIAG_ON(_x)
Definition: build.h:456
#define UNUSED
Definition: build.h:313
#define NUM_ELEMENTS(_t)
Definition: build.h:335
#define CONF_PARSER_TERMINATOR
Definition: cf_parse.h:627
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition: cf_parse.h:268
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition: cf_parse.h:310
#define FR_CONF_OFFSET_FLAGS(_name, _flags, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition: cf_parse.h:256
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition: cf_parse.h:409
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
Definition: cf_parse.h:411
@ CONF_FLAG_OK_MISSING
OK if it's missing.
Definition: cf_parse.h:427
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition: cf_parse.h:399
Defines a CONF_PAIR to C data type mapping.
Definition: cf_parse.h:564
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition: cf_util.c:1028
connection_state_t
Definition: connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition: connection.h:54
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition: connection.h:52
next
Definition: dcursor.h:178
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
Test enumeration values.
Definition: dict_test.h:92
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
Definition: dl_module.h:63
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition: dlist.h:555
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
#define fr_event_timer_in(...)
Definition: event.h:255
void unlang_interpret_mark_runnable(request_t *request)
Mark a request as resumable.
Definition: interpret.c:1359
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
Definition: log.h:259
talloc_free(reap)
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Definition: event.c:1611
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
@ L_ERR
Error message.
Definition: log.h:56
unsigned int uint32_t
Definition: merged_model.c:33
module_instance_t * mi
Module instance to detach.
Definition: module_ctx.h:57
module_instance_t * mi
Instance of the module being instantiated.
Definition: module_ctx.h:51
Temporary structure to hold arguments for detach calls.
Definition: module_ctx.h:56
Temporary structure to hold arguments for instantiation calls.
Definition: module_ctx.h:50
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
Definition: print.c:428
static const conf_parser_t config[]
Definition: base.c:183
#define RDEBUG2(fmt,...)
Definition: radclient.h:54
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
#define RETURN_MODULE_OK
Definition: rcode.h:57
rlm_rcode_t
Return codes indicating the result of the module call.
Definition: rcode.h:40
static int instantiate(module_inst_ctx_t const *mctx)
Definition: rlm_rest.c:1302
Prototypes and functions for the SQL module.
char const * msg
Log message.
Definition: rlm_sql.h:64
fr_log_type_t type
Type of log entry L_ERR, L_WARN, L_INFO, L_DBG etc.
Definition: rlm_sql.h:62
fr_sql_query_status_t status
Status of the query.
Definition: rlm_sql.h:146
trunk_connection_t * tconn
Trunk connection this query is being run on.
Definition: rlm_sql.h:142
#define RLM_SQL_MULTI_QUERY_CONN
Can support multiple queries on a single connection.
Definition: rlm_sql.h:174
void * uctx
Driver specific data.
Definition: rlm_sql.h:149
char const * query_str
Query string to run.
Definition: rlm_sql.h:144
request_t * request
Request this query relates to.
Definition: rlm_sql.h:139
sql_rcode_t
Action to take at end of an SQL query.
Definition: rlm_sql.h:44
@ RLM_SQL_QUERY_INVALID
Query syntax error.
Definition: rlm_sql.h:45
@ RLM_SQL_ERROR
General connection/server error.
Definition: rlm_sql.h:46
@ RLM_SQL_OK
Success.
Definition: rlm_sql.h:47
@ RLM_SQL_NO_MORE_ROWS
No more rows available.
Definition: rlm_sql.h:50
fr_time_delta_t query_timeout
How long to allow queries to run for.
Definition: rlm_sql.h:94
rlm_sql_row_t row
Row data from the last query.
Definition: rlm_sql.h:148
sql_rcode_t rcode
Result code.
Definition: rlm_sql.h:147
trunk_request_t * treq
Trunk request for this query.
Definition: rlm_sql.h:143
@ SQL_QUERY_SUBMITTED
Submitted for execution.
Definition: rlm_sql.h:130
@ SQL_QUERY_PREPARED
Ready to submit.
Definition: rlm_sql.h:129
Definition: rlm_sql.h:61
static int mod_detach(module_detach_ctx_t const *mctx)
static int mod_load(void)
static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt,...))
Store the last error associated with a query, using a format string.
static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
#define DO_CASS_OPTION(_opt, _x)
fr_event_timer_t const * write_ev
Polling event for sending queries.
char const * lbdc_local_dc
The primary data center to try first.
uint poll_count
How many consecutive polls had no available results.
sql_log_entry_t error
Most recent Cassandra error message for this query.
fr_dlist_head_t queries
Outstanding queries on this connection.
uint32_t connections_per_host
Number of connections to each server in each IO thread.
static void sql_trunk_connection_write_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
uint32_t protocol_version
The protocol version.
fr_event_timer_t const * read_ev
Polling event for reading query results.
fr_time_delta_t lar_update_rate
The rate at which the best average latency is recomputed.
CassConsistency consistency
Level of consistency converted to a constant.
DIAG_OFF(strict-prototypes) typedef struct
static conf_parser_t load_balance_dc_aware_config[]
CassSsl * ssl
Connection's SSL context.
static conf_parser_t latency_aware_routing_config[]
CassSession * session
Cluster's connection pool.
static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
uint32_t lbdc_hosts_per_remote_dc
The number of hosts used in each remote DC if no hosts are available in the local dc.
static conf_parser_t tls_config[]
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local.
static int sql_num_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
static void sql_set_query_error(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *message, size_t len)
Store the last error associated with a query.
static int sql_affected_rows(UNUSED fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
CassFuture * future
Future produced when submitting query.
CassCluster * cluster
Configuration of the cassandra cluster connection.
bool tcp_nodelay
Disable TCP Nagle algorithm.
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, fr_sql_query_t *query_ctx)
static void _sql_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
static fr_table_num_sorted_t const consistency_levels[]
char const * tls_certificate_file
Public certificate we present to the server.
uint32_t event_queue_size
Sets the size of the fixed size queue that stores events.
double lar_exclusion_threshold
How much worse the latency may be, compared to the average latency of the best performing node before ...
char const * tls_private_key_password
String to decrypt private key.
uint poll_interval
Interval between read polling.
static const conf_parser_t driver_config[]
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
CassResult const * result
Cassandra result handle.
static size_t consistency_levels_len
fr_sql_query_t * query_ctx
SQL query ctx.
static sql_rcode_t sql_finish_query(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config)
bool load_balance_round_robin
Enable round robin load balancing.
char const * consistency_str
Level of consistency required.
uint32_t io_queue_size
Size of the fixed size queue that stores pending requests.
fr_dlist_t entry
Entry in list of outstanding queries.
fr_time_delta_t lar_scale
Weight given to older latencies when calculating the average latency of a node.
static size_t verify_cert_table_len
static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
static fr_table_num_sorted_t const verify_cert_table[]
connection_t * conn
Generic connection structure for managing this handle.
rlm_sql_config_t const * config
SQL instance config.
bool token_aware_routing
Whether to use token aware routing.
CC_NO_UBSAN(function)
uint32_t tcp_keepalive
How often to send TCP keepalives.
rlm_sql_cassandra_mutable_t
CassIterator * iterator
Row set iterator.
char const * tls_private_key_file
Private key for the certificate we present to the server.
static void sql_trunk_connection_read_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
TALLOC_CTX * log_ctx
Prevent unneeded memory allocation by keeping a permanent pool, to store log entries.
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
uint32_t io_threads
Number of IO threads.
static int mod_instantiate(module_inst_ctx_t const *mctx)
rlm_sql_cassandra_t const * inst
Module instance for this connection.
fr_time_delta_t lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the curr...
rlm_sql_driver_t rlm_sql_cassandra
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
Driver specific data to attach to the query ctx.
Structure for tracking outstanding queries.
Cassandra cluster connection.
Cassandra driver instance.
static void sql_request_fail(request_t *request, void *preq, UNUSED void *rctx, UNUSED trunk_request_state_t state, UNUSED void *uctx)
Definition: rlm_sql_db2.c:301
Macros to reduce boilerplate in trunk SQL drivers.
#define SQL_QUERY_RESUME
Definition: rlm_sql_trunk.h:56
#define SQL_TRUNK_CONNECTION_ALLOC
Allocate an SQL trunk connection.
Definition: rlm_sql_trunk.h:35
#define SQL_QUERY_FAIL
Definition: rlm_sql_trunk.h:64
CONF_SECTION * conf
Module's instance configuration.
Definition: module.h:329
void * data
Module's instance data.
Definition: module.h:271
module_instance_t const * parent
Parent module's instance (if any).
Definition: module.h:337
RETURN_MODULE_FAIL
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
eap_aka_sim_process_conf_t * inst
fr_aka_sim_id_type_t type
module_t common
Common fields for all loadable modules.
Definition: rlm_sql.h:204
module_instance_t * driver_submodule
Driver's submodule.
Definition: rlm_sql.h:245
rlm_sql_config_t config
Definition: rlm_sql.h:238
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition: table.h:653
An element in a lexicographically sorted array of name to num mappings.
Definition: table.h:49
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition: talloc.c:492
#define talloc_get_type_abort_const
Definition: talloc.h:282
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:224
static const char * names[8]
Definition: time.c:617
#define fr_time_delta_ispos(_a)
Definition: time.h:290
static int64_t fr_time_delta_to_usec(fr_time_delta_t delta)
Definition: time.h:632
static fr_time_delta_t fr_time_delta_from_usec(int64_t usec)
Definition: time.h:568
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition: time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition: trunk.c:2120
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition: trunk.c:3861
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition: trunk.c:2043
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition: trunk.c:3881
Associates request queues with a connection.
Definition: trunk.c:131
Wraps a normal request.
Definition: trunk.c:97
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition: trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition: trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition: trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition: trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition: trunk.h:75
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition: trunk.h:55
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition: trunk.h:57
static fr_event_list_t * el
static fr_slen_t parent
Definition: pair.h:851
static fr_slen_t data
Definition: value.h:1265
int format(printf, 5, 0))
static size_t char ** out
Definition: value.h:997