The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
channel.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: b7df5c377c5e3778bcc5c2b7a7185d59bf6bed06 $
19 *
20 * @brief Two-way thread-safe channels.
21 * @file io/channel.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: b7df5c377c5e3778bcc5c2b7a7185d59bf6bed06 $")
26
27#include <freeradius-devel/io/channel.h>
28#include <freeradius-devel/io/control.h>
29#include <freeradius-devel/util/log.h>
30#include <freeradius-devel/util/debug.h>
31
32#ifdef HAVE_STDATOMIC_H
33# include <stdatomic.h>
34#else
35# include <freeradius-devel/util/stdatomic.h>
36#endif
37
38/*
39 * Debugging, mainly for channel_test
40 */
41#ifdef DEBUG_CHANNEL
42#define MPRINT(...) fprintf(stdout, __VA_ARGS__)
43#else
44#define MPRINT(...)
45#endif
46
47/*
48 * We disable this until we fix all of the signaling issues...
49 */
50#define ENABLE_SKIPS (0)
51
56
57#ifdef DEBUG_CHANNEL
58static fr_table_num_sorted_t const channel_direction[] = {
59 { L("to responder"), TO_RESPONDER },
60 { L("to requestor"), TO_REQUESTOR },
61};
62size_t channel_direction_len = NUM_ELEMENTS(channel_direction);
63#endif
64
65#if 0
66#define SIGNAL_INTERVAL (1000000) //!< The minimum interval between responder signals.
67#endif
68
69/** Size of the atomic queues
70 *
71 * The queue reader MUST service the queue occasionally,
72 * otherwise the writer will not be able to write. If it's too
73 * low, the writer will fail. If it's too high, it will
74 * unnecessarily use memory. So we're better off putting it on
75 * the high side.
76 *
77 * The reader SHOULD service the queues at inter-packet latency.
78 * i.e. at 1M pps, the queue will get serviced every microsecond.
79 */
80#define ATOMIC_QUEUE_SIZE (1024)
81
96
97typedef struct {
98 fr_channel_signal_t signal; //!< the signal to send
99 uint64_t ack; //!< or the endpoint..
100 fr_channel_t *ch; //!< the channel
102
103/** One end of a channel
104 *
105 * Consists of a kqueue descriptor, and an atomic queue.
106 * The atomic queue is there to get bulk data through, because it's more efficient
107 * than pushing 1M+ events per second through a kqueue.
108 */
109typedef struct {
110 fr_channel_direction_t direction; //!< Use for debug messages.
111
112 fr_control_t *control; //!< The control plane, consisting of an atomic queue and kqueue.
113
114 fr_ring_buffer_t *rb; //!< Ring buffer for control-plane messages.
115
116 void *uctx; //!< Worker context.
117
118 fr_channel_recv_callback_t recv; //!< callback for receiving messages
119 void *recv_uctx; //!< context for receiving messages
120
121 bool must_signal; //!< we need to signal the other end
122
123
124 uint64_t sequence; //!< Sequence number for this channel.
125 uint64_t ack; //!< Sequence number of the other end.
126 uint64_t their_view_of_my_sequence; //!< Should be clear.
127
128 uint64_t sequence_at_last_signal; //!< When we last signaled.
129
130 fr_atomic_queue_t *aq; //!< The queue of messages - visible only to this channel.
131
132 atomic_bool active; //!< Whether the channel is active.
133
134 fr_channel_stats_t stats; //!< channel statistics
136
138
139/** A full channel, which consists of two ends
140 *
141 * A channel consists of an I/O identifier that can be placed in a kqueue
142 * and an atomic queue in each direction to allow for bidirectional communication.
143 */
145 fr_time_delta_t cpu_time; //!< Total time used by the responder for this channel.
146 fr_time_delta_t processing_time; //!< Time spent by the responder processing requests.
147
148 bool same_thread; //!< are both ends in the same thread?
149
150 fr_channel_end_t end[2]; //!< Two ends of the channel.
151};
152
154 { L("error"), FR_CHANNEL_ERROR },
155 { L("data-to-responder"), FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER },
156 { L("data-to-requestor"), FR_CHANNEL_DATA_READY_REQUESTOR },
157 { L("open"), FR_CHANNEL_OPEN },
158 { L("close"), FR_CHANNEL_CLOSE },
159 { L("data-done-responder"), FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER },
160 { L("responder-sleeping"), FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING },
161};
163
165 { L("high"), PRIORITY_HIGH },
166 { L("low"), PRIORITY_LOW },
167 { L("normal"), PRIORITY_NORMAL },
168 { L("now"), PRIORITY_NOW }
169};
171
172
173/** Create a new channel
174 *
175 * @param[in] ctx The talloc_ctx to allocate channel data in.
176 * @param[in] requestor control plane.
177 * @param[in] responder control plane.
178 * @param[in] same whether or not the channel is for the same thread
179 * @return
180 * - NULL on error
181 * - channel on success
182 */
183fr_channel_t *fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
184{
185 fr_time_t now;
186 fr_channel_t *ch;
187
 /* Zero-initialised channel, allocated in the caller-supplied talloc context. */
188 ch = talloc_zero(ctx, fr_channel_t);
189 if (!ch) {
 /*
  * Shared out-of-memory exit: records the error string and returns NULL.
  * It does not free ch, so paths that jump here free ch themselves first.
  */
190 nomem:
191 fr_strerror_const("Failed allocating memory");
192 return NULL;
193 }
194
195 ch->same_thread = same;
196
199
 /* Without the to-responder atomic queue the channel is unusable: free it and fail. */
201 if (!ch->end[TO_RESPONDER].aq) {
202 talloc_free(ch);
203 goto nomem;
204 }
205
 /* Same check for the to-requestor atomic queue. */
207 if (!ch->end[TO_REQUESTOR].aq) {
208 talloc_free(ch);
209 goto nomem;
210 }
211
 /* Each end sends its control messages via the control plane of the side it points at. */
212 ch->end[TO_RESPONDER].control = responder;
213 ch->end[TO_REQUESTOR].control = requestor;
214
215 /*
216 * Create the ring buffer for the requestor to send
217 * control-plane messages to the responder, and vice-versa.
218 */
220 if (!ch->end[TO_RESPONDER].rb) {
 /* Shared ring-buffer failure exit: pushes an error, frees ch and returns NULL. */
221 rb_nomem:
222 fr_strerror_const_push("Failed allocating ring buffer");
223 talloc_free(ch);
224 return NULL;
225 }
226
228 if (!ch->end[TO_REQUESTOR].rb) {
 /*
  * NOTE(review): rb_nomem above also calls talloc_free(ch), so this
  * path frees ch twice. One of the two frees should be removed.
  */
229 talloc_free(ch);
230 goto rb_nomem;
231 }
232
233 /*
234 * Initialize all of the timers to now.
235 */
236 now = fr_time();
237
238 ch->end[TO_RESPONDER].stats.last_write = now;
 /* Mark the end active; the send functions refuse to run on an inactive end. */
241 atomic_store(&ch->end[TO_RESPONDER].active, true);
242
243 ch->end[TO_REQUESTOR].stats.last_write = now;
246 atomic_store(&ch->end[TO_REQUESTOR].active, true);
247
248 return ch;
249}
250
251
252/** Send a message via a kq user signal
253 *
254 * Note that the caller doesn't care about data in the event, that is
255 * sent via the atomic queue. The kevent code takes care of
256 * delivering the signal once, even if it's sent by multiple requestor
257 * threads.
258 *
259 * The thread watching the KQ knows which end it is. So when it gets
260 * the signal (and the channel pointer) it knows to look at end[0] or
261 * end[1]. We also send the signal type in 'which' to further help
262 * the recipient.
263 *
264 * @param[in] ch the channel.
265 * @param[in] when the data was ready. Typically taken from the message.
266 * @param[in] end of the channel that the message was written to.
267 * @param[in] which signal to send; it is copied into the control message.
268 * @return
269 * - <0 on error
270 * - 0 on success
271 */
273{
275
276 end->stats.last_sent_signal = when;
277 end->stats.signals++;
278 end->must_signal = false;
279
280 cc.signal = which;
281 cc.ack = end->ack;
282 cc.ch = ch;
283
284 MPRINT("Signalling %s, with %s\n",
285 fr_table_str_by_value(channel_direction, end->direction, "<INVALID>"),
286 fr_table_str_by_value(channel_signals, which, "<INVALID>"));
287
288 return fr_control_message_send(end->control, end->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
289}
290
291#define IALPHA (8)
292#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
293
294/** Send a request message into the channel
295 *
296 * The message should be initialized, other than "sequence" and "ack".
297 *
298 * This function automatically calls the recv_reply callback if there is a reply.
299 *
300 * @param[in] ch the channel to send the request on.
301 * @param[in] cd the message to send.
302 * @return
303 * - <0 on error
304 * - 0 on success
305 */
307{
308 uint64_t sequence;
309 fr_time_t when;
310 fr_time_delta_t message_interval;
311 fr_channel_end_t *requestor;
312
313 if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_RESPONDER].active), "Channel not active")) return -1;
314
315 /*
316 * Same thread? Just call the "recv" function directly.
317 */
318 if (ch->same_thread) {
319 ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
320 return 0;
321 }
322
323 requestor = &(ch->end[TO_RESPONDER]);
324 when = cd->m.when;
325
326 sequence = requestor->sequence + 1;
327 cd->live.sequence = sequence;
328 cd->live.ack = requestor->ack;
329
330 /*
331 * Push the message onto the queue for the other end. If
332 * the push fails, the caller should try another queue.
333 */
334 if (!fr_atomic_queue_push(requestor->aq, cd)) {
335 fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
336 fr_atomic_queue_size(requestor->aq));
337 while (fr_channel_recv_reply(ch));
338 return -1;
339 }
340
341 requestor->sequence = sequence;
342 message_interval = fr_time_sub(when, requestor->stats.last_write);
343
345 requestor->stats.message_interval = message_interval;
346 } else {
347 requestor->stats.message_interval = RTT(requestor->stats.message_interval, message_interval);
348 }
349
350 fr_assert_msg(fr_time_lteq(requestor->stats.last_write, when),
351 "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
352 fr_time_unwrap(when), fr_time_unwrap(requestor->stats.last_write));
353 requestor->stats.last_write = when;
354
355 requestor->stats.outstanding++;
356 requestor->stats.packets++;
357
358 MPRINT("REQUESTOR requests %"PRIu64", num_outstanding %"PRIu64"\n", requestor->stats.packets, requestor->stats.outstanding);
359
360#if ENABLE_SKIPS
361 /*
362 * We just sent the first packet. There can't possibly be a reply, so don't bother looking.
363 */
364 if (requestor->stats.outstanding == 1) {
365
366 /*
367 * There is at least one old packet which is
368 * outstanding, look for a reply.
369 */
370 } else if (requestor->stats.outstanding > 1) {
371 bool has_reply;
372
373 has_reply = fr_channel_recv_reply(ch);
374
375 if (has_reply) while (fr_channel_recv_reply(ch));
376
377 /*
378 * There's no reply yet, so we still have packets outstanding.
379 * Or, there is a reply, and there are more packets outstanding.
380 * Skip the signal.
381 */
382 if (!requestor->must_signal && (!has_reply || (has_reply && (requestor->stats.outstanding > 1)))) {
383 MPRINT("REQUESTOR SKIPS signal\n");
384 return 0;
385 }
386 }
387#endif
388
389 /*
390 * Tell the other end that there is new data ready.
391 *
392 * Ignore errors on signalling. The responder already has
393 * the packet in its inbound queue, so at some point, it
394 * will pick up the message.
395 */
396 MPRINT("REQUESTOR SIGNALS\n");
398 return 0;
399}
400
401/** Receive a reply message from the channel
402 *
403 * @param[in] ch the channel to read data from.
404 * @return
405 * - true if there was a message received
406 * - false if there are no more messages
407 */
409{
411 fr_channel_end_t *requestor;
413
414 fr_assert(ch->end[TO_RESPONDER].recv != NULL);
415
416 aq = ch->end[TO_REQUESTOR].aq;
417 requestor = &(ch->end[TO_RESPONDER]);
418
419 /*
420 * It's OK for the queue to be empty.
421 */
422 if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
423
424 /*
425 * We want an exponential moving average for round trip
426 * time, where "alpha" is a number between [0,1)
427 *
428 * RTT_new = alpha * RTT_old + (1 - alpha) * RTT_sample
429 *
430 * BUT we use fixed-point arithmetic, so we need to use inverse alpha,
431 * which works out to the following equation:
432 *
433 * RTT_new = (RTT_sample + (ialpha - 1) * RTT_old) / ialpha
434 *
435 * NAKs have zero processing time, so we ignore them for
436 * the purpose of RTT.
437 */
438 if (fr_time_delta_ispos(cd->reply.processing_time)) {
439 ch->processing_time = RTT(ch->processing_time, cd->reply.processing_time);
440 }
441 ch->cpu_time = cd->reply.cpu_time;
442
443 /*
444 * Update the outbound channel with the knowledge that
445 * we've received one more reply, and with the responders
446 * ACK.
447 */
448 fr_assert(requestor->stats.outstanding > 0);
449 fr_assert(cd->live.sequence > requestor->ack);
450 fr_assert(cd->live.sequence <= requestor->sequence); /* must have fewer replies than requests */
451
452 requestor->stats.outstanding--;
453 requestor->ack = cd->live.sequence;
454 requestor->their_view_of_my_sequence = cd->live.ack;
455
457 requestor->stats.last_read_other = cd->m.when;
458
459 ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
460
461 return true;
462}
463
464
465/** Receive a request message from the channel
466 *
467 * @param[in] ch the channel
468 * @return
469 * - true if there was a message received
470 * - false if there are no more messages
471 */
473{
475 fr_channel_end_t *responder;
477
478 aq = ch->end[TO_RESPONDER].aq;
479 responder = &(ch->end[TO_REQUESTOR]);
480
481 /*
482 * It's OK for the queue to be empty.
483 */
484 if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
485
486 fr_assert(cd->live.sequence > responder->ack);
487 fr_assert(cd->live.sequence >= responder->sequence); /* must have more requests than replies */
488
489 responder->stats.outstanding++;
490 responder->ack = cd->live.sequence;
491 responder->their_view_of_my_sequence = cd->live.ack;
492
494 responder->stats.last_read_other = cd->m.when;
495
496 ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
497
498 return true;
499}
500
501/** Send a reply message into the channel
502 *
503 * The message should be initialized, other than "sequence" and "ack".
504 *
505 * @param[in] ch the channel to send the reply on.
506 * @param[in] cd the message to send
507 * @return
508 * - <0 on error
509 * - 0 on success
510 */
512{
513 uint64_t sequence;
514 fr_time_t when;
515 fr_time_delta_t message_interval;
516 fr_channel_end_t *responder;
517
518 if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_REQUESTOR].active), "Channel not active")) return -1;
519
520 /*
521 * Same thread? Just call the "recv" function directly.
522 */
523 if (ch->same_thread) {
524 ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
525 return 0;
526 }
527
528 responder = &(ch->end[TO_REQUESTOR]);
529
530 when = cd->m.when;
531
532 sequence = responder->sequence + 1;
533 cd->live.sequence = sequence;
534 cd->live.ack = responder->ack;
535
536 if (!fr_atomic_queue_push(responder->aq, cd)) {
537 fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
538 fr_atomic_queue_size(responder->aq));
539 while (fr_channel_recv_request(ch));
540 return -1;
541 }
542
543 fr_assert(responder->stats.outstanding > 0);
544 responder->stats.outstanding--;
545 responder->stats.packets++;
546
547 MPRINT("\tRESPONDER replies %"PRIu64", num_outstanding %"PRIu64"\n", responder->stats.packets, responder->stats.outstanding);
548
549 responder->sequence = sequence;
550 message_interval = fr_time_sub(when, responder->stats.last_write);
551 responder->stats.message_interval = RTT(responder->stats.message_interval, message_interval);
552
553 fr_assert_msg(fr_time_lteq(responder->stats.last_write, when),
554 "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
555 fr_time_unwrap(when), fr_time_unwrap(responder->stats.last_write));
556 responder->stats.last_write = when;
557
558 /*
559 * Even if we think we have no more packets to process,
560 * the caller may have sent us one. Go check the input
561 * channel.
562 */
563 while (fr_channel_recv_request(ch));
564
565 /*
566 * No packets outstanding, we HAVE to signal the requestor
567 * thread.
568 */
569 if (responder->stats.outstanding == 0) {
571 return 0;
572 }
573
574 MPRINT("\twhen - last_read_other = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_read_other, when - responder->stats.last_read_other);
575 MPRINT("\twhen - last signal = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_sent_signal, when - responder->stats.last_sent_signal);
576 MPRINT("\tsequence - ack = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", responder->sequence, responder->their_view_of_my_sequence, responder->sequence - responder->their_view_of_my_sequence);
577
578#ifdef __APPLE__
579 /*
580 * If we've sent them a signal since the last ACK, they
581 * will receive it, and process the packets. So we don't
582 * need to signal them again.
583 *
584 * But... this doesn't appear to work on the Linux
585 * libkqueue implementation.
586 */
587 if (responder->sequence_at_last_signal > responder->their_view_of_my_sequence) return 0;
588#endif
589
590 /*
591 * If we've received a new packet in the last while, OR
592 * we've sent a signal in the last while, then we don't
593 * need to send a new signal. But we DO send a signal if
594 * we haven't seen an ACK for a few packets.
595 *
596 * FIXME: make these limits configurable, or include
597 * predictions about packet processing time?
598 */
599 fr_assert(responder->their_view_of_my_sequence <= responder->sequence);
600#if 0
601 if (((responder->sequence - their_view_of_my_sequence) <= 1000) &&
602 ((when - responder->stats.last_read_other < SIGNAL_INTERVAL) ||
603 ((when - responder->stats.last_sent_signal) < SIGNAL_INTERVAL))) {
604 MPRINT("\tRESPONDER SKIPS signal\n");
605 return 0;
606 }
607#endif
608
609 MPRINT("\tRESPONDER SIGNALS num_outstanding %"PRIu64"\n", responder->stats.outstanding);
611 return 0;
612}
613
614
615/** Don't send a reply message into the channel
616 *
617 * The message should be the one we received from the network.
618 *
619 * @param[in] ch the channel on which we're dropping a packet
620 * @return
621 * - <0 on error
622 * - 0 on success
623 */
625{
626 fr_channel_end_t *responder;
627
628 responder = &(ch->end[TO_REQUESTOR]);
629
630 responder->sequence++;
631 return 0;
632}
633
634
635
636/** Signal a channel that the responder is sleeping
637 *
638 * This function should be called from the responders idle loop.
639 * i.e. only when it has nothing else to do.
640 *
641 * @param[in] ch the channel to signal we're no longer listening on.
642 * @return
643 * - <0 on error
644 * - 0 on success
645 */
647{
648 fr_channel_end_t *responder;
650
651 responder = &(ch->end[TO_REQUESTOR]);
652
653 /*
654 * We don't have any outstanding requests to process for
655 * this channel, don't signal the network thread that
656 * we're sleeping. It already knows.
657 */
658 if (responder->stats.outstanding == 0) return 0;
659
660 responder->stats.signals++;
661
663 cc.ack = responder->ack;
664 cc.ch = ch;
665
666 MPRINT("\tRESPONDER SLEEPING num_outstanding %"PRIu64", packets in %"PRIu64", packets out %"PRIu64"\n", responder->stats.outstanding,
667 ch->end[TO_RESPONDER].stats.packets, responder->stats.packets);
668 return fr_control_message_send(responder->control, responder->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
669}
670
671
672/** Service a control-plane message
673 *
674 * @param[in] when The current time.
675 * @param[out] p_channel The channel which should be serviced.
676 * @param[in] data The control message.
677 * @param[in] data_size The size of the control message.
678 * @return
679 * - FR_CHANNEL_ERROR on error
680 * - FR_CHANNEL_NOOP, on do nothing
681 * - FR_CHANNEL_DATA_READY on data ready
682 * - FR_CHANNEL_OPEN when a channel has been opened and sent to us
683 * - FR_CHANNEL_CLOSE when a channel should be closed
684 */
685fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
686{
687 int rcode;
 /* The peer's ACK is only consulted when signal skipping is compiled in. */
688#if ENABLE_SKIPS
689 uint64_t ack;
690#endif
694 fr_channel_end_t *requestor;
695 fr_channel_t *ch;
696
 /*
  * Copy the raw control message into a local structure so its fields
  * can be read.
  *
  * NOTE(review): the only length check is the fr_assert() below. If
  * fr_assert() is compiled out in some builds, a data_size that differs
  * from sizeof(cc) would over- or under-fill cc - confirm.
  */
697 fr_assert(data_size == sizeof(cc));
698 memcpy(&cc, data, data_size);
699
700 cs = cc.signal;
701#if ENABLE_SKIPS
702 ack = cc.ack;
703#endif
 /* The channel pointer travels inside the message; hand it back to the caller. */
704 *p_channel = ch = cc.ch;
705
706 switch (cs) {
707 /*
708 * These all have the same numbers as the channel
709 * events, and have no extra processing. We just
710 * return them as-is.
711 */
717 MPRINT("channel got %d\n", cs);
718 return (fr_channel_event_t) cs;
719
720 /*
721 * Only sent by the responder. Both of these
722 * situations are largely the same, except for
723 * return codes.
724 */
726 MPRINT("channel got data_done_responder\n");
 /* Force the requestor end to signal on its next opportunity. */
728 ch->end[TO_RESPONDER].must_signal = true;
729 break;
730
732 MPRINT("channel got responder_sleeping\n");
 /* A sleeping responder carries no data for the caller, so report a no-op. */
733 ce = FR_CHANNEL_NOOP;
734 ch->end[TO_RESPONDER].must_signal = true;
735 break;
736 }
737
738 /*
739 * Compare their ACK to the last sequence we
740 * sent. If it's different, we signal the responder
741 * to wake up.
742 */
743 requestor = &ch->end[TO_RESPONDER];
 /* When ENABLE_SKIPS is 0 this early return is compiled out and we always re-signal. */
744#if ENABLE_SKIPS
745 if (!requestor->must_signal && (ack == requestor->sequence)) {
746 MPRINT("REQUESTOR SKIPS signal AFTER CE %d num_outstanding %"PRIu64"\n", cs, requestor->stats.outstanding);
747 MPRINT("REQUESTOR has ack %"PRIu64", my seq %"PRIu64" my_view %"PRIu64"\n", ack, requestor->sequence, requestor->their_view_of_my_sequence);
748 return ce;
749 }
750
751 /*
752 * The responder is sleeping or done. There are more
753 * packets available, so we signal it to wake up again.
754 */
755 fr_assert(ack <= requestor->sequence);
756#endif
757
758 /*
759 * We're signaling it again...
760 */
761 requestor->stats.resignals++;
762
763 /*
764 * The responder hasn't seen our last few packets. Signal
765 * that there is data ready.
766 */
767 MPRINT("REQUESTOR SIGNALS AFTER CE %d\n", cs);
 /* A failed re-signal is reported as a channel error instead of the event. */
769 if (rcode < 0) return FR_CHANNEL_ERROR;
770
771 return ce;
772}
773
774
775/** Service a control-plane event.
776 *
777 * The channels use control planes for internal signaling. The caller
778 * passes in both the channel and the control plane on which the event
779 * arrived; the kevent argument itself is marked UNUSED.
780 *
781 * @param[in] ch The channel to service.
782 * @param[in] c The control plane on which we received the kev.
783 * @param[in] kev The kevent data (currently unused by this function).
784 * @return
785 * - <0 on error
786 * - 0 on success
787 */
788int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
789{
 /* Type-check the channel pointer; only the check matters, the result is discarded. */
790 (void) talloc_get_type_abort(ch, fr_channel_t);
791
 /* Branch on which end's control plane the event arrived on. */
792 if (c == ch->end[TO_RESPONDER].control) {
794 } else {
796 }
797
798 return 0;
799}
800
801
802/** Check if a channel is active.
803 *
804 * A channel may be closed by either end. If so, it stays alive (but
805 * inactive) until both ends acknowledge the close.
806 *
807 * @param[in] ch the channel
808 * @return
809 * - false the channel is closing.
810 * - true the channel is active
811 */
816
817/** Signal a responder that the channel is closing
818 *
819 * @param[in] ch The channel.
820 * @return
821 * - <0 on error
822 * - 0 on success
823 */
825{
826 int ret;
827 bool active;
829
830 active = atomic_load(&ch->end[TO_RESPONDER].active);
831 if (!active) return 0; /* Already signalled to close */
832
833 (void) talloc_get_type_abort(ch, fr_channel_t);
834
836 cc.ack = TO_RESPONDER;
837 cc.ch = ch;
838
840 ch->end[TO_RESPONDER].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
841
842 atomic_store(&ch->end[TO_RESPONDER].active, false); /* Prevent further requests */
843
844 return ret;
845}
846
847/** Acknowledge that the channel is closing
848 *
849 * @param[in] ch The channel.
850 * @return
851 * - <0 on error
852 * - 0 on success
853 */
855{
856 int ret;
857 bool active;
858
860
861 active = atomic_load(&ch->end[TO_REQUESTOR].active);
862 if (!active) return 0; /* Already signalled to close */
863
864 (void) talloc_get_type_abort(ch, fr_channel_t);
865
866 atomic_store(&ch->end[TO_REQUESTOR].active, false); /* Prevent further responses */
867
869 cc.ack = TO_REQUESTOR;
870 cc.ch = ch;
871
873 ch->end[TO_REQUESTOR].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
874
875 atomic_store(&ch->end[TO_REQUESTOR].active, false); /* Prevent further requests */
876
877 return ret;
878}
879
880/** Add responder-specific data to a channel
881 *
882 * @param[in] ch The channel.
883 * @param[in] uctx The context to add.
884 */
886{
887 (void) talloc_get_type_abort(ch, fr_channel_t);
888
889 ch->end[TO_REQUESTOR].uctx = uctx;
890}
891
892
893/** Get responder-specific data from a channel
894 *
895 * @param[in] ch The channel.
896 */
898{
899 (void) talloc_get_type_abort(ch, fr_channel_t);
900
901 return ch->end[TO_REQUESTOR].uctx;
902}
903
904
905/** Add network-specific data to a channel
906 *
907 * @param[in] ch The channel.
908 * @param[in] uctx The context to add.
909 */
911{
912 (void) talloc_get_type_abort(ch, fr_channel_t);
913
914 ch->end[TO_RESPONDER].uctx = uctx;
915}
916
917
918/** Get network-specific data from a channel
919 *
920 * @param[in] ch The channel.
921 */
923{
924 (void) talloc_get_type_abort(ch, fr_channel_t);
925
926 return ch->end[TO_RESPONDER].uctx;
927}
928
929
931{
932 ch->end[TO_RESPONDER].recv = recv_reply;
933 ch->end[TO_RESPONDER].recv_uctx = uctx;
934
935 return 0;
936}
937
939{
940 ch->end[TO_REQUESTOR].recv = recv_request;
941 ch->end[TO_REQUESTOR].recv_uctx = uctx;
942 return 0;
943}
944
945/** Send a channel to a responder
946 *
947 * @param[in] ch The channel.
948 * @return
949 * - <0 on error
950 * - 0 on success
951 */
953{
955
957 cc.ack = 0;
958 cc.ch = ch;
959
961}
962
963void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
964{
965 fr_log(log, L_INFO, file, line, "requestor\n");
966 fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.signals);
967 fr_log(log, L_INFO, file, line, "\tsignals re-sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.resignals);
968 fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.kevents);
969 fr_log(log, L_INFO, file, line, "\toutstanding = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.outstanding);
970 fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.packets);
971 fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRIu64 "\n", fr_time_delta_unwrap(ch->end[TO_RESPONDER].stats.message_interval));
972 fr_log(log, L_INFO, file, line, "\tlast write = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_read_other));
973 fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_read_other));
974 fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_sent_signal));
975
976 fr_log(log, L_INFO, file, line, "responder\n");
977 fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64"\n", ch->end[TO_REQUESTOR].stats.signals);
978 fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.kevents);
979 fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.packets);
980 fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRIu64 "\n", fr_time_delta_unwrap(ch->end[TO_REQUESTOR].stats.message_interval));
981 fr_log(log, L_INFO, file, line, "\tlast write = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_read_other));
982 fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_read_other));
983 fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_sent_signal));
984}
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:483
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define UNUSED
Definition build.h:315
#define NUM_ELEMENTS(_t)
Definition build.h:337
fr_atomic_queue_t * aq
The queue of messages - visible only to this channel.
Definition channel.c:130
atomic_bool active
Whether the channel is active.
Definition channel.c:132
#define MPRINT(...)
Definition channel.c:44
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition channel.c:922
fr_channel_signal_t
Definition channel.c:82
@ FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER
Definition channel.c:93
@ FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR
Definition channel.c:85
@ FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER
Definition channel.c:84
@ FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING
Definition channel.c:94
@ FR_CHANNEL_SIGNAL_ERROR
Definition channel.c:83
@ FR_CHANNEL_SIGNAL_OPEN
Definition channel.c:86
@ FR_CHANNEL_SIGNAL_CLOSE
Definition channel.c:87
uint64_t sequence_at_last_signal
When we last signaled.
Definition channel.c:128
uint64_t sequence
Sequence number for this channel.
Definition channel.c:124
bool must_signal
we need to signal the other end
Definition channel.c:121
fr_channel_signal_t signal
the signal to send
Definition channel.c:98
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:408
fr_channel_direction_t direction
Use for debug messages.
Definition channel.c:110
size_t channel_signals_len
Definition channel.c:162
#define RTT(_old, _new)
Definition channel.c:292
void * uctx
Worker context.
Definition channel.c:116
size_t channel_packet_priority_len
Definition channel.c:170
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:306
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_direction_t
Definition channel.c:52
@ TO_RESPONDER
Definition channel.c:53
@ TO_REQUESTOR
Definition channel.c:54
fr_table_num_sorted_t const channel_packet_priority[]
Definition channel.c:164
static int fr_channel_data_ready(fr_channel_t *ch, fr_time_t when, fr_channel_end_t *end, fr_channel_signal_t which)
Send a message via a kq user signal.
Definition channel.c:272
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition channel.c:930
fr_ring_buffer_t * rb
Ring buffer for control-plane messages.
Definition channel.c:114
#define ATOMIC_QUEUE_SIZE
Size of the atomic queues.
Definition channel.c:80
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
fr_channel_stats_t stats
channel statistics
Definition channel.c:134
fr_control_t * control
The control plane, consisting of an atomic queue and kqueue.
Definition channel.c:112
bool same_thread
are both ends in the same thread?
Definition channel.c:148
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:938
uint64_t ack
or the endpoint..
Definition channel.c:99
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:897
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:624
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add requestor-specific data to a channel.
Definition channel.c:910
fr_time_delta_t cpu_time
Total time used by the responder for this channel.
Definition channel.c:145
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition channel.c:788
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:885
int fr_channel_responder_sleeping(fr_channel_t *ch)
Signal a channel that the responder is sleeping.
Definition channel.c:646
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
fr_channel_end_t end[2]
Two ends of the channel.
Definition channel.c:150
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition channel.c:963
fr_channel_t * ch
the channel
Definition channel.c:100
uint64_t their_view_of_my_sequence
Should be clear.
Definition channel.c:126
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:812
fr_time_delta_t processing_time
Time spent by the responder processing requests.
Definition channel.c:146
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
uint64_t ack
Sequence number of the other end.
Definition channel.c:125
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
fr_channel_recv_callback_t recv
callback for receiving messages
Definition channel.c:118
void * recv_uctx
context for receiving messages
Definition channel.c:119
One end of a channel.
Definition channel.c:109
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
fr_time_delta_t message_interval
Interval between messages.
Definition channel.h:92
void(* fr_channel_recv_callback_t)(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Definition channel.h:169
#define PRIORITY_NORMAL
Definition channel.h:151
uint64_t packets
Number of actual data packets.
Definition channel.h:86
uint64_t resignals
Number of signals resent.
Definition channel.h:84
uint64_t outstanding
Number of outstanding requests with no reply.
Definition channel.h:82
fr_time_t last_sent_signal
The last time when we signaled the other end.
Definition channel.h:94
#define PRIORITY_NOW
Definition channel.h:149
fr_time_t last_read_other
Last time we successfully read a message from the other end of the channel.
Definition channel.h:91
#define PRIORITY_HIGH
Definition channel.h:150
fr_time_t last_write
Last write to the channel.
Definition channel.h:90
uint64_t kevents
Number of times we've looked at kevents.
Definition channel.h:88
uint64_t signals
Number of kevent signals we've sent.
Definition channel.h:83
#define PRIORITY_LOW
Definition channel.h:152
Channel information which is added to a message.
Definition channel.h:104
Statistics for the channel.
Definition channel.h:81
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_atomic_queue_t * aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
The control structure.
Definition control.c:79
talloc_free(reap)
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:583
@ L_INFO
Informational message.
Definition log.h:55
fr_time_t when
when this message was sent
Definition message.h:47
#define fr_assert(_expr)
Definition rad_assert.h:38
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
#define atomic_store(object, desired)
Definition stdatomic.h:345
#define atomic_load(object)
Definition stdatomic.h:343
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a lexicographically sorted array of name to num mappings.
Definition table.h:49
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
static int64_t fr_time_unwrap(fr_time_t time)
Definition time.h:146
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1265