27RCSIDH(kafka_base_h,
"$Id: 6f774a3ab11adcdd2fca1922bfe86f41ee518a2c $")
29#ifdef HAVE_WDOCUMENTATION
30DIAG_OFF(documentation-deprecated-sync)
33#include <librdkafka/rdkafka.h>
34#ifdef HAVE_WDOCUMENTATION
36DIAG_ON(documentation-deprecated-sync)
39#include <freeradius-devel/server/cf_parse.h>
40#include <freeradius-devel/util/rb.h>
57 rd_kafka_topic_conf_t *
rdtc;
177#define KAFKA_BASE_CONFIG \
179 { FR_CONF_PAIR_GLOBAL("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
180 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
182 { FR_CONF_PAIR_GLOBAL("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
183 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
185 { FR_CONF_PAIR_GLOBAL("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
186 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
188 { FR_CONF_PAIR_GLOBAL("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
189 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
191 { FR_CONF_PAIR_GLOBAL("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
192 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
194 { FR_CONF_PAIR_GLOBAL("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
195 .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
197 { FR_CONF_PAIR_GLOBAL("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
198 .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
200 { FR_CONF_PAIR_GLOBAL("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
201 .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
203 { FR_CONF_PAIR_GLOBAL("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
204 .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
205 { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
206 { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
207 { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
208 { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
209 { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }, \
213 { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }
230#define KAFKA_PRODUCER_CONFIG \
232 { FR_CONF_PAIR_GLOBAL("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
233 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }}, \
236 { FR_CONF_PAIR_GLOBAL("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
237 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }}, \
240 { FR_CONF_PAIR_GLOBAL("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
241 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }}, \
243 { FR_CONF_PAIR_GLOBAL("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
244 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }}, \
247 { FR_CONF_PAIR_GLOBAL("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
248 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }}, \
250 { FR_CONF_PAIR_GLOBAL("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
251 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }}, \
253 { FR_CONF_PAIR_GLOBAL("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
254 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }}, \
256 { FR_CONF_PAIR_GLOBAL("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
257 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }}, \
259 { FR_CONF_PAIR_GLOBAL("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
260 .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }}, \
262 { FR_CONF_PAIR_GLOBAL("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
263 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }}, \
265 { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
266 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }}, \
268 { FR_CONF_PAIR_GLOBAL("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
269 .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }}, \
271 { FR_CONF_PAIR_GLOBAL("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
272 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }}, \
276 { FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_producer_topics_config) }
292#define KAFKA_CONSUMER_CONFIG \
294 { FR_CONF_SUBSECTION_GLOBAL("group", 0, kafka_consumer_group_config) }, \
296 { FR_CONF_PAIR_GLOBAL("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
297 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }}, \
299 { FR_CONF_PAIR_GLOBAL("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
300 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }}, \
302 { FR_CONF_PAIR_GLOBAL("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
303 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }}, \
305 { FR_CONF_PAIR_GLOBAL("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
306 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }}, \
308 { FR_CONF_PAIR_GLOBAL("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
309 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }}, \
311 { FR_CONF_PAIR_GLOBAL("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
312 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }}, \
314 { FR_CONF_PAIR_GLOBAL("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
315 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }}, \
317 { FR_CONF_PAIR_GLOBAL("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
318 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }}, \
320 { FR_CONF_PAIR_GLOBAL("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
321 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }}, \
323 { FR_CONF_PAIR_GLOBAL("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
324 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }}, \
326 { FR_CONF_PAIR_GLOBAL("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
327 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }}, \
329 { FR_CONF_PAIR_GLOBAL("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
330 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }}, \
332 { FR_CONF_PAIR_GLOBAL("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
333 .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }}, \
335 { FR_CONF_PAIR_GLOBAL("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
336 .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }}, \
338 { FR_CONF_PAIR_GLOBAL("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
339 .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }}, \
341 { FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_consumer_topics_config) }
Defines a CONF_PAIR to C data type mapping.
Common header for all CONF_* types.
Configuration AVP similar to a fr_pair_t.
A section grouping multiple CONF_PAIR.
int kafka_config_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Generic librdkafka-property parser used by KAFKA_BASE_PRODUCER_CONFIG entries.
void fr_kafka_free(void)
Release one reference to librdkafka's global state.
conf_parser_t const kafka_connection_config[]
int kafka_topic_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Topic-level raw passthrough.
conf_parser_t const kafka_base_consumer_topics_config[]
conf_parser_t const kafka_base_topic_properties_config[]
Per-topic properties { ... } escape-hatch contents.
fr_kafka_topic_t * kafka_topic_conf_find(fr_kafka_conf_t const *kc, char const *name)
Look up a declared topic by name on an fr_kafka_conf_t
int kafka_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Untyped passthrough parser used by KAFKA_RAW_CONFIG
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Default-generator counterpart to kafka_config_parse - reads the librdkafka default for the property a...
CONF_SECTION * cs
topic's CONF_SECTION (for call_env lookups of per-topic pairs like value / key)
fr_rb_tree_t * topics
Declared topics, keyed by name.
rd_kafka_topic_conf_t * rdtc
char const * property
Kafka configuration property.
int fr_kafka_init(void)
Initialise librdkafka's global state (SSL / SASL / internal ref-count)
conf_parser_t const kafka_base_properties_config[]
properties { ... } escape-hatch contents
char const * name
as it appeared in config
fr_kafka_topic_conf_t * conf
parsed per-topic librdkafka conf
conf_parser_t const kafka_version_config[]
char const * string_sep
Used for multi-value configuration items.
conf_parser_t const kafka_consumer_group_config[]
int kafka_topic_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Per-topic subsection hook used by KAFKA_PRODUCER_CONFIG / KAFKA_CONSUMER_CONFIG.
conf_parser_t const kafka_tls_config[]
conf_parser_t const kafka_base_producer_topics_config[]
bool empty_default
Don't produce messages saying the default is missing.
conf_parser_t const kafka_metadata_config[]
size_t * mapping_len
Length of the mapping tables.
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
conf_parser_t const kafka_sasl_config[]
uctx attached to each entry in KAFKA_BASE_PRODUCER_CONFIG
Declared topic record - one per topic { <name> { ... } } subsection.
The main red black tree structure.
An element in a lexicographically sorted array of name to ptr mappings.
static size_t char ** out