25 #include <freeradius-devel/kafka/base.h>
26 #include <freeradius-devel/server/cf_parse.h>
27 #include <freeradius-devel/server/tmpl.h>
28 #include <freeradius-devel/util/value.h>
29 #include <freeradius-devel/util/sbuff.h>
30 #include <freeradius-devel/util/size.h>
37 rd_kafka_topic_conf_t *
conf;
62 rd_kafka_conf_destroy(kc->
conf);
66 static inline CC_HINT(always_inline)
77 MEM(kc->
conf = rd_kafka_conf_new());
92 rd_kafka_topic_conf_destroy(ktc->
conf);
96 static inline CC_HINT(always_inline)
107 MEM(ktc->
conf = rd_kafka_topic_conf_new());
130 char tmp[
sizeof(
"18446744073709551615b")];
140 size_t mapping_len = *kctx->mapping_len;
176 if (kctx->size_scale) size *= kctx->size_scale;
213 size_t buff_len =
sizeof(
buff);
218 rd_kafka_conf_res_t ret;
223 if ((ret = rd_kafka_conf_get(kc->
conf, kctx->
property,
buff, &buff_len)) != RD_KAFKA_CONF_OK) {
224 if (ret == RD_KAFKA_CONF_UNKNOWN) {
249 .
name = __FUNCTION__,
268 fr_sbuff_set_to_start(&value_elem);
295 size_t buff_len =
sizeof(
buff);
300 rd_kafka_conf_res_t ret;
305 if ((ret = rd_kafka_topic_conf_get(ktc->
conf, kctx->
property,
buff, &buff_len)) != RD_KAFKA_CONF_OK) {
306 if (ret == RD_KAFKA_CONF_UNKNOWN) {
335 static _Thread_local
char buff[
sizeof(
"18446744073709551615")];
383 size_t size = vb.vb_size;
405 *
out = vb.vb_bool ?
"true" :
"false";
456 if (
count <= 1)
goto do_single;
459 for (cp_p = cp, i = 0;
468 if (slen < 0)
return -1;
480 value, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
518 value, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
543 rd_kafka_topic_new (rd_kafka_t *rk,
const char *topic, rd_kafka_topic_conf_t *
conf)
549 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"sasl.oauthbearer.config", .empty_default =
true }},
580 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"sasl.kerberos.kinit.keytab", .empty_default =
true }},
618 {
L(
"false"),
"none" },
620 {
L(
"true"),
"https" },
621 {
L(
"yes"),
"https" }
662 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"ssl.certificate.location", .empty_default =
true }},
824 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"topic.blacklist", .string_sep =
",", .empty_default =
true }},
829 #define BASE_CONFIG \
830 { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
831 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
832 { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
833 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
834 { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
835 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
836 { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
837 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
838 { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
839 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
840 { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
841 .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
842 { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
843 .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
844 { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
845 .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
846 { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
847 .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
848 { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
849 { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
850 { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
851 { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
852 { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }
969 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"queued.max.messages.kbytes", .size_scale = 1024 }},
1032 CONF_PARSER_TERMINATOR
1135 .uctx = &(
fr_kafka_conf_ctx_t){ .property =
"queue.buffering.max.kbytes", .size_scale = 1024 }},
1182 CONF_PARSER_TERMINATOR
#define L(_str)
Helper for initialising arrays of string literals.
int cf_pair_to_value_box(TALLOC_CTX *ctx, fr_value_box_t *out, CONF_PAIR *cp, conf_parser_t const *rule)
Parses a CONF_PAIR into a boxed value.
#define CONF_PARSER_TERMINATOR
void const * uctx
User data accessible by the cf_parse_t func.
#define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs)
conf_parser_t entry which runs conf_parser_t entries for a subsection without any output
fr_type_t type
An fr_type_t value, controls the output type.
#define fr_rule_multi(_rule)
char const * name1
Name of the CONF_ITEM to parse.
@ CONF_FLAG_MULTI
CONF_PAIR can have multiple copies.
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
#define FR_CONF_FUNC(_name, _type, _flags, _func, _dflt_func)
conf_parser_t entry which doesn't fill in a pointer or offset, but relies on functions to record valu...
Defines a CONF_PAIR to C data type mapping.
Internal data that is associated with a configuration section.
Common header for all CONF_* types.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
CONF_PAIR * cf_item_to_pair(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_PAIR.
unsigned int cf_pair_count(CONF_SECTION const *cs, char const *attr)
Count the number of times an attribute occurs in a parent section.
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
void cf_pair_mark_parsed(CONF_PAIR *cp)
Mark a pair as parsed.
void * cf_data_value(CONF_DATA const *cd)
Return the user assigned value of CONF_DATA.
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
#define cf_log_err(_cf, _fmt,...)
#define cf_data_add(_cf, _data, _name, _free)
#define cf_data_find(_cf, _type, _name)
#define cf_log_perr(_cf, _fmt,...)
#define cf_log_debug(_cf, _fmt,...)
static conf_parser_t const kafka_connection_config[]
static conf_parser_t const kafka_base_consumer_topics_config[]
static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value for a topic from the kafka client library.
static size_t kafka_check_cert_cn_table_len
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
static conf_parser_t const kafka_base_consumer_topic_config[]
rd_kafka_topic_conf_t * conf
static fr_kafka_topic_conf_t * kafka_topic_conf_from_cs(CONF_SECTION *cs)
static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka config struct.
conf_parser_t const kafka_base_consumer_config[]
char const * property
Kafka configuration property.
static int _kafka_topic_conf_free(fr_kafka_topic_conf_t *ktc)
Destroy a kafka topic configuration handle.
static conf_parser_t const kafka_sasl_oauth_config[]
static fr_table_ptr_sorted_t kafka_check_cert_cn_table[]
static conf_parser_t const kafka_version_config[]
char const * string_sep
Used for multi-value configuration items.
static conf_parser_t const kafka_consumer_group_config[]
static conf_parser_t const kafka_tls_config[]
static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value from the kafka client library.
static conf_parser_t const kafka_base_producer_topics_config[]
static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka topic config struct.
bool empty_default
Don't produce messages saying the default is missing.
conf_parser_t const kafka_base_producer_config[]
static int _kafka_conf_free(fr_kafka_conf_t *kc)
Destroy a kafka configuration handle.
static conf_parser_t const kafka_metadata_config[]
size_t * mapping_len
Length of the mapping tables.
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
static conf_parser_t const kafka_sasl_kerberos_config[]
static conf_parser_t const kafka_base_producer_topic_config[]
static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule)
Perform any conversions necessary to map kafka defaults to our values.
static fr_kafka_conf_t * kafka_conf_from_cs(CONF_SECTION *cs)
static conf_parser_t const kafka_sasl_config[]
size_t fr_sbuff_out_unescape_until(fr_sbuff_t *out, fr_sbuff_t *in, size_t len, fr_sbuff_term_t const *tt, fr_sbuff_unescape_rules_t const *u_rules)
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
@ FR_TYPE_INT8
8 Bit signed integer.
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_INT16
16 Bit signed integer.
@ FR_TYPE_UINT32
32 Bit unsigned integer.
@ FR_TYPE_UINT64
64 Bit unsigned integer.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_SIZE
Unsigned integer capable of representing any memory address on the local system.
static size_t array[MY_ARRAY_SIZE]
ssize_t fr_sbuff_in_sprintf(fr_sbuff_t *sbuff, char const *fmt,...)
Print using a fmt string to an sbuff.
#define fr_sbuff_start(_sbuff_or_marker)
#define FR_SBUFF_IN(_start, _len_or_end)
char const * name
Name for rule set to aid we debugging.
#define fr_sbuff_advance(_sbuff_or_marker, _len)
#define FR_SBUFF_OUT(_start, _len_or_end)
char subs[UINT8_MAX+1]
Special characters and their substitutions.
#define FR_SBUFF_TERM(_str)
Initialise a terminal structure with a single string.
#define FR_SBUFF_TALLOC_THREAD_LOCAL(_out, _init, _max)
Set of terminal elements.
Set of parsing rules for *unescape_until functions.
fr_slen_t fr_size_from_str(size_t *out, fr_sbuff_t *in)
Parse a size string with optional unit.
fr_slen_t fr_size_to_str(fr_sbuff_t *out, size_t in)
Print a size string with unit.
static char buff[sizeof("18446744073709551615")+3]
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
fr_aka_sim_id_type_t type
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
#define fr_table_str_by_str_value(_table, _str_value, _def)
Brute force search a sorted or ordered ptr table, assuming the pointers are strings.
An element in a lexicographically sorted array of name to ptr mappings.
fr_slen_t talloc_array_concat(fr_sbuff_t *out, char const *const *array, char const *sep)
Concat an array of strings (not NULL terminated), with a string separator.
fr_slen_t fr_time_delta_from_str(fr_time_delta_t *out, char const *in, size_t inlen, fr_time_res_t hint)
Create fr_time_delta_t from a string.
fr_slen_t fr_time_delta_to_str(fr_sbuff_t *out, fr_time_delta_t delta, fr_time_res_t res, bool is_unsigned)
Print fr_time_delta_t to a string with an appropriate suffix.
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
#define fr_type_is_string(_x)
void fr_value_box_clear(fr_value_box_t *data)
Clear/free any existing value and metadata.
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
static size_t char ** out