The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
base.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 8e61208e2bf0767fe2acffd8a4e8861e4721c625 $
19 * @file kafka/base.c
20 * @brief Kafka global structures
21 *
22 * @copyright 2022 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
23 */
24
25#include <freeradius-devel/kafka/base.h>
26#include <freeradius-devel/server/tmpl.h>
27#include <freeradius-devel/util/size.h>
28
29/* fr_kafka_conf_ctx_t definition lives in base.h so the KAFKA_BASE_CONFIG
30 * macro can construct struct literals of it from caller TUs. */
31
32/** @name Shared helpers
33 *
34 * Used by both the base-level and topic-level parse/dflt paths below.
35 *
36 * @{
37 */
38
39/** Common parse path for a single CONF_PAIR's value
40 *
41 * Handles librdkafka's preferred unit conventions (ms-integer for time
42 * deltas, byte-integer for sizes, string "true"/"false" for bools) and
43 * the optional kctx->mapping translation. Caller hands the resulting
44 * string to either rd_kafka_conf_set or rd_kafka_topic_conf_set.
45 */
46static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
47{
49 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
50 fr_type_t type = rule->type;
51 static _Thread_local char buff[sizeof("18446744073709551615")];
52 static _Thread_local fr_sbuff_t sbuff;
53
54 /*
55 * Map string values if possible, and if there's
56 * no match then just pass the original through.
57 *
58 * We count this as validation...
59 */
60 if (kctx->mapping) {
61 fr_table_ptr_sorted_t *mapping = kctx->mapping;
62 size_t mapping_len = *kctx->mapping_len;
63
65 return 0;
66 } else if (fr_type_is_string(type)) {
67 *out = cf_pair_value(cp);
68 return 0;
69 }
70
71 /*
72 * Parse as a box for basic validation
73 */
74 if (cf_pair_to_value_box(NULL, &vb, cp, rule) < 0) return -1;
75
76 /*
77 * In kafka all the time deltas are in ms
78 * resolution, so we need to take the parsed value,
79 * scale it, and print it back to a string.
80 */
81 switch (type) {
83 {
84 uint64_t delta;
85
86 sbuff = FR_SBUFF_OUT(buff, sizeof(buff));
87 delta = fr_time_delta_to_msec(vb.vb_time_delta);
88 if (fr_sbuff_in_sprintf(&sbuff, "%" PRIu64, delta) < 0) {
89 error:
91 return -1;
92 }
93 *out = fr_sbuff_start(&sbuff);
94 }
95 break;
96
97 case FR_TYPE_SIZE:
98 {
99 size_t size = vb.vb_size;
100
101 sbuff = FR_SBUFF_OUT(buff, sizeof(buff));
102
103 /*
104 * Most options are in bytes, but some are in kilobytes
105 */
106 if (kctx->size_scale) size /= kctx->size_scale;
107
108 /*
109 * Kafka doesn't want units...
110 */
111 if (fr_sbuff_in_sprintf(&sbuff, "%zu", size) < 0) goto error;
112 *out = fr_sbuff_start(&sbuff);
113 }
114 break;
115
116 /*
117 * Ensure bool is always mapped to the string constants
118 * "true" or "false".
119 */
120 case FR_TYPE_BOOL:
121 *out = vb.vb_bool ? "true" : "false";
122 break;
123
124 default:
125 *out = cf_pair_value(cp);
126 break;
127 }
128
130
131 return 0;
132}
133
134/** Common dflt path: take a librdkafka-native value string and materialise
135 * it as a CONF_PAIR in the caller's units (time deltas as "Ns", sizes
136 * with unit suffixes, etc.). Invoked by the base and topic dflt funcs.
137 *
138 * @param[out] out Where to write the pair.
139 * @param[in] parent being populated.
140 * @param[in] cs to allocate the pair in.
141 * @param[in] value to convert.
142 * @param[in] quote to use when allocing the pair.
143 * @param[in] rule UNUSED.
144 * @return
145 * - 0 on success.
146 * - -1 on failure.
147 */
149 fr_token_t quote, conf_parser_t const *rule)
150{
151 char tmp[sizeof("18446744073709551615b")];
152 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
153 fr_type_t type = rule->type;
154
155 /*
156 * Apply any mappings available, but default back
157 * to the raw value if we don't have a match.
158 */
159 if (kctx->mapping) {
160 fr_table_ptr_sorted_t *mapping = kctx->mapping;
161 size_t mapping_len = *kctx->mapping_len;
162
164 }
165 /*
166 * Convert time delta as an integer with ms precision
167 */
168 switch (type) {
170 {
171 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
172 fr_time_delta_t delta;
173
174 if (fr_time_delta_from_str(&delta, value, strlen(value), FR_TIME_RES_MSEC) < 0) {
175 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
176 return -1;
177 }
178
179 fr_time_delta_to_str(&value_elem, delta, FR_TIME_RES_SEC, true);
180 value = fr_sbuff_start(&value_elem);
181 }
182 break;
183
184 case FR_TYPE_SIZE:
185 {
186 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
187 size_t size;
188
189 if (fr_size_from_str(&size, &FR_SBUFF_IN_STR(value)) < 0) {
190 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
191 return -1;
192 }
193
194 /*
195 * Some options are in kbytes *sigh*
196 */
197 if (kctx->size_scale) size *= kctx->size_scale;
198
199 /*
200 * reprint the size with an appropriate unit
201 */
202 if (fr_size_to_str(&value_elem, size) < 0) {
203 cf_log_perr(cs, "Failed size reprint");
204 return -1;
205 }
206 value = fr_sbuff_start(&value_elem);
207 }
208 break;
209
210 default:
211 break;
212 }
213
214 MEM(*out = cf_pair_alloc(cs, rule->name1, value, T_OP_EQ, T_BARE_WORD, quote));
215 cf_item_mark_parsed(*out); /* Don't re-parse this */
216
217 return 0;
218}
219
220/** No-op parser used to reserve CONF_PAIR names inside a topic subsection
221 * that the module reads separately (via call_env), so they aren't caught
222 * by the trailing raw-passthrough catch-all and fed to librdkafka.
223 */
224static int kafka_noop_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
225 UNUSED CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
226{
227 return 0;
228}
229
230/** @} */
231
232/** @name Base conf (`fr_kafka_conf_t`)
233 *
234 * Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_PAIR_GLOBAL parsers for
235 * the top-level `kafka { ... }` section.
236 *
237 * @{
238 */
239
240/** Destructor on the talloc sentinel that owns the rd_kafka_conf_t handle
241 *
242 * The sentinel is just a talloced `rd_kafka_conf_t *` attached to the
243 * caller's parse ctx - when talloc unwinds the instance, this fires and
244 * releases the librdkafka handle.
245 */
246static int _kafka_conf_free(rd_kafka_conf_t **pconf)
247{
248 if (*pconf) rd_kafka_conf_destroy(*pconf);
249 return 0;
250}
251
252/** Fetch the `fr_kafka_conf_t` currently being populated by the parser
253 *
254 * The parser contract is that `base` points at the caller's instance
255 * struct and `fr_kafka_conf_t` is its first member, so a reinterpret
256 * cast of `base` is the `fr_kafka_conf_t`.
257 *
258 * Also lazy-initialises the underlying librdkafka conf the first time
259 * we see it, attaching a talloc sentinel under the parse ctx so the
260 * handle is released when the caller's instance tree unwinds.
261 */
262static fr_kafka_conf_t *kafka_conf_get(TALLOC_CTX *ctx, void *base)
263{
264 fr_kafka_conf_t *kc = base;
265
266 if (!kc) return NULL;
267 if (!kc->conf) {
268 rd_kafka_conf_t **s;
269
270 MEM(kc->conf = rd_kafka_conf_new());
271
272 /*
273 * Attach a sentinel under the parse ctx so teardown
274 * of the caller's instance data automatically releases
275 * the librdkafka handle.
276 */
277 MEM(s = talloc(ctx, rd_kafka_conf_t *));
278 *s = kc->conf;
279 talloc_set_destructor(s, _kafka_conf_free);
280 }
281 return kc;
282}
283
284/** Translate config items directly to settings in a kafka config struct
285 *
286 * @param[in] ctx to allocate fr_kafka_conf_t in.
287 * @param[out] out Unused.
288 * @param[in] base Unused.
289 * @param[in] ci To parse.
290 * @param[in] rule describing how to parse the item.
291 * @return
292 * - 0 on success.
293 * - -1 on failure
294 */
295int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, void *base,
296 CONF_ITEM *ci, conf_parser_t const *rule)
297{
298 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
301 CONF_PAIR *cp = cf_item_to_pair(ci);
302
303 fr_kafka_conf_t *kc;
304 char const *value;
305
306 kc = kafka_conf_get(ctx, base);
307 fr_assert_msg(kc, "kafka base struct missing - caller must embed fr_kafka_conf_t as first member");
308
309 /*
310 * Multi rules require us to concat the values together before handing them off
311 */
312 if (fr_rule_multi(rule)) {
313 unsigned int i;
314 CONF_PAIR *cp_p;
315 size_t count;
316 char const **array;
317 fr_sbuff_t *agg;
318 fr_slen_t slen;
319
320 FR_SBUFF_TALLOC_THREAD_LOCAL(&agg, 256, SIZE_MAX);
321
322 count = cf_pair_count(cs, rule->name1);
323 if (count <= 1) goto do_single;
324
325 MEM(array = talloc_array(ctx, char const *, count));
326 for (cp_p = cp, i = 0;
327 cp_p;
328 cp_p = cf_pair_find_next(cs, cp_p, rule->name1), i++) {
329 if (kafka_config_parse_single(&array[i], cp_p, rule) < 0) return -1;
331 }
332
333 slen = fr_sbuff_array_concat(agg, array, kctx->string_sep);
334 talloc_free(array);
335 if (slen < 0) return -1;
336
337 value = fr_sbuff_start(agg);
338 } else {
339 do_single:
340 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
341 }
342
343 {
344 char errstr[512];
345
346 if (rd_kafka_conf_set(kc->conf, kctx->property,
347 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
348 cf_log_perr(cp, "%s", errstr);
349 return -1;
350 }
351 }
352
353 return 0;
354}
355
356/** Return the default value from the kafka client library
357 *
358 * @param[out] out Where to write the pair.
359 * @param[in] parent being populated.
360 * @param[in] cs to allocate the pair in.
361 * @param[in] quote to use when allocing the pair.
362 * @param[in] rule UNUSED.
363 * @return
364 * - 0 on success.
365 * - -1 on failure.
366 */
368{
369 char buff[1024];
370 size_t buff_len = sizeof(buff);
371 char const *value;
372
373 fr_kafka_conf_t *kc;
374 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
375 rd_kafka_conf_res_t ret;
376
377 kc = kafka_conf_get(cs, parent);
378 fr_assert_msg(kc, "kafka base struct missing during default generation");
379
380 if ((ret = rd_kafka_conf_get(kc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
381 if (ret == RD_KAFKA_CONF_UNKNOWN) {
382 if (kctx->empty_default) return 0;
383
384 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
385 return 0; /* Not an error */
386 }
387
388 cf_log_err(cs, "Failed retrieving kafka property \"%s\"", kctx->property);
389 return -1;
390 }
391#if 0
392 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
393#endif
394 value = buff;
395
396 /*
397 * If it's multi we need to break the string apart on the string separator
398 * and potentially unescape the separator.
399 */
400 if (fr_rule_multi(rule)) {
401 fr_sbuff_t value_in = FR_SBUFF_IN(value, buff_len);
402 char tmp[256];
403 fr_sbuff_t value_elem = FR_SBUFF_OUT(tmp, sizeof(tmp));
404 /*
405 * FR_SBUFF_TERM() uses sizeof() on its argument, which
406 * produces the wrong length for a runtime pointer. Build
407 * the terminator list by hand so the length is correct.
408 */
409 fr_sbuff_term_elem_t tt_elem = { .str = kctx->string_sep, .len = strlen(kctx->string_sep) };
410 fr_sbuff_term_t tt = { .len = 1, .elem = &tt_elem };
411 fr_sbuff_unescape_rules_t ue_rules = {
412 .name = __FUNCTION__,
413 .chr = '\\'
414 };
415 /*
416 * Convert escaped separators back
417 */
418 ue_rules.subs[(uint8_t)kctx->string_sep[0]] = kctx->string_sep[0];
419
420 while (fr_sbuff_out_unescape_until(&value_elem, &value_in, SIZE_MAX, &tt, &ue_rules) > 0) {
421 if (kafka_config_dflt_single(out, parent, cs, fr_sbuff_start(&value_elem), quote, rule) < 0) return -1;
422
423 /*
424 * Skip past the string separator
425 */
426 fr_sbuff_advance(&value_in, strlen(kctx->string_sep));
427
428 /*
429 * Reset
430 */
431 fr_sbuff_set_to_start(&value_elem);
432 }
433 return 0;
434 }
435
436 /*
437 * Parse a single value
438 */
439 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
440
441 return 0;
442}
443
444/** Untyped passthrough: hand a CONF_PAIR's attr/value straight to rd_kafka_conf_set
445 *
446 * Used by the `CF_IDENT_ANY` entry in the base `properties { }` subsection
447 * to accept arbitrary librdkafka properties that don't have a typed entry
448 * in `KAFKA_BASE_CONFIG` / `KAFKA_PRODUCER_CONFIG` / `KAFKA_CONSUMER_CONFIG`.
449 * No unit scaling, no bool mapping - the user writes what librdkafka
450 * expects (e.g. "500" for a ms value, "1048576" for a byte count).
451 */
452int kafka_config_raw_parse(TALLOC_CTX *ctx, UNUSED void *out, void *base,
453 CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
454{
455 CONF_PAIR *cp = cf_item_to_pair(ci);
456 fr_kafka_conf_t *kc;
457 char errstr[512];
458
459 kc = kafka_conf_get(ctx, base);
460 fr_assert_msg(kc, "kafka base struct missing - caller must embed fr_kafka_conf_t as first member");
461
462 if (rd_kafka_conf_set(kc->conf, cf_pair_attr(cp), cf_pair_value(cp),
463 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
464 cf_log_perr(cp, "%s", errstr);
465 return -1;
466 }
467 return 0;
468}
469
470/** @} */
471
472/** @name Topic conf (`fr_kafka_topic_conf_t` + `fr_kafka_topic_t`)
473 *
474 * Per-topic lifecycle, FR_CONF_PAIR_GLOBAL parsers for entries inside a declared
475 * topic subsection, and the subsection hook that indexes each declared
476 * topic onto `fr_kafka_conf_t.topics`.
477 *
478 * @{
479 */
480
481/** Destructor on a per-topic conf - releases the librdkafka handle. */
483{
484 if (ktc->rdtc) rd_kafka_topic_conf_destroy(ktc->rdtc);
485 return 0;
486}
487
488/** Allocate a per-topic conf parented under `ctx`
489 *
490 * Used by the subsection hook to build each declared topic's
491 * `fr_kafka_topic_conf_t`. The destructor releases the librdkafka
492 * handle when the owning `fr_kafka_topic_t` is freed.
493 */
495{
497
498 MEM(ktc = talloc(ctx, fr_kafka_topic_conf_t));
499 MEM(ktc->rdtc = rd_kafka_topic_conf_new());
500 talloc_set_destructor(ktc, _kafka_topic_conf_free);
501 return ktc;
502}
503
504/** Translate config items directly to settings in a kafka topic config struct
505 *
506 * `base` is the `fr_kafka_topic_conf_t` the per-topic subsection hook
507 * handed down, so we write directly through it instead of re-fetching
508 * via cf_data. Falls back to cf_data lookup if a caller runs this
509 * parser outside `kafka_topic_subsection_parse`.
510 *
511 * @param[in] ctx UNUSED.
512 * @param[out] out UNUSED.
513 * @param[in] base topic-level conf (`fr_kafka_topic_conf_t *`).
514 * @param[in] ci To parse.
515 * @param[in] rule describing how to parse the item.
516 * @return
517 * - 0 on success.
518 * - -1 on failure
519 */
520static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base,
521 CONF_ITEM *ci, conf_parser_t const *rule)
522{
523 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
524 CONF_PAIR *cp = cf_item_to_pair(ci);
525
526 fr_kafka_topic_conf_t *ktc = base;
527 char const *value;
528
529 fr_assert_msg(ktc, "kafka topic conf missing - topic parser invoked without subsection hook");
530 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
531
532 {
533 char errstr[512];
534
535 if (rd_kafka_topic_conf_set(ktc->rdtc, kctx->property,
536 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
537 cf_log_perr(cp, "%s", errstr);
538 return -1;
539 }
540 }
541
542 return 0;
543}
544
545/** Return the default value for a topic from the kafka client library
546 *
547 * @param[out] out Where to write the pair.
548 * @param[in] parent being populated.
549 * @param[in] cs to allocate the pair in.
550 * @param[in] quote to use when allocing the pair.
551 * @param[in] rule UNUSED.
552 * @return
553 * - 0 on success.
554 * - -1 on failure.
555 */
557{
558 char buff[1024];
559 size_t buff_len = sizeof(buff);
560 char const *value;
561
563 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
564 rd_kafka_conf_res_t ret;
565
566 fr_assert_msg(ktc, "kafka topic conf missing during default generation");
567
568 if ((ret = rd_kafka_topic_conf_get(ktc->rdtc, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
569 if (ret == RD_KAFKA_CONF_UNKNOWN) {
570 if (kctx->empty_default) return 0;
571
572 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
573 return 0; /* Not an error */
574 }
575
576 cf_log_err(cs, "Failed retrieving kafka property '%s'", kctx->property);
577 return -1;
578 }
579#if 0
580 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
581#endif
582 value = buff;
583
584 /*
585 * Parse a single value
586 */
587 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
588
589 return 0;
590}
591
592/** Topic-level counterpart to `kafka_config_raw_parse`
593 *
594 * Used inside a declared topic's `properties { }` subsection to accept
595 * arbitrary `rd_kafka_topic_conf_set` properties. `base` is the enclosing
596 * topic's `fr_kafka_topic_conf_t`, handed down by the subsection hook.
597 */
598int kafka_topic_config_raw_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base,
599 CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
600{
601 CONF_PAIR *cp = cf_item_to_pair(ci);
602 fr_kafka_topic_conf_t *ktc = base;
603 char errstr[512];
604
605 fr_assert_msg(ktc, "kafka topic conf missing - raw parser invoked without subsection hook");
606
607 if (rd_kafka_topic_conf_set(ktc->rdtc, cf_pair_attr(cp), cf_pair_value(cp),
608 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
609 cf_log_perr(cp, "%s", errstr);
610 return -1;
611 }
612 return 0;
613}
614
615/** Order-by-name comparator for the `fr_kafka_conf_t.topics` tree. */
616static int8_t _kafka_topic_cmp(void const *one, void const *two)
617{
618 fr_kafka_topic_t const *a = one;
619 fr_kafka_topic_t const *b = two;
620 return CMP(strcmp(a->name, b->name), 0);
621}
622
624{
626
627 if (!kc || !kc->topics || !name) return NULL;
628 key.name = name;
629 return fr_rb_find(kc->topics, &key);
630}
631
632/** Per-topic subsection hook. Runs the inner rules against the topic's
633 * section, then inserts a record into the parent's topics tree.
634 *
635 * Invoked by the framework for each `<name> { ... }` inside `topic { }`.
636 * `ci` is the topic's CONF_SECTION, `base` points at the caller's instance
637 * struct (with `fr_kafka_conf_t` as its first member).
638 */
639int kafka_topic_subsection_parse(TALLOC_CTX *ctx, void *out, void *base,
640 CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
641{
642 CONF_SECTION *subcs = cf_item_to_section(ci);
643 fr_kafka_conf_t *kc;
644 fr_kafka_topic_t *topic;
645 char const *name = cf_section_name1(subcs);
646
647 fr_assert_msg(base, "kafka base struct missing");
648
649 kc = kafka_conf_get(ctx, base);
650 if (!kc->topics) {
652 }
653
654 /*
655 * Allocate eagerly so the inner parsers can write into
656 * topic->conf via `base` instead of round-tripping through
657 * cf_data.
658 */
659 MEM(topic = talloc_zero(kc->topics, fr_kafka_topic_t));
660 topic->name = talloc_strdup(topic, name);
661 topic->conf = kafka_topic_conf_alloc(topic);
662 topic->cs = subcs;
663
664 /*
665 * Inner rules (acks, compression, properties, ...) have been
666 * pushed on the subsection by the framework. Run them with
667 * topic->conf as base so they write directly into our struct.
668 */
669 if (cf_section_parse(ctx, topic->conf, subcs) < 0) {
670 talloc_free(topic);
671 return -1;
672 }
673
674 if (!fr_rb_insert(kc->topics, topic)) {
675 cf_log_err(ci, "Duplicate kafka topic '%s'", name);
676 talloc_free(topic);
677 return -1;
678 }
679
680 /*
681 * If the caller wired an output target on the subsection
682 * rule, hand back the topic pointer so it lands in their
683 * array. The tree on kc->topics is the primary index;
684 * this is just a convenience for direct-access patterns.
685 */
686 if (out) *((fr_kafka_topic_t **)out) = topic;
687
688 return 0;
689}
690/** @} */
691
692/** @name `conf_parser_t` arrays
693 *
694 * Nested subsections referenced by the `KAFKA_BASE_CONFIG` /
695 * `KAFKA_PRODUCER_CONFIG` / `KAFKA_CONSUMER_CONFIG` macros in base.h.
696 * Base-level surfaces first, then producer-specific, then consumer.
697 *
698 * @{
699 */
700
701/** `properties { ... }` escape-hatch contents
702 *
703 * Accepts any `key = value` pair and hands it straight to
704 * `rd_kafka_conf_set`. See `kafka_config_raw_parse`.
705 */
710
711/** Per-topic `properties { ... }` escape-hatch contents
712 *
713 * Same idea as `kafka_base_properties_config`, but dispatches to
714 * `rd_kafka_topic_conf_set` against the enclosing topic's conf.
715 */
720
723 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
724
726 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
727
729};
730
732 /*
733 * Service principal
734 */
736 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
737
738 /*
739 * Principal
740 */
742 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
743
744 /*
745 * knit cmd
746 */
748 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
749
750 /*
751 * keytab
752 */
754 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
755
756 /*
757 * How long between key refreshes
758 */
760 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
761
763};
764
766 /*
767 * SASL mechanism
768 */
770 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
771
772 /*
773 * Static SASL username
774 */
776 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
777
778 /*
779 * Static SASL password
780 */
782 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
783
785
787
789};
790
792 { L("false"), "none" },
793 { L("no"), "none" },
794 { L("true"), "https" },
795 { L("yes"), "https" }
796};
798
800 /*
801 * Cipher suite list in OpenSSL's format
802 */
804 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
805
806 /*
807 * Curves list in OpenSSL's format
808 */
810 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
811
812 /*
813 * Curves list in OpenSSL's format
814 */
816 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
817
818 /*
819 * Sets the full path to a CA certificate (used to validate
820 * the certificate the server presents).
821 */
823 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
824
825 /*
826 * Location of the CRL file.
827 */
829 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
830
831 /*
832 * Sets the path to the public certificate file we present
833 * to the servers.
834 */
836 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
837
838 /*
839 * Sets the path to the private key for our public
840 * certificate.
841 */
843 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
844
845 /*
846 * Enable or disable certificate validation
847 */
849 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
850
852 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
853 .mapping = kafka_check_cert_cn_table,
854 .mapping_len = &kafka_check_cert_cn_table_len }},
856};
857
859 /*
860 * Socket timeout
861 */
863 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
864
865 /*
866 * Close broker connections after this period.
867 */
869 .uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
870
871 /*
872 * Maximum requests in flight (per connection).
873 */
875 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
876
877 /*
878 * Socket send buffer.
879 */
881 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
882
883 /*
884 * Socket recv buffer.
885 */
887 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
888
889 /*
890 * If true, send TCP keepalives
891 */
893 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
894
895 /*
896 * If true, disable nagle algorithm
897 */
899 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
900
901 /*
902 * How long the DNS resolver cache is valid for
903 */
905 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
906
907 /*
908 * Should we use A records, AAAA records or either
909 * when resolving broker addresses
910 */
912 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
913
914 /*
915 * How many failures before we reconnect the connection
916 */
917 { FR_CONF_PAIR_GLOBAL("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
918 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
919
920 /*
921 * Initial time to wait before reconnecting.
922 */
924 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
925
926 /*
927 * Max time to wait before reconnecting.
928 */
930 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
931
933};
934
936 /*
937 * Request the API version from connected brokers
938 */
940 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
941
942 /*
943 * How long to wait for a version response.
944 */
946 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
947
948 /*
949 * How long to wait before retrying a version request.
950 */
952 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
953
954 /*
955 * Default version to use if the version request fails.
956 */
958 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
959
961};
962
964 /*
965 * Interval between attempts to refresh metadata from brokers
966 */
968 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
969
970 /*
971 * Interval between attempts to refresh metadata from brokers
972 */
974 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
975
976 /*
977 * Used when a topic loses its leader
978 */
980 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
981
982 /*
983 * Used when a topic loses its leader to prevent spurious metadata changes
984 */
986 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
987
988 /*
989 * Use sparse metadata requests which use less bandwidth maps
990 */
992 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
993
994 /*
995 * List of topics to ignore
996 */
998 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
999
1001};
1002
1003/** @name Producer-specific topic config
1004 * @{
1005 */
1006
1008 /*
1009 * Payload and key templates for `kafka.produce.<topic>`
1010 * invocations. Parsed at call_env time, but we reserve
1011 * the names here so the raw-passthrough catch-all below
1012 * doesn't try to hand them to rd_kafka_topic_conf_set.
1013 */
1014 { FR_CONF_PAIR_GLOBAL("value", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
1016
1017 /*
1018 * This field indicates the number of acknowledgements the leader
1019 * broker must receive from ISR brokers before responding to the request.
1020 */
1022 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
1023
1024 /*
1025 * medium The ack timeout of the producer request in milliseconds
1026 */
1028 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
1029
1030 /*
1031 * Local message timeout
1032 */
1034 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
1035
1036 /*
1037 * Partitioning strategy
1038 */
1040 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
1041
1042 /*
1043 * compression codec to use for compressing message sets.
1044 */
1046 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1047
1048 /*
1049 * compression level to use
1050 */
1052 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
1053
1054 /*
1055 * Escape hatch for rd_kafka_topic_conf_set properties not
1056 * covered above. Same shape as the top-level properties
1057 * block but writes to the per-topic conf.
1058 */
1060
1062};
1063
1064/*
1065 * Allows topic configurations in the format:
1066 *
1067 * topic {
1068 * <name> {
1069 * request_required_acks = ...
1070 * }
1071 * }
1072 *
1073 */
1081
1082/* The producer config now lives entirely in the `KAFKA_PRODUCER_CONFIG`
1083 * macro in base.h so callers can compose it with their own config entries.
1084 * See that macro for the full set of librdkafka pass-through properties. */
1085
1086/** @} */
1087
1088/** @name Consumer-specific topic + group config
1089 * @{
1090 */
1091
1093 /*
1094 * Group consumer is a member of
1095 */
1097 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
1098
1099 /*
1100 * A unique identifier of the consumer instance provided by the end user
1101 */
1103 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
1104
1105 /*
1106 * Range or roundrobin
1107 */
1108 { FR_CONF_PAIR_GLOBAL("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
1109 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
1110
1111 /*
1112 * Client group session and failure detection timeout.
1113 */
1115 .uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
1116
1117 /*
1118 * Group session keepalive heartbeat interval.
1119 */
1121 .uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
1122
1123 /*
1124 * How often to query for the current client group coordinator
1125 */
1126 { FR_CONF_PAIR_GLOBAL("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1127 .uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
1128
1129
1131};
1132
1134 /*
1135 * How many messages we process at a time
1136 *
1137 * High numbers may starve the worker thread
1138 */
1140 .uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
1141
1142 /*
1143 * Action to take when there is no initial offset
1144 * in offset store or the desired offset is out of range.
1145 */
1147 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
1148
1149 /*
1150 * Escape hatch for rd_kafka_topic_conf_set properties not
1151 * covered above.
1152 */
1154
1156};
1157
1158/*
1159 * Allows topic configurations in the format:
1160 *
1161 * topic {
1162 * <name> {
1163 * request_required_acks = ...
1164 * }
1165 * }
1166 *
1167 */
1175
1176/* The consumer config now lives in the `KAFKA_CONSUMER_CONFIG` macro in
1177 * base.h so callers can compose it with their own entries. */
1178
1179/** @} */
1180
1181/** @name Library init
1182 *
1183 * librdkafka defers SSL / SASL / internal-refcount setup until the first
1184 * `rd_kafka_new()`. Doing that lazily in a worker thread races the
1185 * server's own OpenSSL init and leaves the ordering non-deterministic,
1186 * so we kick it once at module load via `fr_kafka_init()`. The counter
1187 * mirrors `fr_openssl_init()` in src/lib/tls/base.c.
1188 *
1189 * @{
1190 */
1192
1193static void _kafka_null_log_cb(UNUSED rd_kafka_t const *rk, UNUSED int level,
1194 UNUSED char const *fac, UNUSED char const *buf)
1195{
1196 /* swallow the "no bootstrap brokers" warning from the dummy producer */
1197}
1198
1199/** Drive librdkafka's lazy global init deterministically
1200 *
1201 * First call creates and immediately destroys a throwaway producer, which
1202 * walks all of librdkafka's one-shot init paths (SSL lock callbacks on
1203 * OpenSSL 1.0.2, SASL global init if compiled in, etc.). Subsequent
1204 * calls just bump the refcount so multiple kafka-using modules can share
1205 * the init.
1206 */
1208{
1209 rd_kafka_conf_t *conf;
1210 rd_kafka_t *rk;
1211 char errstr[512];
1212
1213 if (kafka_instance_count > 0) {
1215 return 0;
1216 }
1217
1218 conf = rd_kafka_conf_new();
1219 rd_kafka_conf_set_log_cb(conf, _kafka_null_log_cb);
1220
1221 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
1222 if (!rk) {
1223 fr_strerror_printf("Failed priming librdkafka globals: %s", errstr);
1224 return -1;
1225 }
1226 rd_kafka_destroy(rk);
1227
1229 return 0;
1230}
1231
1232/** Drop one ref to librdkafka's global init
1233 *
1234 * librdkafka refcounts its own globals internally; our counter just
1235 * pairs fr_kafka_init() calls so re-entrant module load/unload in test
1236 * harnesses does the right thing.
1237 */
1239{
1240 if (kafka_instance_count == 0) return;
1242}
1243
1244/** @} */
1245/** @} */
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:228
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define UNUSED
Definition build.h:336
#define NUM_ELEMENTS(_t)
Definition build.h:358
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
Definition cf_parse.c:1279
int cf_pair_to_value_box(TALLOC_CTX *ctx, fr_value_box_t *out, CONF_PAIR *cp, conf_parser_t const *rule)
Parses a CONF_PAIR into a boxed value.
Definition cf_parse.c:128
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:669
void const * uctx
User data accessible by the cf_parse_t func.
Definition cf_parse.h:629
#define FR_CONF_PAIR_GLOBAL(_name, _type, _flags, _func, _dflt_func)
conf_parser_t entry which doesn't fill in a pointer or offset, but relies on functions to record valu...
Definition cf_parse.h:385
#define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs)
conf_parser_t entry which runs conf_parser_t entries for a subsection without any output
Definition cf_parse.h:398
fr_type_t type
An fr_type_t value, controls the output type.
Definition cf_parse.h:610
#define fr_rule_multi(_rule)
Definition cf_parse.h:494
char const * name1
Name of the CONF_ITEM to parse.
Definition cf_parse.h:607
@ CONF_FLAG_MULTI
CONF_PAIR can have multiple copies.
Definition cf_parse.h:446
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition cf_parse.h:433
@ CONF_FLAG_FILE_READABLE
File matching value must exist, and must be readable.
Definition cf_parse.h:435
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:606
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition cf_util.c:1436
unsigned int cf_pair_count(CONF_SECTION const *cs, char const *attr)
Count the number of times an attribute occurs in a parent section.
Definition cf_util.c:1503
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition cf_util.c:1269
char const * cf_section_name1(CONF_SECTION const *cs)
Return the first identifier of a CONF_SECTION.
Definition cf_util.c:1173
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:685
CONF_PAIR * cf_item_to_pair(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_PAIR.
Definition cf_util.c:665
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1581
char const * cf_pair_attr(CONF_PAIR const *pair)
Return the attr of a CONF_PAIR.
Definition cf_util.c:1565
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:287
#define cf_parent(_cf)
Definition cf_util.h:98
#define cf_log_perr(_cf, _fmt,...)
Definition cf_util.h:294
#define cf_log_debug(_cf, _fmt,...)
Definition cf_util.h:290
#define cf_item_mark_parsed(_cf)
Definition cf_util.h:133
#define CF_IDENT_ANY
Definition cf_util.h:75
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define MEM(x)
Definition debug.h:46
Test enumeration values.
Definition dict_test.h:92
talloc_free(hp)
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
Definition base.h:84
rd_kafka_conf_t * conf
Definition base.h:49
CONF_SECTION * cs
topic's CONF_SECTION (for call_env lookups of per-topic pairs like value / key)
Definition base.h:69
fr_rb_tree_t * topics
Declared topics, keyed by name.
Definition base.h:51
rd_kafka_topic_conf_t * rdtc
Definition base.h:57
char const * property
Kafka configuration property.
Definition base.h:85
char const * name
as it appeared in config
Definition base.h:67
fr_kafka_topic_conf_t * conf
parsed per-topic librdkafka conf
Definition base.h:68
char const * string_sep
Used for multi-value configuration items.
Definition base.h:86
bool empty_default
Don't produce messages saying the default is missing.
Definition base.h:83
size_t * mapping_len
Length of the mapping tables.
Definition base.h:82
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
Definition base.h:81
uctx attached to each entry in KAFKA_BASE_PRODUCER_CONFIG
Definition base.h:80
Declared topic record - one per topic { <name> { ... } } subsection.
Definition base.h:66
void fr_kafka_free(void)
Drop one ref to librdkafka's global init.
Definition base.c:1238
conf_parser_t const kafka_connection_config[]
Definition base.c:858
conf_parser_t const kafka_base_consumer_topics_config[]
Definition base.c:1168
static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value for a topic from the kafka client library.
Definition base.c:556
conf_parser_t const kafka_base_topic_properties_config[]
Per-topic properties { ... } escape-hatch contents.
Definition base.c:716
static size_t kafka_check_cert_cn_table_len
Definition base.c:797
fr_kafka_topic_t * kafka_topic_conf_find(fr_kafka_conf_t const *kc, char const *name)
Look up a declared topic by name on an fr_kafka_conf_t
Definition base.c:623
static uint32_t kafka_instance_count
Definition base.c:1191
int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka config struct.
Definition base.c:295
conf_parser_t const kafka_base_consumer_topic_config[]
Definition base.c:1133
int kafka_config_raw_parse(TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
Untyped passthrough: hand a CONF_PAIR's attr/value straight to rd_kafka_conf_set.
Definition base.c:452
int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value from the kafka client library.
Definition base.c:367
static fr_kafka_conf_t * kafka_conf_get(TALLOC_CTX *ctx, void *base)
Fetch the fr_kafka_conf_t currently being populated by the parser.
Definition base.c:262
static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka topic config struct.
Definition base.c:520
int fr_kafka_init(void)
Drive librdkafka's lazy global init deterministically.
Definition base.c:1207
int kafka_topic_config_raw_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
Topic-level counterpart to kafka_config_raw_parse
Definition base.c:598
conf_parser_t const kafka_base_properties_config[]
properties { ... } escape-hatch contents
Definition base.c:706
static int _kafka_topic_conf_free(fr_kafka_topic_conf_t *ktc)
Destructor on a per-topic conf - releases the librdkafka handle.
Definition base.c:482
static conf_parser_t const kafka_sasl_oauth_config[]
Definition base.c:721
static fr_table_ptr_sorted_t kafka_check_cert_cn_table[]
Definition base.c:791
conf_parser_t const kafka_version_config[]
Definition base.c:935
int kafka_topic_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
Per-topic subsection hook.
Definition base.c:639
static int kafka_noop_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, UNUSED CONF_ITEM *ci, UNUSED conf_parser_t const *rule)
No-op parser used to reserve CONF_PAIR names inside a topic subsection that the module reads separate...
Definition base.c:224
conf_parser_t const kafka_consumer_group_config[]
Definition base.c:1092
conf_parser_t const kafka_tls_config[]
Definition base.c:799
conf_parser_t const kafka_base_producer_topics_config[]
Definition base.c:1074
static fr_kafka_topic_conf_t * kafka_topic_conf_alloc(TALLOC_CTX *ctx)
Allocate a per-topic conf parented under ctx
Definition base.c:494
static int _kafka_conf_free(rd_kafka_conf_t **pconf)
Destructor on the talloc sentinel that owns the rd_kafka_conf_t handle.
Definition base.c:246
static void _kafka_null_log_cb(UNUSED rd_kafka_t const *rk, UNUSED int level, UNUSED char const *fac, UNUSED char const *buf)
Definition base.c:1193
conf_parser_t const kafka_metadata_config[]
Definition base.c:963
static conf_parser_t const kafka_sasl_kerberos_config[]
Definition base.c:731
static conf_parser_t const kafka_base_producer_topic_config[]
Definition base.c:1007
static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
Common parse path for a single CONF_PAIR's value.
Definition base.c:46
static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule)
Common dflt path: take a librdkafka-native value string and materialise it as a CONF_PAIR in the call...
Definition base.c:148
static int8_t _kafka_topic_cmp(void const *one, void const *two)
Order-by-name comparator for the fr_kafka_conf_t.topics tree.
Definition base.c:616
conf_parser_t const kafka_sasl_config[]
Definition base.c:765
size_t fr_sbuff_out_unescape_until(fr_sbuff_t *out, fr_sbuff_t *in, size_t len, fr_sbuff_term_t const *tt, fr_sbuff_unescape_rules_t const *u_rules)
fr_type_t
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
@ FR_TYPE_INT8
8 Bit signed integer.
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_INT16
16 Bit signed integer.
@ FR_TYPE_UINT32
32 Bit unsigned integer.
@ FR_TYPE_UINT64
64 Bit unsigned integer.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_SIZE
Unsigned integer capable of representing any memory address on the local system.
unsigned int uint32_t
unsigned char uint8_t
ssize_t fr_slen_t
static rs_t * conf
Definition radsniff.c:52
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:244
static char const * name
fr_slen_t fr_sbuff_array_concat(fr_sbuff_t *out, char const *const *array, char const *sep)
Concat an array of strings (not NULL terminated), with a string separator.
Definition sbuff.c:2337
ssize_t fr_sbuff_in_sprintf(fr_sbuff_t *sbuff, char const *fmt,...)
Print using a fmt string to an sbuff.
Definition sbuff.c:1612
#define fr_sbuff_start(_sbuff_or_marker)
#define FR_SBUFF_IN(_start, _len_or_end)
char const * str
Terminal string.
Definition sbuff.h:160
char const * name
Name for rule set to aid we debugging.
Definition sbuff.h:209
size_t len
Length of the list.
Definition sbuff.h:170
#define FR_SBUFF_IN_STR(_start)
#define fr_sbuff_advance(_sbuff_or_marker, _len)
#define FR_SBUFF_OUT(_start, _len_or_end)
char subs[SBUFF_CHAR_CLASS]
Special characters and their substitutions.
Definition sbuff.h:212
#define FR_SBUFF_TALLOC_THREAD_LOCAL(_out, _init, _max)
Terminal element with pre-calculated lengths.
Definition sbuff.h:159
Set of terminal elements.
Set of parsing rules for *unescape_until functions.
fr_slen_t fr_size_from_str(size_t *out, fr_sbuff_t *in)
Parse a size string with optional unit.
Definition size.c:40
fr_slen_t fr_size_to_str(fr_sbuff_t *out, size_t in)
Print a size string with unit.
Definition size.c:155
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
return count
Definition module.c:155
fr_aka_sim_id_type_t type
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition table.h:653
#define fr_table_str_by_str_value(_table, _str_value, _def)
Brute force search a sorted or ordered ptr table, assuming the pointers are strings.
Definition table.h:625
An element in a lexicographically sorted array of name to ptr mappings.
Definition table.h:65
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
fr_slen_t fr_time_delta_from_str(fr_time_delta_t *out, char const *in, size_t inlen, fr_time_res_t hint)
Create fr_time_delta_t from a string.
Definition time.c:412
fr_slen_t fr_time_delta_to_str(fr_sbuff_t *out, fr_time_delta_t delta, fr_time_res_t res, bool is_unsigned)
Print fr_time_delta_t to a string with an appropriate suffix.
Definition time.c:440
@ FR_TIME_RES_MSEC
Definition time.h:58
@ FR_TIME_RES_SEC
Definition time.h:50
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
enum fr_token fr_token_t
@ T_BARE_WORD
Definition token.h:118
@ T_OP_EQ
Definition token.h:81
static fr_slen_t parent
Definition pair.h:858
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_type_is_string(_x)
Definition types.h:348
void fr_value_box_clear(fr_value_box_t *data)
Clear/free any existing value and metadata.
Definition value.c:4377
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
Definition value.h:511
static size_t char ** out
Definition value.h:1030