![]() |
The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
|
Kafka global structures. More...
#include <freeradius-devel/kafka/base.h>
#include <freeradius-devel/server/tmpl.h>
#include <freeradius-devel/util/size.h>
Include dependency graph for base.c:
Go to the source code of this file.
<tt>conf_parser_t</tt> arrays | |
Nested subsections referenced by the KAFKA_BASE_CONFIG / KAFKA_PRODUCER_CONFIG / KAFKA_CONSUMER_CONFIG macros. Base-level surfaces come first, then producer-specific, then consumer. | |
| conf_parser_t const | kafka_base_properties_config [] |
properties { ... } escape-hatch contents | |
| conf_parser_t const | kafka_base_topic_properties_config [] |
Per-topic properties { ... } escape-hatch contents. | |
| static fr_table_ptr_sorted_t | kafka_check_cert_cn_table [] |
| static size_t | kafka_check_cert_cn_table_len = NUM_ELEMENTS(kafka_check_cert_cn_table) |
| conf_parser_t const | kafka_connection_config [] |
| conf_parser_t const | kafka_metadata_config [] |
| conf_parser_t const | kafka_sasl_config [] |
| static conf_parser_t const | kafka_sasl_kerberos_config [] |
| static conf_parser_t const | kafka_sasl_oauth_config [] |
| conf_parser_t const | kafka_tls_config [] |
| conf_parser_t const | kafka_version_config [] |
Producer-specific topic config | |
| static conf_parser_t const | kafka_base_producer_topic_config [] |
| conf_parser_t const | kafka_base_producer_topics_config [] |
Consumer-specific topic + group config | |
| conf_parser_t const | kafka_base_consumer_topic_config [] |
| conf_parser_t const | kafka_base_consumer_topics_config [] |
| conf_parser_t const | kafka_consumer_group_config [] |
Library init | |
librdkafka defers SSL / SASL / internal-refcount setup until the first rd_kafka_new() call. Doing that lazily in a worker thread races the server's own OpenSSL init and leaves the ordering non-deterministic, so we kick it once at module load via fr_kafka_init(). | |
| static void | _kafka_null_log_cb (UNUSED rd_kafka_t const *rk, UNUSED int level, UNUSED char const *fac, UNUSED char const *buf) |
| void | fr_kafka_free (void) |
| Drop one ref to librdkafka's global init. | |
| int | fr_kafka_init (void) |
| Drive librdkafka's lazy global init deterministically. | |
| static uint32_t | kafka_instance_count = 0 |
Shared helpers | |
Used by both the base-level and topic-level parse/dflt paths below. | |
| static int | kafka_config_dflt_single (CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule) |
| Common dflt path: take a librdkafka-native value string and materialise it as a CONF_PAIR in the caller's units (time deltas as "Ns", sizes with unit suffixes, etc.). | |
| static int | kafka_config_parse_single (char const **out, CONF_PAIR *cp, conf_parser_t const *rule) |
| Common parse path for a single CONF_PAIR's value. | |
| static int | kafka_noop_parse (UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, UNUSED CONF_ITEM *ci, UNUSED conf_parser_t const *rule) |
| No-op parser used to reserve CONF_PAIR names inside a topic subsection that the module reads separately (via call_env), so they aren't caught by the trailing raw-passthrough catch-all and fed to librdkafka. | |
Base conf (<tt>fr_kafka_conf_t</tt>) | |
Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_PAIR_GLOBAL parsers for the top-level kafka configuration. | |
| static int | _kafka_conf_free (rd_kafka_conf_t **pconf) |
| Destructor on the talloc sentinel that owns the rd_kafka_conf_t handle. | |
| static fr_kafka_conf_t * | kafka_conf_get (TALLOC_CTX *ctx, void *base) |
Fetch the fr_kafka_conf_t currently being populated by the parser. | |
| int | kafka_config_dflt (CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule) |
| Return the default value from the kafka client library. | |
| int | kafka_config_parse (TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule) |
| Translate config items directly to settings in a kafka config struct. | |
| int | kafka_config_raw_parse (TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule) |
| Untyped passthrough: hand a CONF_PAIR's attr/value straight to rd_kafka_conf_set. | |
Topic conf (<tt>fr_kafka_topic_conf_t</tt> + <tt>fr_kafka_topic_t</tt>) | |
Per-topic lifecycle, FR_CONF_PAIR_GLOBAL parsers for entries inside a declared topic subsection, and the subsection hook that indexes each declared topic onto the fr_kafka_conf_t.topics tree. | |
| static int8_t | _kafka_topic_cmp (void const *one, void const *two) |
Order-by-name comparator for the fr_kafka_conf_t.topics tree. | |
| static int | _kafka_topic_conf_free (fr_kafka_topic_conf_t *ktc) |
| Destructor on a per-topic conf - releases the librdkafka handle. | |
| static fr_kafka_topic_conf_t * | kafka_topic_conf_alloc (TALLOC_CTX *ctx) |
Allocate a per-topic conf parented under ctx | |
| fr_kafka_topic_t * | kafka_topic_conf_find (fr_kafka_conf_t const *kc, char const *name) |
Look up a declared topic by name on an fr_kafka_conf_t | |
| static int | kafka_topic_config_dflt (CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule) |
| Return the default value for a topic from the kafka client library. | |
| static int | kafka_topic_config_parse (UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule) |
| Translate config items directly to settings in a kafka topic config struct. | |
| int | kafka_topic_config_raw_parse (UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule) |
Topic-level counterpart to kafka_config_raw_parse | |
| int | kafka_topic_subsection_parse (TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule) |
| Per-topic subsection hook. | |
Kafka global structures.
Definition in file base.c.
|
static |
Destructor on the talloc sentinel that owns the rd_kafka_conf_t handle.
The sentinel is just a talloced rd_kafka_conf_t * attached to the caller's parse ctx - when talloc unwinds the instance, this fires and releases the librdkafka handle.
Definition at line 246 of file base.c.
Here is the caller graph for this function:
|
static |
Order-by-name comparator for the fr_kafka_conf_t.topics tree.
Definition at line 616 of file base.c.
Here is the caller graph for this function:
|
static |
| void fr_kafka_free | ( | void | ) |
Drop one ref to librdkafka's global init.
Release one reference to librdkafka's global state.
librdkafka refcounts its own globals internally; our counter just pairs fr_kafka_init() calls so re-entrant module load/unload in test harnesses does the right thing.
Definition at line 1238 of file base.c.
Here is the caller graph for this function:
| int fr_kafka_init | ( | void | ) |
Drive librdkafka's lazy global init deterministically.
Initialise librdkafka's global state (SSL / SASL / internal ref-count)
First call creates and immediately destroys a throwaway producer, which walks all of librdkafka's one-shot init paths (SSL lock callbacks on OpenSSL 1.0.2, SASL global init if compiled in, etc.). Subsequent calls just bump the refcount so multiple kafka-using modules can share the init.
Definition at line 1207 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Fetch the fr_kafka_conf_t currently being populated by the parser.
The parser contract is that base points at the caller's instance struct and fr_kafka_conf_t is its first member, so a reinterpret cast of base is the fr_kafka_conf_t.
Also lazy-initialises the underlying librdkafka conf the first time we see it, attaching a talloc sentinel under the parse ctx so the handle is released when the caller's instance tree unwinds.
Definition at line 262 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
| int kafka_config_dflt | ( | CONF_PAIR ** | out, |
| void * | parent, | ||
| CONF_SECTION * | cs, | ||
| fr_token_t | quote, | ||
| conf_parser_t const * | rule | ||
| ) |
Return the default value from the kafka client library.
Default-generator counterpart to kafka_config_parse - reads the librdkafka default for the property and materialises it as a CONF_PAIR.
| [out] | out | Where to write the pair. |
| [in] | parent | being populated. |
| [in] | cs | to allocate the pair in. |
| [in] | quote | to use when allocing the pair. |
| [in] | rule | UNUSED. |
Definition at line 367 of file base.c.
Here is the call graph for this function:
|
static |
Common dflt path: take a librdkafka-native value string and materialise it as a CONF_PAIR in the caller's units (time deltas as "Ns", sizes with unit suffixes, etc.).
Invoked by the base and topic dflt funcs.
| [out] | out | Where to write the pair. |
| [in] | parent | being populated. |
| [in] | cs | to allocate the pair in. |
| [in] | value | to convert. |
| [in] | quote | to use when allocing the pair. |
| [in] | rule | UNUSED. |
Definition at line 148 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
| int kafka_config_parse | ( | TALLOC_CTX * | ctx, |
| UNUSED void * | out, | ||
| void * | base, | ||
| CONF_ITEM * | ci, | ||
| conf_parser_t const * | rule | ||
| ) |
Translate config items directly to settings in a kafka config struct.
| [in] | ctx | to allocate fr_kafka_conf_t in. |
| [out] | out | Unused. |
| [in] | base | Caller's instance struct, whose first member is the fr_kafka_conf_t being populated. |
| [in] | ci | To parse. |
| [in] | rule | describing how to parse the item. |
Definition at line 295 of file base.c.
Here is the call graph for this function:
|
static |
Common parse path for a single CONF_PAIR's value.
Handles librdkafka's preferred unit conventions (ms-integer for time deltas, byte-integer for sizes, string "true"/"false" for bools) and the optional kctx->mapping translation. Caller hands the resulting string to either rd_kafka_conf_set or rd_kafka_topic_conf_set.
Definition at line 46 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
| int kafka_config_raw_parse | ( | TALLOC_CTX * | ctx, |
| UNUSED void * | out, | ||
| void * | base, | ||
| CONF_ITEM * | ci, | ||
| UNUSED conf_parser_t const * | rule | ||
| ) |
Untyped passthrough: hand a CONF_PAIR's attr/value straight to rd_kafka_conf_set.
Used by the CF_IDENT_ANY entry in the base properties { } subsection to accept arbitrary librdkafka properties that don't have a typed entry in KAFKA_BASE_CONFIG / KAFKA_PRODUCER_CONFIG / KAFKA_CONSUMER_CONFIG. No unit scaling, no bool mapping - the user writes what librdkafka expects (e.g. "500" for a ms value, "1048576" for a byte count).
Definition at line 452 of file base.c.
Here is the call graph for this function:
|
static |
Allocate a per-topic conf parented under ctx
Used by the subsection hook to build each declared topic's fr_kafka_topic_conf_t. The destructor releases the librdkafka handle when the owning fr_kafka_topic_t is freed.
Definition at line 494 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
| fr_kafka_topic_t * kafka_topic_conf_find | ( | fr_kafka_conf_t const * | kc, |
| char const * | name | ||
| ) |
Look up a declared topic by name on an fr_kafka_conf_t
Returns the matching fr_kafka_topic_t, or NULL if no topic of that name was declared (or if no topics have been parsed yet).
Definition at line 623 of file base.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Return the default value for a topic from the kafka client library.
| [out] | out | Where to write the pair. |
| [in] | parent | being populated. |
| [in] | cs | to allocate the pair in. |
| [in] | quote | to use when allocing the pair. |
| [in] | rule | UNUSED. |
Definition at line 556 of file base.c.
Here is the call graph for this function:
|
static |
Translate config items directly to settings in a kafka topic config struct.
base is the fr_kafka_topic_conf_t the per-topic subsection hook handed down, so we write directly through it instead of re-fetching via cf_data. Falls back to cf_data lookup if a caller runs this parser outside kafka_topic_subsection_parse.
| [in] | ctx | UNUSED. |
| [out] | out | UNUSED. |
| [in] | base | topic-level conf (fr_kafka_topic_conf_t *). |
| [in] | ci | To parse. |
| [in] | rule | describing how to parse the item. |
Definition at line 520 of file base.c.
Here is the call graph for this function:
| int kafka_topic_config_raw_parse | ( | UNUSED TALLOC_CTX * | ctx, |
| UNUSED void * | out, | ||
| void * | base, | ||
| CONF_ITEM * | ci, | ||
| UNUSED conf_parser_t const * | rule | ||
| ) |
Topic-level counterpart to kafka_config_raw_parse
Used inside a declared topic's properties { } subsection to accept arbitrary rd_kafka_topic_conf_set properties. base is the enclosing topic's fr_kafka_topic_conf_t, handed down by the subsection hook.
Definition at line 598 of file base.c.
Here is the call graph for this function:
| int kafka_topic_subsection_parse | ( | TALLOC_CTX * | ctx, |
| void * | out, | ||
| void * | base, | ||
| CONF_ITEM * | ci, | ||
| UNUSED conf_parser_t const * | rule | ||
| ) |
Per-topic subsection hook.
Runs the inner rules against the topic's section, then inserts a record into the parent's topics tree.
Invoked by the framework for each <name> { ... } inside topic { }. ci is the topic's CONF_SECTION, base points at the caller's instance struct (with fr_kafka_conf_t as its first member).
Definition at line 639 of file base.c.
Here is the call graph for this function:
| conf_parser_t const kafka_base_consumer_topic_config[] |
| conf_parser_t const kafka_base_consumer_topics_config[] |
|
static |
| conf_parser_t const kafka_base_producer_topics_config[] |
| conf_parser_t const kafka_base_properties_config[] |
properties { ... } escape-hatch contents
Accepts any key = value pair and hands it straight to rd_kafka_conf_set. See kafka_config_raw_parse.
| conf_parser_t const kafka_base_topic_properties_config[] |
Per-topic properties { ... } escape-hatch contents.
Same idea as kafka_base_properties_config, but dispatches to rd_kafka_topic_conf_set against the enclosing topic's conf.
|
static |
|
static |
| conf_parser_t const kafka_connection_config[] |
| conf_parser_t const kafka_consumer_group_config[] |
| conf_parser_t const kafka_metadata_config[] |
| conf_parser_t const kafka_sasl_config[] |
|
static |
|
static |
| conf_parser_t const kafka_tls_config[] |
| conf_parser_t const kafka_version_config[] |
1.9.8