The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
rlm_sql_cassandra.c
Go to the documentation of this file.
1/**
2 * This is free and unencumbered software released into the public domain.
3 *
4 * Anyone is free to copy, modify, publish, use, compile, sell, or
5 * distribute this software, either in source code form or as a compiled
6 * binary, for any purpose, commercial or non-commercial, and by any
7 * means.
8 *
9 * In jurisdictions that recognize copyright laws, the author or authors
10 * of this software dedicate any and all copyright interest in the
11 * software to the public domain. We make this dedication for the benefit
12 * of the public at large and to the detriment of our heirs and
13 * successors. We intend this dedication to be an overt act of
14 * relinquishment in perpetuity of all present and future rights to this
15 * software under copyright law.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20 * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 * OTHER DEALINGS IN THE SOFTWARE.
24 *
25 * For more information, please refer to <http://unlicense.org>
26 */
27
28/**
29 * $Id: cc2c807191b4c0653bd18c0f4b2b64165680a3a3 $
30 * @file rlm_sql_cassandra.c
31 * @brief Cassandra SQL driver
32 *
33 * @author Linnaea Von Lavia (le.concorde.4590@gmail.com)
34 * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
35 */
36#define LOG_PREFIX "sql - cassandra"
37
38#include <freeradius-devel/server/base.h>
39#include <freeradius-devel/util/debug.h>
40
41#ifdef HAVE_WDOCUMENTATION
42DIAG_OFF(documentation)
43#endif
44DIAG_OFF(strict-prototypes) /* Seen with homebrew cassandra-cpp-driver 2.15.3 */
45#include <cassandra.h>
46DIAG_ON(strict-prototypes)
47#ifdef HAVE_WDOCUMENTATION
48DIAG_ON(documentation)
49#endif
50
51#include "rlm_sql.h"
52#include "rlm_sql_trunk.h"
53
54typedef struct {
55 bool done_connect_keyspace; //!< Whether we've connected to a keyspace.
56 pthread_mutex_t connect_mutex; //!< Mutex to prevent multiple connections attempting
57 ///< to connect to a keyspace concurrently.
59
60/** Cassandra driver instance
61 *
62 */
63typedef struct {
64 CassCluster *cluster; //!< Configuration of the cassandra cluster connection.
65 CassSession *session; //!< Cluster's connection pool.
66 CassSsl *ssl; //!< Connection's SSL context.
67 rlm_sql_cassandra_mutable_t *mutable; //!< Instance data which needs to change post instantiation.
68
69 /*
70 * Configuration options
71 */
72 char const *consistency_str; //!< Level of consistency required.
73 CassConsistency consistency; //!< Level of consistency converted to a constant.
74
75 uint32_t protocol_version; //!< The protocol version.
76
77 uint32_t connections_per_host; //!< Number of connections to each server in each
78 //!< IO thread.
79 uint32_t io_threads; //!< Number of IO threads.
80
81 uint32_t io_queue_size; //!< Size of the the fixed size queue that stores
82 //!< pending requests.
83
84 uint32_t event_queue_size; //!< Sets the size of the the fixed size queue
85 //!< that stores events.
86
87 bool load_balance_round_robin; //!< Enable round robin load balancing.
88
89 bool token_aware_routing; //!< Whether to use token aware routing.
90
91 char const *lbdc_local_dc; //!< The primary data center to try first.
92 uint32_t lbdc_hosts_per_remote_dc; //!< The number of host used in each remote DC if
93 //!< no hosts are available in the local dc
94
95 bool lbdc_allow_remote_dcs_for_local_cl; //!< Allows remote hosts to be used if no local
96 //!< dc hosts are available and the consistency level
97 //!< is LOCAL_ONE or LOCAL_QUORUM.
98
99 double lar_exclusion_threshold; //!< How much worse the latency me be, compared to
100 //!< the average latency of the best performing node
101 //!< before it's penalized.
102 //!< This gets mangled to a double.
103
104 fr_time_delta_t lar_scale; //!< Weight given to older latencies when calculating
105 //!< the average latency of a node. A bigger scale will
106 //!< give more weight to older latency measurements.
107
108 fr_time_delta_t lar_retry_period; //!< The amount of time a node is penalized by the
109 //!< policy before being given a second chance when
110 //!< the current average latency exceeds the calculated
111 //!< threshold
112 //!< (exclusion_threshold * best_average_latency).
113
114 fr_time_delta_t lar_update_rate; //!< The rate at which the best average latency is
115 //!< recomputed.
116 uint64_t lar_min_measured; //!< The minimum number of measurements per-host
117 //!< required to be considered by the policy.
118
119 uint32_t tcp_keepalive; //!< How often to send TCP keepalives.
120 bool tcp_nodelay; //!< Disable TCP naggle algorithm.
121
122 char const *tls_ca_file; //!< Path to the CA used to validate the server's
123 //!< certificate.
124 char const *tls_certificate_file; //!< Public certificate we present to the server.
125 char const *tls_private_key_file; //!< Private key for the certificate we present to the
126 //!< server.
127 char const *tls_private_key_password; //!< String to decrypt private key.
128 char const *tls_verify_cert_str; //!< Whether we validate the cert provided by the
129 //!< server.
131
132/** Cassandra cluster connection
133 *
134 */
135typedef struct {
136 connection_t *conn; //!< Generic connection structure for managing this handle.
137 rlm_sql_cassandra_t const *inst; //!< Module instance for this connection.
138 rlm_sql_config_t const *config; //!< SQL instance config
139 CassIterator *iterator; //!< Row set iterator.
140
141 TALLOC_CTX *log_ctx; //!< Prevent unneeded memory allocation by keeping a
142 //!< permanent pool, to store log entries.
143 fr_dlist_head_t queries; //!< Outstanding queries on this connection.
144 fr_event_timer_t const *read_ev; //!< Polling event for reading query results.
145 fr_event_timer_t const *write_ev; //!< Polling event for sending queries.
146 uint poll_interval; //!< Interval between read polling.
147 uint poll_count; //!< How many consecutive polls had no available results.
149
150/** Structure for tracking outstanding queries
151 *
152 */
153typedef struct {
154 fr_dlist_t entry; //!< Entry in list of outstanding queries.
155 fr_sql_query_t *query_ctx; //!< SQL query ctx.
156 CassFuture *future; //!< Future produced when submitting query.
158
159/** Driver specific data to attach to the query ctx
160 *
161 */
162typedef struct {
163 CassResult const *result; //!< Cassandra result handle
164 sql_log_entry_t error; //!< Most recent Cassandra error message for this query.
166
168 { L("all"), CASS_CONSISTENCY_ALL },
169 { L("any"), CASS_CONSISTENCY_ANY },
170 { L("each_quorum"), CASS_CONSISTENCY_EACH_QUORUM },
171 { L("local_one"), CASS_CONSISTENCY_LOCAL_ONE },
172 { L("local_quorum"), CASS_CONSISTENCY_LOCAL_QUORUM },
173 { L("one"), CASS_CONSISTENCY_ONE },
174 { L("quorum"), CASS_CONSISTENCY_QUORUM },
175 { L("three"), CASS_CONSISTENCY_THREE },
176 { L("two"), CASS_CONSISTENCY_TWO }
177};
179
181 { L("identity"), CASS_SSL_VERIFY_PEER_IDENTITY },
182 { L("no"), CASS_SSL_VERIFY_NONE },
183 { L("yes"), CASS_SSL_VERIFY_PEER_CERT }
184};
186
188 { FR_CONF_OFFSET("local_dc", rlm_sql_cassandra_t, lbdc_local_dc) },
189 { FR_CONF_OFFSET("hosts_per_remote_dc", rlm_sql_cassandra_t, lbdc_hosts_per_remote_dc), .dflt = "0" },
190 { FR_CONF_OFFSET("allow_remote_dcs_for_local_cl", rlm_sql_cassandra_t, lbdc_allow_remote_dcs_for_local_cl), .dflt = "no" },
192};
193
195 { FR_CONF_OFFSET("exclusion_threshold", rlm_sql_cassandra_t, lar_exclusion_threshold), .dflt = "2.0" },
196 { FR_CONF_OFFSET("scale", rlm_sql_cassandra_t, lar_scale), .dflt = "0.1" },
197 { FR_CONF_OFFSET("retry_period", rlm_sql_cassandra_t, lar_retry_period), .dflt = "10" },
198 { FR_CONF_OFFSET("update_rate", rlm_sql_cassandra_t, lar_update_rate), .dflt = "0.1" },
199 { FR_CONF_OFFSET("min_measured", rlm_sql_cassandra_t, lar_min_measured), .dflt = "50" },
201};
202
205 { FR_CONF_OFFSET_FLAGS("certificate_file", CONF_FLAG_FILE_INPUT, rlm_sql_cassandra_t, tls_certificate_file) },
206 { FR_CONF_OFFSET_FLAGS("private_key_file", CONF_FLAG_FILE_INPUT, rlm_sql_cassandra_t, tls_private_key_file) },
207 { FR_CONF_OFFSET_FLAGS("private_key_password", CONF_FLAG_SECRET, rlm_sql_cassandra_t, tls_private_key_password) },
208
209 { FR_CONF_OFFSET("verify_cert", rlm_sql_cassandra_t, tls_verify_cert_str) },
211};
212
213static const conf_parser_t driver_config[] = {
214 { FR_CONF_OFFSET("consistency", rlm_sql_cassandra_t, consistency_str), .dflt = "quorum" },
215
216 { FR_CONF_OFFSET("protocol_version", rlm_sql_cassandra_t, protocol_version) },
217
218 { FR_CONF_OFFSET("connections_per_host", rlm_sql_cassandra_t, connections_per_host) },
219
220 { FR_CONF_OFFSET("io_threads", rlm_sql_cassandra_t, io_threads) },
221 { FR_CONF_OFFSET("io_queue_size", rlm_sql_cassandra_t, io_queue_size) },
222
223 { FR_CONF_OFFSET("event_queue_size", rlm_sql_cassandra_t, event_queue_size) },
224
225 { FR_CONF_POINTER("load_balance_dc_aware", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) load_balance_dc_aware_config },
226 { FR_CONF_OFFSET("load_balance_round_robin", rlm_sql_cassandra_t, load_balance_round_robin), .dflt = "no" },
227
228 { FR_CONF_OFFSET("token_aware_routing", rlm_sql_cassandra_t, token_aware_routing), .dflt = "yes" },
229 { FR_CONF_POINTER("latency_aware_routing", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) latency_aware_routing_config },
230
231 { FR_CONF_OFFSET("tcp_keepalive", rlm_sql_cassandra_t, tcp_keepalive) },
232 { FR_CONF_OFFSET("tcp_nodelay", rlm_sql_cassandra_t, tcp_nodelay), .dflt = "no" },
233
234 { FR_CONF_POINTER("tls", 0, CONF_FLAG_SUBSECTION | CONF_FLAG_OK_MISSING, NULL), .subcs = (void const *) tls_config },
236};
237
238/** Log callback for libcassandra
239 *
240 * libcassandra seems to use this to log global events in the library, other messages
241 * like query errors are not logged here, and should be retrieved with functions like
242 * cass_future_error_message();
243 *
244 * Messages here do not need to be made available via sql_error.
245 *
246 * @param message Contains the log message and information about its source.
247 * @param data user data (not used).
248 */
249static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
250{
251 switch (message->severity) {
252 case CASS_LOG_CRITICAL:
253 case CASS_LOG_ERROR:
254 if (DEBUG_ENABLED3) {
255 ERROR("%s[%d] %s: %s",
256 message->file, message->line, message->function, message->message);
257 } else {
258 ERROR("%s", message->message);
259 }
260 return;
261
262 case CASS_LOG_WARN:
263 if (DEBUG_ENABLED3) {
264 WARN("%s[%d] %s: %s",
265 message->file, message->line, message->function, message->message);
266 } else {
267 WARN("%s", message->message);
268 }
269 return;
270
271 case CASS_LOG_INFO:
272 case CASS_LOG_DISABLED:
273 case CASS_LOG_LAST_ENTRY:
274 if (DEBUG_ENABLED3) {
275 INFO("%s[%d] %s: %s",
276 message->file, message->line, message->function, message->message);
277 } else {
278 INFO("%s", message->message);
279 }
280 return;
281
282 case CASS_LOG_DEBUG:
283 case CASS_LOG_TRACE:
284 default:
285 if (DEBUG_ENABLED3) {
286 DEBUG3("%s[%d] %s: %s",
287 message->file, message->line, message->function, message->message);
288 } else {
289 DEBUG2("%s", message->message);
290 }
291 return;
292 }
293}
294
295/** Store the last error associated with a query
296 *
297 * @param ctx to allocate message in
298 * @param cass_query_ctx cassandra query context to write error message into.
299 * @param message from libcassandra.
300 * @param len of message.
301 */
302static void sql_set_query_error(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *message, size_t len)
303{
304 talloc_const_free(cass_query_ctx->error.msg);
305 cass_query_ctx->error.msg = fr_asprint(ctx, message, len, '\0');
306 cass_query_ctx->error.type = L_ERR;
307}
308
309
310/** Store the last error associated with a query, using a format string
311 *
312 * @param ctx to allocate messate in.
313 * @param cass_query_ctx to replace log message in.
314 * @param fmt of message.
315 * @param ... args.
316 */
317static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt, ...)
318 CC_HINT(format (printf, 3, 4));
319static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt, ...)
320{
321 va_list ap;
322
323 talloc_const_free(cass_query_ctx->error.msg);
324 va_start(ap, fmt);
325 cass_query_ctx->error.msg = talloc_vasprintf(ctx, fmt, ap);
326 va_end(ap);
327 cass_query_ctx->error.type = L_ERR;
328}
329
330static void _sql_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
331{
332 rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(h, rlm_sql_cassandra_conn_t);
333
334 DEBUG2("Socket destructor called, closing socket");
335
336 if (conn->iterator) cass_iterator_free(conn->iterator);
337 talloc_free(h);
338}
339
340CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
341static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx)
342{
344 rlm_sql_config_t const *config = &sql->config;
345 rlm_sql_cassandra_t *inst = talloc_get_type_abort(sql->driver_submodule->data, rlm_sql_cassandra_t);
347
348 MEM(c = talloc_zero(conn, rlm_sql_cassandra_conn_t));
350 .conn = conn,
351 .inst = inst,
352 .config = config,
353 .log_ctx = talloc_pool(c, 2048), /* Pre-allocate some memory for log messages */
354 .poll_interval = 1000
355 };
356
357 *h = c;
358
359 /*
360 * We do this one inside sql_socket_init, to allow pool.start = 0 to
361 * work as expected (allow the server to start if Cassandra is
362 * unavailable).
363 */
364 if (!inst->mutable->done_connect_keyspace) {
365 CassFuture *future;
366 CassError ret;
367
368 pthread_mutex_lock(&inst->mutable->connect_mutex);
369 if (!inst->mutable->done_connect_keyspace) {
370 /*
371 * Easier to do this here instead of mod_instantiate
372 * as we don't have a pointer to the pool.
373 */
374 cass_cluster_set_connect_timeout(inst->cluster, fr_time_delta_to_msec(config->trunk_conf.conn_conf->connection_timeout));
375
376 DEBUG2("Connecting to Cassandra cluster");
377 future = cass_session_connect_keyspace(inst->session, inst->cluster, config->sql_db);
378 ret = cass_future_error_code(future);
379 if (ret != CASS_OK) {
380 const char *msg;
381 size_t msg_len;
382
383 cass_future_error_message(future, &msg, &msg_len);
384 ERROR("Unable to connect: [%x] %s", (int)ret, msg);
385 cass_future_free(future);
386 pthread_mutex_unlock(&inst->mutable->connect_mutex);
387
389 }
390 cass_future_free(future);
391 inst->mutable->done_connect_keyspace = true;
392 }
393 pthread_mutex_unlock(&inst->mutable->connect_mutex);
394 }
395
397
399}
400
402
403CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
405 connection_t *conn, UNUSED void *uctx)
406{
407 rlm_sql_cassandra_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
408 rlm_sql_cassandra_t const *inst = sql_conn->inst;
409 request_t *request;
410 trunk_request_t *treq;
411 fr_sql_query_t *query_ctx;
412 CassStatement *statement;
413 cassandra_query_t *cass_query;
414 cassandra_query_ctx_t *cass_query_ctx;
415
416 while (trunk_connection_pop_request(&treq, tconn) == 0) {
417 if (!treq) return;
418
419 query_ctx = talloc_get_type_abort(treq->preq, fr_sql_query_t);
420 request = query_ctx->request;
421
422 switch (query_ctx->status) {
424 ROPTIONAL(RDEBUG2, DEBUG2, "Executing query: %s", query_ctx->query_str);
425 statement = cass_statement_new_n(query_ctx->query_str, talloc_array_length(query_ctx->query_str) - 1, 0);
426 if (inst->consistency_str) cass_statement_set_consistency(statement, inst->consistency);
427
428 /*
429 * Allocate tracking and driver specific structures.
430 */
431 MEM(cass_query = talloc_zero(conn, cassandra_query_t));
432 MEM(cass_query_ctx = talloc_zero(query_ctx, cassandra_query_ctx_t));
433 query_ctx->uctx = cass_query_ctx;
434
435 /*
436 * Executing the query returns a future which will later be polled for its result.
437 * Failures are not visible at this point.
438 */
439 cass_query->future = cass_session_execute(inst->session, statement);
440 cass_query->query_ctx = query_ctx;
441 cass_statement_free(statement);
442
443 /*
444 * Insert the tracking structure into the list of outstanding queries.
445 */
446 fr_dlist_insert_tail(&sql_conn->queries, cass_query);
447 query_ctx->status = SQL_QUERY_SUBMITTED;
448 query_ctx->tconn = tconn;
450 return;
451
452 default:
453 return;
454 }
455 }
456}
457
459{
460 rlm_sql_cassandra_conn_t *c = talloc_get_type_abort(uctx, rlm_sql_cassandra_conn_t);
461 cassandra_query_t *cass_query, *next_query = NULL;
462 fr_sql_query_t *query_ctx;
463 cassandra_query_ctx_t *cass_query_ctx;
464 CassError ret;
465 request_t *request;
466 bool handled = false;
467
468 next_query = fr_dlist_head(&c->queries);
469 while (next_query) {
470 /*
471 * If the future is not ready, move to the next.
472 */
473 if (!cass_future_ready(next_query->future)) goto next;
474
475 cass_query = next_query;
476 next_query = fr_dlist_remove(&c->queries, cass_query);
477
478 query_ctx = cass_query->query_ctx;
479 request = query_ctx->request;
480 handled = true;
481 cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
482
483 ret = cass_future_error_code(cass_query->future);
484 if (ret != CASS_OK) {
485 char const *error;
486 size_t len;
487
488 cass_future_error_message(cass_query->future, &error, &len);
489 sql_set_query_error(c->log_ctx, cass_query_ctx, error, len);
490
491 switch (ret) {
492 case CASS_ERROR_SERVER_SYNTAX_ERROR:
493 case CASS_ERROR_SERVER_INVALID_QUERY:
494 query_ctx->rcode = RLM_SQL_QUERY_INVALID;
495 break;
496
497 default:
498 query_ctx->rcode = RLM_SQL_ERROR;
499 }
501 cass_future_free(cass_query->future);
502 talloc_free(cass_query);
503 goto next;
504 }
505
506 query_ctx->rcode = RLM_SQL_OK;
507 cass_query_ctx->result = cass_future_get_result(cass_query->future);
508 cass_future_free(cass_query->future);
509 if (request) unlang_interpret_mark_runnable(request);
510 talloc_free(cass_query);
511 next:
512 next_query = fr_dlist_next(&c->queries, next_query);
513 }
514
515 /*
516 * Adjust poll interval. The aim is to return results as fast as
517 * possible, but without over polling.
518 */
519 if (!handled) {
520 /*
521 * Nothing was ready - increase the interval
522 */
524 c->poll_count ++;
525 } else {
526 /*
527 * Results were immediately available - decrease the interval
528 */
529 if (c->poll_count == 0) {
530 c->poll_interval /= 2;
531 }
532 c->poll_count = 0;
533 }
534
535 /*
536 * There are still outstanding queries, add another polling event
537 */
541 ERROR("Unable to insert polling event");
542 }
543 }
544}
545
547{
548 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
549
551}
552
553/*
554 * While libcassandra supports installing callbacks to handle futures being set,
555 * in testing, attempting to retrieve the error status of the future from within
556 * the callback locked up. So we have to resort to polling.
557 *
558 * This "notify" callback sets up the appropriate polling events.
559 */
560CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
562 trunk_connection_event_t notify_on, UNUSED void *uctx)
563{
564 rlm_sql_cassandra_conn_t *c = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
565
566 switch (notify_on) {
570 return;
571
577 ERROR("Unable to insert polling event");
578 }
579 }
580 if (notify_on == TRUNK_CONN_EVENT_READ) return;
581
583
587 ERROR("Unable to insert polling event");
588 }
589 return;
590 }
591}
592
595
596CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function */
597static void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason,
598 UNUSED void *uctx)
599{
600 fr_sql_query_t *query_ctx = talloc_get_type_abort(preq, fr_sql_query_t);
601 rlm_sql_cassandra_conn_t *sql_conn = talloc_get_type_abort(conn->h, rlm_sql_cassandra_conn_t);
602 cassandra_query_t *cass_query = NULL;
603
604 if (!query_ctx->treq) return;
605 if (reason != TRUNK_CANCEL_REASON_SIGNAL) return;
606
607 /*
608 * There is no query cancellation for Cassandra.
609 * So, remove the query from the list of outstanding queries so
610 * if it does return, we don't do anything.
611 */
612 while ((cass_query = fr_dlist_next(&sql_conn->queries, cass_query))) {
613 if (cass_query->query_ctx == query_ctx) {
614 fr_dlist_remove(&sql_conn->queries, cass_query);
615 cass_future_free(cass_query->future);
616 return;
617 }
618 }
619}
620
622{
623 cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
624
625 return cass_query_ctx->result ? cass_result_row_count(cass_query_ctx->result) : 0;
626}
627
628static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
629{
630 cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
631 CassResult const *result = cass_query_ctx->result;
632
633 unsigned int fields, i;
634 char const **names;
635
636 fields = result ? cass_result_column_count(result) : 0;
637 if (fields == 0) return RLM_SQL_ERROR;
638
639 MEM(names = talloc_array(query_ctx, char const *, fields));
640
641 for (i = 0; i < fields; i++) {
642 const char *col_name;
643 size_t col_name_len;
644
645 /* Writes out a pointer to a buffer in the result */
646 if (cass_result_column_name(result, i, &col_name, &col_name_len) != CASS_OK) {
647 col_name = "<INVALID>";
648 }
649 names[i] = col_name;
650 }
651
652 *out = names;
653
654 return RLM_SQL_OK;
655}
656
657static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
658{
659 fr_sql_query_t *query_ctx = talloc_get_type_abort(uctx, fr_sql_query_t);
660 rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_cassandra_conn_t);
661 CassRow const *cass_row;
662 cassandra_query_ctx_t *cass_query_ctx = query_ctx->uctx;
663 CassResult const *result = cass_query_ctx->result;
664 int fields, i;
665 char **row;
666
667#define RLM_CASS_ERR_DATA_RETRIVE(_t) \
668do {\
669 char const *_col_name;\
670 size_t _col_name_len;\
671 CassError _ret;\
672 if ((_ret = cass_result_column_name(result, i, &_col_name, &_col_name_len)) != CASS_OK) {\
673 _col_name = "<INVALID>";\
674 }\
675 sql_set_query_error_printf(conn->log_ctx, cass_query_ctx, "Failed to retrieve " _t " data at column %s (%d): %s", \
676 _col_name, i, cass_error_desc(_ret));\
677 TALLOC_FREE(query_ctx->row);\
678 query_ctx->rcode = RLM_SQL_ERROR;\
679 RETURN_MODULE_FAIL;\
680} while(0)
681
682 query_ctx->rcode = RLM_SQL_OK;
683 if (!result) RETURN_MODULE_OK; /* no result */
684
685 /*
686 * Start of the result set, initialise the iterator.
687 */
688 if (!conn->iterator) conn->iterator = cass_iterator_from_result(result);
689 if (!conn->iterator) RETURN_MODULE_OK; /* no result */
690
691 /*
692 * Free the previous result (also gets called on finish_query)
693 */
694 TALLOC_FREE(query_ctx->row);
695
696 if (!cass_iterator_next(conn->iterator)) {
697 query_ctx->rcode = RLM_SQL_NO_MORE_ROWS; /* no more rows */
699 }
700
701 cass_row = cass_iterator_get_row(conn->iterator); /* this shouldn't fail ? */
702 fields = cass_result_column_count(result); /* get the number of fields... */
703
704 MEM(row = query_ctx->row = talloc_zero_array(query_ctx, char *, fields + 1));
705
706 for (i = 0; i < fields; i++) {
707 CassValue const *value;
708 CassValueType type;
709
710 value = cass_row_get_column(cass_row, i);
711
712 if (cass_value_is_null(value) == cass_true) continue;
713
714 type = cass_value_type(value);
715 switch (type) {
716 case CASS_VALUE_TYPE_ASCII:
717 case CASS_VALUE_TYPE_TEXT:
718 case CASS_VALUE_TYPE_VARCHAR:
719 {
720 const char *str;
721 size_t len;
722
723 if (cass_value_get_string(value, &str, &len) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("string");
724
725 MEM(row[i] = talloc_array(row, char, len + 1));
726 memcpy(row[i], str, len);
727 row[i][len] = '\0';
728 }
729 break;
730
731 case CASS_VALUE_TYPE_BOOLEAN:
732 {
733 cass_bool_t bv;
734
735 if (cass_value_get_bool(value, &bv) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("bool");
736
737 MEM(row[i] = talloc_zero_array(row, char, 2));
738 row[i][0] = (bv == cass_false) ? '0' : '1';
739 }
740 break;
741
742 case CASS_VALUE_TYPE_INT:
743 {
744 cass_int32_t i32v;
745
746 if (cass_value_get_int32(value, &i32v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int32");
747
748 MEM(row[i] = talloc_typed_asprintf(row, "%"PRId32, (int32_t)i32v));
749 }
750 break;
751
752 case CASS_VALUE_TYPE_TIMESTAMP:
753 case CASS_VALUE_TYPE_BIGINT:
754 {
755 cass_int64_t i64v;
756
757 if (cass_value_get_int64(value, &i64v) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("int64");
758
759 MEM(row[i] = talloc_typed_asprintf(row, "%"PRId64, (int64_t)i64v));
760 }
761 break;
762
763 case CASS_VALUE_TYPE_UUID:
764 case CASS_VALUE_TYPE_TIMEUUID:
765 {
766 CassUuid uuid;
767
768 if (cass_value_get_uuid(value, &uuid) != CASS_OK) RLM_CASS_ERR_DATA_RETRIVE("UUID");
769 MEM(row[i] = talloc_array(row, char, CASS_UUID_STRING_LENGTH));
770 cass_uuid_string(uuid, row[i]);
771 }
772 break;
773
774 default:
775 {
776 const char *col_name;
777 size_t col_name_len;
778
779 if (cass_result_column_name(result, i, &col_name,
780 &col_name_len) != CASS_OK) col_name = "<INVALID>";
781
782 sql_set_query_error_printf(conn->log_ctx, cass_query_ctx,
783 "Failed to retrieve data at column %s (%d): Unsupported data type",
784 col_name, i);
785 talloc_free(query_ctx->row);
786 query_ctx->rcode = RLM_SQL_ERROR;
788 }
789 }
790 }
791
793}
794
796{
797 rlm_sql_cassandra_conn_t *conn = talloc_get_type_abort(query_ctx->tconn->conn->h, rlm_sql_cassandra_conn_t);
798
799 if (query_ctx->row) TALLOC_FREE(query_ctx->row);
800
801 if (conn->iterator) {
802 cass_iterator_free(conn->iterator);
803 conn->iterator = NULL;
804 }
805
806 if (query_ctx->uctx) {
807 cassandra_query_ctx_t *cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
808 if (cass_query_ctx->result) cass_result_free(cass_query_ctx->result);
809 cass_query_ctx->result = NULL;
810 }
811
812 return RLM_SQL_OK;
813}
814
815static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen,
816 fr_sql_query_t *query_ctx)
817{
818 cassandra_query_ctx_t *cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
819
820 if (cass_query_ctx->error.msg && (outlen >= 1)) {
821 out[0].msg = cass_query_ctx->error.msg;
822 out[0].type = cass_query_ctx->error.type;
823
824 return 1;
825 }
826
827 return 0;
828}
829
831{
832 cassandra_query_ctx_t *cass_query_ctx = talloc_get_type_abort(query_ctx->uctx, cassandra_query_ctx_t);
833
834 cass_result_free(cass_query_ctx->result);
835 talloc_const_free(cass_query_ctx->error.msg);
836
837 return sql_free_result(query_ctx, config);
838}
839
840/*
841 * The cassandra model is different, as it's distributed, and does
842 * upserts instead of inserts...
843 *
844 * There's a good article on it here:
845 * http://planetcassandra.org/blog/how-to-do-an-upsert-in-cassandra/
846 */
848{
849 return 1;
850}
851
852static int mod_detach(module_detach_ctx_t const *mctx)
853{
854 rlm_sql_cassandra_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_sql_cassandra_t);
855
856 if (inst->ssl) cass_ssl_free(inst->ssl);
857 if (inst->session) cass_session_free(inst->session); /* also synchronously closes the session */
858 if (inst->cluster) cass_cluster_free(inst->cluster);
859
860 pthread_mutex_destroy(&inst->mutable->connect_mutex);
861 talloc_free(inst->mutable);
862
863 return 0;
864}
865
866static int mod_instantiate(module_inst_ctx_t const *mctx)
867{
868 rlm_sql_t const *parent = talloc_get_type_abort(mctx->mi->parent->data, rlm_sql_t);
869 rlm_sql_config_t const *config = &parent->config;
870 rlm_sql_cassandra_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_sql_cassandra_t);
871 bool do_tls = false;
872 bool do_latency_aware_routing = false;
873 CassCluster *cluster;
874 int ret;
875
876#define DO_CASS_OPTION(_opt, _x) \
877do {\
878 CassError _ret;\
879 if ((_ret = (_x)) != CASS_OK) {\
880 ERROR("Error setting " _opt ": %s", cass_error_desc(_ret));\
881 return -1;\
882 }\
883} while (0)
884
885 MEM(inst->mutable = talloc_zero(NULL, rlm_sql_cassandra_mutable_t));
886 if ((ret = pthread_mutex_init(&inst->mutable->connect_mutex, NULL)) < 0) {
887 ERROR("Failed initializing mutex: %s", fr_syserror(ret));
888 TALLOC_FREE(inst);
889 return -1;
890 }
891
892 /*
893 * This has to be done before we call cf_section_parse
894 * as it sets default values, and creates the section.
895 */
896 if (cf_section_find(mctx->mi->conf, "tls", NULL)) do_tls = true;
897 if (cf_section_find(mctx->mi->conf, "latency_aware_routing", NULL)) do_latency_aware_routing = true;
898
899 DEBUG4("Configuring CassCluster structure");
900 cluster = inst->cluster = cass_cluster_new();
901 if (!cluster) return -1;
902
903 /*
904 * Parameters inherited from the top level SQL module config
905 */
906 DO_CASS_OPTION("sql_server", cass_cluster_set_contact_points(cluster, config->sql_server));
907 if (config->sql_port) DO_CASS_OPTION("sql_port", cass_cluster_set_port(cluster, config->sql_port));
908 /* Can't fail */
909 if (fr_time_delta_ispos(config->query_timeout)) {
910 cass_cluster_set_request_timeout(cluster, fr_time_delta_to_msec(config->query_timeout));
911 }
912
913 /* Can't fail */
914 if (config->sql_login && config->sql_password) cass_cluster_set_credentials(cluster, config->sql_login,
915 config->sql_password);
916
917 /*
918 * inst specific parameters
919 */
920 if (inst->consistency_str) {
921 int consistency;
922
923 consistency = fr_table_value_by_str(consistency_levels, inst->consistency_str, -1);
924 if (consistency < 0) {
925 ERROR("Invalid consistency level \"%s\"", inst->consistency_str);
926 return -1;
927 }
928 inst->consistency = (CassConsistency)consistency;
929 }
930
931 if (inst->protocol_version) {
932 DO_CASS_OPTION("protocol_version",
933 cass_cluster_set_protocol_version(inst->cluster, inst->protocol_version));
934 }
935
936 if (inst->connections_per_host) {
937 DO_CASS_OPTION("connections_per_host",
938 cass_cluster_set_core_connections_per_host(inst->cluster,
939 inst->connections_per_host));
940 }
941
942 if (inst->event_queue_size) {
943 DO_CASS_OPTION("event_queue_size",
944 cass_cluster_set_num_threads_io(inst->cluster, inst->event_queue_size));
945 }
946
947 if (inst->io_queue_size) {
948 DO_CASS_OPTION("io_queue_size",
949 cass_cluster_set_num_threads_io(inst->cluster, inst->io_queue_size));
950 }
951
952 if (inst->io_threads) {
953 DO_CASS_OPTION("io_threads", cass_cluster_set_num_threads_io(inst->cluster, inst->io_threads));
954 }
955
956 if (inst->load_balance_round_robin) cass_cluster_set_load_balance_round_robin(inst->cluster);
957
958 cass_cluster_set_token_aware_routing(inst->cluster, inst->token_aware_routing);
959
960 if (inst->lbdc_local_dc) {
961 DO_CASS_OPTION("load_balance_dc_aware",
962 cass_cluster_set_load_balance_dc_aware(inst->cluster,
963 inst->lbdc_local_dc,
964 inst->lbdc_hosts_per_remote_dc,
965 inst->lbdc_allow_remote_dcs_for_local_cl));
966 }
967
968 if (do_latency_aware_routing) {
969 /* Can't fail */
970 cass_cluster_set_latency_aware_routing(inst->cluster, true);
971
972 /* Can't fail */
973 cass_cluster_set_latency_aware_routing_settings(inst->cluster,
974 (cass_double_t)inst->lar_exclusion_threshold,
975 fr_time_delta_to_msec(inst->lar_scale),
976 fr_time_delta_to_msec(inst->lar_retry_period),
977 fr_time_delta_to_msec(inst->lar_update_rate),
978 inst->lar_min_measured);
979 }
980
981 if (inst->tcp_keepalive) cass_cluster_set_tcp_keepalive(inst->cluster, true, inst->tcp_keepalive);
982 cass_cluster_set_tcp_nodelay(inst->cluster, inst->tcp_nodelay);
983
984 if (do_tls) {
985 CassSsl *ssl;
986
987 ssl = inst->ssl = cass_ssl_new();
988 if (!ssl) return -1;
989
990 if (inst->tls_verify_cert_str) {
991 int verify_cert;
992
993 verify_cert = fr_table_value_by_str(verify_cert_table, inst->tls_verify_cert_str, -1);
994 if (verify_cert < 0) {
995 ERROR("Invalid certificate validation type \"%s\", "
996 "must be one of 'yes', 'no', 'identity'", inst->tls_verify_cert_str);
997 return -1;
998 }
999 cass_ssl_set_verify_flags(ssl, verify_cert);
1000 }
1001
1002 DEBUG2("Enabling TLS");
1003
1004 if (inst->tls_ca_file) {
1005 DO_CASS_OPTION("ca_file", cass_ssl_add_trusted_cert(ssl, inst->tls_ca_file));
1006 }
1007
1008 if (inst->tls_certificate_file) {
1009 DO_CASS_OPTION("certificate_file", cass_ssl_set_cert(ssl, inst->tls_certificate_file));
1010 }
1011
1012 if (inst->tls_private_key_file) {
1013 DO_CASS_OPTION("private_key", cass_ssl_set_private_key(ssl, inst->tls_private_key_file,
1014 inst->tls_private_key_password));
1015 }
1016
1017 cass_cluster_set_ssl(cluster, ssl);
1018 }
1019
1020 inst->session = cass_session_new();
1021 if (!inst->session) return -1;
1022
1023 return 0;
1024}
1025
1026static int mod_load(void)
1027{
1028 INFO("Built against libcassandra version %d.%d.%d%s",
1029 CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX);
1030
1031 /*
1032 * Setup logging callbacks (only needs to be done once)
1033 */
1034 cass_log_set_level(CASS_LOG_INFO);
1035 cass_log_set_callback(_rlm_sql_cassandra_log, NULL);
1036
1037 return 0;
1038}
1039
1040/* Exported to rlm_sql */
1043 .common = {
1044 .name = "sql_cassandra",
1045 .magic = MODULE_MAGIC_INIT,
1046 .inst_size = sizeof(rlm_sql_cassandra_t),
1047 .onload = mod_load,
1050 .detach = mod_detach
1051 },
1053 .sql_query_resume = sql_query_resume,
1054 .sql_select_query_resume = sql_query_resume,
1055 .sql_num_rows = sql_num_rows,
1056 .sql_affected_rows = sql_affected_rows,
1057 .sql_fields = sql_fields,
1058 .sql_fetch_row = sql_fetch_row,
1059 .sql_free_result = sql_free_result,
1060 .sql_error = sql_error,
1061 .sql_finish_query = sql_finish_query,
1062 .sql_finish_select_query = sql_finish_query,
1063 .trunk_io_funcs = {
1064 .connection_alloc = sql_trunk_connection_alloc,
1065 .connection_notify = sql_trunk_connection_notify,
1066 .request_mux = sql_trunk_request_mux,
1067 .request_cancel = sql_request_cancel,
1068 .request_fail = sql_request_fail
1069 }
1070};
unlang_action_t
Returned by unlang_op_t calls, determine the next action of the interpreter.
Definition action.h:35
va_end(args)
log_entry msg
Definition acutest.h:794
static int const char * fmt
Definition acutest.h:573
va_start(args, fmt)
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
Definition build.h:322
#define DIAG_ON(_x)
Definition build.h:458
#define CC_NO_UBSAN(_sanitize)
Definition build.h:426
#define UNUSED
Definition build.h:315
#define NUM_ELEMENTS(_t)
Definition build.h:337
#define DIAG_OFF(_x)
Definition build.h:457
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:642
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:268
conf_parser_flags_t flags
Flags which control parsing behaviour.
Definition cf_parse.h:585
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:323
#define FR_CONF_OFFSET_FLAGS(_name, _flags, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:256
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition cf_parse.h:422
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
Definition cf_parse.h:424
@ CONF_FLAG_OK_MISSING
OK if it's missing.
Definition cf_parse.h:440
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:412
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:579
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1028
connection_state_t
Definition connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:54
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:52
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
Test enumeration values.
Definition dict_test.h:92
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
Definition dl_module.h:63
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:555
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
#define fr_event_timer_in(...)
Definition event.h:255
void unlang_interpret_mark_runnable(request_t *request)
Mark a request as resumable.
Definition interpret.c:1359
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define DEBUG4(_fmt,...)
Definition log.h:267
#define DEBUG_ENABLED3
True if global debug level 1-3 messages are enabled.
Definition log.h:259
talloc_free(reap)
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Definition event.c:1611
Stores all information relating to an event list.
Definition event.c:411
A timer event.
Definition event.c:102
@ L_ERR
Error message.
Definition log.h:56
unsigned int uint32_t
module_instance_t * mi
Module instance to detach.
Definition module_ctx.h:57
module_instance_t * mi
Instance of the module being instantiated.
Definition module_ctx.h:51
Temporary structure to hold arguments for detach calls.
Definition module_ctx.h:56
Temporary structure to hold arguments for instantiation calls.
Definition module_ctx.h:50
char * fr_asprint(TALLOC_CTX *ctx, char const *in, ssize_t inlen, char quote)
Escape string that may contain binary data, and write it to a new buffer.
Definition print.c:428
static const conf_parser_t config[]
Definition base.c:183
#define RDEBUG2(fmt,...)
Definition radclient.h:54
#define DEBUG2(fmt,...)
Definition radclient.h:43
#define WARN(fmt,...)
Definition radclient.h:47
#define INFO(fmt,...)
Definition radict.c:54
#define RETURN_MODULE_OK
Definition rcode.h:57
#define RETURN_MODULE_FAIL
Definition rcode.h:56
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:40
static int instantiate(module_inst_ctx_t const *mctx)
Definition rlm_rest.c:1310
Prototypes and functions for the SQL module.
char const * msg
Log message.
Definition rlm_sql.h:64
fr_log_type_t type
Type of log entry L_ERR, L_WARN, L_INFO, L_DBG etc.
Definition rlm_sql.h:62
fr_sql_query_status_t status
Status of the query.
Definition rlm_sql.h:138
trunk_connection_t * tconn
Trunk connection this query is being run on.
Definition rlm_sql.h:134
#define RLM_SQL_MULTI_QUERY_CONN
Can support multiple queries on a single connection.
Definition rlm_sql.h:166
void * uctx
Driver specific data.
Definition rlm_sql.h:141
char const * query_str
Query string to run.
Definition rlm_sql.h:136
request_t * request
Request this query relates to.
Definition rlm_sql.h:132
sql_rcode_t
Action to take at end of an SQL query.
Definition rlm_sql.h:44
@ RLM_SQL_QUERY_INVALID
Query syntax error.
Definition rlm_sql.h:45
@ RLM_SQL_ERROR
General connection/server error.
Definition rlm_sql.h:46
@ RLM_SQL_OK
Success.
Definition rlm_sql.h:47
@ RLM_SQL_NO_MORE_ROWS
No more rows available.
Definition rlm_sql.h:50
fr_time_delta_t query_timeout
How long to allow queries to run for.
Definition rlm_sql.h:94
rlm_sql_row_t row
Row data from the last query.
Definition rlm_sql.h:140
sql_rcode_t rcode
Result code.
Definition rlm_sql.h:139
trunk_request_t * treq
Trunk request for this query.
Definition rlm_sql.h:135
@ SQL_QUERY_SUBMITTED
Submitted for execution.
Definition rlm_sql.h:123
@ SQL_QUERY_PREPARED
Ready to submit.
Definition rlm_sql.h:122
Definition rlm_sql.h:61
static int mod_detach(module_detach_ctx_t const *mctx)
static int mod_load(void)
bool done_connect_keyspace
Whether we've connected to a keyspace.
static void sql_set_query_error_printf(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *fmt,...))
Store the last error associated with a query, using a format string.
static sql_rcode_t sql_fields(char const **out[], fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
#define DO_CASS_OPTION(_opt, _x)
fr_event_timer_t const * write_ev
Polling event for sending queries.
char const * lbdc_local_dc
The primary data center to try first.
uint poll_count
How many consecutive polls had no available results.
sql_log_entry_t error
Most recent Cassandra error message for this query.
fr_dlist_head_t queries
Outstanding queries on this connection.
static SQL_TRUNK_CONNECTION_ALLOC void sql_trunk_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
uint32_t connections_per_host
Number of connections to each server in each IO thread.
static void sql_trunk_connection_write_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
uint32_t protocol_version
The protocol version.
fr_event_timer_t const * read_ev
Polling event for reading query results.
fr_time_delta_t lar_update_rate
The rate at which the best average latency is recomputed.
pthread_mutex_t connect_mutex
Mutex to prevent multiple connections attempting to connect to a keyspace concurrently.
CassConsistency consistency
Level of consistency converted to a constant.
static conf_parser_t load_balance_dc_aware_config[]
CassSsl * ssl
Connection's SSL context.
static connection_state_t _sql_connection_init(void **h, connection_t *conn, void *uctx)
static conf_parser_t latency_aware_routing_config[]
CassSession * session
Cluster's connection pool.
static unlang_action_t sql_fetch_row(rlm_rcode_t *p_result, UNUSED int *priority, UNUSED request_t *request, void *uctx)
uint32_t lbdc_hosts_per_remote_dc
The number of hosts used in each remote DC if no hosts are available in the local DC.
static conf_parser_t tls_config[]
bool lbdc_allow_remote_dcs_for_local_cl
Allows remote hosts to be used if no local.
static int sql_num_rows(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
char const * tls_ca_file
Path to the CA used to validate the server's certificate.
static void sql_set_query_error(TALLOC_CTX *ctx, cassandra_query_ctx_t *cass_query_ctx, char const *message, size_t len)
Store the last error associated with a query.
static int sql_affected_rows(UNUSED fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
CassFuture * future
Future produced when submitting query.
CassCluster * cluster
Configuration of the cassandra cluster connection.
bool tcp_nodelay
Disable the TCP Nagle algorithm.
char const * tls_verify_cert_str
Whether we validate the cert provided by the server.
static size_t sql_error(UNUSED TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, fr_sql_query_t *query_ctx)
static void _sql_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
static fr_table_num_sorted_t const consistency_levels[]
char const * tls_certificate_file
Public certificate we present to the server.
uint32_t event_queue_size
Sets the size of the fixed size queue that stores events.
double lar_exclusion_threshold
How much worse the latency may be, compared to the average latency of the best performing node before ...
char const * tls_private_key_password
String to decrypt private key.
uint poll_interval
Interval between read polling.
static const conf_parser_t driver_config[]
#define RLM_CASS_ERR_DATA_RETRIVE(_t)
CassResult const * result
Cassandra result handle.
static size_t consistency_levels_len
fr_sql_query_t * query_ctx
SQL query ctx.
static sql_rcode_t sql_finish_query(fr_sql_query_t *query_ctx, rlm_sql_config_t const *config)
bool load_balance_round_robin
Enable round robin load balancing.
char const * consistency_str
Level of consistency required.
uint32_t io_queue_size
Size of the fixed size queue that stores pending requests.
fr_dlist_t entry
Entry in list of outstanding queries.
fr_time_delta_t lar_scale
Weight given to older latencies when calculating the average latency of a node.
static size_t verify_cert_table_len
static sql_rcode_t sql_free_result(fr_sql_query_t *query_ctx, UNUSED rlm_sql_config_t const *config)
static fr_table_num_sorted_t const verify_cert_table[]
connection_t * conn
Generic connection structure for managing this handle.
rlm_sql_config_t const * config
SQL instance config.
bool token_aware_routing
Whether to use token aware routing.
uint32_t tcp_keepalive
How often to send TCP keepalives.
static void sql_trunk_connection_notify(UNUSED trunk_connection_t *tconn, connection_t *conn, UNUSED fr_event_list_t *el, trunk_connection_event_t notify_on, UNUSED void *uctx)
CassIterator * iterator
Row set iterator.
char const * tls_private_key_file
Private key for the certificate we present to the server.
static void sql_trunk_connection_read_poll(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
TALLOC_CTX * log_ctx
Prevent unneeded memory allocation by keeping a permanent pool, to store log entries.
static void _rlm_sql_cassandra_log(CassLogMessage const *message, UNUSED void *data)
Log callback for libcassandra.
uint32_t io_threads
Number of IO threads.
static int mod_instantiate(module_inst_ctx_t const *mctx)
rlm_sql_cassandra_t const * inst
Module instance for this connection.
fr_time_delta_t lar_retry_period
The amount of time a node is penalized by the policy before being given a second chance when the curr...
rlm_sql_driver_t rlm_sql_cassandra
uint64_t lar_min_measured
The minimum number of measurements per-host required to be considered by the policy.
SQL_QUERY_RESUME static SQL_QUERY_FAIL void sql_request_cancel(connection_t *conn, void *preq, trunk_cancel_reason_t reason, UNUSED void *uctx)
Driver specific data to attach to the query ctx.
Structure for tracking outstanding queries.
Cassandra cluster connection.
Cassandra driver instance.
static void sql_request_fail(request_t *request, void *preq, UNUSED void *rctx, UNUSED trunk_request_state_t state, UNUSED void *uctx)
Macros to reduce boilerplate in trunk SQL drivers.
#define SQL_QUERY_RESUME
#define SQL_TRUNK_CONNECTION_ALLOC
Allocate an SQL trunk connection.
#define SQL_QUERY_FAIL
CONF_SECTION * conf
Module's instance configuration.
Definition module.h:329
size_t inst_size
Size of the module's instance data.
Definition module.h:203
void * data
Module's instance data.
Definition module.h:271
module_instance_t const * parent
Parent module's instance (if any).
Definition module.h:337
eap_aka_sim_process_conf_t * inst
fr_aka_sim_id_type_t type
module_t common
Common fields for all loadable modules.
Definition rlm_sql.h:194
module_instance_t * driver_submodule
Driver's submodule.
Definition rlm_sql.h:227
rlm_sql_config_t config
Definition rlm_sql.h:221
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition table.h:653
An element in a lexicographically sorted array of name to num mappings.
Definition table.h:49
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition talloc.c:492
#define talloc_get_type_abort_const
Definition talloc.h:282
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:224
static const char * names[8]
Definition time.c:621
#define fr_time_delta_ispos(_a)
Definition time.h:290
static int64_t fr_time_delta_to_usec(fr_time_delta_t delta)
Definition time.h:632
static fr_time_delta_t fr_time_delta_from_usec(int64_t usec)
Definition time.h:568
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2132
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3883
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2050
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3903
Associates request queues with a connection.
Definition trunk.c:134
Wraps a normal request.
Definition trunk.c:100
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
static fr_event_list_t * el
static fr_slen_t parent
Definition pair.h:851
static fr_slen_t data
Definition value.h:1265
static size_t char ** out
Definition value.h:997