23 RCSID(
"$Id: 3a4c481cc9774cdda47a145cdcf94007bd4acdf0 $")
25 #include <freeradius-devel/io/control.h>
26 #include <freeradius-devel/io/listen.h>
27 #include <freeradius-devel/io/worker.h>
28 #include <freeradius-devel/util/debug.h>
29 #include <freeradius-devel/util/syserror.h>
30 #include <freeradius-devel/util/talloc.h>
39 #include <sys/event.h>
41 #define MAX_MESSAGES (2048)
42 #define MAX_CONTROL_PLANE (1024)
43 #define MAX_KEVENTS (10)
44 #define MAX_WORKERS (1024)
46 #define MPRINT1 if (debug_lvl) printf
47 #define MPRINT2 if (debug_lvl > 1) printf
84 fprintf(stderr,
"usage: worker_test [OPTS]\n");
85 fprintf(stderr,
" -c <control-plane> Size of the control plane queue.\n");
86 fprintf(stderr,
" -m <messages> Send number of messages.\n");
87 fprintf(stderr,
" -o <outstanding> Keep number of messages outstanding.\n");
88 fprintf(stderr,
" -q quiet - suppresses worker stats.\n");
89 fprintf(stderr,
" -t Touch memory for fake packets.\n");
90 fprintf(stderr,
" -w N Create N workers. Default is 1.\n");
91 fprintf(stderr,
" -x Debugging mode.\n");
98 MPRINT1(
"\t\tPROCESS --- request %"PRIu64
" action %d\n", request->number, action);
114 MPRINT1(
"\t\tDECODE <<< request %"PRIu64
" - %p data %p size %zd\n", request->number,
115 request->async->packet_ctx,
data, data_len);
121 MPRINT1(
"\t\tENCODE >>> request %"PRIu64
" - data %p %p size %zd\n", request->number,
122 instance,
data, data_len);
137 MPRINT1(
"\t\tNAK !!! request %"PRIu64
" - data %p %p size %zd\n", (uint64_t)
number, packet_ctx,
packet, packet_len);
143 .name =
"worker-test",
160 MPRINT1(
"\tWorker %d started.\n", sw->
id);
166 fprintf(stderr,
"worker_test: Failed to create the event list\n");
173 fprintf(stderr,
"worker_test: Failed to create the worker\n");
177 MPRINT1(
"\tWorker %d looping.\n", sw->
id);
181 MPRINT1(
"\tWorker %d exiting.\n", sw->
id);
190 bool running, signaled_close;
191 int rcode, i, num_events, which_worker;
192 int num_outstanding, num_messages;
207 fprintf(stderr,
"Failed creating message set\n");
216 (void) pthread_attr_init(&attr);
217 (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
232 if (!
workers[i].worker)
continue;
233 if (
workers[i].ch != NULL)
continue;
249 MPRINT1(
"Master created all channels.\n");
254 num_replies = num_outstanding = num_messages = 0;
258 signaled_close =
false;
269 MPRINT1(
"Master DONE sending\n");
277 MPRINT1(
"Master sending %d messages\n", num_to_send);
279 for (i = 0; i < num_to_send; i++) {
301 memcpy(cd->
m.
data, &num_messages,
sizeof(num_messages));
303 MPRINT1(
"Master sent message %d to worker %d\n", num_messages, which_worker);
306 fprintf(stderr,
"Failed sending request: %s\n",
fr_syserror(errno));
315 MPRINT1(
"Master got reply %d, outstanding=%d, %d/%d sent.\n",
316 num_replies, num_outstanding, num_messages,
max_messages);
325 if (!signaled_close && (num_messages >=
max_messages) && (num_outstanding == 0)) {
326 MPRINT1(
"Master signaling workers to exit.\n");
330 printf(
"Worker %d\n", i);
337 fprintf(stderr,
"Failed signaling close %d: %s\n", i,
fr_syserror(errno));
341 signaled_close =
true;
344 MPRINT1(
"Master waiting on events.\n");
348 MPRINT1(
"Master kevent returned %d\n", num_events);
350 if (num_events < 0) {
351 if (errno == EINTR)
continue;
353 fprintf(stderr,
"Failed waiting for kevent: %s\n",
fr_syserror(errno));
357 if (num_events == 0)
continue;
364 for (i = 0; i < num_events; i++) {
370 MPRINT1(
"Master servicing control-plane\n");
378 if (!data_size)
break;
383 MPRINT1(
"Master got channel event %d\n", ce);
387 MPRINT1(
"Master got data ready signal\n");
391 MPRINT1(
"Master SIGNAL WITH NO DATA!\n");
398 MPRINT1(
"Master got reply %d, outstanding=%d, %d/%d sent.\n",
399 num_replies, num_outstanding, num_messages,
max_messages);
405 sw = fr_channel_master_ctx_get(ch);
408 MPRINT1(
"Master received close signal for worker %d\n", sw->
id);
419 fprintf(stderr,
"Master got unexpected CE %d\n", ce);
443 if (!
workers[i].worker) num_outstanding--;
446 if ((now - last_checked) > (
NSEC / 10)) {
447 MPRINT1(
"still num_outstanding %d\n", num_outstanding);
450 }
while (num_outstanding > 0);
464 MPRINT2(
"Master messages used = %d\n", rcode);
476 int main(
int argc,
char *argv[])
482 fprintf(stderr,
"Failed to start time: %s\n",
fr_syserror(errno));
488 while ((c = getopt(argc, argv,
"c:hm:o:qtw:x")) != -1)
switch (c) {
532 argc -= (optind - 1);
533 argv += (optind - 1);
548 setvbuf(stdout, NULL, _IONBF, 0);
static int const char char buffer[256]
size_t default_message_size
Usually maximum message size.
Public structure describing an I/O path for a protocol.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define NEVER_RETURNS
Should be placed before the function return type.
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
A full channel, which consists of two ends.
fr_message_t m
the message header
@ FR_CHANNEL_DATA_READY_REQUESTOR
fr_listen_t * listen
for tracking packet transport, etc.
uint32_t priority
Priority of this packet.
Channel information which is added to a message.
#define FR_CONTROL_ID_CHANNEL
#define fr_exit_now(_x)
Exit without calling atexit() handlers, producing a log message in debug builds.
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
fr_app_io_t const * app_io
I/O path functions.
fr_event_list_t * fr_event_list_alloc(TALLOC_CTX *ctx, fr_event_status_cb_t status, void *status_uctx)
Initialise a new event list.
Stores all information relating to an event list.
int fr_log_init_legacy(fr_log_t *log, bool daemonize)
Initialise file descriptors based on logging destination.
@ L_DBG_LVL_MAX
Lowest priority debug messages (-xxxxx | -Xxxx).
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
int fr_message_done(fr_message_t *m)
Mark a message as done.
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
A Message set, composed of message headers and ring buffer data.
fr_time_t when
when this message was sent
uint8_t * data
pointer to the data in the ring buffer
size_t data_size
size of the data in the ring buffer
static TALLOC_CTX * autofree
static fr_event_list_t * events
rlm_rcode_t
Return codes indicating the result of the module call.
fr_packet_t * packet
Incoming request.
fr_packet_t * reply
Outgoing response.
rlm_rcode_t rcode
Last rcode returned by a module.
uint64_t number
Monotonically increasing request number. Reset on server restart.
Optional arguments for initialising requests.
Signals that can be sent to a request.
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
eap_aka_sim_process_conf_t * inst
#define fr_time()
Allow us to arbitrarily manipulate time.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
#define talloc_autofree_context
The original function is deprecated, so replace it with our version.
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
int fr_time_start(void)
Initialize the local time.
static fr_event_list_t * el
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
A worker which takes packets from a master, and processes them.
static int max_control_plane
unsigned int id
a unique ID
fr_channel_t * ch
channel for communicating with the worker
int main(int argc, char *argv[])
pthread_t pthread_id
the thread of this worker
static fr_schedule_worker_t workers[MAX_WORKERS]
static int test_decode(UNUSED void const *instance, request_t *request, uint8_t *const data, size_t data_len)
static fr_atomic_queue_t * aq_master
static size_t test_nak(UNUSED void const *instance, void *packet_ctx, uint8_t *const packet, size_t packet_len, uint8_t *reply, UNUSED size_t reply_len)
request_t * request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
#define MAX_CONTROL_PLANE
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
static void master_process(void)
static int max_outstanding
static fr_control_t * control_master
static void * worker_thread(void *arg)
static void sig_ignore(int sig)
static ssize_t test_encode(void const *instance, request_t *request, uint8_t *const data, size_t data_len)
static fr_app_io_t app_io
static rlm_rcode_t test_process(UNUSED void const *inst, request_t *request, fr_io_action_t action)
static NEVER_RETURNS void usage(void)
fr_worker_t * worker
the worker data structure
Scheduler specific information for worker threads.