The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
rlm_kafka.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: d4d831e7f9306c4fa151ca65044ef7eec0cbda7b $
19 * @file rlm_kafka.c
20 * @brief Asynchronous Kafka producer module.
21 *
22 * A single shared `rd_kafka_t` serves every worker in the module
23 * instance. Delivery reports are fanned out to the originating worker
24 * via librdkafka's own background thread: at `mod_instantiate` we set
25 * `rd_kafka_conf_set_background_event_cb` and forward the producer's
26 * main queue to the background queue, so DRs arrive at our bg cb
27 * (`_kafka_background_event_cb`). The cb pushes each DR's opaque onto
28 * the originating worker's `fr_atomic_ring_t` mailbox and triggers its
29 * `EVFILT_USER` wake event; the worker's event loop drains the mailbox
30 * on its own stack and marks the request runnable itself - resumption
31 * never crosses threads.
32 *
33 * pctx is `malloc`'d, not talloc'd, so the bg thread can free it
34 * directly without racing worker-thread talloc. `pctx->request` is
35 * atomic and is NULLed by the cancel signal handler on the worker;
36 * by the time `mod_thread_detach` runs the framework has already
37 * cancelled every yielded request this worker owned, so the bg cb
38 * sees NULL and frees inline without touching the (about-to-be-freed)
39 * thread_inst.
40 *
41 * The schema for the module config is just @ref kafka_base_producer_config
42 * from the kafka base library (librdkafka passthrough plus
43 * `flush_timeout`), with `fr_kafka_conf_t` embedded as the first member
44 * of @ref rlm_kafka_t so `FR_CONF_OFFSET` entries resolve correctly.
45 * Topics are declared once in the module config and referenced by name at
46 * method/xlat invocation time; unknown topics are rejected at config-parse
47 * time rather than being created on the fly, so per-topic settings (acks,
48 * compression, partitioner) always match the declared configuration.
49 *
50 * See @ref mod_produce and @ref kafka_xlat_produce for the caller-facing
51 * surfaces.
52 *
53 * @copyright 2022,2026 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
54 */
55RCSID("$Id: d4d831e7f9306c4fa151ca65044ef7eec0cbda7b $")
56
58
59#include <freeradius-devel/util/debug.h>
60#include <freeradius-devel/util/dlist.h>
61#include <freeradius-devel/util/misc.h>
62#include <freeradius-devel/util/rb.h>
63#include <freeradius-devel/util/types.h>
64#include <freeradius-devel/util/value.h>
65
66#include <freeradius-devel/io/atomic_queue.h>
67#include <freeradius-devel/kafka/base.h>
68
69#include <freeradius-devel/server/base.h>
70#include <freeradius-devel/server/module_rlm.h>
71
72#include <freeradius-devel/unlang/call_env.h>
73#include <freeradius-devel/unlang/module.h>
74#include <freeradius-devel/unlang/xlat.h>
75#include <freeradius-devel/unlang/xlat_ctx.h>
76#include <freeradius-devel/unlang/xlat_func.h>
77#include <freeradius-devel/unlang/xlat_priv.h>
78
79#include <fcntl.h>
80#include <pthread.h>
81#include <stdatomic.h>
82#include <stdlib.h>
83#include <unistd.h>
84
85/** Module instance data
86 *
87 * `fr_kafka_conf_t` is embedded as the first member so
88 * `KAFKA_BASE_CONFIG` / `KAFKA_PRODUCER_CONFIG` `FR_CONF_OFFSET`
89 * entries (relative to `fr_kafka_conf_t`) address correctly when the
90 * framework passes `rlm_kafka_t` as the parse base. The librdkafka
91 * handle and per-topic tree inside `kconf` are released automatically
92 * by talloc when the module instance is torn down (the library attaches
93 * a lifecycle sentinel during config parse).
94 */
95typedef struct {
96 fr_kafka_conf_t kconf; //!< parsed producer conf - MUST be first
97 fr_time_delta_t flush_timeout; //!< How long `mod_detach` waits for in-flight
98 //!< produces to drain before `rd_kafka_destroy`.
99 char const *log_prefix; //!< pre-rendered `"rlm_kafka (<instance>)"`, used by
100 //!< librdkafka's log_cb which fires from internal
101 //!< threads with no mctx in scope. Built once in
102 //!< mod_instantiate so we don't reformat per line.
103
104 rd_kafka_t *rk; //!< shared producer, created at mod_instantiate.
105 fr_rb_tree_t *topics; //!< rlm_kafka_topic_t keyed by name, read-only
106 //!< after mod_instantiate.
108
109/** Topic handle
110 *
111 * One per declared topic, created at mod_instantiate. `rd_kafka_topic_t`
112 * is bound to the producer that created it; we have one shared producer
113 * per module instance so the topic handles are inst-scoped.
114 */
115typedef struct {
116 char const *name;
117 rd_kafka_topic_t *kt;
120
121typedef struct rlm_kafka_thread_s {
123
124 fr_event_user_t *wake; //!< EVFILT_USER handle; bg cb triggers it to wake the
125 //!< worker's event loop on this thread.
126
127 fr_atomic_ring_t *queue; //!< rlm_kafka_msg_ctx_t pushed by bg cb on librdkafka's
128 //!< thread, popped by our event loop on this worker.
129 //!< Segmented SPSC ring: grows on demand, so the bg cb
130 //!< never has to drop a delivery report.
131
132#ifndef NDEBUG
133 pthread_t worker_tid; //!< pthread_self() captured at thread_instantiate.
134 //!< Debug-build sanity check: the bg cb must NOT run on
135 //!< this thread, and the mailbox drain must.
136#endif
138
139/** Per produce() invocation context
140 *
141 * Raw `malloc`'d (not talloc) so the background dispatch thread can
142 * `free()` it directly without racing worker-thread talloc activity.
143 * `request` is atomic because the cancel signal handler (worker) and
144 * bg cb (librdkafka's own thread) access it from different threads.
145 * Reused as the rctx for both module-method and xlat invocations so
146 * we don't need a separate wrapper.
147 */
148typedef struct {
149 _Atomic(request_t *) request; //!< NULL once cancelled; bg cb / mailbox drain frees
150 //!< when NULL, resume path frees on success.
151 rlm_kafka_thread_t *target; //!< worker's thread_inst; bg cb pushes to its mailbox
152 //!< and writes its wake pipe.
153 rd_kafka_resp_err_t err; //!< stashed by bg cb for resume
154 int32_t partition;
155 int64_t offset;
157
158/** Call env for `kafka.produce.<topic>`
159 *
160 * Topic comes from the method's second identifier and is validated
161 * against the declared-topic list at call_env parse time, then stashed
162 * here as a plain name string.
163 */
164typedef struct {
165 char const *topic; //!< resolved topic name (validated at parse time)
166 fr_value_box_t *key; //!< optional message key
167 fr_value_box_t *value; //!< message payload
169
170/** Module config: just the kafka base producer config for now
171 *
172 * Kept as a local array rather than pointing `common.config` directly
173 * at `KAFKA_BASE_PRODUCER_CONFIG` so we can drop in rlm_kafka-specific
174 * entries (or additional librdkafka properties) alongside it later
175 * without touching the library.
176 */
177static conf_parser_t const module_config[] = {
180
181 /*
182 * How long `mod_detach` waits for the shared producer's
183 * outstanding produces to drain before `rd_kafka_destroy`.
184 * Module-level (not a librdkafka property) so we own the
185 * CONF_PARSER entry rather than the kafka base library.
186 */
187 { FR_CONF_OFFSET("flush_timeout", rlm_kafka_t, flush_timeout), .dflt = "5s" },
188
190};
191
192/** @param[in] a rlm_kafka_topic_t
193 * @param[in] b same.
194 * @return `strcmp` ordering of `a->name` and `b->name`. */
195static int8_t topic_name_cmp(void const *a, void const *b)
196{
197 rlm_kafka_topic_t const *ta = a;
198 rlm_kafka_topic_t const *tb = b;
199 return CMP(strcmp(ta->name, tb->name), 0);
200}
201
202/** Look up a shared topic handle on the module instance by name
203 *
204 * @param[in] inst module instance.
205 * @param[in] name topic name (must have been declared at config time).
206 * @return `rd_kafka_topic_t` if found, `NULL` otherwise.
207 */
208static inline CC_HINT(always_inline)
209rd_kafka_topic_t *kafka_find_topic(rlm_kafka_t const *inst, char const *name)
210{
211 rlm_kafka_topic_t key = { .name = name };
213
214 if (!inst->topics) return NULL;
215
216 h = fr_rb_find(inst->topics, &key);
217 return h ? h->kt : NULL;
218}
219
220/** librdkafka log callback - bridge internal library messages into the server log
221 *
222 * Called from librdkafka's internal threads (no request context, no mctx in
223 * scope), so we pull the pre-rendered log prefix off the producer's opaque
224 * pointer (the `rlm_kafka_t` we attached at mod_instantiate).
225 * Which librdkafka categories are actually emitted is controlled by the
226 * top-level `debug` config knob.
227 *
228 * @param[in] rk producer handle. `rd_kafka_opaque(rk)` is the
229 * `rlm_kafka_t` set during mod_instantiate.
230 * @param[in] level syslog-style severity (0 emerg .. 7 debug).
231 * @param[in] fac librdkafka facility / category, e.g. `BROKER`, `MSG`.
232 * @param[in] buf pre-formatted message body.
233 */
234static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char const *buf)
235{
236 rlm_kafka_t *inst = talloc_get_type_abort(rd_kafka_opaque(rk), rlm_kafka_t);
237
238 switch (level) {
239 case 0: /* LOG_EMERG */
240 case 1: /* LOG_ALERT */
241 case 2: /* LOG_CRIT */
242 case 3: /* LOG_ERR */
243 ERROR("%s - %s - %s", inst->log_prefix, fac, buf);
244 break;
245
246 case 4: /* LOG_WARNING */
247 WARN("%s - %s - %s", inst->log_prefix, fac, buf);
248 break;
249
250 case 5: /* LOG_NOTICE */
251 case 6: /* LOG_INFO */
252 INFO("%s - %s - %s", inst->log_prefix, fac, buf);
253 break;
254
255 default: /* LOG_DEBUG and anything else */
256 DEBUG("%s - %s - %s", inst->log_prefix, fac, buf);
257 break;
258 }
259}
260
261/** Worker wake-up callback - the bg cb triggered our EVFILT_USER event
262 *
263 * Pops everything sitting in the mailbox and dispatches each pctx on
264 * this worker's stack.
265 *
266 * @param[in] el UNUSED.
267 * @param[in] uctx `rlm_kafka_thread_t` pointer.
268 */
269static void _kafka_wake(UNUSED fr_event_list_t *el, void *uctx)
270{
271 rlm_kafka_thread_t *t = talloc_get_type_abort(uctx, rlm_kafka_thread_t);
273
274#ifndef NDEBUG
275 fr_assert(pthread_equal(pthread_self(), t->worker_tid) != 0);
276#endif
277
278 while (fr_atomic_ring_pop(t->queue, (void **)&pctx)) {
279 /* See kafka_delivery_notification() for why relaxed is sufficient here. */
280 request_t *request = atomic_load_explicit(&pctx->request, memory_order_relaxed);
281
282 if (!request) {
283 free(pctx);
284 continue;
285 }
287 }
288}
289
290/** Translate a librdkafka delivery-report error into a module rcode
291 *
292 * @param[in] request associated request (for logging).
293 * @param[in] pctx produce context with the stashed error.
294 * @return an `rlm_rcode_t` summarising the outcome.
295 */
297{
298 switch (pctx->err) {
299 case RD_KAFKA_RESP_ERR_NO_ERROR:
300 RDEBUG2("Delivered to partition %" PRId32 " offset %" PRId64, pctx->partition, pctx->offset);
301 return RLM_MODULE_OK;
302
303 case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
304 case RD_KAFKA_RESP_ERR__TIMED_OUT:
305 case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
306 REDEBUG("Kafka delivery timed out - %s", rd_kafka_err2str(pctx->err));
307 return RLM_MODULE_TIMEOUT;
308
309 case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
310 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
311 case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
312 case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
313 REDEBUG("Kafka rejected message - %s", rd_kafka_err2str(pctx->err));
314 return RLM_MODULE_REJECT;
315
316 default:
317 REDEBUG("Kafka delivery failed - %s (%s)",
318 rd_kafka_err2name(pctx->err), rd_kafka_err2str(pctx->err));
319 return RLM_MODULE_FAIL;
320 }
321}
322
323/** Common produce-and-yield helper
324 *
325 * Submits a message to the shared producer and returns the produce
326 * context on success. The caller is responsible for yielding the
327 * request with the returned pctx as rctx. On synchronous failure the
328 * pctx is freed and `NULL` is returned.
329 *
330 * pctx is `malloc`'d (not talloc'd) so the bg cb can `free()` it
331 * directly without racing worker-thread talloc.
332 *
333 * @param[in] t originating worker's thread_inst.
334 * @param[in] request request to yield on.
335 * @param[in] topic preconfigured inst-scoped topic handle.
336 * @param[in] key optional message key, may be `NULL`.
337 * @param[in] key_len length of `key`, 0 if `key` is `NULL`.
338 * @param[in] value message payload.
339 * @param[in] value_len length of `value`.
340 * @return the rlm_kafka_msg_ctx_t tracking the request, or `NULL` on failure.
341 */
342static inline CC_HINT(always_inline)
344 rd_kafka_topic_t *topic,
345 uint8_t const *key, size_t key_len,
346 uint8_t const *value, size_t value_len)
347{
349
350 MEM(pctx = malloc(sizeof(*pctx)));
351 pctx->target = t;
352 pctx->err = RD_KAFKA_RESP_ERR_NO_ERROR;
353 pctx->partition = RD_KAFKA_PARTITION_UA;
354 pctx->offset = -1;
355 atomic_init(&pctx->request, request);
356
357 if (unlikely(rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
358 /* librdkafka copies under MSG_F_COPY */
359 (void *)(uintptr_t) value, value_len,
360 key, key_len,
361 pctx) != 0)) {
362 rd_kafka_resp_err_t err = rd_kafka_last_error();
363
364 free(pctx);
365
366 REDEBUG("Failed enqueuing message - %s", rd_kafka_err2str(err));
367 return NULL;
368 }
369
370 return pctx;
371}
372
373/** Per-topic call_env rules, applied against the `topic <name>` subsection
374 *
375 * Invoked recursively from @ref _kafka_topic_env_parse via `call_env_parse()`
376 * so the framework handles pair lookup / tmpl compilation / offset writes
377 * for us.
378 *
379 * Both `value` and `key` are typed `FR_TYPE_OCTETS`. Kafka payloads and
380 * keys are opaque byte strings on the wire, and casting to octets keeps
381 * binary content (embedded NULs, high-bit bytes) intact without any
382 * UTF-8/string-termination assumptions creeping in from intermediate
383 * tmpl expansion. It also means an integer-typed `key` attribute is
384 * serialised in network byte order, which matches the keying convention
385 * other Kafka clients use so the same numeric key hashes to the same
386 * partition across producers.
387 */
395
396/** Resolve the topic named in the method's second identifier, then hand its
397 * subsection back to the call_env framework for per-topic `value` / `key`
398 * tmpl parsing.
399 *
400 * Invocations look like `kafka.produce.<topic_name>`. We:
401 *
402 * 1. Validate the topic against the declared-topic tree. Unknown topics
403 * fail here so typos surface at startup instead of at first produce.
404 * 2. Emit a synthetic call_env entry carrying the topic name.
405 * 3. Recurse into `call_env_parse()` with @ref topic_env pointed at the
406 * topic's CONF_SECTION - the framework walks `value` and `key` for us.
407 *
408 * Per-topic `value` and `key` means each declared topic carries its own
409 * payload template; operators can publish different shapes to different
410 * topics from one module instance.
411 */
412static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out,
413 tmpl_rules_t const *t_rules, CONF_ITEM *ci,
414 call_env_ctx_t const *cec, UNUSED call_env_parser_t const *rule)
415{
417 fr_kafka_topic_t *topic;
418 call_env_parsed_t *parsed;
419 char const *topic_name = cec->asked->name2;
420
421 if (!topic_name) {
422 cf_log_err(ci, "kafka.produce requires a topic name, e.g. kafka.produce.<topic>");
423 return -1;
424 }
425
426 topic = kafka_topic_conf_find(&inst->kconf, topic_name);
427 if (!topic) {
428 cf_log_err(ci, "Kafka topic '%s' is not declared in the '%s' module config",
429 topic_name, cec->mi->name);
430 return -1;
431 }
432
433 /*
434 * Topic name (plain string).
435 */
436 MEM(parsed = call_env_parsed_add(ctx, out,
438 .name = "topic",
440 .pair = {
441 .parsed = {
442 .offset = offsetof(rlm_kafka_env_t, topic),
444 }
445 }
446 }));
447 call_env_parsed_set_data(parsed, talloc_strdup(ctx, topic_name));
448
449 /*
450 * Framework walks `value` / `key` inside the topic subsection
451 * according to topic_env.
452 */
453 return call_env_parse(ctx, out, "kafka", t_rules, topic->cs, cec, topic_env);
454}
455
464
465/** Resume a yielded module method after its delivery report has arrived
466 *
467 * Runs on the same worker as the originating produce (per-thread
468 * producer), with `mctx->rctx` being the @ref rlm_kafka_msg_ctx_t the
469 * method stashed on yield. Translates the dr_msg_cb-populated error
470 * into an rcode, frees the pctx, and hands control back to unlang.
471 *
472 * @param[out] p_result where to write the resulting rcode.
473 * @param[in] mctx module ctx carrying the pctx as rctx.
474 * @param[in] request the request being resumed.
475 * @return `UNLANG_ACTION_CALCULATE_RESULT` always.
476 */
477static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
478{
479 rlm_kafka_msg_ctx_t *pctx = mctx->rctx; /* malloc'd, not talloc */
480 rlm_rcode_t rcode = kafka_err_to_rcode(request, pctx);
481
482 free(pctx);
483 RETURN_UNLANG_RCODE(rcode);
484}
485
486/** Module-method cancellation
487 *
488 * Do NOT free the pctx - librdkafka still owns the in-flight message and
489 * will fire a DR later with our opaque pointer. Atomic-store NULL to
490 * `pctx->request` so the bg cb sees the cancellation when it unpacks
491 * the DR and frees the pctx inline without touching the mailbox.
492 *
493 * Cross-thread: the store is release, matched by the acquire-load in
494 * `_kafka_background_event_cb`.
495 *
496 * @param[in] mctx module ctx with pctx as rctx.
497 * @param[in] request associated request.
498 * @param[in] action UNUSED (we mask off everything except CANCEL).
499 */
500static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action)
501{
502 rlm_kafka_msg_ctx_t *pctx = mctx->rctx;
503
504 RDEBUG2("Cancellation signal received - detaching delivery report");
505 atomic_store_explicit(&pctx->request, NULL, memory_order_release);
506}
507
508/** Module method entry point for `kafka.produce.<topic>`
509 *
510 * The topic is the method's second identifier; `key` and `value` are
511 * pulled from the module config via @ref rlm_kafka_produce_env:
512 *
513 * @code
514 * kafka {
515 * server = "broker1:9092"
516 * topic {
517 * Accounting-Request {
518 * request_required_acks = -1
519 * value = %json.encode(&request.[*])
520 * key = &User-Name
521 * }
522 * }
523 * }
524 *
525 * recv Accounting-Request {
526 * kafka
527 * }
528 * @endcode
529 *
530 * The topic name in `name2` is resolved against the declared-topic
531 * tree at config-parse time (see @ref _kafka_topic_env_parse), so
532 * typos fail fast. Different topics reuse the same module instance -
533 * e.g. `kafka.produce.auth` and `kafka.produce.accounting` both
534 * dispatch through the same per-worker producer handle.
535 *
536 * Runtime behaviour: looks up the per-thread `rd_kafka_topic_t` by the
537 * parse-time-resolved topic declaration, extracts the expanded
538 * `key`/value tmpls from the call_env, hands everything to
539 * @ref kafka_produce_enqueue, and yields until the delivery report
540 * arrives (see @ref kafka_produce_resume for rcode mapping).
541 *
542 * @param[out] p_result UNUSED (resume writes the real rcode).
543 * @param[in] mctx module ctx (mctx->thread is the rlm_kafka_thread_t,
544 * mctx->env_data is the rlm_kafka_env_t).
545 * @param[in] request the request being handled.
546 * @return yielded on success, UNLANG_ACTION_FAIL if the produce couldn't
547 * even be enqueued.
548 */
550 module_ctx_t const *mctx, request_t *request)
551{
553 rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
554 rlm_kafka_env_t *env = talloc_get_type_abort(mctx->env_data, rlm_kafka_env_t);
555 rd_kafka_topic_t *topic;
557
558 uint8_t const *key = NULL;
559 size_t key_len = 0;
560
561 topic = kafka_find_topic(inst, env->topic);
562 if (unlikely(!topic)) {
563 /*
564 * Can't happen if parsing succeeded, but defensive.
565 */
566 REDEBUG("Kafka topic '%s' has no handle on this module instance", env->topic);
568 }
569
570 if (env->key) {
571 key = env->key->vb_octets;
572 key_len = env->key->vb_length;
573 }
574
575 pctx = kafka_produce_enqueue(t, request, topic,
576 key, key_len,
577 env->value->vb_octets, env->value->vb_length);
578 if (unlikely(!pctx)) RETURN_UNLANG_FAIL;
579
581 ~FR_SIGNAL_CANCEL, pctx);
582}
583
584/** Xlat instance data - cached topic handle for literal-topic calls
585 *
586 * Topics are inst-scoped (shared producer), so the lookup can be
587 * resolved once at xlat_instantiate and the `rd_kafka_topic_t *`
588 * cached directly - no per-thread hop needed.
589 */
590typedef struct {
591 rd_kafka_topic_t *topic; //!< pre-resolved handle, NULL if topic arg is dynamic.
593
594/** Xlat instance init: if the topic arg is a compile-time literal, resolve
595 * and cache the inst-scoped `rd_kafka_topic_t` handle.
596 *
597 * Runs after `mod_instantiate` has created the shared producer and all
598 * topic handles, so the lookup is an rbtree walk against `inst->topics`.
599 * Dynamic topic args (attribute refs, nested xlats) are left to the
600 * per-call lookup in the xlat runtime; validation happens there.
601 */
603{
604 rlm_kafka_xlat_inst_t *inst = talloc_get_type_abort(xctx->inst, rlm_kafka_xlat_inst_t);
606 xlat_exp_t *topic_arg;
607 xlat_exp_t const *topic_node;
608 char const *topic_name;
610
611 /*
612 * ex is the XLAT_FUNC node; its args are wrapped as
613 * XLAT_GROUP children, one per positional argument.
614 */
615 topic_arg = xlat_exp_head(xctx->ex->call.args);
616 if (!topic_arg || topic_arg->type != XLAT_GROUP) return 0;
617
618 if (!xlat_is_literal(topic_arg->group)) return 0;
619
620 topic_node = xlat_exp_head(topic_arg->group);
621 if (!topic_node) return 0;
622
623 /*
624 * Attempt to cast to a string
625 */
626 if (topic_node->data.type != FR_TYPE_STRING) {
627 if (unlikely(fr_value_box_cast(inst, &topic_vb, FR_TYPE_STRING, NULL, &topic_node->data) < 0)) {
628 PERROR("First argument of %%<module>.produce() must be stringlike");
629 return -1;
630 }
631 topic_name = topic_vb.vb_strvalue;
632 } else {
633 topic_name = topic_node->data.vb_strvalue;
634 }
635
636 /*
637 * Resolve to the inst-scoped handle now so the xlat
638 * runtime can skip the rbtree walk. Unknown topics fail
639 * here at config-compile time.
640 */
641 inst->topic = kafka_find_topic(mod_inst, topic_name);
642 if (!inst->topic) {
643 cf_log_err(xctx->mctx->mi->conf,
644 "Kafka topic '%s' is not declared in the '%s' module config",
645 topic_name, xctx->mctx->mi->name);
646 fr_value_box_clear_value(&topic_vb);
647 return -1;
648 }
649 fr_value_box_clear_value(&topic_vb);
650
651 return 0;
652}
653
654/** Xlat resume: translate delivery report into a "partition:offset" string
655 *
656 * @param[in] xctx_ctx talloc context for the returned value box.
657 * @param[in] out cursor to append the result to.
658 * @param[in] xctx xlat ctx, rctx points at the rlm_kafka_msg_ctx_t.
659 * @param[in] request associated request (for logging).
660 * @param[in] in UNUSED (original args).
661 */
663 xlat_ctx_t const *xctx,
664 request_t *request, UNUSED fr_value_box_list_t *in)
665{
666 rlm_kafka_msg_ctx_t *pctx = xctx->rctx; /* malloc'd, not talloc */
667 fr_value_box_t *vb;
668 bool delivered = (pctx->err == RD_KAFKA_RESP_ERR_NO_ERROR);
669
670 if (unlikely(!delivered)) REDEBUG("Kafka produce failed - %s", rd_kafka_err2str(pctx->err));
671
672 MEM(vb = fr_value_box_alloc(xctx_ctx, FR_TYPE_BOOL, NULL));
673 vb->vb_bool = delivered;
675
676 free(pctx);
677 return XLAT_ACTION_DONE;
678}
679
680/** Xlat cancellation
681 *
682 * Same semantics as @ref mod_signal: detach the request from the
683 * in-flight `pctx` so the eventual bg cb discards silently rather than
684 * resuming a cancelled request. The bg cb owns the free.
685 *
686 * @param[in] xctx xlat ctx (xctx->rctx is the rlm_kafka_msg_ctx_t).
687 * @param[in] request UNUSED.
688 * @param[in] action UNUSED (we mask off everything except CANCEL).
689 */
690static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action)
691{
692 rlm_kafka_msg_ctx_t *pctx = xctx->rctx;
693
694 atomic_store_explicit(&pctx->request, NULL, memory_order_release);
695}
696
698 { .required = true, .concat = true, .type = FR_TYPE_STRING }, /* topic */
699 { .required = false, .concat = true, .type = FR_TYPE_OCTETS }, /* key (null / empty / absent = no key on the wire) */
700 { .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* value */
702};
703
704/** `%kafka.produce(topic, key, value)` - runtime-named produce
705 *
706 * Unlike the @ref mod_produce method form (which resolves topics at
707 * config-parse time), the xlat takes the topic name as a runtime
708 * argument. Use this when the topic or payload is chosen per-request:
709 *
710 * @code
711 * send Accounting-Response {
712 * if (!%kafka.produce('accounting', %{Acct-Session-Id}, %json.encode(&request.[*]))) {
713 * reject
714 * }
715 * }
716 * @endcode
717 *
718 * `key` is optional. Pass `null`, an empty string, `(octets) ""`, or
719 * an attribute that expands to nothing to produce without a key -
720 * librdkafka then uses the configured partitioner to spread records
721 * across partitions. When a non-empty key is supplied, librdkafka
722 * hashes it to pick a partition, so records with the same key end up
723 * on the same partition and preserve per-key produce order on the
724 * consumer side.
725 *
726 * Returns a bool: `true` on successful delivery, `false` on failure.
727 * The topic must have been declared in the module config (unknown
728 * topics fail the xlat) so librdkafka per-topic settings continue to
729 * apply to whichever topic is selected.
730 *
731 * Runtime behaviour mirrors the method: submit via
732 * @ref kafka_produce_enqueue, yield until the delivery report arrives,
733 * then resume in @ref kafka_xlat_produce_resume.
734 */
736 xlat_ctx_t const *xctx,
737 request_t *request, fr_value_box_list_t *in)
738{
740 rlm_kafka_thread_t *t = talloc_get_type_abort(xctx->mctx->thread, rlm_kafka_thread_t);
742 fr_value_box_t *topic_vb = fr_value_box_list_head(in);
743 fr_value_box_t *key_vb = fr_value_box_list_next(in, topic_vb);
744 fr_value_box_t *value_vb = fr_value_box_list_next(in, key_vb);
745 rd_kafka_topic_t *topic;
747 uint8_t const *key = NULL;
748 size_t key_len = 0;
749
750 /*
751 * The xlat framework enforces the arg contract before calling
752 * us: `required = true` for topic + value, and the required
753 * value slot after key keeps key's position filled even when
754 * the caller passes `null`. Assert the invariant so Coverity
755 * stops flagging the downstream derefs.
756 */
757 fr_assert(topic_vb && key_vb && value_vb);
758
759 /*
760 * Fast path: a literal topic argument was pre-resolved to
761 * an rd_kafka_topic_t at xlat_instantiate time.
762 */
763 topic = xlat_inst->topic;
764 if (!topic) topic = kafka_find_topic(inst, topic_vb->vb_strvalue);
765 if (unlikely(!topic)) {
766 REDEBUG("Kafka topic '%s' is not declared in the module config", topic_vb->vb_strvalue);
767 return XLAT_ACTION_FAIL;
768 }
769
770 /*
771 * `null`, a zero-length literal, or an attribute expanding
772 * to nothing all map to "no key" on the wire - librdkafka
773 * then uses the configured partitioner instead of key-hash
774 * partitioning. The key box itself is always present here -
775 * the required value slot after it forces the caller to
776 * provide three args or fail at arg validation.
777 */
778 if (!fr_type_is_null(key_vb->type) && key_vb->vb_length > 0) {
779 key = key_vb->vb_octets;
780 key_len = key_vb->vb_length;
781 }
782
783 pctx = kafka_produce_enqueue(t, request, topic,
784 key, key_len,
785 value_vb->vb_octets, value_vb->vb_length);
786 if (unlikely(!pctx)) return XLAT_ACTION_FAIL;
787
789 ~FR_SIGNAL_CANCEL, pctx);
790}
791
792/** Background event callback, runs on librdkafka's bg thread
793 *
794 * Dispatches delivery reports to the originating worker's mailbox.
795 * Runs on a librdkafka-owned thread, so MUST NOT touch talloc (races
796 * worker-thread allocs) or the FR logger (`fr_log` is talloc-backed).
797 * Allowed: plain pointer deref, atomic load/store, `fr_atomic_ring_push`,
798 * `fr_event_user_trigger`, `malloc`/`free`, `rd_kafka_event_destroy`.
799 *
800 * For each DR:
801 * 1. Acquire-load `pctx->request`. If NULL the request was cancelled
802 * via the signal handler - just `free(pctx)` inline; no thread_inst
803 * or mailbox access. Safe even if the owning worker has since
804 * detached.
805 * 2. Otherwise stash err / partition / offset, push onto
806 * `target->mailbox`, and `fr_event_user_trigger()` the worker's
807 * wake event. The worker is guaranteed alive because cancellation
808 * happens before thread_detach and any still-live request pins
809 * its worker.
810 *
811 * `NULL` opaque indicates a fire-and-forget produce - nothing to do.
812 *
813 * @param[in] rk UNUSED.
814 * @param[in] ev librdkafka event batch; destroyed at end.
815 * @param[in] uctx UNUSED (we don't need the inst here).
816 */
817static void _kafka_background_event_cb(UNUSED rd_kafka_t *rk, rd_kafka_event_t *ev, UNUSED void *uctx)
818{
819 switch (rd_kafka_event_type(ev)) {
820 case RD_KAFKA_EVENT_DR:
821 {
822 rd_kafka_message_t const *msg;
823 while ((msg = rd_kafka_event_message_next(ev))) {
826
827 if (!msg->_private) continue; /* fire-and-forget */
828
829 pctx = msg->_private; /* plain cast; no talloc ops */
830
831 /*
832 * Advisory: if the request was already cancelled
833 * on the worker side, short-circuit and drop the
834 * pctx here rather than walking the full dispatch
835 * path just for the worker to see NULL again and
836 * free it. Correctness does NOT depend on this
837 * check - a missed cancel just means the pctx
838 * takes the slow path through the mailbox. The
839 * shutdown barrier is `rd_kafka_flush` in
840 * `mod_thread_detach`, not this early-out.
841 */
842 if (atomic_load_explicit(&pctx->request, memory_order_acquire) == NULL) {
843 free(pctx);
844 continue;
845 }
846
847 pctx->err = msg->err;
848 pctx->partition = msg->partition;
849 pctx->offset = msg->offset;
850 t = pctx->target;
851
852#ifndef NDEBUG
853 fr_assert(pthread_equal(pthread_self(), t->worker_tid) == 0);
854#endif
855
856 MEM(fr_atomic_ring_push(t->queue, pctx) == true);
857
858 (void) fr_event_user_trigger(t->wake);
859 }
860 break;
861 }
862
863 default:
864 /*
865 * Broker-level errors surface via `_kafka_log_cb`
866 * on librdkafka's broker threads; anything else
867 * reaching this bg cb (instance-scoped errors,
868 * throttle events, etc.) is currently swallowed.
869 * Add a relaxed-atomic counter off `inst` if we
870 * ever need observability here.
871 */
872 break;
873 }
874
875 rd_kafka_event_destroy(ev);
876}
877
878/** Destructor for inst-scoped topic handles. Releases the rd_kafka_topic_t. */
880{
881 if (h->kt) rd_kafka_topic_destroy(h->kt);
882 return 0;
883}
884
885/** Create a shared rd_kafka_topic_t for every declared topic
886 *
887 * Called at mod_instantiate. Walks the `topic { <name> { ... } }`
888 * subsections directly off the module's CONF_SECTION - the kafka base
889 * library has already parsed each per-topic conf into an
890 * `fr_kafka_topic_conf_t` stashed via cf_data on the topic's section,
891 * so we just fetch and dup it.
892 */
894{
896
897 if (!inst->kconf.topics) return 0;
898
899 fr_rb_inorder_foreach(inst->kconf.topics, fr_kafka_topic_t, topic) {
900 rlm_kafka_topic_t *topic_t;
901 rd_kafka_topic_conf_t *ktc;
902
903 MEM(ktc = rd_kafka_topic_conf_dup(topic->conf->rdtc));
904 MEM(topic_t = talloc_zero(inst->topics, rlm_kafka_topic_t));
905 MEM(topic_t->name = talloc_strdup(topic_t, topic->name));
906 topic_t->kt = rd_kafka_topic_new(inst->rk, topic_t->name, ktc);
907 if (!topic_t->kt) {
908 /* librdkafka consumes tc only on success */
909 rd_kafka_topic_conf_destroy(ktc);
910 ERROR("Failed creating topic '%s' - %s",
911 topic_t->name, rd_kafka_err2str(rd_kafka_last_error()));
912 talloc_free(topic_t);
913 return -1;
914 }
915 talloc_set_destructor(topic_t, _topic_free);
916
917 if (!fr_cond_assert_msg(fr_rb_insert(inst->topics, topic_t), "duplicate topic handle")) {
918 talloc_free(topic_t);
919 return -1;
920 }
921 }
923
924 return 0;
925}
926
927/** Tear down a worker's kafka state
928 *
929 * The barrier we need is "no bg cb is mid-invocation for any of our
930 * pctxs when the framework frees `t` / `t->el`". `rd_kafka_flush`
931 * waits for every outstanding produce's DR to be fired AND the bg cb
932 * to have returned, which gives us exactly that. If flush times out
933 * (broker unreachable mid-shutdown) we purge all inflight messages -
934 * librdkafka synthesises `ERR__PURGE_QUEUE` DRs for them locally, no
935 * broker round-trip - and a second flush drains those through the
936 * bg cb with an unbounded wait (purge makes the drain finite without
937 * needing a user-configured timeout).
938 *
939 * Every worker flushes. The first one through actually drains
940 * librdkafka's queues; subsequent calls return immediately because
941 * `outq_len` is already zero. The cost is one extra flush call per
942 * worker (cheap when there's nothing to wait for), the gain is that
943 * each worker has its own barrier guaranteeing no bg cb invocation
944 * is mid-flight against this worker's `t->queue` / `t->wake`.
945 *
946 * Order: flush -> drain mailbox -> free wake. Freeing the wake
947 * before draining would race a bg cb that loaded a non-NULL
948 * `pctx->request` just before cancellation propagated and is about
949 * to call `fr_event_user_trigger(t->wake)`.
950 *
951 * @param[in] mctx thread-instance ctx.
952 * @return 0 (never fails fatally).
953 */
955{
957 rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
959 rd_kafka_resp_err_t err;
960
961 /*
962 * Flush is thread safe, and only returns after
963 * all in flight kafka requests have had their
964 * delivery reports run through the callback.
965 *
966 * At the point where thread_detach is called
967 * there are no more request_t in progress, so
968 * we gurantee the callback will never add additional
969 * delivery reports to this thread's queue.
970 *
971 * We call kafka flush in every thread, because
972 * there is no explicit synchronisation which
973 * gurantees all workers have stopped processing
974 * reuests by the time the first thread is being
975 * detached, so theoretically new requests can
976 * be enqueued by other threads after the first
977 * thread has called flush.
978 */
979 err = rd_kafka_flush(inst->rk, fr_time_delta_to_msec(inst->flush_timeout));
980 if (unlikely(err != RD_KAFKA_RESP_ERR_NO_ERROR)) {
981 WARN("Shutdown flush timed out, purging %d in-flight message(s)",
982 rd_kafka_outq_len(inst->rk));
983
984 rd_kafka_purge(inst->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
985
986 /*
987 * Drain the purge-generated DRs. No broker
988 * round-trip left; drain time is bounded by
989 * bg cb processing speed (us per pctx).
990 * -1 == wait indefinitely.
991 */
992 (void) rd_kafka_flush(inst->rk, -1);
993 }
994
995 /*
996 * Drain anything the bg cb pushed onto us. Every pctx
997 * here must have `request == NULL` because the framework
998 * cancels every yielded request this worker owned before
999 * calling thread_detach - assert that to catch any future
1000 * change to that ordering immediately.
1001 */
1002 while (fr_atomic_ring_pop(t->queue, (void **)&pctx)) {
1003 fr_assert(atomic_load_explicit(&pctx->request, memory_order_relaxed) == NULL);
1004 free(pctx);
1005 }
1006
1007 TALLOC_FREE(t->wake);
1008
1009 return 0;
1010}
1011
1012/** Stand up this worker's kafka mailbox + wake event
1013 *
1014 * Allocates the segmented SPSC ring that the bg cb will push delivery
1015 * reports onto and registers the `EVFILT_USER` wake event the cb uses
1016 * to kick us. The shared producer itself is created once at
1017 * `mod_instantiate` - there's nothing per-worker to wire up there.
1018 *
1019 * @param[in] mctx thread-instance ctx (`mctx->thread` is our
1020 * rlm_kafka_thread_t, `mctx->el` is the worker's
1021 * event list).
1022 * @return 0 on success, -1 on any setup failure.
1023 */
1025{
1026 rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
1027
1028 t->el = mctx->el;
1029
1030#ifndef NDEBUG
1031 t->worker_tid = pthread_self();
1032#endif
1033
1034 /*
1035 * Segment size is a growth-granularity knob, not a cap: the
1036 * ring grows on demand, so 1024 just controls how often the
1037 * bg thread has to malloc a fresh segment during bursts.
1038 */
1039 MEM(t->queue = fr_atomic_ring_alloc(t, 1024));
1040
1041 if (fr_event_user_insert(t, t->el, &t->wake, false, _kafka_wake, t) < 0) {
1042 PERROR("fr_event_user_insert failed");
1043 return -1;
1044 }
1045
1046 return 0;
1047}
1048
1049/** Module-instance setup
1050 *
1051 * Builds the log prefix, wires up the log + background event callbacks
1052 * on the shared conf, enables DR / ERROR events, creates the single
1053 * shared producer, forwards the main queue to the background queue
1054 * (so DRs reach `_kafka_background_event_cb` via librdkafka's own bg
1055 * thread), and finally creates the inst-scoped topic handles.
1056 *
1057 * @param[in] mctx module-instance ctx.
1058 * @return 0 on success, -1 on error.
1059 */
1060static int mod_instantiate(module_inst_ctx_t const *mctx)
1061{
1062 rlm_kafka_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_kafka_t);
1063 rd_kafka_conf_t *conf;
1064 char errstr[512];
1065
1066 /*
1067 * rd_kafka_new consumes the conf on success. The original
1068 * lives under a talloc sentinel that destroys it at inst
1069 * teardown, so dup it before handing ownership off.
1070 */
1071 MEM(inst->log_prefix = talloc_typed_asprintf(inst, "rlm_kafka (%s)", mctx->mi->name));
1072
1073 MEM(conf = rd_kafka_conf_dup(inst->kconf.conf));
1074 rd_kafka_conf_set_log_cb(conf, _kafka_log_cb);
1075 rd_kafka_conf_set_background_event_cb(conf, _kafka_background_event_cb);
1076 rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR | RD_KAFKA_EVENT_ERROR);
1077 rd_kafka_conf_set_opaque(conf, inst);
1078
1079 inst->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
1080 if (!inst->rk) {
1081 rd_kafka_conf_destroy(conf); /* only consumed on success */
1082 ERROR("rd_kafka_new failed - %s", errstr);
1083 return -1;
1084 }
1085
1086 /*
1087 * Producer DRs land on the main queue by default; the bg cb
1088 * only runs for events on the background queue. Forward
1089 * main -> bg so delivery reports reach our cb.
1090 */
1091 {
1092 rd_kafka_queue_t *main_q = rd_kafka_queue_get_main(inst->rk);
1093 rd_kafka_queue_t *bg_q = rd_kafka_queue_get_background(inst->rk);
1094
1095 rd_kafka_queue_forward(main_q, bg_q);
1096 rd_kafka_queue_destroy(main_q);
1097 rd_kafka_queue_destroy(bg_q);
1098 }
1099
1100 if (kafka_topics_alloc(inst) < 0) {
1101 rd_kafka_destroy(inst->rk);
1102 inst->rk = NULL;
1103 return -1;
1104 }
1105
1106 return 0;
1107}
1108
1109/** Module detach: tear down the shared producer
1110 *
1111 * 1. `rd_kafka_flush` gives in-flight produces a grace window to
1112 * complete and fire their DRs through the bg cb. By this point
1113 * every worker has already detached; any remaining pctxs have
1114 * `request == NULL` and will be freed inline by the bg cb.
1115 * 2. Free the topic rbtree explicitly BEFORE `rd_kafka_destroy` -
1116 * destroy auto-tears-down topic handles attached to the producer,
1117 * and we'd double-free via `_topic_free` otherwise.
1118 * 3. `rd_kafka_destroy` blocks until the bg thread exits; after that
1119 * no more callbacks can fire.
1120 */
1121static int mod_detach(module_detach_ctx_t const *mctx)
1122{
1123 rlm_kafka_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_kafka_t);
1124
1125 if (inst->rk) {
1126 rd_kafka_resp_err_t ferr;
1127
1128 ferr = rd_kafka_flush(inst->rk, fr_time_delta_to_msec(inst->flush_timeout));
1129 if (ferr != RD_KAFKA_RESP_ERR_NO_ERROR) {
1130 WARN("kafka - flush timed out; %d messages remain in queue",
1131 rd_kafka_outq_len(inst->rk));
1132 }
1133 }
1134
1135 TALLOC_FREE(inst->topics);
1136
1137 if (inst->rk) {
1138 rd_kafka_destroy(inst->rk);
1139 inst->rk = NULL;
1140 }
1141
1142 return 0;
1143}
1144
1145/** Bootstrap-phase setup
1146 *
1147 * Just registers the `%kafka.produce()` xlat. Topic declarations are
1148 * looked up directly via `cf_section_find` at call_env parse time
1149 * (see `_kafka_topic_env_parse`), and at worker thread_instantiate
1150 * time via `cf_section_find_next`, so there's nothing to build here.
1151 *
1152 * @param[in] mctx module-instance ctx.
1153 * @return 0 on success, -1 on error.
1154 */
1155static int mod_bootstrap(module_inst_ctx_t const *mctx)
1156{
1157 xlat_t *xlat;
1158
1159 xlat = module_rlm_xlat_register(mctx->mi->boot, mctx, "produce", kafka_xlat_produce, FR_TYPE_BOOL);
1160 if (!xlat) return -1;
1163 rlm_kafka_xlat_inst_t, NULL, NULL);
1164
1165 return 0;
1166}
1167
1168/** One-time library load hook
1169 *
1170 * Prime librdkafka's lazy global init (SSL lock callbacks on legacy
1171 * OpenSSL, SASL globals if compiled in) so the first real
1172 * `rd_kafka_new()` in a worker thread doesn't race the server's own
1173 * OpenSSL setup. Ref-counted against any other kafka-using module.
1174 */
1175static int mod_load(void)
1176{
1177 return fr_kafka_init();
1178}
1179
1180/** Paired with mod_load */
1181static void mod_unload(void)
1182{
1183 fr_kafka_free();
1184}
1185
1187 .common = {
1188 .magic = MODULE_MAGIC_INIT,
1189 .name = "kafka",
1190 .inst_size = sizeof(rlm_kafka_t),
1191 .thread_inst_size = sizeof(rlm_kafka_thread_t),
1192 .config = module_config,
1193 .onload = mod_load,
1194 .unload = mod_unload,
1195 .bootstrap = mod_bootstrap,
1196 .instantiate = mod_instantiate,
1197 .detach = mod_detach,
1198 .thread_instantiate = mod_thread_instantiate,
1199 .thread_detach = mod_thread_detach
1200 },
1201 /*
1202 * `send` and `recv` alias `produce` so the call reads naturally
1203 * in its surrounding section - e.g. `recv Access-Request {
1204 * kafka.recv.auth }` or `send Access-Accept { kafka.send.audit }`.
1205 * All three dispatch to the same producer path.
1206 */
1207 .method_group = {
1208 .bindings = (module_method_binding_t[]){
1209 {
1210 .section = SECTION_NAME("produce", CF_IDENT_ANY),
1211 .method = mod_produce,
1212 .method_env = &rlm_kafka_produce_env
1213 },
1214 {
1215 .section = SECTION_NAME("send", CF_IDENT_ANY),
1216 .method = mod_produce,
1217 .method_env = &rlm_kafka_produce_env
1218 },
1219 {
1220 .section = SECTION_NAME("recv", CF_IDENT_ANY),
1221 .method = mod_produce,
1222 .method_env = &rlm_kafka_produce_env
1223 },
1225 }
1226 }
1227};
unlang_action_t
Returned by unlang_op_t calls, determine the next action of the interpreter.
Definition action.h:35
log_entry msg
Definition acutest.h:794
bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data)
Push a pointer into the ring; allocate a new segment on overflow.
fr_atomic_ring_t * fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size)
Allocate an empty SPSC ring.
bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data)
Pop a pointer from the ring, advancing past drained segments.
#define USES_APPLE_DEPRECATED_API
Definition build.h:499
#define RCSID(id)
Definition build.h:512
#define endforeach
Definition build.h:522
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:407
#define UNUSED
Definition build.h:336
int call_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *parsed, char const *name, tmpl_rules_t const *t_rules, CONF_SECTION const *cs, call_env_ctx_t const *cec, call_env_parser_t const *rule)
Parse per call env.
Definition call_env.c:461
call_env_parsed_t * call_env_parsed_add(TALLOC_CTX *ctx, call_env_parsed_head_t *head, call_env_parser_t const *rule)
Allocate a new call_env_parsed_t structure and add it to the list of parsed call envs.
Definition call_env.c:690
void call_env_parsed_set_data(call_env_parsed_t *parsed, void const *data)
Assign data to a call_env_parsed_t.
Definition call_env.c:747
#define CALL_ENV_TERMINATOR
Definition call_env.h:236
#define FR_CALL_ENV_METHOD_OUT(_inst)
Helper macro for populating the size/type fields of a call_env_method_t from the output structure typ...
Definition call_env.h:240
call_env_parser_t const * env
Parsing rules for call method env.
Definition call_env.h:247
section_name_t const * asked
The actual name1/name2 that resolved to a module_method_binding_t.
Definition call_env.h:232
@ CALL_ENV_FLAG_CONCAT
If the tmpl produced multiple boxes they should be concatenated.
Definition call_env.h:76
@ CALL_ENV_FLAG_PARSE_ONLY
The result of parsing will not be evaluated at runtime.
Definition call_env.h:85
@ CALL_ENV_FLAG_REQUIRED
Associated conf pair or section is required.
Definition call_env.h:75
@ CALL_ENV_FLAG_PARSE_MISSING
If this subsection is missing, still parse it.
Definition call_env.h:88
@ CALL_ENV_FLAG_NULLABLE
Tmpl expansions are allowed to produce no output.
Definition call_env.h:80
@ CALL_ENV_PARSE_TYPE_VOID
Output of the parsing phase is undefined (a custom structure).
Definition call_env.h:62
module_instance_t const * mi
Module instance that the callenv is registered to.
Definition call_env.h:229
#define FR_CALL_ENV_SUBSECTION_FUNC(_name, _name2, _flags, _func)
Specify a call_env_parser_t which parses a subsection using a callback function.
Definition call_env.h:412
#define FR_CALL_ENV_OFFSET(_name, _cast_type, _flags, _struct, _field)
Specify a call_env_parser_t which writes out runtime results to the specified field.
Definition call_env.h:340
Per method call config.
Definition call_env.h:180
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:669
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:280
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:606
Common header for all CONF_* types.
Definition cf_priv.h:49
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:287
#define CF_IDENT_ANY
Definition cf_util.h:75
static int fr_dcursor_append(fr_dcursor_t *cursor, void *v)
Insert a single item at the end of the list.
Definition dcursor.h:406
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define DEBUG(fmt,...)
Definition dhcpclient.c:38
static fr_slen_t err
Definition dict.h:882
static fr_slen_t in
Definition dict.h:882
Test enumeration values.
Definition dict_test.h:92
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
Definition dl_module.h:63
#define fr_event_user_insert(_ctx, _el, _ev_p, _trigger, _callback, _uctx)
Definition event.h:279
free(array)
talloc_free(hp)
void unlang_interpret_mark_runnable(request_t *request)
Mark a request as resumable.
Definition interpret.c:1988
CONF_SECTION * cs
topic's CONF_SECTION (for call_env lookups of per-topic pairs like value / key)
Definition base.h:69
#define KAFKA_BASE_CONFIG
Config entries common to producer and consumer clients.
Definition base.h:177
#define KAFKA_PRODUCER_CONFIG
Producer-only delta: librdkafka producer tuning + declared topics.
Definition base.h:228
Declared topic record - one per topic { <name> { ... } } subsection.
Definition base.h:66
void fr_kafka_free(void)
Drop one ref to librdkafka's global init.
Definition base.c:1238
fr_kafka_topic_t * kafka_topic_conf_find(fr_kafka_conf_t const *kc, char const *name)
Look up a declared topic by name on an fr_kafka_conf_t
Definition base.c:623
int fr_kafka_init(void)
Drive librdkafka's lazy global init deterministically.
Definition base.c:1207
#define PERROR(_fmt,...)
Definition log.h:228
int fr_event_user_trigger(fr_event_user_t *ev)
Trigger a user event.
Definition event.c:1927
Stores all information relating to an event list.
Definition event.c:377
Callbacks for kevent() user events.
Definition event.c:341
static void partition(fr_lst_t *lst, stack_index_t stack_index)
Definition lst.c:460
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_OCTETS
Raw octets.
unsigned char uint8_t
void * env_data
Per call environment data.
Definition module_ctx.h:44
module_instance_t const * mi
Instance of the module being instantiated.
Definition module_ctx.h:42
void * thread
Thread specific instance data.
Definition module_ctx.h:43
void * rctx
Resume ctx that a module previously set.
Definition module_ctx.h:45
fr_event_list_t * el
Event list to register any IO handlers and timers against.
Definition module_ctx.h:68
module_instance_t * mi
Module instance to detach.
Definition module_ctx.h:57
void * thread
Thread instance data.
Definition module_ctx.h:67
module_instance_t const * mi
Instance of the module being instantiated.
Definition module_ctx.h:64
module_instance_t * mi
Instance of the module being instantiated.
Definition module_ctx.h:51
Temporary structure to hold arguments for module calls.
Definition module_ctx.h:41
Temporary structure to hold arguments for detach calls.
Definition module_ctx.h:56
Temporary structure to hold arguments for instantiation calls.
Definition module_ctx.h:50
Temporary structure to hold arguments for thread_instantiation calls.
Definition module_ctx.h:63
xlat_t * module_rlm_xlat_register(TALLOC_CTX *ctx, module_inst_ctx_t const *mctx, char const *name, xlat_func_t func, fr_type_t return_type)
Definition module_rlm.c:232
module_t common
Common fields presented by all modules.
Definition module_rlm.h:39
#define fr_assert(_expr)
Definition rad_assert.h:37
#define REDEBUG(fmt,...)
#define RDEBUG2(fmt,...)
#define WARN(fmt,...)
#define INFO(fmt,...)
Definition radict.c:63
static rs_t * conf
Definition radsniff.c:52
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:244
#define fr_rb_inorder_foreach(_tree, _type, _iter)
Definition rb.h:330
The main red black tree structure.
Definition rb.h:71
#define RETURN_UNLANG_RCODE(_rcode)
Definition rcode.h:61
#define RETURN_UNLANG_FAIL
Definition rcode.h:63
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:44
@ RLM_MODULE_OK
The module is OK, continue.
Definition rcode.h:49
@ RLM_MODULE_FAIL
Module failed, don't reply.
Definition rcode.h:48
@ RLM_MODULE_REJECT
Immediately reject the request.
Definition rcode.h:47
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
Definition rcode.h:56
char const * log_prefix
pre-rendered "rlm_kafka (<instance>)", used by librdkafka's log_cb which fires from internal threads ...
Definition rlm_kafka.c:99
static int mod_detach(module_detach_ctx_t const *mctx)
Module detach: tear down the shared producer.
Definition rlm_kafka.c:1121
static int mod_load(void)
One-time library load hook.
Definition rlm_kafka.c:1175
fr_value_box_t * value
message payload
Definition rlm_kafka.c:167
static unlang_action_t mod_produce(UNUSED unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
Module method entry point for kafka.produce.
Definition rlm_kafka.c:549
static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, fr_value_box_list_t *in)
kafka.produce(topic, key, value) - runtime-named produce
Definition rlm_kafka.c:735
static xlat_arg_parser_t const kafka_xlat_produce_args[]
Definition rlm_kafka.c:697
static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action)
Module-method cancellation.
Definition rlm_kafka.c:500
fr_rb_tree_t * topics
rlm_kafka_topic_t keyed by name, read-only after mod_instantiate.
Definition rlm_kafka.c:105
fr_rb_node_t node
Definition rlm_kafka.c:118
static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out, tmpl_rules_t const *t_rules, CONF_ITEM *ci, call_env_ctx_t const *cec, UNUSED call_env_parser_t const *rule)
Resolve the topic named in the method's second identifier, then hand its subsection back to the call_...
Definition rlm_kafka.c:412
fr_atomic_ring_t * queue
rlm_kafka_msg_ctx_t pushed by bg cb on librdkafka's thread, popped by our event loop on this worker.
Definition rlm_kafka.c:127
char const * name
Definition rlm_kafka.c:116
fr_event_list_t * el
Definition rlm_kafka.c:122
static int kafka_xlat_instantiate(xlat_inst_ctx_t const *xctx)
Xlat instance init: if the topic arg is a compile-time literal, resolve and cache the inst-scoped rd_...
Definition rlm_kafka.c:602
fr_value_box_t * key
optional message key
Definition rlm_kafka.c:166
static int kafka_topics_alloc(rlm_kafka_t *inst)
Create a shared rd_kafka_topic_t for every declared topic.
Definition rlm_kafka.c:893
module_rlm_t rlm_kafka
Definition rlm_kafka.c:1186
static int mod_bootstrap(module_inst_ctx_t const *mctx)
Bootstrap-phase setup.
Definition rlm_kafka.c:1155
static int _topic_free(rlm_kafka_topic_t *h)
Destructor for inst-scoped topic handles.
Definition rlm_kafka.c:879
static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action)
Xlat cancellation.
Definition rlm_kafka.c:690
char const * topic
resolved topic name (validated at parse time)
Definition rlm_kafka.c:165
static void mod_unload(void)
Paired with mod_load.
Definition rlm_kafka.c:1181
rd_kafka_topic_t * topic
pre-resolved handle, NULL if topic arg is dynamic.
Definition rlm_kafka.c:591
rd_kafka_topic_t * kt
Definition rlm_kafka.c:117
fr_kafka_conf_t kconf
parsed producer conf - MUST be first
Definition rlm_kafka.c:96
static rlm_rcode_t kafka_err_to_rcode(request_t *request, rlm_kafka_msg_ctx_t const *pctx)
Translate a librdkafka delivery-report error into a module rcode.
Definition rlm_kafka.c:296
static void _kafka_background_event_cb(UNUSED rd_kafka_t *rk, rd_kafka_event_t *ev, UNUSED void *uctx)
Background event callback, runs on librdkafka's bg thread.
Definition rlm_kafka.c:817
rd_kafka_t * rk
shared producer, created at mod_instantiate.
Definition rlm_kafka.c:104
static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
Resume a yielded module method after its delivery report has arrived.
Definition rlm_kafka.c:477
static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
Stand up this worker's kafka mailbox + wake event.
Definition rlm_kafka.c:1024
fr_time_delta_t flush_timeout
How long mod_detach waits for in-flight produces to drain before rd_kafka_destroy.
Definition rlm_kafka.c:97
static const call_env_method_t rlm_kafka_produce_env
Definition rlm_kafka.c:456
static rlm_kafka_msg_ctx_t * kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request, rd_kafka_topic_t *topic, uint8_t const *key, size_t key_len, uint8_t const *value, size_t value_len)
Common produce-and-yield helper.
Definition rlm_kafka.c:343
fr_event_user_t * wake
EVFILT_USER handle; bg cb triggers it to wake the worker's event loop on this thread.
Definition rlm_kafka.c:124
static xlat_action_t kafka_xlat_produce_resume(TALLOC_CTX *xctx_ctx, fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, UNUSED fr_value_box_list_t *in)
Xlat resume: translate delivery report into a "partition:offset" string.
Definition rlm_kafka.c:662
static int8_t topic_name_cmp(void const *a, void const *b)
Definition rlm_kafka.c:195
struct rlm_kafka_thread_s rlm_kafka_thread_t
pthread_t worker_tid
pthread_self() captured at thread_instantiate.
Definition rlm_kafka.c:133
static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
Tear down a worker's kafka state.
Definition rlm_kafka.c:954
static call_env_parser_t const topic_env[]
Per-topic call_env rules, applied against the topic <name> subsection.
Definition rlm_kafka.c:388
static int mod_instantiate(module_inst_ctx_t const *mctx)
Module-instance setup.
Definition rlm_kafka.c:1060
static conf_parser_t const module_config[]
Module config: just the kafka base producer config for now.
Definition rlm_kafka.c:177
static void _kafka_wake(UNUSED fr_event_list_t *el, void *uctx)
Worker wake-up callback - the bg cb triggered our EVFILT_USER event.
Definition rlm_kafka.c:269
static rd_kafka_topic_t * kafka_find_topic(rlm_kafka_t const *inst, char const *name)
Look up a shared topic handle on the module instance by name.
Definition rlm_kafka.c:209
static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char const *buf)
librdkafka log callback - bridge internal library messages into the server log
Definition rlm_kafka.c:234
Call env for kafka.produce.
Definition rlm_kafka.c:164
Per produce() invocation context.
Definition rlm_kafka.c:148
Module instance data.
Definition rlm_kafka.c:95
Topic handle.
Definition rlm_kafka.c:115
Xlat instance data - cached topic handle for literal-topic calls.
Definition rlm_kafka.c:590
static char const * name
#define SECTION_NAME(_name1, _name2)
Define a section name consisting of a verb and a noun.
Definition section.h:39
char const * name2
Second section name. Usually a packet type like 'access-request', 'access-accept',...
Definition section.h:45
char const * name
Instance name e.g. user_database.
Definition module.h:357
CONF_SECTION * conf
Module's instance configuration.
Definition module.h:351
size_t inst_size
Size of the module's instance data.
Definition module.h:212
void * data
Module's instance data.
Definition module.h:293
void * boot
Data allocated during the boostrap phase.
Definition module.h:296
#define MODULE_BINDING_TERMINATOR
Terminate a module binding list.
Definition module.h:152
Named methods exported by a module.
Definition module.h:174
Optional arguments passed to vp_tmpl functions.
Definition tmpl.h:336
fr_signal_t
Signals that can be generated/processed by request signal handlers.
Definition signal.h:38
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
unlang_action_t unlang_module_yield(request_t *request, module_method_t resume, unlang_module_signal_t signal, fr_signal_t sigmask, void *rctx)
Yield a request back to the interpreter from within a module.
Definition module.c:431
eap_aka_sim_process_conf_t * inst
@ memory_order_release
Definition stdatomic.h:130
@ memory_order_relaxed
Definition stdatomic.h:127
@ memory_order_acquire
Definition stdatomic.h:129
#define _Atomic(T)
Definition stdatomic.h:77
#define atomic_load_explicit(object, order)
Definition stdatomic.h:312
#define atomic_store_explicit(object, desired, order)
Definition stdatomic.h:314
#define atomic_init(obj, value)
Definition stdatomic.h:89
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
Definition talloc.c:546
#define talloc_get_type_abort_const
Definition talloc.h:117
#define talloc_strdup(_ctx, _str)
Definition talloc.h:149
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
Definition time.h:637
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
static fr_event_list_t * el
xlat_action_t unlang_xlat_yield(request_t *request, xlat_func_t resume, xlat_func_signal_t signal, fr_signal_t sigmask, void *rctx)
Yield a request back to the interpreter from within a module.
Definition xlat.c:543
bool xlat_is_literal(xlat_exp_head_t const *head)
Check to see if the expansion consists entirely of value-box elements.
unsigned int required
Argument must be present, and non-empty.
Definition xlat.h:146
#define XLAT_ARG_PARSER_TERMINATOR
Definition xlat.h:170
xlat_action_t
Definition xlat.h:37
@ XLAT_ACTION_FAIL
An xlat function failed.
Definition xlat.h:44
@ XLAT_ACTION_DONE
We're done evaluating this level of nesting.
Definition xlat.h:43
Definition for a single argument consumed by an xlat function.
Definition xlat.h:145
#define fr_type_is_null(_x)
Definition types.h:347
int fr_value_box_cast(TALLOC_CTX *ctx, fr_value_box_t *dst, fr_type_t dst_type, fr_dict_attr_t const *dst_enumv, fr_value_box_t const *src)
Convert one type of fr_value_box_t to another.
Definition value.c:3946
void fr_value_box_clear_value(fr_value_box_t *data)
Clear/free any existing value.
Definition value.c:4331
#define fr_value_box_alloc(_ctx, _type, _enumv)
Allocate a value box of a specific type.
Definition value.h:644
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
Definition value.h:511
int nonnull(2, 5))
static size_t char ** out
Definition value.h:1030
void * rctx
Resume context.
Definition xlat_ctx.h:54
xlat_exp_t * ex
Tokenized expression to use in expansion.
Definition xlat_ctx.h:64
void const * inst
xlat instance data.
Definition xlat_ctx.h:50
module_inst_ctx_t const * mctx
Synthesised module calling ctx.
Definition xlat_ctx.h:65
module_ctx_t const * mctx
Synthesised module calling ctx.
Definition xlat_ctx.h:52
void * inst
xlat instance data to populate.
Definition xlat_ctx.h:63
An xlat calling ctx.
Definition xlat_ctx.h:49
An xlat instantiation ctx.
Definition xlat_ctx.h:62
int xlat_func_args_set(xlat_t *x, xlat_arg_parser_t const args[])
Register the arguments of an xlat.
Definition xlat_func.c:363
#define xlat_func_instantiate_set(_xlat, _instantiate, _inst_struct, _detach, _uctx)
Set a callback for global instantiation of xlat functions.
Definition xlat_func.h:93
@ XLAT_GROUP
encapsulated string of xlats
Definition xlat_priv.h:116
xlat_type_t _CONST type
type of this expansion.
Definition xlat_priv.h:155
static xlat_exp_t * xlat_exp_head(xlat_exp_head_t const *head)
Definition xlat_priv.h:210
An xlat expansion node.
Definition xlat_priv.h:148