55RCSID(
"$Id: d4d831e7f9306c4fa151ca65044ef7eec0cbda7b $")
59#include <freeradius-devel/util/debug.h>
60#include <freeradius-devel/util/dlist.h>
61#include <freeradius-devel/util/misc.h>
62#include <freeradius-devel/util/rb.h>
63#include <freeradius-devel/util/types.h>
64#include <freeradius-devel/util/value.h>
66#include <freeradius-devel/io/atomic_queue.h>
67#include <freeradius-devel/kafka/base.h>
69#include <freeradius-devel/server/base.h>
70#include <freeradius-devel/server/module_rlm.h>
72#include <freeradius-devel/unlang/call_env.h>
73#include <freeradius-devel/unlang/module.h>
74#include <freeradius-devel/unlang/xlat.h>
75#include <freeradius-devel/unlang/xlat_ctx.h>
76#include <freeradius-devel/unlang/xlat_func.h>
77#include <freeradius-devel/unlang/xlat_priv.h>
117 rd_kafka_topic_t *
kt;
153 rd_kafka_resp_err_t
err;
208static inline CC_HINT(always_inline)
214 if (!
inst->topics)
return NULL;
217 return h ? h->
kt : NULL;
234static void _kafka_log_cb(rd_kafka_t
const *rk,
int level,
char const *fac,
char const *buf)
243 ERROR(
"%s - %s - %s",
inst->log_prefix, fac, buf);
247 WARN(
"%s - %s - %s",
inst->log_prefix, fac, buf);
252 INFO(
"%s - %s - %s",
inst->log_prefix, fac, buf);
256 DEBUG(
"%s - %s - %s",
inst->log_prefix, fac, buf);
299 case RD_KAFKA_RESP_ERR_NO_ERROR:
300 RDEBUG2(
"Delivered to partition %" PRId32
" offset %" PRId64, pctx->partition, pctx->offset);
303 case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
304 case RD_KAFKA_RESP_ERR__TIMED_OUT:
305 case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
306 REDEBUG(
"Kafka delivery timed out - %s", rd_kafka_err2str(pctx->err));
309 case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
310 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
311 case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
312 case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
313 REDEBUG(
"Kafka rejected message - %s", rd_kafka_err2str(pctx->err));
317 REDEBUG(
"Kafka delivery failed - %s (%s)",
318 rd_kafka_err2name(pctx->err), rd_kafka_err2str(pctx->err));
342static inline CC_HINT(always_inline)
344 rd_kafka_topic_t *topic,
345 uint8_t const *key,
size_t key_len,
350 MEM(pctx = malloc(
sizeof(*pctx)));
352 pctx->err = RD_KAFKA_RESP_ERR_NO_ERROR;
353 pctx->partition = RD_KAFKA_PARTITION_UA;
357 if (
unlikely(rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
359 (
void *)(uintptr_t)
value, value_len,
362 rd_kafka_resp_err_t
err = rd_kafka_last_error();
366 REDEBUG(
"Failed enqueuing message - %s", rd_kafka_err2str(
err));
422 cf_log_err(ci,
"kafka.produce requires a topic name, e.g. kafka.produce.<topic>");
428 cf_log_err(ci,
"Kafka topic '%s' is not declared in the '%s' module config",
429 topic_name, cec->
mi->
name);
504 RDEBUG2(
"Cancellation signal received - detaching delivery report");
555 rd_kafka_topic_t *topic;
566 REDEBUG(
"Kafka topic '%s' has no handle on this module instance", env->
topic);
571 key = env->
key->vb_octets;
572 key_len = env->
key->vb_length;
577 env->
value->vb_octets, env->
value->vb_length);
608 char const *topic_name;
621 if (!topic_node)
return 0;
628 PERROR(
"First argument of %%<module>.produce() must be stringlike");
631 topic_name = topic_vb.vb_strvalue;
633 topic_name = topic_node->data.vb_strvalue;
644 "Kafka topic '%s' is not declared in the '%s' module config",
668 bool delivered = (pctx->err == RD_KAFKA_RESP_ERR_NO_ERROR);
670 if (
unlikely(!delivered))
REDEBUG(
"Kafka produce failed - %s", rd_kafka_err2str(pctx->err));
673 vb->vb_bool = delivered;
745 rd_kafka_topic_t *topic;
757 fr_assert(topic_vb && key_vb && value_vb);
763 topic = xlat_inst->
topic;
766 REDEBUG(
"Kafka topic '%s' is not declared in the module config", topic_vb->vb_strvalue);
779 key = key_vb->vb_octets;
780 key_len = key_vb->vb_length;
785 value_vb->vb_octets, value_vb->vb_length);
819 switch (rd_kafka_event_type(ev)) {
820 case RD_KAFKA_EVENT_DR:
822 rd_kafka_message_t
const *
msg;
823 while ((
msg = rd_kafka_event_message_next(ev))) {
827 if (!
msg->_private)
continue;
829 pctx =
msg->_private;
847 pctx->err =
msg->err;
848 pctx->partition =
msg->partition;
849 pctx->offset =
msg->offset;
875 rd_kafka_event_destroy(ev);
881 if (h->
kt) rd_kafka_topic_destroy(h->
kt);
897 if (!
inst->kconf.topics)
return 0;
901 rd_kafka_topic_conf_t *ktc;
903 MEM(ktc = rd_kafka_topic_conf_dup(topic->conf->rdtc));
906 topic_t->
kt = rd_kafka_topic_new(
inst->rk, topic_t->
name, ktc);
909 rd_kafka_topic_conf_destroy(ktc);
910 ERROR(
"Failed creating topic '%s' - %s",
911 topic_t->
name, rd_kafka_err2str(rd_kafka_last_error()));
959 rd_kafka_resp_err_t
err;
981 WARN(
"Shutdown flush timed out, purging %d in-flight message(s)",
982 rd_kafka_outq_len(
inst->rk));
984 rd_kafka_purge(
inst->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
992 (void) rd_kafka_flush(
inst->rk, -1);
1007 TALLOC_FREE(t->
wake);
1042 PERROR(
"fr_event_user_insert failed");
1063 rd_kafka_conf_t *
conf;
1076 rd_kafka_conf_set_events(
conf, RD_KAFKA_EVENT_DR | RD_KAFKA_EVENT_ERROR);
1077 rd_kafka_conf_set_opaque(
conf,
inst);
1079 inst->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
conf, errstr,
sizeof(errstr));
1081 rd_kafka_conf_destroy(
conf);
1082 ERROR(
"rd_kafka_new failed - %s", errstr);
1092 rd_kafka_queue_t *main_q = rd_kafka_queue_get_main(
inst->rk);
1093 rd_kafka_queue_t *bg_q = rd_kafka_queue_get_background(
inst->rk);
1095 rd_kafka_queue_forward(main_q, bg_q);
1096 rd_kafka_queue_destroy(main_q);
1097 rd_kafka_queue_destroy(bg_q);
1101 rd_kafka_destroy(
inst->rk);
1126 rd_kafka_resp_err_t ferr;
1129 if (ferr != RD_KAFKA_RESP_ERR_NO_ERROR) {
1130 WARN(
"kafka - flush timed out; %d messages remain in queue",
1131 rd_kafka_outq_len(
inst->rk));
1135 TALLOC_FREE(
inst->topics);
1138 rd_kafka_destroy(
inst->rk);
1160 if (!xlat)
return -1;
unlang_action_t
Returned by unlang_op_t calls, determine the next action of the interpreter.
bool fr_atomic_ring_push(fr_atomic_ring_t *ring, void *data)
Push a pointer into the ring; allocate a new segment on overflow.
fr_atomic_ring_t * fr_atomic_ring_alloc(TALLOC_CTX *ctx, size_t seg_size)
Allocate an empty SPSC ring.
bool fr_atomic_ring_pop(fr_atomic_ring_t *ring, void **p_data)
Pop a pointer from the ring, advancing past drained segments.
#define USES_APPLE_DEPRECATED_API
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
int call_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *parsed, char const *name, tmpl_rules_t const *t_rules, CONF_SECTION const *cs, call_env_ctx_t const *cec, call_env_parser_t const *rule)
Parse per call env.
call_env_parsed_t * call_env_parsed_add(TALLOC_CTX *ctx, call_env_parsed_head_t *head, call_env_parser_t const *rule)
Allocate a new call_env_parsed_t structure and add it to the list of parsed call envs.
void call_env_parsed_set_data(call_env_parsed_t *parsed, void const *data)
Assign data to a call_env_parsed_t.
#define CALL_ENV_TERMINATOR
#define FR_CALL_ENV_METHOD_OUT(_inst)
Helper macro for populating the size/type fields of a call_env_method_t from the output structure type.
call_env_parser_t const * env
Parsing rules for call method env.
section_name_t const * asked
The actual name1/name2 that resolved to a module_method_binding_t.
@ CALL_ENV_FLAG_CONCAT
If the tmpl produced multiple boxes they should be concatenated.
@ CALL_ENV_FLAG_PARSE_ONLY
The result of parsing will not be evaluated at runtime.
@ CALL_ENV_FLAG_REQUIRED
Associated conf pair or section is required.
@ CALL_ENV_FLAG_PARSE_MISSING
If this subsection is missing, still parse it.
@ CALL_ENV_FLAG_NULLABLE
Tmpl expansions are allowed to produce no output.
@ CALL_ENV_PARSE_TYPE_VOID
Output of the parsing phase is undefined (a custom structure).
module_instance_t const * mi
Module instance that the callenv is registered to.
#define FR_CALL_ENV_SUBSECTION_FUNC(_name, _name2, _flags, _func)
Specify a call_env_parser_t which parses a subsection using a callback function.
#define FR_CALL_ENV_OFFSET(_name, _cast_type, _flags, _struct, _field)
Specify a call_env_parser_t which writes out runtime results to the specified field.
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Defines a CONF_PAIR to C data type mapping.
Common header for all CONF_* types.
#define cf_log_err(_cf, _fmt,...)
static int fr_dcursor_append(fr_dcursor_t *cursor, void *v)
Insert a single item at the end of the list.
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define MODULE_MAGIC_INIT
Stop people using different module/library/server versions together.
#define fr_event_user_insert(_ctx, _el, _ev_p, _trigger, _callback, _uctx)
void unlang_interpret_mark_runnable(request_t *request)
Mark a request as resumable.
CONF_SECTION * cs
topic's CONF_SECTION (for call_env lookups of per-topic pairs like value / key)
#define KAFKA_BASE_CONFIG
Config entries common to producer and consumer clients.
#define KAFKA_PRODUCER_CONFIG
Producer-only delta: librdkafka producer tuning + declared topics.
Declared topic record - one per topic { <name> { ... } } subsection.
void fr_kafka_free(void)
Drop one ref to librdkafka's global init.
fr_kafka_topic_t * kafka_topic_conf_find(fr_kafka_conf_t const *kc, char const *name)
Look up a declared topic by name on an fr_kafka_conf_t
int fr_kafka_init(void)
Drive librdkafka's lazy global init deterministically.
int fr_event_user_trigger(fr_event_user_t *ev)
Trigger a user event.
Stores all information relating to an event list.
Callbacks for kevent() user events.
static void partition(fr_lst_t *lst, stack_index_t stack_index)
@ FR_TYPE_STRING
String of printable characters.
@ FR_TYPE_BOOL
A truth value.
@ FR_TYPE_OCTETS
Raw octets.
void * env_data
Per call environment data.
module_instance_t const * mi
Instance of the module being instantiated.
void * thread
Thread specific instance data.
void * rctx
Resume ctx that a module previously set.
fr_event_list_t * el
Event list to register any IO handlers and timers against.
module_instance_t * mi
Module instance to detach.
void * thread
Thread instance data.
module_instance_t const * mi
Instance of the module being instantiated.
module_instance_t * mi
Instance of the module being instantiated.
Temporary structure to hold arguments for module calls.
Temporary structure to hold arguments for detach calls.
Temporary structure to hold arguments for instantiation calls.
Temporary structure to hold arguments for thread_instantiation calls.
xlat_t * module_rlm_xlat_register(TALLOC_CTX *ctx, module_inst_ctx_t const *mctx, char const *name, xlat_func_t func, fr_type_t return_type)
module_t common
Common fields presented by all modules.
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
#define fr_rb_inorder_foreach(_tree, _type, _iter)
The main red black tree structure.
#define RETURN_UNLANG_RCODE(_rcode)
#define RETURN_UNLANG_FAIL
rlm_rcode_t
Return codes indicating the result of the module call.
@ RLM_MODULE_OK
The module is OK, continue.
@ RLM_MODULE_FAIL
Module failed, don't reply.
@ RLM_MODULE_REJECT
Immediately reject the request.
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
char const * log_prefix
pre-rendered "rlm_kafka (<instance>)", used by librdkafka's log_cb which fires from internal threads ...
static int mod_detach(module_detach_ctx_t const *mctx)
Module detach: tear down the shared producer.
static int mod_load(void)
One-time library load hook.
fr_value_box_t * value
message payload
static unlang_action_t mod_produce(UNUSED unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
Module method entry point for kafka.produce.
static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, fr_value_box_list_t *in)
kafka.produce(topic, key, value) - runtime-named produce
static xlat_arg_parser_t const kafka_xlat_produce_args[]
static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action)
Module-method cancellation.
fr_rb_tree_t * topics
rlm_kafka_topic_t keyed by name, read-only after mod_instantiate.
static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out, tmpl_rules_t const *t_rules, CONF_ITEM *ci, call_env_ctx_t const *cec, UNUSED call_env_parser_t const *rule)
Resolve the topic named in the method's second identifier, then hand its subsection back to the call_...
fr_atomic_ring_t * queue
rlm_kafka_msg_ctx_t pushed by bg cb on librdkafka's thread, popped by our event loop on this worker.
static int kafka_xlat_instantiate(xlat_inst_ctx_t const *xctx)
Xlat instance init: if the topic arg is a compile-time literal, resolve and cache the inst-scoped rd_...
fr_value_box_t * key
optional message key
static int kafka_topics_alloc(rlm_kafka_t *inst)
Create a shared rd_kafka_topic_t for every declared topic.
static int mod_bootstrap(module_inst_ctx_t const *mctx)
Bootstrap-phase setup.
static int _topic_free(rlm_kafka_topic_t *h)
Destructor for inst-scoped topic handles.
static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action)
Xlat cancellation.
char const * topic
resolved topic name (validated at parse time)
static void mod_unload(void)
Paired with mod_load.
rd_kafka_topic_t * topic
pre-resolved handle, NULL if topic arg is dynamic.
fr_kafka_conf_t kconf
parsed producer conf - MUST be first
static rlm_rcode_t kafka_err_to_rcode(request_t *request, rlm_kafka_msg_ctx_t const *pctx)
Translate a librdkafka delivery-report error into a module rcode.
static void _kafka_background_event_cb(UNUSED rd_kafka_t *rk, rd_kafka_event_t *ev, UNUSED void *uctx)
Background event callback, runs on librdkafka's bg thread.
rd_kafka_t * rk
shared producer, created at mod_instantiate.
static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
Resume a yielded module method after its delivery report has arrived.
static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
Stand up this worker's kafka mailbox + wake event.
fr_time_delta_t flush_timeout
How long mod_detach waits for in-flight produces to drain before rd_kafka_destroy.
static const call_env_method_t rlm_kafka_produce_env
static rlm_kafka_msg_ctx_t * kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request, rd_kafka_topic_t *topic, uint8_t const *key, size_t key_len, uint8_t const *value, size_t value_len)
Common produce-and-yield helper.
fr_event_user_t * wake
EVFILT_USER handle; bg cb triggers it to wake the worker's event loop on this thread.
static xlat_action_t kafka_xlat_produce_resume(TALLOC_CTX *xctx_ctx, fr_dcursor_t *out, xlat_ctx_t const *xctx, request_t *request, UNUSED fr_value_box_list_t *in)
Xlat resume: translate delivery report into a boolean result (true when the message was delivered, false otherwise).
static int8_t topic_name_cmp(void const *a, void const *b)
struct rlm_kafka_thread_s rlm_kafka_thread_t
pthread_t worker_tid
pthread_self() captured at thread_instantiate.
static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
Tear down a worker's kafka state.
static call_env_parser_t const topic_env[]
Per-topic call_env rules, applied against the topic <name> subsection.
static int mod_instantiate(module_inst_ctx_t const *mctx)
Module-instance setup.
static conf_parser_t const module_config[]
Module config: just the kafka base producer config for now.
static void _kafka_wake(UNUSED fr_event_list_t *el, void *uctx)
Worker wake-up callback - the bg cb triggered our EVFILT_USER event.
static rd_kafka_topic_t * kafka_find_topic(rlm_kafka_t const *inst, char const *name)
Look up a shared topic handle on the module instance by name.
static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char const *buf)
librdkafka log callback - bridge internal library messages into the server log
Call env for kafka.produce.
Per produce() invocation context.
Xlat instance data - cached topic handle for literal-topic calls.
#define SECTION_NAME(_name1, _name2)
Define a section name consisting of a verb and a noun.
char const * name2
Second section name. Usually a packet type like 'access-request', 'access-accept',...
char const * name
Instance name e.g. user_database.
CONF_SECTION * conf
Module's instance configuration.
size_t inst_size
Size of the module's instance data.
void * data
Module's instance data.
void * boot
Data allocated during the bootstrap phase.
#define MODULE_BINDING_TERMINATOR
Terminate a module binding list.
Named methods exported by a module.
Optional arguments passed to vp_tmpl functions.
fr_signal_t
Signals that can be generated/processed by request signal handlers.
@ FR_SIGNAL_CANCEL
Request has been cancelled.
unlang_action_t unlang_module_yield(request_t *request, module_method_t resume, unlang_module_signal_t signal, fr_signal_t sigmask, void *rctx)
Yield a request back to the interpreter from within a module.
eap_aka_sim_process_conf_t * inst
#define atomic_load_explicit(object, order)
#define atomic_store_explicit(object, desired, order)
#define atomic_init(obj, value)
char * talloc_typed_asprintf(TALLOC_CTX *ctx, char const *fmt,...)
Call talloc vasprintf, setting the type on the new chunk correctly.
#define talloc_get_type_abort_const
#define talloc_strdup(_ctx, _str)
static int64_t fr_time_delta_to_msec(fr_time_delta_t delta)
A time delta, a difference in time measured in nanoseconds.
static fr_event_list_t * el
xlat_action_t unlang_xlat_yield(request_t *request, xlat_func_t resume, xlat_func_signal_t signal, fr_signal_t sigmask, void *rctx)
Yield a request back to the interpreter from within an xlat function.
bool xlat_is_literal(xlat_exp_head_t const *head)
Check to see if the expansion consists entirely of value-box elements.
unsigned int required
Argument must be present, and non-empty.
#define XLAT_ARG_PARSER_TERMINATOR
@ XLAT_ACTION_FAIL
An xlat function failed.
@ XLAT_ACTION_DONE
We're done evaluating this level of nesting.
Definition for a single argument consumed by an xlat function.
#define fr_type_is_null(_x)
int fr_value_box_cast(TALLOC_CTX *ctx, fr_value_box_t *dst, fr_type_t dst_type, fr_dict_attr_t const *dst_enumv, fr_value_box_t const *src)
Convert one type of fr_value_box_t to another.
void fr_value_box_clear_value(fr_value_box_t *data)
Clear/free any existing value.
#define fr_value_box_alloc(_ctx, _type, _enumv)
Allocate a value box of a specific type.
#define FR_VALUE_BOX_INITIALISER_NULL(_vb)
A static initialiser for stack/globally allocated boxes.
static size_t char ** out
void * rctx
Resume context.
xlat_exp_t * ex
Tokenized expression to use in expansion.
void const * inst
xlat instance data.
module_inst_ctx_t const * mctx
Synthesised module calling ctx.
module_ctx_t const * mctx
Synthesised module calling ctx.
void * inst
xlat instance data to populate.
An xlat instantiation ctx.
int xlat_func_args_set(xlat_t *x, xlat_arg_parser_t const args[])
Register the arguments of an xlat.
#define xlat_func_instantiate_set(_xlat, _instantiate, _inst_struct, _detach, _uctx)
Set a callback for global instantiation of xlat functions.
@ XLAT_GROUP
encapsulated string of xlats
xlat_type_t _CONST type
type of this expansion.
static xlat_exp_t * xlat_exp_head(xlat_exp_head_t const *head)