The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
base.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 1c5b31927d4e569cd934a3f2d567d2a70091105c $
19  * @file kafka/base.c
20  * @brief Kafka global structures
21  *
22  * @copyright 2022 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
23  */
24 
25 #include <freeradius-devel/kafka/base.h>
26 #include <freeradius-devel/server/cf_parse.h>
27 #include <freeradius-devel/server/tmpl.h>
28 #include <freeradius-devel/util/value.h>
29 #include <freeradius-devel/util/sbuff.h>
30 #include <freeradius-devel/util/size.h>
31 
32 typedef struct {
33  rd_kafka_conf_t *conf;
35 
36 typedef struct {
37  rd_kafka_topic_conf_t *conf;
39 
40 typedef struct {
41  fr_table_ptr_sorted_t *mapping; //!< Mapping table between string constant.
42 
43  size_t *mapping_len; //!< Length of the mapping tables
44 
45  bool empty_default; //!< Don't produce messages saying the default is missing.
46 
47  size_t size_scale; //!< Divide/multiply FR_TYPE_SIZE by this amount.
48 
49  char const *property; //!< Kafka configuration property.
50 
51  char const *string_sep; //!< Used for multi-value configuration items.
52  //!< Kafka uses ', ' or ';' seemingly at random.
54 
55 /** Destroy a kafka configuration handle
56  *
57  * @param[in] kc To destroy.
58  * @return 0
59  */
61 {
62  rd_kafka_conf_destroy(kc->conf);
63  return 0;
64 }
65 
66 static inline CC_HINT(always_inline)
68 {
69  CONF_DATA const *cd;
70  fr_kafka_conf_t *kc;
71 
72  cd = cf_data_find(cs, fr_kafka_conf_t, "conf");
73  if (cd) {
74  kc = cf_data_value(cd);
75  } else {
76  MEM(kc = talloc(NULL, fr_kafka_conf_t));
77  MEM(kc->conf = rd_kafka_conf_new());
78  talloc_set_destructor(kc, _kafka_conf_free);
79  cf_data_add(cs, kc, "conf", true);
80  }
81 
82  return kc;
83 }
84 
85 /** Destroy a kafka topic configuration handle
86  *
87  * @param[in] ktc To destroy.
88  * @return 0
89  */
91 {
92  rd_kafka_topic_conf_destroy(ktc->conf);
93  return 0;
94 }
95 
96 static inline CC_HINT(always_inline)
98 {
99  CONF_DATA const *cd;
101 
102  cd = cf_data_find(cs, fr_kafka_topic_conf_t, "conf");
103  if (cd) {
104  ktc = cf_data_value(cd);
105  } else {
106  MEM(ktc = talloc(NULL, fr_kafka_topic_conf_t));
107  MEM(ktc->conf = rd_kafka_topic_conf_new());
108  talloc_set_destructor(ktc, _kafka_topic_conf_free);
109  cf_data_add(cs, ktc, "conf", true);
110  }
111 
112  return ktc;
113 }
114 
115 /** Convert one librdkafka default value string into a parsed CONF_PAIR
116  *
     * The pair is named after rule->name1, allocated in cs, and marked as
     * parsed so that it is not re-parsed.  Before allocation the value is
     * rewritten according to rule->type:
     *  - FR_TYPE_TIME_DELTA: parsed with FR_TIME_RES_MSEC, then reprinted
     *    with FR_TIME_RES_SEC.
     *  - FR_TYPE_SIZE: parsed, multiplied by kctx->size_scale when that is
     *    non-zero, then reprinted with fr_size_to_str().
     *  - any other type: left untouched by the switch.
     *
117  * @param[out] out Where to write the pair.
118  * @param[in] parent UNUSED.
119  * @param[in] cs to allocate the pair in.
120  * @param[in] value to convert.
121  * @param[in] quote to use when allocing the pair.
122  * @param[in] rule Supplies the pair name (name1), the type, and the kafka uctx.
123  * @return
124  * - 0 on success.
125  * - -1 on failure.
126  */
127 static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value,
128  fr_token_t quote, conf_parser_t const *rule)
129 {
     /*
      * Scratch space for the reprinted value.  Sized for the longest
      * 64-bit unsigned decimal plus one suffix character.
      */
130  char tmp[sizeof("18446744073709551615b")];
131  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
132  fr_type_t type = rule->type;
133 
134  /*
135  * Apply any mappings available, but default back
136  * to the raw value if we don't have a match.
137  */
138  if (kctx->mapping) {
139  fr_table_ptr_sorted_t *mapping = kctx->mapping;
140  size_t mapping_len = *kctx->mapping_len;
141 
     /*
      * NOTE(review): listing line 142, the statement that uses
      * mapping/mapping_len, is absent from this view.
      */
143  }
144  /*
145  * Convert time delta as an integer with ms precision
146  */
147  switch (type) {
148  case FR_TYPE_TIME_DELTA:
149  {
150  fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
151  fr_time_delta_t delta;
152 
153  if (fr_time_delta_from_str(&delta, value, strlen(value), FR_TIME_RES_MSEC) < 0) {
154  cf_log_perr(cs, "Failed parsing default \"%s\"", value);
155  return -1;
156  }
157 
     /*
      * NOTE(review): the result of this call is not checked, unlike
      * fr_size_to_str() in the FR_TYPE_SIZE case below.
      */
158  fr_time_delta_to_str(&value_elem, delta, FR_TIME_RES_SEC, true);
159  value = fr_sbuff_start(&value_elem);
160  }
161  break;
162 
163  case FR_TYPE_SIZE:
164  {
165  fr_sbuff_t value_elem = FR_SBUFF_IN(tmp, sizeof(tmp));
166  size_t size;
167 
168  if (fr_size_from_str(&size, &FR_SBUFF_IN(value, strlen(value))) < 0) {
169  cf_log_perr(cs, "Failed parsing default \"%s\"", value);
170  return -1;
171  }
172 
173  /*
174  * Some options are in kbytes *sigh*
175  */
176  if (kctx->size_scale) size *= kctx->size_scale;
177 
178  /*
179  * reprint the size with an appropriate unit
180  */
181  if (fr_size_to_str(&value_elem, size) < 0) {
182  cf_log_perr(cs, "Failed size reprint");
183  return -1;
184  }
185  value = fr_sbuff_start(&value_elem);
186  }
187  break;
188 
189  default:
190  break;
191  }
192 
193  MEM(*out = cf_pair_alloc(cs, rule->name1, value, T_OP_EQ, T_BARE_WORD, quote));
194  cf_pair_mark_parsed(*out); /* Don't re-parse this */
195 
196  return 0;
197 }
198 
199 /** Return the default value from the kafka client library
200  *
     * Reads the current value of kctx->property from the kafka configuration
     * associated with cs.  For multi-value rules the string is split on
     * kctx->string_sep and kafka_config_dflt_single() is called once per
     * element; otherwise it is called once with the whole string.
     *
201  * @param[out] out Where to write the pair.
202  * @param[in] parent being populated.
203  * @param[in] cs to allocate the pair in.
204  * @param[in] quote to use when allocing the pair.
205  * @param[in] rule Supplies the pair name (name1), whether multiple values are allowed, and the kafka uctx.
206  * @return
207  * - 0 on success, or when the library reports the property as unknown
     *   (no pair is created in that case).
208  * - -1 on failure.
209  */
210 static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
211 {
212  char buff[1024];
213  size_t buff_len = sizeof(buff);
214  char const *value;
215 
216  fr_kafka_conf_t *kc;
217  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
218  rd_kafka_conf_res_t ret;
219 
220  kc = kafka_conf_from_cs(cs);
221  fr_assert(kc);
222 
223  if ((ret = rd_kafka_conf_get(kc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
     /*
      * An unknown property is tolerated: no pair is created, and the
      * debug message is suppressed when empty_default is set.
      */
224  if (ret == RD_KAFKA_CONF_UNKNOWN) {
225  if (kctx->empty_default) return 0;
226 
227  cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
228  return 0; /* Not an error */
229  }
230 
231  cf_log_err(cs, "Failed retrieving kafka property \"%s\"", kctx->property);
232  return -1;
233  }
234 #if 0
235  cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
236 #endif
237  value = buff;
238 
239  /*
240  * If it's multi we need to break the string apart on the string separator
241  * and potentially unescape the separator.
242  */
243  if (fr_rule_multi(rule)) {
244  fr_sbuff_t value_in = FR_SBUFF_IN(value, buff_len);
245  char tmp[256];
246  fr_sbuff_t value_elem = FR_SBUFF_OUT(tmp, sizeof(tmp));
     /*
      * NOTE(review): listing line 247 is absent from this view; the
      * terminal set "tt" used below is presumably declared there.
      */
248  fr_sbuff_unescape_rules_t ue_rules = {
249  .name = __FUNCTION__,
250  .chr = '\\'
251  };
252  /*
253  * Convert escaped separators back
254  */
255  ue_rules.subs[(uint8_t)kctx->string_sep[0]] = kctx->string_sep[0];
256 
     /*
      * Every element is written through the same "out" pointer, so
      * after the loop *out refers to the pair created for the last
      * element.
      */
257  while (fr_sbuff_out_unescape_until(&value_elem, &value_in, SIZE_MAX, &tt, &ue_rules) > 0) {
258  if (kafka_config_dflt_single(out, parent, cs, fr_sbuff_start(&value_elem), quote, rule) < 0) return -1;
259 
260  /*
261  * Skip past the string separator
262  */
263  fr_sbuff_advance(&value_in, strlen(kctx->string_sep));
264 
265  /*
266  * Reset
267  */
268  fr_sbuff_set_to_start(&value_elem);
269  }
270  return 0;
271  }
272 
273  /*
274  * Parse a single value
275  */
276  if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
277 
278  return 0;
279 }
280 
281 /** Return the default value for a topic from the kafka client library
282  *
     * Reads the current value of kctx->property from the kafka topic
     * configuration associated with cs and hands the resulting string to
     * kafka_config_dflt_single().
     *
283  * @param[out] out Where to write the pair.
284  * @param[in] parent being populated.
285  * @param[in] cs to allocate the pair in.
286  * @param[in] quote to use when allocing the pair.
287  * @param[in] rule Supplies the pair name (name1) and the kafka uctx.
288  * @return
289  * - 0 on success, or when the library reports the property as unknown
     *   (no pair is created in that case).
290  * - -1 on failure.
291  */
292 static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
293 {
294  char buff[1024];
295  size_t buff_len = sizeof(buff);
296  char const *value;
297 
299  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
300  rd_kafka_conf_res_t ret;
301 
302  ktc = kafka_topic_conf_from_cs(cs);
303  fr_assert(ktc);
304 
305  if ((ret = rd_kafka_topic_conf_get(ktc->conf, kctx->property, buff, &buff_len)) != RD_KAFKA_CONF_OK) {
     /*
      * An unknown property is tolerated: no pair is created, and the
      * debug message is suppressed when empty_default is set.
      */
306  if (ret == RD_KAFKA_CONF_UNKNOWN) {
307  if (kctx->empty_default) return 0;
308 
309  cf_log_debug(cs, "No default available for \"%s\" - \"%s\"", rule->name1, kctx->property);
310  return 0; /* Not an error */
311  }
312 
     /*
      * Any other result is a genuine failure.  RD_KAFKA_CONF_UNKNOWN
      * has already returned above, so ret cannot be asserted to equal
      * it here; report the error and fail, as kafka_config_dflt() does.
      */
314  cf_log_err(cs, "Failed retrieving kafka property '%s'", kctx->property);
315  return -1;
316  }
317 #if 0
318  cf_log_debug(cs, "Retrieved dflt \"%s\" for \"%s\" - \"%s\"", buff, rule->name1, kctx->property);
319 #endif
320  value = buff;
321 
322  /*
323  * Parse a single value
324  */
325  if (kafka_config_dflt_single(out, parent, cs, value, quote, rule) < 0) return -1;
326 
327  return 0;
328 }
329 
/** Convert the value of one CONF_PAIR into the string form handed to librdkafka
 *
 * Conversion depends on the rule:
 *  - a mapping table is configured: handled by the mapping branch.
 *  - string type without a mapping: the raw pair value is used.
 *  - FR_TYPE_TIME_DELTA: printed as an integer number of milliseconds.
 *  - FR_TYPE_SIZE: divided by kctx->size_scale when that is non-zero, then
 *    printed as a plain integer without units.
 *  - FR_TYPE_BOOL: "true" or "false".
 *  - any other type: validated via cf_pair_to_value_box(), then the raw
 *    pair value is used.
 *
 * For time deltas and sizes *out is taken from a static thread-local sbuff
 * over a static thread-local buffer, so the string is presumably only valid
 * until the next such conversion on the same thread.
 *
 * @param[out] out	Where to write the pointer to the converted string.
 * @param[in] cp	Pair whose value is converted.
 * @param[in] rule	Supplies the type and the kafka uctx.
 * @return
 *	- 0 on success.
 *	- -1 on failure.
 */
330 static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
331 {
     /*
      * NOTE(review): listing line 332 is absent from this view; the
      * value box "vb" used below is presumably declared there.
      */
333  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
334  fr_type_t type = rule->type;
     /* Sized for the longest 64-bit unsigned decimal plus the terminator. */
335  static _Thread_local char buff[sizeof("18446744073709551615")];
336  static _Thread_local fr_sbuff_t sbuff;
337 
338  /*
339  * Map string values if possible, and if there's
340  * no match then just pass the original through.
341  *
342  * We count this as validation...
343  */
344  if (kctx->mapping) {
345  fr_table_ptr_sorted_t *mapping = kctx->mapping;
346  size_t mapping_len = *kctx->mapping_len;
347 
     /*
      * NOTE(review): listing line 348, the statement that uses
      * mapping/mapping_len, is absent from this view.
      */
349  return 0;
350  } else if (fr_type_is_string(type)) {
351  *out = cf_pair_value(cp);
352  return 0;
353  }
354 
355  /*
356  * Parse as a box for basic validation
357  */
358  if (cf_pair_to_value_box(NULL, &vb, cp, rule) < 0) return -1;
359 
360  /*
361  * In kafka all the time deltas are in ms
362  * resolution, so we need to take the parsed value,
363  * scale it, and print it back to a string.
364  */
365  switch (type) {
366  case FR_TYPE_TIME_DELTA:
367  {
368  uint64_t delta;
369 
370  sbuff = FR_SBUFF_IN(buff, sizeof(buff));
371  delta = fr_time_delta_to_msec(vb.vb_time_delta);
372  if (fr_sbuff_in_sprintf(&sbuff, "%" PRIu64, delta) < 0) {
     /* Shared failure exit; the FR_TYPE_SIZE case also jumps here. */
373  error:
374  fr_value_box_clear(&vb);
375  return -1;
376  }
377  *out = fr_sbuff_start(&sbuff);
378  }
379  break;
380 
381  case FR_TYPE_SIZE:
382  {
383  size_t size = vb.vb_size;
384 
385  sbuff = FR_SBUFF_IN(buff, sizeof(buff));
386 
387  /*
388  * Most options are in bytes, but some are in kilobytes
389  */
390  if (kctx->size_scale) size /= kctx->size_scale;
391 
392  /*
393  * Kafka doesn't want units...
394  */
395  if (fr_sbuff_in_sprintf(&sbuff, "%zu", size) < 0) goto error;
396  *out = fr_sbuff_start(&sbuff);
397  }
398  break;
399 
400  /*
401  * Ensure bool is always mapped to the string constants
402  * "true" or "false".
403  */
404  case FR_TYPE_BOOL:
405  *out = vb.vb_bool ? "true" : "false";
406  break;
407 
408  default:
409  *out = cf_pair_value(cp);
410  break;
411  }
412 
413  fr_value_box_clear(&vb);
414 
415  return 0;
416 }
417 
418 /** Translate config items directly to settings in a kafka config struct
419  *
     * The pair value is converted with kafka_config_parse_single() and set on
     * the kafka configuration as kctx->property.  For multi-value rules with
     * more than one matching pair, every pair named rule->name1 is converted,
     * marked as parsed, and the results are joined with kctx->string_sep
     * before being set as a single value.
     *
420  * @param[in] ctx Talloc parent for the temporary value array used by multi-value rules.
421  * @param[out] out Unused.
422  * @param[in] base Unused.
423  * @param[in] ci To parse.
424  * @param[in] rule describing how to parse the item.
425  * @return
426  * - 0 on success.
427  * - -1 on failure
428  */
429 static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
430  CONF_ITEM *ci, conf_parser_t const *rule)
431 {
432  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
433  CONF_ITEM *parent = cf_parent(ci);
     /*
      * NOTE(review): listing line 434 is absent from this view; the
      * section "cs" used below is presumably declared there.
      */
435  CONF_PAIR *cp = cf_item_to_pair(ci);
436 
437  fr_kafka_conf_t *kc;
438  char const *value;
439 
     /*
      * NOTE(review): listing line 440 is absent from this view; "kc" is
      * presumably assigned there.
      */
441 
442  /*
443  * Multi rules require us to concat the values together before handing them off
444  */
445  if (fr_rule_multi(rule)) {
446  unsigned int i;
447  CONF_PAIR *cp_p;
448  size_t count;
449  char const **array;
450  fr_sbuff_t *agg;
451  fr_slen_t slen;
452 
453  FR_SBUFF_TALLOC_THREAD_LOCAL(&agg, 256, SIZE_MAX);
454 
     /* Zero or one matching pair needs no joining; use the single-value path. */
455  count = cf_pair_count(cs, rule->name1);
456  if (count <= 1) goto do_single;
457 
458  MEM(array = talloc_array(ctx, char const *, count));
459  for (cp_p = cp, i = 0;
460  cp_p;
461  cp_p = cf_pair_find_next(cs, cp_p, rule->name1), i++) {
     /*
      * NOTE(review): this early return does not free "array", which
      * stays parented to ctx.
      */
462  if (kafka_config_parse_single(&array[i], cp_p, rule) < 0) return -1;
463  cf_pair_mark_parsed(cp_p);
464  }
465 
466  slen = talloc_array_concat(agg, array, kctx->string_sep);
     /* NOTE(review): listing line 467 is absent from this view. */
468  if (slen < 0) return -1;
469 
470  value = fr_sbuff_start(agg);
471  } else {
472  do_single:
473  if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
474  }
475 
476  {
477  char errstr[512];
478 
     /* On rejection the library's message in errstr is logged against the pair. */
479  if (rd_kafka_conf_set(kc->conf, kctx->property,
480  value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
481  cf_log_perr(cp, "%s", errstr);
482  return -1;
483  }
484  }
485 
486  return 0;
487 }
488 
489 
490 /** Translate config items directly to settings in a kafka topic config struct
491  *
     * The pair value is converted with kafka_config_parse_single() and set on
     * the kafka topic configuration as kctx->property.
     *
492  * @param[in] ctx Unused.
493  * @param[out] out Unused.
494  * @param[in] base Unused.
495  * @param[in] ci To parse.
496  * @param[in] rule describing how to parse the item.
497  * @return
498  * - 0 on success.
499  * - -1 on failure
500  */
501 static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
502  CONF_ITEM *ci, conf_parser_t const *rule)
503 {
504  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
505  CONF_ITEM *parent = cf_parent(ci);
506  CONF_PAIR *cp = cf_item_to_pair(ci);
507 
     /*
      * NOTE(review): listing lines 508 and 511 are absent from this view;
      * "ktc" used below is presumably declared and assigned there.
      */
509  char const *value;
510 
512  if (kafka_config_parse_single(&value, cp, rule) < 0) return -1;
513 
514  {
515  char errstr[512];
516 
     /* On rejection the library's message in errstr is logged against the pair. */
517  if (rd_kafka_topic_conf_set(ktc->conf, kctx->property,
518  value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
519  cf_log_perr(cp, "%s", errstr);
520  return -1;
521  }
522  }
523 
524  return 0;
525 }
526 
527 #if 0
528 /** Configure a new topic for production or consumption
529  *
530  */
531 static int kafka_topic_new(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base,
532  CONF_ITEM *ci, conf_parser_t const *rule)
533 {
534  fr_kafka_conf_ctx_t const *kctx = rule->uctx;
535  CONF_ITEM *parent = cf_parent(ci);
536  CONF_PAIR *cp = cf_item_to_pair(ci);
537 
539  char const *value;
540 
542 
543  rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
544 }
545 #endif
546 
549  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
550 
552  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
553 
555 };
556 
558  /*
559  * Service principal
560  */
562  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
563 
564  /*
565  * Principal
566  */
568  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
569 
570  /*
571  * kinit cmd
572  */
574  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
575 
576  /*
577  * keytab
578  */
580  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
581 
582  /*
583  * How long between key refreshes
584  */
586  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
587 
589 };
590 
592  /*
593  * SASL mechanism
594  */
596  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
597 
598  /*
599  * Static SASL username
600  */
602  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
603 
604  /*
605  * Static SASL password
606  */
608  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
609 
611 
613 
615 };
616 
618  { L("false"), "none" },
619  { L("no"), "none" },
620  { L("true"), "https" },
621  { L("yes"), "https" }
622 };
624 
625 static conf_parser_t const kafka_tls_config[] = {
626  /*
627  * Cipher suite list in OpenSSL's format
628  */
630  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
631 
632  /*
633  * Curves list in OpenSSL's format
634  */
636  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
637 
638  /*
639  * Signature algorithms list in OpenSSL's format
640  */
642  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
643 
644  /*
645  * Sets the full path to a CA certificate (used to validate
646  * the certificate the server presents).
647  */
649  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
650 
651  /*
652  * Location of the CRL file.
653  */
655  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
656 
657  /*
658  * Sets the path to the public certificate file we present
659  * to the servers.
660  */
662  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
663 
664  /*
665  * Sets the path to the private key for our public
666  * certificate.
667  */
669  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
670 
671  /*
672  * Enable or disable certificate validation
673  */
675  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
676 
678  .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
679  .mapping = kafka_check_cert_cn_table,
680  .mapping_len = &kafka_check_cert_cn_table_len }},
682 };
683 
685  /*
686  * Socket timeout
687  */
689  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
690 
691  /*
692  * Close broker connections after this period.
693  */
695  .uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
696 
697  /*
698  * Maximum requests in flight (per connection).
699  */
700  { FR_CONF_FUNC("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
701  .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
702 
703  /*
704  * Socket send buffer.
705  */
707  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
708 
709  /*
710  * Socket recv buffer.
711  */
713  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
714 
715  /*
716  * If true, send TCP keepalives
717  */
719  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
720 
721  /*
722  * If true, disable nagle algorithm
723  */
725  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
726 
727  /*
728  * How long the DNS resolver cache is valid for
729  */
731  .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
732 
733  /*
734  * Should we use A records, AAAA records or either
735  * when resolving broker addresses
736  */
737  { FR_CONF_FUNC("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
738  .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
739 
740  /*
741  * How many failures before we reconnect the connection
742  */
743  { FR_CONF_FUNC("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
744  .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
745 
746  /*
747  * Initial time to wait before reconnecting.
748  */
749  { FR_CONF_FUNC("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
750  .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
751 
752  /*
753  * Max time to wait before reconnecting.
754  */
755  { FR_CONF_FUNC("reconnection_delay_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
756  .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
757 
759 };
760 
762  /*
763  * Request the API version from connected brokers
764  */
766  .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
767 
768  /*
769  * How long to wait for a version response.
770  */
772  .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
773 
774  /*
775  * How long to wait before retrying a version request.
776  */
778  .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
779 
780  /*
781  * Default version to use if the version request fails.
782  */
784  .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
785 
787 };
788 
790  /*
791  * Interval between attempts to refresh metadata from brokers
792  */
794  .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
795 
796  /*
797  * Maximum age of cached metadata
798  */
800  .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
801 
802  /*
803  * Used when a topic loses its leader
804  */
805  { FR_CONF_FUNC("fast_refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
806  .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
807 
808  /*
809  * Used when a topic loses its leader to prevent spurious metadata changes
810  */
812  .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
813 
814  /*
815  * Use sparse metadata requests, which use less bandwidth
816  */
818  .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
819 
820  /*
821  * List of topics to ignore
822  */
824  .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
825 
827 };
828 
/*
 *	Parser entries expanded at the start of more than one parser table in
 *	this file.
 *
 *	Each FR_CONF_FUNC entry maps a configuration item onto the librdkafka
 *	property named in its uctx; multi-value items also carry the separator
 *	used to join their values.  The final entry has no trailing comma, so
 *	the expansion site supplies it ("BASE_CONFIG,").
 */
829 #define BASE_CONFIG \
830  { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
831  .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
832  { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
833  .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
834  { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
835  .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
836  { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
837  .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
838  { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
839  .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
840  { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
841  .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
842  { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
843  .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
844  { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
845  .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
846  { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
847  .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
848  { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
849  { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
850  { FR_CONF_SUBSECTION_GLOBAL("connection", 0, kafka_connection_config) }, \
851  { FR_CONF_SUBSECTION_GLOBAL("tls", 0, kafka_tls_config) }, \
852  { FR_CONF_SUBSECTION_GLOBAL("sasl", 0, kafka_sasl_config) }
853 
855  /*
856  * Group consumer is a member of
857  */
859  .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
860 
861  /*
862  * A unique identifier of the consumer instance provided by the end user
863  */
865  .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
866 
867  /*
868  * Range or roundrobin
869  */
870  { FR_CONF_FUNC("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
871  .uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
872 
873  /*
874  * Client group session and failure detection timeout.
875  */
877  .uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
878 
879  /*
880  * Group session keepalive heartbeat interval.
881  */
883  .uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
884 
885  /*
886  * How often to query for the current client group coordinator
887  */
888  { FR_CONF_FUNC("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
889  .uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
890 
891 
893 };
894 
896  /*
897  * How many messages we process at a time
898  *
899  * High numbers may starve the worker thread
900  */
901  { FR_CONF_FUNC("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
902  .uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
903 
904  /*
905  * Action to take when there is no initial offset
906  * in offset store or the desired offset is out of range.
907  */
909  .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
910 
912 };
913 
914 /*
915  * Allows topic configurations in the format:
916  *
917  * topic {
918  * <name> {
919  * request_required_acks = ...
920  * }
921  * }
922  *
923  */
926 
928 };
929 
931  BASE_CONFIG,
933 
934  /*
935  * Maximum allowed time between calls to consume messages.
936  */
938  .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }},
939 
940  /*
941  * Toggle auto commit
942  */
944  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable_auto.commit" }},
945 
946  /*
947  * Auto commit interval
948  */
950  .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }},
951 
952  /*
953  * Automatically store offset of last message provided to application.
954  */
955  { FR_CONF_FUNC("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
956  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }},
957 
958  /*
959  * Minimum number of messages per topic+partition librdkafka tries to
960  * maintain in the local consumer queue.
961  */
962  { FR_CONF_FUNC("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
963  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }},
964 
965  /*
966  * Maximum size of queued pre-fetched messages in the local consumer queue.
967  */
968  { FR_CONF_FUNC("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
969  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }},
970 
971  /*
972  * Maximum time the broker may wait to fill the Fetch response.
973  */
975  .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }},
976 
977  /*
978  * Initial maximum number of bytes per topic+partition to request when
979  * fetching messages from the broker.
980  */
981  { FR_CONF_FUNC("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
982  .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }},
983 
984  /*
985  * Initial maximum number of bytes per topic+partition to request when
986  * fetching messages from the broker.
987  */
988  { FR_CONF_FUNC("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt),
989  .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }},
990 
991  /*
992  * Maximum amount of data the broker shall return for a Fetch request.
993  */
995  .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }},
996 
997  /*
998  * Minimum number of bytes the broker responds with.
999  */
1001  .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }},
1002 
1003  /*
1004  * How long to postpone the next fetch request for a topic+partition
1005  * in case of a fetch error.
1006  */
1008  .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }},
1009 
1010  /*
1011  * Controls how to read messages written transactionally
1012  */
1014  .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }},
1015 
1016  /*
1017  * Verify CRC32 of consumed messages, ensuring no on-the-wire or
1018  * on-disk corruption to the messages occurred.
1019  */
1021  .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }},
1022 
1023  /*
1024  * Allow automatic topic creation on the broker when subscribing
1025  * to or assigning non-existent topics
1026  */
1027  { FR_CONF_FUNC("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
1028  .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }},
1029 
1031 
1032  CONF_PARSER_TERMINATOR
1033 };
1034 
1036  /*
1037  * This field indicates the number of acknowledgements the leader
1038  * broker must receive from ISR brokers before responding to the request.
1039  */
1041  .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
1042 
1043  /*
1044  * The ack timeout of the producer request in milliseconds
1045  */
1047  .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
1048 
1049  /*
1050  * Local message timeout
1051  */
1053  .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
1054 
1055  /*
1056  * Partitioning strategy
1057  */
1059  .uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
1060 
1061  /*
1062  * compression codec to use for compressing message sets.
1063  */
1065  .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1066 
1067  /*
1068  * compression level to use
1069  */
1071  .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
1072 
1074 };
1075 
1076 /*
1077  * Allows topic configurations in the format:
1078  *
1079  * topic {
1080  * <name> {
1081  * request_required_acks = ...
1082  * }
1083  * }
1084  *
1085  */
1088 
1090 };
1091 
1093  BASE_CONFIG,
1094 
1095  /*
1096  * Enables the transactional producer
1097  */
1099  .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }},
1100 
1101  /*
1102  * The maximum amount of time in milliseconds that the transaction
1103  * coordinator will wait for a transaction status update from the
1104  * producer before proactively aborting the ongoing transaction.
1105  */
1107  .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }},
1108 
1109  /*
1110  * When set to true, the producer will ensure that messages are
1111  * successfully produced exactly once and in the original produce
1112  * order.
1113  */
1115  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }},
1116 
1117  /*
1118  * When set to true, any error that could result in a gap in the
1119  * produced message series when a batch of messages fails.
1120  */
1121  { FR_CONF_FUNC("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
1122  .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }},
1123 
1124  /*
1125  * Maximum number of messages allowed on the producer queue.
1126  * This queue is shared by all topics and partitions.
1127  */
1128  { FR_CONF_FUNC("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
1129  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }},
1130 
1131  /*
1132  * Maximum total message size sum allowed on the producer queue.
1133  */
1135  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }},
1136 
1137  /*
1138  * How long we wait to aggregate messages
1139  */
1141  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }},
1142 
1143  /*
1144  * How many times we resend a message
1145  */
1146  { FR_CONF_FUNC("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
1147  .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }},
1148 
1149  /*
1150  * The backoff time in milliseconds before retrying a protocol request.
1151  */
1152  { FR_CONF_FUNC("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1153  .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }},
1154 
1155  /*
1156  * The threshold of outstanding not yet transmitted broker requests
1157  * needed to backpressure the producer's message accumulator.
1158  */
1159  { FR_CONF_FUNC("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
1160  .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }},
1161 
1162  /*
1163  * compression codec to use for compressing message sets.
1164  */
1166  .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
1167 
1168  /*
1169  * Maximum size (in bytes) of all messages batched in one MessageSet
1170  */
1172  .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }},
1173 
1174  /*
1175  * Delay in milliseconds to wait to assign new sticky partitions for each topic
1176  */
1177  { FR_CONF_FUNC("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
1178  .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }},
1179 
1181 
1182  CONF_PARSER_TERMINATOR
1183 };
while(1)
Definition: acutest.h:856
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define UNUSED
Definition: build.h:313
#define NUM_ELEMENTS(_t)
Definition: build.h:335
int cf_pair_to_value_box(TALLOC_CTX *ctx, fr_value_box_t *out, CONF_PAIR *cp, conf_parser_t const *rule)
Parses a CONF_PAIR into a boxed value.
Definition: cf_parse.c:126
#define CONF_PARSER_TERMINATOR
Definition: cf_parse.h:627
void const * uctx
User data accessible by the cf_parse_t func.
Definition: cf_parse.h:587
#define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs)
conf_parser_t entry which runs conf_parser_t entries for a subsection without any output
Definition: cf_parse.h:374
fr_type_t type
An fr_type_t value, controls the output type.
Definition: cf_parse.h:568
#define fr_rule_multi(_rule)
Definition: cf_parse.h:450
char const * name1
Name of the CONF_ITEM to parse.
Definition: cf_parse.h:565
@ CONF_FLAG_MULTI
CONF_PAIR can have multiple copies.
Definition: cf_parse.h:419
@ CONF_FLAG_SECRET
Only print value if debug level >= 3.
Definition: cf_parse.h:409
@ CONF_FLAG_FILE_INPUT
File matching value must exist, and must be readable.
Definition: cf_parse.h:411
#define FR_CONF_FUNC(_name, _type, _flags, _func, _dflt_func)
conf_parser_t entry which doesn't fill in a pointer or offset, but relies on functions to record valu...
Definition: cf_parse.h:361
Defines a CONF_PAIR to C data type mapping.
Definition: cf_parse.h:564
Internal data that is associated with a configuration section.
Definition: cf_priv.h:124
Common header for all CONF_* types.
Definition: cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition: cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition: cf_priv.h:101
CONF_PAIR * cf_item_to_pair(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_PAIR.
Definition: cf_util.c:664
unsigned int cf_pair_count(CONF_SECTION const *cs, char const *attr)
Count the number of times an attribute occurs in a parent section.
Definition: cf_util.c:1520
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition: cf_util.c:1594
void cf_pair_mark_parsed(CONF_PAIR *cp)
Mark a pair as parsed.
Definition: cf_util.c:1376
void * cf_data_value(CONF_DATA const *cd)
Return the user assigned value of CONF_DATA.
Definition: cf_util.c:1763
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition: cf_util.c:684
CONF_PAIR * cf_pair_find_next(CONF_SECTION const *cs, CONF_PAIR const *prev, char const *attr)
Find a pair with a name matching attr, after specified pair.
Definition: cf_util.c:1453
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition: cf_util.c:1279
#define cf_log_err(_cf, _fmt,...)
Definition: cf_util.h:289
#define cf_data_add(_cf, _data, _name, _free)
Definition: cf_util.h:255
#define cf_data_find(_cf, _type, _name)
Definition: cf_util.h:244
#define cf_parent(_cf)
Definition: cf_util.h:101
#define cf_log_perr(_cf, _fmt,...)
Definition: cf_util.h:296
#define cf_log_debug(_cf, _fmt,...)
Definition: cf_util.h:292
#define CF_IDENT_ANY
Definition: cf_util.h:78
Test enumeration values.
Definition: dict_test.h:92
static conf_parser_t const kafka_connection_config[]
Definition: base.c:684
static conf_parser_t const kafka_base_consumer_topics_config[]
Definition: base.c:924
static int kafka_topic_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value for a topic from the kafka client library.
Definition: base.c:292
static size_t kafka_check_cert_cn_table_len
Definition: base.c:623
size_t size_scale
Divide/multiply FR_TYPE_SIZE by this amount.
Definition: base.c:47
static conf_parser_t const kafka_base_consumer_topic_config[]
Definition: base.c:895
rd_kafka_conf_t * conf
Definition: base.c:33
rd_kafka_topic_conf_t * conf
Definition: base.c:37
#define BASE_CONFIG
Definition: base.c:829
static fr_kafka_topic_conf_t * kafka_topic_conf_from_cs(CONF_SECTION *cs)
Definition: base.c:97
static int kafka_config_parse(TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka config struct.
Definition: base.c:429
conf_parser_t const kafka_base_consumer_config[]
Definition: base.c:930
char const * property
Kafka configuration property.
Definition: base.c:49
static int _kafka_topic_conf_free(fr_kafka_topic_conf_t *ktc)
Destroy a kafka topic configuration handle.
Definition: base.c:90
static conf_parser_t const kafka_sasl_oauth_config[]
Definition: base.c:547
static fr_table_ptr_sorted_t kafka_check_cert_cn_table[]
Definition: base.c:617
static conf_parser_t const kafka_version_config[]
Definition: base.c:761
char const * string_sep
Used for multi-value configuration items.
Definition: base.c:51
static conf_parser_t const kafka_consumer_group_config[]
Definition: base.c:854
static conf_parser_t const kafka_tls_config[]
Definition: base.c:625
static int kafka_config_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
Return the default value from the kafka client library.
Definition: base.c:210
static conf_parser_t const kafka_base_producer_topics_config[]
Definition: base.c:1086
static int kafka_topic_config_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Translate config items directly to settings in a kafka topic config struct.
Definition: base.c:501
bool empty_default
Don't produce messages saying the default is missing.
Definition: base.c:45
conf_parser_t const kafka_base_producer_config[]
Definition: base.c:1092
static int _kafka_conf_free(fr_kafka_conf_t *kc)
Destroy a kafka configuration handle.
Definition: base.c:60
static conf_parser_t const kafka_metadata_config[]
Definition: base.c:789
size_t * mapping_len
Length of the mapping tables.
Definition: base.c:43
fr_table_ptr_sorted_t * mapping
Mapping table between string constant.
Definition: base.c:41
static conf_parser_t const kafka_sasl_kerberos_config[]
Definition: base.c:557
static conf_parser_t const kafka_base_producer_topic_config[]
Definition: base.c:1035
static int kafka_config_parse_single(char const **out, CONF_PAIR *cp, conf_parser_t const *rule)
Definition: base.c:330
static int kafka_config_dflt_single(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, char const *value, fr_token_t quote, conf_parser_t const *rule)
Perform any conversions necessary to map kafka defaults to our values.
Definition: base.c:127
static fr_kafka_conf_t * kafka_conf_from_cs(CONF_SECTION *cs)
Definition: base.c:67
static conf_parser_t const kafka_sasl_config[]
Definition: base.c:591
talloc_free(reap)
size_t fr_sbuff_out_unescape_until(fr_sbuff_t *out, fr_sbuff_t *in, size_t len, fr_sbuff_term_t const *tt, fr_sbuff_unescape_rules_t const *u_rules)
Definition: merged_model.c:177
fr_type_t
Definition: merged_model.c:80
@ FR_TYPE_TIME_DELTA
A period of time measured in nanoseconds.
Definition: merged_model.c:113
@ FR_TYPE_INT8
8 Bit signed integer.
Definition: merged_model.c:103
@ FR_TYPE_STRING
String of printable characters.
Definition: merged_model.c:83
@ FR_TYPE_INT16
16 Bit signed integer.
Definition: merged_model.c:104
@ FR_TYPE_UINT32
32 Bit unsigned integer.
Definition: merged_model.c:99
@ FR_TYPE_UINT64
64 Bit unsigned integer.
Definition: merged_model.c:100
@ FR_TYPE_BOOL
A truth value.
Definition: merged_model.c:95
@ FR_TYPE_SIZE
Unsigned integer capable of representing any memory address on the local system.
Definition: merged_model.c:115
unsigned char uint8_t
Definition: merged_model.c:30
ssize_t fr_slen_t
Definition: merged_model.c:35
static size_t array[MY_ARRAY_SIZE]
static rs_t * conf
Definition: radsniff.c:53
ssize_t fr_sbuff_in_sprintf(fr_sbuff_t *sbuff, char const *fmt,...)
Print using a fmt string to an sbuff.
Definition: sbuff.c:1573
#define fr_sbuff_start(_sbuff_or_marker)
#define FR_SBUFF_IN(_start, _len_or_end)
char const * name
Name for rule set to aid debugging.
Definition: sbuff.h:177
#define fr_sbuff_advance(_sbuff_or_marker, _len)
#define FR_SBUFF_OUT(_start, _len_or_end)
char subs[UINT8_MAX+1]
Special characters and their substitutions.
Definition: sbuff.h:180
#define FR_SBUFF_TERM(_str)
Initialise a terminal structure with a single string.
Definition: sbuff.h:155
#define FR_SBUFF_TALLOC_THREAD_LOCAL(_out, _init, _max)
Set of terminal elements.
Definition: merged_model.c:161
Set of parsing rules for *unescape_until functions.
Definition: merged_model.c:163
fr_slen_t fr_size_from_str(size_t *out, fr_sbuff_t *in)
Parse a size string with optional unit.
Definition: size.c:40
fr_slen_t fr_size_to_str(fr_sbuff_t *out, size_t in)
Print a size string with unit.
Definition: size.c:155
static char buff[sizeof("18446744073709551615")+3]
Definition: size_tests.c:41
return count
Definition: module.c:163
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
fr_aka_sim_id_type_t type
#define fr_table_value_by_str(_table, _name, _def)
Convert a string to a value using a sorted or ordered table.
Definition: table.h:653
#define fr_table_str_by_str_value(_table, _str_value, _def)
Brute force search a sorted or ordered ptr table, assuming the pointers are strings.
Definition: table.h:625
An element in a lexicographically sorted array of name to ptr mappings.
Definition: table.h:65
fr_slen_t talloc_array_concat(fr_sbuff_t *out, char const *const *array, char const *sep)
Concat an array of strings (not NULL terminated), with a string separator.
Definition: talloc.c:914
fr_slen_t fr_time_delta_from_str(fr_time_delta_t *out, char const *in, size_t inlen, fr_time_res_t hint)
Create fr_time_delta_t from a string.
Definition: time.c:445
fr_slen_t fr_time_delta_to_str(fr_sbuff_t *out, fr_time_delta_t delta, fr_time_res_t res, bool is_unsigned)
Print fr_time_delta_t to a string with an appropriate suffix.
Definition: time.c:468
@ FR_TIME_RES_MSEC
Definition: time.h:58
@ FR_TIME_RES_SEC
Definition: time.h:50
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition: time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
enum fr_token fr_token_t
@ T_BARE_WORD
Definition: token.h:120
@ T_OP_EQ
Definition: token.h:83
static fr_slen_t parent
Definition: pair.h:851
#define fr_type_is_string(_x)
Definition: types.h:327
void fr_value_box_clear(fr_value_box_t *data)
Clear/free any existing value and metadata.
Definition: value.c:3723
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
Definition: value.h:488
static size_t char ** out
Definition: value.h:997