49RCSID(
"$Id: dc7b30f411ff6e99f3bca2dc1992f8b79969e50a $")
51#define LOG_PREFIX worker->name
52#define LOG_DST worker->log
54#include <freeradius-devel/io/channel.h>
55#include <freeradius-devel/io/listen.h>
56#include <freeradius-devel/io/message.h>
57#include <freeradius-devel/io/worker.h>
58#include <freeradius-devel/unlang/base.h>
59#include <freeradius-devel/unlang/call.h>
60#include <freeradius-devel/unlang/interpret.h>
61#include <freeradius-devel/server/request.h>
62#include <freeradius-devel/server/time_tracking.h>
63#include <freeradius-devel/util/dlist.h>
64#include <freeradius-devel/util/minmax_heap.h>
65#include <freeradius-devel/util/slab.h>
66#include <freeradius-devel/util/time.h>
67#include <freeradius-devel/util/timer.h>
73#define WORKER_VERIFY worker_verify(worker)
78#define CACHE_LINE_SIZE 64
190 fr_perror(
"Failed allocating memory for worker ring buffer");
201 return (pthread_equal(pthread_self(), worker->
thread_id) != 0);
257 bool ok, was_sleeping;
303 if (worker->
channel[i].
ch != NULL)
continue;
308 DEBUG3(
"Received channel %p into array entry %d", ch, i);
336 if (worker->
channel[i].
ch != ch)
continue;
343 "Network added messages to channel after sending FR_CHANNEL_CLOSE");
380 RDEBUG(
"Canceling request due to socket being closed");
405 memcpy(&li,
data,
sizeof(li));
478 reply->reply.request_time = cd->request.recv_time;
492 DEBUG2(
"Failed sending reply to channel");
540 REDEBUG(
"Request has reached max_request_time - signalling it to stop");
565 RERROR(
"Failed to create request timeout timer");
585 RERROR(
"Failed to set request timeout");
593 RDEBUG3(
"Time tracking started in yielded state");
606 RDEBUG3(
"Time tracking ended");
611 TALLOC_FREE(request->timeout);
636 size = request->async->listen->app_io->default_reply_size;
637 if (!size) size = request->async->listen->app_io->default_message_size;
643 ch = request->async->channel;
668 fr_listen_t const *listen = request->async->listen;
678 RPERROR(
"Failed encoding request");
688 fr_assert((
size_t) slen <= reply->m.rb_size);
699 reply->reply.processing_time = request->async->tracking.running_total;
700 reply->reply.request_time = request->async->recv_time;
702 reply->
listen = request->async->listen;
703 reply->
packet_ctx = request->async->packet_ctx;
711 RDEBUG(
"Finished request");
724 RPERROR(
"Failed sending reply to network thread");
735 request->async->el = NULL;
736 request->async->channel = NULL;
737 request->async->packet_ctx = NULL;
738 request->async->listen = NULL;
750 char const *numbers =
"0123456789";
756 *(p--) = numbers[number % 10];
760 if (p[1])
return talloc_strdup(ctx, p + 1);
762 return talloc_strdup(ctx,
"0");
768static inline CC_HINT(always_inline)
778 request->packet->timestamp = now;
779 request->async = talloc_zero(request,
fr_async_t);
780 request->async->recv_time = now;
781 request->async->el = worker->el;
785static inline CC_HINT(always_inline)
793static inline CC_HINT(always_inline)
822 ctx = request = request_slab_reserve(worker->
slab);
839 request_slab_release(request);
855 request->packet->timestamp = cd->request.recv_time;
860 request->async->channel = cd->channel.ch;
862 request->async->recv_time = cd->request.recv_time;
864 request->async->listen = listen;
866 request->async->priority = cd->
priority;
890 RERROR(
"Protocol failed to set 'process' function");
904 if (request->async->listen->track_duplicates) {
912 fr_assert(old->async->listen == request->async->listen);
913 fr_assert(old->async->channel == request->async->channel);
932 if (
fr_time_eq(old->async->recv_time, request->async->recv_time)) {
933 RWARN(
"Discarding duplicate of request (%"PRIu64
")", old->number);
936 request_slab_release(request);
958 RWARN(
"Got conflicting packet for request (%" PRIu64
"), telling old request to stop", old->number);
994 ret =
CMP(b->async->priority, a->async->priority);
995 if (ret != 0)
return ret;
997 ret =
CMP(a->async->sequence, b->async->sequence);
998 if (ret != 0)
return ret;
1000 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
1011 ret =
CMP(a->async->listen, b->async->listen);
1012 if (ret)
return ret;
1014 return CMP(a->async->packet_ctx, b->async->packet_ctx);
1060 fr_assert_msg(0,
"Failed to force run the custom timeout list");
1066 DEBUG(
"Worker is exiting - stopped %u requests",
count);
1083 "Pending messages in channel after cancelling request");
1122 fr_assert(request->async->listen != NULL);
1128 if (request->async->listen->track_duplicates) {
1144 "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1146 "Request %s is marked as yielded at end of processing", request->name);
1171 request_slab_release(request);
1176 request_slab_release(request);
1214 TALLOC_FREE(request->timeout);
1225 request_slab_release(request);
1243 RDEBUG3(
"Forcing time tracking to running state, from yielded, for request detach");
1250 RDEBUG3(
"Request is detached");
1267 RDEBUG3(
"Cleaning up request execution state");
1275 RDEBUG3(
"Forcing time tracking to running state, from yielded, for request stop");
1294 RDEBUG3(
"Request marked as runnable");
1395 worker->
name = talloc_strdup(worker,
name);
1401#define CHECK_CONFIG(_x, _min, _max) do { \
1402 if (!worker->config._x) worker->config._x = _min; \
1403 if (worker->config._x < _min) worker->config._x = _min; \
1404 if (worker->config._x > _max) worker->config._x = _max; \
1407#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1408 if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1409 if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1427 worker->
log = logger;
1480 if (!worker->
dedup) {
1517 if (!(worker->
slab = request_slab_list_alloc(worker,
el, &worker->
config.
reuse, NULL, NULL,
1518 UNCONST(
void *, worker),
true,
false))) {
1543 bool wait_for_event;
1553 if (wait_for_event) {
1556 DEBUG4(
"Ready to process requests");
1563 DEBUG4(
"Gathering events - %s", wait_for_event ?
"will wait" :
"Will not wait");
1565 if (num_events < 0) {
1567 DEBUG4(
"Event loop exiting");
1571 PERROR(
"Failed retrieving events");
1575 DEBUG4(
"%u event(s) pending", num_events);
1580 if (num_events > 0) {
1581 DEBUG4(
"Servicing event(s)");
1602 if (!request)
return 0;
1633 fprintf(fp,
"\tnum_channels = %d\n", worker->
num_channels);
1634 fprintf(fp,
"\tstats.in = %" PRIu64
"\n", worker->
stats.
in);
1636 fprintf(fp,
"\tcalculated (predicted) total CPU time = %" PRIu64
"\n",
1638 fprintf(fp,
"\tcalculated (counted) per request time = %" PRIu64
"\n",
1662 id = pthread_self();
1663 same = (pthread_equal(
id, worker->
thread_id) != 0);
1666 if (!ch)
return NULL;
1698#ifdef WITH_VERIFY_PTR
1707 (void) talloc_get_type_abort(worker,
fr_worker_t);
1732 if (num < 0)
return -1;
1733 if (num == 0)
return 0;
1736 if (num >= 2) stats[1] = worker->
stats.
out;
1737 if (num >= 3) stats[2] = worker->
stats.
dup;
1739 if (num >= 5) stats[4] = worker->
num_naks;
1742 if (num <= 6)
return num;
1752 if ((info->
argc == 0) || (strcmp(info->
argv[0],
"count") == 0)) {
1753 fprintf(fp,
"count.in\t\t\t%" PRIu64
"\n", worker->
stats.
in);
1754 fprintf(fp,
"count.out\t\t\t%" PRIu64
"\n", worker->
stats.
out);
1755 fprintf(fp,
"count.dup\t\t\t%" PRIu64
"\n", worker->
stats.
dup);
1756 fprintf(fp,
"count.dropped\t\t\t%" PRIu64
"\n", worker->
stats.
dropped);
1757 fprintf(fp,
"count.naks\t\t\t%" PRIu64
"\n", worker->
num_naks);
1758 fprintf(fp,
"count.active\t\t\t%" PRIu64
"\n", worker->
num_active);
1762 if ((info->
argc == 0) || (strcmp(info->
argv[0],
"cpu") == 0)) {
1787 .help =
"Statistics for workers threads.",
1792 .parent =
"stats worker",
1795 .syntax =
"[(count|cpu)]",
1797 .help =
"Show statistics for a specific worker thread.",
static int const char char buffer[256]
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
size_t default_reply_size
same for replies
size_t default_message_size
Usually maximum message size.
fr_io_nak_t nak
Function to send a NAK.
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
#define fr_atexit_thread_local(_name, _free, _uctx)
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
fr_table_num_sorted_t const channel_signals[]
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
A full channel, which consists of two ends.
fr_message_t m
the message header
@ FR_CHANNEL_DATA_READY_REQUESTOR
@ FR_CHANNEL_DATA_READY_RESPONDER
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
fr_listen_t * listen
for tracking packet transport, etc.
uint32_t priority
Priority of this packet.
Channel information which is added to a message.
int argc
current argument count
char const * parent
e.g. "show module"
char const ** argv
text version of commands
#define FR_CONTROL_ID_LISTEN_DEAD
#define FR_CONTROL_ID_CHANNEL
#define FR_CONTROL_ID_LISTEN
#define FR_CONTROL_MAX_SIZE
#define FR_CONTROL_MAX_MESSAGES
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Head of a doubly linked list.
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for a current request.
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
bool unlang_interpret_is_resumable(request_t *request)
Check if a request as resumable.
#define UNLANG_REQUEST_RESUME
External functions provided by the owner of the interpret.
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
void const * app_instance
void const * app_io_instance
I/O path configuration context.
CONF_SECTION * server_cs
CONF_SECTION of the server.
fr_dict_t const * dict
dictionary for this listener
fr_app_io_t const * app_io
I/O path functions.
Minimal data structure to use the new code.
#define RPEDEBUG(fmt,...)
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Stores all information relating to an event list.
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
int fr_message_done(fr_message_t *m)
Mark a message as done.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
A Message set, composed of message headers and ring buffer data.
size_t rb_size
cache-aligned size in the ring buffer
fr_time_t when
when this message was sent
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
static const conf_parser_t config[]
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
The main red black tree structure.
rlm_rcode_t
Return codes indicating the result of the module call.
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
int request_detach(request_t *child)
Unlink a subrequest from its parent.
#define REQUEST_VERIFY(_x)
#define request_is_detached(_x)
#define request_is_external(_x)
#define REQUEST_POOL_HEADERS
#define request_is_internal(_x)
@ REQUEST_TYPE_EXTERNAL
A request received on the wire.
#define request_is_detachable(_x)
#define request_init(_ctx, _type, _args)
#define REQUEST_POOL_SIZE
@ REQUEST_STOP_PROCESSING
Request has been signalled to stop.
Optional arguments for initialising requests.
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
@ FR_SIGNAL_DUP
A duplicate request was received.
@ FR_SIGNAL_CANCEL
Request has been cancelled.
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
unsigned int num_children
How many child allocations are expected off each element.
size_t child_pool_size
Size of pool space to be allocated to each element.
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
static int talloc_const_free(void const *ptr)
Free const'd memory.
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
#define fr_time_delta_lt(_a, _b)
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
#define fr_time_delta_wrap(_time)
#define fr_time_delta_ispos(_a)
#define fr_time_eq(_a, _b)
#define fr_time_delta_eq(_a, _b)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_sub(_a, _b)
Subtract one time from another.
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
A time delta, a difference in time measured in nanoseconds.
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
int fr_timer_list_force_run(fr_timer_list_t *tl)
Get the head of the timer list, the event may not be ready to fire.
fr_timer_list_t * fr_timer_list_lst_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new lst based timer list.
#define fr_timer_armed(_ev)
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
#define fr_strerror_const_push(_msg)
#define fr_strerror_const(_msg)
fr_heap_t * runnable
current runnable requests which we've spent time processing
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
fr_event_list_t * el
our event list
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
static void worker_exit(fr_worker_t *worker)
bool was_sleeping
used to suppress multiple sleep signals in a row
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
fr_worker_channel_t * channel
list of channels
char const * name
name of this worker
uint64_t num_active
number of active requests
fr_cmd_table_t cmd_worker_table[]
static _Thread_local fr_ring_buffer_t * fr_worker_rb
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
fr_rb_tree_t * dedup
de-dup tree
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
static void worker_request_name_number(request_t *request)
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
fr_log_t const * log
log destination
fr_io_stats_t stats
input / output stats
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
static int _fr_worker_rb_free(void *arg)
fr_time_tracking_t tracking
how much time the worker has spent doing things.
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
uint64_t num_naks
number of messages which were nak'd
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
fr_worker_config_t config
external configuration
fr_listen_t const * listener
incoming packets
unlang_interpret_t * intp
Worker's local interpreter.
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
bool exiting
are we exiting?
fr_log_lvl_t lvl
log level
static void worker_requests_cancel(fr_worker_channel_t *ch)
int num_channels
actual number of channels
fr_time_delta_t max_request_time
maximum time a request can be processed
static atomic_uint64_t request_number
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
fr_rb_node_t node
in tree of listeners
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
int fr_worker_request_timeout_set(fr_worker_t *worker, request_t *request, fr_time_delta_t timeout)
Set, or re-set the request timer.
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
fr_dlist_head_t dlist
of requests associated with this listener.
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
request_slab_list_t * slab
slab allocator for request_t
static uint32_t worker_num_requests(fr_worker_t *worker)
fr_time_delta_t predicted
How long we predict a request will take to execute.
pthread_t thread_id
my thread ID
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
static int8_t worker_listener_cmp(void const *one, void const *two)
static bool is_worker_thread(fr_worker_t const *worker)
fr_timer_list_t * timeout
Track when requests timeout using a dlist.
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
fr_control_t * control
the control plane
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
fr_timer_list_t * timeout_custom
Track when requests timeout using an lst.
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
A worker which takes packets from a master, and processes them.
int message_set_size
default start number of messages
int max_requests
max requests this worker will handle
int max_channels
maximum number of channels
fr_slab_config_t reuse
slab allocator configuration
int ring_buffer_size
default start size for the ring buffers
fr_time_delta_t max_request_time
maximum time a request can be processed