The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
channel.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: d6f3e31b83f774bebcfa7d3d2cb46109cc6bb828 $
19 *
20 * @brief Two-way thread-safe channels.
21 * @file io/channel.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: d6f3e31b83f774bebcfa7d3d2cb46109cc6bb828 $")
26
27#include <freeradius-devel/io/channel.h>
28#include <freeradius-devel/util/debug.h>
29
30#ifdef HAVE_STDATOMIC_H
31# include <stdatomic.h>
32#else
33# include <freeradius-devel/util/stdatomic.h>
34#endif
35
36/*
37 * Debugging, mainly for channel_test
38 */
39#ifdef DEBUG_CHANNEL
40#define MPRINT(...) fprintf(stdout, __VA_ARGS__)
41#else
42#define MPRINT(...)
43#endif
44
45/*
46 * We disable this until we fix all of the signaling issues...
47 */
48#define ENABLE_SKIPS (0)
49
54
55#ifdef DEBUG_CHANNEL
/*
 *	Names for each channel direction, used only by the MPRINT debug output.
 *	Entries of a sorted table are kept in lexicographic order of their names.
 */
56static fr_table_num_sorted_t const channel_direction[] = {
57 { L("to requestor"), TO_REQUESTOR },
58 { L("to responder"), TO_RESPONDER },
59};
60static size_t channel_direction_len = NUM_ELEMENTS(channel_direction); /* file-local, like the table itself */
61#endif
62
63#if 0
64#define SIGNAL_INTERVAL (1000000) //!< The minimum interval between responder signals.
65#endif
66
67/** Size of the atomic queues
68 *
69 * The queue reader MUST service the queue occasionally,
70 * otherwise the writer will not be able to write. If it's too
71 * low, the writer will fail. If it's too high, it will
72 * unnecessarily use memory. So we're better off putting it on
73 * the high side.
74 *
75 * The reader SHOULD service the queues at inter-packet latency.
76 * i.e. at 1M pps, the queue will get serviced every microsecond.
77 */
78#define ATOMIC_QUEUE_SIZE (1024)
79
94
95typedef struct {
96 fr_channel_signal_t signal; //!< the signal to send
97 uint64_t ack; //!< or the endpoint..
98 fr_channel_t *ch; //!< the channel
100
101/** One end of a channel
102 *
103 * Consists of a kqueue descriptor, and an atomic queue.
104 * The atomic queue is there to get bulk data through, because it's more efficient
105 * than pushing 1M+ events per second through a kqueue.
106 */
107typedef struct {
108 fr_channel_direction_t direction; //!< Use for debug messages.
109
110 fr_control_t *control; //!< The control plane, consisting of an atomic queue and kqueue.
111
112 fr_ring_buffer_t *rb; //!< Ring buffer for control-plane messages.
113
114 void *uctx; //!< Worker context.
115
116 fr_channel_recv_callback_t recv; //!< callback for receiving messages
117 void *recv_uctx; //!< context for receiving messages
118
119 bool must_signal; //!< we need to signal the other end
120
121
122 uint64_t sequence; //!< Sequence number for this channel.
123 uint64_t ack; //!< Sequence number of the other end.
124 uint64_t their_view_of_my_sequence; //!< Should be clear.
125
126 uint64_t sequence_at_last_signal; //!< When we last signaled.
127
128 fr_atomic_queue_t *aq; //!< The queue of messages - visible only to this channel.
129
130 atomic_bool active; //!< Whether the channel is active.
131
132 fr_channel_stats_t stats; //!< channel statistics
134
136
137/** A full channel, which consists of two ends
138 *
139 * A channel consists of an I/O identifier that can be placed in a kqueue
140 * and an atomic queue in each direction to allow for bidirectional communication.
141 */
143 fr_time_delta_t cpu_time; //!< Total time used by the responder for this channel.
144 fr_time_delta_t processing_time; //!< Time spent by the responder processing requests.
145
146 bool same_thread; //!< are both ends in the same thread?
147
148 fr_channel_end_t end[2]; //!< Two ends of the channel.
149};
150
152 { L("error"), FR_CHANNEL_ERROR },
153 { L("data-to-responder"), FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER },
154 { L("data-to-requestor"), FR_CHANNEL_DATA_READY_REQUESTOR },
155 { L("open"), FR_CHANNEL_OPEN },
156 { L("close"), FR_CHANNEL_CLOSE },
157 { L("data-done-responder"), FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER },
158 { L("responder-sleeping"), FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING },
159};
161
163 { L("high"), PRIORITY_HIGH },
164 { L("low"), PRIORITY_LOW },
165 { L("normal"), PRIORITY_NORMAL },
166 { L("now"), PRIORITY_NOW }
167};
169
170
171/** Create a new channel
172 *
173 * @param[in] ctx The talloc_ctx to allocate channel data in.
174 * @param[in] requestor control plane.
175 * @param[in] responder control plane.
176 * @param[in] same whether or not the channel is for the same thread
177 * @return
178 * - NULL on error
179 * - channel on success
180 */
181fr_channel_t *fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
182{
183 fr_time_t now;
184 fr_channel_t *ch;
185
186 ch = talloc_zero(ctx, fr_channel_t);
187 if (!ch) {
188 nomem:
189 fr_strerror_const("Failed allocating memory");
190 return NULL;
191 }
192
193 ch->same_thread = same;
194
197
199 if (!ch->end[TO_RESPONDER].aq) {
200 talloc_free(ch); /* the nomem label does not free ch, so free it here */
201 goto nomem;
202 }
203
205 if (!ch->end[TO_REQUESTOR].aq) {
206 talloc_free(ch); /* the nomem label does not free ch, so free it here */
207 goto nomem;
208 }
209
210 ch->end[TO_RESPONDER].control = responder;
211 ch->end[TO_REQUESTOR].control = requestor;
212
213 /*
214 * Create the ring buffer for the requestor to send
215 * control-plane messages to the responder, and vice-versa.
216 */
218 if (!ch->end[TO_RESPONDER].rb) {
219 rb_nomem:
220 fr_strerror_const_push("Failed allocating ring buffer");
221 talloc_free(ch);
222 return NULL;
223 }
224
226 if (!ch->end[TO_REQUESTOR].rb) {
227 /* rb_nomem frees ch; freeing it here as well would be a double free */
228 goto rb_nomem;
229 }
230
231 /*
232 * Initialize all of the timers to now.
233 */
234 now = fr_time();
235
236 ch->end[TO_RESPONDER].stats.last_write = now;
239 atomic_store(&ch->end[TO_RESPONDER].active, true);
240
241 ch->end[TO_REQUESTOR].stats.last_write = now;
244 atomic_store(&ch->end[TO_REQUESTOR].active, true);
245
246 return ch;
247}
248
249
250/** Send a message via a kq user signal
251 *
252 * Note that the caller doesn't care about data in the event, that is
253 * sent via the atomic queue. The kevent code takes care of
254 * delivering the signal once, even if it's sent by multiple requestor
255 * threads.
256 *
257 * The thread watching the KQ knows which end it is. So when it gets
258 * the signal (and the channel pointer) it knows to look at end[0] or
259 * end[1]. We also send the signal type in 'which', so that the
260 * recipient knows why it was signalled.
261 *
262 * @param[in] ch the channel.
263 * @param[in] when the data was ready. Typically taken from the message.
264 * @param[in] end of the channel that the message was written to.
265 * @param[in] which signal to send to the other end.
266 * @return
267 * - <0 on error
268 * - 0 on success
269 */
271{
273
274 end->stats.last_sent_signal = when;
275 end->stats.signals++;
276 end->must_signal = false;
277
278 cc.signal = which;
279 cc.ack = end->ack;
280 cc.ch = ch;
281
282 MPRINT("Signalling %s, with %s\n",
283 fr_table_str_by_value(channel_direction, end->direction, "<INVALID>"),
284 fr_table_str_by_value(channel_signals, which, "<INVALID>"));
285
286 return fr_control_message_send(end->control, end->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
287}
288
/*
 *	Fixed-point exponential moving average.  IALPHA is the inverse of the
 *	smoothing factor, and RTT(_old, _new) evaluates to
 *
 *		(_new + (IALPHA - 1) * _old) / IALPHA
 *
 *	computed on the unwrapped values and passed back through
 *	fr_time_delta_wrap().
 */
289#define IALPHA (8)
290#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
291
292/** Send a request message into the channel
293 *
294 * The message should be initialized, other than "sequence" and "ack".
295 *
296 * This function automatically calls the recv_reply callback if there is a reply.
297 *
298 * @param[in] ch the channel to send the request on.
299 * @param[in] cd the message to send.
300 * @return
301 * - <0 on error
302 * - 0 on success
303 */
305{
306 uint64_t sequence;
307 fr_time_t when;
308 fr_time_delta_t message_interval;
309 fr_channel_end_t *requestor;
310
311 if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_RESPONDER].active), "Channel not active")) return -1;
312
313 /*
314 * Same thread? Just call the "recv" function directly.
315 */
316 if (ch->same_thread) {
317 ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
318 return 0;
319 }
320
321 requestor = &(ch->end[TO_RESPONDER]);
322 when = cd->m.when;
323
324 sequence = requestor->sequence + 1;
325 cd->live.sequence = sequence;
326 cd->live.ack = requestor->ack;
327
328 /*
329 * Push the message onto the queue for the other end. If
330 * the push fails, the caller should try another queue.
331 */
332 if (!fr_atomic_queue_push(requestor->aq, cd)) {
333 fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
334 fr_atomic_queue_size(requestor->aq));
335 while (fr_channel_recv_reply(ch));
336 return -1;
337 }
338
339 requestor->sequence = sequence;
340 message_interval = fr_time_sub(when, requestor->stats.last_write);
341
342 if (!fr_time_delta_ispos(requestor->stats.message_interval)) {
343 requestor->stats.message_interval = message_interval;
344 } else {
345 requestor->stats.message_interval = RTT(requestor->stats.message_interval, message_interval);
346 }
347
348 fr_assert_msg(fr_time_lteq(requestor->stats.last_write, when),
349 "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
350 fr_time_unwrap(when), fr_time_unwrap(requestor->stats.last_write));
351 requestor->stats.last_write = when;
352
353 requestor->stats.outstanding++;
354 requestor->stats.packets++;
355
356 MPRINT("REQUESTOR requests %"PRIu64", num_outstanding %"PRIu64"\n", requestor->stats.packets, requestor->stats.outstanding);
357
358#if ENABLE_SKIPS
359 /*
360 * We just sent the first packet. There can't possibly be a reply, so don't bother looking.
361 */
362 if (requestor->stats.outstanding == 1) {
363
364 /*
365 * There is at least one old packet which is
366 * outstanding, look for a reply.
367 */
368 } else if (requestor->stats.outstanding > 1) {
369 bool has_reply;
370
371 has_reply = fr_channel_recv_reply(ch);
372
373 if (has_reply) while (fr_channel_recv_reply(ch));
374
375 /*
376 * There's no reply yet, so we still have packets outstanding.
377 * Or, there is a reply, and there are more packets outstanding.
378 * Skip the signal.
379 */
380 if (!requestor->must_signal && (!has_reply || (has_reply && (requestor->stats.outstanding > 1)))) {
381 MPRINT("REQUESTOR SKIPS signal\n");
382 return 0;
383 }
384 }
385#endif
386
387 /*
388 * Tell the other end that there is new data ready.
389 *
390 * Ignore errors on signalling. The responder already has
391 * the packet in its inbound queue, so at some point, it
392 * will pick up the message.
393 */
394 MPRINT("REQUESTOR SIGNALS\n");
396 return 0;
397}
398
399/** Receive a reply message from the channel
400 *
401 * @param[in] ch the channel to read data from.
402 * @return
403 * - true if there was a message received
404 * - false if there are no more messages
405 */
407{
409 fr_channel_end_t *requestor;
411
412 fr_assert(ch->end[TO_RESPONDER].recv != NULL);
413
414 aq = ch->end[TO_REQUESTOR].aq;
415 requestor = &(ch->end[TO_RESPONDER]);
416
417 /*
418 * It's OK for the queue to be empty.
419 */
420 if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
421
422 /*
423 * We want an exponential moving average for round trip
424 * time, where "alpha" is a number between [0,1)
425 *
426 * RTT_new = alpha * RTT_old + (1 - alpha) * RTT_sample
427 *
428 * BUT we use fixed-point arithmetic, so we need to use inverse alpha,
429 * which works out to the following equation:
430 *
431 * RTT_new = (RTT_sample + (ialpha - 1) * RTT_old) / ialpha
432 *
433 * NAKs have zero processing time, so we ignore them for
434 * the purpose of RTT.
435 */
436 if (fr_time_delta_ispos(cd->reply.processing_time)) {
437 ch->processing_time = RTT(ch->processing_time, cd->reply.processing_time);
438 }
439 ch->cpu_time = cd->reply.cpu_time;
440
441 /*
442 * Update the outbound channel with the knowledge that
443 * we've received one more reply, and with the responders
444 * ACK.
445 */
446 fr_assert(requestor->stats.outstanding > 0);
447 fr_assert(cd->live.sequence > requestor->ack);
448 fr_assert(cd->live.sequence <= requestor->sequence); /* must have fewer replies than requests */
449
450 requestor->stats.outstanding--;
451 requestor->ack = cd->live.sequence;
452 requestor->their_view_of_my_sequence = cd->live.ack;
453
455 requestor->stats.last_read_other = cd->m.when;
456
457 ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
458
459 return true;
460}
461
462
463/** Receive a request message from the channel
464 *
465 * @param[in] ch the channel
466 * @return
467 * - true if there was a message received
468 * - false if there are no more messages
469 */
471{
473 fr_channel_end_t *responder;
475
476 aq = ch->end[TO_RESPONDER].aq;
477 responder = &(ch->end[TO_REQUESTOR]);
478
479 /*
480 * It's OK for the queue to be empty.
481 */
482 if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
483
484 fr_assert(cd->live.sequence > responder->ack);
485 fr_assert(cd->live.sequence >= responder->sequence); /* must have more requests than replies */
486
487 responder->stats.outstanding++;
488 responder->ack = cd->live.sequence;
489 responder->their_view_of_my_sequence = cd->live.ack;
490
492 responder->stats.last_read_other = cd->m.when;
493
494 ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
495
496 return true;
497}
498
499/** Send a reply message into the channel
500 *
501 * The message should be initialized, other than "sequence" and "ack".
502 *
503 * @param[in] ch the channel to send the reply on.
504 * @param[in] cd the message to send
505 * @return
506 * - <0 on error
507 * - 0 on success
508 */
510{
511 uint64_t sequence;
512 fr_time_t when;
513 fr_time_delta_t message_interval;
514 fr_channel_end_t *responder;
515
516 if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_REQUESTOR].active), "Channel not active")) return -1;
517
518 /*
519 * Same thread? Just call the "recv" function directly.
520 */
521 if (ch->same_thread) {
522 ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
523 return 0;
524 }
525
526 responder = &(ch->end[TO_REQUESTOR]);
527
528 when = cd->m.when;
529
530 sequence = responder->sequence + 1;
531 cd->live.sequence = sequence;
532 cd->live.ack = responder->ack;
533
534 if (!fr_atomic_queue_push(responder->aq, cd)) {
535 fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
536 fr_atomic_queue_size(responder->aq));
537 while (fr_channel_recv_request(ch));
538 return -1;
539 }
540
541 fr_assert(responder->stats.outstanding > 0);
542 responder->stats.outstanding--;
543 responder->stats.packets++;
544
545 MPRINT("\tRESPONDER replies %"PRIu64", num_outstanding %"PRIu64"\n", responder->stats.packets, responder->stats.outstanding);
546
547 responder->sequence = sequence;
548 message_interval = fr_time_sub(when, responder->stats.last_write);
549 responder->stats.message_interval = RTT(responder->stats.message_interval, message_interval);
550
551 fr_assert_msg(fr_time_lteq(responder->stats.last_write, when),
552 "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
553 fr_time_unwrap(when), fr_time_unwrap(responder->stats.last_write));
554 responder->stats.last_write = when;
555
556 /*
557 * Even if we think we have no more packets to process,
558 * the caller may have sent us one. Go check the input
559 * channel.
560 */
561 while (fr_channel_recv_request(ch));
562
563 /*
564 * No packets outstanding, we HAVE to signal the requestor
565 * thread.
566 */
567 if (responder->stats.outstanding == 0) {
569 return 0;
570 }
571
572 MPRINT("\twhen - last_read_other = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_read_other, when - responder->stats.last_read_other);
573 MPRINT("\twhen - last signal = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_sent_signal, when - responder->stats.last_sent_signal);
574 MPRINT("\tsequence - ack = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", responder->sequence, responder->their_view_of_my_sequence, responder->sequence - responder->their_view_of_my_sequence);
575
576#ifdef __APPLE__
577 /*
578 * If we've sent them a signal since the last ACK, they
579 * will receive it, and process the packets. So we don't
580 * need to signal them again.
581 *
582 * But... this doesn't appear to work on the Linux
583 * libkqueue implementation.
584 */
585 if (responder->sequence_at_last_signal > responder->their_view_of_my_sequence) return 0;
586#endif
587
588 /*
589 * If we've received a new packet in the last while, OR
590 * we've sent a signal in the last while, then we don't
591 * need to send a new signal. But we DO send a signal if
592 * we haven't seen an ACK for a few packets.
593 *
594 * FIXME: make these limits configurable, or include
595 * predictions about packet processing time?
596 */
597 fr_assert(responder->their_view_of_my_sequence <= responder->sequence);
598#if 0
599 if (((responder->sequence - their_view_of_my_sequence) <= 1000) &&
600 ((when - responder->stats.last_read_other < SIGNAL_INTERVAL) ||
601 ((when - responder->stats.last_sent_signal) < SIGNAL_INTERVAL))) {
602 MPRINT("\tRESPONDER SKIPS signal\n");
603 return 0;
604 }
605#endif
606
607 MPRINT("\tRESPONDER SIGNALS num_outstanding %"PRIu64"\n", responder->stats.outstanding);
609 return 0;
610}
611
612
613/** Don't send a reply message into the channel
614 *
615 * The message should be the one we received from the network.
616 *
617 * @param[in] ch the channel on which we're dropping a packet
618 * @return
619 * - <0 on error
620 * - 0 on success
621 */
623{
624 fr_channel_end_t *responder;
625
626 responder = &(ch->end[TO_REQUESTOR]);
627
628 responder->sequence++;
629 return 0;
630}
631
632
633
634/** Signal a channel that the responder is sleeping
635 *
636 * This function should be called from the responders idle loop.
637 * i.e. only when it has nothing else to do.
638 *
639 * @param[in] ch the channel to signal we're no longer listening on.
640 * @return
641 * - <0 on error
642 * - 0 on success
643 */
645{
646 fr_channel_end_t *responder;
648
649 responder = &(ch->end[TO_REQUESTOR]);
650
651 /*
652 * We don't have any outstanding requests to process for
653 * this channel, don't signal the network thread that
654 * we're sleeping. It already knows.
655 */
656 if (responder->stats.outstanding == 0) return 0;
657
658 responder->stats.signals++;
659
661 cc.ack = responder->ack;
662 cc.ch = ch;
663
664 MPRINT("\tRESPONDER SLEEPING num_outstanding %"PRIu64", packets in %"PRIu64", packets out %"PRIu64"\n", responder->stats.outstanding,
665 ch->end[TO_RESPONDER].stats.packets, responder->stats.packets);
666 return fr_control_message_send(responder->control, responder->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
667}
668
669
670/** Service a control-plane message
671 *
672 * @param[in] when The current time.
673 * @param[out] p_channel The channel which should be serviced.
674 * @param[in] data The control message.
675 * @param[in] data_size The size of the control message.
676 * @return
677 * - FR_CHANNEL_ERROR on error
678 * - FR_CHANNEL_NOOP, on do nothing
679 * - FR_CHANNEL_DATA_READY on data ready
680 * - FR_CHANNEL_OPEN when a channel has been opened and sent to us
681 * - FR_CHANNEL_CLOSE when a channel should be closed
682 */
683fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
684{
685 int rcode;
686#if ENABLE_SKIPS
687 uint64_t ack;
688#endif
692 fr_channel_end_t *requestor;
693 fr_channel_t *ch;
694
 /*
 * The message is copied into a fixed-size local, so a size
 * mismatch must be rejected in all builds, not only when
 * assertions are compiled in.
 */
695 if (!fr_cond_assert_msg(data_size == sizeof(cc), "Invalid channel control message size")) {
 *p_channel = NULL;
 return FR_CHANNEL_ERROR;
 }
696 memcpy(&cc, data, sizeof(cc));
697
698 cs = cc.signal;
699#if ENABLE_SKIPS
700 ack = cc.ack;
701#endif
702 *p_channel = ch = cc.ch; /* hand the channel named in the message back to the caller */
703
704 switch (cs) {
705 /*
706 * These all have the same numbers as the channel
707 * events, and have no extra processing. We just
708 * return them as-is.
709 */
715 MPRINT("channel got %d\n", cs);
716 return (fr_channel_event_t) cs;
717
718 /*
719 * Only sent by the responder. Both of these
720 * situations are largely the same, except for
721 * return codes.
722 */
724 MPRINT("channel got data_done_responder\n");
726 ch->end[TO_RESPONDER].must_signal = true;
727 break;
728
730 MPRINT("channel got responder_sleeping\n");
731 ce = FR_CHANNEL_NOOP;
732 ch->end[TO_RESPONDER].must_signal = true;
733 break;
734 }
735
736 /*
737 * Compare their ACK to the last sequence we
738 * sent. If it's different, we signal the responder
739 * to wake up.
740 */
741 requestor = &ch->end[TO_RESPONDER];
742#if ENABLE_SKIPS
743 if (!requestor->must_signal && (ack == requestor->sequence)) {
744 MPRINT("REQUESTOR SKIPS signal AFTER CE %d num_outstanding %"PRIu64"\n", cs, requestor->stats.outstanding);
745 MPRINT("REQUESTOR has ack %"PRIu64", my seq %"PRIu64" my_view %"PRIu64"\n", ack, requestor->sequence, requestor->their_view_of_my_sequence);
746 return ce;
747 }
748
749 /*
750 * The responder is sleeping or done. There are more
751 * packets available, so we signal it to wake up again.
752 */
753 fr_assert(ack <= requestor->sequence);
754#endif
755
756 /*
757 * We're signaling it again...
758 */
759 requestor->stats.resignals++;
760
761 /*
762 * The responder hasn't seen our last few packets. Signal
763 * that there is data ready.
764 */
765 MPRINT("REQUESTOR SIGNALS AFTER CE %d\n", cs);
767 if (rcode < 0) return FR_CHANNEL_ERROR;
768
769 return ce;
770}
771
772
773/** Service a control-plane event.
774 *
775 * The channels use control planes for internal signaling. The caller
776 * passes in the channel, along with the control plane on which the
777 * event arrived; that control plane is compared with the TO_RESPONDER end's.
778 *
779 * @param[in] ch The channel to service.
780 * @param[in] c The control plane on which we received the kev.
781 * @param[in] kev The kevent data (marked UNUSED, not read here).
782 * @return
783 * - <0 on error
784 * - 0 on success
785 */
786int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
787{
788 (void) talloc_get_type_abort(ch, fr_channel_t); /* abort if ch is not a talloc'd fr_channel_t */
789
 /* Pick the branch by whether c is the TO_RESPONDER end's control plane. */
 /* NOTE(review): both branch bodies are missing from this listing - confirm against the real source. */
790 if (c == ch->end[TO_RESPONDER].control) {
792 } else {
794 }
795
796 return 0;
797}
798
799
800/** Check if a channel is active.
801 *
802 * A channel may be closed by either end. If so, it stays alive (but
803 * inactive) until both ends acknowledge the close.
804 *
805 * @param[in] ch the channel
806 * @return
807 * - false the channel is closing.
808 * - true the channel is active
809 */
814
815/** Signal a responder that the channel is closing
816 *
817 * @param[in] ch The channel.
818 * @return
819 * - <0 on error
820 * - 0 on success
821 */
823{
824 int ret;
825 bool active;
827
828 active = atomic_load(&ch->end[TO_RESPONDER].active);
829 if (!active) return 0; /* Already signalled to close */
830
831 atomic_store(&ch->end[TO_RESPONDER].active, false); /* Prevent further requests */
832
833 (void) talloc_get_type_abort(ch, fr_channel_t);
834
836 cc.ack = TO_RESPONDER;
837 cc.ch = ch;
838
840 ch->end[TO_RESPONDER].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
841
842 return ret;
843}
844
845/** Acknowledge that the channel is closing
846 *
847 * @param[in] ch The channel.
848 * @return
849 * - <0 on error
850 * - 0 on success
851 */
853{
854 int ret;
855 bool active;
856
858
859 active = atomic_load(&ch->end[TO_REQUESTOR].active);
860 if (!active) return 0; /* Already signalled to close */
861
862 atomic_store(&ch->end[TO_REQUESTOR].active, false); /* Prevent further responses */
863
864 (void) talloc_get_type_abort(ch, fr_channel_t);
865
867 cc.ack = TO_REQUESTOR;
868 cc.ch = ch;
869
871 ch->end[TO_REQUESTOR].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
872
873 return ret;
874}
875
876/** Add responder-specific data to a channel
877 *
878 * @param[in] ch The channel.
879 * @param[in] uctx The context to add.
880 */
882{
883 (void) talloc_get_type_abort(ch, fr_channel_t);
884
885 ch->end[TO_REQUESTOR].uctx = uctx;
886}
887
888
889/** Get responder-specific data from a channel
890 *
891 * @param[in] ch The channel.
892 */
894{
895 (void) talloc_get_type_abort(ch, fr_channel_t);
896
897 return ch->end[TO_REQUESTOR].uctx;
898}
899
900
901/** Add network-specific data to a channel
902 *
903 * @param[in] ch The channel.
904 * @param[in] uctx The context to add.
905 */
907{
908 (void) talloc_get_type_abort(ch, fr_channel_t);
909
910 ch->end[TO_RESPONDER].uctx = uctx;
911}
912
913
914/** Get network-specific data from a channel
915 *
916 * @param[in] ch The channel.
917 */
919{
920 (void) talloc_get_type_abort(ch, fr_channel_t);
921
922 return ch->end[TO_RESPONDER].uctx;
923}
924
925
927{
928 ch->end[TO_RESPONDER].recv = recv_reply;
929 ch->end[TO_RESPONDER].recv_uctx = uctx;
930
931 return 0;
932}
933
935{
936 ch->end[TO_REQUESTOR].recv = recv_request;
937 ch->end[TO_REQUESTOR].recv_uctx = uctx;
938 return 0;
939}
940
941/** Send a channel to a responder
942 *
943 * @param[in] ch The channel.
944 * @return
945 * - <0 on error
946 * - 0 on success
947 */
949{
951
953 cc.ack = 0;
954 cc.ch = ch;
955
957}
958
/** Log the statistics of both ends of a channel
 *
 * Emits one L_INFO line per counter, first for the requestor
 * (the TO_RESPONDER end), then for the responder (the TO_REQUESTOR end).
 *
 * Counters are printed as unsigned 64-bit values.  Unwrapped times and
 * time deltas are printed as signed 64-bit values, matching how
 * fr_time_unwrap() is printed by the assertions elsewhere in this file.
 *
 * @param[in] ch the channel whose statistics are logged.
 * @param[in] log passed through to fr_log().
 * @param[in] file passed through to fr_log().
 * @param[in] line passed through to fr_log().
 */
959void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
960{
961 fr_log(log, L_INFO, file, line, "requestor\n");
962 fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.signals);
963 fr_log(log, L_INFO, file, line, "\tsignals re-sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.resignals);
964 fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.kevents);
965 fr_log(log, L_INFO, file, line, "\toutstanding = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.outstanding);
966 fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.packets);
967 fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRId64 "\n", fr_time_delta_unwrap(ch->end[TO_RESPONDER].stats.message_interval));
968 fr_log(log, L_INFO, file, line, "\tlast write = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_write));
969 fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_read_other));
970 fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_sent_signal));
971
972 fr_log(log, L_INFO, file, line, "responder\n");
973 fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64"\n", ch->end[TO_REQUESTOR].stats.signals);
974 fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.kevents);
975 fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.packets);
976 fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRId64 "\n", fr_time_delta_unwrap(ch->end[TO_REQUESTOR].stats.message_interval));
977 fr_log(log, L_INFO, file, line, "\tlast write = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_write));
978 fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_read_other));
979 fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRId64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_sent_signal));
980}
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:488
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:210
#define UNUSED
Definition build.h:318
#define NUM_ELEMENTS(_t)
Definition build.h:340
fr_atomic_queue_t * aq
The queue of messages - visible only to this channel.
Definition channel.c:128
atomic_bool active
Whether the channel is active.
Definition channel.c:130
#define MPRINT(...)
Definition channel.c:42
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition channel.c:918
fr_channel_signal_t
Definition channel.c:80
@ FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER
Definition channel.c:91
@ FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR
Definition channel.c:83
@ FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER
Definition channel.c:82
@ FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING
Definition channel.c:92
@ FR_CHANNEL_SIGNAL_ERROR
Definition channel.c:81
@ FR_CHANNEL_SIGNAL_OPEN
Definition channel.c:84
@ FR_CHANNEL_SIGNAL_CLOSE
Definition channel.c:85
uint64_t sequence_at_last_signal
When we last signaled.
Definition channel.c:126
uint64_t sequence
Sequence number for this channel.
Definition channel.c:122
bool must_signal
we need to signal the other end
Definition channel.c:119
fr_channel_signal_t signal
the signal to send
Definition channel.c:96
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:151
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:406
fr_channel_direction_t direction
Use for debug messages.
Definition channel.c:108
size_t channel_signals_len
Definition channel.c:160
#define RTT(_old, _new)
Definition channel.c:290
void * uctx
Worker context.
Definition channel.c:114
size_t channel_packet_priority_len
Definition channel.c:168
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:822
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:304
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:181
fr_channel_direction_t
Definition channel.c:50
@ TO_RESPONDER
Definition channel.c:51
@ TO_REQUESTOR
Definition channel.c:52
fr_table_num_sorted_t const channel_packet_priority[]
Definition channel.c:162
static int fr_channel_data_ready(fr_channel_t *ch, fr_time_t when, fr_channel_end_t *end, fr_channel_signal_t which)
Send a message via a kq user signal.
Definition channel.c:270
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition channel.c:926
fr_ring_buffer_t * rb
Ring buffer for control-plane messages.
Definition channel.c:112
#define ATOMIC_QUEUE_SIZE
Size of the atomic queues.
Definition channel.c:78
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:683
fr_channel_stats_t stats
channel statistics
Definition channel.c:132
fr_control_t * control
The control plane, consisting of an atomic queue and kqueue.
Definition channel.c:110
bool same_thread
are both ends in the same thread?
Definition channel.c:146
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:934
uint64_t ack
or the endpoint..
Definition channel.c:97
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:893
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:470
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:622
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition channel.c:906
fr_time_delta_t cpu_time
Total time used by the responder for this channel.
Definition channel.c:143
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition channel.c:786
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:881
int fr_channel_responder_sleeping(fr_channel_t *ch)
Signal a channel that the responder is sleeping.
Definition channel.c:644
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:509
fr_channel_end_t end[2]
Two ends of the channel.
Definition channel.c:148
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition channel.c:959
fr_channel_t * ch
the channel
Definition channel.c:98
uint64_t their_view_of_my_sequence
Should be clear.
Definition channel.c:124
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:810
fr_time_delta_t processing_time
Time spent by the responder processing requests.
Definition channel.c:144
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:852
uint64_t ack
Sequence number of the other end.
Definition channel.c:123
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:948
fr_channel_recv_callback_t recv
callback for receiving messages
Definition channel.c:116
void * recv_uctx
context for receiving messages
Definition channel.c:117
One end of a channel.
Definition channel.c:107
A full channel, which consists of two ends.
Definition channel.c:142
fr_message_t m
the message header
Definition channel.h:107
fr_channel_event_t
Definition channel.h:69
@ FR_CHANNEL_NOOP
Definition channel.h:76
@ FR_CHANNEL_CLOSE
Definition channel.h:74
@ FR_CHANNEL_ERROR
Definition channel.h:70
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:72
@ FR_CHANNEL_OPEN
Definition channel.h:73
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:71
fr_time_delta_t message_interval
Interval between messages.
Definition channel.h:94
#define FR_CONTROL_ID_CHANNEL
Definition channel.h:67
void(* fr_channel_recv_callback_t)(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Definition channel.h:171
#define PRIORITY_NORMAL
Definition channel.h:153
uint64_t packets
Number of actual data packets.
Definition channel.h:88
uint64_t resignals
Number of signals resent.
Definition channel.h:86
uint64_t outstanding
Number of outstanding requests with no reply.
Definition channel.h:84
fr_time_t last_sent_signal
The last time when we signaled the other end.
Definition channel.h:96
#define PRIORITY_NOW
Definition channel.h:151
fr_time_t last_read_other
Last time we successfully read a message from the other end of the channel.
Definition channel.h:93
#define PRIORITY_HIGH
Definition channel.h:152
fr_time_t last_write
Last write to the channel.
Definition channel.h:92
uint64_t kevents
Number of times we've looked at kevents.
Definition channel.h:90
uint64_t signals
Number of kevent signals we've sent.
Definition channel.h:85
#define PRIORITY_LOW
Definition channel.h:154
Channel information which is added to a message.
Definition channel.h:106
Statistics for the channel.
Definition channel.h:83
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_atomic_queue_t ** aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action if NDEBUG is not defined, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action if NDEBUG is not defined, else logs error and evaluates to value of _x.
Definition debug.h:158
talloc_free(hp)
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
The control structure.
Definition control.c:76
#define fr_time()
Definition event.c:60
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
@ L_INFO
Informational message.
Definition log.h:52
fr_time_t when
when this message was sent
Definition message.h:47
#define fr_assert(_expr)
Definition rad_assert.h:37
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
#define atomic_store(object, desired)
Definition stdatomic.h:345
#define atomic_load(object)
Definition stdatomic.h:343
Definition log.h:93
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a lexicographically sorted array of name to num mappings.
Definition table.h:49
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
static int64_t fr_time_unwrap(fr_time_t time)
Definition time.h:146
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340