The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
channel_test.c
Go to the documentation of this file.
1 /*
2  * channel_test.c Tests for channels
3  *
4  * Version: $Id: d322ff20be747104b209c6e9b6c68872e2f19fb7 $
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  *
20  * @copyright 2016 Alan DeKok (aland@freeradius.org)
21  */
22 
23 RCSID("$Id: d322ff20be747104b209c6e9b6c68872e2f19fb7 $")
24 
25 #include <freeradius-devel/io/channel.h>
26 #include <freeradius-devel/io/control.h>
27 #include <freeradius-devel/util/debug.h>
28 #include <freeradius-devel/util/syserror.h>
29 #include <freeradius-devel/util/talloc.h>
30 
31 #ifdef HAVE_GETOPT_H
32 # include <getopt.h>
33 #endif
34 
35 #include <pthread.h>
36 #include <sys/event.h>
37 
38 #define MAX_MESSAGES (2048)
39 #define MAX_CONTROL_PLANE (1024)
40 #define MAX_KEVENTS (10)
41 
42 #define MPRINT1 if (debug_lvl) printf
43 #define MPRINT2 if (debug_lvl > 1) printf
44 
45 static int debug_lvl = 0;
46 static int kq_master, kq_worker;
49 static int max_messages = 10;
50 static int max_control_plane = 0;
51 static int max_outstanding = 1;
52 static bool touch_memory = false;
53 
54 /**********************************************************************/
55 typedef struct request_s request_t;
56 
58 {
59  return NULL;
60 }
61 
62 void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
63 {
64 }
65 
66 /**********************************************************************/
67 
68 static NEVER_RETURNS void usage(void)
69 {
70  fprintf(stderr, "usage: channel_test [OPTS]\n");
71  fprintf(stderr, " -c <control-plane> Size of the control plane queue.\n");
72  fprintf(stderr, " -m <messages> Send number of messages.\n");
73  fprintf(stderr, " -o <outstanding> Keep number of messages outstanding.\n");
74  fprintf(stderr, " -t Touch memory for fake packets.\n");
75  fprintf(stderr, " -x Debugging mode.\n");
76 
77  fr_exit_now(EXIT_FAILURE);
78 }
79 
80 static void *channel_master(void *arg)
81 {
82  bool running, signaled_close;
83  int rcode, i, num_events;
84  int num_outstanding, num_messages;
85  int num_replies;
86  fr_message_set_t *ms;
87  TALLOC_CTX *ctx;
88  fr_channel_t *channel = arg;
89  fr_channel_t *new_channel;
91  struct kevent events[MAX_KEVENTS];
92 
93  MEM(ctx = talloc_init_const("channel_master"));
94 
96  if (!ms) {
97  fprintf(stderr, "Failed creating message set\n");
98  fr_exit_now(EXIT_FAILURE);
99  }
100 
101  MPRINT1("Master started.\n");
102 
103  /*
104  * Signal the worker that the channel is open
105  */
106  rcode = fr_channel_signal_open(channel);
107  if (rcode < 0) {
108  fprintf(stderr, "Failed signaling open: %s\n", fr_syserror(errno));
109  fr_exit_now(EXIT_FAILURE);
110  }
111 
112  /*
113  * Bootstrap the queue with messages.
114  */
115  num_replies = num_outstanding = num_messages = 0;
116 
117  running = true;
118  signaled_close = false;
119 
120  while (running) {
121  fr_time_t now;
122  int num_to_send;
123  fr_channel_data_t *cd, *reply;
124 
125 #if 0
126  /*
127  * Drain the input queues before sleeping.
128  */
129  while ((reply = fr_channel_recv_reply(channel)) != NULL) {
130  num_replies++;
131  num_outstanding--;
132  MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
133  num_replies, num_outstanding, num_messages, max_messages);
134  fr_message_done(&reply->m);
135  }
136 #endif
137 
138  /*
139  * Ensure we have outstanding messages.
140  */
141  if (num_messages >= max_messages) {
142  MPRINT1("Master DONE sending\n");
143  goto check_close;
144  }
145 
146  num_to_send = max_outstanding - num_outstanding;
147  if ((num_messages + num_to_send) > max_messages) {
148  num_to_send = max_messages - num_messages;
149  }
150  MPRINT1("Master sending %d messages\n", num_to_send);
151 
152  for (i = 0; i < num_to_send; i++) {
153  cd = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
154  fr_assert(cd != NULL);
155 
156  num_outstanding++;
157  num_messages++;
158 
159  cd->m.when = fr_time();
160 
161  if (touch_memory) {
162  size_t j, k;
163 
164  for (j = k = 0; j < cd->m.data_size; j++) {
165  k += cd->m.data[j];
166  }
167 
168  cd->m.data[4] = k;
169  }
170 
171  memcpy(cd->m.data, &num_messages, sizeof(num_messages));
172 
173  MPRINT1("Master sent message %d\n", num_messages);
174  rcode = fr_channel_send_request(channel, cd, &reply);
175  if (rcode < 0) {
176  fprintf(stderr, "Failed sending request: %s\n", fr_syserror(errno));
177  }
178  fr_assert(rcode == 0);
179  if (reply) {
180  num_replies++;
181  num_outstanding--;
182  MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
183  num_replies, num_outstanding, num_messages, max_messages);
184  fr_message_done(&reply->m);
185  }
186  }
187 
188  /*
189  * Signal close only when done.
190  */
191 check_close:
192 
193  if (!signaled_close && (num_messages >= max_messages) && (num_outstanding == 0)) {
194  MPRINT1("Master signaling worker to exit.\n");
195  rcode = fr_channel_signal_responder_close(channel);
196  if (rcode < 0) {
197  fprintf(stderr, "Failed signaling close: %s\n", fr_syserror(errno));
198  fr_exit_now(EXIT_FAILURE);
199  }
200 
201  signaled_close = true;
202  }
203 
204  MPRINT1("Master waiting on events.\n");
205  fr_assert(num_messages <= max_messages);
206 
207  num_events = kevent(kq_master, NULL, 0, events, MAX_KEVENTS, NULL);
208  MPRINT1("Master kevent returned %d\n", num_events);
209 
210  if (num_events < 0) {
211  if (num_events == EINTR) continue;
212 
213  fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
214  fr_exit_now(EXIT_FAILURE);
215  }
216 
217  if (num_events == 0) continue;
218 
219  /*
220  * Service the events.
221  */
222  for (i = 0; i < num_events; i++) {
223  (void) fr_channel_service_kevent(channel, control_master, &events[i]);
224  }
225 
226  now = fr_time();
227 
228  MPRINT1("Master servicing control-plane aq %p\n", aq_master);
229 
230  while (true) {
231  uint32_t id;
232  size_t data_size;
233  char data[256];
234 
235  data_size = fr_control_message_pop(aq_master, &id, data, sizeof(data));
236  if (!data_size) break;
237 
239 
240  ce = fr_channel_service_message(now, &new_channel, data, data_size);
241  MPRINT1("Master got channel event %d\n", ce);
242 
243  switch (ce) {
245  MPRINT1("Master got data ready signal\n");
246  fr_assert(new_channel == channel);
247 
248  reply = fr_channel_recv_reply(channel);
249  if (!reply) {
250  MPRINT1("Master SIGNAL WITH NO DATA!\n");
251  continue;
252  }
253 
254  do {
255  num_replies++;
256  num_outstanding--;
257  MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
258  num_replies, num_outstanding, num_messages, max_messages);
259  fr_message_done(&reply->m);
260  } while ((reply = fr_channel_recv_reply(channel)) != NULL);
261  break;
262 
263  case FR_CHANNEL_CLOSE:
264  MPRINT1("Master received close signal\n");
265  fr_assert(new_channel == channel);
266  fr_assert(signaled_close == true);
267  running = false;
268  break;
269 
270  case FR_CHANNEL_NOOP:
271  MPRINT1("Master got NOOP\n");
272  break;
273 
274  default:
275  fprintf(stderr, "Master got unexpected CE %d\n", ce);
276 
277  /*
278  * Not written yet!
279  */
280  fr_assert(0 == 1);
281  break;
282  } /* switch over signal returned */
283  } /* drain the control plane */
284  } /* loop until told to exit */
285 
286  MPRINT1("Master exiting.\n");
287 
288  /*
289  * Force all messages to be garbage collected
290  */
291  MPRINT2("GC\n");
292  fr_message_set_gc(ms);
293 
294  if (debug_lvl > 1) fr_message_set_debug(ms, stdout);
295 
296  /*
297  * After the garbage collection, all messages marked "done" MUST also be marked "free".
298  */
299  rcode = fr_message_set_messages_used(ms);
300  MPRINT2("Master messages used = %d\n", rcode);
301  fr_assert(rcode == 0);
302 
303  talloc_free(ctx);
304 
305  return NULL;
306 }
307 
308 static void *channel_worker(void *arg)
309 {
310  bool running = true;
311  int rcode, num_events;
312  int worker_messages = 0;
313  fr_message_set_t *ms;
314  TALLOC_CTX *ctx;
315  fr_channel_t *channel = arg;
317  struct kevent events[MAX_KEVENTS];
318 
319  MEM(ctx = talloc_init_const("channel_worker"));
320 
322  if (!ms) {
323  fprintf(stderr, "Failed creating message set\n");
324  fr_exit_now(EXIT_FAILURE);
325  }
326 
327  MPRINT1("\tWorker started.\n");
328 
329  while (running) {
330  int i;
331  fr_time_t now;
332  fr_channel_t *new_channel;
333 
334  MPRINT1("\tWorker waiting on events.\n");
335 
336  num_events = kevent(kq_worker, NULL, 0, events, MAX_KEVENTS, NULL);
337  MPRINT1("\tWorker kevent returned %d events\n", num_events);
338 
339  if (num_events < 0) {
340  if (errno == EINTR) continue;
341 
342  fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
343  fr_exit_now(EXIT_FAILURE);
344  }
345 
346  if (num_events == 0) continue;
347 
348  for (i = 0; i < num_events; i++) {
349  (void) fr_channel_service_kevent(channel, control_worker, &events[i]);
350  }
351 
352  MPRINT1("\tWorker servicing control-plane aq %p\n", aq_worker);
353 
354  now = fr_time();
355 
356  while (true) {
357  uint32_t id;
358  size_t data_size;
359  char data[256];
360  fr_channel_data_t *cd, *reply;
361 
362  data_size = fr_control_message_pop(aq_worker, &id, data, sizeof(data));
363  if (!data_size) break;
364 
366 
367  ce = fr_channel_service_message(now, &new_channel, data, data_size);
368  MPRINT1("\tWorker got channel event %d\n", ce);
369 
370  switch (ce) {
371 
372  case FR_CHANNEL_OPEN:
373  MPRINT1("\tWorker received a new channel\n");
374  fr_assert(new_channel == channel);
375  break;
376 
377  case FR_CHANNEL_CLOSE:
378  MPRINT1("\tWorker requested to close the channel.\n");
379  fr_assert(new_channel == channel);
380  running = false;
381 
382  /*
383  * Drain the input before we ACK the exit.
384  */
385  while ((cd = fr_channel_recv_request(channel)) != NULL) {
386  worker_messages++;
387  MPRINT1("\tWorker got message %d\n", worker_messages);
388  fr_message_done(&cd->m);
389  }
390 
391  (void) fr_channel_responder_ack_close(channel);
392  break;
393 
395  MPRINT1("\tWorker got data ready signal\n");
396  fr_assert(new_channel == channel);
397 
398  cd = fr_channel_recv_request(channel);
399  if (!cd) {
400  MPRINT1("\tWorker SIGNAL WITH NO DATA!\n");
401  break;
402  }
403 
404  while (cd) {
405  int message_id;
406 
407  worker_messages++;
408 
409  fr_assert(cd->m.data != NULL);
410  memcpy(&message_id, cd->m.data, sizeof(message_id));
411  MPRINT1("\tWorker got message %d (says %d)\n", worker_messages, message_id);
412 
413  reply = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
414  fr_assert(reply != NULL);
415 
416  reply->m.when = fr_time();
417  fr_message_done(&cd->m);
418 
419  if (touch_memory) {
420  size_t j, k;
421 
422  for (j = k = 0; j < reply->m.data_size; j++) {
423  k += reply->m.data[j];
424  }
425 
426  reply->m.data[4] = k;
427  }
428 
429 
430  MPRINT1("\tWorker sending reply to messages %d\n", worker_messages);
431  rcode = fr_channel_send_reply(channel, reply, &cd);
432  if (rcode < 0) {
433  fprintf(stderr, "Failed sending reply: %s\n", fr_syserror(errno));
434  }
435  fr_assert(rcode == 0);
436  }
437  break;
438 
439  case FR_CHANNEL_NOOP:
440  MPRINT1("\tWorker got NOOP\n");
441  fr_assert(new_channel == channel);
442  break;
443 
444  default:
445  fprintf(stderr, "\tWorker got unexpected CE %d\n", ce);
446 
447  /*
448  * Not written yet!
449  */
450  fr_assert(0 == 1);
451  break;
452  } /* switch over signals */
453 
454  /*
455  * Get a new idea of "now".
456  */
457  now = fr_time();
458  } /* drain the control plane */
459  }
460 
461  MPRINT1("\tWorker exiting.\n");
462 
463  /*
464  * Force all messages to be garbage collected
465  */
466  MPRINT2("Worker GC\n");
467  fr_message_set_gc(ms);
468 
469  if (debug_lvl > 1) fr_message_set_debug(ms, stdout);
470 
471  /*
472  * After the garbage collection, all messages marked "done" MUST also be marked "free".
473  */
474  rcode = fr_message_set_messages_used(ms);
475  fr_cond_assert(rcode == 0);
476 
477  talloc_free(ctx);
478 
479  return NULL;
480 }
481 
482 
483 
484 int main(int argc, char *argv[])
485 {
486  int c;
487  fr_channel_t *channel;
488  TALLOC_CTX *autofree = talloc_autofree_context();
489  pthread_attr_t attr;
490  pthread_t master_id, worker_id;
491 
492  fr_time_start();
493 
494  while ((c = getopt(argc, argv, "c:hm:o:tx")) != -1) switch (c) {
495  case 'x':
496  debug_lvl++;
497  break;
498 
499  case 'c':
500  max_control_plane = atoi(optarg);
501  break;
502 
503  case 'm':
504  max_messages = atoi(optarg);
505  break;
506 
507  case 'o':
508  max_outstanding = atoi(optarg);
509  break;
510 
511  case 't':
512  touch_memory = true;
513  break;
514 
515  case 'h':
516  default:
517  usage();
518  }
519 
521 
522  if (!max_control_plane) {
525  }
526 
527 #if 0
528  argc -= (optind - 1);
529  argv += (optind - 1);
530 #endif
531 
532  kq_master = kqueue();
533  fr_assert(kq_master >= 0);
534 
535  kq_worker = kqueue();
536  fr_assert(kq_worker >= 0);
537 
539  fr_assert(aq_master != NULL);
540 
542  fr_assert(aq_worker != NULL);
543 
545  fr_assert(control_master != NULL);
546 
548  fr_assert(control_worker != NULL);
549 
551  if (!channel) {
552  fprintf(stderr, "channel_test: Failed to create channel\n");
553  fr_exit_now(EXIT_FAILURE);
554  }
555 
556  /*
557  * Start the two threads, with the channel.
558  */
559  (void) pthread_attr_init(&attr);
560  (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
561 
562  (void) pthread_create(&master_id, &attr, channel_master, channel);
563  (void) pthread_create(&worker_id, &attr, channel_worker, channel);
564 
565  (void) pthread_join(master_id, NULL);
566  (void) pthread_join(worker_id, NULL);
567 
568  close(kq_master);
569  close(kq_worker);
570 
571  fr_channel_debug(channel, stdout);
572 
573  fr_exit_now(EXIT_SUCCESS);
574 }
int const char * file
Definition: acutest.h:702
va_list args
Definition: acutest.h:770
int const char int line
Definition: acutest.h:702
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:481
#define NEVER_RETURNS
Should be placed before the function return type.
Definition: build.h:311
#define UNUSED
Definition: build.h:313
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition: channel.c:183
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition: channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition: channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition: channel.c:306
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition: channel.c:472
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition: channel.c:788
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition: channel.c:511
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition: channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition: channel.c:952
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
Channel information which is added to a message.
Definition: channel.h:104
static int max_control_plane
Definition: channel_test.c:50
int main(int argc, char *argv[])
Definition: channel_test.c:484
#define MAX_KEVENTS
Definition: channel_test.c:40
#define MAX_MESSAGES
Definition: channel_test.c:38
static void * channel_worker(void *arg)
Definition: channel_test.c:308
static bool touch_memory
Definition: channel_test.c:52
static fr_atomic_queue_t * aq_master
Definition: channel_test.c:47
#define MAX_CONTROL_PLANE
Definition: channel_test.c:39
request_t * request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
Definition: channel_test.c:57
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
Definition: channel_test.c:62
#define MPRINT2
Definition: channel_test.c:43
static void * channel_master(void *arg)
Definition: channel_test.c:80
static int max_outstanding
Definition: channel_test.c:51
#define MPRINT1
Definition: channel_test.c:42
static fr_control_t * control_worker
Definition: channel_test.c:48
static fr_control_t * control_master
Definition: channel_test.c:48
static int kq_master
Definition: channel_test.c:46
static int kq_worker
Definition: channel_test.c:46
static int max_messages
Definition: channel_test.c:49
static NEVER_RETURNS void usage(void)
Definition: channel_test.c:68
static fr_atomic_queue_t * aq_worker
Definition: channel_test.c:47
static int debug_lvl
Definition: channel_test.c:45
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:139
#define fr_exit_now(_x)
Exit without calling atexit() handlers, producing a log message in debug builds.
Definition: debug.h:234
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition: control.c:377
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
The control structure.
Definition: control.c:79
talloc_free(reap)
unsigned int uint32_t
Definition: merged_model.c:33
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition: message.c:1212
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
Definition: message.c:1262
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition: message.c:1238
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
static TALLOC_CTX * autofree
Definition: radclient-ng.c:107
static fr_event_list_t * events
Definition: radsniff.c:59
rlm_rcode_t rcode
Last rcode returned by a module.
Definition: request.h:233
Optional arguments for initialising requests.
Definition: request.h:254
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition: schedule.c:151
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define talloc_autofree_context
The original function is deprecated, so replace it with our version.
Definition: talloc.h:51
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
Definition: talloc.h:112
int fr_time_start(void)
Initialize the local time.
Definition: time.c:150
"server local" time.
Definition: time.h:69
close(uq->fd)
static fr_slen_t data
Definition: value.h:1265