The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
retry.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: a0d3078b8bb4db8ba2a96e7b0d3ef85a54c66f4b $
19 * @file lib/bio/retry.c
20 * @brief Binary IO abstractions for retrying packets.
21 *
22 * The retry BIO provides a mechanism for the application to send one packet, and then delegate
23 * retransmissions to the retry bio.
24 *
25 * This BIO will monitor writes, and run callbacks when a packet is sent, received, and released. The
26 * application should cache the request and response until the release callback has been run. The BIO will
27 * call the application on retries, or when the retransmissions have stopped.
28 *
29 * The retry BIO also deals with partially written packets. The BIO takes responsibility for not writing
30 * partial packets, which means that requests can be rleeased even if the data has been partially written.
31 * The application can also cancel an ongoing retryt entrty at any time.
32 *
33 * If something blocks IO, the application should call the blocked / resume functions for this BIO to inform
34 * it of IO changes. Otherwise, the only time this BIO blocks is when it runs out of retransmission slots.
35 *
36 * There are provisions for application-layer watchdogs, where the application can reserve a retry entry. It
37 * can then call the fr_bio_retry_rewrite() function instead of fr_bio_write() to write the watchdog packet.
38 * Any retransmission timers for the application-layer watchdog must be handled by the application. The BIO
39 * will not retry reserved watchdog requests.
40 *
41 * In general, the next BIO after this one should be the memory bio, so that this bio receives only complete
42 * packets.
43 *
44 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
45 */
46
47#include <freeradius-devel/bio/bio_priv.h>
48#include <freeradius-devel/bio/null.h>
49#include <freeradius-devel/bio/buf.h>
50#include <freeradius-devel/util/rb.h>
51#include <freeradius-devel/util/dlist.h>
52
53#define _BIO_RETRY_PRIVATE
54#include <freeradius-devel/bio/retry.h>
55
56typedef struct fr_bio_retry_list_s fr_bio_retry_list_t;
58
59/*
60 * Define type-safe wrappers for head and entry definitions.
61 */
62FR_DLIST_TYPES(fr_bio_retry_list)
63
65 void *uctx;
67 fr_bio_retry_rewrite_t rewrite; //!< per-packet rewrite callback
68 void *rewrite_ctx; //!< context specifically for rewriting this packet
69
70 fr_retry_t retry; //!< retry timers and counters
71
72 union {
73 fr_rb_node_t next_retry_node; //!< for retries
74 FR_DLIST_ENTRY(fr_bio_retry_list) entry; //!< for the free list
75 };
76 fr_rb_node_t expiry_node; //!< for expiries
77
78 fr_bio_retry_t *my; //!< so we can get to it from the event timer callback
79
80 uint8_t const *buffer; //!< cached copy of the packet to send
81 size_t size; //!< size of the cached packet
82
83 bool cancelled; //!< was this item cancelled?
84 bool reserved; //!< for application-layer watchdog
85};
86
87FR_DLIST_FUNCS(fr_bio_retry_list, fr_bio_retry_entry_t, entry)
88
91
92 fr_rb_tree_t next_retry_tree; //!< when packets are retried next
93 fr_rb_tree_t expiry_tree; //!< when packets expire, so that we expire packets when the socket is blocked.
94
96
98
100 bool all_used; //!< blocked due to no free entries
101
102 fr_event_timer_t const *ev; //!< we only need one timer event: next time we do something
103
104 /*
105 * The first item is cached here so that we can detect when it changes. The insert / delete
106 * code can just do its work without worrying about timers. And then when the tree manipulation
107 * is done, call the fr_bio_retry_timer_reset() function to reset (or not) the timer.
108 */
110
111 /*
112 * Cache a partial write when IO is blocked. Partial
113 * packets are left in the timer tree so that they can be expired.
114 */
115 fr_bio_retry_entry_t *partial; //!< for partial writes
116
117 fr_bio_retry_sent_t sent; //!< callback for when we successfully sent a packet
118 fr_bio_retry_rewrite_t rewrite; //!< optional callback which can change a packet on retry
119 fr_bio_retry_response_t response; //!< callback to see if we got a valid response
120 fr_bio_retry_release_t release; //!< callback to release a request / response pair
121
122 fr_bio_buf_t buffer; //!< to store partial packets
123
124 FR_DLIST_HEAD(fr_bio_retry_list) free; //!< free lists are better than memory fragmentation
125};
126
127static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
128static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
129static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
131
/*
 *	Disarm the single retry / expiry timer, and forget which entry it was armed for.
 *
 *	NOTE(review): this macro does not itself set (_x)->ev to NULL.  The timer was armed with
 *	fr_event_timer_at(..., &my->ev, ...), so presumably the event library clears that pointer
 *	when the event is freed -- confirm against the event library.
 */
#define fr_bio_retry_timer_clear(_x) \
	do { \
		(_x)->next_retry_item = NULL; \
		talloc_const_free((_x)->ev); \
	} while (0)
136
137/** Reset the expiry timer after expiring one element
138 *
139 */
141{
143
144 fr_assert(my->info.write_blocked);
145
146 /*
147 * Nothing to do, don't set any timers.
148 */
149 first = fr_rb_first(&my->expiry_tree);
150 if (!first) {
152 return 0;
153 }
154
155 /*
156 * The timer is already set correctly, we're done.
157 */
158 if (first == my->next_retry_item) return 0;
159
160 /*
161 * Update the timer. This should never fail.
162 */
163 if (fr_event_timer_at(my, my->info.el, &my->ev, first->retry.end, fr_bio_retry_expiry_timer, my) < 0) return -1;
164
165 my->next_retry_item = first;
166 return 0;
167}
168
169
170/** Reset the timer after changing the rb tree.
171 *
172 */
174{
176
177 if (my->info.write_blocked) return fr_bio_retry_expiry_timer_reset(my);
178
179 /*
180 * Nothing to do, don't set any timers.
181 */
182 first = fr_rb_first(&my->next_retry_tree);
183 if (!first) {
184 cancel_timer:
186 return 0;
187 }
188
189 /*
190 * We're partially writing a response. Don't bother with the timer, and delete any existing
191 * timer. It will be reset when the partial entry is placed back into the queue.
192 */
193 if (first == my->partial) goto cancel_timer;
194
195 /*
196 * The timer is already set correctly, we're done.
197 */
198 if (first == my->next_retry_item) return 0;
199
200 /*
201 * Update the timer. This should never fail.
202 */
203 if (fr_event_timer_at(my, my->info.el, &my->ev, first->retry.next, fr_bio_retry_timer, my) < 0) return -1;
204
205 my->next_retry_item = first;
206 return 0;
207}
208
209/** Release an entry back to the free list.
210 *
211 */
213{
214 bool timer_reset = false;
215
216 /*
217 * Remove the item before calling the application "release" function.
218 */
219 if (my->partial != item) {
220 if (!item->reserved) {
221 (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
222 (void) fr_rb_remove_by_inline_node(&my->expiry_tree, &item->expiry_node);
223 }
224 } else {
225 item->cancelled = true;
226 }
227
228 /*
229 * Tell the caller that we've released it before doing anything else. That way we can safely
230 * modify anything we want.
231 */
232 my->release((fr_bio_t *) my, item, reason);
233
234 /*
235 * We've partially written this item. Don't bother changing it's position in any of the lists,
236 * as it's in progress.
237 */
238 if (my->partial == item) return;
239
240 /*
241 * We're deleting the timer entry, make sure that we clean up its events,
242 */
243 if (my->next_retry_item == item) {
245 timer_reset = true;
246 }
247
248 /*
249 * If we were blocked due to having no free entries, then resume writes as soon as we create a free entry.
250 */
251 if (my->all_used) {
252 fr_assert(fr_bio_retry_list_num_elements(&my->free) == 0);
253
254 /*
255 * The application MUST call fr_bio_retry_write_resume(), which will check if IO is
256 * actually blocked.
257 */
258 my->all_used = false;
259
260 if (my->cb.write_resume) (void) my->cb.write_resume(&my->bio);
261 }
262
263 /*
264 * If write_resume() above called the application, then it might have already updated the timer.
265 * Don't do that again.
266 */
267 if (timer_reset) (void) fr_bio_retry_timer_reset(my);
268
269 item->packet_ctx = NULL;
270
271 fr_assert(my->next_retry_item != item);
272 fr_bio_retry_list_insert_head(&my->free, item);
273}
274
275/** Writes are blocked.
276 *
277 */
279{
280 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
281
282 if (my->info.write_blocked) {
283 fr_assert(!my->ev);
284 return 1;
285 }
286
287 my->info.write_blocked = true;
288
290 if (fr_bio_retry_expiry_timer_reset(my) < 0) return fr_bio_error(GENERIC);
291
292 return 1;
293}
294
295
296/** Write one item.
297 *
298 * @return
299 * - <0 on error
300 * - 0 for "can't write any more"
301 * - 1 for "wrote a packet"
302 */
304{
305 ssize_t rcode;
306 fr_retry_state_t state;
307
308 fr_assert(!my->partial);
309 fr_assert(!item->reserved);
310
311 /*
312 * Are we there yet?
313 *
314 * Release it, indicating whether or not we successfully got a reply.
315 */
316 state = fr_retry_next(&item->retry, now);
317 if (state != FR_RETRY_CONTINUE) {
319 return 1;
320 }
321
322 /*
323 * Track when we last sent a NEW packet. Also track when we first sent a packet after becoming
324 * writeable again.
325 */
326 if ((item->retry.count == 1) && fr_time_lt(my->info.last_sent, now)) {
327 my->info.last_sent = now;
328
329 if (fr_time_lteq(my->info.first_sent, my->info.last_idle)) my->info.first_sent = now;
330 }
331
332 /*
333 * Write out the packet. On failure release this item.
334 *
335 * If there's an error, we hope that the next "real" write will find the error, and do any
336 * necessary cleanups. Note that we can't call bio shutdown here, as the bio is controlled by the
337 * application, and not by us.
338 */
339 if (item->rewrite) {
340 rcode = item->rewrite(&my->bio, item, item->buffer, item->size);
341 } else {
342 rcode = my->rewrite(&my->bio, item, item->buffer, item->size);
343 }
344 if (rcode < 0) {
345 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
346
348 return rcode;
349 }
350
351 /*
352 * We didn't write the whole packet, we're blocked.
353 */
354 if ((size_t) rcode < item->size) {
355 if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(OOM);
356
357 return 0;
358 }
359
360 /*
361 * We wrote the whole packet. Re-insert it, which is done _without_ doing calls to
362 * cmp(), so we it's OK for us to rewrite item->retry.next.
363 */
364 (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
365 (void) fr_rb_insert(&my->next_retry_tree, item);
366
367 return 1;
368}
369
370/*
371 * Check for the "next next" retry. If that's still in the past,
372 * then skip it. But _don't_ update retry.count, as we don't
373 * send packets. Instead, just enforce MRD, etc.
374 */
376{
378
379 /*
380 * We can't be in this function if there's a partial packet. We must be in
381 * fr_bio_retry_write_partial().
382 */
383 fr_assert(!my->partial);
384 fr_assert(!my->info.write_blocked);
385
386 while ((item = fr_rb_first(&my->next_retry_tree)) != NULL) {
387 int rcode;
388
389 /*
390 * This item needs to be sent in the future, we're done.
391 */
392 if (fr_time_cmp(now, item->retry.next) > 0) break;
393
394 /*
395 * Write one item, and don't update timers.
396 */
397 rcode = fr_bio_retry_write_item(my, item, now);
398 if (rcode <= 0) return rcode;
399 }
400
401 /*
402 * Now that we've written multiple items, reset the timer.
403 *
404 * We do this at the end of the loop so that we don't update it for each item in the loop.
405 *
406 * @todo - set generic write error?
407 */
409
410 return 1;
411}
412
413
414/** Resume writes.
415 *
416 * On resume, we try to flush any pending packets which should have been sent.
417 */
419{
420 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
421 int rcode;
422
423 if (!my->info.write_blocked) return 1;
424
426 if (rcode <= 0) return rcode;
427
428 my->info.write_blocked = false;
429
432
433 return 1;
434}
435
436
437/** There's a partial packet written. Write all of that one first, before writing another packet.
438 *
439 * The packet can either be cancelled, or IO blocked. In either case, we must write the full packet before
440 * going on to the next one, OR retrying another packet.
441 */
442static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
443{
444 size_t used;
445 ssize_t rcode;
446 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
447 fr_bio_t *next;
448 fr_bio_retry_entry_t *item = my->partial;
449
450 fr_assert(!my->next_retry_item);
451 fr_assert(!my->ev);
452 fr_assert(my->partial != NULL);
453 fr_assert(my->buffer.start);
454
455 used = fr_bio_buf_used(&my->buffer);
456 fr_assert(used > 0);
457
458 /*
459 * There must be a next bio.
460 */
461 next = fr_bio_next(&my->bio);
462 fr_assert(next != NULL);
463
464 rcode = next->write(next, NULL, my->buffer.read, used);
465 if (rcode <= 0) return rcode;
466
467 my->buffer.read += rcode;
468
469 /*
470 * Still data in the buffer. We can't send more packets until we finished writing this one.
471 */
472 if (fr_bio_buf_used(&my->buffer) > 0) return 0;
473
474 /*
475 * We're done. Reset the buffer and clean up our cached partial packet.
476 */
477 fr_bio_buf_reset(&my->buffer);
478 my->partial = NULL;
479
480 /*
481 * The item was cancelled, which means it's no longer in the timer tree.
482 *
483 * If it's not cancelled, then we leave it in the tree, and run its timers s normal.
484 */
485 if (item->cancelled) {
486 item->packet_ctx = NULL;
487
488 fr_bio_retry_list_insert_head(&my->free, item);
489 }
490
491 rcode = fr_bio_retry_write_resume(&my->bio);
492 if (rcode <= 0) return rcode;
493
494 /*
495 * Try to write the packet which we were given.
496 */
497 my->bio.write = fr_bio_retry_write;
498 return fr_bio_retry_write(bio, packet_ctx, buffer, size);
499}
500
501/** Save a partial packet when the write becomes blocked.
502 */
504{
505 fr_assert(!my->partial);
506 fr_assert(rcode > 0);
507 fr_assert((size_t) rcode < item->size);
508
509 /*
510 * (re)-alloc the buffer for partial writes.
511 */
512 if (!my->buffer.start ||
513 (item->size > fr_bio_buf_size(&my->buffer))) {
514 if (fr_bio_buf_alloc(my, &my->buffer, item->size)) return fr_bio_error(OOM);
515 }
516
517 fr_assert(fr_bio_buf_used(&my->buffer) == 0);
518 fr_assert(my->buffer.read == my->buffer.start);
519
520 fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode);
521
522 my->partial = item;
523
524 /*
525 * If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called
526 * this function.
527 */
528 if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC);
529
530 my->bio.write = fr_bio_retry_write_partial;
531
532 /*
533 * We leave the entry in the timer tree so that the expiry timer will get hit.
534 *
535 * And then return the size of the partial data we wrote.
536 */
537 return rcode;
538}
539
540
541/** Resend a packet.
542 *
543 * This function should be called by the rewrite() callback, after (possibly) re-encoding the packet.
544 *
545 * @param bio the binary IO handler
546 * @param item the retry context from #fr_bio_retry_sent_t
547 * @param buffer raw data for the packet. May be NULL, in which case the previous packet is retried
548 * @param size size of the raw data
549 * @return
550 * - <0 on error
551 * - 0 for "wrote no data"
552 * - >0 for "wrote data".
553 */
555{
556 ssize_t rcode;
557 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
558 fr_bio_t *next;
559
560 /*
561 * The caller may (accidentally or intentionally) call this function when there's a partial
562 * packet. The intention for rewrite() is that it is only called from timers, and those only run
563 * when the socket isn't blocked. But the caller might not pay attention to those issues.
564 */
565 if (my->partial) return 0;
566
567 /*
568 * There must be a next bio.
569 */
570 next = fr_bio_next(&my->bio);
571 fr_assert(next != NULL);
572
573 /*
574 * The caller should pass NULL for "use the previous packet".
575 */
576 if (buffer) {
577 item->buffer = buffer;
578 item->size = size;
579 }
580
581 /*
582 * Write out the packet, if everything is OK, return.
583 *
584 * Note that we don't update any timers if the write succeeded. That is handled by the caller.
585 */
586 rcode = next->write(next, item->packet_ctx, item->buffer, item->size);
587 if ((size_t) rcode == size) return rcode;
588
589 /*
590 * Can't write anything, be sad.
591 */
592 if (rcode == 0) return 0;
593
594 /*
595 * There's an error writing the packet. Release it, and move the item to the free list.
596 *
597 * Note that we don't bother resetting the timer. There's no point in changing the timer when
598 * the bio is likely dead.
599 */
600 if (rcode < 0) {
601 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
602
604 return rcode;
605 }
606
607 /*
608 * We had previously written the packet, so save the re-sent one, too.
609 */
610 return fr_bio_retry_save_write(my, item, rcode);
611}
612
613/** A previous timer write had a fatal error, so we forbid further writes.
614 *
615 */
616static ssize_t fr_bio_retry_write_fatal(fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
617{
618 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
619 ssize_t rcode = my->error;
620
621 my->error = 0;
622 my->bio.write = fr_bio_null_write;
623
624 return rcode;
625}
626
627/** Run an expiry timer event.
628 *
629 */
631{
632 fr_bio_retry_t *my = talloc_get_type_abort(uctx, fr_bio_retry_t);
634 fr_time_t expires;
635
636 /*
637 * For the timer to be running, there must be a "first" entry which causes the timer to fire.
638 *
639 * There must also be no partially written entry. If the IO is blocked, then all timers are
640 * suspended.
641 */
642 fr_assert(my->next_retry_item != NULL);
643 fr_assert(!my->partial);
644 fr_assert(my->info.write_blocked);
645
646 /*
647 * We should be expiring at least one entry, so nuke the timers.
648 */
649 my->next_retry_item = NULL;
650
651 /*
652 * Expire all entries which are within 10ms of "now". That way we don't reset the event many
653 * times in short succession.
654 */
655 expires = fr_time_add(now, fr_time_delta_from_msec(10));
656
657 while ((item = fr_rb_first(&my->expiry_tree)) != NULL) {
658 if (fr_time_gt(item->retry.end, expires)) break;
659
661 }
662
664}
665
666/** Run a timer event. Usually to write out another packet.
667 *
668 */
670{
671 ssize_t rcode;
672 fr_bio_retry_t *my = talloc_get_type_abort(uctx, fr_bio_retry_t);
674
675 /*
676 * For the timer to be running, there must be a "first" entry which causes the timer to fire.
677 *
678 * There must also be no partially written entry. If the IO is blocked, then all timers are
679 * suspended.
680 */
681 fr_assert(my->next_retry_item != NULL);
682 fr_assert(my->partial == NULL);
683
684 item = my->next_retry_item;
685 my->next_retry_item = NULL;
686
687 /*
688 * Retry one item.
689 */
690 rcode = fr_bio_retry_write_item(my, item, now);
691 if (rcode < 0) {
692 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return;
693
694 my->error = rcode;
695 my->bio.write = fr_bio_retry_write_fatal;
696 return;
697 }
698
699 /*
700 * Partial write - no timers get set. We need to wait until the descriptor is writable.
701 */
702 if (rcode == 0) {
703 fr_assert(my->partial != NULL);
704 return;
705 }
706
707 /*
708 * We successfull wrote this item. Reset the timer to the next one, which is likely to be a
709 * different one from the item we just updated.
710 */
712}
713
714/** Write a request, and see if we have a reply.
715 *
716 */
717static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
718{
719 ssize_t rcode;
721 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
722 fr_bio_t *next;
723
724 fr_assert(!my->partial);
725
726 /*
727 * There must be a next bio.
728 */
729 next = fr_bio_next(&my->bio);
730 fr_assert(next != NULL);
731
732 /*
733 * The caller is trying to flush partial data. But we don't have any partial data, so just call
734 * the next bio to flush it.
735 */
736 if (!buffer) {
737 return next->write(next, packet_ctx, NULL, size);
738 }
739
740 /*
741 * Catch the corner case where the max number of saved packets is exceeded.
742 */
743 if (fr_bio_retry_list_num_elements(&my->free) == 0) {
744 /*
745 * Grab the first item which can be expired.
746 */
747 item = fr_rb_first(&my->expiry_tree);
748 fr_assert(item != NULL);
749
750 /*
751 * If the item has no replies, we can't cancel it. Otherwise, try to cancel it, which
752 * will give us a free slot. If we can't cancel it, tell the application that we're
753 * blocked.
754 *
755 * Note that we do NOT call fr_bio_retry_write_blocked(), as that assumes the IO is
756 * blocked, and will stop all of the timers. Instead, the IO is fine, but we have no way
757 * to send more packets.
758 */
759 if (!item->retry.replies || (fr_bio_retry_entry_cancel(bio, item) < 0)) {
760 /*
761 * Note that we're blocked BEFORE running the callback, so that calls to
762 * fr_bio_retry_write_blocked() doesn't delete timers and stop retrying packets.
763 */
764 my->info.write_blocked = true;
765 my->all_used = true;
766
767 /*
768 * Previous BIOs are blocked, but we still try to write retries.
769 */
770 rcode = fr_bio_write_blocked(bio);
771 if (rcode < 0) return rcode;
772
773 return fr_bio_error(IO_WOULD_BLOCK);
774 }
775
776 /*
777 * We now have a free item, so we can use it.
778 */
779 fr_assert(fr_bio_retry_list_num_elements(&my->free) > 0);
780 }
781
782 /*
783 * Write out the packet. If there's an error, OR we wrote nothing, return.
784 *
785 * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
786 * the caller to know that the write didn't succeed, and the caller takes care of managing the
787 * current packet. So there's no need for us to do that.
788 */
789 rcode = next->write(next, packet_ctx, buffer, size);
790 if (rcode <= 0) return rcode;
791
792 /*
793 * Initialize the retry timers after writing the packet.
794 */
795 item = fr_bio_retry_list_pop_head(&my->free);
796 fr_assert(item != NULL);
797
798 fr_assert(item->my == my);
800 .my = my,
801 .retry.start = fr_time(),
802 .packet_ctx = packet_ctx,
803 .buffer = buffer,
804 .size = size,
805 };
806
807 /*
808 * Always initialize the retry timer. That way the sent() callback doesn't have to call
809 * fr_time().
810 *
811 * The application can call fr_bio_retry_entry_init() to re-initialize it, but that's fine.
812 */
813 fr_retry_init(&item->retry, item->retry.start, &my->retry_config);
814
815 /*
816 * Tell the application that we've saved the packet. The "item" pointer allows the application
817 * to cancel this packet if necessary.
818 */
819 my->sent(bio, packet_ctx, buffer, size, item);
820
821 /*
822 * This should never fail.
823 */
824 (void) fr_rb_insert(&my->next_retry_tree, item);
825 (void) fr_rb_insert(&my->expiry_tree, item);
826
827 /*
828 * We only wrote part of the packet, remember to write the rest of it.
829 */
830 if ((size_t) rcode < size) {
831 return fr_bio_retry_save_write(my, item, rcode);
832 }
833
834 /*
835 * We've just inserted this packet into the timer tree, so it can't be used as the current timer.
836 * Once we've inserted it, we update the timer.
837 */
838 fr_assert(my->next_retry_item != item);
839
840 /*
841 * If we can't set the timer, then release this item.
842 */
843 if (fr_bio_retry_timer_reset(my) < 0) {
845 return fr_bio_error(GENERIC);
846 }
847
848 return size;
849}
850
851static ssize_t fr_bio_retry_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
852{
853 ssize_t rcode;
855 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
856 fr_bio_t *next;
857
858 /*
859 * There must be a next bio.
860 */
861 next = fr_bio_next(&my->bio);
862 fr_assert(next != NULL);
863
864 /*
865 * Read the packet. If error or nothing, return immediately.
866 */
867 rcode = next->read(next, packet_ctx, buffer, size);
868 if (rcode <= 0) return rcode;
869
870 /*
871 * Not a valid response to a request, OR a duplicate response to a request: don't return it to
872 * the caller.
873 *
874 * But if it is a duplicate response, update the counters and do cleanups as necessary.
875 */
876 item = NULL;
877 if (!my->response(bio, &item, packet_ctx, buffer, size)) {
878 if (!item) return 0;
879
880 item->retry.replies++;
881
882 /*
883 * We have enough replies. Release it.
884 */
885 if ((item->retry.replies >= item->retry.count) || !fr_time_delta_ispos(my->retry_config.mrd)) {
887 }
888
889 return 0;
890 }
891
892 fr_assert(item != NULL);
893 fr_assert(item->retry.replies == 0);
894 fr_assert(item != my->partial);
895
896 /*
897 * Track when the "most recently sent" packet has a reply. This metric is better than most
898 * others for judging the liveliness of the destination.
899 */
900 if (fr_time_lt(my->info.mrs_time, item->retry.start)) my->info.mrs_time = item->retry.start;
901
902 /*
903 * We have a new reply, remember when that happened. Note that we don't update this timer for
904 * duplicate replies, but perhaps we should?
905 */
906 my->info.last_reply = fr_time();
907
908 /*
909 * We have a new reply. If we've received all of the replies (i.e. one), OR we don't have a
910 * maximum lifetime for this request, then release it immediately.
911 */
912 item->retry.replies++;
913
914 /*
915 * We don't retry application-layer watchdog packets. And we don't run timers for them. The
916 * application is responsible for managing those timers itself.
917 */
918 if (item->reserved) return rcode;
919
920 /*
921 * There are no more packets to send, so this connection is idle.
922 *
923 * Note that partial packets aren't tracked in the timer tree. We can't do retransmits until the
924 * socket is writable.
925 */
926 if (fr_bio_retry_outstanding((fr_bio_t *) my) == 1) my->info.last_idle = my->info.last_reply;
927
928 /*
929 * We have enough replies. Release it.
930 */
931 if ((item->retry.replies >= item->retry.count) || !fr_time_delta_ispos(my->retry_config.mrd)) {
933 return rcode;
934 }
935
936 /*
937 * There are more replies pending. Wait passively for more replies, and clean up the item
938 * when the timer has expired.
939 */
940 item->retry.next = fr_time_add_time_delta(item->retry.start, my->retry_config.mrd);
941
942 (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
943 (void) fr_rb_insert(&my->next_retry_tree, item);
945
946 return rcode;
947}
948
949/*
950 * Order the retries by what we have to do next.
951 *
952 * Note that "retry.next" here is capped at "retry.end". So if we need to expire an entry, it will
953 * happen at the "next" retry.
954 */
955static int8_t _next_retry_cmp(void const *one, void const *two)
956{
957 fr_bio_retry_entry_t const *a = one;
958 fr_bio_retry_entry_t const *b = two;
959
960 fr_assert(a->buffer);
961 fr_assert(b->buffer);
962
963 return fr_time_cmp(a->retry.next, b->retry.next);
964}
965
966/*
967 * Order entries by when they expire, when we're not retrying.
968 *
969 * i.e. the socket is blocked, so all retries are paused.
970 */
971static int8_t _expiry_cmp(void const *one, void const *two)
972{
973 fr_bio_retry_entry_t const *a = one;
974 fr_bio_retry_entry_t const *b = two;
975
976 fr_assert(a->buffer);
977 fr_assert(b->buffer);
978
979 return fr_time_cmp(a->retry.end, b->retry.end);
980}
981
982/** Cancel one item.
983 *
984 * If "item" is NULL, the last entry in the timer tree is cancelled.
985 *
986 * @param bio the binary IO handler
987 * @param item the retry context from #fr_bio_retry_sent_t
988 * @return
989 * - <0 error
990 * - 0 - didn't cancel
991 * - 1 - did cancel
992 */
994{
995 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
996
997 /*
998 * No item passed, try to cancel the first one to expire.
999 */
1000 if (!item) {
1001 item = fr_rb_first(&my->expiry_tree);
1002 if (!item) return 0;
1003
1004 /*
1005 * This item hasn't had a response, we can't cancel it.
1006 */
1007 if (!item->retry.replies) return 0;
1008 }
1009
1010 /*
1011 * If the caller has cached a previously finished item, then that's a fatal error.
1012 */
1013 fr_assert(item->buffer != NULL);
1014
1016
1017 return 1;
1018}
1019
1020/** Set a per-packet retry config
1021 *
1022 * This function should be called from the #fr_bio_retry_sent_t callback to set a unique retry timer for this
1023 * packet. If no retry configuration is set, then the main one from the alloc() function is used.
1024 */
1026{
1027 fr_assert(item->buffer != NULL);
1028
1029 if (item->retry.config) return -1;
1030
1032
1033 fr_retry_init(&item->retry, item->retry.start, cfg);
1034
1035 return 0;
1036}
1037
1038/** Allow the callbacks / application to know when things are being retried.
1039 *
1040 * This is not initialized util _after_ fr_bio_retry_entry_start() has been called.
1041 */
1043{
1044 fr_assert(item->buffer != NULL);
1045
1046 if (!item->retry.config) return NULL;
1047
1048 return &item->retry;
1049}
1050
1051
1052/** Cancel all outstanding packets.
1053 *
1054 */
1056{
1059
1061
1062 /*
1063 * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
1064 * entries will be deleted when the memory is freed.
1065 */
1066 while ((item = fr_rb_iter_init_inorder(&iter, &my->next_retry_tree)) != NULL) {
1068 my->release((fr_bio_t *) my, item, FR_BIO_RETRY_CANCELLED);
1069 }
1070
1071 return 0;
1072}
1073
1074/** Orderly shutdown.
1075 *
1076 */
1078{
1079 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1080
1082}
1083
1084/** Allocate a #fr_bio_retry_t
1085 *
1086 */
1087fr_bio_t *fr_bio_retry_alloc(TALLOC_CTX *ctx, size_t max_saved,
1089 fr_bio_retry_response_t response,
1090 fr_bio_retry_rewrite_t rewrite,
1091 fr_bio_retry_release_t release,
1092 fr_bio_retry_config_t const *cfg,
1093 fr_bio_t *next)
1094{
1095 size_t i;
1097 fr_bio_retry_entry_t *items;
1098
1099 fr_assert(cfg->el);
1100
1101 /*
1102 * Limit to reasonable values.
1103 */
1104 if (!max_saved) return NULL;
1105 if (max_saved > 65536) return NULL;
1106
1107 my = talloc_zero(ctx, fr_bio_retry_t);
1108 if (!my) return NULL;
1109
1110 /*
1111 * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1112 * and better reuse of data structures.
1113 */
1114 items = talloc_array(my, fr_bio_retry_entry_t, max_saved);
1115 if (!items) return NULL;
1116
1117 /*
1118 * Insert the entries into the free list in order.
1119 */
1120 fr_bio_retry_list_init(&my->free);
1121 for (i = 0; i < max_saved; i++) {
1122 items[i].my = my;
1123 fr_bio_retry_list_insert_tail(&my->free, &items[i]);
1124 }
1125
1126 (void) fr_rb_inline_init(&my->next_retry_tree, fr_bio_retry_entry_t, next_retry_node, _next_retry_cmp, NULL);
1127 (void) fr_rb_inline_init(&my->expiry_tree, fr_bio_retry_entry_t, expiry_node, _expiry_cmp, NULL);
1128
1129 my->sent = sent;
1130 if (!rewrite) {
1131 my->rewrite = fr_bio_retry_rewrite;
1132 } else {
1133 my->rewrite = rewrite;
1134 }
1135 my->response = response;
1136 my->release = release;
1137
1138 my->info.last_idle = fr_time();
1139 my->info.el = cfg->el;
1140 my->info.cfg = cfg;
1141
1143
1144 my->bio.write = fr_bio_retry_write;
1145 my->bio.read = fr_bio_retry_read;
1146
1147 my->priv_cb.write_blocked = fr_bio_retry_write_blocked;
1148 my->priv_cb.write_resume = fr_bio_retry_write_resume;
1149 my->priv_cb.shutdown = fr_bio_retry_shutdown;
1150
1151 fr_bio_chain(&my->bio, next);
1152
1153 talloc_set_destructor(my, fr_bio_retry_destructor);
1154
1155 return (fr_bio_t *) my;
1156}
1157
1159{
1160 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1161
1162 return &my->info;
1163}
1164
1166{
1167 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1168 size_t num;
1169
1170 num = fr_rb_num_elements(&my->next_retry_tree);
1171
1172 if (!my->partial) return num;
1173
1174 /*
1175 * Only count partially written items if they haven't been cancelled.
1176 */
1177 return num + !my->partial->cancelled;
1178}
1179
1180/** Reserve an entry for later use with fr_bio_retry_rewrite()
1181 *
1182 * So that application-layer watchdogs can bypass the normal write / retry routines.
1183 */
1185{
1186 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1188
1189 item = fr_bio_retry_list_pop_head(&my->free);
1190 if (!item) return NULL;
1191
1192 fr_assert(item->my == my);
1194 .my = my,
1195 .reserved = true,
1196 };
1197
1198 return item;
1199}
1200
static int const char char buffer[256]
Definition acutest.h:576
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:116
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:115
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:130
#define fr_bio_error(_x)
Definition base.h:192
static ssize_t fr_bio_retry_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition retry.c:851
static int8_t _expiry_cmp(void const *one, void const *two)
Definition retry.c:971
fr_bio_retry_release_t release
callback to release a request / response pair
Definition retry.c:120
fr_rb_tree_t expiry_tree
when packets expire, so that we expire packets when the socket is blocked.
Definition retry.c:93
int fr_bio_retry_entry_init(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item, fr_retry_config_t const *cfg)
Set a per-packet retry config.
Definition retry.c:1025
fr_bio_t * fr_bio_retry_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_retry_sent_t sent, fr_bio_retry_response_t response, fr_bio_retry_rewrite_t rewrite, fr_bio_retry_release_t release, fr_bio_retry_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_retry_t.
Definition retry.c:1087
static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
Save a partial packet when the write becomes blocked.
Definition retry.c:503
fr_bio_buf_t buffer
to store partial packets
Definition retry.c:122
static void fr_bio_retry_release(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_bio_retry_release_reason_t reason)
Release an entry back to the free list.
Definition retry.c:212
size_t fr_bio_retry_outstanding(fr_bio_t *bio)
Definition retry.c:1165
#define fr_bio_retry_timer_clear(_x)
Definition retry.c:132
static int fr_bio_retry_destructor(fr_bio_retry_t *my)
Cancel all outstanding packets.
Definition retry.c:1055
int fr_bio_retry_entry_cancel(fr_bio_t *bio, fr_bio_retry_entry_t *item)
Cancel one item.
Definition retry.c:993
struct fr_bio_retry_list_s fr_bio_retry_list_t
Definition retry.c:56
ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const void *buffer, size_t size)
Resend a packet.
Definition retry.c:554
static int fr_bio_retry_timer_reset(fr_bio_retry_t *my)
Reset the timer after changing the rb tree.
Definition retry.c:173
static void fr_bio_retry_shutdown(fr_bio_t *bio)
Orderly shutdown.
Definition retry.c:1077
static int8_t _next_retry_cmp(void const *one, void const *two)
Definition retry.c:955
static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_time_t now)
Write one item.
Definition retry.c:303
static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Run a timer event.
Definition retry.c:669
fr_retry_config_t retry_config
Definition retry.c:97
static int fr_bio_retry_write_delayed(fr_bio_retry_t *my, fr_time_t now)
Definition retry.c:375
fr_bio_retry_entry_t * partial
for partial writes
Definition retry.c:115
const fr_retry_t * fr_bio_retry_entry_info(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item)
Allow the callbacks / application to know when things are being retried.
Definition retry.c:1042
fr_rb_tree_t next_retry_tree
when packets are retried next
Definition retry.c:92
static int fr_bio_retry_expiry_timer_reset(fr_bio_retry_t *my)
Reset the expiry timer after expiring one element.
Definition retry.c:140
fr_bio_retry_sent_t sent
callback for when we successfully sent a packet
Definition retry.c:117
fr_bio_retry_entry_t * next_retry_item
for timers
Definition retry.c:109
fr_bio_retry_response_t response
callback to see if we got a valid response
Definition retry.c:119
static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write a request, and see if we have a reply.
Definition retry.c:717
ssize_t error
Definition retry.c:99
static ssize_t fr_bio_retry_write_fatal(fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
A previous timer write had a fatal error, so we forbid further writes.
Definition retry.c:616
bool all_used
blocked due to no free entries
Definition retry.c:100
static int fr_bio_retry_write_blocked(fr_bio_t *bio)
Writes are blocked.
Definition retry.c:278
static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition retry.c:442
fr_bio_retry_info_t info
Definition retry.c:95
fr_bio_retry_rewrite_t rewrite
optional callback which can change a packet on retry
Definition retry.c:118
static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Run an expiry timer event.
Definition retry.c:630
fr_event_timer_t const * ev
we only need one timer event: next time we do something
Definition retry.c:102
fr_bio_retry_entry_t * fr_bio_retry_item_reserve(fr_bio_t *bio)
Reserve an entry for later use with fr_bio_retry_rewrite()
Definition retry.c:1184
fr_bio_retry_info_t const * fr_bio_retry_info(fr_bio_t *bio)
Definition retry.c:1158
static int fr_bio_retry_write_resume(fr_bio_t *bio)
Resume writes.
Definition retry.c:418
void * rewrite_ctx
context specifically for rewriting this packet
Definition retry.c:68
fr_retry_config_t retry_config
base retry config
Definition retry.h:47
void(* fr_bio_retry_release_t)(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, fr_bio_retry_release_reason_t reason)
Callback on release the packet (timeout or have all replies)
Definition retry.h:136
fr_retry_t retry
retry timers and counters
Definition retry.c:70
fr_bio_retry_release_reason_t
Definition retry.h:79
@ FR_BIO_RETRY_WRITE_ERROR
Definition retry.h:83
@ FR_BIO_RETRY_CANCELLED
Definition retry.h:82
@ FR_BIO_RETRY_DONE
Definition retry.h:80
@ FR_BIO_RETRY_NO_REPLY
Definition retry.h:81
struct fr_bio_retry_entry_s fr_bio_retry_entry_t
Definition retry.h:64
uint8_t const * buffer
cached copy of the packet to send
Definition retry.c:80
size_t size
size of the cached packet
Definition retry.c:81
bool reserved
for application-layer watchdog
Definition retry.c:84
void(* fr_bio_retry_sent_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_retry_entry_t *retry_ctx)
Callback for when a packet is sent.
Definition retry.h:98
ssize_t(* fr_bio_retry_rewrite_t)(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, const void *buffer, size_t size)
Definition retry.h:66
bool(* fr_bio_retry_response_t)(fr_bio_t *bio, fr_bio_retry_entry_t **item_p, void *packet_ctx, const void *buffer, size_t size)
Callback on read to see if a packet is a response.
Definition retry.h:123
fr_event_list_t * el
event list
Definition retry.h:45
void * packet_ctx
packet_ctx from the write() call
Definition retry.c:66
fr_rb_node_t expiry_node
for expiries
Definition retry.c:76
bool cancelled
was this item cancelled?
Definition retry.c:83
fr_bio_retry_t * my
so we can get to it from the event timer callback
Definition retry.c:78
fr_bio_retry_rewrite_t rewrite
per-packet rewrite callback
Definition retry.c:67
void * uctx
user-writable context
Definition retry.c:65
Definition retry.c:64
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:69
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition buf.c:114
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition buf.c:81
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition buf.h:151
#define UNUSED
Definition build.h:315
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1152
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1122
#define fr_event_timer_at(...)
Definition event.h:250
fr_bio_shutdown & my
Definition fd_errno.h:59
free(array)
int fr_bio_write_blocked(fr_bio_t *bio)
Internal BIO function to tell all BIOs that it's blocked.
Definition base.c:293
Stores all information relating to an event list.
Definition event.c:411
A timer event.
Definition event.c:102
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:122
long int ssize_t
unsigned char uint8_t
static size_t used
ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
Definition null.c:39
#define fr_assert(_expr)
Definition rad_assert.h:38
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:898
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition rb.c:722
void * fr_rb_first(fr_rb_tree_t *tree)
Definition rb.c:786
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition rb.h:180
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition time.h:173
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_lt(_a, _b)
Definition time.h:239
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
"server local" time.
Definition time.h:69
static fr_event_list_t * el
fr_retry_state_t fr_retry_next(fr_retry_t *r, fr_time_t now)
Advance a retransmission counter, and calculate when the next retransmission should occur.
Definition retry.c:108
void fr_retry_init(fr_retry_t *r, fr_time_t now, fr_retry_config_t const *config)
Initialize a retransmission counter.
Definition retry.c:36
fr_time_delta_t irt
Initial transmission time.
Definition retry.h:33
fr_retry_state_t
Definition retry.h:45
@ FR_RETRY_CONTINUE
Definition retry.h:46
fr_time_t end
when we will end the retransmissions
Definition retry.h:54
fr_time_t next
when the next timer should be set
Definition retry.h:55