![]() |
The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
|
Worker thread functions. More...
#include <freeradius-devel/io/channel.h>
#include <freeradius-devel/io/listen.h>
#include <freeradius-devel/io/message.h>
#include <freeradius-devel/io/worker.h>
#include <freeradius-devel/unlang/base.h>
#include <freeradius-devel/unlang/call.h>
#include <freeradius-devel/unlang/interpret.h>
#include <freeradius-devel/server/request.h>
#include <freeradius-devel/server/time_tracking.h>
#include <freeradius-devel/util/dlist.h>
#include <freeradius-devel/util/minmax_heap.h>
#include <freeradius-devel/util/slab.h>
#include <freeradius-devel/util/time.h>
#include <freeradius-devel/util/timer.h>
#include <stdalign.h>
Go to the source code of this file.
Data Structures | |
struct | fr_worker_channel_t |
struct | fr_worker_listen_t |
struct | fr_worker_s |
A worker which takes packets from a master, and processes them. More... | |
Macros | |
#define | CACHE_LINE_SIZE 64 |
#define | CHECK_CONFIG(_x, _min, _max) |
#define | CHECK_CONFIG_TIME_DELTA(_x, _min, _max) |
#define | LOG_DST worker->log |
#define | LOG_PREFIX worker->name |
#define | WORKER_VERIFY |
Functions | |
static int | _fr_worker_rb_free (void *arg) |
static int | _worker_request_deinit (request_t *request, UNUSED void *uctx) |
static void | _worker_request_detach (request_t *request, void *uctx) |
Make us responsible for running the request. | |
static void | _worker_request_done_detached (request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx) |
Detached request (i.e. one generated by the interpreter with no parent) is now complete. |
static void | _worker_request_done_external (request_t *request, UNUSED rlm_rcode_t rcode, void *uctx) |
External request is now complete. | |
static void | _worker_request_done_internal (request_t *request, UNUSED rlm_rcode_t rcode, void *uctx) |
Internal request (i.e. one generated by the interpreter) is now complete. |
static void | _worker_request_internal_init (request_t *request, void *uctx) |
Internal request (i.e. one generated by the interpreter) is being initialised. |
static void | _worker_request_prioritise (request_t *request, void *uctx) |
Update a request's priority. | |
static void | _worker_request_resume (request_t *request, UNUSED void *uctx) |
Interpreter is starting to work on request again. | |
static void | _worker_request_runnable (request_t *request, void *uctx) |
Request is now runnable. | |
static bool | _worker_request_scheduled (request_t const *request, UNUSED void *uctx) |
Check if a request is scheduled. | |
static void | _worker_request_timeout (UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx) |
Enforce max_request_time. | |
static void | _worker_request_yield (request_t *request, UNUSED void *uctx) |
Interpreter yielded request. | |
static int | cmd_stats_worker (FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info) |
FR_SLAB_FUNCS (request, request_t) | |
FR_SLAB_TYPES (request, request_t) | |
void | fr_worker (fr_worker_t *worker) |
The main loop and entry point of the stand-alone worker thread. | |
fr_worker_t * | fr_worker_alloc (TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config) |
Create a worker. | |
fr_channel_t * | fr_worker_channel_create (fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master) |
Create a channel to the worker. | |
void | fr_worker_debug (fr_worker_t *worker, FILE *fp) |
Print debug information about the worker structure. | |
void | fr_worker_destroy (fr_worker_t *worker) |
Destroy a worker. | |
int | fr_worker_listen_cancel (fr_worker_t *worker, fr_listen_t const *li) |
static int | fr_worker_listen_cancel_self (fr_worker_t *worker, fr_listen_t const *li) |
void | fr_worker_post_event (UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx) |
Post-event handler. | |
int | fr_worker_pre_event (UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx) |
Pre-event handler. | |
static fr_ring_buffer_t * | fr_worker_rb_init (void) |
Initialise thread local storage. | |
int | fr_worker_request_timeout_set (fr_worker_t *worker, request_t *request, fr_time_delta_t timeout) |
Set, or re-set the request timer. | |
int | fr_worker_stats (fr_worker_t const *worker, int num, uint64_t *stats) |
static bool | is_worker_thread (fr_worker_t const *worker) |
static char * | itoa_internal (TALLOC_CTX *ctx, uint64_t number) |
static void | worker_channel_callback (void *ctx, void const *data, size_t data_size, fr_time_t now) |
Handle a control plane message sent to the worker via a channel. | |
static int8_t | worker_dedup_cmp (void const *one, void const *two) |
Track a request_t in the "dedup" tree. | |
static void | worker_exit (fr_worker_t *worker) |
static void | worker_listen_cancel_callback (void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now) |
A socket is going away, so clean up any requests which use this socket. | |
static int8_t | worker_listener_cmp (void const *one, void const *two) |
static void | worker_nak (fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now) |
Send a NAK to the network thread. | |
static uint32_t | worker_num_requests (fr_worker_t *worker) |
static void | worker_recv_request (void *ctx, fr_channel_t *ch, fr_channel_data_t *cd) |
Callback which handles a message being received on the worker side. | |
static void | worker_request_bootstrap (fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now) |
static void | worker_request_init (fr_worker_t *worker, request_t *request, fr_time_t now) |
Initialize various request fields needed by the worker. | |
static void | worker_request_name_number (request_t *request) |
static void | worker_request_time_tracking_end (fr_worker_t *worker, request_t *request, fr_time_t now) |
static int | worker_request_time_tracking_start (fr_worker_t *worker, request_t *request, fr_time_t now) |
Start time tracking for a request, and mark it as runnable. | |
static void | worker_requests_cancel (fr_worker_channel_t *ch) |
static void | worker_run_request (fr_worker_t *worker, fr_time_t start) |
Run a request. | |
static int8_t | worker_runnable_cmp (void const *one, void const *two) |
Track a request_t in the "runnable" heap. | |
static void | worker_send_reply (fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now) |
Send a response packet to the network side. | |
static void | worker_stop_request (request_t **request_p) |
Signal the unlang interpreter that it needs to stop running the request. | |
Variables | |
fr_cmd_table_t | cmd_worker_table [] |
static _Thread_local fr_ring_buffer_t * | fr_worker_rb |
static atomic_uint64_t | request_number = 0 |
Worker thread functions.
The "worker" thread is the one responsible for the bulk of the work done when processing a request. Workers are spawned by the scheduler, and create a kqueue (KQ) and control-plane Atomic Queue (AQ) for control-plane communication.
When a network thread discovers that it needs more workers, it asks the scheduler for a KQ/AQ combination. The network thread then creates a channel dedicated to that worker, and sends the channel to the worker in a "new channel" message. The worker receives the channel, and sends an ACK back to the network thread.
The network thread then sends the worker new packets, which the worker receives and processes.
When a packet is decoded, it is put into the "runnable" heap, and also into the timeout sublist. The main loop fr_worker() then pulls new requests off of this heap and runs them. The main event loop checks the head of the timeout sublist, and forcefully terminates any requests which have been running for too long.
If a request is yielded, it is placed onto the yielded list in the worker "tracking" data structure.
Definition in file worker.c.
struct fr_worker_channel_t |
struct fr_worker_listen_t |
Data Fields | ||
---|---|---|
fr_dlist_head_t | dlist | of requests associated with this listener. |
fr_listen_t const * | listener | incoming packets |
fr_rb_node_t | node | in tree of listeners |
struct fr_worker_s |
A worker which takes packets from a master, and processes them.
Data Fields | ||
---|---|---|
fr_atomic_queue_t * | aq_control | atomic queue for control messages sent to me |
fr_worker_channel_t * | channel | list of channels |
fr_worker_config_t | config | external configuration |
fr_control_t * | control | the control plane |
fr_time_elapsed_t | cpu_time | histogram of total CPU time per request |
fr_rb_tree_t * | dedup | de-dup tree |
fr_event_list_t * | el | our event list |
bool | exiting | are we exiting? |
unlang_interpret_t * | intp | Worker's local interpreter. |
fr_rb_tree_t * | listeners | so we can cancel requests when a listener goes away |
fr_log_t const * | log | log destination |
fr_log_lvl_t | lvl | log level |
fr_time_delta_t | max_request_time | maximum time a request can be processed |
char const * | name | name of this worker |
uint64_t | num_active | number of active requests |
int | num_channels | actual number of channels |
uint64_t | num_naks | number of messages which were nak'd |
fr_time_delta_t | predicted | How long we predict a request will take to execute. |
fr_heap_t * | runnable | current runnable requests which we've spent time processing |
request_slab_list_t * | slab | slab allocator for request_t |
fr_io_stats_t | stats | input / output stats |
pthread_t | thread_id | my thread ID |
fr_timer_list_t * | timeout | Track when requests time out using a dlist. |
fr_timer_list_t * | timeout_custom |
Track when requests time out using an lst. Requests must always be in one of these lists. |
fr_time_tracking_t | tracking | how much time the worker has spent doing things. |
fr_time_elapsed_t | wall_clock | histogram of wall clock time per request |
bool | was_sleeping | used to suppress multiple sleep signals in a row |
#define CHECK_CONFIG | ( | _x, | |
_min, | |||
_max | |||
) |
#define CHECK_CONFIG_TIME_DELTA | ( | _x, | |
_min, | |||
_max | |||
) |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
|
static |
Enforce max_request_time.
Runs periodically, and tries to clean up requests which were received by the network thread more than max_request_time seconds ago. In the interest of not adding a timer for every packet, the requests are given a 1 second leeway.
[in] | tl | the worker's timer list. |
[in] | when | the current time |
[in] | uctx | the request_t timing out. |
Definition at line 533 of file worker.c.
|
static |
FR_SLAB_FUNCS | ( | request | , |
request_t | |||
) |
FR_SLAB_TYPES | ( | request | , |
request_t | |||
) |
void fr_worker | ( | fr_worker_t * | worker | ) |
The main loop and entry point of the stand-alone worker thread.
Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event() instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the request.
[in] | worker | the worker data structure to manage |
Definition at line 1526 of file worker.c.
fr_worker_t * fr_worker_alloc | ( | TALLOC_CTX * | ctx, |
fr_event_list_t * | el, | ||
char const * | name, | ||
fr_log_t const * | logger, | ||
fr_log_lvl_t | lvl, | ||
fr_worker_config_t * | config | ||
) |
Create a worker.
[in] | ctx | the talloc context |
[in] | name | the name of this worker |
[in] | el | the event list |
[in] | logger | the destination for all logging messages |
[in] | lvl | log level |
[in] | config | various configuration parameters |
Definition at line 1371 of file worker.c.
fr_channel_t * fr_worker_channel_create | ( | fr_worker_t * | worker, |
TALLOC_CTX * | ctx, | ||
fr_control_t * | master | ||
) |
Create a channel to the worker.
Called by the master (i.e. network) thread when it needs to create a new channel to a particular worker.
[in] | worker | the worker |
[in] | master | the control plane of the master |
[in] | ctx | the context in which the channel will be created |
Definition at line 1642 of file worker.c.
void fr_worker_debug | ( | fr_worker_t * | worker, |
FILE * | fp | ||
) |
void fr_worker_destroy | ( | fr_worker_t * | worker | ) |
Destroy a worker.
The input channels are signaled, and local messages are cleaned up.
This should be called to EXPLICITLY destroy a worker, when some fatal error has occurred on the worker side, and we need to destroy it.
We signal all pending requests in the backlog to stop, and tell the network side that it should not send us any more requests.
[in] | worker | the worker to destroy. |
Definition at line 1029 of file worker.c.
int fr_worker_listen_cancel | ( | fr_worker_t * | worker, |
fr_listen_t const * | li | ||
) |
|
static |
void fr_worker_post_event | ( | UNUSED fr_event_list_t * | el, |
UNUSED fr_time_t | now, | ||
void * | uctx | ||
) |
int fr_worker_pre_event | ( | UNUSED fr_time_t | now, |
UNUSED fr_time_delta_t | wake, | ||
void * | uctx | ||
) |
|
inline static |
int fr_worker_request_timeout_set | ( | fr_worker_t * | worker, |
request_t * | request, | ||
fr_time_delta_t | timeout | ||
) |
Set, or re-set the request timer.
Automatically moves requests between the timer lists (timeout, timeout_custom).
Can be used to set the initial timeout, or extend the timeout of a request.
[in] | worker | the worker containing the timeout lists. |
[in] | request | that we're timing out. |
[in] | timeout | the timeout to set. |
Definition at line 557 of file worker.c.
int fr_worker_stats | ( | fr_worker_t const * | worker, |
int | num, | ||
uint64_t * | stats | ||
) |
|
inline static |
|
static |
|
static |
|
static |
|
static |
A socket is going away, so clean up any requests which use this socket.
[in] | ctx | the worker |
[in] | data | the message |
[in] | data_size | size of the data |
[in] | now | the current time |
Definition at line 398 of file worker.c.
|
static |
|
static |
Send a NAK to the network thread.
The network thread believes that a worker is running a request until that request has been NAK'd. We typically NAK requests when they've been hanging around in the worker's backlog too long, or there was an error executing the request.
[in] | worker | the worker |
[in] | cd | the message to NAK |
[in] | now | when the message is NAK'd |
Definition at line 420 of file worker.c.
|
inline static |
|
static |
|
static |
|
inline static |
|
inline static |
|
static |
|
static |
|
static |
|
inline static |
Run a request.
Until it either yields, or is done.
This function is also responsible for sending replies, and cleaning up the request.
[in] | worker | the worker |
[in] | start | the current time |
Definition at line 1322 of file worker.c.
|
static |
|
static |
Send a response packet to the network side.
[in] | worker | This worker. |
[in] | request | we're sending a reply for. |
[in] | send_reply | whether the network side sends a reply |
[in] | now | The current time |
Definition at line 621 of file worker.c.
|
static |
Signal the unlang interpreter that it needs to stop running the request.
Signalling is a synchronous operation. Whatever I/O requests the request is currently performing are immediately cancelled, and all the frames are popped off the unlang stack.
Modules and unlang keywords explicitly register signal handlers to deal with their yield points being cancelled/interrupted via this function.
The caller should assume the request is no longer viable after calling this function.
[in] | request_p | Pointer to the request to cancel. Will be set to NULL. |
Definition at line 513 of file worker.c.
fr_cmd_table_t cmd_worker_table[] |
|
static |
|
static |