The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
base.c
Go to the documentation of this file.
1/*
2 * This program is is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 4efb5ac26d9d55a0e67fb83e735bc5efd5ded9c4 $
19 * @file kafka/base.c
20 * @brief Kafka global structures
21 *
22 * @copyright 2022 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
23 */
24
25#include <freeradius-devel/kafka/base.h>
26#include <freeradius-devel/server/tmpl.h>
27#include <freeradius-devel/util/size.h>
28
29typedef struct {
30 rd_kafka_conf_t *conf;
32
33typedef struct {
34 rd_kafka_topic_conf_t *conf;
36
37typedef struct {
38 fr_table_ptr_sorted_t *mapping; //!< Mapping table between string constant.
39
40 size_t *mapping_len; //!< Length of the mapping tables
41
42 bool empty_default; //!< Don't produce messages saying the default is missing.
43
44 size_t size_scale; //!< Divide/multiply FR_TYPE_SIZE by this amount.
45
46 char const *property; //!< Kafka configuration property.
47
48 char const *string_sep; //!< Used for multi-value configuration items.
49 //!< Kafka uses ', ' or ';' seemingly at random.
51
52/** Destroy a kafka configuration handle
53 *
54 * @param[in] kc To destroy.
55 * @return 0
56 */
58{
59 rd_kafka_conf_destroy(kc->conf);
60 return 0;
61}
62
63static inline CC_HINT(always_inline)
65{
66 CONF_DATA const *cd;
68
69 cd = cf_data_find(cs, fr_kafka_conf_t, "conf");
70 if (cd) {
71 kc = cf_data_value(cd);
72 } else {
73 MEM(kc = talloc(NULL, fr_kafka_conf_t));
74 MEM(kc->conf = rd_kafka_conf_new());
75 talloc_set_destructor(kc, _kafka_conf_free);
76 cf_data_add(cs, kc, "conf", true);
77 }
78
79 return kc;
80}
81
82/** Destroy a kafka topic configuration handle
83 *
84 * @param[in] ktc To destroy.
85 * @return 0
86 */
88{
89 rd_kafka_topic_conf_destroy(ktc->conf);
90 return 0;
91}
92
93static inline CC_HINT(always_inline)
95{
96 CONF_DATA const *cd;
98
99 cd = cf_data_find(cs, fr_kafka_topic_conf_t, "conf");
100 if (cd) {
101 ktc = cf_data_value(cd);
102 } else {
103 MEM(ktc = talloc(NULL, fr_kafka_topic_conf_t));
104 MEM(ktc->conf = rd_kafka_topic_conf_new());
105 talloc_set_destructor(ktc, _kafka_topic_conf_free);
106 cf_data_add(cs, ktc, "conf", true);
107 }
108
109 return ktc;
110}
111
112/** Perform any conversions necessary to map kafka defaults to our values
113 *
114 * @param[out] out Where to write the pair.
115 * @param[in] parent being populated.
116 * @param[in] cs to allocate the pair in.
117 * @param[in] value to convert.
118 * @param[in] quote to use when allocing the pair.
119 * @param[in] rule UNUSED.
120 * @return
121 * - 0 on success.
122 * - -1 on failure.
123 */
125 fr_token_t quote, conf_parser_t const *rule)
126{
127 char tmp[sizeof("18446744073709551615b")];
128 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
129 fr_type_t type = rule->type;
130
131 /*
132 * Apply any mappings available, but default back
133 * to the raw value if we don't have a match.
134 */
135 if (kctx->mapping) {
136 fr_table_ptr_sorted_t *mapping = kctx->mapping;
137 size_t mapping_len = *kctx->mapping_len;
138
140 }
141 /*
142 * Convert time delta as an integer with ms precision
143 */
144 switch (type) {
146 {
147 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
148 fr_time_delta_t delta;
149
150 if (fr_time_delta_from_str(&delta, value, strlen(value), FR_TIME_RES_MSEC) < 0) {
151 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
152 return -1;
153 }
154
155 fr_time_delta_to_str(&value_elem, delta, FR_TIME_RES_SEC, true);
156 value = fr_sbuff_start(&value_elem);
157 }
158 break;
159
160 case FR_TYPE_SIZE:
161 {
162 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
163 size_t size;
164
165 if (fr_size_from_str(&size, &FR_SBUFF_IN_STR(value)) < 0) {
166 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
167 return -1;
168 }
169
170 /*
171 * Some options are in kbytes *sigh*
172 */
173 if (kctx->size_scale) size *= kctx->size_scale;
174
175 /*
176 * reprint the size with an appropriate unit
177 */
178 if (fr_size_to_str(&value_elem, size) < 0) {
179 cf_log_perr(cs, "Failed size reprint");
180 return -1;
181 }
182 value = fr_sbuff_start(&value_elem);
183 }
184 break;
185
186 default:
187 break;
188 }
189
190 MEM(*out = cf_pair_alloc(cs, rule->name1, value, T_OP_EQ, T_BARE_WORD, quote));
191 cf_item_mark_parsed(*out); /* Don't re-parse this */
192
193 return 0;
194}
195
196/** Return the default value from the kafka client library
197 *
198 * @param[out] out Where to write the pair.
199 * @param[in] parent being populated.
200 * @param[in] cs to allocate the pair in.
201 * @param[in] quote to use when allocing the pair.
202 * @param[in] rule UNUSED.
203 * @return
204 * - 0 on success.
205 * - -1 on failure.
206 */
207static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
208{
209 char buff[1024];
210 size_t buff_len = sizeof(buff);
211 char const *value;
212
213 fr_kafka_conf_t *kc;
214 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
215 rd_kafka_conf_res_t ret;
216
217 kc = kafka_conf_from_cs(cs);
218 fr_assert(kc);
219
220 if ((ret = rd_kafka_conf_get(kc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
221 if (ret == RD_KAFKA_CONF_UNKNOWN) {
222 if (kctx->empty_default) return 0;
223
224 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
225 return 0; /* Not an error */
226 }
227
228 cf_log_err(cs, "Failed retrieving kafka property \"%s\"", kctx->property);
229 return -1;
230 }
231#if 0
232 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
233#endif
234 value = buff;
235
236 /*
237 * If it's multi we need to break the string apart on the string separator
238 * and potentially unescape the separator.
239 */
240 if (fr_rule_multi(rule)) {
241 fr_sbuff_t value_in = FR_SBUFF_IN(value, buff_len);
242 char tmp[256];
243 fr_sbuff_t value_elem = FR_SBUFF_OUT(tmp, sizeof(tmp));
245 fr_sbuff_unescape_rules_t ue_rules = {
246 .name = __FUNCTION__,
247 .chr = '\\'
248 };
249 /*
250 * Convert escaped separators back
251 */
252 ue_rules.subs[(uint8_t)kctx->string_sep[0]] = kctx->string_sep[0];
253
254 while (fr_sbuff_out_unescape_until(&value_elem, &value_in, SIZE_MAX, &tt, &ue_rules) > 0) {
255 if (kafka_config_dflt_single(out, parent, cs, fr_sbuff_start(&value_elem), quote, rule) < 0) return -1;
256
257 /*
258 * Skip past the string separator
259 */
260 fr_sbuff_advance(&value_in, strlen(kctx->string_sep));
261
262 /*
263 * Reset
264 */
265 fr_sbuff_set_to_start(&value_elem);
266 }
267 return 0;
268 }
269
270 /*
271 * Parse a single value
272 */
273 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
274
275 return 0;
276}
277
278/** Return the default value for a topic from the kafka client library
279 *
280 * @param[out] out Where to write the pair.
281 * @param[in] parent being populated.
282 * @param[in] cs to allocate the pair in.
283 * @param[in] quote to use when allocing the pair.
284 * @param[in] rule UNUSED.
285 * @return
286 * - 0 on success.
287 * - -1 on failure.
288 */
290{
291 char buff[1024];
292 size_t buff_len = sizeof(buff);
293 char const *value;
294
296 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
297 rd_kafka_conf_res_t ret;
298
299 ktc = kafka_topic_conf_from_cs(cs);
300 fr_assert(ktc);
301
302 if ((ret = rd_kafka_topic_conf_get(ktc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
303 if (ret == RD_KAFKA_CONF_UNKNOWN) {
304 if (kctx->empty_default) return 0;
305
306 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
307 return 0; /* Not an error */
308 }
309
310 cf_log_err(cs, "Failed retrieving kafka property '%s'", kctx->property);
311 return -1;
312 }
313#if 0
314 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
315#endif
316 value = buff;
317
318 /*
319 * Parse a single value
320 */
321 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
322
323 return 0;
324}
325
326static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
327{
329 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
330 fr_type_t type = rule->type;
331 static _Thread_local char buff[sizeof("18446744073709551615")];
332 static _Thread_local fr_sbuff_t sbuff;
333
334 /*
335 * Map string values if possible, and if there's
336 * no match then just pass the original through.
337 *
338 * We count this as validation...
339 */
340 if (kctx->mapping) {
341 fr_table_ptr_sorted_t *mapping = kctx->mapping;
342 size_t mapping_len = *kctx->mapping_len;
343
345 return 0;
346 } else if (fr_type_is_string(type)) {
347 *out = cf_pair_value(cp);
348 return 0;
349 }
350
351 /*
352 * Parse as a box for basic validation
353 */
354 if (cf_pair_to_value_box(NULL, &vb, cp, rule) < 0) return -1;
355
356 /*
357 * In kafka all the time deltas are in ms
358 * resolution, so we need to take the parsed value,
359 * scale it, and print it back to a string.
360 */
361 switch (type) {
363 {
364 uint64_t delta;
365
366 sbuff = FR_SBUFF_IN(buff, sizeof(buff));
367 delta = fr_time_delta_to_msec(vb.vb_time_delta);
368 if (fr_sbuff_in_sprintf(&sbuff, "%" PRIu64, delta) < 0) {
369 error:
371 return -1;
372 }
373 *out = fr_sbuff_start(&sbuff);
374 }
375 break;
376
377 case FR_TYPE_SIZE:
378 {
379 size_t size = vb.vb_size;
380
381 sbuff = FR_SBUFF_IN(buff, sizeof(buff));
382
383 /*
384 * Most options are in bytes, but some are in kilobytes
385 */
386 if (kctx->size_scale) size /= kctx->size_scale;
387
388 /*
389 * Kafka doesn't want units...
390 */
391 if (fr_sbuff_in_sprintf(&sbuff, "%zu", size) < 0) goto error;
392 *out = fr_sbuff_start(&sbuff);
393 }
394 break;
395
396 /*
397 * Ensure bool is always mapped to the string constants
398 * "true" or "false".
399 */
400 case FR_TYPE_BOOL:
401 *out = vb.vb_bool ? "true" : "false";
402 break;
403
404 default:
405 *out = cf_pair_value(cp);
406 break;
407 }
408
410
411 return 0;
412}
413
414/** Translate config items directly to settings in a kafka config struct
415 *
416 * @param[in] ctx to allocate fr_kafka_conf_t in.
417 * @param[out] out Unused.
418 * @param[in] base Unused.
419 * @param[in] ci To parse.
420 * @param[in] rule describing how to parse the item.
421 * @return
422 * - 0 on success.
423 * - -1 on failure
424 */
425static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
426 CONF_ITEM *ci, conf_parser_t const *rule)
427{
428 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
431 CONF_PAIR *cp = cf_item_to_pair(ci);
432
433 fr_kafka_conf_t *kc;
434 char const *value;
435
437
438 /*
439 * Multi rules require us to concat the values together before handing them off
440 */
441 if (fr_rule_multi(rule)) {
442 unsigned int i;
443 CONF_PAIR *cp_p;
444 size_t count;
445 char const **array;
446 fr_sbuff_t *agg;
447 fr_slen_t slen;
448
449 FR_SBUFF_TALLOC_THREAD_LOCAL(&agg, 256, SIZE_MAX);
450
451 count = cf_pair_count(cs, rule->name1);
452 if (count <= 1) goto do_single;
453
454 MEM(array = talloc_array(ctx, char const *, count));
455 for (cp_p = cp, i = 0;
456 cp_p;
457 cp_p = cf_pair_find_next(cs, cp_p, rule->name1), i++) {
458 if (kafka_config_parse_single(&array[i], cp_p, rule) < 0) return -1;
460 }
461
462 slen = fr_sbuff_array_concat(agg, array, kctx->string_sep);
463 talloc_free(array);
464 if (slen < 0) return -1;
465
466 value = fr_sbuff_start(agg);
467 } else {
468 do_single:
469 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
470 }
471
472 {
473 char errstr[512];
474
475 if (rd_kafka_conf_set(kc->conf, kctx->property,
476 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
477 cf_log_perr(cp, "%s", errstr);
478 return -1;
479 }
480 }
481
482 return 0;
483}
484
485
486/** Translate config items directly to settings in a kafka topic config struct
487 *
488 * @param[in] ctx to allocate fr_kafka_conf_t in.
489 * @param[out] out Unused.
490 * @param[in] base Unused.
491 * @param[in] ci To parse.
492 * @param[in] rule describing how to parse the item.
493 * @return
494 * - 0 on success.
495 * - -1 on failure
496 */
497static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
498 CONF_ITEM *ci, conf_parser_t const *rule)
499{
500 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
502 CONF_PAIR *cp = cf_item_to_pair(ci);
503
505 char const *value;
506
508 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
509
510 {
511 char errstr[512];
512
513 if (rd_kafka_topic_conf_set(ktc->conf, kctx->property,
514 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
515 cf_log_perr(cp, "%s", errstr);
516 return -1;
517 }
518 }
519
520 return 0;
521}
522
523#if 0
524/** Configure a new topic for production or consumption
525 *
526 */
527static int kafka_topic_new(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
528 CONF_ITEM *ci, conf_parser_t const *rule)
529{
530 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
532 CONF_PAIR *cp = cf_item_to_pair(ci);
533
535 char const *value;
536
538
539 rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
540}
541#endif
542
545 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
546
548 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
549
551};
552
554 /*
555 * Service principal
556 */
558 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
559
560 /*
561 * Principal
562 */
564 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
565
566 /*
567 * knit cmd
568 */
570 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
571
572 /*
573 * keytab
574 */
576 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
577
578 /*
579 * How long between key refreshes
580 */
582 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
583
585};
586
588 /*
589 * SASL mechanism
590 */
592 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
593
594 /*
595 * Static SASL username
596 */
598 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
599
600 /*
601 * Static SASL password
602 */
604 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
605
607
609
611};
612
614 { L("false"), "none" },
615 { L("no"), "none" },
616 { L("true"), "https" },
617 { L("yes"), "https" }
618};
620
622 /*
623 * Cipher suite list in OpenSSL's format
624 */
626 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
627
628 /*
629 * Curves list in OpenSSL's format
630 */
632 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
633
634 /*
635 * Curves list in OpenSSL's format
636 */
638 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
639
640 /*
641 * Sets the full path to a CA certificate (used to validate
642 * the certificate the server presents).
643 */
645 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
646
647 /*
648 * Location of the CRL file.
649 */
651 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
652
653 /*
654 * Sets the path to the public certificate file we present
655 * to the servers.
656 */
658 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
659
660 /*
661 * Sets the path to the private key for our public
662 * certificate.
663 */
665 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
666
667 /*
668 * Enable or disable certificate validation
669 */
671 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
672
674 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
675 .mapping = kafka_check_cert_cn_table,
676 .mapping_len = &kafka_check_cert_cn_table_len }},
678};
679
681 /*
682 * Socket timeout
683 */
685 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
686
687 /*
688 * Close broker connections after this period.
689 */
691 .uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
692
693 /*
694 * Maximum requests in flight (per connection).
695 */
696 { FR_CONF_FUNC("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
697 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
698
699 /*
700 * Socket send buffer.
701 */
703 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
704
705 /*
706 * Socket recv buffer.
707 */
709 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
710
711 /*
712 * If true, send TCP keepalives
713 */
715 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
716
717 /*
718 * If true, disable nagle algorithm
719 */
721 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
722
723 /*
724 * How long the DNS resolver cache is valid for
725 */
727 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
728
729 /*
730 * Should we use A records, AAAA records or either
731 * when resolving broker addresses
732 */
733 { FR_CONF_FUNC("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
734 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
735
736 /*
737 * How many failures before we reconnect the connection
738 */
739 { FR_CONF_FUNC("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
740 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
741
742 /*
743 * Initial time to wait before reconnecting.
744 */
745 { FR_CONF_FUNC("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
746 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
747
748 /*
749 * Max time to wait before reconnecting.
750 */
752 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
753
755};
756
758 /*
759 * Request the API version from connected brokers
760 */
762 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
763
764 /*
765 * How long to wait for a version response.
766 */
768 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
769
770 /*
771 * How long to wait before retrying a version request.
772 */
774 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
775
776 /*
777 * Default version to use if the version request fails.
778 */
780 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
781
783};
784
786 /*
787 * Interval between attempts to refresh metadata from brokers
788 */
790 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
791
792 /*
793 * Interval between attempts to refresh metadata from brokers
794 */
796 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
797
798 /*
799 * Used when a topic loses its leader
800 */
802 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
803
804 /*
805 * Used when a topic loses its leader to prevent spurious metadata changes
806 */
808 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
809
810 /*
811 * Use sparse metadata requests which use less bandwidth maps
812 */
814 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
815
816 /*
817 * List of topics to ignore
818 */
820 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
821
823};
824
825#define BASE_CONFIG \
826 { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
827 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
828 { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
829 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
830 { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
831 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
832 { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
833 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
834 { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
835 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
836 { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
837 .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
838 { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
839 .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
840 { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
841 .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
842 { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
843 .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
844 { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
845 { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
846 { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
847 { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
848 { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }
849
851 /*
852 * Group consumer is a member of
853 */
855 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
856
857 /*
858 * A unique identifier of the consumer instance provided by the end user
859 */
861 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
862
863 /*
864 * Range or roundrobin
865 */
866 { FR_CONF_FUNC("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
867 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
868
869 /*
870 * Client group session and failure detection timeout.
871 */
873 .uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
874
875 /*
876 * Group session keepalive heartbeat interval.
877 */
879 .uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
880
881 /*
882 * How often to query for the current client group coordinator
883 */
884 { FR_CONF_FUNC("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
885 .uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
886
887
889};
890
892 /*
893 * How many messages we process at a time
894 *
895 * High numbers may starve the worker thread
896 */
897 { FR_CONF_FUNC("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
898 .uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
899
900 /*
901 * Action to take when there is no initial offset
902 * in offset store or the desired offset is out of range.
903 */
905 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
906
908};
909
910/*
911 * Allows topic configurations in the format:
912 *
913 * topic {
914 * <name> {
915 * request_required_acks = ...
916 * }
917 * }
918 *
919 */
925
929
930 /*
931 * Maximum allowed time between calls to consume messages.
932 */
934 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }},
935
936 /*
937 * Toggle auto commit
938 */
940 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }},
941
942 /*
943 * Auto commit interval
944 */
946 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }},
947
948 /*
949 * Automatically store offset of last message provided to application.
950 */
952 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }},
953
954 /*
955 * Minimum number of messages per topic+partition librdkafka tries to
956 * maintain in the local consumer queue.
957 */
959 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }},
960
961 /*
962 * Maximum size of queued pre-fetched messages in the local consumer queue.
963 */
964 { FR_CONF_FUNC("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
965 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }},
966
967 /*
968 * Maximum time the broker may wait to fill the Fetch response.
969 */
971 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }},
972
973 /*
974 * Initial maximum number of bytes per topic+partition to request when
975 * fetching messages from the broker.
976 */
977 { FR_CONF_FUNC("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
978 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }},
979
980 /*
981 * Initial maximum number of bytes per topic+partition to request when
982 * fetching messages from the broker.
983 */
984 { FR_CONF_FUNC("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
985 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }},
986
987 /*
988 * Maximum amount of data the broker shall return for a Fetch request.
989 */
991 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }},
992
993 /*
994 * Minimum number of bytes the broker responds with.
995 */
997 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }},
998
999 /*
1000 * How long to postpone the next fetch request for a topic+partition
1001 * in case of a fetch error.
1002 */
1004 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }},
1005
1006 /*
1007 * Controls how to read messages written transactionally
1008 */
1010 .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }},
1011
1012 /*
1013 * Verify CRC32 of consumed messages, ensuring no on-the-wire or
1014 * on-disk corruption to the messages occurred.
1015 */
1017 .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }},
1018
1019 /*
1020 * Allow automatic topic creation on the broker when subscribing
1021 * to or assigning non-existent topics
1022 */
1024 .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }},
1025
1027
1029};
1030
1032 /*
1033 * This field indicates the number of acknowledgements the leader
1034 * broker must receive from ISR brokers before responding to the request.
1035 */
1037 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
1038
1039 /*
1040 * medium The ack timeout of the producer request in milliseconds
1041 */
1043 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
1044
1045 /*
1046 * Local message timeout
1047 */
1049 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
1050
1051 /*
1052 * Partitioning strategy
1053 */
1055 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
1056
1057 /*
1058 * compression codec to use for compressing message sets.
1059 */
1061 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1062
1063 /*
1064 * compression level to use
1065 */
1067 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
1068
1070};
1071
1072/*
1073 * Allows topic configurations in the format:
1074 *
1075 * topic {
1076 * <name> {
1077 * request_required_acks = ...
1078 * }
1079 * }
1080 *
1081 */
1087
1090
1091 /*
1092 * Enables the transactional producer
1093 */
1095 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }},
1096
1097 /*
1098 * The maximum amount of time in milliseconds that the transaction
1099 * coordinator will wait for a transaction status update from the
1100 * producer before proactively aborting the ongoing transaction.
1101 */
1103 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }},
1104
1105 /*
1106 * When set to true, the producer will ensure that messages are
1107 * successfully produced exactly once and in the original produce
1108 * order.
1109 */
1111 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }},
1112
1113 /*
1114 * When set to true, any error that could result in a gap in the
1115 * produced message series when a batch of messages fails.
1116 */
1118 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }},
1119
1120 /*
1121 * Maximum number of messages allowed on the producer queue.
1122 * This queue is shared by all topics and partitions.
1123 */
1125 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }},
1126
1127 /*
1128 * Maximum total message size sum allowed on the producer queue.
1129 */
1131 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }},
1132
1133 /*
1134 * How long we wait to aggregate messages
1135 */
1137 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }},
1138
1139 /*
1140 * How many times we resend a message
1141 */
1143 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }},
1144
1145 /*
1146 * The backoff time in milliseconds before retrying a protocol request.
1147 */
1148 { FR_CONF_FUNC("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1149 .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }},
1150
1151 /*
1152 * The threshold of outstanding not yet transmitted broker requests
1153 * needed to backpressure the producer's message accumulator.
1154 */
1155 { FR_CONF_FUNC("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
1156 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }},
1157
1158 /*
1159 * compression codec to use for compressing message sets.
1160 */
1162 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1163
1164 /*
1165 * Maximum size (in bytes) of all messages batched in one MessageSet
1166 */
1168 .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }},
1169
1170 /*
1171 * Delay in milliseconds to wait to assign new sticky partitions for each topic
1172 */
1173 { FR_CONF_FUNC("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1174 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }},
1175
1177
1179};
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:228
#define UNUSED
Definition build.h:336
#define NUM_ELEMENTS(_t)
Definition build.h:358
int cf_pair_to_value_box(TALLOC_CTX *ctx, fr_value_box_t *out, CONF_PAIR *cp, conf_parser_t const *rule)
Parses a CONF_PAIR into a boxed value.
Definition cf_parse.c:128
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:657
void const * uctx
User data accessible by the cf_parse_t func.
Definition cf_parse.h:617
#define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs)
conf_parser_t entry which runs conf_parser_t entries for a subsection without any output
Definition cf_parse.h:398
fr_type_t type
An fr_type_t value, controls the output type.
Definition cf_parse.h:598
#define fr_rule_multi(_rule)
Definition cf_parse.h:482
char const * name1
Name of the CONF_ITEM to parse.
Definition cf_parse.h:595
@ CONF_FLAG_MULTI
CONF_PAIR can have multiple copies.
Definition cf_parse.h:446
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition cf_parse.h:433
@ CONF_FLAG_FILE_READABLE
File matching value must exist, and must be readable.
Definition cf_parse.h:435
#define FR_CONF_FUNC(_name, _type, _flags, _func, _dflt_func)
conf_parser_t entry which doesn't fill in a pointer or offset, but relies on functions to record valu...
Definition cf_parse.h:385
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:594
Internal data that is associated with a configuration section.
Definition cf_priv.h:125
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition cf_util.c:1433
unsigned int cf_pair_count(CONF_SECTION const *cs, char const *attr)
Count the number of times an attribute occurs in a parent section.
Definition cf_util.c:1500
void * cf_data_value(CONF_DATA const *cd)
Return the user assigned value of CONF_DATA.
Definition cf_util.c:1743
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition cf_util.c:1266
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:683
CONF_PAIR * cf_item_to_pair(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_PAIR.
Definition cf_util.c:663
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1574
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:285
#define cf_data_add(_cf, _data, _name, _free)
Definition cf_util.h:251
#define cf_data_find(_cf, _type, _name)
Definition cf_util.h:240
#define cf_parent(_cf)
Definition cf_util.h:98
#define cf_log_perr(_cf, _fmt,...)
Definition cf_util.h:292
#define cf_log_debug(_cf, _fmt,...)
Definition cf_util.h:288
#define cf_item_mark_parsed(_cf)
Definition cf_util.h:133
#define CF_IDENT_ANY
Definition cf_util.h:75
#define MEM(x)
Definition debug.h:46
Test enumeration values.
Definition dict_test.h:92
talloc_free(hp)
static conf_parser_t const kafka_connection_config[]
Definition base.c:680
static conf_parser_t const kafka_base_consumer_topics_config[]
Definition base.c:920
static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value for a topic from the kafka client library.
Definition base.c:289
static size_t kafka_check_cert_cn_table_len
Definition base.c:619
static fr_kafka_topic_conf_t * kafka_topic_conf_from_cs(CONF_SECTION *cs)
Definition base.c:94
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
Definition base.c:44
static conf_parser_t const kafka_base_consumer_topic_config[]
Definition base.c:891
rd_kafka_conf_t * conf
Definition base.c:30
rd_kafka_topic_conf_t * conf
Definition base.c:34
#define BASE_CONFIG
Definition base.c:825
static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka config struct.
Definition base.c:425
conf_parser_t const kafka_base_consumer_config[]
Definition base.c:926
char const * property
Kafka configuration property.
Definition base.c:46
static int _kafka_topic_conf_free(fr_kafka_topic_conf_t *ktc)
Destroy a kafka topic configuration handle.
Definition base.c:87
static conf_parser_t const kafka_sasl_oauth_config[]
Definition base.c:543
static fr_table_ptr_sorted_t kafka_check_cert_cn_table[]
Definition base.c:613
static conf_parser_t const kafka_version_config[]
Definition base.c:757
char const * string_sep
Used for multi-value configuration items.
Definition base.c:48
static conf_parser_t const kafka_consumer_group_config[]
Definition base.c:850
static conf_parser_t const kafka_tls_config[]
Definition base.c:621
static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value from the kafka client library.
Definition base.c:207
static fr_kafka_conf_t * kafka_conf_from_cs(CONF_SECTION *cs)
Definition base.c:64
static conf_parser_t const kafka_base_producer_topics_config[]
Definition base.c:1082
static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka topic config struct.
Definition base.c:497
bool empty_default
Don't produce messages saying the default is missing.
Definition base.c:42
conf_parser_t const kafka_base_producer_config[]
Definition base.c:1088
static int _kafka_conf_free(fr_kafka_conf_t *kc)
Destroy a kafka configuration handle.
Definition base.c:57
static conf_parser_t const kafka_metadata_config[]
Definition base.c:785
size_t * mapping_len
Length of the mapping tables.
Definition base.c:40
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
Definition base.c:38
static conf_parser_t const kafka_sasl_kerberos_config[]
Definition base.c:553
static conf_parser_t const kafka_base_producer_topic_config[]
Definition base.c:1031
static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
Definition base.c:326
static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule)
Perform any conversions necessary to map kafka defaults to our values.
Definition base.c:124
static conf_parser_t const kafka_sasl_config[]
Definition base.c:587
size_t fr_sbuff_out_unescape_until(fr_sbuff_t *out, fr_sbuff_t *in, size_t len, fr_sbuff_term_t const *tt, fr_sbuff_unescape_rules_t const *u_rules)
fr_type_t
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
@ FR_TYPE_INT8
8 Bit signed integer.
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_INT16
16 Bit signed integer.
@ FR_TYPE_UINT32
32 Bit unsigned integer.
@ FR_TYPE_UINT64
64 Bit unsigned integer.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_SIZE
Unsigned integer capable of representing any memory address on the local system.
unsigned char uint8_t
ssize_t fr_slen_t
#define fr_assert(_expr)
Definition rad_assert.h:37
static rs_t * conf
Definition radsniff.c:52
fr_slen_t fr_sbuff_array_concat(fr_sbuff_t *out, char const *const *array, char const *sep)
Concat an array of strings (not NULL terminated), with a string separator.
Definition sbuff.c:2329
ssize_t fr_sbuff_in_sprintf(fr_sbuff_t *sbuff, char const *fmt,...)
Print using a fmt string to an sbuff.
Definition sbuff.c:1604
#define fr_sbuff_start(_sbuff_or_marker)
#define FR_SBUFF_IN(_start, _len_or_end)
char const * name
Name for rule set to aid we debugging.
Definition sbuff.h:209
#define FR_SBUFF_IN_STR(_start)
#define fr_sbuff_advance(_sbuff_or_marker, _len)
#define FR_SBUFF_OUT(_start, _len_or_end)
char subs[SBUFF_CHAR_CLASS]
Special characters and their substitutions.
Definition sbuff.h:212
#define FR_SBUFF_TERM(_str)
Initialise a terminal structure with a single string.
Definition sbuff.h:178
#define FR_SBUFF_TALLOC_THREAD_LOCAL(_out, _init, _max)
Set of terminal elements.
Set of parsing rules for *unescape_until functions.
fr_slen_t fr_size_from_str(size_t *out, fr_sbuff_t *in)
Parse a size string with optional unit.
Definition size.c:40
fr_slen_t fr_size_to_str(fr_sbuff_t *out, size_t in)
Print a size string with unit.
Definition size.c:155
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
return count
Definition module.c:155
fr_aka_sim_id_type_t type
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition table.h:653
#define fr_table_str_by_str_value(_table, _str_value, _def)
Brute force search a sorted or ordered ptr table, assuming the pointers are strings.
Definition table.h:625
An element in a lexicographically sorted array of name to ptr mappings.
Definition table.h:65
fr_slen_t fr_time_delta_from_str(fr_time_delta_t *out, char const *in, size_t inlen, fr_time_res_t hint)
Create fr_time_delta_t from a string.
Definition time.c:412
fr_slen_t fr_time_delta_to_str(fr_sbuff_t *out, fr_time_delta_t delta, fr_time_res_t res, bool is_unsigned)
Print fr_time_delta_t to a string with an appropriate suffix.
Definition time.c:435
@ FR_TIME_RES_MSEC
Definition time.h:58
@ FR_TIME_RES_SEC
Definition time.h:50
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
enum fr_token fr_token_t
@ T_BARE_WORD
Definition token.h:118
@ T_OP_EQ
Definition token.h:81
static fr_slen_t parent
Definition pair.h:858
#define fr_type_is_string(_x)
Definition types.h:348
void fr_value_box_clear(fr_value_box_t *data)
Clear/free any existing value and metadata.
Definition value.c:4362
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
Definition value.h:511
static size_t char ** out
Definition value.h:1030