The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
channel.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: b7df5c377c5e3778bcc5c2b7a7185d59bf6bed06 $
19  *
20  * @brief Two-way thread-safe channels.
21  * @file io/channel.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: b7df5c377c5e3778bcc5c2b7a7185d59bf6bed06 $")
26 
27 #include <freeradius-devel/io/channel.h>
28 #include <freeradius-devel/io/control.h>
29 #include <freeradius-devel/util/log.h>
30 #include <freeradius-devel/util/debug.h>
31 
32 #ifdef HAVE_STDATOMIC_H
33 # include <stdatomic.h>
34 #else
35 # include <freeradius-devel/util/stdatomic.h>
36 #endif
37 
38 /*
39  * Debugging, mainly for channel_test
40  */
41 #ifdef DEBUG_CHANNEL
42 #define MPRINT(...) fprintf(stdout, __VA_ARGS__)
43 #else
44 #define MPRINT(...)
45 #endif
46 
47 /*
48  * We disable this until we fix all of the signaling issues...
49  */
50 #define ENABLE_SKIPS (0)
51 
52 typedef enum {
54  TO_REQUESTOR = 1
56 
57 #ifdef DEBUG_CHANNEL
58 static fr_table_num_sorted_t const channel_direction[] = {
59  { L("to responder"), TO_RESPONDER },
60  { L("to requestor"), TO_REQUESTOR },
61 };
62 size_t channel_direction_len = NUM_ELEMENTS(channel_direction);
63 #endif
64 
65 #if 0
66 #define SIGNAL_INTERVAL (1000000) //!< The minimum interval between responder signals.
67 #endif
68 
69 /** Size of the atomic queues
70  *
71  * The queue reader MUST service the queue occasionally,
72  * otherwise the writer will not be able to write. If it's too
73  * low, the writer will fail. If it's too high, it will
74  * unnecessarily use memory. So we're better off putting it on
75  * the high side.
76  *
77  * The reader SHOULD service the queues at inter-packet latency.
78  * i.e. at 1M pps, the queue will get serviced every microsecond.
79  */
80 #define ATOMIC_QUEUE_SIZE (1024)
81 
82 typedef enum fr_channel_signal_t {
88 
89  /*
90  * The preceding MUST be in the same order as fr_channel_event_t
91  */
92 
96 
97 typedef struct {
98  fr_channel_signal_t signal; //!< the signal to send
99  uint64_t ack; //!< or the endpoint..
100  fr_channel_t *ch; //!< the channel
102 
103 /** One end of a channel
104  *
105  * Consists of a kqueue descriptor, and an atomic queue.
106  * The atomic queue is there to get bulk data through, because it's more efficient
107  * than pushing 1M+ events per second through a kqueue.
108  */
109 typedef struct {
110  fr_channel_direction_t direction; //!< Use for debug messages.
111 
112  fr_control_t *control; //!< The control plane, consisting of an atomic queue and kqueue.
113 
114  fr_ring_buffer_t *rb; //!< Ring buffer for control-plane messages.
115 
116  void *uctx; //!< Worker context.
117 
118  fr_channel_recv_callback_t recv; //!< callback for receiving messages
119  void *recv_uctx; //!< context for receiving messages
120 
121  bool must_signal; //!< we need to signal the other end
122 
123 
124  uint64_t sequence; //!< Sequence number for this channel.
125  uint64_t ack; //!< Sequence number of the other end.
126  uint64_t their_view_of_my_sequence; //!< Should be clear.
127 
128  uint64_t sequence_at_last_signal; //!< When we last signaled.
129 
130  fr_atomic_queue_t *aq; //!< The queue of messages - visible only to this channel.
131 
132  atomic_bool active; //!< Whether the channel is active.
133 
134  fr_channel_stats_t stats; //!< channel statistics
136 
137 typedef struct fr_channel_s fr_channel_t;
138 
139 /** A full channel, which consists of two ends
140  *
141  * A channel consists of an I/O identifier that can be placed in kqueue
142  * and an atomic queue in each direction to allow for bidirectional communication.
143  */
144 struct fr_channel_s {
145  fr_time_delta_t cpu_time; //!< Total time used by the responder for this channel. Overwritten with reply.cpu_time each time a reply is read.
146  fr_time_delta_t processing_time; //!< Time spent by the responder processing requests. Smoothed with RTT() from each reply that has a positive processing_time.
147 
148  bool same_thread; //!< are both ends in the same thread? If so, the send functions call the other end's recv callback directly instead of queueing.
149 
150  fr_channel_end_t end[2]; //!< Two ends of the channel, indexed by TO_RESPONDER / TO_REQUESTOR.
151 };
152 
154  { L("error"), FR_CHANNEL_ERROR },
155  { L("data-to-responder"), FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER },
156  { L("data-to-requestor"), FR_CHANNEL_DATA_READY_REQUESTOR },
157  { L("open"), FR_CHANNEL_OPEN },
158  { L("close"), FR_CHANNEL_CLOSE },
159  { L("data-done-responder"), FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER },
160  { L("responder-sleeping"), FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING },
161 };
163 
165  { L("high"), PRIORITY_HIGH },
166  { L("low"), PRIORITY_LOW },
167  { L("normal"), PRIORITY_NORMAL },
168  { L("now"), PRIORITY_NOW }
169 };
171 
172 
173 /** Create a new channel
174  *
175  * @param[in] ctx The talloc_ctx to allocate channel data in.
176  * @param[in] requestor control plane.
177  * @param[in] responder control plane.
178  * @param[in] same whether or not the channel is for the same thread
179  * @return
180  * - NULL on error
181  * - channel on success
182  */
183 fr_channel_t *fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
184 {
185  fr_time_t now;
186  fr_channel_t *ch;
187 
188  ch = talloc_zero(ctx, fr_channel_t);
189  if (!ch) {
/*
 * Shared exit for allocation failures: records the error string and
 * returns NULL. The branches further down which jump here have already
 * freed ch themselves.
 */
190  nomem:
191  fr_strerror_const("Failed allocating memory");
192  return NULL;
193  }
194 
195  ch->same_thread = same;
196 
199 
201  if (!ch->end[TO_RESPONDER].aq) {
202  talloc_free(ch);
203  goto nomem;
204  }
205 
207  if (!ch->end[TO_REQUESTOR].aq) {
208  talloc_free(ch);
209  goto nomem;
210  }
211 
/*
 * Each end signals through the control plane of the thread which reads
 * that end: the TO_RESPONDER end uses the responder's control plane, and
 * the TO_REQUESTOR end uses the requestor's.
 */
212  ch->end[TO_RESPONDER].control = responder;
213  ch->end[TO_REQUESTOR].control = requestor;
214 
215  /*
216  * Create the ring buffer for the requestor to send
217  * control-plane messages to the responder, and vice-versa.
218  */
220  if (!ch->end[TO_RESPONDER].rb) {
/*
 * Shared exit for ring buffer failures: pushes an error string, frees
 * ch, and returns NULL.
 */
221  rb_nomem:
222  fr_strerror_const_push("Failed allocating ring buffer");
223  talloc_free(ch);
224  return NULL;
225  }
226 
/*
 * NOTE(review): this branch frees ch and then jumps to rb_nomem, which
 * calls talloc_free(ch) a second time. That looks like a double free on
 * this error path - confirm, and drop the talloc_free() in this branch
 * so that it is just "goto rb_nomem".
 */
228  if (!ch->end[TO_REQUESTOR].rb) {
229  talloc_free(ch);
230  goto rb_nomem;
231  }
232 
233  /*
234  * Initialize all of the timers to now.
235  */
236  now = fr_time();
237 
238  ch->end[TO_RESPONDER].stats.last_write = now;
/* Mark the end as usable; the send functions refuse to run once this is false. */
241  atomic_store(&ch->end[TO_RESPONDER].active, true);
242 
243  ch->end[TO_REQUESTOR].stats.last_write = now;
246  atomic_store(&ch->end[TO_REQUESTOR].active, true);
247 
248  return ch;
249 }
250 
251 
252 /** Send a message via a kq user signal
253  *
254  * Note that the caller doesn't care about data in the event, that is
255  * sent via the atomic queue. The kevent code takes care of
256  * delivering the signal once, even if it's sent by multiple requestor
257  * threads.
258  *
259  * The thread watching the KQ knows which end it is. So when it gets
260  * the signal (and the channel pointer) it knows to look at end[0] or
261  * end[1]. We also send which end in 'which' (0, 1) to further help
262  * the recipient.
263  *
264  * @param[in] ch the channel.
265  * @param[in] when the data was ready. Typically taken from the message.
266  * @param[in] end of the channel that the message was written to.
267  * @param[in] which end of the channel (0/1).
268  * @return
269  * - <0 on error
270  * - 0 on success
271  */
273 {
275 
276  end->stats.last_sent_signal = when;
277  end->stats.signals++;
278  end->must_signal = false;
279 
280  cc.signal = which;
281  cc.ack = end->ack;
282  cc.ch = ch;
283 
284  MPRINT("Signalling %s, with %s\n",
285  fr_table_str_by_value(channel_direction, end->direction, "<INVALID>"),
286  fr_table_str_by_value(channel_signals, which, "<INVALID>"));
287 
288  return fr_control_message_send(end->control, end->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
289 }
290 
/*
 * Fixed-point moving average helpers.
 *
 * IALPHA is the inverse of the smoothing factor. RTT() unwraps both of
 * its arguments with fr_time_delta_unwrap(), computes
 *
 *	(_new + _old * (IALPHA - 1)) / IALPHA
 *
 * and wraps the result back up with fr_time_delta_wrap(). Each argument
 * is evaluated exactly once.
 */
291 #define IALPHA (8)
292 #define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
293 
294 /** Send a request message into the channel
295  *
296  * The message should be initialized, other than "sequence" and "ack".
297  *
298  * This function automatically calls the recv_reply callback if there is a reply.
299  *
300  * @param[in] ch the channel to send the request on.
301  * @param[in] cd the message to send.
302  * @return
303  * - <0 on error
304  * - 0 on success
305  */
307 {
308  uint64_t sequence;
309  fr_time_t when;
310  fr_time_delta_t message_interval;
311  fr_channel_end_t *requestor;
312 
313  if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_RESPONDER].active), "Channel not active")) return -1;
314 
315  /*
316  * Same thread? Just call the "recv" function directly.
317  */
318  if (ch->same_thread) {
319  ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
320  return 0;
321  }
322 
323  requestor = &(ch->end[TO_RESPONDER]);
324  when = cd->m.when;
325 
326  sequence = requestor->sequence + 1;
327  cd->live.sequence = sequence;
328  cd->live.ack = requestor->ack;
329 
330  /*
331  * Push the message onto the queue for the other end. If
332  * the push fails, the caller should try another queue.
333  */
334  if (!fr_atomic_queue_push(requestor->aq, cd)) {
335  fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
336  fr_atomic_queue_size(requestor->aq));
337  while (fr_channel_recv_reply(ch));
338  return -1;
339  }
340 
341  requestor->sequence = sequence;
342  message_interval = fr_time_sub(when, requestor->stats.last_write);
343 
344  if (fr_time_delta_ispos(requestor->stats.message_interval)) {
345  requestor->stats.message_interval = message_interval;
346  } else {
347  requestor->stats.message_interval = RTT(requestor->stats.message_interval, message_interval);
348  }
349 
350  fr_assert_msg(fr_time_lteq(requestor->stats.last_write, when),
351  "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
352  fr_time_unwrap(when), fr_time_unwrap(requestor->stats.last_write));
353  requestor->stats.last_write = when;
354 
355  requestor->stats.outstanding++;
356  requestor->stats.packets++;
357 
358  MPRINT("REQUESTOR requests %"PRIu64", num_outstanding %"PRIu64"\n", requestor->stats.packets, requestor->stats.outstanding);
359 
360 #if ENABLE_SKIPS
361  /*
362  * We just sent the first packet. There can't possibly be a reply, so don't bother looking.
363  */
364  if (requestor->stats.outstanding == 1) {
365 
366  /*
367  * There is at least one old packet which is
368  * outstanding, look for a reply.
369  */
370  } else if (requestor->stats.outstanding > 1) {
371  bool has_reply;
372 
373  has_reply = fr_channel_recv_reply(ch);
374 
375  if (has_reply) while (fr_channel_recv_reply(ch));
376 
377  /*
378  * There's no reply yet, so we still have packets outstanding.
379  * Or, there is a reply, and there are more packets outstanding.
380  * Skip the signal.
381  */
382  if (!requestor->must_signal && (!has_reply || (has_reply && (requestor->stats.outstanding > 1)))) {
383  MPRINT("REQUESTOR SKIPS signal\n");
384  return 0;
385  }
386  }
387 #endif
388 
389  /*
390  * Tell the other end that there is new data ready.
391  *
392  * Ignore errors on signalling. The responder already has
393  * the packet in its inbound queue, so at some point, it
394  * will pick up the message.
395  */
396  MPRINT("REQUESTOR SIGNALS\n");
397  (void) fr_channel_data_ready(ch, when, requestor, FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER);
398  return 0;
399 }
400 
401 /** Receive a reply message from the channel
402  *
403  * @param[in] ch the channel to read data from.
404  * @return
405  * - true if there was a message received
406  * - false if there are no more messages
407  */
409 {
410  fr_channel_data_t *cd;
411  fr_channel_end_t *requestor;
413 
414  fr_assert(ch->end[TO_RESPONDER].recv != NULL);
415 
416  aq = ch->end[TO_REQUESTOR].aq;
417  requestor = &(ch->end[TO_RESPONDER]);
418 
419  /*
420  * It's OK for the queue to be empty.
421  */
422  if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
423 
424  /*
425  * We want an exponential moving average for round trip
426  * time, where "alpha" is a number between [0,1)
427  *
428  * RTT_new = alpha * RTT_old + (1 - alpha) * RTT_sample
429  *
430  * BUT we use fixed-point arithmetic, so we need to use inverse alpha,
431  * which works out to the following equation:
432  *
433  * RTT_new = (RTT_sample + (ialpha - 1) * RTT_old) / ialpha
434  *
435  * NAKs have zero processing time, so we ignore them for
436  * the purpose of RTT.
437  */
438  if (fr_time_delta_ispos(cd->reply.processing_time)) {
439  ch->processing_time = RTT(ch->processing_time, cd->reply.processing_time);
440  }
441  ch->cpu_time = cd->reply.cpu_time;
442 
443  /*
444  * Update the outbound channel with the knowledge that
445  * we've received one more reply, and with the responders
446  * ACK.
447  */
448  fr_assert(requestor->stats.outstanding > 0);
449  fr_assert(cd->live.sequence > requestor->ack);
450  fr_assert(cd->live.sequence <= requestor->sequence); /* must have fewer replies than requests */
451 
452  requestor->stats.outstanding--;
453  requestor->ack = cd->live.sequence;
454  requestor->their_view_of_my_sequence = cd->live.ack;
455 
456  fr_assert(fr_time_lteq(requestor->stats.last_read_other, cd->m.when));
457  requestor->stats.last_read_other = cd->m.when;
458 
459  ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
460 
461  return true;
462 }
463 
464 
465 /** Receive a request message from the channel
466  *
467  * @param[in] ch the channel
468  * @return
469  * - true if there was a message received
470  * - false if there are no more messages
471  */
473 {
474  fr_channel_data_t *cd;
475  fr_channel_end_t *responder;
477 
478  aq = ch->end[TO_RESPONDER].aq;
479  responder = &(ch->end[TO_REQUESTOR]);
480 
481  /*
482  * It's OK for the queue to be empty.
483  */
484  if (!fr_atomic_queue_pop(aq, (void **) &cd)) return false;
485 
486  fr_assert(cd->live.sequence > responder->ack);
487  fr_assert(cd->live.sequence >= responder->sequence); /* must have more requests than replies */
488 
489  responder->stats.outstanding++;
490  responder->ack = cd->live.sequence;
491  responder->their_view_of_my_sequence = cd->live.ack;
492 
493  fr_assert(fr_time_lteq(responder->stats.last_read_other, cd->m.when));
494  responder->stats.last_read_other = cd->m.when;
495 
496  ch->end[TO_REQUESTOR].recv(ch->end[TO_REQUESTOR].recv_uctx, ch, cd);
497 
498  return true;
499 }
500 
501 /** Send a reply message into the channel
502  *
503  * The message should be initialized, other than "sequence" and "ack".
504  *
505  * @param[in] ch the channel to send the reply on.
506  * @param[in] cd the message to send
507  * @return
508  * - <0 on error
509  * - 0 on success
510  */
512 {
513  uint64_t sequence;
514  fr_time_t when;
515  fr_time_delta_t message_interval;
516  fr_channel_end_t *responder;
517 
518  if (!fr_cond_assert_msg(atomic_load(&ch->end[TO_REQUESTOR].active), "Channel not active")) return -1;
519 
520  /*
521  * Same thread? Just call the "recv" function directly.
522  */
523  if (ch->same_thread) {
524  ch->end[TO_RESPONDER].recv(ch->end[TO_RESPONDER].recv_uctx, ch, cd);
525  return 0;
526  }
527 
528  responder = &(ch->end[TO_REQUESTOR]);
529 
530  when = cd->m.when;
531 
532  sequence = responder->sequence + 1;
533  cd->live.sequence = sequence;
534  cd->live.ack = responder->ack;
535 
536  if (!fr_atomic_queue_push(responder->aq, cd)) {
537  fr_strerror_printf("Failed pushing to atomic queue - full. Queue contains %zu items",
538  fr_atomic_queue_size(responder->aq));
539  while (fr_channel_recv_request(ch));
540  return -1;
541  }
542 
543  fr_assert(responder->stats.outstanding > 0);
544  responder->stats.outstanding--;
545  responder->stats.packets++;
546 
547  MPRINT("\tRESPONDER replies %"PRIu64", num_outstanding %"PRIu64"\n", responder->stats.packets, responder->stats.outstanding);
548 
549  responder->sequence = sequence;
550  message_interval = fr_time_sub(when, responder->stats.last_write);
551  responder->stats.message_interval = RTT(responder->stats.message_interval, message_interval);
552 
553  fr_assert_msg(fr_time_lteq(responder->stats.last_write, when),
554  "Channel data timestamp (%" PRId64") older than last channel data sent (%" PRId64 ")",
555  fr_time_unwrap(when), fr_time_unwrap(responder->stats.last_write));
556  responder->stats.last_write = when;
557 
558  /*
559  * Even if we think we have no more packets to process,
560  * the caller may have sent us one. Go check the input
561  * channel.
562  */
563  while (fr_channel_recv_request(ch));
564 
565  /*
566  * No packets outstanding, we HAVE to signal the requestor
567  * thread.
568  */
569  if (responder->stats.outstanding == 0) {
571  return 0;
572  }
573 
574  MPRINT("\twhen - last_read_other = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_read_other, when - responder->stats.last_read_other);
575  MPRINT("\twhen - last signal = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", when, responder->stats.last_sent_signal, when - responder->stats.last_sent_signal);
576  MPRINT("\tsequence - ack = %"PRIu64" - %"PRIu64" = %"PRIu64"\n", responder->sequence, responder->their_view_of_my_sequence, responder->sequence - responder->their_view_of_my_sequence);
577 
578 #ifdef __APPLE__
579  /*
580  * If we've sent them a signal since the last ACK, they
581  * will receive it, and process the packets. So we don't
582  * need to signal them again.
583  *
584  * But... this doesn't appear to work on the Linux
585  * libkqueue implementation.
586  */
587  if (responder->sequence_at_last_signal > responder->their_view_of_my_sequence) return 0;
588 #endif
589 
590  /*
591  * If we've received a new packet in the last while, OR
592  * we've sent a signal in the last while, then we don't
593  * need to send a new signal. But we DO send a signal if
594  * we haven't seen an ACK for a few packets.
595  *
596  * FIXME: make these limits configurable, or include
597  * predictions about packet processing time?
598  */
599  fr_assert(responder->their_view_of_my_sequence <= responder->sequence);
600 #if 0
601  if (((responder->sequence - their_view_of_my_sequence) <= 1000) &&
602  ((when - responder->stats.last_read_other < SIGNAL_INTERVAL) ||
603  ((when - responder->stats.last_sent_signal) < SIGNAL_INTERVAL))) {
604  MPRINT("\tRESPONDER SKIPS signal\n");
605  return 0;
606  }
607 #endif
608 
609  MPRINT("\tRESPONDER SIGNALS num_outstanding %"PRIu64"\n", responder->stats.outstanding);
610  (void) fr_channel_data_ready(ch, when, responder, FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR);
611  return 0;
612 }
613 
614 
615 /** Don't send a reply message into the channel
616  *
617  * The message should be the one we received from the network.
618  *
619  * @param[in] ch the channel on which we're dropping a packet
620  * @return
621  * - <0 on error
622  * - 0 on success
623  */
625 {
626  fr_channel_end_t *responder;
627 
628  responder = &(ch->end[TO_REQUESTOR]);
629 
630  responder->sequence++;
631  return 0;
632 }
633 
634 
635 
636 /** Signal a channel that the responder is sleeping
637  *
638  * This function should be called from the responders idle loop.
639  * i.e. only when it has nothing else to do.
640  *
641  * @param[in] ch the channel to signal we're no longer listening on.
642  * @return
643  * - <0 on error
644  * - 0 on success
645  */
647 {
648  fr_channel_end_t *responder;
650 
651  responder = &(ch->end[TO_REQUESTOR]);
652 
653  /*
654  * We don't have any outstanding requests to process for
655  * this channel, don't signal the network thread that
656  * we're sleeping. It already knows.
657  */
658  if (responder->stats.outstanding == 0) return 0;
659 
660  responder->stats.signals++;
661 
663  cc.ack = responder->ack;
664  cc.ch = ch;
665 
666  MPRINT("\tRESPONDER SLEEPING num_outstanding %"PRIu64", packets in %"PRIu64", packets out %"PRIu64"\n", responder->stats.outstanding,
667  ch->end[TO_RESPONDER].stats.packets, responder->stats.packets);
668  return fr_control_message_send(responder->control, responder->rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
669 }
670 
671 
672 /** Service a control-plane message
673  *
674  * @param[in] when The current time.
675  * @param[out] p_channel The channel which should be serviced.
676  * @param[in] data The control message.
677  * @param[in] data_size The size of the control message.
678  * @return
679  * - FR_CHANNEL_ERROR on error
680  * - FR_CHANNEL_NOOP, on do nothing
681  * - FR_CHANNEL_DATA_READY on data ready
682  * - FR_CHANNEL_OPEN when a channel has been opened and sent to us
683  * - FR_CHANNEL_CLOSE when a channel should be closed
684  */
685 fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
686 {
687  int rcode;
688 #if ENABLE_SKIPS
689  uint64_t ack;
690 #endif
694  fr_channel_end_t *requestor;
695  fr_channel_t *ch;
696 
697  fr_assert(data_size == sizeof(cc));
698  memcpy(&cc, data, data_size);
699 
700  cs = cc.signal;
701 #if ENABLE_SKIPS
702  ack = cc.ack;
703 #endif
704  *p_channel = ch = cc.ch;
705 
706  switch (cs) {
707  /*
708  * These all have the same numbers as the channel
709  * events, and have no extra processing. We just
710  * return them as-is.
711  */
717  MPRINT("channel got %d\n", cs);
718  return (fr_channel_event_t) cs;
719 
720  /*
721  * Only sent by the responder. Both of these
722  * situations are largely the same, except for
723  * return codes.
724  */
726  MPRINT("channel got data_done_responder\n");
728  ch->end[TO_RESPONDER].must_signal = true;
729  break;
730 
732  MPRINT("channel got responder_sleeping\n");
733  ce = FR_CHANNEL_NOOP;
734  ch->end[TO_RESPONDER].must_signal = true;
735  break;
736  }
737 
738  /*
739  * Compare their ACK to the last sequence we
740  * sent. If it's different, we signal the responder
741  * to wake up.
742  */
743  requestor = &ch->end[TO_RESPONDER];
744 #if ENABLE_SKIPS
745  if (!requestor->must_signal && (ack == requestor->sequence)) {
746  MPRINT("REQUESTOR SKIPS signal AFTER CE %d num_outstanding %"PRIu64"\n", cs, requestor->stats.outstanding);
747  MPRINT("REQUESTOR has ack %"PRIu64", my seq %"PRIu64" my_view %"PRIu64"\n", ack, requestor->sequence, requestor->their_view_of_my_sequence);
748  return ce;
749  }
750 
751  /*
752  * The responder is sleeping or done. There are more
753  * packets available, so we signal it to wake up again.
754  */
755  fr_assert(ack <= requestor->sequence);
756 #endif
757 
758  /*
759  * We're signaling it again...
760  */
761  requestor->stats.resignals++;
762 
763  /*
764  * The responder hasn't seen our last few packets. Signal
765  * that there is data ready.
766  */
767  MPRINT("REQUESTOR SIGNALS AFTER CE %d\n", cs);
768  rcode = fr_channel_data_ready(ch, when, requestor, FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER);
769  if (rcode < 0) return FR_CHANNEL_ERROR;
770 
771  return ce;
772 }
773 
774 
775 /** Service a control-plane event.
776  *
777  * The channels use control planes for internal signaling. Note that
778  * the caller does NOT pass the channel into this function. Instead,
779  * the channel is taken from the kevent.
780  *
781  * @param[in] ch The channel to service.
782  * @param[in] c The control plane on which we received the kev.
783  * @param[in] kev The kevent data, should get passed to the control plane.
784  * @return
785  * - <0 on error
786  * - 0 on success
787  */
788 int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
789 {
790  (void) talloc_get_type_abort(ch, fr_channel_t);
791 
792  if (c == ch->end[TO_RESPONDER].control) {
793  ch->end[TO_RESPONDER].stats.kevents++;
794  } else {
795  ch->end[TO_REQUESTOR].stats.kevents++;
796  }
797 
798  return 0;
799 }
800 
801 
802 /** Check if a channel is active.
803  *
804  * A channel may be closed by either end. If so, it stays alive (but
805  * inactive) until both ends acknowledge the close.
806  *
807  * @param[in] ch the channel
808  * @return
809  * - false the channel is closing.
810  * - true the channel is active
811  */
813 {
815 }
816 
817 /** Signal a responder that the channel is closing
818  *
819  * @param[in] ch The channel.
820  * @return
821  * - <0 on error
822  * - 0 on success
823  */
825 {
826  int ret;
827  bool active;
829 
830  active = atomic_load(&ch->end[TO_RESPONDER].active);
831  if (!active) return 0; /* Already signalled to close */
832 
833  (void) talloc_get_type_abort(ch, fr_channel_t);
834 
836  cc.ack = TO_RESPONDER;
837  cc.ch = ch;
838 
840  ch->end[TO_RESPONDER].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
841 
842  atomic_store(&ch->end[TO_RESPONDER].active, false); /* Prevent further requests */
843 
844  return ret;
845 }
846 
847 /** Acknowledge that the channel is closing
848  *
849  * @param[in] ch The channel.
850  * @return
851  * - <0 on error
852  * - 0 on success
853  */
855 {
856  int ret;
857  bool active;
858 
860 
861  active = atomic_load(&ch->end[TO_REQUESTOR].active);
862  if (!active) return 0; /* Already signalled to close */
863 
864  (void) talloc_get_type_abort(ch, fr_channel_t);
865 
866  atomic_store(&ch->end[TO_REQUESTOR].active, false); /* Prevent further responses */
867 
869  cc.ack = TO_REQUESTOR;
870  cc.ch = ch;
871 
873  ch->end[TO_REQUESTOR].rb, FR_CONTROL_ID_CHANNEL, &cc, sizeof(cc));
874 
875  atomic_store(&ch->end[TO_REQUESTOR].active, false); /* Prevent further requests */
876 
877  return ret;
878 }
879 
880 /** Add responder-specific data to a channel
881  *
882  * @param[in] ch The channel.
883  * @param[in] uctx The context to add.
884  */
886 {
887  (void) talloc_get_type_abort(ch, fr_channel_t);
888 
889  ch->end[TO_REQUESTOR].uctx = uctx;
890 }
891 
892 
893 /** Get responder-specific data from a channel
894  *
895  * @param[in] ch The channel.
896  */
898 {
899  (void) talloc_get_type_abort(ch, fr_channel_t);
900 
901  return ch->end[TO_REQUESTOR].uctx;
902 }
903 
904 
905 /** Add network-specific data to a channel
906  *
907  * @param[in] ch The channel.
908  * @param[in] uctx The context to add.
909  */
911 {
912  (void) talloc_get_type_abort(ch, fr_channel_t);
913 
914  ch->end[TO_RESPONDER].uctx = uctx;
915 }
916 
917 
918 /** Get network-specific data from a channel
919  *
920  * @param[in] ch The channel.
921  */
923 {
924  (void) talloc_get_type_abort(ch, fr_channel_t);
925 
926  return ch->end[TO_RESPONDER].uctx;
927 }
928 
929 
931 {
932  ch->end[TO_RESPONDER].recv = recv_reply;
933  ch->end[TO_RESPONDER].recv_uctx = uctx;
934 
935  return 0;
936 }
937 
939 {
940  ch->end[TO_REQUESTOR].recv = recv_request;
941  ch->end[TO_REQUESTOR].recv_uctx = uctx;
942  return 0;
943 }
944 
945 /** Send a channel to a responder
946  *
947  * @param[in] ch The channel.
948  * @return
949  * - <0 on error
950  * - 0 on success
951  */
953 {
955 
957  cc.ack = 0;
958  cc.ch = ch;
959 
961 }
962 
963 void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
964 {
965  fr_log(log, L_INFO, file, line, "requestor\n");
966  fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.signals);
967  fr_log(log, L_INFO, file, line, "\tsignals re-sent = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.resignals);
968  fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.kevents);
969  fr_log(log, L_INFO, file, line, "\toutstanding = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.outstanding);
970  fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_RESPONDER].stats.packets);
971  fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRIu64 "\n", fr_time_delta_unwrap(ch->end[TO_RESPONDER].stats.message_interval));
972  fr_log(log, L_INFO, file, line, "\tlast write = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_read_other));
973  fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_read_other));
974  fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_RESPONDER].stats.last_sent_signal));
975 
976  fr_log(log, L_INFO, file, line, "responder\n");
977  fr_log(log, L_INFO, file, line, "\tsignals sent = %" PRIu64"\n", ch->end[TO_REQUESTOR].stats.signals);
978  fr_log(log, L_INFO, file, line, "\tkevents checked = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.kevents);
979  fr_log(log, L_INFO, file, line, "\tpackets processed = %" PRIu64 "\n", ch->end[TO_REQUESTOR].stats.packets);
980  fr_log(log, L_INFO, file, line, "\tmessage interval (RTT) = %" PRIu64 "\n", fr_time_delta_unwrap(ch->end[TO_REQUESTOR].stats.message_interval));
981  fr_log(log, L_INFO, file, line, "\tlast write = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_read_other));
982  fr_log(log, L_INFO, file, line, "\tlast read other end = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_read_other));
983  fr_log(log, L_INFO, file, line, "\tlast signal other = %" PRIu64 "\n", fr_time_unwrap(ch->end[TO_REQUESTOR].stats.last_sent_signal));
984 }
int const char * file
Definition: acutest.h:702
int const char int line
Definition: acutest.h:702
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
Definition: atomic_queue.c:215
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
Definition: atomic_queue.c:265
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Definition: atomic_queue.c:148
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:444
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define UNUSED
Definition: build.h:313
#define NUM_ELEMENTS(_t)
Definition: build.h:335
fr_atomic_queue_t * aq
The queue of messages - visible only to this channel.
Definition: channel.c:130
atomic_bool active
Whether the channel is active.
Definition: channel.c:132
#define MPRINT(...)
Definition: channel.c:44
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition: channel.c:183
fr_channel_signal_t
Definition: channel.c:82
@ FR_CHANNEL_SIGNAL_DATA_DONE_RESPONDER
Definition: channel.c:93
@ FR_CHANNEL_SIGNAL_DATA_TO_REQUESTOR
Definition: channel.c:85
@ FR_CHANNEL_SIGNAL_DATA_TO_RESPONDER
Definition: channel.c:84
@ FR_CHANNEL_SIGNAL_RESPONDER_SLEEPING
Definition: channel.c:94
@ FR_CHANNEL_SIGNAL_ERROR
Definition: channel.c:83
@ FR_CHANNEL_SIGNAL_OPEN
Definition: channel.c:86
@ FR_CHANNEL_SIGNAL_CLOSE
Definition: channel.c:87
uint64_t sequence_at_last_signal
When we last signaled.
Definition: channel.c:128
uint64_t sequence
Sequence number for this channel.
Definition: channel.c:124
bool must_signal
we need to signal the other end
Definition: channel.c:121
fr_channel_signal_t signal
the signal to send
Definition: channel.c:98
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition: channel.c:922
fr_table_num_sorted_t const channel_signals[]
Definition: channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition: channel.c:408
fr_channel_direction_t direction
Use for debug messages.
Definition: channel.c:110
size_t channel_signals_len
Definition: channel.c:162
#define RTT(_old, _new)
Definition: channel.c:292
void * uctx
Worker context.
Definition: channel.c:116
size_t channel_packet_priority_len
Definition: channel.c:170
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition: channel.c:897
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition: channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition: channel.c:306
fr_channel_direction_t
Definition: channel.c:52
@ TO_RESPONDER
Definition: channel.c:53
@ TO_REQUESTOR
Definition: channel.c:54
fr_table_num_sorted_t const channel_packet_priority[]
Definition: channel.c:164
static int fr_channel_data_ready(fr_channel_t *ch, fr_time_t when, fr_channel_end_t *end, fr_channel_signal_t which)
Send a message via a kq user signal.
Definition: channel.c:272
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition: channel.c:930
fr_ring_buffer_t * rb
Ring buffer for control-plane messages.
Definition: channel.c:114
#define ATOMIC_QUEUE_SIZE
Size of the atomic queues.
Definition: channel.c:80
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
fr_channel_stats_t stats
channel statistics
Definition: channel.c:134
fr_control_t * control
The control plane, consisting of an atomic queue and kqueue.
Definition: channel.c:112
bool same_thread
are both ends in the same thread?
Definition: channel.c:148
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition: channel.c:938
uint64_t ack
or the endpoint..
Definition: channel.c:99
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition: channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition: channel.c:624
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition: channel.c:910
fr_time_delta_t cpu_time
Total time used by the responder for this channel.
Definition: channel.c:145
int fr_channel_service_kevent(fr_channel_t *ch, fr_control_t *c, UNUSED struct kevent const *kev)
Service a control-plane event.
Definition: channel.c:788
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition: channel.c:885
int fr_channel_responder_sleeping(fr_channel_t *ch)
Signal a channel that the responder is sleeping.
Definition: channel.c:646
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition: channel.c:511
fr_channel_end_t end[2]
Two ends of the channel.
Definition: channel.c:150
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition: channel.c:963
fr_channel_t * ch
the channel
Definition: channel.c:100
uint64_t their_view_of_my_sequence
Should be clear.
Definition: channel.c:126
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition: channel.c:812
fr_time_delta_t processing_time
Time spent by the responder processing requests.
Definition: channel.c:146
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition: channel.c:854
uint64_t ack
Sequence number of the other end.
Definition: channel.c:125
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition: channel.c:952
fr_channel_recv_callback_t recv
callback for receiving messages
Definition: channel.c:118
void * recv_uctx
context for receiving messages
Definition: channel.c:119
One end of a channel.
Definition: channel.c:109
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_ERROR
Definition: channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
fr_time_delta_t message_interval
Interval between messages.
Definition: channel.h:92
void(* fr_channel_recv_callback_t)(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Definition: channel.h:169
#define PRIORITY_NORMAL
Definition: channel.h:151
uint64_t packets
Number of actual data packets.
Definition: channel.h:86
uint64_t resignals
Number of signals resent.
Definition: channel.h:84
uint64_t outstanding
Number of outstanding requests with no reply.
Definition: channel.h:82
fr_time_t last_sent_signal
The last time when we signaled the other end.
Definition: channel.h:94
#define PRIORITY_NOW
Definition: channel.h:149
fr_time_t last_read_other
Last time we successfully read a message from the other end of the channel.
Definition: channel.h:91
#define PRIORITY_HIGH
Definition: channel.h:150
fr_time_t last_write
Last write to the channel.
Definition: channel.h:90
uint64_t kevents
Number of times we've looked at kevents.
Definition: channel.h:88
uint64_t signals
Number of kevent signals we've sent.
Definition: channel.h:83
#define PRIORITY_LOW
Definition: channel.h:152
Channel information which is added to a message.
Definition: channel.h:104
Statistics for the channel.
Definition: channel.h:81
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define FR_CONTROL_MAX_SIZE
Definition: control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition: control.h:50
static fr_atomic_queue_t * aq
Definition: control_test.c:47
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:208
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:154
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
The control structure.
Definition: control.c:79
talloc_free(reap)
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition: log.c:599
@ L_INFO
Informational message.
Definition: log.h:55
fr_time_t when
when this message was sent
Definition: message.h:47
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
fr_assert(0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
#define atomic_store(object, desired)
Definition: stdatomic.h:345
#define atomic_load(object)
Definition: stdatomic.h:343
Definition: log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:253
An element in a lexicographically sorted array of name to num mappings.
Definition: table.h:45
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition: time.h:154
static int64_t fr_time_unwrap(fr_time_t time)
Definition: time.h:146
#define fr_time_lteq(_a, _b)
Definition: time.h:240
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1259