The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
base.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 1c5b31927d4e569cd934a3f2d567d2a70091105c $
19 * @file kafka/base.c
20 * @brief Kafka global structures
21 *
22 * @copyright 2022 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
23 */
24
25#include <freeradius-devel/kafka/base.h>
26#include <freeradius-devel/server/cf_parse.h>
27#include <freeradius-devel/server/tmpl.h>
28#include <freeradius-devel/util/value.h>
29#include <freeradius-devel/util/sbuff.h>
30#include <freeradius-devel/util/size.h>
31
32typedef struct {
33 rd_kafka_conf_t *conf;
35
36typedef struct {
37 rd_kafka_topic_conf_t *conf;
39
40typedef struct {
41 fr_table_ptr_sorted_t *mapping; //!< Mapping table between string constant.
42
43 size_t *mapping_len; //!< Length of the mapping tables
44
45 bool empty_default; //!< Don't produce messages saying the default is missing.
46
47 size_t size_scale; //!< Divide/multiply FR_TYPE_SIZE by this amount.
48
49 char const *property; //!< Kafka configuration property.
50
51 char const *string_sep; //!< Used for multi-value configuration items.
52 //!< Kafka uses ', ' or ';' seemingly at random.
54
55/** Destroy a kafka configuration handle
56 *
57 * @param[in] kc To destroy.
58 * @return 0
59 */
61{
62 rd_kafka_conf_destroy(kc->conf);
63 return 0;
64}
65
66static inline CC_HINT(always_inline)
68{
69 CONF_DATA const *cd;
71
72 cd = cf_data_find(cs, fr_kafka_conf_t, "conf");
73 if (cd) {
74 kc = cf_data_value(cd);
75 } else {
76 MEM(kc = talloc(NULL, fr_kafka_conf_t));
77 MEM(kc->conf = rd_kafka_conf_new());
78 talloc_set_destructor(kc, _kafka_conf_free);
79 cf_data_add(cs, kc, "conf", true);
80 }
81
82 return kc;
83}
84
85/** Destroy a kafka topic configuration handle
86 *
87 * @param[in] ktc To destroy.
88 * @return 0
89 */
91{
92 rd_kafka_topic_conf_destroy(ktc->conf);
93 return 0;
94}
95
96static inline CC_HINT(always_inline)
98{
99 CONF_DATA const *cd;
101
102 cd = cf_data_find(cs, fr_kafka_topic_conf_t, "conf");
103 if (cd) {
104 ktc = cf_data_value(cd);
105 } else {
106 MEM(ktc = talloc(NULL, fr_kafka_topic_conf_t));
107 MEM(ktc->conf = rd_kafka_topic_conf_new());
108 talloc_set_destructor(ktc, _kafka_topic_conf_free);
109 cf_data_add(cs, ktc, "conf", true);
110 }
111
112 return ktc;
113}
114
115/** Perform any conversions necessary to map kafka defaults to our values
116 *
117 * @param[out] out Where to write the pair.
118 * @param[in] parent being populated.
119 * @param[in] cs to allocate the pair in.
120 * @param[in] value to convert.
121 * @param[in] quote to use when allocating the pair.
122 * @param[in] rule providing the pair name, the value type, and the kafka conversion context.
123 * @return
124 * - 0 on success.
125 * - -1 on failure.
126 */
128 fr_token_t quote, conf_parser_t const *rule)
129{
130 char tmp[sizeof("18446744073709551615b")];
131 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
132 fr_type_t type = rule->type;
133
134 /*
135 * Apply any mappings available, but default back
136 * to the raw value if we don't have a match.
137 */
138 if (kctx->mapping) {
139 fr_table_ptr_sorted_t *mapping = kctx->mapping;
140 size_t mapping_len = *kctx->mapping_len;
141
143 }
144 /*
145 * Convert time delta as an integer with ms precision
146 */
147 switch (type) {
149 {
150 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
151 fr_time_delta_t delta;
152
153 if (fr_time_delta_from_str(&delta, value, strlen(value), FR_TIME_RES_MSEC) < 0) {
154 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
155 return -1;
156 }
157
158 fr_time_delta_to_str(&value_elem, delta, FR_TIME_RES_SEC, true);
159 value = fr_sbuff_start(&value_elem);
160 }
161 break;
162
163 case FR_TYPE_SIZE:
164 {
165 fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
166 size_t size;
167
168 if (fr_size_from_str(&size, &FR_SBUFF_IN(value, strlen(value))) < 0) {
169 cf_log_perr(cs, "Failed parsing default \"%s\"", value);
170 return -1;
171 }
172
173 /*
174 * Some options are in kbytes *sigh*
175 */
176 if (kctx->size_scale) size *= kctx->size_scale;
177
178 /*
179 * reprint the size with an appropriate unit
180 */
181 if (fr_size_to_str(&value_elem, size) < 0) {
182 cf_log_perr(cs, "Failed size reprint");
183 return -1;
184 }
185 value = fr_sbuff_start(&value_elem);
186 }
187 break;
188
189 default:
190 break;
191 }
192
193 MEM(*out = cf_pair_alloc(cs, rule->name1, value, T_OP_EQ, T_BARE_WORD, quote));
194 cf_pair_mark_parsed(*out); /* Don't re-parse this */
195
196 return 0;
197}
198
199/** Return the default value from the kafka client library
200 *
201 * @param[out] out Where to write the pair.
202 * @param[in] parent being populated.
203 * @param[in] cs to allocate the pair in.
204 * @param[in] quote to use when allocating the pair.
205 * @param[in] rule describing the config item; its uctx names the kafka property to query.
206 * @return
207 * - 0 on success.
208 * - -1 on failure.
209 */
210static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
211{
212 char buff[1024];
213 size_t buff_len = sizeof(buff);
214 char const *value;
215
216 fr_kafka_conf_t *kc;
217 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
218 rd_kafka_conf_res_t ret;
219
220 kc = kafka_conf_from_cs(cs);
221 fr_assert(kc);
222
223 if ((ret = rd_kafka_conf_get(kc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
224 if (ret == RD_KAFKA_CONF_UNKNOWN) {
225 if (kctx->empty_default) return 0;
226
227 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
228 return 0; /* Not an error */
229 }
230
231 cf_log_err(cs, "Failed retrieving kafka property \"%s\"", kctx->property);
232 return -1;
233 }
234#if 0
235 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
236#endif
237 value = buff;
238
239 /*
240 * If it's multi we need to break the string apart on the string separator
241 * and potentially unescape the separator.
242 */
243 if (fr_rule_multi(rule)) {
244 fr_sbuff_t value_in = FR_SBUFF_IN(value, buff_len);
245 char tmp[256];
246 fr_sbuff_t value_elem = FR_SBUFF_OUT(tmp, sizeof(tmp));
248 fr_sbuff_unescape_rules_t ue_rules = {
249 .name = __FUNCTION__,
250 .chr = '\\'
251 };
252 /*
253 * Convert escaped separators back
254 */
255 ue_rules.subs[(uint8_t)kctx->string_sep[0]] = kctx->string_sep[0];
256
257 while (fr_sbuff_out_unescape_until(&value_elem, &value_in, SIZE_MAX, &tt, &ue_rules) > 0) {
258 if (kafka_config_dflt_single(out, parent, cs, fr_sbuff_start(&value_elem), quote, rule) < 0) return -1;
259
260 /*
261 * Skip past the string separator
262 */
263 fr_sbuff_advance(&value_in, strlen(kctx->string_sep));
264
265 /*
266 * Reset
267 */
268 fr_sbuff_set_to_start(&value_elem);
269 }
270 return 0;
271 }
272
273 /*
274 * Parse a single value
275 */
276 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
277
278 return 0;
279}
280
281/** Return the default value for a topic from the kafka client library
282 *
283 * @param[out] out Where to write the pair.
284 * @param[in] parent being populated.
285 * @param[in] cs to allocate the pair in.
286 * @param[in] quote to use when allocating the pair.
287 * @param[in] rule describing the config item; its uctx names the kafka topic property to query.
288 * @return
289 * - 0 on success.
290 * - -1 on failure.
291 */
293{
294 char buff[1024];
295 size_t buff_len = sizeof(buff);
296 char const *value;
297
299 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
300 rd_kafka_conf_res_t ret;
301
302 ktc = kafka_topic_conf_from_cs(cs);
303 fr_assert(ktc);
304
305 if ((ret = rd_kafka_topic_conf_get(ktc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
306 if (ret == RD_KAFKA_CONF_UNKNOWN) {
307 if (kctx->empty_default) return 0;
308
309 cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
310 return 0; /* Not an error */
311 }
312
313 fr_assert(ret == RD_KAFKA_CONF_UNKNOWN);
314 cf_log_err(cs, "Failed retrieving kafka property '%s'", kctx->property);
315 return -1;
316 }
317#if 0
318 cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
319#endif
320 value = buff;
321
322 /*
323 * Parse a single value
324 */
325 if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
326
327 return 0;
328}
329
330static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
331{
333 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
334 fr_type_t type = rule->type;
335 static _Thread_local char buff[sizeof("18446744073709551615")];
336 static _Thread_local fr_sbuff_t sbuff;
337
338 /*
339 * Map string values if possible, and if there's
340 * no match then just pass the original through.
341 *
342 * We count this as validation...
343 */
344 if (kctx->mapping) {
345 fr_table_ptr_sorted_t *mapping = kctx->mapping;
346 size_t mapping_len = *kctx->mapping_len;
347
349 return 0;
350 } else if (fr_type_is_string(type)) {
351 *out = cf_pair_value(cp);
352 return 0;
353 }
354
355 /*
356 * Parse as a box for basic validation
357 */
358 if (cf_pair_to_value_box(NULL, &vb, cp, rule) < 0) return -1;
359
360 /*
361 * In kafka all the time deltas are in ms
362 * resolution, so we need to take the parsed value,
363 * scale it, and print it back to a string.
364 */
365 switch (type) {
367 {
368 uint64_t delta;
369
370 sbuff = FR_SBUFF_IN(buff, sizeof(buff));
371 delta = fr_time_delta_to_msec(vb.vb_time_delta);
372 if (fr_sbuff_in_sprintf(&sbuff, "%" PRIu64, delta) < 0) {
373 error:
375 return -1;
376 }
377 *out = fr_sbuff_start(&sbuff);
378 }
379 break;
380
381 case FR_TYPE_SIZE:
382 {
383 size_t size = vb.vb_size;
384
385 sbuff = FR_SBUFF_IN(buff, sizeof(buff));
386
387 /*
388 * Most options are in bytes, but some are in kilobytes
389 */
390 if (kctx->size_scale) size /= kctx->size_scale;
391
392 /*
393 * Kafka doesn't want units...
394 */
395 if (fr_sbuff_in_sprintf(&sbuff, "%zu", size) < 0) goto error;
396 *out = fr_sbuff_start(&sbuff);
397 }
398 break;
399
400 /*
401 * Ensure bool is always mapped to the string constants
402 * "true" or "false".
403 */
404 case FR_TYPE_BOOL:
405 *out = vb.vb_bool ? "true" : "false";
406 break;
407
408 default:
409 *out = cf_pair_value(cp);
410 break;
411 }
412
414
415 return 0;
416}
417
418/** Translate config items directly to settings in a kafka config struct
419 *
420 * @param[in] ctx to allocate the temporary value array in (multi-value rules only).
421 * @param[out] out Unused.
422 * @param[in] base Unused.
423 * @param[in] ci To parse.
424 * @param[in] rule describing how to parse the item.
425 * @return
426 * - 0 on success.
427 * - -1 on failure
428 */
429static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
430 CONF_ITEM *ci, conf_parser_t const *rule)
431{
432 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
435 CONF_PAIR *cp = cf_item_to_pair(ci);
436
437 fr_kafka_conf_t *kc;
438 char const *value;
439
441
442 /*
443 * Multi rules require us to concat the values together before handing them off
444 */
445 if (fr_rule_multi(rule)) {
446 unsigned int i;
447 CONF_PAIR *cp_p;
448 size_t count;
449 char const **array;
450 fr_sbuff_t *agg;
451 fr_slen_t slen;
452
453 FR_SBUFF_TALLOC_THREAD_LOCAL(&agg, 256, SIZE_MAX);
454
455 count = cf_pair_count(cs, rule->name1);
456 if (count <= 1) goto do_single;
457
458 MEM(array = talloc_array(ctx, char const *, count));
459 for (cp_p = cp, i = 0;
460 cp_p;
461 cp_p = cf_pair_find_next(cs, cp_p, rule->name1), i++) {
462 if (kafka_config_parse_single(&array[i], cp_p, rule) < 0) return -1;
464 }
465
466 slen = talloc_array_concat(agg, array, kctx->string_sep);
467 talloc_free(array);
468 if (slen < 0) return -1;
469
470 value = fr_sbuff_start(agg);
471 } else {
472 do_single:
473 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
474 }
475
476 {
477 char errstr[512];
478
479 if (rd_kafka_conf_set(kc->conf, kctx->property,
480 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
481 cf_log_perr(cp, "%s", errstr);
482 return -1;
483 }
484 }
485
486 return 0;
487}
488
489
490/** Translate config items directly to settings in a kafka topic config struct
491 *
492 * @param[in] ctx Unused.
493 * @param[out] out Unused.
494 * @param[in] base Unused.
495 * @param[in] ci To parse.
496 * @param[in] rule describing how to parse the item.
497 * @return
498 * - 0 on success.
499 * - -1 on failure
500 */
501static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
502 CONF_ITEM *ci, conf_parser_t const *rule)
503{
504 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
506 CONF_PAIR *cp = cf_item_to_pair(ci);
507
509 char const *value;
510
512 if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
513
514 {
515 char errstr[512];
516
517 if (rd_kafka_topic_conf_set(ktc->conf, kctx->property,
518 value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
519 cf_log_perr(cp, "%s", errstr);
520 return -1;
521 }
522 }
523
524 return 0;
525}
526
527#if 0
528/** Configure a new topic for production or consumption
529 *
530 */
531static int kafka_topic_new(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
532 CONF_ITEM *ci, conf_parser_t const *rule)
533{
534 fr_kafka_conf_ctx_t const *kctx = rule->uctx;
536 CONF_PAIR *cp = cf_item_to_pair(ci);
537
539 char const *value;
540
542
543 rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
544}
545#endif
546
549 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
550
552 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
553
555};
556
558 /*
559 * Service principal
560 */
562 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
563
564 /*
565 * Principal
566 */
568 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
569
570 /*
571 * kinit cmd
572 */
574 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
575
576 /*
577 * keytab
578 */
580 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
581
582 /*
583 * How long between key refreshes
584 */
586 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
587
589};
590
592 /*
593 * SASL mechanism
594 */
596 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
597
598 /*
599 * Static SASL username
600 */
602 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
603
604 /*
605 * Static SASL password
606 */
608 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
609
611
613
615};
616
618 { L("false"), "none" },
619 { L("no"), "none" },
620 { L("true"), "https" },
621 { L("yes"), "https" }
622};
624
626 /*
627 * Cipher suite list in OpenSSL's format
628 */
630 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
631
632 /*
633 * Curves list in OpenSSL's format
634 */
636 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
637
638 /*
639 * Signature algorithms list in OpenSSL's format
640 */
642 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
643
644 /*
645 * Sets the full path to a CA certificate (used to validate
646 * the certificate the server presents).
647 */
649 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
650
651 /*
652 * Location of the CRL file.
653 */
655 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
656
657 /*
658 * Sets the path to the public certificate file we present
659 * to the servers.
660 */
662 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
663
664 /*
665 * Sets the path to the private key for our public
666 * certificate.
667 */
669 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
670
671 /*
672 * Enable or disable certificate validation
673 */
675 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
676
678 .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
679 .mapping = kafka_check_cert_cn_table,
680 .mapping_len = &kafka_check_cert_cn_table_len }},
682};
683
685 /*
686 * Socket timeout
687 */
689 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
690
691 /*
692 * Close broker connections after this period.
693 */
695 .uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
696
697 /*
698 * Maximum requests in flight (per connection).
699 */
700 { FR_CONF_FUNC("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
701 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
702
703 /*
704 * Socket send buffer.
705 */
707 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
708
709 /*
710 * Socket recv buffer.
711 */
713 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
714
715 /*
716 * If true, send TCP keepalives
717 */
719 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
720
721 /*
722 * If true, disable nagle algorithm
723 */
725 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
726
727 /*
728 * How long the DNS resolver cache is valid for
729 */
731 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
732
733 /*
734 * Should we use A records, AAAA records or either
735 * when resolving broker addresses
736 */
737 { FR_CONF_FUNC("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
738 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
739
740 /*
741 * How many failures before we reconnect the connection
742 */
743 { FR_CONF_FUNC("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
744 .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
745
746 /*
747 * Initial time to wait before reconnecting.
748 */
749 { FR_CONF_FUNC("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
750 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
751
752 /*
753 * Max time to wait before reconnecting.
754 */
756 .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
757
759};
760
762 /*
763 * Request the API version from connected brokers
764 */
766 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
767
768 /*
769 * How long to wait for a version response.
770 */
772 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
773
774 /*
775 * How long to wait before retrying a version request.
776 */
778 .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
779
780 /*
781 * Default version to use if the version request fails.
782 */
784 .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
785
787};
788
790 /*
791 * Interval between attempts to refresh metadata from brokers
792 */
794 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
795
796 /*
797 * Maximum age of cached metadata
798 */
800 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
801
802 /*
803 * Used when a topic loses its leader
804 */
806 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
807
808 /*
809 * Used when a topic loses its leader to prevent spurious metadata changes
810 */
812 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
813
814 /*
815 * Use sparse metadata requests, which consume less network bandwidth
816 */
818 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
819
820 /*
821 * List of topics to ignore
822 */
824 .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
825
827};
828
/** Shared list of parser rules for the top level kafka client options
 *
 * Expands to a comma separated run of conf_parser_t initialisers with no
 * trailing comma after the final entry, so it is only meaningful inside a
 * conf_parser_t array initialiser.
 *
 * - "server" is both required and multi-valued.
 * - Every multi-valued item ("server", "feature", "debug", "plugin") supplies
 *   the string_sep used for that property ("," for all except "plugin",
 *   which uses ";").
 * - "plugin" passes NULL as its default callback instead of kafka_config_dflt.
 * - The "metadata", "version", "connection", "tls" and "sasl" subsections are
 *   delegated to their own rule tables.
 *
 * NOTE(review): presumably embedded in both the consumer and producer rule
 * tables - the use sites are not visible in this listing, confirm.
 */
829#define BASE_CONFIG \
830 { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
831 .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
832 { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
833 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
834 { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
835 .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
836 { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
837 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
838 { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
839 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
840 { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
841 .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
842 { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
843 .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
844 { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
845 .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
846 { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
847 .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
848 { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
849 { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
850 { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
851 { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
852 { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }
853
855 /*
856 * Group consumer is a member of
857 */
859 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
860
861 /*
862 * A unique identifier of the consumer instance provided by the end user
863 */
865 .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
866
867 /*
868 * Range or roundrobin
869 */
870 { FR_CONF_FUNC("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
871 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
872
873 /*
874 * Client group session and failure detection timeout.
875 */
877 .uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
878
879 /*
880 * Group session keepalive heartbeat interval.
881 */
883 .uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
884
885 /*
886 * How often to query for the current client group coordinator
887 */
888 { FR_CONF_FUNC("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
889 .uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
890
891
893};
894
896 /*
897 * How many messages we process at a time
898 *
899 * High numbers may starve the worker thread
900 */
901 { FR_CONF_FUNC("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
902 .uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
903
904 /*
905 * Action to take when there is no initial offset
906 * in offset store or the desired offset is out of range.
907 */
909 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
910
912};
913
914/*
915 * Allows topic configurations in the format:
916 *
917 * topic {
918 * <name> {
919 * request_required_acks = ...
920 * }
921 * }
922 *
923 */
929
933
934 /*
935 * Maximum allowed time between calls to consume messages.
936 */
938 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }},
939
940 /*
941 * Toggle auto commit
942 */
944 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }},
945
946 /*
947 * Auto commit interval
948 */
950 .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }},
951
952 /*
953 * Automatically store offset of last message provided to application.
954 */
956 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }},
957
958 /*
959 * Minimum number of messages per topic+partition librdkafka tries to
960 * maintain in the local consumer queue.
961 */
963 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }},
964
965 /*
966 * Maximum size of queued pre-fetched messages in the local consumer queue.
967 */
968 { FR_CONF_FUNC("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
969 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }},
970
971 /*
972 * Maximum time the broker may wait to fill the Fetch response.
973 */
975 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }},
976
977 /*
978 * Initial maximum number of bytes per topic+partition to request when
979 * fetching messages from the broker.
980 */
981 { FR_CONF_FUNC("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
982 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }},
983
984 /*
985 * Initial maximum number of bytes per topic+partition to request when
986 * fetching messages from the broker.
987 */
988 { FR_CONF_FUNC("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
989 .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }},
990
991 /*
992 * Maximum amount of data the broker shall return for a Fetch request.
993 */
995 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }},
996
997 /*
998 * Minimum number of bytes the broker responds with.
999 */
1001 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }},
1002
1003 /*
1004 * How long to postpone the next fetch request for a topic+partition
1005 * in case of a fetch error.
1006 */
1008 .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }},
1009
1010 /*
1011 * Controls how to read messages written transactionally
1012 */
1014 .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }},
1015
1016 /*
1017 * Verify CRC32 of consumed messages, ensuring no on-the-wire or
1018 * on-disk corruption to the messages occurred.
1019 */
1021 .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }},
1022
1023 /*
1024 * Allow automatic topic creation on the broker when subscribing
1025 * to or assigning non-existent topics
1026 */
1028 .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }},
1029
1031
1033};
1034
1036 /*
1037 * This field indicates the number of acknowledgements the leader
1038 * broker must receive from ISR brokers before responding to the request.
1039 */
1041 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
1042
1043 /*
1044 * The ack timeout of the producer request in milliseconds
1045 */
1047 .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
1048
1049 /*
1050 * Local message timeout
1051 */
1053 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
1054
1055 /*
1056 * Partitioning strategy
1057 */
1059 .uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
1060
1061 /*
1062 * compression codec to use for compressing message sets.
1063 */
1065 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1066
1067 /*
1068 * compression level to use
1069 */
1071 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
1072
1074};
1075
1076/*
1077 * Allows topic configurations in the format:
1078 *
1079 * topic {
1080 * <name> {
1081 * request_required_acks = ...
1082 * }
1083 * }
1084 *
1085 */
1091
1094
1095 /*
1096 * Enables the transactional producer
1097 */
1099 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }},
1100
1101 /*
1102 * The maximum amount of time in milliseconds that the transaction
1103 * coordinator will wait for a transaction status update from the
1104 * producer before proactively aborting the ongoing transaction.
1105 */
1107 .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }},
1108
1109 /*
1110 * When set to true, the producer will ensure that messages are
1111 * successfully produced exactly once and in the original produce
1112 * order.
1113 */
1115 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }},
1116
1117 /*
1118 * When set to true, any error that could leave a gap in the produced
1119 * message series when a batch fails raises a fatal error and stops the producer.
1120 */
1122 .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }},
1123
1124 /*
1125 * Maximum number of messages allowed on the producer queue.
1126 * This queue is shared by all topics and partitions.
1127 */
1129 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }},
1130
1131 /*
1132 * Maximum total message size sum allowed on the producer queue.
1133 */
1135 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }},
1136
1137 /*
1138 * How long we wait to aggregate messages
1139 */
1141 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }},
1142
1143 /*
1144 * How many times we resend a message
1145 */
1147 .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }},
1148
1149 /*
1150 * The backoff time in milliseconds before retrying a protocol request.
1151 */
1152 { FR_CONF_FUNC("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1153 .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }},
1154
1155 /*
1156 * The threshold of outstanding not yet transmitted broker requests
1157 * needed to backpressure the producer's message accumulator.
1158 */
1159 { FR_CONF_FUNC("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
1160 .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }},
1161
1162 /*
1163 * compression codec to use for compressing message sets.
1164 */
1166 .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1167
1168 /*
1169 * Maximum size (in bytes) of all messages batched in one MessageSet
1170 */
1172 .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }},
1173
1174 /*
1175 * Delay in milliseconds to wait to assign new sticky partitions for each topic
1176 */
1177 { FR_CONF_FUNC("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1178 .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }},
1179
1181
1183};
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define UNUSED
Definition build.h:315
#define NUM_ELEMENTS(_t)
Definition build.h:337
int cf_pair_to_value_box(TALLOC_CTX *ctx, fr_value_box_t *out, CONF_PAIR *cp, conf_parser_t const *rule)
Parses a CONF_PAIR into a boxed value.
Definition cf_parse.c:126
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:642
void const * uctx
User data accessible by the cf_parse_t func.
Definition cf_parse.h:602
#define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs)
conf_parser_t entry which runs conf_parser_t entries for a subsection without any output
Definition cf_parse.h:387
fr_type_t type
An fr_type_t value, controls the output type.
Definition cf_parse.h:583
#define fr_rule_multi(_rule)
Definition cf_parse.h:465
char const * name1
Name of the CONF_ITEM to parse.
Definition cf_parse.h:580
@ CONF_FLAG_MULTI
CONF_PAIR can have multiple copies.
Definition cf_parse.h:432
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition cf_parse.h:422
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
Definition cf_parse.h:424
#define FR_CONF_FUNC(_name, _type, _flags, _func, _dflt_func)
conf_parser_t entry which doesn't fill in a pointer or offset, but relies on functions to record valu...
Definition cf_parse.h:374
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:579
Internal data that is associated with a configuration section.
Definition cf_priv.h:124
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition cf_util.c:1453
unsigned int cf_pair_count(CONF_SECTION const *cs, char const *attr)
Count the number of times an attribute occurs in a parent section.
Definition cf_util.c:1520
void * cf_data_value(CONF_DATA const *cd)
Return the user assigned value of CONF_DATA.
Definition cf_util.c:1763
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition cf_util.c:1279
void cf_pair_mark_parsed(CONF_PAIR *cp)
Mark a pair as parsed.
Definition cf_util.c:1376
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:684
CONF_PAIR * cf_item_to_pair(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_PAIR.
Definition cf_util.c:664
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1594
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:289
#define cf_data_add(_cf, _data, _name, _free)
Definition cf_util.h:255
#define cf_data_find(_cf, _type, _name)
Definition cf_util.h:244
#define cf_parent(_cf)
Definition cf_util.h:101
#define cf_log_perr(_cf, _fmt,...)
Definition cf_util.h:296
#define cf_log_debug(_cf, _fmt,...)
Definition cf_util.h:292
#define CF_IDENT_ANY
Definition cf_util.h:78
#define MEM(x)
Definition debug.h:36
Test enumeration values.
Definition dict_test.h:92
static conf_parser_t const kafka_connection_config[]
Definition base.c:684
static conf_parser_t const kafka_base_consumer_topics_config[]
Definition base.c:924
static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value for a topic from the kafka client library.
Definition base.c:292
static size_t kafka_check_cert_cn_table_len
Definition base.c:623
static fr_kafka_topic_conf_t * kafka_topic_conf_from_cs(CONF_SECTION *cs)
Definition base.c:97
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
Definition base.c:47
static conf_parser_t const kafka_base_consumer_topic_config[]
Definition base.c:895
rd_kafka_conf_t * conf
Definition base.c:33
rd_kafka_topic_conf_t * conf
Definition base.c:37
#define BASE_CONFIG
Definition base.c:829
static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka config struct.
Definition base.c:429
conf_parser_t const kafka_base_consumer_config[]
Definition base.c:930
char const * property
Kafka configuration property.
Definition base.c:49
static int _kafka_topic_conf_free(fr_kafka_topic_conf_t *ktc)
Destroy a kafka topic configuration handle.
Definition base.c:90
static conf_parser_t const kafka_sasl_oauth_config[]
Definition base.c:547
static fr_table_ptr_sorted_t kafka_check_cert_cn_table[]
Definition base.c:617
static conf_parser_t const kafka_version_config[]
Definition base.c:761
char const * string_sep
Used for multi-value configuration items.
Definition base.c:51
static conf_parser_t const kafka_consumer_group_config[]
Definition base.c:854
static conf_parser_t const kafka_tls_config[]
Definition base.c:625
static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value from the kafka client library.
Definition base.c:210
static fr_kafka_conf_t * kafka_conf_from_cs(CONF_SECTION *cs)
Definition base.c:67
static conf_parser_t const kafka_base_producer_topics_config[]
Definition base.c:1086
static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka topic config struct.
Definition base.c:501
bool empty_default
Don't produce messages saying the default is missing.
Definition base.c:45
conf_parser_t const kafka_base_producer_config[]
Definition base.c:1092
static int _kafka_conf_free(fr_kafka_conf_t *kc)
Destroy a kafka configuration handle.
Definition base.c:60
static conf_parser_t const kafka_metadata_config[]
Definition base.c:789
size_t * mapping_len
Length of the mapping tables.
Definition base.c:43
fr_table_ptr_sorted_t * mapping
Mapping table between string constants.
Definition base.c:41
static conf_parser_t const kafka_sasl_kerberos_config[]
Definition base.c:557
static conf_parser_t const kafka_base_producer_topic_config[]
Definition base.c:1035
static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
Definition base.c:330
static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule)
Perform any conversions necessary to map kafka defaults to our values.
Definition base.c:127
static conf_parser_t const kafka_sasl_config[]
Definition base.c:591
talloc_free(reap)
size_t fr_sbuff_out_unescape_until(fr_sbuff_t *out, fr_sbuff_t *in, size_t len, fr_sbuff_term_t const *tt, fr_sbuff_unescape_rules_t const *u_rules)
fr_type_t
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
@ FR_TYPE_INT8
8 Bit signed integer.
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_INT16
16 Bit signed integer.
@ FR_TYPE_UINT32
32 Bit unsigned integer.
@ FR_TYPE_UINT64
64 Bit unsigned integer.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_SIZE
Unsigned integer capable of representing any memory address on the local system.
unsigned char uint8_t
ssize_t fr_slen_t
#define fr_assert(_expr)
Definition rad_assert.h:38
static rs_t * conf
Definition radsniff.c:53
ssize_t fr_sbuff_in_sprintf(fr_sbuff_t *sbuff, char const *fmt,...)
Print using a fmt string to an sbuff.
Definition sbuff.c:1595
#define fr_sbuff_start(_sbuff_or_marker)
#define FR_SBUFF_IN(_start, _len_or_end)
char const * name
Name for rule set to aid with debugging.
Definition sbuff.h:202
#define fr_sbuff_advance(_sbuff_or_marker, _len)
#define FR_SBUFF_OUT(_start, _len_or_end)
char subs[UINT8_MAX+1]
Special characters and their substitutions.
Definition sbuff.h:205
#define FR_SBUFF_TERM(_str)
Initialise a terminal structure with a single string.
Definition sbuff.h:180
#define FR_SBUFF_TALLOC_THREAD_LOCAL(_out, _init, _max)
Set of terminal elements.
Set of parsing rules for *unescape_until functions.
fr_slen_t fr_size_from_str(size_t *out, fr_sbuff_t *in)
Parse a size string with optional unit.
Definition size.c:40
fr_slen_t fr_size_to_str(fr_sbuff_t *out, size_t in)
Print a size string with unit.
Definition size.c:155
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
return count
Definition module.c:163
fr_aka_sim_id_type_t type
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition table.h:653
#define fr_table_str_by_str_value(_table, _str_value, _def)
Brute force search a sorted or ordered ptr table, assuming the pointers are strings.
Definition table.h:625
An element in a lexicographically sorted array of name to ptr mappings.
Definition table.h:65
fr_slen_t talloc_array_concat(fr_sbuff_t *out, char const *const *array, char const *sep)
Concat an array of strings (not NULL terminated), with a string separator.
Definition talloc.c:914
fr_slen_t fr_time_delta_from_str(fr_time_delta_t *out, char const *in, size_t inlen, fr_time_res_t hint)
Create fr_time_delta_t from a string.
Definition time.c:449
fr_slen_t fr_time_delta_to_str(fr_sbuff_t *out, fr_time_delta_t delta, fr_time_res_t res, bool is_unsigned)
Print fr_time_delta_t to a string with an appropriate suffix.
Definition time.c:472
@ FR_TIME_RES_MSEC
Definition time.h:58
@ FR_TIME_RES_SEC
Definition time.h:50
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
enum fr_token fr_token_t
@ T_BARE_WORD
Definition token.h:120
@ T_OP_EQ
Definition token.h:83
static fr_slen_t parent
Definition pair.h:851
#define fr_type_is_string(_x)
Definition types.h:327
void fr_value_box_clear(fr_value_box_t *data)
Clear/free any existing value and metadata.
Definition value.c:3723
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
Definition value.h:488
static size_t char ** out
Definition value.h:997