![]() |
The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
|
Asynchronous Kafka producer module. More...
#include <freeradius-devel/util/debug.h>
#include <freeradius-devel/util/dlist.h>
#include <freeradius-devel/util/misc.h>
#include <freeradius-devel/util/rb.h>
#include <freeradius-devel/util/types.h>
#include <freeradius-devel/util/value.h>
#include <freeradius-devel/io/atomic_queue.h>
#include <freeradius-devel/kafka/base.h>
#include <freeradius-devel/server/base.h>
#include <freeradius-devel/server/module_rlm.h>
#include <freeradius-devel/unlang/call_env.h>
#include <freeradius-devel/unlang/module.h>
#include <freeradius-devel/unlang/xlat.h>
#include <freeradius-devel/unlang/xlat_ctx.h>
#include <freeradius-devel/unlang/xlat_func.h>
#include <freeradius-devel/unlang/xlat_priv.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <unistd.h>
Include dependency graph for rlm_kafka.c:
Go to the source code of this file.
Data Structures | |
| struct | rlm_kafka_env_t |
Call env for kafka.produce. More... | |
| struct | rlm_kafka_msg_ctx_t |
| Per produce() invocation context. More... | |
| struct | rlm_kafka_t |
| Module instance data. More... | |
| struct | rlm_kafka_thread_s |
| struct | rlm_kafka_topic_t |
| Topic handle. More... | |
| struct | rlm_kafka_xlat_inst_t |
| Xlat instance data - cached topic handle for literal-topic calls. More... | |
Typedefs | |
| typedef struct rlm_kafka_thread_s | rlm_kafka_thread_t |
Functions | |
| static void | _kafka_background_event_cb (UNUSED rd_kafka_t *rk, rd_kafka_event_t *ev, UNUSED void *uctx) |
| Background event callback, runs on librdkafka's bg thread. | |
| static void | _kafka_log_cb (rd_kafka_t const *rk, int level, char const *fac, char const *buf) |
| librdkafka log callback - bridge internal library messages into the server log | |
| static int | _kafka_topic_env_parse (TALLOC_CTX *ctx, call_env_parsed_head_t *out, tmpl_rules_t const *t_rules, CONF_ITEM *ci, call_env_ctx_t const *cec, UNUSED call_env_parser_t const *rule) |
Resolve the topic named in the method's second identifier, then hand its subsection back to the call_env framework for per-topic value / key tmpl parsing. | |
| static void | _kafka_wake (UNUSED fr_event_list_t *el, void *uctx) |
| Worker wake-up callback - the bg cb triggered our EVFILT_USER event. | |
| static int | _topic_free (rlm_kafka_topic_t *h) |
| Destructor for inst-scoped topic handles. | |
| static rlm_rcode_t | kafka_err_to_rcode (request_t *request, rlm_kafka_msg_ctx_t const *pctx) |
| Translate a librdkafka delivery-report error into a module rcode. | |
| static rd_kafka_topic_t * | kafka_find_topic (rlm_kafka_t const *inst, char const *name) |
| Look up a shared topic handle on the module instance by name. | |
| static rlm_kafka_msg_ctx_t * | kafka_produce_enqueue (rlm_kafka_thread_t *t, request_t *request, rd_kafka_topic_t *topic, uint8_t const *key, size_t key_len, uint8_t const *value, size_t value_len) |
| Common produce-and-yield helper. | |
| static int | kafka_topics_alloc (rlm_kafka_t *inst) |
| Create a shared rd_kafka_topic_t for every declared topic. | |
| static int | kafka_xlat_instantiate (xlat_inst_ctx_t const *xctx) |
Xlat instance init: if the topic arg is a compile-time literal, resolve and cache the inst-scoped rd_kafka_topic_t handle. | |
| static xlat_action_t | kafka_xlat_produce (UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, fr_value_box_list_t *in) |
kafka.produce(topic, key, value) - runtime-named produce | |
| static xlat_action_t | kafka_xlat_produce_resume (TALLOC_CTX *xctx_ctx, fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, UNUSED fr_value_box_list_t *in) |
| Xlat resume: translate delivery report into a "partition:offset" string. | |
| static void | kafka_xlat_produce_signal (xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action) |
| Xlat cancellation. | |
| static int | mod_bootstrap (module_inst_ctx_t const *mctx) |
| Bootstrap-phase setup. | |
| static int | mod_detach (module_detach_ctx_t const *mctx) |
| Module detach: tear down the shared producer. | |
| static int | mod_instantiate (module_inst_ctx_t const *mctx) |
| Module-instance setup. | |
| static int | mod_load (void) |
| One-time library load hook. | |
| static unlang_action_t | mod_produce (UNUSED unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request) |
Module method entry point for kafka.produce. | |
| static unlang_action_t | mod_resume (unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request) |
| Resume a yielded module method after its delivery report has arrived. | |
| static void | mod_signal (module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action) |
| Module-method cancellation. | |
| static int | mod_thread_detach (module_thread_inst_ctx_t const *mctx) |
| Tear down a worker's kafka state. | |
| static int | mod_thread_instantiate (module_thread_inst_ctx_t const *mctx) |
| Stand up this worker's kafka mailbox + wake event. | |
| static void | mod_unload (void) |
| Paired with mod_load. | |
| static int8_t | topic_name_cmp (void const *a, void const *b) |
Variables | |
| static xlat_arg_parser_t const | kafka_xlat_produce_args [] |
| static conf_parser_t const | module_config [] |
| Module config: just the kafka base producer config for now. | |
| module_rlm_t | rlm_kafka |
| static const call_env_method_t | rlm_kafka_produce_env |
| static call_env_parser_t const | topic_env [] |
Per-topic call_env rules, applied against the topic <name> subsection. | |
Asynchronous Kafka producer module.
A single shared rd_kafka_t serves every worker in the module instance. Delivery reports are fanned out to the originating worker via librdkafka's own background thread: at mod_instantiate we set rd_kafka_conf_set_background_event_cb and forward the producer's main queue to the background queue, so DRs arrive at our bg cb (_kafka_background_event_cb). The cb pushes each DR's opaque onto the originating worker's fr_atomic_ring_t mailbox and triggers its EVFILT_USER wake event; the worker's event loop drains the mailbox on its own stack and marks the request runnable itself - resumption never crosses threads.
pctx is malloc'd, not talloc'd, so the bg thread can free it directly without racing worker-thread talloc. pctx->request is atomic and is NULLed by the cancel signal handler on the worker; by the time mod_thread_detach runs the framework has already cancelled every yielded request this worker owned, so the bg cb sees NULL and frees inline without touching the (about-to-be-freed) thread_inst.
The schema for the module config is just kafka_base_producer_config from the kafka base library (librdkafka passthrough plus flush_timeout), with fr_kafka_conf_t embedded as the first member of rlm_kafka_t so FR_CONF_OFFSET entries resolve correctly. Topics are declared once in the module config and referenced by name at method/xlat invocation time; unknown topics are rejected at config-parse time rather than being created on the fly, so per-topic settings (acks, compression, partitioner) always match the declared configuration.
See mod_produce and kafka_xlat_produce for the caller-facing surfaces.
Definition in file rlm_kafka.c.
| struct rlm_kafka_env_t |
Call env for kafka.produce.
<topic>
Topic comes from the method's second identifier and is validated against the declared-topic list at call_env parse time, then stashed here as a plain name string.
Definition at line 164 of file rlm_kafka.c.
Collaboration diagram for rlm_kafka_env_t:| Data Fields | ||
|---|---|---|
| fr_value_box_t * | key | optional message key |
| char const * | topic | resolved topic name (validated at parse time) |
| fr_value_box_t * | value | message payload |
| struct rlm_kafka_msg_ctx_t |
Per produce() invocation context.
Raw malloc'd (not talloc) so the background dispatch thread can free() it directly without racing worker-thread talloc activity. request is atomic because the cancel signal handler (worker) and bg cb (librdkafka's own thread) access it from different threads. Reused as the rctx for both module-method and xlat invocations so we don't need a separate wrapper.
Definition at line 148 of file rlm_kafka.c.
| struct rlm_kafka_t |
Module instance data.
fr_kafka_conf_t is embedded as the first member so KAFKA_BASE_CONFIG / KAFKA_PRODUCER_CONFIG FR_CONF_OFFSET entries (relative to fr_kafka_conf_t) address correctly when the framework passes rlm_kafka_t as the parse base. The librdkafka handle and per-topic tree inside kconf are released automatically by talloc when the module instance is torn down (the library attaches a lifecycle sentinel during config parse).
Definition at line 95 of file rlm_kafka.c.
Collaboration diagram for rlm_kafka_t:| Data Fields | ||
|---|---|---|
| fr_time_delta_t | flush_timeout |
How long mod_detach waits for in-flight produces to drain before rd_kafka_destroy. |
| fr_kafka_conf_t | kconf | parsed producer conf - MUST be first |
| char const * | log_prefix |
pre-rendered "rlm_kafka (<instance>)", used by librdkafka's log_cb which fires from internal threads with no mctx in scope. Built once in mod_instantiate so we don't reformat per line. |
| rd_kafka_t * | rk | shared producer, created at mod_instantiate. |
| fr_rb_tree_t * | topics | rlm_kafka_topic_t keyed by name, read-only after mod_instantiate. |
| struct rlm_kafka_thread_s |
Definition at line 121 of file rlm_kafka.c.
Collaboration diagram for rlm_kafka_thread_s:| Data Fields | ||
|---|---|---|
| fr_event_list_t * | el | |
| fr_atomic_ring_t * | queue |
rlm_kafka_msg_ctx_t pushed by bg cb on librdkafka's thread, popped by our event loop on this worker. Segmented SPSC ring: grows on demand, so the bg cb never has to drop a delivery report. |
| fr_event_user_t * | wake | EVFILT_USER handle; bg cb triggers it to wake the worker's event loop on this thread. |
| pthread_t | worker_tid |
pthread_self() captured at thread_instantiate. Debug-build sanity check: the bg cb must NOT run on this thread, and the mailbox drain must. |
| struct rlm_kafka_topic_t |
Topic handle.
One per declared topic, created at mod_instantiate. rd_kafka_topic_t is bound to the producer that created it; we have one shared producer per module instance so the topic handles are inst-scoped.
Definition at line 115 of file rlm_kafka.c.
Collaboration diagram for rlm_kafka_topic_t:| Data Fields | ||
|---|---|---|
| rd_kafka_topic_t * | kt | |
| char const * | name | |
| fr_rb_node_t | node | |
| struct rlm_kafka_xlat_inst_t |
Xlat instance data - cached topic handle for literal-topic calls.
Topics are inst-scoped (shared producer), so the lookup can be resolved once at xlat_instantiate and the rd_kafka_topic_t * cached directly - no per-thread hop needed.
Definition at line 590 of file rlm_kafka.c.
| Data Fields | ||
|---|---|---|
| rd_kafka_topic_t * | topic | pre-resolved handle, NULL if topic arg is dynamic. |
| typedef struct rlm_kafka_thread_s rlm_kafka_thread_t |
|
static |
Background event callback, runs on librdkafka's bg thread.
Dispatches delivery reports to the originating worker's mailbox. Runs on a librdkafka-owned thread, so MUST NOT touch talloc (races worker-thread allocs) or the FR logger (fr_log is talloc-backed). Allowed: plain pointer deref, atomic load/store, fr_atomic_ring_push, fr_event_user_trigger, malloc/free, rd_kafka_event_destroy.
For each DR:
- Acquire-load pctx->request. If NULL the request was cancelled via the signal handler - just free(pctx) inline; no thread_inst or mailbox access. Safe even if the owning worker has since detached.
- Otherwise push the pctx onto target->mailbox, and fr_event_user_trigger() the worker's wake event. The worker is guaranteed alive because cancellation happens before thread_detach and any still-live request pins its worker.

A NULL opaque indicates a fire-and-forget produce - nothing to do.
| [in] | rk | UNUSED. |
| [in] | ev | librdkafka event batch; destroyed at end. |
| [in] | uctx | UNUSED (we don't need the inst here). |
Definition at line 817 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
librdkafka log callback - bridge internal library messages into the server log
Called from librdkafka's internal threads (no request context, no mctx in scope), so we pull the pre-rendered log prefix off the producer's opaque pointer (the rlm_kafka_t we attached at mod_instantiate). Which librdkafka categories are actually emitted is controlled by the top-level debug config knob.
| [in] | rk | producer handle. rd_kafka_opaque(rk) is the rlm_kafka_t set during mod_instantiate. |
| [in] | level | syslog-style severity (0 emerg .. 7 debug). |
| [in] | fac | librdkafka facility / category, e.g. BROKER, MSG. |
| [in] | buf | pre-formatted message body. |
Definition at line 234 of file rlm_kafka.c.
Here is the caller graph for this function:
|
static |
Resolve the topic named in the method's second identifier, then hand its subsection back to the call_env framework for per-topic value / key tmpl parsing.
Invocations look like kafka.produce.<topic_name>. We:
call_env_parse() with topic_env pointed at the topic's CONF_SECTION - the framework walks value and key for us.Per-topic value and key means each declared topic carries its own payload template; operators can publish different shapes to different topics from one module instance.
Definition at line 412 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Worker wake-up callback - the bg cb triggered our EVFILT_USER event.
Pops everything sitting in the mailbox and dispatches each pctx on this worker's stack.
| [in] | el | UNUSED. |
| [in] | uctx | rlm_kafka_thread_t pointer. |
Definition at line 269 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Destructor for inst-scoped topic handles.
Releases the rd_kafka_topic_t.
Definition at line 879 of file rlm_kafka.c.
Here is the caller graph for this function:
|
static |
Translate a librdkafka delivery-report error into a module rcode.
| [in] | request | associated request (for logging). |
| [in] | pctx | produce context with the stashed error. |
Returns an rlm_rcode_t summarising the outcome. Definition at line 296 of file rlm_kafka.c.
Here is the caller graph for this function:
|
inlinestatic |
Look up a shared topic handle on the module instance by name.
| [in] | inst | module instance. |
| [in] | name | topic name (must have been declared at config time). |
Returns the rd_kafka_topic_t if found, NULL otherwise. Definition at line 209 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
inlinestatic |
Common produce-and-yield helper.
Submits a message to the shared producer and returns the produce context on success. The caller is responsible for yielding the request with the returned pctx as rctx. On synchronous failure the pctx is freed and NULL is returned.
pctx is malloc'd (not talloc'd) so the bg cb can free() it directly without racing worker-thread talloc.
| [in] | t | originating worker's thread_inst. |
| [in] | request | request to yield on. |
| [in] | topic | preconfigured inst-scoped topic handle. |
| [in] | key | optional message key, may be NULL. |
| [in] | key_len | length of key, 0 if key is NULL. |
| [in] | value | message payload. |
| [in] | value_len | length of value. |
Returns the produce context on success, NULL on failure. Definition at line 343 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Create a shared rd_kafka_topic_t for every declared topic.
Called at mod_instantiate. Walks the topic { <name> { ... } } subsections directly off the module's CONF_SECTION - the kafka base library has already parsed each per-topic conf into an fr_kafka_topic_conf_t stashed via cf_data on the topic's section, so we just fetch and dup it.
Definition at line 893 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Xlat instance init: if the topic arg is a compile-time literal, resolve and cache the inst-scoped rd_kafka_topic_t handle.
Runs after mod_instantiate has created the shared producer and all topic handles, so the lookup is an rbtree walk against inst->topics. Dynamic topic args (attribute refs, nested xlats) are left to the per-call lookup in the xlat runtime; validation happens there.
Definition at line 602 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
kafka.produce(topic, key, value) - runtime-named produce
Unlike the mod_produce method form (which resolves topics at config-parse time), the xlat takes the topic name as a runtime argument. Use this when the topic or payload is chosen per-request:
key is optional. Pass null, an empty string, (octets) "", or an attribute that expands to nothing to produce without a key - librdkafka then uses the configured partitioner to spread records across partitions. When a non-empty key is supplied, librdkafka hashes it to pick a partition, so records with the same key end up on the same partition and preserve per-key produce order on the consumer side.
Returns a bool: true on successful delivery, false on failure. The topic must have been declared in the module config (unknown topics fail the xlat) so librdkafka per-topic settings continue to apply to whichever topic is selected.
Runtime behaviour mirrors the method: submit via kafka_produce_enqueue, yield until the delivery report arrives, then resume in kafka_xlat_produce_resume.
Definition at line 735 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Xlat resume: translate delivery report into a "partition:offset" string.
| [in] | xctx_ctx | talloc context for the returned value box. |
| [in] | out | cursor to append the result to. |
| [in] | xctx | xlat ctx, rctx points at the rlm_kafka_msg_ctx_t. |
| [in] | request | associated request (for logging). |
| [in] | in | UNUSED (original args). |
Definition at line 662 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Xlat cancellation.
Same semantics as mod_signal: detach the request from the in-flight pctx so the eventual bg cb discards silently rather than resuming a cancelled request. The bg cb owns the free.
| [in] | xctx | xlat ctx (xctx->rctx is the rlm_kafka_msg_ctx_t). |
| [in] | request | UNUSED. |
| [in] | action | UNUSED (we mask off everything except CANCEL). |
Definition at line 690 of file rlm_kafka.c.
Here is the caller graph for this function:
|
static |
Bootstrap-phase setup.
Just registers the kafka.produce() xlat. Topic declarations are looked up directly via cf_section_find at call_env parse time (see _kafka_topic_env_parse), and at mod_instantiate time via cf_section_find_next (see kafka_topics_alloc), so there's nothing to build here.
| [in] | mctx | module-instance ctx. |
Definition at line 1155 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Module detach: tear down the shared producer.
- rd_kafka_flush gives in-flight produces a grace window to complete and fire their DRs through the bg cb. By this point every worker has already detached; any remaining pctxs have request == NULL and will be freed inline by the bg cb.
- The topic tree is freed before rd_kafka_destroy - destroy auto-tears-down topic handles attached to the producer, and we'd double-free via _topic_free otherwise.
- rd_kafka_destroy blocks until the bg thread exits; after that no more callbacks can fire.

Definition at line 1121 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Module-instance setup.
Builds the log prefix, wires up the log + background event callbacks on the shared conf, enables DR / ERROR events, creates the single shared producer, forwards the main queue to the background queue (so DRs reach _kafka_background_event_cb via librdkafka's own bg thread), and finally creates the inst-scoped topic handles.
| [in] | mctx | module-instance ctx. |
Definition at line 1060 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
One-time library load hook.
Prime librdkafka's lazy global init (SSL lock callbacks on legacy OpenSSL, SASL globals if compiled in) so the first real rd_kafka_new() in a worker thread doesn't race the server's own OpenSSL setup. Ref-counted against any other kafka-using module.
Definition at line 1175 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Module method entry point for kafka.produce.
<topic>
The topic is the method's second identifier; key and value are pulled from the module config via rlm_kafka_produce_env:
The topic name in name2 is resolved against the declared-topic tree at config-parse time (see _kafka_topic_env_parse), so typos fail fast. Different topics reuse the same module instance - e.g. kafka.produce.auth and kafka.produce.accounting both dispatch through the same shared producer handle.
Runtime behaviour: looks up the inst-scoped rd_kafka_topic_t by the parse-time-resolved topic name, extracts the expanded key/value tmpls from the call_env, hands everything to kafka_produce_enqueue, and yields until the delivery report arrives (see mod_resume and kafka_err_to_rcode for rcode mapping).
| [out] | p_result | UNUSED (resume writes the real rcode). |
| [in] | mctx | module ctx (mctx->thread is the rlm_kafka_thread_t, mctx->env_data is the rlm_kafka_env_t). |
| [in] | request | the request being handled. |
Definition at line 549 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Resume a yielded module method after its delivery report has arrived.
Runs on the same worker as the originating produce (the bg cb routes each delivery report back to the originating worker's mailbox), with mctx->rctx being the rlm_kafka_msg_ctx_t the method stashed on yield. Translates the error stashed from the delivery report into an rcode, frees the pctx, and hands control back to unlang.
| [out] | p_result | where to write the resulting rcode. |
| [in] | mctx | module ctx carrying the pctx as rctx. |
| [in] | request | the request being resumed. |
Returns UNLANG_ACTION_CALCULATE_RESULT always. Definition at line 477 of file rlm_kafka.c.
Here is the call graph for this function:
Here is the caller graph for this function:
|
static |
Module-method cancellation.
Do NOT free the pctx - librdkafka still owns the in-flight message and will fire a DR later with our opaque pointer. Atomic-store NULL to pctx->request so the bg cb sees the cancellation when it unpacks the DR and frees the pctx inline without touching the mailbox.
Cross-thread: the store is release, matched by the acquire-load in _kafka_background_event_cb.
| [in] | mctx | module ctx with pctx as rctx. |
| [in] | request | associated request. |
| [in] | action | UNUSED (we mask off everything except CANCEL). |
Definition at line 500 of file rlm_kafka.c.
Here is the caller graph for this function:
|
static |
Tear down a worker's kafka state.
The barrier we need is "no bg cb is mid-invocation for any of our pctxs when the framework frees `t` / `t->el`". rd_kafka_flush waits for every outstanding produce's DR to be fired AND the bg cb to have returned, which gives us exactly that. If flush times out (broker unreachable mid-shutdown) we purge all inflight messages - librdkafka synthesises ERR__PURGE_QUEUE DRs for them locally, no broker round-trip - and a second flush drains those through the bg cb with an unbounded wait (purge makes the drain finite without needing a user-configured timeout).
Every worker flushes. The first one through actually drains librdkafka's queues; subsequent calls return immediately because outq_len is already zero. The cost is one extra flush call per worker (cheap when there's nothing to wait for), the gain is that each worker has its own barrier guaranteeing no bg cb invocation is mid-flight against this worker's t->queue / t->wake.
Order: flush -> drain mailbox -> free wake. Freeing the wake before draining would race a bg cb that loaded a non-NULL pctx->request just before cancellation propagated and is about to call fr_event_user_trigger(t->wake).
| [in] | mctx | thread-instance ctx. |
Definition at line 954 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Stand up this worker's kafka mailbox + wake event.
Allocates the segmented SPSC ring that the bg cb will push delivery reports onto and registers the EVFILT_USER wake event the cb uses to kick us. The shared producer itself is created once at mod_instantiate - there's nothing per-worker to wire up there.
| [in] | mctx | thread-instance ctx (mctx->thread is our rlm_kafka_thread_t, mctx->el is the worker's event list). |
Definition at line 1024 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
Paired with mod_load.
Definition at line 1181 of file rlm_kafka.c.
Here is the call graph for this function:
|
static |
| [in] | a | rlm_kafka_topic_t |
| [in] | b | same. |
Returns the strcmp ordering of a->name and b->name. Definition at line 195 of file rlm_kafka.c.
Here is the caller graph for this function:
|
static |
Definition at line 697 of file rlm_kafka.c.
|
static |
Module config: just the kafka base producer config for now.
Kept as a local array rather than pointing common.config directly at KAFKA_BASE_PRODUCER_CONFIG so we can drop in rlm_kafka-specific entries (or additional librdkafka properties) alongside it later without touching the library.
Definition at line 177 of file rlm_kafka.c.
| module_rlm_t rlm_kafka |
Definition at line 1186 of file rlm_kafka.c.
|
static |
Definition at line 456 of file rlm_kafka.c.
|
static |
Per-topic call_env rules, applied against the topic <name> subsection.
Invoked recursively from _kafka_topic_env_parse via call_env_parse() so the framework handles pair lookup / tmpl compilation / offset writes for us.
Both value and key are typed FR_TYPE_OCTETS. Kafka payloads and keys are opaque byte strings on the wire, and casting to octets keeps binary content (embedded NULs, high-bit bytes) intact without any UTF-8/string-termination assumptions creeping in from intermediate tmpl expansion. It also means an integer-typed key attribute is serialised in network byte order, which matches the keying convention other Kafka clients use so the same numeric key hashes to the same partition across producers.
Definition at line 388 of file rlm_kafka.c.
1.9.8