The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
channel_test.c
Go to the documentation of this file.
1/*
2 * channel_test.c Tests for channels
3 *
4 * Version: $Id: 1eeac385cefd9396515ad0b28a03804fd6597090 $
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 *
20 * @copyright 2016 Alan DeKok (aland@freeradius.org)
21 */
22
23RCSID("$Id: 1eeac385cefd9396515ad0b28a03804fd6597090 $")
24
25#include <freeradius-devel/io/channel.h>
26#include <freeradius-devel/util/debug.h>
27#include <freeradius-devel/util/syserror.h>
28#include <freeradius-devel/util/talloc.h>
29
30#ifdef HAVE_GETOPT_H
31# include <getopt.h>
32#endif
33
34#include <pthread.h>
35
/* Number of messages given to fr_message_set_create(); also scales the ring buffer size passed there. */
36#define MAX_MESSAGES (2048)
/* NOTE(review): not referenced in the visible lines - presumably the default control-plane queue size; confirm. */
37#define MAX_CONTROL_PLANE (1024)
/* Capacity of the kevent array handed to each kevent() call. */
38#define MAX_KEVENTS (10)
39
/*
 * Debug output helpers: call printf only when debug_lvl is non-zero (MPRINT1)
 * or greater than one (MPRINT2).
 *
 * NOTE(review): these expand to a bare `if`, so a use placed directly before
 * an `else` would capture that `else`.
 */
40#define MPRINT1 if (debug_lvl) printf
41#define MPRINT2 if (debug_lvl > 1) printf
42
/* Verbosity level; incremented once for every -x on the command line. */
43static int debug_lvl = 0;
/* kqueue descriptors that the master and worker threads wait on. */
44static int kq_master, kq_worker;
/* NOTE(review): listing lines 45-46 are absent here; aq_master, aq_worker, control_master and control_worker are used later without a visible declaration. */
/* Total number of requests the master sends (-m). */
47static int max_messages = 10;
/* Control-plane queue size (-c); zero means "not set on the command line". */
48static int max_control_plane = 0;
/* How many requests the master keeps in flight at once (-o). */
49static int max_outstanding = 1;
/* When true (-t), every byte of each fake packet is read and one byte is written. */
50static bool touch_memory = false;
51
52/**********************************************************************/
53typedef struct request_s request_t;
54
56{
57 return NULL;
58}
59
/*
 * Stub: accepts a file, line and request and does nothing with them.
 *
 * NOTE(review): presumably present only so this test links without the
 * real request code - confirm.
 */
60void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
61{
62}
63
64/**********************************************************************/
65
66static NEVER_RETURNS void usage(void)
67{
68 fprintf(stderr, "usage: channel_test [OPTS]\n");
69 fprintf(stderr, " -c <control-plane> Size of the control plane queue.\n");
70 fprintf(stderr, " -m <messages> Send number of messages.\n");
71 fprintf(stderr, " -o <outstanding> Keep number of messages outstanding.\n");
72 fprintf(stderr, " -t Touch memory for fake packets.\n");
73 fprintf(stderr, " -x Debugging mode.\n");
74
75 fr_exit_now(EXIT_FAILURE);
76}
77
/*
 * Master (requestor) side of the channel test; started as a thread by main().
 *
 * Signals the worker that the channel is open, then loops: sends requests
 * until max_messages have been sent (keeping at most max_outstanding in
 * flight), blocks in kevent() on kq_master, and drains the control-plane
 * queue, counting replies.  Once everything has been sent and answered it
 * signals the responder to close, and leaves the loop when a
 * FR_CHANNEL_CLOSE event comes back.
 *
 * arg is the fr_channel_t to use.  Always returns NULL.
 *
 * NOTE(review): this listing skips numbered lines 84, 88, 236, 242, 290 and
 * 297 inside this function; `ms` and `ce` are used without a visible
 * declaration.  The gaps are marked below.
 */
78static void *channel_master(void *arg)
79{
80 bool running, signaled_close;
81 int rcode, i, num_events;
82 int num_outstanding, num_messages;
83 int num_replies;
	/* NOTE(review): listing line 84 is absent - presumably the declaration of `ms`; confirm. */
85 TALLOC_CTX *ctx;
86 fr_channel_t *channel = arg;
87 fr_channel_t *new_channel;
	/* NOTE(review): listing line 88 is absent - presumably the declaration of `ce`; confirm. */
89 struct kevent events[MAX_KEVENTS];
90
91 MEM(ctx = talloc_init_const("channel_master"));
92
	/* Message set that the request messages below are allocated from. */
93 ms = fr_message_set_create(ctx, MAX_MESSAGES, sizeof(fr_channel_data_t), MAX_MESSAGES * 1024, false);
94 if (!ms) {
95 fprintf(stderr, "Failed creating message set\n");
96 fr_exit_now(EXIT_FAILURE);
97 }
98
99 MPRINT1("Master started.\n");
100
101 /*
102 * Signal the worker that the channel is open
103 */
104 rcode = fr_channel_signal_open(channel);
105 if (rcode < 0) {
106 fprintf(stderr, "Failed signaling open: %s\n", fr_syserror(errno));
107 fr_exit_now(EXIT_FAILURE);
108 }
109
110 /*
111 * Bootstrap the queue with messages.
112 */
113 num_replies = num_outstanding = num_messages = 0;
114
115 running = true;
116 signaled_close = false;
117
118 while (running) {
119 fr_time_t now;
120 int num_to_send;
121 fr_channel_data_t *cd, *reply;
122
123#if 0
124 /*
125 * Drain the input queues before sleeping.
126 */
127 while ((reply = fr_channel_recv_reply(channel)) != NULL) {
128 num_replies++;
129 num_outstanding--;
130 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
131 num_replies, num_outstanding, num_messages, max_messages);
132 fr_message_done(&reply->m);
133 }
134#endif
135
136 /*
137 * Ensure we have outstanding messages.
138 */
139 if (num_messages >= max_messages) {
140 MPRINT1("Master DONE sending\n");
141 goto check_close;
142 }
143
	/* Top up to max_outstanding in flight, without exceeding max_messages in total. */
144 num_to_send = max_outstanding - num_outstanding;
145 if ((num_messages + num_to_send) > max_messages) {
146 num_to_send = max_messages - num_messages;
147 }
148 MPRINT1("Master sending %d messages\n", num_to_send);
149
150 for (i = 0; i < num_to_send; i++) {
151 cd = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
152 fr_assert(cd != NULL);
153
154 num_outstanding++;
155 num_messages++;
156
157 cd->m.when = fr_time();
158
	/* Read every payload byte and store the (truncated) sum, so the memory really is accessed. */
159 if (touch_memory) {
160 size_t j, k;
161
162 for (j = k = 0; j < cd->m.data_size; j++) {
163 k += cd->m.data[j];
164 }
165
166 cd->m.data[4] = k;
167 }
168
	/* Stamp the payload with the sequence number; the worker reads it back with memcpy(). */
169 memcpy(cd->m.data, &num_messages, sizeof(num_messages));
170
171 MPRINT1("Master sent message %d\n", num_messages);
	/*
	 * NOTE(review): the declaration index at the end of this listing shows
	 * fr_channel_send_request() with two parameters, while this call passes
	 * three - confirm against channel.h.
	 */
172 rcode = fr_channel_send_request(channel, cd, &reply);
173 if (rcode < 0) {
174 fprintf(stderr, "Failed sending request: %s\n", fr_syserror(errno));
175 }
176 fr_assert(rcode == 0);
177 if (reply) {
178 num_replies++;
179 num_outstanding--;
180 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
181 num_replies, num_outstanding, num_messages, max_messages);
182 fr_message_done(&reply->m);
183 }
184 }
185
186 /*
187 * Signal close only when done.
188 */
189check_close:
190
191 if (!signaled_close && (num_messages >= max_messages) && (num_outstanding == 0)) {
192 MPRINT1("Master signaling worker to exit.\n");
193 rcode = fr_channel_signal_responder_close(channel);
194 if (rcode < 0) {
195 fprintf(stderr, "Failed signaling close: %s\n", fr_syserror(errno));
196 fr_exit_now(EXIT_FAILURE);
197 }
198
199 signaled_close = true;
200 }
201
202 MPRINT1("Master waiting on events.\n");
203 fr_assert(num_messages <= max_messages);
204
	/* NULL timeout: block until at least one event is available (or a signal interrupts). */
205 num_events = kevent(kq_master, NULL, 0, events, MAX_KEVENTS, NULL);
206 MPRINT1("Master kevent returned %d\n", num_events);
207
208 if (num_events < 0) {
209 if (errno == EINTR) continue;
210
211 fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
212 fr_exit_now(EXIT_FAILURE);
213 }
214
215 if (num_events == 0) continue;
216
217 /*
218 * Service the events.
219 */
220 for (i = 0; i < num_events; i++) {
221 (void) fr_channel_service_kevent(channel, control_master, &events[i]);
222 }
223
224 now = fr_time();
225
226 MPRINT1("Master servicing control-plane aq %p\n", aq_master);
227
	/* Pop control-plane messages until the pop returns zero. */
228 while (true) {
229 uint32_t id;
230 size_t data_size;
231 char data[256];
232
233 data_size = fr_control_message_pop(aq_master, &id, data, sizeof(data));
234 if (!data_size) break;
235
	/* NOTE(review): listing line 236 is absent here; `id` is not otherwise used in the visible lines. */
237
238 ce = fr_channel_service_message(now, &new_channel, data, data_size);
239 MPRINT1("Master got channel event %d\n", ce);
240
241 switch (ce) {
	/*
	 * NOTE(review): listing line 242 is absent - the `case` label for the
	 * statements below is not visible.  Per the debug message they handle a
	 * "data ready" event; presumably FR_CHANNEL_DATA_READY_REQUESTOR - confirm.
	 */
243 MPRINT1("Master got data ready signal\n");
244 fr_assert(new_channel == channel);
245
246 reply = fr_channel_recv_reply(channel);
247 if (!reply) {
248 MPRINT1("Master SIGNAL WITH NO DATA!\n");
	/* `continue` here resumes the enclosing control-plane drain loop. */
249 continue;
250 }
251
	/* Account for, and release, every reply that is currently queued. */
252 do {
253 num_replies++;
254 num_outstanding--;
255 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
256 num_replies, num_outstanding, num_messages, max_messages);
257 fr_message_done(&reply->m);
258 } while ((reply = fr_channel_recv_reply(channel)) != NULL);
259 break;
260
261 case FR_CHANNEL_CLOSE:
262 MPRINT1("Master received close signal\n");
263 fr_assert(new_channel == channel);
264 fr_assert(signaled_close == true);
265 running = false;
266 break;
267
268 case FR_CHANNEL_NOOP:
269 MPRINT1("Master got NOOP\n");
270 break;
271
272 default:
273 fprintf(stderr, "Master got unexpected CE %d\n", ce);
274
275 /*
276 * Not written yet!
277 */
278 fr_assert(0 == 1);
279 break;
280 } /* switch over signal returned */
281 } /* drain the control plane */
282 } /* loop until told to exit */
283
284 MPRINT1("Master exiting.\n");
285
286 /*
287 * Force all messages to be garbage collected
288 */
289 MPRINT2("GC\n");
	/* NOTE(review): listing line 290 is absent - the garbage collection call itself is not visible here. */
291
292 if (debug_lvl > 1) fr_message_set_debug(stdout, ms);
293
294 /*
295 * After the garbage collection, all messages marked "done" MUST also be marked "free".
296 */
	/* NOTE(review): listing line 297 is absent - the assignment giving `rcode` the value checked below is not visible here. */
298 MPRINT2("Master messages used = %d\n", rcode);
299 fr_assert(rcode == 0);
300
301 talloc_free(ctx);
302
303 return NULL;
304}
305
/*
 * Worker (responder) side of the channel test; started as a thread by main().
 *
 * Loops blocking in kevent() on kq_worker, then drains the control-plane
 * queue.  For each request received it allocates a reply, marks the request
 * done and sends the reply back.  On FR_CHANNEL_CLOSE it drains any remaining
 * requests, acknowledges the close and leaves the loop.
 *
 * arg is the fr_channel_t to use.  Always returns NULL.
 *
 * NOTE(review): this listing skips numbered lines 311, 314, 363, 392, 465 and
 * 472 inside this function; `ms` and `ce` are used without a visible
 * declaration.  The gaps are marked below.
 */
306static void *channel_worker(void *arg)
307{
308 bool running = true;
309 int rcode, num_events;
310 int worker_messages = 0;
	/* NOTE(review): listing line 311 is absent - presumably the declaration of `ms`; confirm. */
312 TALLOC_CTX *ctx;
313 fr_channel_t *channel = arg;
	/* NOTE(review): listing line 314 is absent - presumably the declaration of `ce`; confirm. */
315 struct kevent events[MAX_KEVENTS];
316
317 MEM(ctx = talloc_init_const("channel_worker"));
318
	/* Message set that the reply messages below are allocated from. */
319 ms = fr_message_set_create(ctx, MAX_MESSAGES, sizeof(fr_channel_data_t), MAX_MESSAGES * 1024, false);
320 if (!ms) {
321 fprintf(stderr, "Failed creating message set\n");
322 fr_exit_now(EXIT_FAILURE);
323 }
324
325 MPRINT1("\tWorker started.\n");
326
327 while (running) {
328 int i;
329 fr_time_t now;
330 fr_channel_t *new_channel;
331
332 MPRINT1("\tWorker waiting on events.\n");
333
	/* NULL timeout: block until at least one event is available (or a signal interrupts). */
334 num_events = kevent(kq_worker, NULL, 0, events, MAX_KEVENTS, NULL);
335 MPRINT1("\tWorker kevent returned %d events\n", num_events);
336
337 if (num_events < 0) {
338 if (errno == EINTR) continue;
339
340 fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
341 fr_exit_now(EXIT_FAILURE);
342 }
343
344 if (num_events == 0) continue;
345
346 for (i = 0; i < num_events; i++) {
347 (void) fr_channel_service_kevent(channel, control_worker, &events[i]);
348 }
349
350 MPRINT1("\tWorker servicing control-plane aq %p\n", aq_worker);
351
352 now = fr_time();
353
	/* Pop control-plane messages until the pop returns zero. */
354 while (true) {
355 uint32_t id;
356 size_t data_size;
357 char data[256];
358 fr_channel_data_t *cd, *reply;
359
360 data_size = fr_control_message_pop(aq_worker, &id, data, sizeof(data));
361 if (!data_size) break;
362
	/* NOTE(review): listing line 363 is absent here; `id` is not otherwise used in the visible lines. */
364
365 ce = fr_channel_service_message(now, &new_channel, data, data_size);
366 MPRINT1("\tWorker got channel event %d\n", ce);
367
368 switch (ce) {
369
370 case FR_CHANNEL_OPEN:
371 MPRINT1("\tWorker received a new channel\n");
372 fr_assert(new_channel == channel);
373 break;
374
375 case FR_CHANNEL_CLOSE:
376 MPRINT1("\tWorker requested to close the channel.\n");
377 fr_assert(new_channel == channel);
378 running = false;
379
380 /*
381 * Drain the input before we ACK the exit.
382 */
383 while ((cd = fr_channel_recv_request(channel)) != NULL) {
384 worker_messages++;
385 MPRINT1("\tWorker got message %d\n", worker_messages);
386 fr_message_done(&cd->m);
387 }
388
389 (void) fr_channel_responder_ack_close(channel);
390 break;
391
	/*
	 * NOTE(review): listing line 392 is absent - the `case` label for the
	 * statements below is not visible.  Per the debug message they handle a
	 * "data ready" event; presumably FR_CHANNEL_DATA_READY_RESPONDER - confirm.
	 */
393 MPRINT1("\tWorker got data ready signal\n");
394 fr_assert(new_channel == channel);
395
396 cd = fr_channel_recv_request(channel);
397 if (!cd) {
398 MPRINT1("\tWorker SIGNAL WITH NO DATA!\n");
399 break;
400 }
401
	/* One reply per request; `cd` is refreshed through the third argument of the send call below. */
402 while (cd) {
403 int message_id;
404
405 worker_messages++;
406
407 fr_assert(cd->m.data != NULL);
	/* Recover the sequence number the master copied into the start of the payload. */
408 memcpy(&message_id, cd->m.data, sizeof(message_id));
409 MPRINT1("\tWorker got message %d (says %d)\n", worker_messages, message_id);
410
411 reply = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
412 fr_assert(reply != NULL);
413
414 reply->m.when = fr_time();
415 fr_message_done(&cd->m);
416
	/* Read every reply byte and store the (truncated) sum, so the memory really is accessed. */
417 if (touch_memory) {
418 size_t j, k;
419
420 for (j = k = 0; j < reply->m.data_size; j++) {
421 k += reply->m.data[j];
422 }
423
424 reply->m.data[4] = k;
425 }
426
427
428 MPRINT1("\tWorker sending reply to messages %d\n", worker_messages);
	/*
	 * NOTE(review): the declaration index at the end of this listing shows
	 * fr_channel_send_reply() with two parameters, while this call passes
	 * three - confirm against channel.h.
	 */
429 rcode = fr_channel_send_reply(channel, reply, &cd);
430 if (rcode < 0) {
431 fprintf(stderr, "Failed sending reply: %s\n", fr_syserror(errno));
432 }
433 fr_assert(rcode == 0);
434 }
435 break;
436
437 case FR_CHANNEL_NOOP:
438 MPRINT1("\tWorker got NOOP\n");
439 fr_assert(new_channel == channel);
440 break;
441
442 default:
443 fprintf(stderr, "\tWorker got unexpected CE %d\n", ce);
444
445 /*
446 * Not written yet!
447 */
448 fr_assert(0 == 1);
449 break;
450 } /* switch over signals */
451
452 /*
453 * Get a new idea of "now".
454 */
455 now = fr_time();
456 } /* drain the control plane */
457 }
458
459 MPRINT1("\tWorker exiting.\n");
460
461 /*
462 * Force all messages to be garbage collected
463 */
464 MPRINT2("Worker GC\n");
	/* NOTE(review): listing line 465 is absent - the garbage collection call itself is not visible here. */
466
467 if (debug_lvl > 1) fr_message_set_debug(stdout, ms);
468
469 /*
470 * After the garbage collection, all messages marked "done" MUST also be marked "free".
471 */
	/* NOTE(review): listing line 472 is absent - the assignment giving `rcode` the value checked below is not visible here. */
473 fr_cond_assert(rcode == 0);
474
475 talloc_free(ctx);
476
477 return NULL;
478}
479
480
481
/*
 * Test driver: parses the command-line options, creates the two kqueues,
 * starts the master and worker threads on a shared channel, waits for both
 * to finish, prints channel debug output and exits.
 *
 * Options: -c control-plane size, -m number of messages, -o number
 * outstanding, -t touch memory, -x more debug output, -h usage.
 *
 * NOTE(review): this listing skips numbered lines 490, 518, 521-522, 536,
 * 539, 542, 545 and 548 inside this function; the statements that assign
 * aq_master, aq_worker, control_master, control_worker and `channel` are
 * not visible.  The gaps are marked below.
 */
482int main(int argc, char *argv[])
483{
484 int c;
485 fr_channel_t *channel;
486 TALLOC_CTX *autofree = talloc_autofree_context();
487 pthread_attr_t attr;
488 pthread_t master_id, worker_id;
489
	/* NOTE(review): listing line 490 is absent here. */
491
	/*
	 * NOTE(review): atoi() gives no error indication, and the values are
	 * not range-checked in the visible lines.  With -o set to 0 or less,
	 * num_to_send in channel_master() is never positive, so nothing is
	 * sent and the master would appear to wait in kevent() forever -
	 * confirm whether validation is wanted.
	 */
492 while ((c = getopt(argc, argv, "c:hm:o:tx")) != -1) switch (c) {
493 case 'x':
494 debug_lvl++;
495 break;
496
497 case 'c':
498 max_control_plane = atoi(optarg);
499 break;
500
501 case 'm':
502 max_messages = atoi(optarg);
503 break;
504
505 case 'o':
506 max_outstanding = atoi(optarg);
507 break;
508
509 case 't':
510 touch_memory = true;
511 break;
512
513 case 'h':
514 default:
515 usage();
516 }
517
	/* NOTE(review): listing line 518 is absent here. */
519
520 if (!max_control_plane) {
	/* NOTE(review): listing lines 521-522 are absent - the body of this `if` is not visible; presumably it applies a default such as MAX_CONTROL_PLANE - confirm. */
523 }
524
525#if 0
526 argc -= (optind - 1);
527 argv += (optind - 1);
528#endif
529
	/* One kqueue per thread; each thread blocks on its own descriptor. */
530 kq_master = kqueue();
531 fr_assert(kq_master >= 0);
532
533 kq_worker = kqueue();
534 fr_assert(kq_worker >= 0);
535
	/* NOTE(review): listing line 536 is absent - the assignment to aq_master is not visible. */
537 fr_assert(aq_master != NULL);
538
	/* NOTE(review): listing line 539 is absent - the assignment to aq_worker is not visible. */
540 fr_assert(aq_worker != NULL);
541
	/* NOTE(review): listing line 542 is absent - the assignment to control_master is not visible. */
543 fr_assert(control_master != NULL);
544
	/* NOTE(review): listing line 545 is absent - the assignment to control_worker is not visible. */
546 fr_assert(control_worker != NULL);
547
	/* NOTE(review): listing line 548 is absent - the assignment to `channel` is not visible. */
549 if (!channel) {
550 fprintf(stderr, "channel_test: Failed to create channel\n");
551 fr_exit_now(EXIT_FAILURE);
552 }
553
554 /*
555 * Start the two threads, with the channel.
556 */
557 (void) pthread_attr_init(&attr);
558 (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
559
	/*
	 * NOTE(review): the pthread_create() results are discarded; if a
	 * thread failed to start, the matching pthread_join() below would be
	 * given an unset thread id - confirm this is acceptable for a test.
	 */
560 (void) pthread_create(&master_id, &attr, channel_master, channel);
561 (void) pthread_create(&worker_id, &attr, channel_worker, channel);
562
	/* Wait for both threads to return before tearing anything down. */
563 (void) pthread_join(master_id, NULL);
564 (void) pthread_join(worker_id, NULL);
565
566 close(kq_master);
567 close(kq_worker);
568
569 fr_channel_debug(channel, stdout);
570
571 fr_exit_now(EXIT_SUCCESS);
572}
int const char * file
Definition acutest.h:702
va_list args
Definition acutest.h:770
int const char int line
Definition acutest.h:702
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
static TALLOC_CTX * autofree
Definition fuzzer.c:44
#define RCSID(id)
Definition build.h:506
#define NEVER_RETURNS
Should be placed before the function return type.
Definition build.h:334
#define UNUSED
Definition build.h:336
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:406
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:822
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:304
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:181
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:683
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:470
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition channel.c:786
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:509
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:852
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:948
A full channel, which consists of two ends.
Definition channel.c:142
fr_message_t m
the message header
Definition channel.h:107
fr_channel_event_t
Definition channel.h:69
@ FR_CHANNEL_NOOP
Definition channel.h:76
@ FR_CHANNEL_CLOSE
Definition channel.h:74
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:72
@ FR_CHANNEL_OPEN
Definition channel.h:73
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:71
#define FR_CONTROL_ID_CHANNEL
Definition channel.h:67
Channel information which is added to a message.
Definition channel.h:106
static int max_control_plane
static void * channel_master(void *arg)
int main(int argc, char *argv[])
#define MAX_KEVENTS
#define MAX_MESSAGES
static void * channel_worker(void *arg)
static bool touch_memory
static fr_atomic_queue_t * aq_master
#define MAX_CONTROL_PLANE
request_t * request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
#define MPRINT2
static int max_outstanding
#define MPRINT1
static fr_control_t * control_worker
static fr_control_t * control_master
static int kq_master
static int kq_worker
static int max_messages
static NEVER_RETURNS void usage(void)
static fr_atomic_queue_t * aq_worker
static int debug_lvl
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define MEM(x)
Definition debug.h:46
#define fr_exit_now(_x)
Exit without calling atexit() handlers, producing a log message in debug builds.
Definition debug.h:236
talloc_free(hp)
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition control.c:403
The control structure.
Definition control.c:76
#define fr_time()
Definition event.c:60
unsigned int uint32_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition message.c:1224
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1250
void fr_message_set_debug(FILE *fp, fr_message_set_t *ms)
Print debug information about the message set.
Definition message.c:1274
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
#define fr_assert(_expr)
Definition rad_assert.h:37
static fr_event_list_t * events
Definition radsniff.c:58
rlm_rcode_t rcode
Last rcode returned by a module.
Definition request.h:260
Optional arguments for initialising requests.
Definition request.h:287
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:104
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
Definition talloc.h:120
#define talloc_autofree_context
The original function is deprecated, so replace it with our version.
Definition talloc.h:48
int fr_time_start(void)
Initialize the local time.
Definition time.c:150
"server local" time.
Definition time.h:69
static fr_slen_t data
Definition value.h:1340