The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
worker_test.c
Go to the documentation of this file.
1 /*
2  * worker_test.c Tests for channels
3  *
4  * Version: $Id: 3a4c481cc9774cdda47a145cdcf94007bd4acdf0 $
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * @copyright 2016 Alan DeKok (aland@freeradius.org)
21  */
22 
23 RCSID("$Id: 3a4c481cc9774cdda47a145cdcf94007bd4acdf0 $")
24 
25 #include <freeradius-devel/io/control.h>
26 #include <freeradius-devel/io/listen.h>
27 #include <freeradius-devel/io/worker.h>
28 #include <freeradius-devel/util/debug.h>
29 #include <freeradius-devel/util/syserror.h>
30 #include <freeradius-devel/util/talloc.h>
31 
32 #ifdef HAVE_GETOPT_H
33 # include <getopt.h>
34 #endif
35 
36 #include <pthread.h>
37 #include <signal.h>
38 
39 #include <sys/event.h>
40 
#define MAX_MESSAGES		(2048)	/* NOTE(review): presumably sizes the master's message set - confirm against its creation call */
#define MAX_CONTROL_PLANE	(1024)	/* NOTE(review): presumably the default control-plane queue size - confirm in main() */
#define MAX_KEVENTS		(10)	/* Number of kevents the master fetches per kevent() call */
#define MAX_WORKERS		(1024)	/* -w must be strictly below this value */

/*
 *	Debug printing, gated on the file-scope debug_lvl.
 *
 *	Wrapped in do { } while (0) so that each use is a single statement:
 *	a bare "if (...) printf" would capture the "else" of an enclosing
 *	unbraced "if".
 */
#define MPRINT1(...)	do { if (debug_lvl) printf(__VA_ARGS__); } while (0)
#define MPRINT2(...)	do { if (debug_lvl > 1) printf(__VA_ARGS__); } while (0)
48 
49 typedef struct {
50  int id; //!< ID of the worker 0..N
51  pthread_t pthread_id; //!< pthread ID of the worker
52  fr_worker_t *worker; //!< pointer to the worker
53  fr_channel_t *ch; //!< channel for communicating with the worker
55 
56 static int debug_lvl = 0;
57 static int kq_master;
60 static int max_messages = 10;
61 static int max_control_plane = 0;
62 static int max_outstanding = 1;
63 static bool touch_memory = false;
64 static int num_workers = 1;
65 static bool quiet = false;
67 
68 /**********************************************************************/
69 typedef struct request_s request_t;
70 
72 {
73  return NULL;
74 }
75 
76 void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
77 {
78 }
79 
80 /**********************************************************************/
81 
82 static NEVER_RETURNS void usage(void)
83 {
84  fprintf(stderr, "usage: worker_test [OPTS]\n");
85  fprintf(stderr, " -c <control-plane> Size of the control plane queue.\n");
86  fprintf(stderr, " -m <messages> Send number of messages.\n");
87  fprintf(stderr, " -o <outstanding> Keep number of messages outstanding.\n");
88  fprintf(stderr, " -q quiet - suppresses worker stats.\n");
89  fprintf(stderr, " -t Touch memory for fake packets.\n");
90  fprintf(stderr, " -w N Create N workers. Default is 1.\n");
91  fprintf(stderr, " -x Debugging mode.\n");
92 
93  fr_exit_now(EXIT_FAILURE);
94 }
95 
96 static rlm_rcode_t test_process(UNUSED void const *inst, request_t *request, fr_io_action_t action)
97 {
98  MPRINT1("\t\tPROCESS --- request %"PRIu64" action %d\n", request->number, action);
100 }
101 
102 static int test_decode(UNUSED void const *instance, request_t *request, uint8_t *const data, size_t data_len)
103 {
105 
106  /*
107  * The data is the packet number.
108  */
109  memcpy(&number, data, sizeof(number));
110  request->number = number;
111 
112  request->async->process = test_process;
113 
114  MPRINT1("\t\tDECODE <<< request %"PRIu64" - %p data %p size %zd\n", request->number,
115  request->async->packet_ctx, data, data_len);
116  return 0;
117 }
118 
119 static ssize_t test_encode(void const *instance, request_t *request, uint8_t *const data, size_t data_len)
120 {
121  MPRINT1("\t\tENCODE >>> request %"PRIu64" - data %p %p size %zd\n", request->number,
122  instance, data, data_len);
123 
124  return data_len;
125 }
126 
127 static size_t test_nak(UNUSED void const *instance, void *packet_ctx, uint8_t *const packet, size_t packet_len, uint8_t *reply, UNUSED size_t reply_len)
128 {
130 
131  /*
132  * The data is the packet number.
133  */
134  memcpy(&number, packet, sizeof(number));
135  memcpy(reply, packet, sizeof(number));
136 
137  MPRINT1("\t\tNAK !!! request %"PRIu64" - data %p %p size %zd\n", (uint64_t) number, packet_ctx, packet, packet_len);
138 
139  return 10;
140 }
141 
142 static fr_app_io_t app_io = {
143  .name = "worker-test",
144  .default_message_size = 4096,
145  .nak = test_nak,
146  .encode = test_encode,
147  .decode = test_decode
148 };
149 
150 static void *worker_thread(void *arg)
151 {
152  TALLOC_CTX *ctx;
153  fr_worker_t *worker;
156  char buffer[16];
157 
158  sw = (fr_schedule_worker_t *) arg;
159 
160  MPRINT1("\tWorker %d started.\n", sw->id);
161 
162  MEM(ctx = talloc_init_const("worker"));
163 
164  el = fr_event_list_alloc(ctx, NULL, NULL);
165  if (!el) {
166  fprintf(stderr, "worker_test: Failed to create the event list\n");
167  fr_exit_now(EXIT_FAILURE);
168  }
169 
170  snprintf(buffer, sizeof(buffer), "%d", sw->id);
171  worker = sw->worker = fr_worker_create(ctx, el, buffer, &default_log, L_DBG_LVL_MAX);
172  if (!worker) {
173  fprintf(stderr, "worker_test: Failed to create the worker\n");
174  fr_exit_now(EXIT_FAILURE);
175  }
176 
177  MPRINT1("\tWorker %d looping.\n", sw->id);
178  fr_worker(worker);
179 
180  sw->worker = NULL;
181  MPRINT1("\tWorker %d exiting.\n", sw->id);
182 
183  talloc_free(ctx);
184  return NULL;
185 }
186 
187 
188 static void master_process(void)
189 {
190  bool running, signaled_close;
191  int rcode, i, num_events, which_worker;
192  int num_outstanding, num_messages;
193  int num_replies;
194  fr_message_set_t *ms;
195  TALLOC_CTX *ctx;
196  fr_channel_t *ch;
198  pthread_attr_t attr;
200  fr_listen_t listen = { .app_io = &app_io };
201  struct kevent events[MAX_KEVENTS];
202 
203  MEM(ctx = talloc_init_const("master"));
204 
206  if (!ms) {
207  fprintf(stderr, "Failed creating message set\n");
208  fr_exit_now(EXIT_FAILURE);
209  }
210 
211  MPRINT1("Master started.\n");
212 
213  /*
214  * Create the worker threads.
215  */
216  (void) pthread_attr_init(&attr);
217  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
218 
219  for (i = 0; i < num_workers; i++) {
220  workers[i].id = i;
221  (void) pthread_create(&workers[i].pthread_id, &attr, worker_thread, &workers[i]);
222  }
223 
224  MPRINT1("Master created %d workers.\n", num_workers);
225 
226  /*
227  * Busy loop because that's fine for the test
228  */
229  num_outstanding = 0;
230  while (num_outstanding < num_workers) {
231  for (i = 0; i < num_workers; i++) {
232  if (!workers[i].worker) continue;
233  if (workers[i].ch != NULL) continue;
234 
235  /*
236  * Create the channel and signal the
237  * worker that it is open
238  */
239  MPRINT1("Master creating channel to worker %d.\n", num_workers);
241  fr_assert(workers[i].ch != NULL);
242 
243  (void) fr_channel_master_ctx_add(workers[i].ch, &workers[i]);
244 
245  num_outstanding++;
246  }
247  }
248 
249  MPRINT1("Master created all channels.\n");
250 
251  /*
252  * Bootstrap the queue with messages.
253  */
254  num_replies = num_outstanding = num_messages = 0;
255  which_worker = 0;
256 
257  running = true;
258  signaled_close = false;
259 
260  while (running) {
261  fr_time_t now;
262  int num_to_send;
263  fr_channel_data_t *cd, *reply;
264 
265  /*
266  * Ensure we have outstanding messages.
267  */
268  if (num_messages >= max_messages) {
269  MPRINT1("Master DONE sending\n");
270  goto check_close;
271  }
272 
273  num_to_send = max_outstanding - num_outstanding;
274  if ((num_messages + num_to_send) > max_messages) {
275  num_to_send = max_messages - num_messages;
276  }
277  MPRINT1("Master sending %d messages\n", num_to_send);
278 
279  for (i = 0; i < num_to_send; i++) {
280  cd = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
281  fr_assert(cd != NULL);
282 
283  num_outstanding++;
284  num_messages++;
285 
286  cd->m.when = fr_time();
287 
288  cd->priority = 0;
289  cd->listen = &listen;
290 
291  if (touch_memory) {
292  size_t j, k;
293 
294  for (j = k = 0; j < cd->m.data_size; j++) {
295  k += cd->m.data[j];
296  }
297 
298  cd->m.data[4] = k;
299  }
300 
301  memcpy(cd->m.data, &num_messages, sizeof(num_messages));
302 
303  MPRINT1("Master sent message %d to worker %d\n", num_messages, which_worker);
304  rcode = fr_channel_send_request(workers[which_worker].ch, cd, &reply);
305  if (rcode < 0) {
306  fprintf(stderr, "Failed sending request: %s\n", fr_syserror(errno));
307  }
308  which_worker++;
309  if (which_worker >= num_workers) which_worker = 0;
310 
311  fr_assert(rcode == 0);
312  if (reply) {
313  num_replies++;
314  num_outstanding--;
315  MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
316  num_replies, num_outstanding, num_messages, max_messages);
317  fr_message_done(&reply->m);
318  }
319  }
320 
321  /*
322  * Signal close only when done.
323  */
324 check_close:
325  if (!signaled_close && (num_messages >= max_messages) && (num_outstanding == 0)) {
326  MPRINT1("Master signaling workers to exit.\n");
327 
328  for (i = 0; i < num_workers; i++) {
329  if (!quiet) {
330  printf("Worker %d\n", i);
331  fr_worker_debug(workers[i].worker, stdout);
332  }
333 
335  MPRINT1("Master asked exit for worker %d.\n", workers[i].id);
336  if (rcode < 0) {
337  fprintf(stderr, "Failed signaling close %d: %s\n", i, fr_syserror(errno));
338  fr_exit_now(EXIT_FAILURE);
339  }
340  }
341  signaled_close = true;
342  }
343 
344  MPRINT1("Master waiting on events.\n");
345  fr_assert(num_messages <= max_messages);
346 
347  num_events = kevent(kq_master, NULL, 0, events, MAX_KEVENTS, NULL);
348  MPRINT1("Master kevent returned %d\n", num_events);
349 
350  if (num_events < 0) {
351  if (errno == EINTR) continue;
352 
353  fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
354  fr_exit_now(EXIT_FAILURE);
355  }
356 
357  if (num_events == 0) continue;
358 
359  /*
360  * Service the events.
361  *
362  * @todo this should NOT take a channel pointer
363  */
364  for (i = 0; i < num_events; i++) {
366  }
367 
368  now = fr_time();
369 
370  MPRINT1("Master servicing control-plane\n");
371 
372  while (true) {
373  uint32_t id;
374  size_t data_size;
375  char data[256];
376 
377  data_size = fr_control_message_pop(aq_master, &id, data, sizeof(data));
378  if (!data_size) break;
379 
381 
382  ce = fr_channel_service_message(now, &ch, data, data_size);
383  MPRINT1("Master got channel event %d\n", ce);
384 
385  switch (ce) {
387  MPRINT1("Master got data ready signal\n");
388 
389  reply = fr_channel_recv_reply(ch);
390  if (!reply) {
391  MPRINT1("Master SIGNAL WITH NO DATA!\n");
392  continue;
393  }
394 
395  do {
396  num_replies++;
397  num_outstanding--;
398  MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
399  num_replies, num_outstanding, num_messages, max_messages);
400  fr_message_done(&reply->m);
401  } while ((reply = fr_channel_recv_reply(ch)) != NULL);
402  break;
403 
404  case FR_CHANNEL_CLOSE:
405  sw = fr_channel_master_ctx_get(ch);
406  fr_assert(sw != NULL);
407 
408  MPRINT1("Master received close signal for worker %d\n", sw->id);
409  fr_assert(signaled_close == true);
410 
411  (void) pthread_kill(sw->pthread_id, SIGTERM);
412  running = false;
413  break;
414 
415  case FR_CHANNEL_NOOP:
416  break;
417 
418  default:
419  fprintf(stderr, "Master got unexpected CE %d\n", ce);
420 
421  /*
422  * Not written yet!
423  */
424  fr_assert(0 == 1);
425  break;
426  } /* switch over signal returned */
427  } /* drain the control plane */
428  } /* loop until told to exit */
429 
430  MPRINT1("Master exiting.\n");
431 
432  fr_time_t last_checked = fr_time();
433 
434  /*
435  * Busy-wait for the workers to exit;
436  */
437  do {
438  fr_time_t now = fr_time();
439 
440  num_outstanding = num_workers;
441 
442  for (i = 0; i < num_workers; i++) {
443  if (!workers[i].worker) num_outstanding--;
444  }
445 
446  if ((now - last_checked) > (NSEC / 10)) {
447  MPRINT1("still num_outstanding %d\n", num_outstanding);
448  }
449 
450  } while (num_outstanding > 0);
451 
452  /*
453  * Force all messages to be garbage collected
454  */
455  MPRINT2("GC\n");
456  fr_message_set_gc(ms);
457 
458  if (debug_lvl > 1) fr_message_set_debug(ms, stdout);
459 
460  /*
461  * After the garbage collection, all messages marked "done" MUST also be marked "free".
462  */
463  rcode = fr_message_set_messages_used(ms);
464  MPRINT2("Master messages used = %d\n", rcode);
465  fr_assert(rcode == 0);
466 
467  talloc_free(ctx);
468 
469 }
470 
/**
 * Signal handler which discards the signal.
 *
 * Its only action is to install itself again as the handler for sig.
 *
 * @param[in] sig	number of the signal that was delivered.
 */
static void sig_ignore(int sig)
{
	void (*previous)(int);

	previous = signal(sig, sig_ignore);
	(void) previous;	/* the previous handler is of no interest */
}
475 
/*
 * Program entry point.
 *
 * Starts the time source, parses the command-line options into the
 * file-scope settings, creates the master kqueue, installs sig_ignore() as
 * the SIGTERM handler, runs master_process() and finally closes the kqueue.
 *
 * NOTE(review): the listing numbers embedded in this function skip 486, 523,
 * 526-528, 539 and 542, so some statements are probably missing from this
 * view - confirm against the real file.
 */
476 int main(int argc, char *argv[])
477 {
478  int c;
479  TALLOC_CTX *autofree = talloc_autofree_context();
480 
 /* Without a working time source the test cannot run. */
481  if (fr_time_start() < 0) {
482  fprintf(stderr, "Failed to start time: %s\n", fr_syserror(errno));
483  fr_exit_now(EXIT_FAILURE);
484  }
485 
487 
 /*
  * Option parsing.  -c, -m, -o and -w take a numeric argument, converted
  * with atoi(); -q and -t are flags; each -x raises the debug level.
  */
488  while ((c = getopt(argc, argv, "c:hm:o:qtw:x")) != -1) switch (c) {
489  case 'x':
490  debug_lvl++;
491  break;
492 
493  case 'c':
494  max_control_plane = atoi(optarg);
495  break;
496 
497  case 'm':
498  max_messages = atoi(optarg);
499  break;
500 
501  case 'o':
502  max_outstanding = atoi(optarg);
503  break;
504 
505  case 'q':
506  quiet = true;
507  break;
508 
509  case 't':
510  touch_memory = true;
511  break;
512 
513  case 'w':
514  num_workers = atoi(optarg);
 /* Accepted range is 1 .. MAX_WORKERS - 1; anything else prints the help and exits. */
515  if ((num_workers <= 0) || (num_workers >= MAX_WORKERS)) usage();
516  break;
517 
518  case 'h':
519  default:
520  usage();
521  }
522 
524 
 /* NOTE(review): the body of this block (listing lines 526-528) is not visible here. */
525  if (!max_control_plane) {
529  }
530 
 /* Disabled: would shift argc/argv past the parsed options. */
531 #if 0
532  argc -= (optind - 1);
533  argv += (optind - 1);
534 #endif
535 
 /* kqueue the master blocks on in master_process(). */
536  kq_master = kqueue();
537  fr_assert(kq_master >= 0);
538 
 /* NOTE(review): the assignment to aq_master (listing line 539) is not visible here. */
540  fr_assert(aq_master != NULL);
541 
 /* NOTE(review): the assignment to control_master (listing line 542) is not visible here. */
543  fr_assert(control_master != NULL);
544 
 /* master_process() sends SIGTERM to a worker thread; sig_ignore() only re-installs itself. */
545  signal(SIGTERM, sig_ignore);
546 
 /* Unbuffered stdout when debugging. */
547  if (debug_lvl) {
548  setvbuf(stdout, NULL, _IONBF, 0);
549  }
550 
551  master_process();
552 
553  close(kq_master);
554 
555  return EXIT_SUCCESS;
556 }
static int const char char buffer[256]
Definition: acutest.h:574
int const char * file
Definition: acutest.h:702
va_list args
Definition: acutest.h:770
int const char int line
Definition: acutest.h:702
size_t default_message_size
Usually maximum message size.
Definition: app_io.h:39
Public structure describing an I/O path for a protocol.
Definition: app_io.h:33
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:481
#define NEVER_RETURNS
Should be placed before the function return type.
Definition: build.h:311
#define UNUSED
Definition: build.h:313
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition: channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition: channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition: channel.c:306
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition: channel.c:788
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
fr_listen_t * listen
for tracking packet transport, etc.
Definition: channel.h:146
uint32_t priority
Priority of this packet.
Definition: channel.h:140
Channel information which is added to a message.
Definition: channel.h:104
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define fr_exit_now(_x)
Exit without calling atexit() handlers, producing a log message in debug builds.
Definition: debug.h:234
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition: control.c:377
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
The control structure.
Definition: control.c:79
fr_app_io_t const * app_io
I/O path functions.
Definition: listen.h:31
fr_event_list_t * fr_event_list_alloc(TALLOC_CTX *ctx, fr_event_status_cb_t status, void *status_uctx)
Initialise a new event list.
Definition: event.c:2899
talloc_free(reap)
Stores all information relating to an event list.
Definition: event.c:411
int fr_log_init_legacy(fr_log_t *log, bool daemonize)
Initialise file descriptors based on logging destination.
Definition: log.c:907
fr_log_t default_log
Definition: log.c:291
@ L_DBG_LVL_MAX
Lowest priority debug messages (-xxxxx | -Xxxx).
Definition: log.h:74
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition: message.c:1212
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
Definition: message.c:1262
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition: message.c:1238
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
static TALLOC_CTX * autofree
Definition: radclient-ng.c:107
static fr_event_list_t * events
Definition: radsniff.c:59
#define RETURN_MODULE_OK
Definition: rcode.h:57
rlm_rcode_t
Return codes indicating the result of the module call.
Definition: rcode.h:40
fr_packet_t * packet
Incoming request.
Definition: request.h:224
fr_packet_t * reply
Outgoing response.
Definition: request.h:225
rlm_rcode_t rcode
Last rcode returned by a module.
Definition: request.h:233
uint64_t number
Monotonically increasing request number. Reset on server restart.
Definition: request.h:180
Optional arguments for initialising requests.
Definition: request.h:254
Signals that can be sent to a request.
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:689
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
eap_aka_sim_process_conf_t * inst
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define talloc_autofree_context
The original function is deprecated, so replace it with our version.
Definition: talloc.h:51
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
Definition: talloc.h:112
int fr_time_start(void)
Initialize the local time.
Definition: time.c:150
#define NSEC
Definition: time.h:379
"server local" time.
Definition: time.h:69
close(uq->fd)
static fr_event_list_t * el
static fr_slen_t data
Definition: value.h:1265
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition: worker.c:1605
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition: worker.c:1356
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition: worker.c:1493
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition: worker.c:1580
A worker which takes packets from a master, and processes them.
Definition: worker.c:94
static int max_control_plane
Definition: worker_test.c:61
unsigned int id
a unique ID
Definition: schedule.c:88
fr_channel_t * ch
channel for communicating with the worker
Definition: radius1_test.c:57
int main(int argc, char *argv[])
Definition: worker_test.c:476
#define MAX_KEVENTS
Definition: worker_test.c:43
#define MAX_MESSAGES
Definition: worker_test.c:41
pthread_t pthread_id
the thread of this worker
Definition: schedule.c:86
static fr_schedule_worker_t workers[MAX_WORKERS]
Definition: worker_test.c:66
static bool touch_memory
Definition: worker_test.c:63
static int test_decode(UNUSED void const *instance, request_t *request, uint8_t *const data, size_t data_len)
Definition: worker_test.c:102
static fr_atomic_queue_t * aq_master
Definition: worker_test.c:58
static size_t test_nak(UNUSED void const *instance, void *packet_ctx, uint8_t *const packet, size_t packet_len, uint8_t *reply, UNUSED size_t reply_len)
Definition: worker_test.c:127
request_t * request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
Definition: worker_test.c:71
#define MAX_CONTROL_PLANE
Definition: worker_test.c:42
#define MAX_WORKERS
Definition: worker_test.c:44
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
Definition: worker_test.c:76
#define MPRINT2
Definition: worker_test.c:47
static void master_process(void)
Definition: worker_test.c:188
static int max_outstanding
Definition: worker_test.c:62
#define MPRINT1
Definition: worker_test.c:46
static fr_control_t * control_master
Definition: worker_test.c:59
static void * worker_thread(void *arg)
Definition: worker_test.c:150
static int kq_master
Definition: worker_test.c:57
static void sig_ignore(int sig)
Definition: worker_test.c:471
static bool quiet
Definition: worker_test.c:65
static ssize_t test_encode(void const *instance, request_t *request, uint8_t *const data, size_t data_len)
Definition: worker_test.c:119
static fr_app_io_t app_io
Definition: worker_test.c:142
static int num_workers
Definition: worker_test.c:64
static rlm_rcode_t test_process(UNUSED void const *inst, request_t *request, fr_io_action_t action)
Definition: worker_test.c:96
static int max_messages
Definition: worker_test.c:60
static NEVER_RETURNS void usage(void)
Definition: worker_test.c:82
fr_worker_t * worker
the worker data structure
Definition: schedule.c:97
static int debug_lvl
Definition: worker_test.c:56
Scheduler specific information for worker threads.
Definition: schedule.c:83