The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
dedup.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 668c7dff003ac35c70eef1f66a2c6d773f649055 $
19 * @file lib/bio/dedup.c
20 * @brief Binary IO abstractions for deduping packets.
21 *
22 * The dedup BIO receives packets from the network, and allows for deduplication of requests, so that
23 * duplicate requests are only processed once. In addition, if a reply is available, a duplicate request will
24 * result in a duplicate reply. The actual deduplication tree / table has to be maintained by the calling
25 * application, as packet comparisons for deduplication are very protocol-specific. The purpose of the dedup
26 * BIO is to abstract all of the common support functions around this limitation.
27 *
28 * When packets are read() from the next bio, the #fr_bio_dedup_receive_t callback is run. It tells the BIO
29 * whether or not the packet should be received, and whether or not the packet should be returned to the
30 * caller. The receive callback is also passed a #fr_bio_dedup_entry_t pointer, where the packet_ctx, packet,
31 * and size are already filled out. This entry is used to correlate requests and replies.
32 *
33 * When packets are write() to the network, the #fr_bio_dedup_get_item_t callback is called to get the
34 * previously cached #fr_bio_dedup_entry_t pointer. This is because there is no generic way to get an
35 * additional context to this BIO via the write() routine. i.e. the packet_ctx for write() may include things
36 * like src/dst ip/port, and therefore can't always be an #fr_bio_dedup_entry_t. The caller should associate
37 * the #fr_bio_dedup_entry_t with the packet_ctx for the reply. The get_item() routine can then return the entry.
38 *
39 * For simplicity, the next bio should be a memory one. That way the read() can read more than one packet if
40 * necessary. And the write() can cache a partial packet if it blocks.
41 *
42 * The entry needs to be cached in order to maintain the internal tracking used by the dedup BIO.
43 *
44 * On client retransmit, the #fr_bio_dedup_receive_t callback is run, just as if it is a new packet. The
45 * dedup BIO does not know if the received data is a new packet until the #fr_bio_dedup_receive_t callback
46 * says so. On duplicate client request, the #fr_bio_dedup_receive_t callback can call fr_bio_dedup_respond()
47 * to send a duplicate reply. That call bypasses the normal dedup stack, and writes directly to the next bio.
48 *
49 * The calling application can also call fr_bio_dedup_respond() as soon as it has a reply. i.e. skip the BIO
50 * write() call. That works, and is safe.
51 *
52 * The dedup BIO tracks a number of lists / trees internally. Packets which are received but which have no
53 * reply are in an "active" list. Packets which have a reply are in an "expired" RB tree, where a timer is
54 * set to expire packets. If a write() call results in a partial write, that packet is put into a "partially
55 * written" state. If multiple calls to write() are done when writing is blocked, the replies are put into a
56 * "pending" state.
57 *
58 * The calling application can always call fr_bio_dedup_cancel() to cancel or expire a packet. This call is
59 * safe, and can be made at any time, no matter what state the packet is in.
60 *
61 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
62 */
63
64#include <freeradius-devel/bio/bio_priv.h>
65#include <freeradius-devel/bio/null.h>
66#include <freeradius-devel/bio/buf.h>
67#include <freeradius-devel/util/rb.h>
68#include <freeradius-devel/util/dlist.h>
69
70#define _BIO_DEDUP_PRIVATE
71#include <freeradius-devel/bio/dedup.h>
72
73typedef struct fr_bio_dedup_list_s fr_bio_dedup_list_t;
75
76/*
77 * There is substantial similarity between this code and the
78 * "retry" bio. Any fixes which are done here should be checked
79 * there, and vice versa.
80 */
81
82/*
83 * Define type-safe wrappers for head and entry definitions.
84 */
85FR_DLIST_TYPES(fr_bio_dedup_list)
86
87typedef enum {
89 FR_BIO_DEDUP_STATE_ACTIVE, //!< Received, but not replied.
90 FR_BIO_DEDUP_STATE_PENDING, //!< Have a reply, but we're trying to write it out.
91 FR_BIO_DEDUP_STATE_REPLIED, //!< Replied, and waiting for it to expire.
92 FR_BIO_DEDUP_STATE_PARTIAL, //!< Partially written
93 FR_BIO_DEDUP_STATE_CANCELLED, //!< Partially written, and then cancelled.
95
97 void *uctx;
99 uint8_t *packet; //!< cached packet data for finding duplicates
100 size_t packet_size; //!< size of the cached packet data
101 void *reply_ctx; //!< reply ctx
102 uint8_t *reply; //!< reply cached by the application
103 size_t reply_size; //!< size of the cached reply
104
105 fr_rb_node_t dedup; //!< user managed dedup node
106
107 union {
108 struct {
109 fr_rb_node_t node; //!< for the expiry timers
110 };
111 FR_DLIST_ENTRY(fr_bio_dedup_list) entry; //!< for the free list
112 };
113
114 fr_bio_dedup_t *my; //!< so we can get to it from the event timer callback
115
116 fr_time_t expires; //!< when this entry expires
117 fr_bio_dedup_state_t state; //!< which tree or list this item is in
118};
119
120FR_DLIST_FUNCS(fr_bio_dedup_list, fr_bio_dedup_entry_t, entry)
121
124
126
127 fr_rb_tree_t rb; //!< expire list
128
130
132
133 /*
134 * The "first" entry is cached here so that we can detect when it changes. The insert / delete
135 * code can just do its work without worrying about timers. And then when the tree manipulation
136 * is done, call the fr_bio_dedup_timer_reset() function to reset (or not) the timer.
137 *
138 * The timer is set for is when the first packet expires.
139 */
141
142 /*
143 * Cache a partial write when IO is blocked.
144 *
145 * When the IO is blocked, we can still expire old entries, unlike the "retry" BIOs. This is
146 * because we're not resending packets, we're just cleaning up *sent* packets when they expire.
147 */
149
150 fr_bio_dedup_receive_t receive; //!< called when we receive a potentially new packet
151 fr_bio_dedup_release_t release; //!< called to release a packet
152
153 fr_bio_dedup_get_item_t get_item; //!< turn a packet_ctx into a #fr_bio_dedup_entry_t
154
156
157 FR_DLIST_HEAD(fr_bio_dedup_list) active; //!< received but not yet replied
158 FR_DLIST_HEAD(fr_bio_dedup_list) pending; //!< trying to write when the socket is blocked.
159 FR_DLIST_HEAD(fr_bio_dedup_list) free; //!< free list
160};
161
162static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *el, fr_time_t now, void *uctx);
163static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
167
169{
170 if (my->first != item) return;
171
172 my->first = NULL;
174}
175
176/** Move an item from active to replied.
177 *
178 * Note that we don't update any timers. The caller is responsible for that.
179 */
181{
182 if (item->state == FR_BIO_DEDUP_STATE_REPLIED) return;
183
185
186 (void) fr_bio_dedup_list_remove(&my->active, item);
187
188 /*
189 * Now that we have a reply, set the default expiry time. The caller can always call
190 * fr_bio_dedup_entry_extend() to change the expiry time.
191 */
192 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
193
194 /*
195 * This should never fail.
196 */
197 (void) fr_rb_insert(&my->rb, item);
199}
200
201/** Resend a reply when we receive a duplicate request.
202 *
203 * This function should be called by the respond() callback to re-send a duplicate reply.
204 *
205 * It can also be called by the application when it first has a response to the request.
206 *
207 * @param bio the binary IO handler
208 * @param item the dedup context from #fr_bio_dedup_sent_t
209 * @return
210 * - <0 on error
211 * - 0 for "wrote no data"
212 * - >0 for "wrote data".
213 */
215{
216 ssize_t rcode;
217 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
218 fr_bio_t *next;
219
220 if (!item->reply || !item->reply_size) return 0;
221
222 switch (item->state) {
223 /*
224 * Send a first reply if we can.
225 */
227 /*
228 * If we're not writing to the socket, just insert the packet into the pending list.
229 */
230 if (my->bio.write != fr_bio_dedup_write) {
231 (void) fr_bio_dedup_list_remove(&my->active, item);
232 fr_bio_dedup_list_insert_tail(&my->pending, item);
233
235 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
236 return item->reply_size;
237 }
238
239 /*
240 * The socket is writable, go do that.
241 */
242 break;
243
244 /*
245 * Send a duplicate reply.
246 */
248 if (my->bio.write == fr_bio_dedup_write) break;
249
250 /*
251 * The socket is blocked. Save the packet in the pending queue.
252 */
253 move_to_pending:
254 fr_rb_remove_by_inline_node(&my->rb, &item->node);
255
256 save_in_pending:
257 /*
258 * We could update the timer for pending packets. However, that's more complicated.
259 *
260 * The packets will be expire when the pending queue is flushed, OR when the application
261 * cancels the pending packet.
262 */
264
265 fr_bio_dedup_list_insert_tail(&my->pending, item);
267 return item->reply_size;
268
269 /*
270 * We're already trying to write this entry, don't do anything else.
271 */
274
276 return fr_bio_error(IO_WOULD_BLOCK);
277
280 fr_assert(0);
281 return fr_bio_error(GENERIC);
282 }
283
284 /*
285 * There must be a next bio.
286 */
287 next = fr_bio_next(&my->bio);
288 fr_assert(next != NULL);
289
290 /*
291 * Write out the packet, if everything is OK, return.
292 */
293 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
294 if ((size_t) rcode == item->reply_size) {
296 return rcode;
297 }
298
299 /*
300 * Can't write anything, be sad.
301 */
302 if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) {
303 if (item->state == FR_BIO_DEDUP_STATE_ACTIVE) {
304 (void) fr_bio_dedup_list_remove(&my->active, item);
305 goto save_in_pending;
306 }
307
309 goto move_to_pending;
310 }
311
312 /*
313 * There's an error writing the packet. Release it, and move the item to the free list.
314 *
315 * Note that we don't bother resetting the timer. There's no point in changing the timer when
316 * the bio is likely dead.
317 */
318 if (rcode < 0) {
320 return rcode;
321 }
322
323 /*
324 * We are writing item->reply, and that's blocked. Save the partial packet for later.
325 */
326 return fr_bio_dedup_blocked(my, item, rcode);
327}
328
329/** Reset the timer after changing the rb tree.
330 *
331 */
333{
335
336 /*
337 * Nothing to do, don't set any timers.
338 */
339 first = fr_rb_first(&my->rb);
340 if (!first) {
342 my->ev = NULL;
343 my->first = NULL;
344 return 0;
345 }
346
347 /*
348 * We don't care about partially written packets. The timer remains set even when we have a
349 * partial outgoing packet, because we can expire entries which aren't partially written.
350 *
351 * However, the partially written packet MUST NOT be in the expiry tree.
352 *
353 * We also don't care about the pending list. The application will either cancel the item, or
354 * the socket will become writable, and the item will get handled then.
355 */
356 fr_assert(first != my->partial);
357
358 /*
359 * The timer is already set correctly, we're done.
360 */
361 if (first == my->first) return 0;
362
363 /*
364 * Update the timer. This should never fail.
365 */
366 if (fr_timer_at(my, my->el->tl, &my->ev, first->expires, false, fr_bio_dedup_timer, my) < 0) return -1;
367
368 my->first = first;
369 return 0;
370}
371
372/** Release an entry back to the free list.
373 *
374 */
376{
377 my->release((fr_bio_t *) my, item, reason);
378
379 switch (item->state) {
380 /*
381 * Cancel an active item, just nuke it.
382 */
384 fr_bio_dedup_list_remove(&my->active, item);
385 break;
386
387 /*
388 * We already replied, remove it from the expiry tree.
389 *
390 * We only update the timer if the caller isn't already expiring the entry.
391 */
393 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
394
396 break;
397
398 /*
399 * It was pending another write, so we just discard the write.
400 */
402 fr_bio_dedup_list_remove(&my->pending, item);
403 break;
404
405 /*
406 * Don't free it. Just set its state to cancelled.
407 */
409 fr_assert(my->partial == item);
411 return;
412
415 fr_assert(0);
416 return;
417 }
418
419#ifndef NDEBUG
420 item->packet = NULL;
421#endif
422 item->uctx = NULL;
423 item->packet_ctx = NULL;
424
425 fr_assert(my->first != item);
426 fr_bio_dedup_list_insert_head(&my->free, item);
427}
428
429/** Flush any packets in the pending queue.
430 *
431 */
433{
434 ssize_t rcode, out_rcode;
436 fr_bio_t *next;
437 fr_time_t now;
438
439 /*
440 * We can only flush the pending list when any previous partial packet has been written.
441 */
442 fr_assert(!my->partial);
443
444 /*
445 * Nothing in the list, we're done.
446 */
447 if (fr_bio_dedup_list_num_elements(&my->pending) == 0) return 0;
448
449 /*
450 * There must be a next bio.
451 */
452 next = fr_bio_next(&my->bio);
453 fr_assert(next != NULL);
454
455 now = fr_time();
456 out_rcode = 0;
457
458 /*
459 * Write out any pending packets.
460 */
461 while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
463
464 /*
465 * It's already expired, don't bother replying.
466 */
467 if (fr_time_lteq(item->expires, now)) {
469 continue;
470 }
471
472 /*
473 * Write the entry to the next bio.
474 */
475 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
476 if (rcode <= 0) return rcode; /* @todo - update timer if we've written one packet */
477
478 /*
479 * We've written the entire packet, move it to the expiry list.
480 */
481 if ((size_t) rcode == item->reply_size) {
482 (void) fr_rb_insert(&my->rb, item);
484 continue;
485 }
486
488
489 out_rcode = fr_bio_error(IO_WOULD_BLOCK);
490 break;
491 }
492
493 /*
494 * We may need to update the timer if we've removed the first entry from the tree, or added a new
495 * first entry.
496 */
497 if (!my->first || (my->first != fr_rb_first(&my->rb))) {
498 my->first = NULL;
500 }
501
502 return out_rcode;
503}
504
505/** Save partially written data to our local buffer.
506 *
507 */
508static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
509{
510 /*
511 * (re)-alloc the buffer for partial writes.
512 */
513 if (!my->buffer.start ||
514 (size > fr_bio_buf_size(&my->buffer))) {
515 if (fr_bio_buf_alloc(my, &my->buffer, size) < 0) return -1;
516 }
517
518 fr_assert(fr_bio_buf_used(&my->buffer) == 0);
519 fr_assert(my->buffer.read == my->buffer.start);
520
521 fr_bio_buf_write(&my->buffer, buffer + rcode, size - rcode);
522
523 return 0;
524}
525
526/** Write data from our local buffer to the next bio.
527 *
528 */
530{
531 size_t used;
532 ssize_t rcode;
533 fr_bio_t *next;
534
535 fr_assert(my->buffer.start);
536
537 used = fr_bio_buf_used(&my->buffer);
538 fr_assert(used > 0);
539
540 /*
541 * There must be a next bio.
542 */
543 next = fr_bio_next(&my->bio);
544 fr_assert(next != NULL);
545
546 rcode = next->write(next, NULL, my->buffer.read, used);
547 if (rcode <= 0) return rcode;
548
549 my->buffer.read += rcode;
550
551 /*
552 * Still data in the buffer. We can't send more packets until we finish writing this one.
553 */
554 if (fr_bio_buf_used(&my->buffer) > 0) return 0;
555
556 /*
557 * We're done. Reset the buffer and clean up our cached partial packet.
558 */
559 fr_bio_buf_reset(&my->buffer);
560
561 return rcode;
562}
563
564/** There's a partial packet written. Write all of that one first, before writing another packet.
565 *
566 * The packet can either be cancelled, or IO blocked. In either case, we must write this packet before
567 * we can write another one.
568 */
569static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
570{
571 ssize_t rcode;
572 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
573 fr_bio_dedup_entry_t *item = my->partial;
574
575 fr_assert(my->partial != NULL);
576 fr_assert(my->buffer.start);
577
580
582 if (rcode <= 0) return rcode;
583
584 my->partial = NULL;
585
586 /*
587 * Partial writes are removed from the expiry tree until they're fully written. When they're
588 * written, either add it back to the tree if it's still operational, or add it to the free list
589 * if it has been cancelled.
590 */
591 if (item->state == FR_BIO_DEDUP_STATE_PARTIAL) {
592
593 /*
594 * See if we have to clean up this entry. If so, do it now. That avoids another bounce
595 * through the event loop.
596 */
597 if (fr_time_lteq(item->expires, fr_time())) {
599
600 } else {
601 /*
602 * We've changed the tree, so update the timer. fr_bio_dedup_write() only
603 * updates the timer on successful write.
604 */
606 (void) fr_rb_insert(&my->rb, item);
607 }
609
610 } else {
611 /*
612 * The item was cancelled, add it to the free list.
613 */
614#ifndef NDEBUG
615 item->packet = NULL;
616#endif
617 item->uctx = NULL;
618 item->packet_ctx = NULL;
619
621 fr_bio_dedup_list_insert_head(&my->free, item);
622 }
623
624 /*
625 * Flush any packets which were pending during the blocking period.
626 */
628 if (rcode < 0) return rcode;
629
630 /*
631 * Unlike the retry BIO, we don't retry writes for items in the RB tree. Those packets have already
632 * been written.
633 */
634
635 /*
636 * Try to write the packet which we were given.
637 */
638 my->bio.write = fr_bio_dedup_write;
639 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
640}
641
642/** The write is blocked.
643 *
644 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
645 * unblocked!
646 *
647 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
648 * _last_ item in the RB tree.
649 */
651{
652 fr_assert(!my->partial);
653 fr_assert(rcode > 0);
654 fr_assert((size_t) rcode < item->reply_size);
655
656 if (fr_bio_dedup_buffer_save(my, item->reply, item->reply_size, rcode) < 0) return fr_bio_error(OOM);
657
658 switch (item->state) {
660 (void) fr_bio_dedup_list_remove(&my->active, item);
661 break;
662
663 /*
664 * We cannot expire this entry, so remove it from the expiration tree. That step lets us
665 * expire other entries.
666 */
668 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
670 break;
671
672 /*
673 * We tried to write a pending packet and got blocked.
674 */
676 fr_assert(fr_bio_dedup_list_head(&my->pending) == item);
677 (void) fr_bio_dedup_list_remove(&my->pending, item);
678 break;
679
680 /*
681 * None of these states should be possible.
682 */
686 fr_assert(0);
687 return fr_bio_error(GENERIC);
688 }
689
690 my->partial = item;
692
693 /*
694 * Reset the write routine, so that if the application tries any more writes, the partial entry
695 * gets written first.
696 */
697 my->bio.write = fr_bio_dedup_write_partial;
698 return rcode;
699}
700
701/** There's a partial block of data written. Write all of that data first, before writing another packet.
702 */
703static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
704{
705 ssize_t rcode;
706 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
707
708 fr_assert(!my->partial);
709
710 /*
711 * Flush out any partly written data.
712 */
714 if (rcode <= 0) return rcode;
715
716 /*
717 * Flush any packets which were pending during the blocking period.
718 */
720 if (rcode < 0) return rcode;
721
722 /*
723 * Try to write the packet which we were given.
724 */
725 my->bio.write = fr_bio_dedup_write;
726 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
727}
728
729
730/** The write is blocked, but we don't have "item".
731 *
732 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
733 * unblocked!
734 *
735 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
736 * _last_ item in the RB tree.
737 */
739{
740 fr_assert(!my->partial);
741 fr_assert(rcode > 0);
742 fr_assert((size_t) rcode < size);
743
744 if (fr_bio_dedup_buffer_save(my, buffer, size, rcode) < 0) return fr_bio_error(OOM);
745
746 /*
747 * Reset the write routine, so that if the application tries any more writes, the data
748 * gets written first.
749 */
750 my->bio.write = fr_bio_dedup_write_data;
751 return rcode;
752}
753
754/*
755 * There is no fr_bio_dedup_rewrite(), packets are never re-written by this bio.
756 */
757
758/** Expire an entry when its timer fires.
759 *
760 * @todo - expire items from the pending list, too
761 */
762static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
763{
764 fr_bio_dedup_t *my = talloc_get_type_abort(uctx, fr_bio_dedup_t);
766 fr_time_t expires;
767
768 fr_assert(my->first != NULL);
769 fr_assert(fr_rb_first(&my->rb) == my->first);
770
771 my->first = NULL;
772
773 /*
774 * Expire all entries which are within 10ms of "now". That way we don't reset the event many
775 * times in short succession.
776 *
777 * @todo - also expire entries on the pending list?
778 */
779 expires = fr_time_add(now, fr_time_delta_from_msec(10));
780
781 while ((item = fr_rb_first(&my->rb)) != NULL) {
782 if (fr_time_gt(item->expires, expires)) break;
783
785 }
786
788}
789
790/** Write raw data to the bio.
791 *
792 * This function is largely a duplicate of fr_bio_dedup_respond(). Except due to the BIO API, it can be
793 * passed a NULL buffer (for flushing the BIOs), and it can't be passed a #fr_bio_dedup_entry_t, and instead
794 * has to be passed a "void *packet_ctx".
795 *
796 * The caller is free to ignore this function,
797 */
798static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
799{
800 ssize_t rcode;
802 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
803 fr_bio_t *next;
804
805 fr_assert(!my->partial);
806
807 /*
808 * There must be a next bio.
809 */
810 next = fr_bio_next(&my->bio);
811 fr_assert(next != NULL);
812
813 /*
814 * The caller is trying to flush partial data. But we don't have any partial data, so just call
815 * the next bio to flush it.
816 */
817 if (!buffer) {
818 return next->write(next, packet_ctx, NULL, size);
819 }
820
821 /*
822 * Write out the packet. If there's an error, OR we wrote nothing, return.
823 *
824 * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
825 * the caller to know that the write didn't succeed, and the caller takes care of managing the
826 * current packet. So there's no need for us to do that.
827 */
828 rcode = next->write(next, packet_ctx, buffer, size);
829 if (rcode <= 0) return rcode;
830
831 /*
832 * We need the item pointer to mark this entry as blocked. If that doesn't exist, then we try
833 * really hard to write out the un-tracked data.
834 */
835 item = NULL;
836 if (my->get_item) item = my->get_item(bio, packet_ctx);
837 if ((size_t) rcode == size) {
839 return rcode;
840 }
841
842 if (!item) return fr_bio_dedup_blocked_data(my, buffer, size, rcode);
843
844 fr_assert(item->reply_ctx == packet_ctx);
845 fr_assert(item->reply == buffer);
846 fr_assert(item->reply_size == size);
847
848 return fr_bio_dedup_blocked(my, item, rcode);
849}
850
851static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
852{
853 ssize_t rcode;
855 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
856 fr_bio_t *next;
857
858 /*
859 * There must be a next bio.
860 */
861 next = fr_bio_next(&my->bio);
862 fr_assert(next != NULL);
863
864 /*
865 * Read the packet. If error or nothing, return immediately.
866 */
867 rcode = next->read(next, packet_ctx, buffer, size);
868 if (rcode <= 0) return rcode;
869
870 /*
871 * Get a free item
872 */
873 item = fr_bio_dedup_list_pop_head(&my->free);
874 if (!item) return fr_bio_error(OOM);
875
876 fr_assert(item->my == my);
878 .my = my,
879 .packet_ctx = packet_ctx,
880 .packet = buffer,
881 .packet_size = (size_t) rcode,
883 };
884
885 /*
886 * See if we want to receive this packet. If this isn't
887 * something we need to receive, then we just discard it.
888 *
889 * The "receive" function is responsible for looking in a local dedup tree to see if there's a
890 * cached reply. It's also responsible for calling the fr_bio_retry_respond() function to send
891 * a duplicate reply, and then return "don't receive" this packet.
892 *
893 * The application can also call fr_bio_dedup_entry_extend() in order to extend the lifetime of a
894 * packet which has a cached response.
895 *
896 * If there's an active packet, then the receive() function should do whatever it needs to do in
897 * order to update the application for a duplicate packet. And then return "don't receive" for
898 * this packet.
899 *
900 * If we're NOT going to process this packet, then the item we just popped needs to get inserted
901 * back into the free list.
902 *
903 * The caller should cancel any conflicting packets by calling fr_bio_dedup_entry_cancel(). Note
904 * that for sanity, we don't re-use the previous #fr_bio_dedup_entry_t.
905 */
906 if (!my->receive(bio, item, packet_ctx)) {
908 fr_bio_dedup_list_insert_head(&my->free, item);
909 return 0;
910 }
911
912 fr_bio_dedup_list_insert_tail(&my->active, item);
913
914 return rcode;
915}
916
917static int8_t _entry_cmp(void const *one, void const *two)
918{
919 fr_bio_dedup_entry_t const *a = one;
920 fr_bio_dedup_entry_t const *b = two;
921
922 fr_assert(a->packet);
923 fr_assert(b->packet);
924
925 return fr_time_cmp(a->expires, b->expires);
926}
927
928/** Cancel one item.
929 *
930 * @param bio the binary IO handler
931 * @param item the dedup context from #fr_bio_dedup_respond_t
932 */
941
942/** Extend the expiry time for an entry
943 *
944 * @param bio the binary IO handler
945 * @param item the dedup context from #fr_bio_dedup_respond_t
946 * @param expires the new expiry time
947 * @return
948 * - <0 error
949 * - 0 success
950 */
952{
953 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
954
955 switch (item->state) {
957 return 0;
958
960 break;
961
962 /*
963 * Partially written or pending replies aren't in the expiry tree. We can just change their
964 * expiry time and be done.
965 */
968 item->expires = expires;
969 return 0;
970
973 fr_assert(0);
974 return fr_bio_error(GENERIC);
975 }
976
977 /*
978 * Change places in the tree.
979 */
980 item->expires = expires;
981 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
982 (void) fr_rb_insert(&my->rb, item);
983
984 /*
985 * If we're not changing the first item, we don't need to change the timers.
986 *
987 * Otherwise we clear the "first" flag, so that the reset timer function will change the timer
988 * value.
989 */
990 if (my->first != item) return 0;
991
992 my->first = NULL;
993
995}
996
997
998/** Remove the dedup cache
999 *
1000 */
1002{
1005 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
1006
1007 talloc_const_free(my->ev);
1008
1009 /*
1010 * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
1011 * entries will be deleted when the memory is freed.
1012 */
1013 while ((item = fr_rb_iter_init_inorder(&my->rb, &iter)) != NULL) {
1014 fr_rb_iter_delete_inorder(&my->rb, &iter);
1015 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1016 }
1017
1018 while ((item = fr_bio_dedup_list_pop_head(&my->active)) != NULL) {
1019 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1020 }
1021
1022 while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
1023 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1024 }
1025
1026#ifndef NDEBUG
1027 my->ev = NULL;
1028 my->first = NULL;
1029#endif
1030
1031 return 0;
1032}
1033
1034/** Allocate a #fr_bio_dedup_t
1035 *
1036 */
1037fr_bio_t *fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved,
1038 fr_bio_dedup_receive_t receive,
1039 fr_bio_dedup_release_t release,
1040 fr_bio_dedup_get_item_t get_item,
1041 fr_bio_dedup_config_t const *cfg,
1042 fr_bio_t *next)
1043{
1044 size_t i;
1046 fr_bio_dedup_entry_t *items;
1047
1048 fr_assert(cfg->el);
1049
1050 /*
1051 * Limit to reasonable values.
1052 */
1053 if (!max_saved) return NULL;
1054 if (max_saved > 65536) return NULL;
1055
1056 my = talloc_zero(ctx, fr_bio_dedup_t);
1057 if (!my) return NULL;
1058
1059 /*
1060 * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1061 * and better reuse of data structures.
1062 */
1063 items = talloc_array(my, fr_bio_dedup_entry_t, max_saved);
1064 if (!items) {
1065 talloc_free(my);
1066 return NULL;
1067 }
1068
1069 /*
1070 * Insert the entries into the free list in order.
1071 */
1072 fr_bio_dedup_list_init(&my->free);
1073
1074 for (i = 0; i < max_saved; i++) {
1075 items[i].my = my;
1076 items[i].state = FR_BIO_DEDUP_STATE_FREE;
1077 fr_bio_dedup_list_insert_tail(&my->free, &items[i]);
1078 }
1079
1080 fr_bio_dedup_list_init(&my->active);
1081 fr_bio_dedup_list_init(&my->pending);
1082
1083 (void) fr_rb_inline_init(&my->rb, fr_bio_dedup_entry_t, node, _entry_cmp, NULL);
1084
1085 my->receive = receive;
1086 my->release = release;
1087 my->get_item = get_item;
1088
1089 my->el = cfg->el;
1090 my->config = *cfg;
1091
1092 my->bio.write = fr_bio_dedup_write;
1093 my->bio.read = fr_bio_dedup_read;
1094
1095 fr_bio_chain(&my->bio, next);
1096
1097 my->priv_cb.shutdown = fr_bio_dedup_shutdown;
1098
1099 talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor); /* always use a common destructor */
1100
1101 return (fr_bio_t *) my;
1102}
static int const char char buffer[256]
Definition acutest.h:578
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:117
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:116
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:131
#define fr_bio_error(_x)
Definition base.h:200
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:84
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition buf.c:117
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition buf.c:84
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition buf.h:151
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:324
#define UNUSED
Definition build.h:317
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
Definition dedup.c:214
static int fr_bio_dedup_shutdown(fr_bio_t *bio)
Remove the dedup cache.
Definition dedup.c:1001
fr_bio_dedup_release_t release
called to release a packet
Definition dedup.c:151
static void fr_bio_dedup_timer_reset_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Definition dedup.c:168
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
Definition dedup.c:508
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
Definition dedup.c:951
fr_bio_dedup_config_t config
Definition dedup.c:129
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
Definition dedup.c:153
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition dedup.c:569
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
Definition dedup.c:933
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
Definition dedup.c:703
fr_bio_dedup_entry_t * partial
Definition dedup.c:148
fr_event_list_t * el
Definition dedup.c:125
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
Definition dedup.c:1037
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
Definition dedup.c:432
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
Definition dedup.c:650
static void fr_bio_dedup_timer(UNUSED fr_timer_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
Definition dedup.c:762
fr_rb_tree_t rb
expire list
Definition dedup.c:127
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
Definition dedup.c:180
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
Definition dedup.c:150
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
Definition dedup.c:738
static int8_t _entry_cmp(void const *one, void const *two)
Definition dedup.c:917
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
Definition dedup.c:798
fr_bio_dedup_entry_t * first
Definition dedup.c:140
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
Definition dedup.c:529
fr_bio_buf_t buffer
Definition dedup.c:155
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
Definition dedup.c:73
fr_bio_dedup_state_t
Definition dedup.c:87
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
Definition dedup.c:92
@ FR_BIO_DEDUP_STATE_FREE
Definition dedup.c:88
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
Definition dedup.c:89
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
Definition dedup.c:93
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
Definition dedup.c:90
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
Definition dedup.c:91
static int fr_bio_dedup_timer_reset(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
Definition dedup.c:332
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition dedup.c:851
fr_timer_t * ev
Definition dedup.c:131
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
Definition dedup.c:375
void * uctx
user-writable context
Definition dedup.c:97
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release the packet (timeout, or cancelled by the application)
Definition dedup.h:91
fr_event_list_t * el
event list
Definition dedup.h:37
fr_bio_dedup_t * my
so we can get to it from the event timer callback
Definition dedup.c:114
fr_time_t expires
when this entry expires
Definition dedup.c:116
size_t packet_size
size of the cached packet data
Definition dedup.c:100
fr_bio_dedup_state_t state
which tree or list this item is in
Definition dedup.c:117
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
Definition dedup.h:42
void * packet_ctx
packet_ctx for dedup purposes
Definition dedup.c:98
uint8_t * reply
reply cached by the application
Definition dedup.c:102
size_t reply_size
size of the cached reply
Definition dedup.c:103
void * reply_ctx
reply ctx
Definition dedup.c:101
uint8_t * packet
cached packet data for finding duplicates
Definition dedup.c:99
fr_rb_node_t dedup
user managed dedup node
Definition dedup.c:105
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx)
Callback on read to see if we should receive the packet.
Definition dedup.h:78
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
Definition dedup.h:93
fr_bio_dedup_release_reason_t
Definition dedup.h:58
@ FR_BIO_DEDUP_WRITE_ERROR
Definition dedup.h:61
@ FR_BIO_DEDUP_CANCELLED
Definition dedup.h:60
@ FR_BIO_DEDUP_EXPIRED
Definition dedup.h:59
Definition dedup.c:96
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1111
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1097
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1134
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1104
void fr_bio_shutdown & my
Definition fd_errno.h:70
free(array)
talloc_free(hp)
int fr_bio_destructor(fr_bio_t *bio)
Free this bio.
Definition base.c:35
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:122
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:38
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:899
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition rb.c:722
void * fr_rb_first(fr_rb_tree_t *tree)
Definition rb.c:786
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition rb.h:180
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:259
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition time.h:173
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:50
A timer event.
Definition timer.c:84
#define fr_timer_at(...)
Definition timer.h:81
static fr_event_list_t * el