The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
dedup.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: d6be229e40577b02d9c51e14bf05ac99a7a877fb $
19 * @file lib/bio/dedup.c
20 * @brief Binary IO abstractions for deduping packets.
21 *
22 * The dedup BIO receives packets from the network, and allows for deduplication of requests, so that
23 * duplicate requests are only processed once. In addition, if a reply is available, a duplicate request will
24 * result in a duplicate reply. The actual deduplication tree / table has to be maintained by the calling
25 * application, as packet comparisons for deduplication is very protocol-specific. The purpose of the dedup
26 * BIO is to abstract all of the common support functions around this limitation.
27 *
28 * When packets are read() from the next bio, the #fr_bio_dedup_receive_t callback is run. It tells the BIO
29 * whether or not the packet should be received, and whether or not the packet should be returned to the
30 * caller. The receive callback is also passed a #fr_bio_dedup_entry_t pointer, where the packet_ctx, packet,
31 * and size are already filled out. This entry is used to correlate requests and replies.
32 *
33 * When packets are written via write() to the network, the #fr_bio_dedup_get_item_t callback is called to get the
34 * previously cached #fr_bio_dedup_entry_t pointer. This is because there is no generic way to get an
35 * additional context to this BIO via the write() routine. i.e. the packet_ctx for write() may include things
36 * like src/dst ip/port, and therefore can't always be an #fr_bio_dedup_entry_t. The caller should associate
37 * the #fr_bio_dedup_entry_t with the packet_ctx for the reply. The get_item() routine can then return the entry.
38 *
39 * For simplicity, the next bio should be a memory one. That way the read() can read more than one packet if
40 * necessary. And the write() can cache a partial packet if it blocks.
41 *
42 * The entry needs to be cached in order to maintain the internal tracking used by the dedup BIO.
43 *
44 * On client retransmit, the #fr_bio_dedup_receive_t callback is run, just as if it is a new packet. The
45 * dedup BIO does not know if the received data is a new packet until the #fr_bio_dedup_receive_t callback
46 * says so. On duplicate client request, the #fr_bio_dedup_receive_t callback can call fr_bio_dedup_respond()
47 * to send a duplicate reply. That call bypasses the normal dedup stack, and writes directly to the next bio.
48 *
49 * The calling application can also call fr_bio_dedup_respond() as soon as it has a reply. i.e. skip the BIO
50 * write() call. That works, and is safe.
51 *
52 * The dedup BIO tracks a number of lists / trees internally. Packets which are received but which have no
53 * reply are in an "active" list. Packets which have a reply are in an "expired" RB tree, where a timer is
54 * set to expire packets. If a write() call results in a partial write, that packet is put into a "partially
55 * written" state. If multiple calls to write() are done when writing is blocked, the replies are put into a
56 * "pending" state.
57 *
58 * The calling application can always call fr_bio_dedup_cancel() to cancel or expire a packet. This call is
59 * safe, and can be made at any time, no matter what state the packet is in.
60 *
61 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
62 */
63
64#include <freeradius-devel/bio/bio_priv.h>
65#include <freeradius-devel/bio/null.h>
66#include <freeradius-devel/bio/buf.h>
67#include <freeradius-devel/util/rb.h>
68
69#define _BIO_DEDUP_PRIVATE
70#include <freeradius-devel/bio/dedup.h>
71
72typedef struct fr_bio_dedup_list_s fr_bio_dedup_list_t;
74
75/*
76 * There is substantial similarity between this code and the
77 * "retry" bio. Any fixes which are done here should be checked
78 * there, and vice versa.
79 */
80
81/*
82 * Define type-safe wrappers for head and entry definitions.
83 */
84FR_DLIST_TYPES(fr_bio_dedup_list)
85
86typedef enum {
88 FR_BIO_DEDUP_STATE_ACTIVE, //!< Received, but not replied.
89 FR_BIO_DEDUP_STATE_PENDING, //!< Have a reply, but we're trying to write it out.
90 FR_BIO_DEDUP_STATE_REPLIED, //!< Replied, and waiting for it to expire.
91 FR_BIO_DEDUP_STATE_PARTIAL, //!< Partially written
92 FR_BIO_DEDUP_STATE_CANCELLED, //!< Partially written, and then cancelled.
94
96 void *uctx;
98 uint8_t *packet; //!< cached packet data for finding duplicates
99 size_t packet_size; //!< size of the cached packet data
100 void *reply_ctx; //!< reply ctx
101 uint8_t *reply; //!< reply cached by the application
102 size_t reply_size; //!< size of the cached reply
103
104 fr_rb_node_t dedup; //!< user managed dedup node
105
106 union {
107 struct {
108 fr_rb_node_t node; //!< for the expiry timers
109 };
110 FR_DLIST_ENTRY(fr_bio_dedup_list) entry; //!< for the free list
111 };
112
113 fr_bio_dedup_t *my; //!< so we can get to it from the event timer callback
114
115 fr_time_t expires; //!< when this entry expires
116 fr_bio_dedup_state_t state; //!< which tree or list this item is in
117};
118
119FR_DLIST_FUNCS(fr_bio_dedup_list, fr_bio_dedup_entry_t, entry)
120
123
125
126 fr_rb_tree_t rb; //!< expire list
127
129
131
132 /*
133 * The "first" entry is cached here so that we can detect when it changes. The insert / delete
134 * code can just do its work without worrying about timers. And then when the tree manipulation
135 * is done, call the fr_bio_dedup_timer_reset() function to reset (or not) the timer.
136 *
137 * The timer is set for is when the first packet expires.
138 */
140
141 /*
142 * Cache a partial write when IO is blocked.
143 *
144 * When the IO is blocked, we can still expire old entries, unlike the "retry" BIOs. This is
145 * because we're not resending packets, we're just cleaning up *sent* packets when they expire.
146 */
148
149 fr_bio_dedup_receive_t receive; //!< called when we receive a potentially new packet
150 fr_bio_dedup_release_t release; //!< called to release a packet
151
152 fr_bio_dedup_get_item_t get_item; //!< turn a packet_ctx into a #fr_bio_dedup_entry_t
153
155
156 FR_DLIST_HEAD(fr_bio_dedup_list) active; //!< received but not yet replied
157 FR_DLIST_HEAD(fr_bio_dedup_list) pending; //!< trying to write when the socket is blocked.
158 FR_DLIST_HEAD(fr_bio_dedup_list) free; //!< free list
159};
160
161static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *el, fr_time_t now, void *uctx);
162static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
166
168{
169 if (my->first != item) return;
170
171 my->first = NULL;
173}
174
175/** Move an item from active to replied.
176 *
177 * Note that we don't update any timers. The caller is responsible for that.
178 */
180{
181 if (item->state == FR_BIO_DEDUP_STATE_REPLIED) return;
182
184
185 (void) fr_bio_dedup_list_remove(&my->active, item);
186
187 /*
188 * Now that we have a reply, set the default expiry time. The caller can always call
189 * fr_bio_dedup_entry_extend() to change the expiry time.
190 */
191 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
192
193 /*
194 * This should never fail.
195 */
196 (void) fr_rb_insert(&my->rb, item);
198}
199
200/** Resend a reply when we receive a duplicate request.
201 *
202 * This function should be called by the respond() callback to re-send a duplicate reply.
203 *
204 * It can also be called by the application when it first has a response to the request.
205 *
206 * @param bio the binary IO handler
207 * @param item the dedup context from #fr_bio_dedup_sent_t
208 * @return
209 * - <0 on error
210 * - 0 for "wrote no data"
211 * - >0 for "wrote data".
212 */
214{
215 ssize_t rcode;
216 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
217 fr_bio_t *next;
218
219 if (!item->reply || !item->reply_size) return 0;
220
221 switch (item->state) {
222 /*
223 * Send a first reply if we can.
224 */
226 /*
227 * If we're not writing to the socket, just insert the packet into the pending list.
228 */
229 if (my->bio.write != fr_bio_dedup_write) {
230 (void) fr_bio_dedup_list_remove(&my->active, item);
231 fr_bio_dedup_list_insert_tail(&my->pending, item);
232
234 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
235 return item->reply_size;
236 }
237
238 /*
239 * The socket is writable, go do that.
240 */
241 break;
242
243 /*
244 * Send a duplicate reply.
245 */
247 if (my->bio.write == fr_bio_dedup_write) break;
248
249 /*
250 * The socket is blocked. Save the packet in the pending queue.
251 */
252 move_to_pending:
253 fr_rb_remove_by_inline_node(&my->rb, &item->node);
254
255 save_in_pending:
256 /*
257 * We could update the timer for pending packets. However, that's more complicated.
258 *
259 * The packets will expire when the pending queue is flushed, OR when the application
260 * cancels the pending packet.
261 */
263
264 fr_bio_dedup_list_insert_tail(&my->pending, item);
266 return item->reply_size;
267
268 /*
269 * We're already trying to write this entry, don't do anything else.
270 */
273
275 return fr_bio_error(IO_WOULD_BLOCK);
276
279 fr_assert(0);
280 return fr_bio_error(GENERIC);
281 }
282
283 /*
284 * There must be a next bio.
285 */
286 next = fr_bio_next(&my->bio);
287 fr_assert(next != NULL);
288
289 /*
290 * Write out the packet, if everything is OK, return.
291 */
292 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
293 if ((size_t) rcode == item->reply_size) {
295 return rcode;
296 }
297
298 /*
299 * Can't write anything, be sad.
300 */
301 if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) {
302 if (item->state == FR_BIO_DEDUP_STATE_ACTIVE) {
303 (void) fr_bio_dedup_list_remove(&my->active, item);
304 goto save_in_pending;
305 }
306
308 goto move_to_pending;
309 }
310
311 /*
312 * There's an error writing the packet. Release it, and move the item to the free list.
313 *
314 * Note that we don't bother resetting the timer. There's no point in changing the timer when
315 * the bio is likely dead.
316 */
317 if (rcode < 0) {
319 return rcode;
320 }
321
322 /*
323 * We are writing item->reply, and that's blocked. Save the partial packet for later.
324 */
325 return fr_bio_dedup_blocked(my, item, rcode);
326}
327
328/** Reset the timer after changing the rb tree.
329 *
330 */
332{
334
335 /*
336 * Nothing to do, don't set any timers.
337 */
338 first = fr_rb_first(&my->rb);
339 if (!first) {
341 my->ev = NULL;
342 my->first = NULL;
343 return 0;
344 }
345
346 /*
347 * We don't care about partially written packets. The timer remains set even when we have a
348 * partial outgoing packet, because we can expire entries which aren't partially written.
349 *
350 * However, the partially written packet MUST NOT be in the expiry tree.
351 *
352 * We also don't care about the pending list. The application will either cancel the item, or
353 * the socket will become writable, and the item will get handled then.
354 */
355 fr_assert(first != my->partial);
356
357 /*
358 * The timer is already set correctly, we're done.
359 */
360 if (first == my->first) return 0;
361
362 /*
363 * Update the timer. This should never fail.
364 */
365 if (fr_timer_at(my, my->el->tl, &my->ev, first->expires, false, fr_bio_dedup_timer, my) < 0) return -1;
366
367 my->first = first;
368 return 0;
369}
370
371/** Release an entry back to the free list.
372 *
373 */
375{
376 my->release((fr_bio_t *) my, item, reason);
377
378 switch (item->state) {
379 /*
380 * Cancel an active item, just nuke it.
381 */
383 fr_bio_dedup_list_remove(&my->active, item);
384 break;
385
386 /*
387 * We already replied, remove it from the expiry tree.
388 *
389 * We only update the timer if the caller isn't already expiring the entry.
390 */
392 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
393
395 break;
396
397 /*
398 * It was pending another write, so we just discard the write.
399 */
401 fr_bio_dedup_list_remove(&my->pending, item);
402 break;
403
404 /*
405 * Don't free it. Just set its state to cancelled.
406 */
408 fr_assert(my->partial == item);
410 return;
411
414 fr_assert(0);
415 return;
416 }
417
418#ifndef NDEBUG
419 item->packet = NULL;
420#endif
421 item->uctx = NULL;
422 item->packet_ctx = NULL;
423
424 fr_assert(my->first != item);
425 fr_bio_dedup_list_insert_head(&my->free, item);
426}
427
428/** Flush any packets in the pending queue.
429 *
430 */
432{
433 ssize_t rcode, out_rcode;
435 fr_bio_t *next;
436 fr_time_t now;
437
438 /*
439 * We can only flush the pending list when any previous partial packet has been written.
440 */
441 fr_assert(!my->partial);
442
443 /*
444 * Nothing in the list, we're done.
445 */
446 if (fr_bio_dedup_list_num_elements(&my->pending) == 0) return 0;
447
448 /*
449 * There must be a next bio.
450 */
451 next = fr_bio_next(&my->bio);
452 fr_assert(next != NULL);
453
454 now = fr_time();
455 out_rcode = 0;
456
457 /*
458 * Write out any pending packets.
459 */
460 while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
462
463 /*
464 * It's already expired, don't bother replying.
465 */
466 if (fr_time_lteq(item->expires, now)) {
468 continue;
469 }
470
471 /*
472 * Write the entry to the next bio.
473 */
474 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
475 if (rcode <= 0) return rcode; /* @todo - update timer if we've written one packet */
476
477 /*
478 * We've written the entire packet, move it to the expiry list.
479 */
480 if ((size_t) rcode == item->reply_size) {
481 (void) fr_rb_insert(&my->rb, item);
483 continue;
484 }
485
487
488 out_rcode = fr_bio_error(IO_WOULD_BLOCK);
489 break;
490 }
491
492 /*
493 * We may need to update the timer if we've removed the first entry from the tree, or added a new
494 * first entry.
495 */
496 if (!my->first || (my->first != fr_rb_first(&my->rb))) {
497 my->first = NULL;
499 }
500
501 return out_rcode;
502}
503
504/** Save partially written data to our local buffer.
505 *
506 */
507static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
508{
509 /*
510 * (re)-alloc the buffer for partial writes.
511 */
512 if (!my->buffer.start ||
513 (size > fr_bio_buf_size(&my->buffer))) {
514 if (fr_bio_buf_alloc(my, &my->buffer, size) < 0) return -1;
515 }
516
517 fr_assert(fr_bio_buf_used(&my->buffer) == 0);
518 fr_assert(my->buffer.read == my->buffer.start);
519
520 fr_bio_buf_write(&my->buffer, buffer + rcode, size - rcode);
521
522 return 0;
523}
524
525/** Write data from our local buffer to the next bio.
526 *
527 */
529{
530 size_t used;
531 ssize_t rcode;
532 fr_bio_t *next;
533
534 fr_assert(my->buffer.start);
535
536 used = fr_bio_buf_used(&my->buffer);
537 fr_assert(used > 0);
538
539 /*
540 * There must be a next bio.
541 */
542 next = fr_bio_next(&my->bio);
543 fr_assert(next != NULL);
544
545 rcode = next->write(next, NULL, my->buffer.read, used);
546 if (rcode <= 0) return rcode;
547
548 my->buffer.read += rcode;
549
550 /*
551 * Still data in the buffer. We can't send more packets until we finish writing this one.
552 */
553 if (fr_bio_buf_used(&my->buffer) > 0) return 0;
554
555 /*
556 * We're done. Reset the buffer and clean up our cached partial packet.
557 */
558 fr_bio_buf_reset(&my->buffer);
559
560 return rcode;
561}
562
563/** There's a partial packet written. Write all of that one first, before writing another packet.
564 *
565 * The packet can either be cancelled, or IO blocked. In either case, we must write this packet before
566 * we can write another one.
567 */
568static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
569{
570 ssize_t rcode;
571 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
572 fr_bio_dedup_entry_t *item = my->partial;
573
574 fr_assert(my->partial != NULL);
575 fr_assert(my->buffer.start);
576
579
581 if (rcode <= 0) return rcode;
582
583 my->partial = NULL;
584
585 /*
586 * Partial writes are removed from the expiry tree until they're fully written. When they're
587 * written, either add it back to the tree if it's still operational, or add it to the free list
588 * if it has been cancelled.
589 */
590 if (item->state == FR_BIO_DEDUP_STATE_PARTIAL) {
591
592 /*
593 * See if we have to clean up this entry. If so, do it now. That avoids another bounce
594 * through the event loop.
595 */
596 if (fr_time_lteq(item->expires, fr_time())) {
598
599 } else {
600 /*
601 * We've changed the tree, so update the timer. fr_bio_dedup_write() only
602 * updates the timer on successful write.
603 */
605 (void) fr_rb_insert(&my->rb, item);
606 }
608
609 } else {
610 /*
611 * The item was cancelled, add it to the free list.
612 */
613#ifndef NDEBUG
614 item->packet = NULL;
615#endif
616 item->uctx = NULL;
617 item->packet_ctx = NULL;
618
620 fr_bio_dedup_list_insert_head(&my->free, item);
621 }
622
623 /*
624 * Flush any packets which were pending during the blocking period.
625 */
627 if (rcode < 0) return rcode;
628
629 /*
630 * Unlike the retry BIO, we don't retry writes for items in the RB tree. Those packets have already
631 * been written.
632 */
633
634 /*
635 * Try to write the packet which we were given.
636 */
637 my->bio.write = fr_bio_dedup_write;
638 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
639}
640
641/** The write is blocked.
642 *
643 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
644 * unblocked!
645 *
646 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
647 * _last_ item in the RB tree.
648 */
650{
651 fr_assert(!my->partial);
652 fr_assert(rcode > 0);
653 fr_assert((size_t) rcode < item->reply_size);
654
655 if (fr_bio_dedup_buffer_save(my, item->reply, item->reply_size, rcode) < 0) return fr_bio_error(OOM);
656
657 switch (item->state) {
659 (void) fr_bio_dedup_list_remove(&my->active, item);
660 break;
661
662 /*
663 * We cannot expire this entry, so remove it from the expiration tree. That step lets us
664 * expire other entries.
665 */
667 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
669 break;
670
671 /*
672 * We tried to write a pending packet and got blocked.
673 */
675 fr_assert(fr_bio_dedup_list_head(&my->pending) == item);
676 (void) fr_bio_dedup_list_remove(&my->pending, item);
677 break;
678
679 /*
680 * None of these states should be possible.
681 */
685 fr_assert(0);
686 return fr_bio_error(GENERIC);
687 }
688
689 my->partial = item;
691
692 /*
693 * Reset the write routine, so that if the application tries any more writes, the partial entry
694 * gets written first.
695 */
696 my->bio.write = fr_bio_dedup_write_partial;
697 return rcode;
698}
699
700/** There's a partial block of data written. Write all of that data first, before writing another packet.
701 */
702static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
703{
704 ssize_t rcode;
705 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
706
707 fr_assert(!my->partial);
708
709 /*
710 * Flush out any partly written data.
711 */
713 if (rcode <= 0) return rcode;
714
715 /*
716 * Flush any packets which were pending during the blocking period.
717 */
719 if (rcode < 0) return rcode;
720
721 /*
722 * Try to write the packet which we were given.
723 */
724 my->bio.write = fr_bio_dedup_write;
725 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
726}
727
728
729/** The write is blocked, but we don't have "item".
730 *
731 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
732 * unblocked!
733 *
734 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
735 * _last_ item in the RB tree.
736 */
738{
739 fr_assert(!my->partial);
740 fr_assert(rcode > 0);
741 fr_assert((size_t) rcode < size);
742
743 if (fr_bio_dedup_buffer_save(my, buffer, size, rcode) < 0) return fr_bio_error(OOM);
744
745 /*
746 * Reset the write routine, so that if the application tries any more writes, the data
747 * gets written first.
748 */
749 my->bio.write = fr_bio_dedup_write_data;
750 return rcode;
751}
752
753/*
754 * There is no fr_bio_dedup_rewrite(), packets are never re-written by this bio.
755 */
756
757/** Expire an entry when its timer fires.
758 *
759 * @todo - expire items from the pending list, too
760 */
761static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
762{
763 fr_bio_dedup_t *my = talloc_get_type_abort(uctx, fr_bio_dedup_t);
765 fr_time_t expires;
766
767 fr_assert(my->first != NULL);
768 fr_assert(fr_rb_first(&my->rb) == my->first);
769
770 my->first = NULL;
771
772 /*
773 * Expire all entries which are within 10ms of "now". That way we don't reset the event many
774 * times in short succession.
775 *
776 * @todo - also expire entries on the pending list?
777 */
778 expires = fr_time_add(now, fr_time_delta_from_msec(10));
779
780 while ((item = fr_rb_first(&my->rb)) != NULL) {
781 if (fr_time_gt(item->expires, expires)) break;
782
784 }
785
787}
788
789/** Write raw data to the bio.
790 *
791 * This function is largely a duplicate of fr_bio_dedup_respond(). Except due to the BIO API, it can be
792 * passed a NULL buffer (for flushing the BIOs), and it can't be passed a #fr_bio_dedup_entry_t, and instead
793 * has to be passed a "void *packet_ctx".
794 *
795 * The caller is free to ignore this function,
796 */
797static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
798{
799 ssize_t rcode;
801 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
802 fr_bio_t *next;
803
804 fr_assert(!my->partial);
805
806 /*
807 * There must be a next bio.
808 */
809 next = fr_bio_next(&my->bio);
810 fr_assert(next != NULL);
811
812 /*
813 * The caller is trying to flush partial data. But we don't have any partial data, so just call
814 * the next bio to flush it.
815 */
816 if (!buffer) {
817 return next->write(next, packet_ctx, NULL, size);
818 }
819
820 /*
821 * Write out the packet. If there's an error, OR we wrote nothing, return.
822 *
823 * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
824 * the caller to know that the write didn't succeed, and the caller takes care of managing the
825 * current packet. So there's no need for us to do that.
826 */
827 rcode = next->write(next, packet_ctx, buffer, size);
828 if (rcode <= 0) return rcode;
829
830 /*
831 * We need the item pointer to mark this entry as blocked. If that doesn't exist, then we try
832 * really hard to write out the un-tracked data.
833 */
834 item = NULL;
835 if (my->get_item) item = my->get_item(bio, packet_ctx);
836 if ((size_t) rcode == size) {
838 return rcode;
839 }
840
841 if (!item) return fr_bio_dedup_blocked_data(my, buffer, size, rcode);
842
843 fr_assert(item->reply_ctx == packet_ctx);
844 fr_assert(item->reply == buffer);
845 fr_assert(item->reply_size == size);
846
847 return fr_bio_dedup_blocked(my, item, rcode);
848}
849
850static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
851{
852 ssize_t rcode;
854 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
855 fr_bio_t *next;
856
857 /*
858 * There must be a next bio.
859 */
860 next = fr_bio_next(&my->bio);
861 fr_assert(next != NULL);
862
863 /*
864 * Read the packet. If error or nothing, return immediately.
865 */
866 rcode = next->read(next, packet_ctx, buffer, size);
867 if (rcode <= 0) return rcode;
868
869 /*
870 * Get a free item
871 */
872 item = fr_bio_dedup_list_pop_head(&my->free);
873 if (!item) return fr_bio_error(OOM);
874
875 fr_assert(item->my == my);
877 .my = my,
878 .packet_ctx = packet_ctx,
879 .packet = buffer,
880 .packet_size = (size_t) rcode,
882 };
883
884 /*
885 * See if we want to receive this packet. If this isn't
886 * something we need to receive, then we just discard it.
887 *
888 * The "receive" function is responsible for looking in a local dedup tree to see if there's a
889 * cached reply. It's also responsible for calling the fr_bio_retry_respond() function to send
890 * a duplicate reply, and then return "don't receive" this packet.
891 *
892 * The application can also call fr_bio_dedup_entry_extend() in order to extend the lifetime of a
893 * packet which has a cached response.
894 *
895 * If there's an active packet, then the receive() function should do whatever it needs to do in
896 * order to update the application for a duplicate packet. And then return "don't receive" for
897 * this packet.
898 *
899 * If we're NOT going to process this packet, then the item we just popped needs to get inserted
900 * back into the free list.
901 *
902 * The caller should cancel any conflicting packets by calling fr_bio_dedup_entry_cancel(). Note
903 * that for sanity, we don't re-use the previous #fr_bio_dedup_entry_t.
904 */
905 if (!my->receive(bio, item, packet_ctx)) {
907 fr_bio_dedup_list_insert_head(&my->free, item);
908 return 0;
909 }
910
911 fr_bio_dedup_list_insert_tail(&my->active, item);
912
913 return rcode;
914}
915
916static int8_t _entry_cmp(void const *one, void const *two)
917{
918 fr_bio_dedup_entry_t const *a = one;
919 fr_bio_dedup_entry_t const *b = two;
920
921 fr_assert(a->packet);
922 fr_assert(b->packet);
923
924 return fr_time_cmp(a->expires, b->expires);
925}
926
927/** Cancel one item.
928 *
929 * @param bio the binary IO handler
930 * @param item the dedup context from #fr_bio_dedup_respond_t
931 */
940
941/** Extend the expiry time for an entry
942 *
943 * @param bio the binary IO handler
944 * @param item the dedup context from #fr_bio_dedup_respond_t
945 * @param expires the new expiry time
946 * @return
947 * - <0 error
948 * - 0 success
949 */
951{
952 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
953
954 switch (item->state) {
956 return 0;
957
959 break;
960
961 /*
962 * Partially written or pending replies aren't in the expiry tree. We can just change their
963 * expiry time and be done.
964 */
967 item->expires = expires;
968 return 0;
969
972 fr_assert(0);
973 return fr_bio_error(GENERIC);
974 }
975
976 /*
977 * Change places in the tree.
978 */
979 item->expires = expires;
980 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
981 (void) fr_rb_insert(&my->rb, item);
982
983 /*
984 * If we're not changing the first item, we don't need to change the timers.
985 *
986 * Otherwise we clear the "first" flag, so that the reset timer function will change the timer
987 * value.
988 */
989 if (my->first != item) return 0;
990
991 my->first = NULL;
992
994}
995
996
997/** Remove the dedup cache
998 *
999 */
1001{
1004 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
1005
1006 talloc_const_free(my->ev);
1007
1008 /*
1009 * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
1010 * entries will be deleted when the memory is freed.
1011 */
1012 while ((item = fr_rb_iter_init_inorder(&my->rb, &iter)) != NULL) {
1013 fr_rb_iter_delete_inorder(&my->rb, &iter);
1014 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1015 }
1016
1017 while ((item = fr_bio_dedup_list_pop_head(&my->active)) != NULL) {
1018 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1019 }
1020
1021 while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
1022 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1023 }
1024
1025#ifndef NDEBUG
1026 my->ev = NULL;
1027 my->first = NULL;
1028#endif
1029
1030 return 0;
1031}
1032
1033/** Allocate a #fr_bio_dedup_t
1034 *
1035 */
1036fr_bio_t *fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved,
1037 fr_bio_dedup_receive_t receive,
1038 fr_bio_dedup_release_t release,
1039 fr_bio_dedup_get_item_t get_item,
1040 fr_bio_dedup_config_t const *cfg,
1041 fr_bio_t *next)
1042{
1043 size_t i;
1045 fr_bio_dedup_entry_t *items;
1046
1047 fr_assert(cfg->el);
1048
1049 /*
1050 * Limit to reasonable values.
1051 */
1052 if (!max_saved) return NULL;
1053 if (max_saved > 65536) return NULL;
1054
1055 my = talloc_zero(ctx, fr_bio_dedup_t);
1056 if (!my) return NULL;
1057
1058 /*
1059 * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1060 * and better reuse of data structures.
1061 */
1062 items = talloc_array(my, fr_bio_dedup_entry_t, max_saved);
1063 if (!items) {
1064 talloc_free(my);
1065 return NULL;
1066 }
1067
1068 /*
1069 * Insert the entries into the free list in order.
1070 */
1071 fr_bio_dedup_list_init(&my->free);
1072
1073 for (i = 0; i < max_saved; i++) {
1074 items[i].my = my;
1075 items[i].state = FR_BIO_DEDUP_STATE_FREE;
1076 fr_bio_dedup_list_insert_tail(&my->free, &items[i]);
1077 }
1078
1079 fr_bio_dedup_list_init(&my->active);
1080 fr_bio_dedup_list_init(&my->pending);
1081
1082 (void) fr_rb_inline_init(&my->rb, fr_bio_dedup_entry_t, node, _entry_cmp, NULL);
1083
1084 my->receive = receive;
1085 my->release = release;
1086 my->get_item = get_item;
1087
1088 my->el = cfg->el;
1089 my->config = *cfg;
1090
1091 my->bio.write = fr_bio_dedup_write;
1092 my->bio.read = fr_bio_dedup_read;
1093
1094 fr_bio_chain(&my->bio, next);
1095
1096 my->priv_cb.shutdown = fr_bio_dedup_shutdown;
1097
1098 talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor); /* always use a common destructor */
1099
1100 return (fr_bio_t *) my;
1101}
static int const char char buffer[256]
Definition acutest.h:576
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:117
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:116
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:131
#define fr_bio_error(_x)
Definition base.h:200
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:84
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition buf.c:117
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition buf.c:84
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition buf.h:151
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:343
#define UNUSED
Definition build.h:336
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
Definition dedup.c:213
static int fr_bio_dedup_shutdown(fr_bio_t *bio)
Remove the dedup cache.
Definition dedup.c:1000
fr_bio_dedup_release_t release
called to release a packet
Definition dedup.c:150
static void fr_bio_dedup_timer_reset_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Definition dedup.c:167
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
Definition dedup.c:507
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
Definition dedup.c:950
fr_bio_dedup_config_t config
Definition dedup.c:128
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
Definition dedup.c:152
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition dedup.c:568
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
Definition dedup.c:932
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
Definition dedup.c:702
fr_bio_dedup_entry_t * partial
Definition dedup.c:147
fr_event_list_t * el
Definition dedup.c:124
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
Definition dedup.c:1036
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
Definition dedup.c:431
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
Definition dedup.c:649
static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
Definition dedup.c:761
fr_rb_tree_t rb
expire list
Definition dedup.c:126
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
Definition dedup.c:179
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
Definition dedup.c:149
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
Definition dedup.c:737
static int8_t _entry_cmp(void const *one, void const *two)
Definition dedup.c:916
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
Definition dedup.c:797
fr_bio_dedup_entry_t * first
Definition dedup.c:139
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
Definition dedup.c:528
fr_bio_buf_t buffer
Definition dedup.c:154
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
Definition dedup.c:72
fr_bio_dedup_state_t
Definition dedup.c:86
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
Definition dedup.c:91
@ FR_BIO_DEDUP_STATE_FREE
Definition dedup.c:87
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
Definition dedup.c:88
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
Definition dedup.c:92
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
Definition dedup.c:89
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
Definition dedup.c:90
static int fr_bio_dedup_timer_reset(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
Definition dedup.c:331
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition dedup.c:850
fr_timer_t * ev
Definition dedup.c:130
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
Definition dedup.c:374
void * uctx
user-writable context
Definition dedup.c:96
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release the packet (timeout, or cancelled by the application)
Definition dedup.h:91
fr_event_list_t * el
event list
Definition dedup.h:37
fr_bio_dedup_t * my
so we can get to it from the event timer callback
Definition dedup.c:113
fr_time_t expires
when this entry expires
Definition dedup.c:115
size_t packet_size
size of the cached packet data
Definition dedup.c:99
fr_bio_dedup_state_t state
which tree or list this item is in
Definition dedup.c:116
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
Definition dedup.h:42
void * packet_ctx
packet_ctx for dedup purposes
Definition dedup.c:97
uint8_t * reply
reply cached by the application
Definition dedup.c:101
size_t reply_size
size of the cached reply
Definition dedup.c:102
void * reply_ctx
reply ctx
Definition dedup.c:100
uint8_t * packet
cached packet data for finding duplicates
Definition dedup.c:98
fr_rb_node_t dedup
user managed dedup node
Definition dedup.c:104
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx)
Callback on read to see if we should receive the packet.
Definition dedup.h:78
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
Definition dedup.h:93
fr_bio_dedup_release_reason_t
Definition dedup.h:58
@ FR_BIO_DEDUP_WRITE_ERROR
Definition dedup.h:61
@ FR_BIO_DEDUP_CANCELLED
Definition dedup.h:60
@ FR_BIO_DEDUP_EXPIRED
Definition dedup.h:59
Definition dedup.c:95
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1111
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1097
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1134
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1104
void fr_bio_shutdown & my
Definition fd_errno.h:70
free(array)
talloc_free(hp)
int fr_bio_destructor(fr_bio_t *bio)
Free this bio.
Definition base.c:35
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:121
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:37
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:899
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition rb.c:722
void * fr_rb_first(fr_rb_tree_t *tree)
Definition rb.c:786
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition rb.h:178
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:253
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition time.h:173
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:49
A timer event.
Definition timer.c:83
#define fr_timer_at(...)
Definition timer.h:81
static fr_event_list_t * el