The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
base.h
Go to the documentation of this file.
1#pragma once
2/*
3 * This program is free software; you can kafkatribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or (at
6 * your option) any later version.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
16 */
17
18/**
19 * $Id: 6f774a3ab11adcdd2fca1922bfe86f41ee518a2c $
20 * @file lib/kafka/base.h
21 * @brief Common functions for interacting with kafk
22 *
23 * @author Arran Cudbard-Bell
24 *
25 * @copyright 2022 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26 */
27RCSIDH(kafka_base_h, "$Id: 6f774a3ab11adcdd2fca1922bfe86f41ee518a2c $")
28
29#ifdef HAVE_WDOCUMENTATION
30DIAG_OFF(documentation-deprecated-sync)
31DIAG_OFF(documentation)
32#endif
33#include <librdkafka/rdkafka.h>
34#ifdef HAVE_WDOCUMENTATION
35DIAG_ON(documentation)
36DIAG_ON(documentation-deprecated-sync)
37#endif
38
39#include <freeradius-devel/server/cf_parse.h>
40#include <freeradius-devel/util/rb.h>
41
42#ifdef __cplusplus
43extern "C" {
44#endif
45
47
48typedef struct {
49 rd_kafka_conf_t *conf;
50
51 fr_rb_tree_t *topics; //!< Declared topics, keyed by name. Populated during
52 //!< config parsing by the per-topic hook on the `topic { }`
53 //!< subsection; use `kafka_topic_conf_find` to query.
55
56typedef struct {
57 rd_kafka_topic_conf_t *rdtc;
59
60/** Declared topic record - one per `topic { <name> { ... } }` subsection
61 *
62 * Built by the library's per-topic parse hook, indexed on the parent
63 * `fr_kafka_conf_t.topics` tree. Callers typically retrieve via
64 * `kafka_topic_conf_find` rather than poking this struct directly.
65 */
67 char const *name; //!< as it appeared in config
68 fr_kafka_topic_conf_t *conf; //!< parsed per-topic librdkafka conf
69 CONF_SECTION *cs; //!< topic's CONF_SECTION (for call_env lookups
70 //!< of per-topic pairs like `value` / `key`)
72};
73
74/** uctx attached to each entry in `KAFKA_BASE_PRODUCER_CONFIG`
75 *
76 * Public so the config macro's struct literals resolve in caller TUs.
77 * You only touch this directly if you're extending the library's
78 * producer config with additional librdkafka pass-through properties.
79 */
80typedef struct {
81 fr_table_ptr_sorted_t *mapping; //!< Mapping table between string constant.
82 size_t *mapping_len; //!< Length of the mapping tables
83 bool empty_default; //!< Don't produce messages saying the default is missing.
84 size_t size_scale; //!< Divide/multiply FR_TYPE_SIZE by this amount.
85 char const *property; //!< Kafka configuration property.
86 char const *string_sep; //!< Used for multi-value configuration items.
87 //!< Kafka uses ', ' or ';' seemingly at random.
89
90/** Generic librdkafka-property parser used by `KAFKA_BASE_PRODUCER_CONFIG` entries
91 *
92 * Exposed so the macro's FR_CONF_PAIR_GLOBAL entries can reference it from any TU.
93 */
94int kafka_config_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
95
96/** Default-generator counterpart to @ref kafka_config_parse - reads the
97 * librdkafka default for the property and materialises it as a CONF_PAIR.
98 */
99int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule);
100
101/** Untyped passthrough parser used by `KAFKA_RAW_CONFIG`
102 *
103 * Hands the CONF_PAIR's attribute/value straight to `rd_kafka_conf_set`.
104 * No type dispatch - user is responsible for librdkafka-native units.
105 */
106int kafka_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
107
108/** Topic-level raw passthrough. Counterpart to `kafka_config_raw_parse`
109 * for use inside a declared topic's subsection, dispatching to
110 * `rd_kafka_topic_conf_set`.
111 */
112int kafka_topic_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
113
114/** Per-topic subsection hook used by KAFKA_PRODUCER_CONFIG / KAFKA_CONSUMER_CONFIG
115 *
116 * Runs the inner rules against each `topic { <name> { ... } }` block
117 * then inserts an `fr_kafka_topic_t` into the parent `fr_kafka_conf_t.topics`
118 * tree so callers can look topics up by name without re-walking CONF_SECTIONs.
119 */
120int kafka_topic_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
121
122/** Look up a declared topic by name on an `fr_kafka_conf_t`
123 *
124 * @return the `fr_kafka_topic_t`, or NULL if no topic of that name was
125 * declared (or if no topics have been parsed yet).
126 */
128
129/** Initialise librdkafka's global state (SSL / SASL / internal ref-count)
130 *
131 * Ref-counted: every call must be paired with `fr_kafka_free()`. The
132 * first call lazily kicks librdkafka's one-time init paths by creating
133 * and destroying a throwaway producer; subsequent calls just bump the
134 * refcount. Call this from a kafka-using module's `.onload` so the
135 * library's internal globals are set up deterministically at startup
136 * rather than racing the first real `rd_kafka_new()` in a worker
137 * thread.
138 *
139 * @return 0 on success, -1 on failure.
140 */
141int fr_kafka_init(void);
142
143/** Release one reference to librdkafka's global state
144 *
145 * Call from a module's `.unload` to pair `fr_kafka_init()`. The last
146 * release is a no-op; librdkafka internally ref-counts its own globals
147 * and tears down when the last `rd_kafka_t` goes.
148 */
149void fr_kafka_free(void);
150
151/** @name Nested config arrays referenced by `KAFKA_BASE_PRODUCER_CONFIG`
152 *
153 * Extern so the macro's FR_CONF_SUBSECTION_GLOBAL entries can name them
154 * from any TU. Not part of the stable API - treat as implementation
155 * detail of the macro.
156 * @{
157 */
161extern conf_parser_t const kafka_tls_config[];
162extern conf_parser_t const kafka_sasl_config[];
168/** @} */
169
170/** Config entries common to producer and consumer clients
171 *
172 * Broker list, client identity, TLS / SASL, metadata / version /
173 * connection tuning, debug / plugin knobs. Usually composed with a
174 * role-specific macro (@ref KAFKA_PRODUCER_CONFIG, or a future
175 * consumer equivalent).
176 */
177#define KAFKA_BASE_CONFIG \
178 /* Initial list of brokers. librdkafka only needs one it can reach. */ \
179 { FR_CONF_PAIR_GLOBAL("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
180 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
181 /* Identifier sent with each request to brokers. */ \
182 { FR_CONF_PAIR_GLOBAL("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
183 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
184 /* Rack identifier for rack-aware fetch-from-follower. */ \
185 { FR_CONF_PAIR_GLOBAL("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
186 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
187 /* Max size of a message the broker will accept. */ \
188 { FR_CONF_PAIR_GLOBAL("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
189 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
190 /* Max size of a message copied into librdkafka's send buffer. */ \
191 { FR_CONF_PAIR_GLOBAL("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
192 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
193 /* Max size of a response from a broker. */ \
194 { FR_CONF_PAIR_GLOBAL("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
195 .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
196 /* Compile-time features to enable (comma-separated). */ \
197 { FR_CONF_PAIR_GLOBAL("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
198 .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
199 /* Comma-separated list of debug contexts to enable. */ \
200 { FR_CONF_PAIR_GLOBAL("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
201 .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
202 /* Semicolon-separated plugin library paths to load. */ \
203 { FR_CONF_PAIR_GLOBAL("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
204 .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
205 { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
206 { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
207 { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
208 { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
209 { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }, \
210 /* Escape-hatch for librdkafka client properties we don't enumerate. \
211 * Contents are fed verbatim to rd_kafka_conf_set - no type dispatch, \
212 * the user writes librdkafka-native units (e.g. "500" for ms). */ \
213 { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }
214
215/** Producer-only delta: librdkafka producer tuning + declared topics
216 *
217 * Compose with @ref KAFKA_BASE_CONFIG. Callers must embed
218 * `fr_kafka_conf_t` as the first member of their instance struct so
219 * `FR_CONF_OFFSET` resolves against it.
220 *
221 * @code
222 * static conf_parser_t const module_config[] = {
223 * KAFKA_BASE_CONFIG,
224 * KAFKA_PRODUCER_CONFIG,
225 * { FR_CONF_OFFSET("my_thing", rlm_foo_t, my_thing) },
226 * CONF_PARSER_TERMINATOR
227 * };
228 * @endcode
229 */
230#define KAFKA_PRODUCER_CONFIG \
231 /* Enables the transactional producer. */ \
232 { FR_CONF_PAIR_GLOBAL("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
233 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }}, \
234 /* Maximum time the transaction coordinator will wait for a status update \
235 * from the producer before proactively aborting the transaction. */ \
236 { FR_CONF_PAIR_GLOBAL("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
237 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }}, \
238 /* Ensures exactly-once, in-order delivery per partition. \
239 * Requires acks=all semantics broker-side. */ \
240 { FR_CONF_PAIR_GLOBAL("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
241 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }}, \
242 /* Fail any error that would cause a gap in the produced message series. */ \
243 { FR_CONF_PAIR_GLOBAL("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
244 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }}, \
245 /* Max number of messages buffered across all topics/partitions. \
246 * Produce fails synchronously (QUEUE_FULL) once hit. */ \
247 { FR_CONF_PAIR_GLOBAL("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
248 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }}, \
249 /* Max total size of buffered messages (bytes, scaled from kbytes). */ \
250 { FR_CONF_PAIR_GLOBAL("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
251 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }}, \
252 /* Linger time before a batch is sent, for producer-side batching. */ \
253 { FR_CONF_PAIR_GLOBAL("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
254 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }}, \
255 /* Max number of retries per failed send. */ \
256 { FR_CONF_PAIR_GLOBAL("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
257 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }}, \
258 /* Backoff between retries of a protocol request. */ \
259 { FR_CONF_PAIR_GLOBAL("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
260 .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }}, \
261 /* Outstanding-request threshold at which the accumulator backpressures. */ \
262 { FR_CONF_PAIR_GLOBAL("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
263 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }}, \
264 /* Compression codec: none, gzip, snappy, lz4, zstd. */ \
265 { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
266 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }}, \
267 /* Max size (bytes) of all messages batched into one MessageSet. */ \
268 { FR_CONF_PAIR_GLOBAL("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
269 .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }}, \
270 /* Delay before reassigning sticky partitions per topic. */ \
271 { FR_CONF_PAIR_GLOBAL("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
272 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }}, \
273 /* Declared topics. Topic-level conf is stashed on each subsection via \
274 * cf_data_add; the subcs_size is a dummy because cf_parse asserts on \
275 * it for multi subsections. */ \
276 { FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_producer_topics_config) }
277
278/** Consumer-only delta: consumer-group membership, fetch/queue tuning,
279 * declared subscription topics.
280 *
281 * Compose with @ref KAFKA_BASE_CONFIG. Same embedding contract as the
282 * producer macro (see @ref KAFKA_PRODUCER_CONFIG).
283 *
284 * @code
285 * static conf_parser_t const module_config[] = {
286 * KAFKA_BASE_CONFIG,
287 * KAFKA_CONSUMER_CONFIG,
288 * CONF_PARSER_TERMINATOR
289 * };
290 * @endcode
291 */
292#define KAFKA_CONSUMER_CONFIG \
293 /* Consumer-group membership config (id, instance_id, session_timeout, ...). */ \
294 { FR_CONF_SUBSECTION_GLOBAL("group", 0, kafka_consumer_group_config) }, \
295 /* Max allowed time between calls to consume messages. */ \
296 { FR_CONF_PAIR_GLOBAL("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
297 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }}, \
298 /* Whether offsets are committed automatically. */ \
299 { FR_CONF_PAIR_GLOBAL("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
300 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }}, \
301 /* Interval between auto commits. */ \
302 { FR_CONF_PAIR_GLOBAL("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
303 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }}, \
304 /* Automatically store the offset of the last message handed to the application. */ \
305 { FR_CONF_PAIR_GLOBAL("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
306 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }}, \
307 /* Min number of messages per topic+partition librdkafka keeps locally. */ \
308 { FR_CONF_PAIR_GLOBAL("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
309 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }}, \
310 /* Max total size of pre-fetched messages in the local consumer queue. */ \
311 { FR_CONF_PAIR_GLOBAL("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
312 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }}, \
313 /* Max time the broker may wait to fill the Fetch response. */ \
314 { FR_CONF_PAIR_GLOBAL("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
315 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }}, \
316 /* Initial per-partition fetch size. */ \
317 { FR_CONF_PAIR_GLOBAL("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
318 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }}, \
319 /* Max bytes per topic+partition in a Fetch response. */ \
320 { FR_CONF_PAIR_GLOBAL("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
321 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }}, \
322 /* Max total data a broker may return for a single Fetch request. */ \
323 { FR_CONF_PAIR_GLOBAL("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
324 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }}, \
325 /* Min bytes the broker responds with. */ \
326 { FR_CONF_PAIR_GLOBAL("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
327 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }}, \
328 /* How long to wait before retrying a fetch after an error. */ \
329 { FR_CONF_PAIR_GLOBAL("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
330 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }}, \
331 /* How to read messages written transactionally (read_committed / read_uncommitted). */ \
332 { FR_CONF_PAIR_GLOBAL("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
333 .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }}, \
334 /* Verify CRC32 of every consumed message. */ \
335 { FR_CONF_PAIR_GLOBAL("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
336 .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }}, \
337 /* Allow automatic topic creation when subscribing to an unknown topic. */ \
338 { FR_CONF_PAIR_GLOBAL("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
339 .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }}, \
340 /* Declared subscription topics; same layering as the producer form. */ \
341 { FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_consumer_topics_config) }
342
343
344#ifdef __cplusplus
345}
346#endif
#define DIAG_ON(_x)
Definition build.h:487
#define RCSIDH(h, id)
Definition build.h:513
#define DIAG_OFF(_x)
Definition build.h:486
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:606
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
int kafka_config_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Generic librdkafka-property parser used by KAFKA_BASE_PRODUCER_CONFIG entries.
void fr_kafka_free(void)
Release one reference to librdkafka's global state.
Definition base.c:1238
conf_parser_t const kafka_connection_config[]
Definition base.c:858
int kafka_topic_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Topic-level raw passthrough.
conf_parser_t const kafka_base_consumer_topics_config[]
Definition base.c:1168
conf_parser_t const kafka_base_topic_properties_config[]
Per-topic properties { ... } escape-hatch contents.
Definition base.c:716
fr_kafka_topic_t * kafka_topic_conf_find(fr_kafka_conf_t const *kc, char const *name)
Look up a declared topic by name on an fr_kafka_conf_t
Definition base.c:623
int kafka_config_raw_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Untyped passthrough parser used by KAFKA_RAW_CONFIG
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
Definition base.h:84
rd_kafka_conf_t * conf
Definition base.h:49
int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Default-generator counterpart to kafka_config_parse - reads the librdkafka default for the property a...
Definition base.c:367
CONF_SECTION * cs
topic's CONF_SECTION (for call_env lookups of per-topic pairs like value / key)
Definition base.h:69
fr_rb_tree_t * topics
Declared topics, keyed by name.
Definition base.h:51
rd_kafka_topic_conf_t * rdtc
Definition base.h:57
char const * property
Kafka configuration property.
Definition base.h:85
int fr_kafka_init(void)
Initialise librdkafka's global state (SSL / SASL / internal ref-count)
Definition base.c:1207
conf_parser_t const kafka_base_properties_config[]
properties { ... } escape-hatch contents
Definition base.c:706
fr_rb_node_t node
Definition base.h:71
char const * name
as it appeared in config
Definition base.h:67
fr_kafka_topic_conf_t * conf
parsed per-topic librdkafka conf
Definition base.h:68
conf_parser_t const kafka_version_config[]
Definition base.c:935
char const * string_sep
Used for multi-value configuration items.
Definition base.h:86
conf_parser_t const kafka_consumer_group_config[]
Definition base.c:1092
int kafka_topic_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Per-topic subsection hook used by KAFKA_PRODUCER_CONFIG / KAFKA_CONSUMER_CONFIG.
conf_parser_t const kafka_tls_config[]
Definition base.c:799
conf_parser_t const kafka_base_producer_topics_config[]
Definition base.c:1074
bool empty_default
Don't produce messages saying the default is missing.
Definition base.h:83
conf_parser_t const kafka_metadata_config[]
Definition base.c:963
size_t * mapping_len
Length of the mapping tables.
Definition base.h:82
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
Definition base.h:81
conf_parser_t const kafka_sasl_config[]
Definition base.c:765
uctx attached to each entry in KAFKA_BASE_PRODUCER_CONFIG
Definition base.h:80
Declared topic record - one per topic { <name> { ... } } subsection.
Definition base.h:66
The main red black tree structure.
Definition rb.h:71
static char const * name
An element in a lexicographically sorted array of name to ptr mappings.
Definition table.h:65
enum fr_token fr_token_t
static fr_slen_t parent
Definition pair.h:858
static size_t char ** out
Definition value.h:1030