The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
channel_test.c
Go to the documentation of this file.
1/*
2 * channel_test.c Tests for channels
3 *
4 * Version: $Id: d322ff20be747104b209c6e9b6c68872e2f19fb7 $
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 *
20 * @copyright 2016 Alan DeKok (aland@freeradius.org)
21 */
22
23RCSID("$Id: d322ff20be747104b209c6e9b6c68872e2f19fb7 $")
24
25#include <freeradius-devel/io/channel.h>
26#include <freeradius-devel/io/control.h>
27#include <freeradius-devel/util/debug.h>
28#include <freeradius-devel/util/syserror.h>
29#include <freeradius-devel/util/talloc.h>
30
31#ifdef HAVE_GETOPT_H
32# include <getopt.h>
33#endif
34
35#include <pthread.h>
36#include <sys/event.h>
37
38#define MAX_MESSAGES (2048)
39#define MAX_CONTROL_PLANE (1024)
40#define MAX_KEVENTS (10)
41
42#define MPRINT1 if (debug_lvl) printf
43#define MPRINT2 if (debug_lvl > 1) printf
44
45static int debug_lvl = 0;
46static int kq_master, kq_worker;
49static int max_messages = 10;
50static int max_control_plane = 0;
51static int max_outstanding = 1;
52static bool touch_memory = false;
53
54/**********************************************************************/
55typedef struct request_s request_t;
56
58{
59 return NULL;
60}
61
/** Stub implementation of request_verify()
 *
 * All three arguments are marked UNUSED and the body is empty, so calling
 * this has no effect.
 *
 * NOTE(review): presumably defined here so the test program links without
 * the real request code - confirm.
 */
62void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
63{
64}
65
66/**********************************************************************/
67
/** Print the command-line help text to stderr, then terminate
 *
 * Lists the -c, -m, -o, -t and -x options and exits the process via
 * fr_exit_now(EXIT_FAILURE); it does not return to the caller.
 */
68static NEVER_RETURNS void usage(void)
69{
70 fprintf(stderr, "usage: channel_test [OPTS]\n");
71 fprintf(stderr, " -c <control-plane> Size of the control plane queue.\n");
72 fprintf(stderr, " -m <messages> Send number of messages.\n");
73 fprintf(stderr, " -o <outstanding> Keep number of messages outstanding.\n");
74 fprintf(stderr, " -t Touch memory for fake packets.\n");
75 fprintf(stderr, " -x Debugging mode.\n");
76
77 fr_exit_now(EXIT_FAILURE);
78}
79
/** Requestor ("master") thread: send requests over the channel and collect the replies
 *
 * Signals the worker that the channel is open, then loops:
 *  - sends requests until max_messages have been sent, keeping at most
 *    max_outstanding of them unanswered at a time,
 *  - once everything has been sent and answered, signals the worker to close,
 *  - waits on kq_master and drains aq_master, counting replies and stopping
 *    when FR_CHANNEL_CLOSE is received.
 *
 * Any failure to signal the worker or to wait for events terminates the
 * process via fr_exit_now(EXIT_FAILURE).
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (the listing numbers skip 86, 90, 95, 238, 244, 292 and 299), including the
 * declarations/initialisation of "ms" and "ce" and one case label.
 *
 * @param[in] arg	the fr_channel_t shared with the worker thread.
 * @return NULL always.
 */
80static void *channel_master(void *arg)
81{
82 bool running, signaled_close;
83 int rcode, i, num_events;
84 int num_outstanding, num_messages;
85 int num_replies;
87 TALLOC_CTX *ctx;
88 fr_channel_t *channel = arg;
89 fr_channel_t *new_channel;
91 struct kevent events[MAX_KEVENTS];
92
93 MEM(ctx = talloc_init_const("channel_master"));
94
96 if (!ms) {
97 fprintf(stderr, "Failed creating message set\n");
98 fr_exit_now(EXIT_FAILURE);
99 }
100
101 MPRINT1("Master started.\n");
102
103 /*
104 * Signal the worker that the channel is open
105 */
106 rcode = fr_channel_signal_open(channel);
107 if (rcode < 0) {
108 fprintf(stderr, "Failed signaling open: %s\n", fr_syserror(errno));
109 fr_exit_now(EXIT_FAILURE);
110 }
111
112 /*
113 * Bootstrap the queue with messages.
114 */
115 num_replies = num_outstanding = num_messages = 0;
116
117 running = true;
118 signaled_close = false;
119
120 while (running) {
121 fr_time_t now;
122 int num_to_send;
123 fr_channel_data_t *cd, *reply;
124
125#if 0
126 /*
127 * Drain the input queues before sleeping.
128 */
129 while ((reply = fr_channel_recv_reply(channel)) != NULL) {
130 num_replies++;
131 num_outstanding--;
132 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
133 num_replies, num_outstanding, num_messages, max_messages);
134 fr_message_done(&reply->m);
135 }
136#endif
137
138 /*
139 * Ensure we have outstanding messages.
140 */
141 if (num_messages >= max_messages) {
142 MPRINT1("Master DONE sending\n");
143 goto check_close;
144 }
145
 /*
  * Top the in-flight count back up to max_outstanding, without
  * sending more than max_messages in total.
  */
146 num_to_send = max_outstanding - num_outstanding;
147 if ((num_messages + num_to_send) > max_messages) {
148 num_to_send = max_messages - num_messages;
149 }
150 MPRINT1("Master sending %d messages\n", num_to_send);
151
152 for (i = 0; i < num_to_send; i++) {
153 cd = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
154 fr_assert(cd != NULL);
155
156 num_outstanding++;
157 num_messages++;
158
159 cd->m.when = fr_time();
160
 /*
  * With -t, read every byte of the packet buffer and write one
  * back, so the memory is really accessed.
  */
161 if (touch_memory) {
162 size_t j, k;
163
164 for (j = k = 0; j < cd->m.data_size; j++) {
165 k += cd->m.data[j];
166 }
167
168 cd->m.data[4] = k;
169 }
170
 /*
  * The message number is stored at the start of the packet;
  * the worker copies it back out and prints it.
  */
171 memcpy(cd->m.data, &num_messages, sizeof(num_messages));
172
173 MPRINT1("Master sent message %d\n", num_messages);
174 rcode = fr_channel_send_request(channel, cd, &reply);
175 if (rcode < 0) {
176 fprintf(stderr, "Failed sending request: %s\n", fr_syserror(errno));
177 }
178 fr_assert(rcode == 0);
179 if (reply) {
180 num_replies++;
181 num_outstanding--;
182 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
183 num_replies, num_outstanding, num_messages, max_messages);
184 fr_message_done(&reply->m);
185 }
186 }
187
188 /*
189 * Signal close only when done.
190 */
191check_close:
192
193 if (!signaled_close && (num_messages >= max_messages) && (num_outstanding == 0)) {
194 MPRINT1("Master signaling worker to exit.\n");
195 rcode = fr_channel_signal_responder_close(channel);
196 if (rcode < 0) {
197 fprintf(stderr, "Failed signaling close: %s\n", fr_syserror(errno));
198 fr_exit_now(EXIT_FAILURE);
199 }
200
201 signaled_close = true;
202 }
203
204 MPRINT1("Master waiting on events.\n");
205 fr_assert(num_messages <= max_messages);
206
207 num_events = kevent(kq_master, NULL, 0, events, MAX_KEVENTS, NULL);
208 MPRINT1("Master kevent returned %d\n", num_events);
209
210 if (num_events < 0) {
 /*
  * kevent() reports failure with a negative return value and
  * puts the reason in errno, so an interrupted wait must be
  * detected through errno (as channel_worker does), not by
  * comparing the return value with EINTR.
  */
211 if (errno == EINTR) continue;
212
213 fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
214 fr_exit_now(EXIT_FAILURE);
215 }
216
217 if (num_events == 0) continue;
218
219 /*
220 * Service the events.
221 */
222 for (i = 0; i < num_events; i++) {
223 (void) fr_channel_service_kevent(channel, control_master, &events[i]);
224 }
225
226 now = fr_time();
227
228 MPRINT1("Master servicing control-plane aq %p\n", aq_master);
229
230 while (true) {
231 uint32_t id;
232 size_t data_size;
233 char data[256];
234
235 data_size = fr_control_message_pop(aq_master, &id, data, sizeof(data));
236 if (!data_size) break;
237
239
240 ce = fr_channel_service_message(now, &new_channel, data, data_size);
241 MPRINT1("Master got channel event %d\n", ce);
242
243 switch (ce) {
 /*
  * NOTE(review): the case label for this branch (listing line 244)
  * is absent from this listing.
  */
245 MPRINT1("Master got data ready signal\n");
246 fr_assert(new_channel == channel);
247
248 reply = fr_channel_recv_reply(channel);
249 if (!reply) {
250 MPRINT1("Master SIGNAL WITH NO DATA!\n");
251 continue;
252 }
253
254 do {
255 num_replies++;
256 num_outstanding--;
257 MPRINT1("Master got reply %d, outstanding=%d, %d/%d sent.\n",
258 num_replies, num_outstanding, num_messages, max_messages);
259 fr_message_done(&reply->m);
260 } while ((reply = fr_channel_recv_reply(channel)) != NULL);
261 break;
262
263 case FR_CHANNEL_CLOSE:
264 MPRINT1("Master received close signal\n");
265 fr_assert(new_channel == channel);
266 fr_assert(signaled_close == true);
267 running = false;
268 break;
269
270 case FR_CHANNEL_NOOP:
271 MPRINT1("Master got NOOP\n");
272 break;
273
274 default:
275 fprintf(stderr, "Master got unexpected CE %d\n", ce);
276
277 /*
278 * Not written yet!
279 */
280 fr_assert(0 == 1);
281 break;
282 } /* switch over signal returned */
283 } /* drain the control plane */
284 } /* loop until told to exit */
285
286 MPRINT1("Master exiting.\n");
287
288 /*
289 * Force all messages to be garbage collected
290 */
291 MPRINT2("GC\n");
293
294 if (debug_lvl > 1) fr_message_set_debug(ms, stdout);
295
296 /*
297 * After the garbage collection, all messages marked "done" MUST also be marked "free".
298 */
300 MPRINT2("Master messages used = %d\n", rcode);
301 fr_assert(rcode == 0);
302
303 talloc_free(ctx);
304
305 return NULL;
306}
307
/** Responder ("worker") thread: answer every request received on the channel
 *
 * Waits on kq_worker, drains aq_worker, and handles each channel event:
 * for received requests it allocates a reply with fr_message_alloc(ms, NULL, 100)
 * and sends it back; on FR_CHANNEL_CLOSE it drains any remaining requests,
 * acknowledges the close and leaves the loop.
 *
 * A failed kevent() wait (other than EINTR) terminates the process via
 * fr_exit_now(EXIT_FAILURE).
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (the listing numbers skip 313, 316, 321, 365, 394, 467 and 474), including
 * the declaration/initialisation of "ms" and "ce" and one case label.
 *
 * @param[in] arg	the fr_channel_t shared with the master thread.
 * @return NULL always.
 */
308static void *channel_worker(void *arg)
309{
310 bool running = true;
311 int rcode, num_events;
312 int worker_messages = 0;
314 TALLOC_CTX *ctx;
315 fr_channel_t *channel = arg;
317 struct kevent events[MAX_KEVENTS];
318
319 MEM(ctx = talloc_init_const("channel_worker"));
320
322 if (!ms) {
323 fprintf(stderr, "Failed creating message set\n");
324 fr_exit_now(EXIT_FAILURE);
325 }
326
327 MPRINT1("\tWorker started.\n");
328
329 while (running) {
330 int i;
331 fr_time_t now;
332 fr_channel_t *new_channel;
333
334 MPRINT1("\tWorker waiting on events.\n");
335
336 num_events = kevent(kq_worker, NULL, 0, events, MAX_KEVENTS, NULL);
337 MPRINT1("\tWorker kevent returned %d events\n", num_events);
338
339 if (num_events < 0) {
 /* An interrupted wait is not an error; just wait again. */
340 if (errno == EINTR) continue;
341
342 fprintf(stderr, "Failed waiting for kevent: %s\n", fr_syserror(errno));
343 fr_exit_now(EXIT_FAILURE);
344 }
345
346 if (num_events == 0) continue;
347
348 for (i = 0; i < num_events; i++) {
349 (void) fr_channel_service_kevent(channel, control_worker, &events[i]);
350 }
351
352 MPRINT1("\tWorker servicing control-plane aq %p\n", aq_worker);
353
354 now = fr_time();
355
356 while (true) {
357 uint32_t id;
358 size_t data_size;
359 char data[256];
360 fr_channel_data_t *cd, *reply;
361
362 data_size = fr_control_message_pop(aq_worker, &id, data, sizeof(data));
363 if (!data_size) break;
364
366
367 ce = fr_channel_service_message(now, &new_channel, data, data_size);
368 MPRINT1("\tWorker got channel event %d\n", ce);
369
370 switch (ce) {
371
372 case FR_CHANNEL_OPEN:
373 MPRINT1("\tWorker received a new channel\n");
374 fr_assert(new_channel == channel);
375 break;
376
377 case FR_CHANNEL_CLOSE:
378 MPRINT1("\tWorker requested to close the channel.\n");
379 fr_assert(new_channel == channel);
380 running = false;
381
382 /*
383 * Drain the input before we ACK the exit.
384 */
385 while ((cd = fr_channel_recv_request(channel)) != NULL) {
386 worker_messages++;
387 MPRINT1("\tWorker got message %d\n", worker_messages);
388 fr_message_done(&cd->m);
389 }
390
391 (void) fr_channel_responder_ack_close(channel);
392 break;
393
 /*
  * NOTE(review): the case label for this branch (listing line 394)
  * is absent from this listing.
  */
395 MPRINT1("\tWorker got data ready signal\n");
396 fr_assert(new_channel == channel);
397
398 cd = fr_channel_recv_request(channel);
399 if (!cd) {
400 MPRINT1("\tWorker SIGNAL WITH NO DATA!\n");
401 break;
402 }
403
 /*
  * fr_channel_send_reply() is passed &cd, so each iteration ends
  * with cd holding whatever that call stored there; the loop
  * stops when it is NULL.
  */
404 while (cd) {
405 int message_id;
406
407 worker_messages++;
408
409 fr_assert(cd->m.data != NULL);
410 memcpy(&message_id, cd->m.data, sizeof(message_id));
411 MPRINT1("\tWorker got message %d (says %d)\n", worker_messages, message_id);
412
413 reply = (fr_channel_data_t *) fr_message_alloc(ms, NULL, 100);
414 fr_assert(reply != NULL);
415
416 reply->m.when = fr_time();
417 fr_message_done(&cd->m);
418
 /*
  * With -t, read every byte of the reply buffer and write one
  * back, so the memory is really accessed.
  */
419 if (touch_memory) {
420 size_t j, k;
421
422 for (j = k = 0; j < reply->m.data_size; j++) {
423 k += reply->m.data[j];
424 }
425
426 reply->m.data[4] = k;
427 }
428
429
430 MPRINT1("\tWorker sending reply to messages %d\n", worker_messages);
431 rcode = fr_channel_send_reply(channel, reply, &cd);
432 if (rcode < 0) {
433 fprintf(stderr, "Failed sending reply: %s\n", fr_syserror(errno));
434 }
435 fr_assert(rcode == 0);
436 }
437 break;
438
439 case FR_CHANNEL_NOOP:
440 MPRINT1("\tWorker got NOOP\n");
441 fr_assert(new_channel == channel);
442 break;
443
444 default:
445 fprintf(stderr, "\tWorker got unexpected CE %d\n", ce);
446
447 /*
448 * Not written yet!
449 */
450 fr_assert(0 == 1);
451 break;
452 } /* switch over signals */
453
454 /*
455 * Get a new idea of "now".
456 */
457 now = fr_time();
458 } /* drain the control plane */
459 }
460
461 MPRINT1("\tWorker exiting.\n");
462
463 /*
464 * Force all messages to be garbage collected
465 */
466 MPRINT2("Worker GC\n");
468
469 if (debug_lvl > 1) fr_message_set_debug(ms, stdout);
470
471 /*
472 * After the garbage collection, all messages marked "done" MUST also be marked "free".
473 */
475 fr_cond_assert(rcode == 0);
476
477 talloc_free(ctx);
478
479 return NULL;
480}
481
482
483
/** Entry point: parse options, then run the master and worker threads over one channel
 *
 * Options set debug_lvl (-x, repeatable), max_control_plane (-c),
 * max_messages (-m), max_outstanding (-o) and touch_memory (-t); -h or an
 * unknown option calls usage(), which exits.  Two kqueues are created, the
 * two threads are started with the same channel and joined, and the channel
 * debug state is printed before exiting with EXIT_SUCCESS.
 *
 * NOTE(review): many lines of this function are absent from this listing
 * (the listing numbers skip 492, 520, 523-524, 538, 541, 544, 547, 550 and
 * 568-569), including the body of the "if (!max_control_plane)" block and the
 * statements that create aq_*, control_* and channel.
 *
 * @param[in] argc	number of command-line arguments.
 * @param[in] argv	command-line arguments.
 * @return does not return normally; the process ends in fr_exit_now().
 */
484int main(int argc, char *argv[])
485{
486 int c;
487 fr_channel_t *channel;
488 TALLOC_CTX *autofree = talloc_autofree_context();
489 pthread_attr_t attr;
490 pthread_t master_id, worker_id;
491
493
494 while ((c = getopt(argc, argv, "c:hm:o:tx")) != -1) switch (c) {
495 case 'x':
496 debug_lvl++;
497 break;
498
499 case 'c':
500 max_control_plane = atoi(optarg);
501 break;
502
503 case 'm':
504 max_messages = atoi(optarg);
505 break;
506
507 case 'o':
508 max_outstanding = atoi(optarg);
509 break;
510
511 case 't':
512 touch_memory = true;
513 break;
514
515 case 'h':
516 default:
517 usage();
518 }
519
521
522 if (!max_control_plane) {
525 }
526
527#if 0
528 argc -= (optind - 1);
529 argv += (optind - 1);
530#endif
531
 /* One kqueue per thread: the master waits on kq_master, the worker on kq_worker. */
532 kq_master = kqueue();
533 fr_assert(kq_master >= 0);
534
535 kq_worker = kqueue();
536 fr_assert(kq_worker >= 0);
537
539 fr_assert(aq_master != NULL);
540
542 fr_assert(aq_worker != NULL);
543
545 fr_assert(control_master != NULL);
546
548 fr_assert(control_worker != NULL);
549
551 if (!channel) {
552 fprintf(stderr, "channel_test: Failed to create channel\n");
553 fr_exit_now(EXIT_FAILURE);
554 }
555
556 /*
557 * Start the two threads, with the channel.
558 */
559 (void) pthread_attr_init(&attr);
560 (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
561
562 (void) pthread_create(&master_id, &attr, channel_master, channel);
563 (void) pthread_create(&worker_id, &attr, channel_worker, channel);
564
 /* Both threads are joinable; wait for each to finish before reporting. */
565 (void) pthread_join(master_id, NULL);
566 (void) pthread_join(worker_id, NULL);
567
570
571 fr_channel_debug(channel, stdout);
572
573 fr_exit_now(EXIT_SUCCESS);
574}
int const char * file
Definition acutest.h:702
va_list args
Definition acutest.h:770
int const char int line
Definition acutest.h:702
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:483
#define NEVER_RETURNS
Should be placed before the function return type.
Definition build.h:313
#define UNUSED
Definition build.h:315
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:306
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition channel.c:788
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
Channel information which is added to a message.
Definition channel.h:104
static int max_control_plane
static void * channel_master(void *arg)
int main(int argc, char *argv[])
#define MAX_KEVENTS
#define MAX_MESSAGES
static void * channel_worker(void *arg)
static bool touch_memory
static fr_atomic_queue_t * aq_master
#define MAX_CONTROL_PLANE
request_t * request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t const *request)
#define MPRINT2
static int max_outstanding
#define MPRINT1
static fr_control_t * control_worker
static fr_control_t * control_master
static int kq_master
static int kq_worker
static int max_messages
static NEVER_RETURNS void usage(void)
static fr_atomic_queue_t * aq_worker
static int debug_lvl
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define MEM(x)
Definition debug.h:36
#define fr_exit_now(_x)
Exit without calling atexit() handlers, producing a log message in debug builds.
Definition debug.h:234
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition control.c:377
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
talloc_free(reap)
unsigned int uint32_t
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition message.c:1212
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
Definition message.c:1262
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1238
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
#define fr_assert(_expr)
Definition rad_assert.h:38
static TALLOC_CTX * autofree
static fr_event_list_t * events
Definition radsniff.c:59
rlm_rcode_t rcode
Last rcode returned by a module.
Definition request.h:233
Optional arguments for initialising requests.
Definition request.h:254
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:151
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
Definition talloc.h:112
#define talloc_autofree_context
The original function is deprecated, so replace it with our version.
Definition talloc.h:51
int fr_time_start(void)
Initialize the local time.
Definition time.c:150
"server local" time.
Definition time.h:69
close(uq->fd)
static fr_slen_t data
Definition value.h:1265