The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
retry.c
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: b6f78cd84e5a94dcf21c9d4a459069127f691a10 $
19 * @file lib/bio/retry.c
20 * @brief Binary IO abstractions for retrying packets.
21 *
22 * The retry BIO provides a mechanism for the application to send one packet, and then delegate
23 * retransmissions to the retry bio.
24 *
25 * This BIO will monitor writes, and run callbacks when a packet is sent, received, and released. The
26 * application should cache the request and response until the release callback has been run. The BIO will
27 * call the application on retries, or when the retransmissions have stopped.
28 *
29 * The retry BIO also deals with partially written packets. The BIO takes responsibility for not writing
30 * partial packets, which means that requests can be released even if the data has been partially written.
31 * The application can also cancel an ongoing retry entry at any time.
32 *
33 * If something blocks IO, the application should call the blocked / resume functions for this BIO to inform
34 * it of IO changes. Otherwise, the only time this BIO blocks is when it runs out of retransmission slots.
35 *
36 * There are provisions for application-layer watchdogs, where the application can reserve a retry entry. It
37 * can then call the fr_bio_retry_rewrite() function instead of fr_bio_write() to write the watchdog packet.
38 * Any retransmission timers for the application-layer watchdog must be handled by the application. The BIO
39 * will not retry reserved watchdog requests.
40 *
41 * In general, the next BIO after this one should be the memory bio, so that this bio receives only complete
42 * packets.
43 *
44 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
45 */
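To make the callback flow described above concrete, here is a minimal, hypothetical sketch of how an application might instantiate this BIO. The app_* callbacks, retry_cfg (a filled-out fr_bio_retry_config_t carrying the event list and base retry configuration), and next_bio are illustrative assumptions, not part of this file:

static void app_sent(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size,
		     fr_bio_retry_entry_t *retry_ctx)
{
	/* Cache retry_ctx with the request so it can be cancelled or inspected later. */
}

static bool app_response(fr_bio_t *bio, fr_bio_retry_entry_t **item_p, void *packet_ctx,
			 void const *buffer, size_t size)
{
	/*
	 *	Match the reply to an outstanding request and set *item_p.  Return true only
	 *	for a new, valid response; false for duplicates or unmatched packets.
	 */
	return false;
}

static void app_release(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, fr_bio_retry_release_reason_t reason)
{
	/* The cached request / response pair may now be freed; no more retransmissions will happen. */
}

/* Called once at connection setup.  next_bio is typically the memory BIO mentioned above. */
static fr_bio_t *app_retry_open(TALLOC_CTX *ctx, fr_bio_retry_config_t const *retry_cfg, fr_bio_t *next_bio)
{
	return fr_bio_retry_alloc(ctx, 256, app_sent, app_response,
				  NULL,		/* no rewrite callback: retries resend the cached buffer */
				  app_release, retry_cfg, next_bio);
}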
46
47#include <freeradius-devel/bio/bio_priv.h>
48#include <freeradius-devel/bio/null.h>
49#include <freeradius-devel/bio/buf.h>
50#include <freeradius-devel/util/rb.h>
51#include <freeradius-devel/util/dlist.h>
52
53#define _BIO_RETRY_PRIVATE
54#include <freeradius-devel/bio/retry.h>
55
56typedef struct fr_bio_retry_list_s fr_bio_retry_list_t;
58
59/*
60 * Define type-safe wrappers for head and entry definitions.
61 */
62FR_DLIST_TYPES(fr_bio_retry_list)
63
64struct fr_bio_retry_entry_s {
65 void *uctx; //!< user-writable context
66 void *packet_ctx; //!< packet_ctx from the write() call
67 fr_bio_retry_rewrite_t rewrite; //!< per-packet rewrite callback
68 void *rewrite_ctx; //!< context specifically for rewriting this packet
69
70 fr_retry_t retry; //!< retry timers and counters
71
72 union {
73 fr_rb_node_t next_retry_node; //!< for retries
74 FR_DLIST_ENTRY(fr_bio_retry_list) entry; //!< for the free list
75 };
76 fr_rb_node_t expiry_node; //!< for expiries
77
78 fr_bio_retry_t *my; //!< so we can get to it from the event timer callback
79
80 uint8_t const *buffer; //!< cached copy of the packet to send
81 size_t size; //!< size of the cached packet
82
83 bool cancelled; //!< was this item cancelled?
84 bool reserved; //!< for application-layer watchdog
85};
86
87FR_DLIST_FUNCS(fr_bio_retry_list, fr_bio_retry_entry_t, entry)
88
91
92 fr_timer_list_t *next_tl; //!< when packets are retried next
93 fr_timer_list_t *expiry_tl; //!< when packets expire, so that we expire packets when the socket is blocked.
94
95 fr_bio_retry_info_t info;
96
97 fr_retry_config_t retry_config;
98
99 ssize_t error;
100 bool all_used; //!< blocked due to no free entries
101
102 /*
103 * Cache a partial write when IO is blocked. Partial
104 * packets are left in the timer tree so that they can be expired.
105 */
106 fr_bio_retry_entry_t *partial; //!< for partial writes
107
108 fr_bio_retry_sent_t sent; //!< callback for when we successfully sent a packet
109 fr_bio_retry_rewrite_t rewrite; //!< optional callback which can change a packet on retry
110 fr_bio_retry_response_t response; //!< callback to see if we got a valid response
111 fr_bio_retry_release_t release; //!< callback to release a request / response pair
112
113 fr_bio_buf_t buffer; //!< to store partial packets
114
115 FR_DLIST_HEAD(fr_bio_retry_list) free; //!< free lists are better than memory fragmentation
116};
117
118static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
119static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
120
121/** Release an entry back to the free list.
122 *
123 */
124static void fr_bio_retry_release(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_bio_retry_release_reason_t reason)
125{
126 item->cancelled = true;
127
128 /*
129 * Remove the item from all timer lists before calling the application "release" function.
130 *
131 * reserved items (e.g. application-layer watchdogs like Status-Server) are run by the
132 * application, and aren't inserted into any tree.
133 */
134 if (!item->reserved) {
135 (void) fr_timer_uctx_remove(my->next_tl, item);
136 (void) fr_timer_uctx_remove(my->expiry_tl, item);
137 }
138
139 /*
140 * Tell the caller that we've released it before doing anything else. That way we can safely
141 * modify anything we want.
142 */
143 my->release((fr_bio_t *) my, item, reason);
144
145 /*
146 * We've partially written this item. Don't bother changing its position in any of the lists,
147 * as it's in progress.
148 */
149 if (my->partial == item) return;
150
151 /*
152 * This item is reserved. The application has cached a pointer to it, so it never gets returned
153 * to the free list.
154 */
155 if (item->reserved) return;
156
157 /*
158 * If we were blocked due to having no free entries, then we can resume writes, since we now have
159 * a free entry.
160 */
161 if (my->all_used) {
162 fr_assert(fr_bio_retry_list_num_elements(&my->free) == 0);
163
164 /*
165 * The application MUST call fr_bio_retry_write_resume(), which will check if IO is
166 * actually blocked.
167 *
168 * @todo - make this function return a failure, OR update the ctx with a failure? OR
169 * call a bio error function on failure? That way we can just call write_resume() from here.
170 */
171 my->all_used = false;
172
173 if (!my->info.write_blocked && my->cb.write_resume) (void) my->cb.write_resume(&my->bio);
174 }
175
176 item->packet_ctx = NULL;
177
178 fr_bio_retry_list_insert_head(&my->free, item);
179}
180
181/** Writes are blocked.
182 *
183 */
184static int fr_bio_retry_write_blocked(fr_bio_t *bio)
185{
186 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
187
188 if (my->info.write_blocked) {
189 return 1;
190 }
191
192 /*
193 * Disarm the retry timer, and enable the expiry timer.
194 *
195 * i.e. we won't retry packets, but we will expire them when their timer runs out.
196 */
197 if (fr_timer_list_disarm(my->next_tl) < 0) return fr_bio_error(GENERIC);
198
199 if (fr_timer_list_arm(my->expiry_tl) < 0) return fr_bio_error(GENERIC);
200
201 my->info.write_blocked = true;
202
203 return 1;
204}
205
206
207/** Write one item.
208 *
209 * @return
210 * - <0 on error
211 * - 0 for "can't write any more"
212 * - 1 for "wrote a packet"
213 */
214static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_time_t now)
215{
216 ssize_t rcode;
217 fr_retry_state_t state;
218
219 fr_assert(!my->partial);
220 fr_assert(!item->reserved);
221
222 /*
223 * Are we there yet?
224 *
225 * Release it, indicating whether or not we successfully got a reply.
226 */
227 state = fr_retry_next(&item->retry, now);
228 if (state != FR_RETRY_CONTINUE) {
230 return 1;
231 }
232
233 /*
234 * Track when we last sent a NEW packet. Also track when we first sent a packet after becoming
235 * writeable again.
236 */
237 if ((item->retry.count == 1) && fr_time_lt(my->info.last_sent, now)) {
238 my->info.last_sent = now;
239
240 if (fr_time_lteq(my->info.first_sent, my->info.last_idle)) my->info.first_sent = now;
241 }
242
243 fr_assert(fr_time_gt(item->retry.next, now));
244
245 /*
246 * We rewrote the "next" timer. Remove the item from the timer tree, which doesn't call the cmp
247 * function and therefore doesn't care that the time has changed. Then re-insert it, which does
248 * call the cmp function.
249 */
250 (void) fr_timer_uctx_remove(my->next_tl, item);
251 (void) fr_timer_uctx_insert(my->next_tl, item);
252
253 /*
254 * Write out the packet. On failure release this item.
255 *
256 * If there's an error, we hope that the next "real" write will find the error, and do any
257 * necessary cleanups. Note that we can't call bio shutdown here, as the bio is controlled by the
258 * application, and not by us.
259 */
260 if (item->rewrite) {
261 rcode = item->rewrite(&my->bio, item, item->buffer, item->size);
262 } else {
263 rcode = my->rewrite(&my->bio, item, item->buffer, item->size);
264 }
265 if (rcode < 0) {
266 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
267
268 fr_bio_retry_release(my, item, FR_BIO_RETRY_WRITE_ERROR);
269 return rcode;
270 }
271
272 /*
273 * We didn't write the whole packet, we're blocked.
274 */
275 if ((size_t) rcode < item->size) {
276 if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(OOM);
277
278 return 0;
279 }
280
281 return 1;
282}
283
284/** Resume writes.
285 *
286 * On resume, we try to flush any pending packets which should have been sent.
287 */
288static int fr_bio_retry_write_resume(fr_bio_t *bio)
289{
290 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
291
292 if (!my->info.write_blocked) return 1;
293
294 my->info.write_blocked = false;
295
296 /*
297 * Disarm the expiry list, and rearm the next retry list.
298 *
299 * Rearming the next retry list will cause all pending events to be run. Which means calling the
300 * write routine for each item. If the write ends up blocking, it will disarm the next retry
301 * timer, re-arm the expiry timer, and then set the write_blocked flag.
302 */
303 (void) fr_timer_list_disarm(my->expiry_tl);
304 (void) fr_timer_list_arm(my->next_tl);
305
306 return !my->info.write_blocked; /* return 0 for "can't resume" and 1 for "can resume" */
307}
308
309
310/** There's a partial packet written. Write all of that one first, before writing another packet.
311 *
312 * The packet can either be cancelled, or IO blocked. In either case, we must write the full packet before
313 * going on to the next one, OR retrying another packet.
314 */
315static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
316{
317 size_t used;
318 ssize_t rcode;
319 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
320 fr_bio_t *next;
321 fr_bio_retry_entry_t *item = my->partial;
322
323 fr_assert(my->partial != NULL);
324 fr_assert(my->buffer.start);
325
326 used = fr_bio_buf_used(&my->buffer);
327 fr_assert(used > 0);
328
329 /*
330 * There must be a next bio.
331 */
332 next = fr_bio_next(&my->bio);
333 fr_assert(next != NULL);
334
335 rcode = next->write(next, NULL, my->buffer.read, used);
336 if (rcode <= 0) return rcode;
337
338 my->buffer.read += rcode;
339
340 /*
341 * There's still data in the buffer. We can't send more packets until we've finished writing this one.
342 */
343 if (fr_bio_buf_used(&my->buffer) > 0) return 0;
344
345 /*
346 * We're done. Reset the buffer and clean up our cached partial packet.
347 */
348 fr_bio_buf_reset(&my->buffer);
349 my->partial = NULL;
350
351 /*
352 * The item was cancelled, which means it's no longer in the timer tree.
353 *
354 * If it's not cancelled, then we leave it in the tree, and run its timers as normal.
355 */
356 if (item->cancelled) {
357 item->packet_ctx = NULL;
358
359 fr_bio_retry_list_insert_head(&my->free, item);
360 }
361
362 rcode = fr_bio_retry_write_resume(&my->bio);
363 if (rcode <= 0) return rcode;
364
365 /*
366 * Try to write the packet which we were given.
367 */
368 my->bio.write = fr_bio_retry_write;
369 return fr_bio_retry_write(bio, packet_ctx, buffer, size);
370}
371
372/** Save a partial packet when the write becomes blocked.
373 */
374static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
375{
376 fr_assert(!my->partial);
377 fr_assert(rcode > 0);
378 fr_assert((size_t) rcode < item->size);
379
380 /*
381 * (re)-alloc the buffer for partial writes.
382 */
383 if (!my->buffer.start ||
384 (item->size > fr_bio_buf_size(&my->buffer))) {
385 if (fr_bio_buf_alloc(my, &my->buffer, item->size)) return fr_bio_error(OOM);
386 }
387
388 fr_assert(fr_bio_buf_used(&my->buffer) == 0);
389 fr_assert(my->buffer.read == my->buffer.start);
390
391 fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode);
392
393 my->partial = item;
394
395 /*
396 * If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called
397 * this function.
398 */
399 if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC);
400
401 my->bio.write = fr_bio_retry_write_partial;
402
403 /*
404 * We leave the entry in the timer tree so that the expiry timer will get hit.
405 *
406 * And then return the size of the partial data we wrote.
407 */
408 return rcode;
409}
410
411
412/** Resend a packet.
413 *
414 * This function should be called by the rewrite() callback, after (possibly) re-encoding the packet.
415 *
416 * @param bio the binary IO handler
417 * @param item the retry context from #fr_bio_retry_sent_t
418 * @param buffer raw data for the packet. May be NULL, in which case the previous packet is retried
419 * @param size size of the raw data
420 * @return
421 * - <0 on error
422 * - 0 for "wrote no data"
423 * - >0 for "wrote data".
424 */
425ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const void *buffer, size_t size)
426{
427 ssize_t rcode;
428 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
429 fr_bio_t *next;
430
431 /*
432 * The caller may (accidentally or intentionally) call this function when there's a partial
433 * packet. The intention for rewrite() is that it is only called from timers, and those only run
434 * when the socket isn't blocked. But the caller might not pay attention to those issues.
435 */
436 if (my->partial) return 0;
437
438 /*
439 * There must be a next bio.
440 */
441 next = fr_bio_next(&my->bio);
442 fr_assert(next != NULL);
443
444 /*
445 * The caller should pass NULL for "use the previous packet".
446 */
447 if (buffer) {
448 item->buffer = buffer;
449 item->size = size;
450 }
451
452 /*
453 * Write out the packet, if everything is OK, return.
454 *
455 * Note that we don't update any timers if the write succeeded. That is handled by the caller.
456 */
457 rcode = next->write(next, item->packet_ctx, item->buffer, item->size);
458 if ((size_t) rcode == size) return rcode;
459
460 /*
461 * Can't write anything, be sad.
462 */
463 if (rcode == 0) return 0;
464
465 /*
466 * There's an error writing the packet. Release it, and move the item to the free list.
467 *
468 * Note that we don't bother resetting the timer. There's no point in changing the timer when
469 * the bio is likely dead.
470 */
471 if (rcode < 0) {
472 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
473
474 fr_bio_retry_release(my, item, FR_BIO_RETRY_WRITE_ERROR);
475 return rcode;
476 }
477
478 /*
479 * We had previously written the packet, so save the re-sent one, too.
480 */
481 return fr_bio_retry_save_write(my, item, rcode);
482}
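As a sketch of how the rewrite hook is meant to be driven (passed as the rewrite argument to fr_bio_retry_alloc()), a callback might re-encode the request and then hand the new buffer back to fr_bio_retry_rewrite(). The app_request_t structure, lookup, and encoder are hypothetical; the buffer must be owned by the cached request rather than the stack, since it has to stay valid for later retransmissions:

static ssize_t app_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx,
			   void const *buffer, size_t size)
{
	app_request_t *req = app_request_find(retry_ctx);	/* hypothetical lookup, cached by the sent() callback */

	/*
	 *	Re-encode into storage owned by the request (e.g. to update a
	 *	retransmission counter or delay field), then let the retry BIO
	 *	write it out and deal with any partial write.
	 */
	req->packet_len = app_encode(req, req->packet, sizeof(req->packet));

	return fr_bio_retry_rewrite(bio, retry_ctx, req->packet, req->packet_len);
}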
483
484/** A previous timer write had a fatal error, so we forbid further writes.
485 *
486 */
487static ssize_t fr_bio_retry_write_fatal(fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
488{
489 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
490 ssize_t rcode = my->error;
491
492 my->error = 0;
493 my->bio.write = fr_bio_null_write;
494
495 return rcode;
496}
497
498/** Run an expiry timer event.
499 *
500 */
501static void fr_bio_retry_expiry_timer(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
502{
503 fr_bio_retry_entry_t *item = talloc_get_type_abort(uctx, fr_bio_retry_entry_t);
504 fr_bio_retry_t *my = item->my;
505
506 /*
507 * We only expire entries if writing is blocked.
508 */
509 fr_assert(my->info.write_blocked);
510
511 /*
512 * An item is DONE if it received a reply, then waited for another reply, and then the socket
513 * became blocked.
514 */
516}
517
518/** Run a timer event. Usually to write out another packet.
519 *
520 */
521static void fr_bio_retry_next_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
522{
523 ssize_t rcode;
524 fr_bio_retry_entry_t *item = talloc_get_type_abort(uctx, fr_bio_retry_entry_t);
525 fr_bio_retry_t *my = item->my;
526
527 fr_assert(my->partial == NULL);
528 fr_assert(!my->info.write_blocked);
529
530 /*
531 * Retry one item.
532 */
533 rcode = fr_bio_retry_write_item(my, item, now);
534 if (rcode < 0) {
535 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return;
536
537 my->error = rcode;
538 my->bio.write = fr_bio_retry_write_fatal;
539 return;
540 }
541}
542
543/** Write a request, and see if we have a reply.
544 *
545 */
546static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
547{
548 ssize_t rcode;
549 fr_bio_retry_entry_t *item;
550 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
551 fr_bio_t *next;
552
553 fr_assert(!my->partial);
554
555 /*
556 * There must be a next bio.
557 */
558 next = fr_bio_next(&my->bio);
559 fr_assert(next != NULL);
560
561 /*
562 * The caller is trying to flush partial data. But we don't have any partial data, so just call
563 * the next bio to flush it.
564 */
565 if (!buffer) {
566 return next->write(next, packet_ctx, NULL, size);
567 }
568
569 /*
570 * Catch the corner case where the max number of saved packets is exceeded.
571 */
572 if (fr_bio_retry_list_num_elements(&my->free) == 0) {
573 /*
574 * Grab the first item which can be expired.
575 */
576 item = fr_timer_uctx_peek(my->expiry_tl);
577 fr_assert(item != NULL);
578
579 /*
580 * If the item has no replies, we can't cancel it. Otherwise, try to cancel it, which
581 * will give us a free slot. If we can't cancel it, tell the application that we're
582 * blocked.
583 *
584 * Note that we do NOT call fr_bio_retry_write_blocked(), as that assumes the IO is
585 * blocked, and will stop all of the timers. Instead, the IO is fine, but we have no way
586 * to send more packets.
587 */
588 if (!item->retry.replies || (fr_bio_retry_entry_cancel(bio, item) < 0)) {
589 /*
590 * Note that we're blocked BEFORE running the callback, so that calls to
591 * fr_bio_retry_write_blocked() don't delete timers and stop retrying packets.
592 */
593 my->info.write_blocked = true;
594 my->all_used = true;
595
596 /*
597 * Previous BIOs are blocked, but we still try to write retries.
598 */
599 rcode = fr_bio_write_blocked(bio);
600 if (rcode < 0) return rcode;
601
602 return fr_bio_error(IO_WOULD_BLOCK);
603 }
604
605 /*
606 * We now have a free item, so we can use it.
607 */
608 fr_assert(fr_bio_retry_list_num_elements(&my->free) > 0);
609 }
610
611 /*
612 * Write out the packet. If there's an error, OR we wrote nothing, return.
613 *
614 * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
615 * the caller to know that the write didn't succeed, and the caller takes care of managing the
616 * current packet. So there's no need for us to do that.
617 */
618 rcode = next->write(next, packet_ctx, buffer, size);
619 if (rcode <= 0) return rcode;
620
621 /*
622 * Initialize the retry timers after writing the packet.
623 */
624 item = fr_bio_retry_list_pop_head(&my->free);
625 fr_assert(item != NULL);
626
627 fr_assert(item->my == my);
628 *item = (fr_bio_retry_entry_t) {
629 .my = my,
630 .retry.start = fr_time(),
631 .packet_ctx = packet_ctx,
632 .buffer = buffer,
633 .size = size,
634 };
635
636 /*
637 * Always initialize the retry timer. That way the sent() callback doesn't have to call
638 * fr_time().
639 *
640 * The application can call fr_bio_retry_entry_init() to re-initialize it, but that's fine.
641 */
642 fr_retry_init(&item->retry, item->retry.start, &my->retry_config);
643
644 /*
645 * Tell the application that we've saved the packet. The "item" pointer allows the application
646 * to cancel this packet if necessary.
647 */
648 my->sent(bio, packet_ctx, buffer, size, item);
649
650 /*
651 * This should never fail.
652 */
653 (void) fr_timer_uctx_insert(my->next_tl, item);
654 (void) fr_timer_uctx_insert(my->expiry_tl, item);
655
656 /*
657 * We only wrote part of the packet, remember to write the rest of it.
658 */
659 if ((size_t) rcode < size) {
660 return fr_bio_retry_save_write(my, item, rcode);
661 }
662
663 return size;
664}
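From the calling side, the interesting results of a write through this BIO are the blocking cases handled above. A hedged sketch, assuming the generic fr_bio_write() entry point named in the file header and a hypothetical request structure:

	ssize_t slen;

	slen = fr_bio_write(retry_bio, req, req->packet, req->packet_len);
	if (slen == fr_bio_error(IO_WOULD_BLOCK)) {
		/*
		 *	Either the underlying socket is blocked, or every retry slot is in use
		 *	and none could be cancelled.  Queue the request and try the write again
		 *	once writes resume.
		 */
		return app_requeue(req);			/* hypothetical */
	}
	if (slen < 0) return app_request_fail(req, slen);	/* any other fatal BIO error */

	/*
	 *	Success (possibly a partial write which the BIO finishes later); the sent()
	 *	callback has already run with the retry_ctx for this packet.
	 */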
665
666static ssize_t fr_bio_retry_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
667{
668 ssize_t rcode;
669 fr_bio_retry_entry_t *item;
670 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
671 fr_bio_t *next;
672
673 /*
674 * There must be a next bio.
675 */
676 next = fr_bio_next(&my->bio);
677 fr_assert(next != NULL);
678
679 /*
680 * Read the packet. If error or nothing, return immediately.
681 */
682 rcode = next->read(next, packet_ctx, buffer, size);
683 if (rcode <= 0) return rcode;
684
685 /*
686 * Not a valid response to a request, OR a duplicate response to a request: don't return it to
687 * the caller.
688 *
689 * But if it is a duplicate response, update the counters and do cleanups as necessary.
690 */
691 item = NULL;
692 if (!my->response(bio, &item, packet_ctx, buffer, size)) {
693 if (!item) return 0;
694
695 item->retry.replies++;
696
697 /*
698 * We have enough replies. Release it.
699 */
700 if ((item->retry.replies >= item->retry.count) || !fr_time_delta_ispos(my->retry_config.mrd)) {
701 fr_bio_retry_release(my, item, FR_BIO_RETRY_DONE);
702 }
703
704 return 0;
705 }
706
707 fr_assert(item != NULL);
708 fr_assert(item->retry.replies == 0);
709 fr_assert(item != my->partial);
710
711 /*
712 * Track when the "most recently sent" packet has a reply. This metric is better than most
713 * others for judging the liveliness of the destination.
714 */
715 if (fr_time_lt(my->info.mrs_time, item->retry.start)) my->info.mrs_time = item->retry.start;
716
717 /*
718 * We have a new reply, remember when that happened. Note that we don't update this timer for
719 * duplicate replies, but perhaps we should?
720 */
721 my->info.last_reply = fr_time();
722
723 /*
724 * We have a new reply. If we've received all of the replies (i.e. one), OR we don't have a
725 * maximum lifetime for this request, then release it immediately.
726 */
727 item->retry.replies++;
728
729 /*
730 * We don't retry application-layer watchdog packets. And we don't run timers for them. The
731 * application is responsible for managing those timers itself.
732 */
733 if (item->reserved) return rcode;
734
735 /*
736 * There are no more packets to send, so this connection is idle.
737 *
738 * Note that partial packets aren't tracked in the timer tree. We can't do retransmits until the
739 * socket is writable.
740 */
741 if (fr_bio_retry_outstanding((fr_bio_t *) my) == 1) my->info.last_idle = my->info.last_reply;
742
743 /*
744 * We have enough replies. Release it.
745 */
746 if ((item->retry.replies >= item->retry.count) || !fr_time_delta_ispos(my->retry_config.mrd)) {
747 fr_bio_retry_release(my, item, FR_BIO_RETRY_DONE);
748 return rcode;
749 }
750
751 /*
752 * There are more replies pending. Wait passively for more replies, and clean up the item
753 * when the timer has expired.
754 */
755 item->retry.next = fr_time_add_time_delta(item->retry.start, my->retry_config.mrd);
756
757 (void) fr_timer_uctx_remove(my->next_tl, item);
758 (void) fr_timer_uctx_insert(my->next_tl, item);
759
760 return rcode;
761}
762
763/*
764 * Order the retries by what we have to do next.
765 *
766 * Note that "retry.next" here is capped at "retry.end". So if we need to expire an entry, it will
767 * happen at the "next" retry.
768 */
769static int8_t _next_retry_cmp(void const *one, void const *two)
770{
771 fr_bio_retry_entry_t const *a = one;
772 fr_bio_retry_entry_t const *b = two;
773
774 fr_assert(a->buffer);
775 fr_assert(b->buffer);
776
777 return fr_time_cmp(a->retry.next, b->retry.next);
778}
779
780/*
781 * Order entries by when they expire, when we're not retrying.
782 *
783 * i.e. the socket is blocked, so all retries are paused.
784 */
785static int8_t _expiry_cmp(void const *one, void const *two)
786{
787 fr_bio_retry_entry_t const *a = one;
788 fr_bio_retry_entry_t const *b = two;
789
790 fr_assert(a->buffer);
791 fr_assert(b->buffer);
792
793 return fr_time_cmp(a->retry.end, b->retry.end);
794}
795
796/** Cancel one item.
797 *
798 * If "item" is NULL, the last entry in the timer tree is cancelled.
799 *
800 * @param bio the binary IO handler
801 * @param item the retry context from #fr_bio_retry_sent_t
802 * @return
803 * - <0 error
804 * - 0 - didn't cancel
805 * - 1 - did cancel
806 */
807int fr_bio_retry_entry_cancel(fr_bio_t *bio, fr_bio_retry_entry_t *item)
808{
809 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
810
811 /*
812 * No item passed, try to cancel the first one to expire.
813 */
814 if (!item) {
815 item = fr_timer_uctx_peek(my->expiry_tl);
816 if (!item) return 0;
817
818 /*
819 * This item hasn't had a response, we can't cancel it.
820 */
821 if (!item->retry.replies) return 0;
822 }
823
824 /*
825 * If the caller has cached a previously finished item, then that's a fatal error.
826 */
827 fr_assert(item->buffer != NULL);
828
829 fr_bio_retry_release(my, item, FR_BIO_RETRY_CANCELLED);
830
831 return 1;
832}
833
834/** Set a per-packet retry config
835 *
836 * This function should be called from the #fr_bio_retry_sent_t callback to set a unique retry timer for this
837 * packet. If no retry configuration is set, then the main one from the alloc() function is used.
838 */
839int fr_bio_retry_entry_init(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item, fr_retry_config_t const *cfg)
840{
841 fr_assert(item->buffer != NULL);
842
843 if (item->retry.config) return -1;
844
846
847 fr_retry_init(&item->retry, item->retry.start, cfg);
848
849 return 0;
850}
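A sketch of how a fr_bio_retry_sent_t callback might use this; app_packet_is_urgent() and my_fast_retry_config are hypothetical, and the config must outlive the packet because the pointer is retained in item->retry.config:

extern fr_retry_config_t const my_fast_retry_config;	/* hypothetical; lives for the whole connection */

static void app_sent_fast(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size,
			  fr_bio_retry_entry_t *retry_ctx)
{
	if (!app_packet_is_urgent(packet_ctx)) return;	/* hypothetical predicate */

	/*
	 *	Give this packet its own retransmission schedule instead of the one
	 *	passed to fr_bio_retry_alloc().  Returns -1 if a per-packet config
	 *	was already set.
	 */
	(void) fr_bio_retry_entry_init(bio, retry_ctx, &my_fast_retry_config);
}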
851
852/** Allow the callbacks / application to know when things are being retried.
853 *
854 * This is not initialized until _after_ fr_bio_retry_entry_start() has been called.
855 */
856const fr_retry_t *fr_bio_retry_entry_info(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item)
857{
858 fr_assert(item->buffer != NULL);
859
860 if (!item->retry.config) return NULL;
861
862 return &item->retry;
863}
864
865
866/** Cancel all outstanding packets.
867 *
868 */
869static int fr_bio_retry_destructor(fr_bio_retry_t *my)
870{
871 fr_bio_retry_entry_t *item;
872
873 fr_timer_list_disarm(my->next_tl);
874 fr_timer_list_disarm(my->expiry_tl);
875
876 /*
877 * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
878 * entries will be deleted when the memory is freed.
879 */
880 while ((item = fr_timer_uctx_peek(my->next_tl)) != NULL) {
881 (void) fr_timer_uctx_remove(my->next_tl, item);
882 my->release((fr_bio_t *) my, item, FR_BIO_RETRY_CANCELLED);
883 }
884
885 return 0;
886}
887
888/** Orderly shutdown.
889 *
890 */
891static void fr_bio_retry_shutdown(fr_bio_t *bio)
892{
893 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
894
896}
897
898/** Allocate a #fr_bio_retry_t
899 *
900 */
901fr_bio_t *fr_bio_retry_alloc(TALLOC_CTX *ctx, size_t max_saved,
902 fr_bio_retry_sent_t sent,
903 fr_bio_retry_response_t response,
904 fr_bio_retry_rewrite_t rewrite,
905 fr_bio_retry_release_t release,
906 fr_bio_retry_config_t const *cfg,
907 fr_bio_t *next)
908{
909 size_t i;
910 fr_bio_retry_t *my;
911 fr_bio_retry_entry_t *items;
912
913 fr_assert(cfg->el);
914
915 /*
916 * Limit to reasonable values.
917 */
918 if (!max_saved) return NULL;
919 if (max_saved > 65536) return NULL;
920
921 my = talloc_zero(ctx, fr_bio_retry_t);
922 if (!my) return NULL;
923
924 /*
925 * Allocate everything up front, to get better locality of reference, less memory fragmentation,
926 * and better reuse of data structures.
927 */
928 items = talloc_array(my, fr_bio_retry_entry_t, max_saved);
929 if (!items) return NULL;
930
931 /*
932 * Insert the entries into the free list in order.
933 */
934 fr_bio_retry_list_init(&my->free);
935 for (i = 0; i < max_saved; i++) {
936 items[i].my = my;
937 fr_bio_retry_list_insert_tail(&my->free, &items[i]);
938 }
939
941 offsetof(fr_bio_retry_entry_t, next_retry_node),
942 offsetof(fr_bio_retry_entry_t, retry.next));
943 if (!my->next_tl) {
944 talloc_free(my);
945 return NULL;
946 }
947
949 offsetof(fr_bio_retry_entry_t, expiry_node),
950 offsetof(fr_bio_retry_entry_t, retry.end));
951 if (!my->expiry_tl) {
952 talloc_free(my);
953 return NULL;
954 }
955
956 /*
957 * The expiry list is run only when writes are blocked. We cannot have both lists active at the
958 * same time.
959 */
960 (void) fr_timer_list_disarm(my->expiry_tl);
961
962 my->sent = sent;
963 if (!rewrite) {
964 my->rewrite = fr_bio_retry_rewrite;
965 } else {
966 my->rewrite = rewrite;
967 }
968 my->response = response;
969 my->release = release;
970
971 my->info.last_idle = fr_time();
972 my->info.el = cfg->el;
973 my->info.cfg = cfg;
974
975 my->retry_config = cfg->retry_config;
976
977 my->bio.write = fr_bio_retry_write;
978 my->bio.read = fr_bio_retry_read;
979
980 my->priv_cb.write_blocked = fr_bio_retry_write_blocked;
981 my->priv_cb.write_resume = fr_bio_retry_write_resume;
982 my->priv_cb.shutdown = fr_bio_retry_shutdown;
983
984 fr_bio_chain(&my->bio, next);
985
986 talloc_set_destructor(my, fr_bio_retry_destructor);
987
988 return (fr_bio_t *) my;
989}
990
991fr_bio_retry_info_t const *fr_bio_retry_info(fr_bio_t *bio)
992{
993 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
994
995 return &my->info;
996}
997
998size_t fr_bio_retry_outstanding(fr_bio_t *bio)
999{
1000 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1001 size_t num;
1002
1003 num = fr_timer_list_num_events(my->next_tl);
1004
1005 if (!my->partial) return num;
1006
1007 /*
1008 * Only count partially written items if they haven't been cancelled.
1009 */
1010 return num + !my->partial->cancelled;
1011}
1012
1013/** Reserve an entry for later use with fr_bio_retry_rewrite()
1014 *
1015 * So that application-layer watchdogs can bypass the normal write / retry routines.
1016 */
1017fr_bio_retry_entry_t *fr_bio_retry_item_reserve(fr_bio_t *bio)
1018{
1019 fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
1020 fr_bio_retry_entry_t *item;
1021
1022 item = fr_bio_retry_list_pop_head(&my->free);
1023 if (!item) return NULL;
1024
1025 fr_assert(item->my == my);
1026 *item = (fr_bio_retry_entry_t) {
1027 .my = my,
1028 .reserved = true,
1029 };
1030
1031 return item;
1032}
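Tying this back to the application-layer watchdog described in the file header, a hedged sketch: the application reserves one entry at setup time and later writes watchdog packets with fr_bio_retry_rewrite() instead of fr_bio_write(). The names and packet encoding are hypothetical, and any retransmission timers for the watchdog remain the application's responsibility:

static fr_bio_retry_entry_t *watchdog_entry;		/* reserved once; never returned to the free list */

static int app_watchdog_init(fr_bio_t *retry_bio)
{
	watchdog_entry = fr_bio_retry_item_reserve(retry_bio);
	if (!watchdog_entry) return -1;			/* every retry slot is already in use */

	return 0;
}

static ssize_t app_watchdog_send(fr_bio_t *retry_bio, uint8_t const *packet, size_t packet_len)
{
	/*
	 *	Reserved entries bypass the normal write path: the BIO will not retry
	 *	them on its own, it only writes what we hand it here.
	 */
	return fr_bio_retry_rewrite(retry_bio, watchdog_entry, packet, packet_len);
}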
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:116
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:115
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:130
#define fr_bio_error(_x)
Definition base.h:192
static ssize_t fr_bio_retry_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition retry.c:666
static int8_t _expiry_cmp(void const *one, void const *two)
Definition retry.c:785
fr_bio_retry_release_t release
callback to release a request / response pair
Definition retry.c:111
int fr_bio_retry_entry_init(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item, fr_retry_config_t const *cfg)
Set a per-packet retry config.
Definition retry.c:839
fr_bio_t * fr_bio_retry_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_retry_sent_t sent, fr_bio_retry_response_t response, fr_bio_retry_rewrite_t rewrite, fr_bio_retry_release_t release, fr_bio_retry_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_retry_t.
Definition retry.c:901
static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
Save a partial packet when the write becomes blocked.
Definition retry.c:374
fr_bio_buf_t buffer
to store partial packets
Definition retry.c:113
static void fr_bio_retry_release(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_bio_retry_release_reason_t reason)
Release an entry back to the free list.
Definition retry.c:124
size_t fr_bio_retry_outstanding(fr_bio_t *bio)
Definition retry.c:998
fr_timer_list_t * next_tl
when packets are retried next
Definition retry.c:92
static void fr_bio_retry_next_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
Run a timer event.
Definition retry.c:521
static int fr_bio_retry_destructor(fr_bio_retry_t *my)
Cancel all outstanding packets.
Definition retry.c:869
int fr_bio_retry_entry_cancel(fr_bio_t *bio, fr_bio_retry_entry_t *item)
Cancel one item.
Definition retry.c:807
fr_timer_list_t * expiry_tl
when packets expire, so that we expire packets when the socket is blocked.
Definition retry.c:93
struct fr_bio_retry_list_s fr_bio_retry_list_t
Definition retry.c:56
ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const void *buffer, size_t size)
Resend a packet.
Definition retry.c:425
static void fr_bio_retry_shutdown(fr_bio_t *bio)
Orderly shutdown.
Definition retry.c:891
static int8_t _next_retry_cmp(void const *one, void const *two)
Definition retry.c:769
static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_time_t now)
Write one item.
Definition retry.c:214
fr_retry_config_t retry_config
Definition retry.c:97
static void fr_bio_retry_expiry_timer(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Run an expiry timer event.
Definition retry.c:501
fr_bio_retry_entry_t * partial
for partial writes
Definition retry.c:106
const fr_retry_t * fr_bio_retry_entry_info(UNUSED fr_bio_t *bio, fr_bio_retry_entry_t *item)
Allow the callbacks / application to know when things are being retried.
Definition retry.c:856
fr_bio_retry_sent_t sent
callback for when we successfully sent a packet
Definition retry.c:108
fr_bio_retry_response_t response
callback to see if we got a valid response
Definition retry.c:110
static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write a request, and see if we have a reply.
Definition retry.c:546
ssize_t error
Definition retry.c:99
static ssize_t fr_bio_retry_write_fatal(fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
A previous timer write had a fatal error, so we forbid further writes.
Definition retry.c:487
bool all_used
blocked due to no free entries
Definition retry.c:100
static int fr_bio_retry_write_blocked(fr_bio_t *bio)
Writes are blocked.
Definition retry.c:184
static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition retry.c:315
fr_bio_retry_info_t info
Definition retry.c:95
fr_bio_retry_rewrite_t rewrite
optional callback which can change a packet on retry
Definition retry.c:109
fr_bio_retry_entry_t * fr_bio_retry_item_reserve(fr_bio_t *bio)
Reserve an entry for later use with fr_bio_retry_rewrite()
Definition retry.c:1017
fr_bio_retry_info_t const * fr_bio_retry_info(fr_bio_t *bio)
Definition retry.c:991
static int fr_bio_retry_write_resume(fr_bio_t *bio)
Resume writes.
Definition retry.c:288
void * rewrite_ctx
context specifically for rewriting this packet
Definition retry.c:68
fr_retry_config_t retry_config
base retry config
Definition retry.h:47
void(* fr_bio_retry_release_t)(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, fr_bio_retry_release_reason_t reason)
Callback on release the packet (timeout or have all replies)
Definition retry.h:136
fr_retry_t retry
retry timers and counters
Definition retry.c:70
fr_bio_retry_release_reason_t
Definition retry.h:79
@ FR_BIO_RETRY_WRITE_ERROR
Definition retry.h:83
@ FR_BIO_RETRY_CANCELLED
Definition retry.h:82
@ FR_BIO_RETRY_DONE
Definition retry.h:80
@ FR_BIO_RETRY_NO_REPLY
Definition retry.h:81
struct fr_bio_retry_entry_s fr_bio_retry_entry_t
Definition retry.h:64
uint8_t const * buffer
cached copy of the packet to send
Definition retry.c:80
size_t size
size of the cached packet
Definition retry.c:81
bool reserved
for application-layer watchdog
Definition retry.c:84
void(* fr_bio_retry_sent_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_retry_entry_t *retry_ctx)
Callback for when a packet is sent.
Definition retry.h:98
ssize_t(* fr_bio_retry_rewrite_t)(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, const void *buffer, size_t size)
Definition retry.h:66
bool(* fr_bio_retry_response_t)(fr_bio_t *bio, fr_bio_retry_entry_t **item_p, void *packet_ctx, const void *buffer, size_t size)
Callback on read to see if a packet is a response.
Definition retry.h:123
fr_event_list_t * el
event list
Definition retry.h:45
void * packet_ctx
packet_ctx from the write() call
Definition retry.c:66
fr_rb_node_t expiry_node
for expiries
Definition retry.c:76
bool cancelled
was this item cancelled?
Definition retry.c:83
fr_bio_retry_t * my
so we can get to it from the event timer callback
Definition retry.c:78
fr_bio_retry_rewrite_t rewrite
per-packet rewrite callback
Definition retry.c:67
void * uctx
user-writable context
Definition retry.c:65
Definition retry.c:64
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:69
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition buf.c:114
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition buf.c:81
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition buf.h:151
#define UNUSED
Definition build.h:317
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1152
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1122
fr_bio_shutdown & my
Definition fd_errno.h:69
free(array)
int fr_bio_write_blocked(fr_bio_t *bio)
Internal BIO function to tell all BIOs that it's blocked.
Definition base.c:291
talloc_free(reap)
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:122
long int ssize_t
unsigned char uint8_t
static size_t used
ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
Definition null.c:39
#define fr_assert(_expr)
Definition rad_assert.h:38
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition time.h:173
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_lt(_a, _b)
Definition time.h:239
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
"server local" time.
Definition time.h:69
int fr_timer_list_disarm(fr_timer_list_t *tl)
Disarm a timer list.
Definition timer.c:1101
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
Definition timer.c:1149
int fr_timer_uctx_insert(fr_timer_list_t *tl, void *uctx)
Insert a uctx into a shared timer, and update the timer.
Definition timer.c:1344
fr_timer_list_t * fr_timer_list_shared_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent, fr_cmp_t cmp, fr_timer_cb_t callback, size_t node_offset, size_t time_offset)
Allocate a new shared event timer list.
Definition timer.c:1308
int fr_timer_uctx_remove(fr_timer_list_t *tl, void *uctx)
Remove a uctx from a shared timer.
Definition timer.c:1367
void * fr_timer_uctx_peek(fr_timer_list_t *tl)
Definition timer.c:1377
int fr_timer_list_arm(fr_timer_list_t *tl)
Arm (or re-arm) a timer list.
Definition timer.c:1122
An event timer list.
Definition timer.c:50
fr_retry_state_t fr_retry_next(fr_retry_t *r, fr_time_t now)
Initialize a retransmission counter.
Definition retry.c:108
void fr_retry_init(fr_retry_t *r, fr_time_t now, fr_retry_config_t const *config)
Initialize a retransmission counter.
Definition retry.c:36
fr_time_delta_t irt
Initial transmission time.
Definition retry.h:33
fr_retry_state_t
Definition retry.h:45
@ FR_RETRY_CONTINUE
Definition retry.h:46
fr_time_t end
when we will end the retransmissions
Definition retry.h:54
fr_time_t next
when the next timer should be set
Definition retry.h:55