50RCSID(
"$Id: e2b2aa4df197056d1663a6687c4c61688da97810 $")
52#define LOG_PREFIX worker->name
53#define LOG_DST worker->log
55#include <freeradius-devel/io/channel.h>
56#include <freeradius-devel/io/listen.h>
57#include <freeradius-devel/io/message.h>
58#include <freeradius-devel/io/worker.h>
59#include <freeradius-devel/unlang/base.h>
60#include <freeradius-devel/unlang/call.h>
61#include <freeradius-devel/unlang/interpret.h>
62#include <freeradius-devel/server/request.h>
63#include <freeradius-devel/server/time_tracking.h>
64#include <freeradius-devel/util/dlist.h>
65#include <freeradius-devel/util/minmax_heap.h>
71#define WORKER_VERIFY worker_verify(worker)
76#define CACHE_LINE_SIZE 64
182 fr_perror(
"Failed allocating memory for worker ring buffer");
193 return (pthread_equal(pthread_self(), worker->
thread_id) != 0);
251 bool ok, was_sleeping;
297 if (worker->
channel[i].
ch != NULL)
continue;
302 DEBUG3(
"Received channel %p into array entry %d", ch, i);
330 if (worker->
channel[i].
ch != ch)
continue;
337 "Network added messages to channel after sending FR_CHANNEL_CLOSE");
374 RDEBUG(
"Canceling request due to socket being closed");
399 memcpy(&li,
data,
sizeof(li));
472 reply->reply.request_time = cd->request.recv_time;
486 DEBUG2(
"Failed sending reply to channel");
548 REDEBUG(
"Request has reached max_request_time - signalling it to stop");
573 if (!request)
return;
580 ERROR(
"Failed inserting max_request_time timer");
601 RDEBUG3(
"Time tracking started in yielded state");
614 RDEBUG3(
"Time tracking ended");
644 size = request->async->listen->app_io->default_reply_size;
645 if (!size) size = request->async->listen->app_io->default_message_size;
651 ch = request->async->channel;
676 fr_listen_t const *listen = request->async->listen;
686 RPERROR(
"Failed encoding request");
696 fr_assert((
size_t) slen <= reply->m.rb_size);
707 reply->reply.processing_time = request->async->tracking.running_total;
708 reply->reply.request_time = request->async->recv_time;
710 reply->
listen = request->async->listen;
711 reply->
packet_ctx = request->async->packet_ctx;
719 RDEBUG(
"Finished request");
732 RPERROR(
"Failed sending reply to network thread");
743 request->async->el = NULL;
744 request->async->channel = NULL;
745 request->async->packet_ctx = NULL;
746 request->async->listen = NULL;
758 char const *numbers =
"0123456789";
764 *(p--) = numbers[number % 10];
768 if (p[1])
return talloc_strdup(ctx, p + 1);
770 return talloc_strdup(ctx,
"0");
776static inline CC_HINT(always_inline)
786 request->packet->timestamp = now;
787 request->async = talloc_zero(request,
fr_async_t);
788 request->async->recv_time = now;
789 request->async->el = worker->el;
793static inline CC_HINT(always_inline)
811 if (!request)
goto nak;
821 request->packet->timestamp = cd->request.recv_time;
832 request->async->channel = cd->channel.ch;
834 request->async->recv_time = cd->request.recv_time;
836 request->async->listen = cd->
listen;
838 request->async->priority = cd->
priority;
839 listen = request->async->listen;
863 RERROR(
"Protocol failed to set 'process' function");
877 if (request->async->listen->track_duplicates) {
885 fr_assert(old->async->listen == request->async->listen);
886 fr_assert(old->async->channel == request->async->channel);
905 if (
fr_time_eq(old->async->recv_time, request->async->recv_time)) {
906 RWARN(
"Discarding duplicate of request (%"PRIu64
")", old->number);
931 RWARN(
"Got conflicting packet for request (%" PRIu64
"), telling old request to stop", old->number);
967 ret =
CMP(b->async->priority, a->async->priority);
968 if (ret != 0)
return ret;
970 ret =
CMP(a->async->sequence, b->async->sequence);
971 if (ret != 0)
return ret;
973 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
983 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
994 ret =
CMP(a->async->listen, b->async->listen);
997 return CMP(a->async->packet_ctx, b->async->packet_ctx);
1032 DEBUG(
"Worker is exiting - telling request %s to stop", request->name);
1054 "Pending messages in channel after cancelling request");
1093 fr_assert(request->async->listen != NULL);
1099 if (request->async->listen->track_duplicates) {
1115 "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1117 "Request %s is marked as yielded at end of processing", request->name);
1216 RDEBUG3(
"Forcing time tracking to running state, from yielded, for request detach");
1223 RDEBUG3(
"Request is detached");
1240 RDEBUG3(
"Cleaning up request execution state");
1248 RDEBUG3(
"Forcing time tracking to running state, from yielded, for request stop");
1267 RDEBUG3(
"Request marked as runnable");
1368 worker->
name = talloc_strdup(worker,
name);
1374#define CHECK_CONFIG(_x, _min, _max) do { \
1375 if (!worker->config._x) worker->config._x = _min; \
1376 if (worker->config._x < _min) worker->config._x = _min; \
1377 if (worker->config._x > _max) worker->config._x = _max; \
1380#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1381 if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1382 if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1400 worker->
log = logger;
1447 if (!worker->
dedup) {
1498 bool wait_for_event;
1508 if (wait_for_event) {
1511 DEBUG4(
"Ready to process requests");
1518 DEBUG4(
"Gathering events - %s", wait_for_event ?
"will wait" :
"Will not wait");
1520 if (num_events < 0) {
1522 DEBUG4(
"Event loop exiting");
1526 PERROR(
"Failed retrieving events");
1530 DEBUG4(
"%u event(s) pending", num_events);
1535 if (num_events > 0) {
1536 DEBUG4(
"Servicing event(s)");
1557 if (!request)
return 0;
1588 fprintf(fp,
"\tnum_channels = %d\n", worker->
num_channels);
1589 fprintf(fp,
"\tstats.in = %" PRIu64
"\n", worker->
stats.
in);
1591 fprintf(fp,
"\tcalculated (predicted) total CPU time = %" PRIu64
"\n",
1593 fprintf(fp,
"\tcalculated (counted) per request time = %" PRIu64
"\n",
1617 id = pthread_self();
1618 same = (pthread_equal(
id, worker->
thread_id) != 0);
1621 if (!ch)
return NULL;
1653#ifdef WITH_VERIFY_PTR
1662 (void) talloc_get_type_abort(worker,
fr_worker_t);
1687 if (num < 0)
return -1;
1688 if (num == 0)
return 0;
1691 if (num >= 2) stats[1] = worker->
stats.
out;
1692 if (num >= 3) stats[2] = worker->
stats.
dup;
1694 if (num >= 5) stats[4] = worker->
num_naks;
1697 if (num <= 6)
return num;
1707 if ((info->
argc == 0) || (strcmp(info->
argv[0],
"count") == 0)) {
1708 fprintf(fp,
"count.in\t\t\t%" PRIu64
"\n", worker->
stats.
in);
1709 fprintf(fp,
"count.out\t\t\t%" PRIu64
"\n", worker->
stats.
out);
1710 fprintf(fp,
"count.dup\t\t\t%" PRIu64
"\n", worker->
stats.
dup);
1711 fprintf(fp,
"count.dropped\t\t\t%" PRIu64
"\n", worker->
stats.
dropped);
1712 fprintf(fp,
"count.naks\t\t\t%" PRIu64
"\n", worker->
num_naks);
1713 fprintf(fp,
"count.active\t\t\t%" PRIu64
"\n", worker->
num_active);
1717 if ((info->
argc == 0) || (strcmp(info->
argv[0],
"cpu") == 0)) {
1742 .help =
"Statistics for workers threads.",
1747 .parent =
"stats worker",
1750 .syntax =
"[(count|cpu)]",
1752 .help =
"Show statistics for a specific worker thread.",
static int const char char buffer[256]
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
size_t default_reply_size
same for replies
size_t default_message_size
Usually maximum message size.
fr_io_nak_t nak
Function to send a NAK.
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
#define fr_atexit_thread_local(_name, _free, _uctx)
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
fr_table_num_sorted_t const channel_signals[]
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
A full channel, which consists of two ends.
fr_message_t m
the message header
@ FR_CHANNEL_DATA_READY_REQUESTOR
@ FR_CHANNEL_DATA_READY_RESPONDER
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
fr_listen_t * listen
for tracking packet transport, etc.
uint32_t priority
Priority of this packet.
Channel information which is added to a message.
int argc
current argument count
char const * parent
e.g. "show module"
char const ** argv
text version of commands
#define FR_CONTROL_ID_LISTEN_DEAD
#define FR_CONTROL_ID_CHANNEL
#define FR_CONTROL_ID_LISTEN
#define FR_CONTROL_MAX_SIZE
#define FR_CONTROL_MAX_MESSAGES
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Head of a doubly linked list.
#define fr_event_timer_at(...)
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
rlm_rcode_t unlang_interpret(request_t *request)
Run the interpreter for a current request.
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
bool unlang_interpret_is_resumable(request_t *request)
Check if a request as resumable.
External functions provided by the owner of the interpret.
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
void const * app_instance
void const * app_io_instance
I/O path configuration context.
CONF_SECTION * server_cs
CONF_SECTION of the server.
fr_app_io_t const * app_io
I/O path functions.
Minimal data structure to use the new code.
#define RPEDEBUG(fmt,...)
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Stores all information relating to an event list.
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
int fr_message_done(fr_message_t *m)
Mark a message as done.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
A Message set, composed of message headers and ring buffer data.
size_t rb_size
cache-aligned size in the ring buffer
fr_time_t when
when this message was sent
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
static bool fr_minmax_heap_entry_inserted(fr_minmax_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
static const conf_parser_t config[]
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
The main red black tree structure.
rlm_rcode_t
Return codes indicating the result of the module call.
int request_detach(request_t *child)
Unlink a subrequest from its parent.
#define REQUEST_VERIFY(_x)
#define request_is_detached(_x)
#define request_is_external(_x)
#define request_is_internal(_x)
#define request_is_detachable(_x)
#define request_alloc_external(_ctx, _args)
Allocate a new external request.
@ REQUEST_STOP_PROCESSING
Request has been signalled to stop.
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
@ FR_SIGNAL_DUP
A duplicate request was received.
@ FR_SIGNAL_CANCEL
Request has been cancelled.
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
static int talloc_const_free(void const *ptr)
Free const'd memory.
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
#define fr_time_delta_lt(_a, _b)
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
#define fr_time_delta_wrap(_time)
#define fr_time_delta_ispos(_a)
#define fr_time_eq(_a, _b)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
A time delta, a difference in time measured in nanoseconds.
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)
#define fr_box_time_delta(_val)
static int8_t worker_time_order_cmp(void const *one, void const *two)
Track a request_t in the "time_order" heap.
fr_heap_t * runnable
current runnable requests which we've spent time processing
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Detached request (i.e.
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
fr_event_list_t * el
our event list
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
static void worker_exit(fr_worker_t *worker)
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
bool was_sleeping
used to suppress multiple sleep signals in a row
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
fr_worker_channel_t * channel
list of channels
char const * name
name of this worker
uint64_t num_active
number of active requests
fr_event_timer_t const * ev_cleanup
timer for max_request_time
fr_cmd_table_t cmd_worker_table[]
static _Thread_local fr_ring_buffer_t * fr_worker_rb
fr_minmax_heap_t * time_order
time ordered heap of requests
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
fr_rb_tree_t * dedup
de-dup tree
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
static void worker_request_name_number(request_t *request)
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
fr_log_t const * log
log destination
fr_io_stats_t stats
input / output stats
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
static int _fr_worker_rb_free(void *arg)
fr_time_tracking_t tracking
how much time the worker has spent doing things.
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
uint64_t num_naks
number of messages which were nak'd
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
fr_worker_config_t config
external configuration
fr_listen_t const * listener
incoming packets
unlang_interpret_t * intp
Worker's local interpreter.
fr_time_t checked_timeout
when we last checked the tails of the queues
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
bool exiting
are we exiting?
fr_log_lvl_t lvl
log level
static void worker_max_request_timer(fr_worker_t *worker)
See when we next need to service the time_order heap for "too old" packets.
static void worker_requests_cancel(fr_worker_channel_t *ch)
int num_channels
actual number of channels
static atomic_uint64_t request_number
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
fr_rb_node_t node
in tree of listeners
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
fr_dlist_head_t dlist
of requests associated with this listener.
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
fr_time_delta_t predicted
How long we predict a request will take to execute.
pthread_t thread_id
my thread ID
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
static int8_t worker_listener_cmp(void const *one, void const *two)
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
static bool is_worker_thread(fr_worker_t const *worker)
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
fr_control_t * control
the control plane
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
A worker which takes packets from a master, and processes them.
int message_set_size
default start number of messages
int max_requests
max requests this worker will handle
int max_channels
maximum number of channels
int ring_buffer_size
default start size for the ring buffers
fr_time_delta_t max_request_time
maximum time a request can be processed