The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
dedup.c
Go to the documentation of this file.
1/*
2 * This program is is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 38c41bb754b0706aaadc23f61f0bab0c9a440a6a $
19 * @file lib/bio/dedup.c
20 * @brief Binary IO abstractions for deduping packets.
21 *
22 * The dedup BIO receives packets from the network, and allows for deduplication of requests, so that
23 * duplicate requests are only processed once. In addition, if a reply is available, a duplicate request will
24 * result in a duplicate reply. The actual deduplication tree / table has to be maintained by the calling
 25 * application, as packet comparisons for deduplication are very protocol-specific. The purpose of the dedup
26 * BIO is to abstract all of the common support functions around this limitation.
27 *
28 * When packets are read() from the next bio, the #fr_bio_dedup_receive_t callback is run. It tells the BIO
29 * whether or not the packet should be received, and whether or not the packet should be returned to the
30 * caller. The receive callback is also passed a #fr_bio_dedup_entry_t pointer, where the packet_ctx, packet,
31 * and size are already filled out. This entry is used to correlate requests and replies.
32 *
33 * When packets are write() to the network, the #fr_bio_dedup_get_item_t callback is called to get the
34 * previously cached #fr_bio_dedup_entry_t pointer. This is because there is no generic way to get an
35 * additional context to this BIO via the write() routine. i.e. the packet_ctx for write() may include things
36 * like src/dst ip/port, and therefore can't always be an #fr_bio_dedup_entry_t. The caller should associate
37 * the #fr_bio_dedup_entry_t with the packet_ctx for the reply. The get_item() routine can then return the entry.
38 *
39 * For simplicity, the next bio should be a memory one. That way the read() can read more than one packet if
40 * necessary. And the write() can cache a partial packet if it blocks.
41 *
42 * The entry needs to be cached in order to maintain the internal tracking used by the dedup BIO.
43 *
44 * On client retransmit, the #fr_bio_dedup_receive_t callback is run, just as if it is a new packet. The
45 * dedup BIO does not know if the received data is a new packet until the #fr_bio_dedup_receive_t callback
46 * says so. On duplicate client request, the #fr_bio_dedup_receive_t callback can call fr_bio_dedup_respond()
47 * to send a duplicate reply. That call bypasses the normal dedup stack, and writes directly to the next bio.
48 *
49 * The calling application can also call fr_bio_dedup_respond() as soon as it has a reply. i.e. skip the BIO
50 * write() call. That works, and is safe.
51 *
52 * The dedup BIO tracks a number of lists / trees internally. Packets which are received but which have no
53 * reply are in an "active" list. Packets which have a reply are in an "expired" RB tree, where a timer is
54 * set to expire packets. If a write() call results in a partial write, that packet is put into a "partially
55 * written" state. If multiple calls to write() are done when writing is blocked, the replies are put into a
56 * "pending" state.
57 *
58 * The calling application can always call fr_bio_dedup_cancel() to cancel or expire a packet. This call is
59 * safe, and can be made at any time, no matter what state the packet is in.
60 *
61 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
62 */
63
64#include <freeradius-devel/bio/bio_priv.h>
65#include <freeradius-devel/bio/null.h>
66#include <freeradius-devel/bio/buf.h>
67#include <freeradius-devel/util/rb.h>
68#include <freeradius-devel/util/dlist.h>
69
70#define _BIO_DEDUP_PRIVATE
71#include <freeradius-devel/bio/dedup.h>
72
73typedef struct fr_bio_dedup_list_s fr_bio_dedup_list_t;
75
76/*
77 * There is substantial similarity between this code and the
78 * "retry" bio. Any fixes which are done here should be checked
79 * there, and vice versa.
80 */
81
82/*
83 * Define type-safe wrappers for head and entry definitions.
84 */
85FR_DLIST_TYPES(fr_bio_dedup_list)
86
87typedef enum {
89 FR_BIO_DEDUP_STATE_ACTIVE, //!< Received, but not replied.
90 FR_BIO_DEDUP_STATE_PENDING, //!< Have a reply, but we're trying to write it out.
91 FR_BIO_DEDUP_STATE_REPLIED, //!< Replied, and waiting for it to expire.
92 FR_BIO_DEDUP_STATE_PARTIAL, //!< Partially written
93 FR_BIO_DEDUP_STATE_CANCELLED, //!< Partially written, and then cancelled.
95
97 void *uctx;
99 uint8_t *packet; //!< cached packet data for finding duplicates
100 size_t packet_size; //!< size of the cached packet data
101 void *reply_ctx; //!< reply ctx
102 uint8_t *reply; //!< reply cached by the application
103 size_t reply_size; //!< size of the cached reply
104
105 fr_rb_node_t dedup; //!< user managed dedup node
106
107 union {
108 struct {
109 fr_rb_node_t node; //!< for the expiry timers
110 };
111 FR_DLIST_ENTRY(fr_bio_dedup_list) entry; //!< for the free list
112 };
113
114 fr_bio_dedup_t *my; //!< so we can get to it from the event timer callback
115
116 fr_time_t expires; //!< when this entry expires
117 fr_bio_dedup_state_t state; //!< which tree or list this item is in
118};
119
120FR_DLIST_FUNCS(fr_bio_dedup_list, fr_bio_dedup_entry_t, entry)
121
124
126
127 fr_rb_tree_t rb; //!< expire list
128
130
132
133 /*
134 * The "first" entry is cached here so that we can detect when it changes. The insert / delete
135 * code can just do its work without worrying about timers. And then when the tree manipulation
136 * is done, call the fr_bio_dedup_timer_reset() function to reset (or not) the timer.
137 *
138 * The timer is set for is when the first packet expires.
139 */
141
142 /*
143 * Cache a partial write when IO is blocked.
144 *
145 * When the IO is blocked, we can still expire old entries, unlike the "retry" BIOs. This is
146 * because we're not resending packets, we're just cleaning up *sent* packets when they expire.
147 */
149
150 fr_bio_dedup_receive_t receive; //!< called when we receive a potentially new packet
151 fr_bio_dedup_release_t release; //!< called to release a packet
152
153 fr_bio_dedup_get_item_t get_item; //!< turn a packet_ctx into a #fr_bio_dedup_entry_t
154
156
157 FR_DLIST_HEAD(fr_bio_dedup_list) active; //!< received but not yet replied
158 FR_DLIST_HEAD(fr_bio_dedup_list) pending; //!< trying to write when the socket is blocked.
159 FR_DLIST_HEAD(fr_bio_dedup_list) free; //!< free list
160};
161
162static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
163static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
167
169{
170 if (my->first != item) return;
171
172 my->first = NULL;
174}
175
176/** Move an item from active to replied.
177 *
178 * Note that we don't update any timers. The caller is responsible for that.
179 */
181{
182 if (item->state == FR_BIO_DEDUP_STATE_REPLIED) return;
183
185
186 (void) fr_bio_dedup_list_remove(&my->active, item);
187
188 /*
189 * Now that we have a reply, set the default expiry time. The caller can always call
190 * fr_bio_dedup_entry_extend() to change the expiry time.
191 */
192 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
193
194 /*
195 * This should never fail.
196 */
197 (void) fr_rb_insert(&my->rb, item);
199}
200
201/** Resend a reply when we receive a duplicate request.
202 *
203 * This function should be called by the respond() callback to re-send a duplicate reply.
204 *
205 * It can also be called by the application when it first has a response to the request.
206 *
207 * @param bio the binary IO handler
208 * @param item the dedup context from #fr_bio_dedup_sent_t
209 * @return
210 * - <0 on error
211 * - 0 for "wrote no data"
212 * - >0 for "wrote data".
213 */
215{
216 ssize_t rcode;
217 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
218 fr_bio_t *next;
219
220 if (!item->reply || !item->reply_size) return 0;
221
222 switch (item->state) {
223 /*
224 * Send a first reply if we can.
225 */
227 /*
228 * If we're not writing to the socket, just insert the packet into the pending list.
229 */
230 if (my->bio.write != fr_bio_dedup_write) {
231 (void) fr_bio_dedup_list_remove(&my->active, item);
232 fr_bio_dedup_list_insert_tail(&my->pending, item);
233
235 item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
236 return item->reply_size;
237 }
238
239 /*
240 * The socket is writable, go do that.
241 */
242 break;
243
244 /*
245 * Send a duplicate reply.
246 */
248 if (my->bio.write == fr_bio_dedup_write) break;
249
250 /*
251 * The socket is blocked. Save the packet in the pending queue.
252 */
253 move_to_pending:
254 fr_rb_remove_by_inline_node(&my->rb, &item->node);
255
256 save_in_pending:
257 /*
258 * We could update the timer for pending packets. However, that's more complicated.
259 *
260 * The packets will be expire when the pending queue is flushed, OR when the application
261 * cancels the pending packet.
262 */
264
265 fr_bio_dedup_list_insert_tail(&my->pending, item);
267 return item->reply_size;
268
269 /*
270 * We're already trying to write this entry, don't do anything else.
271 */
273 fr_assert(my->partial != NULL);
275
277 return fr_bio_error(IO_WOULD_BLOCK);
278
281 fr_assert(0);
282 return fr_bio_error(GENERIC);
283 }
284
285 /*
286 * There must be a next bio.
287 */
288 next = fr_bio_next(&my->bio);
289 fr_assert(next != NULL);
290
291 /*
292 * Write out the packet, if everything is OK, return.
293 */
294 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
295 if ((size_t) rcode == item->reply_size) {
297 return rcode;
298 }
299
300 /*
301 * Can't write anything, be sad.
302 */
303 if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) {
304 if (item->state == FR_BIO_DEDUP_STATE_ACTIVE) {
305 (void) fr_bio_dedup_list_remove(&my->active, item);
306 goto save_in_pending;
307 }
308
310 goto move_to_pending;
311 }
312
313 /*
314 * There's an error writing the packet. Release it, and move the item to the free list.
315 *
316 * Note that we don't bother resetting the timer. There's no point in changing the timer when
317 * the bio is likely dead.
318 */
319 if (rcode < 0) {
321 return rcode;
322 }
323
324 /*
325 * We are writing item->reply, and that's blocked. Save the partial packet for later.
326 */
327 return fr_bio_dedup_blocked(my, item, rcode);
328}
329
330/** Reset the timer after changing the rb tree.
331 *
332 */
334{
336
337 /*
338 * Nothing to do, don't set any timers.
339 */
340 first = fr_rb_first(&my->rb);
341 if (!first) {
343 my->ev = NULL;
344 my->first = NULL;
345 return 0;
346 }
347
348 /*
349 * We don't care about partially written packets. The timer remains set even when we have a
350 * partial outgoing packet, because we can expire entries which aren't partially written.
351 *
352 * However, the partially written packet MUST NOT be in the expiry tree.
353 *
354 * We also don't care about the pending list. The application will either cancel the item, or
355 * the socket will become writable, and the item will get handled then.
356 */
357 fr_assert(first != my->partial);
358
359 /*
360 * The timer is already set correctly, we're done.
361 */
362 if (first == my->first) return 0;
363
364 /*
365 * Update the timer. This should never fail.
366 */
367 if (fr_event_timer_at(my, my->el, &my->ev, first->expires, fr_bio_dedup_timer, my) < 0) return -1;
368
369 my->first = first;
370 return 0;
371}
372
373/** Release an entry back to the free list.
374 *
375 */
377{
378 my->release((fr_bio_t *) my, item, reason);
379
380 switch (item->state) {
381 /*
382 * Cancel an active item, just nuke it.
383 */
385 fr_bio_dedup_list_remove(&my->active, item);
386 break;
387
388 /*
389 * We already replied, remove it from the expiry tree.
390 *
391 * We only update the timer if the caller isn't already expiring the entry.
392 */
394 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
395
397 break;
398
399 /*
400 * It was pending another write, so we just discard the write.
401 */
403 fr_bio_dedup_list_remove(&my->active, item);
404 break;
405
406 /*
407 * Don't free it. Just set its state to cancelled.
408 */
410 fr_assert(my->partial == item);
412 return;
413
416 fr_assert(0);
417 return;
418 }
419
420#ifndef NDEBUG
421 item->packet = NULL;
422#endif
423 item->uctx = NULL;
424 item->packet_ctx = NULL;
425
426 fr_assert(my->first != item);
427 fr_bio_dedup_list_insert_head(&my->free, item);
428}
429
430/** Flush any packets in the pending queue.
431 *
432 */
434{
435 ssize_t rcode, out_rcode;
437 fr_bio_t *next;
438 fr_time_t now;
439
440 /*
441 * We can only flush the pending list when any previous partial packet has been written.
442 */
443 fr_assert(!my->partial);
444
445 /*
446 * Nothing in the list, we're done.
447 */
448 if (fr_bio_dedup_list_num_elements(&my->pending) == 0) return 0;
449
450 /*
451 * There must be a next bio.
452 */
453 next = fr_bio_next(&my->bio);
454 fr_assert(next != NULL);
455
456 now = fr_time();
457 out_rcode = 0;
458
459 /*
460 * Write out any pending packets.
461 */
462 while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
464
465 /*
466 * It's already expired, don't bother replying.
467 */
468 if (fr_time_lteq(item->expires, now)) {
470 continue;
471 }
472
473 /*
474 * Write the entry to the next bio.
475 */
476 rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
477 if (rcode <= 0) return rcode; /* @todo - update timer if we've written one packet */
478
479 /*
480 * We've written the entire packet, move it to the expiry list.
481 */
482 if ((size_t) rcode == item->reply_size) {
483 (void) fr_bio_dedup_list_remove(&my->pending, item);
484 (void) fr_rb_insert(&my->rb, item);
486 continue;
487 }
488
490
491 out_rcode = fr_bio_error(IO_WOULD_BLOCK);
492 break;
493 }
494
495 /*
496 * We may need to update the timer if we've removed the first entry from the tree, or added a new
497 * first entry.
498 */
499 if (!my->first || (my->first != fr_rb_first(&my->rb))) {
500 my->first = NULL;
502 }
503
504 return out_rcode;
505}
506
507/** Save partially written data to our local buffer.
508 *
509 */
510static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
511{
512 /*
513 * (re)-alloc the buffer for partial writes.
514 */
515 if (!my->buffer.start ||
516 (size > fr_bio_buf_size(&my->buffer))) {
517 if (fr_bio_buf_alloc(my, &my->buffer, size) < 0) return -1;
518 }
519
520 fr_assert(fr_bio_buf_used(&my->buffer) == 0);
521 fr_assert(my->buffer.read == my->buffer.start);
522
523 fr_bio_buf_write(&my->buffer, buffer + rcode, size - rcode);
524
525 return 0;
526}
527
528/** Write data from our local buffer to the next bio.
529 *
530 */
532{
533 size_t used;
534 ssize_t rcode;
535 fr_bio_t *next;
536
537 fr_assert(my->buffer.start);
538
539 used = fr_bio_buf_used(&my->buffer);
540 fr_assert(used > 0);
541
542 /*
543 * There must be a next bio.
544 */
545 next = fr_bio_next(&my->bio);
546 fr_assert(next != NULL);
547
548 rcode = next->write(next, NULL, my->buffer.read, used);
549 if (rcode <= 0) return rcode;
550
551 my->buffer.read += rcode;
552
553 /*
554 * Still data in the buffer. We can't send more packets until we finish writing this one.
555 */
556 if (fr_bio_buf_used(&my->buffer) > 0) return 0;
557
558 /*
559 * We're done. Reset the buffer and clean up our cached partial packet.
560 */
561 fr_bio_buf_reset(&my->buffer);
562
563 return rcode;
564}
565
566/** There's a partial packet written. Write all of that one first, before writing another packet.
567 *
568 * The packet can either be cancelled, or IO blocked. In either case, we must write this packet before
569 * we can write another one.
570 */
571static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
572{
573 ssize_t rcode;
574 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
575 fr_bio_dedup_entry_t *item = my->partial;
576
577 fr_assert(my->partial != NULL);
578 fr_assert(my->buffer.start);
579
582
584 if (rcode <= 0) return rcode;
585
586 my->partial = NULL;
587
588 /*
589 * Partial writes are removed from the expiry tree until they're fully written. When they're
590 * written, either add it back to the tree if it's still operational, or add it to the free list
591 * if it has been cancelled.
592 */
593 if (item->state == FR_BIO_DEDUP_STATE_PARTIAL) {
594
595 /*
596 * See if we have to clean up this entry. If so, do it now. That avoids another bounce
597 * through the event loop.
598 */
599 if (fr_time_lteq(item->expires, fr_time())) {
601
602 } else {
603 /*
604 * We've changed the tree, so update the timer. fr_bio_dedup_write() only
605 * updates the timer on successful write.
606 */
608 (void) fr_rb_insert(&my->rb, item);
609 }
611
612 } else {
613 /*
614 * The item was cancelled, add it to the free list.
615 */
616#ifndef NDEBUG
617 item->packet = NULL;
618#endif
619 item->uctx = NULL;
620 item->packet_ctx = NULL;
621
623 fr_bio_dedup_list_insert_head(&my->free, item);
624 }
625
626 /*
627 * Flush any packets which were pending during the blocking period.
628 */
630 if (rcode < 0) return rcode;
631
632 /*
633 * Unlike the retry BIO, we don't retry writes for items in the RB tree. Those packets have already
634 * been written.
635 */
636
637 /*
638 * Try to write the packet which we were given.
639 */
640 my->bio.write = fr_bio_dedup_write;
641 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
642}
643
644/** The write is blocked.
645 *
646 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
647 * unblocked!
648 *
649 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
650 * _last_ item in the RB tree.
651 */
653{
654 fr_assert(!my->partial);
655 fr_assert(rcode > 0);
656 fr_assert((size_t) rcode < item->reply_size);
657
658 if (fr_bio_dedup_buffer_save(my, item->reply, item->reply_size, rcode) < 0) return fr_bio_error(OOM);
659
660 switch (item->state) {
662 (void) fr_bio_dedup_list_remove(&my->active, item);
663 break;
664
665 /*
666 * We cannot expire this entry, so remove it from the expiration tree. That step lets us
667 * expire other entries.
668 */
670 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
672 break;
673
674 /*
675 * We tried to write a pending packet and got blocked.
676 */
678 fr_assert(fr_bio_dedup_list_head(&my->pending) == item);
679 (void) fr_bio_dedup_list_remove(&my->pending, item);
680 break;
681
682 /*
683 * None of these states should be possible.
684 */
688 fr_assert(0);
689 return fr_bio_error(GENERIC);
690 }
691
692 my->partial = item;
694
695 /*
696 * Reset the write routine, so that if the application tries any more writes, the partial entry
697 * gets written first.
698 */
699 my->bio.write = fr_bio_dedup_write_partial;
700 return rcode;
701}
702
703/** There's a partial block of data written. Write all of that data first, before writing another packet.
704 */
705static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
706{
707 ssize_t rcode;
708 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
709
710 fr_assert(!my->partial);
711
712 /*
713 * Flush out any partly written data.
714 */
716 if (rcode <= 0) return rcode;
717
718 /*
719 * Flush any packets which were pending during the blocking period.
720 */
722 if (rcode < 0) return rcode;
723
724 /*
725 * Try to write the packet which we were given.
726 */
727 my->bio.write = fr_bio_dedup_write;
728 return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
729}
730
731
732/** The write is blocked, but we don't have "item".
733 *
734 * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
735 * unblocked!
736 *
737 * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
738 * _last_ item in the RB tree.
739 */
741{
742 fr_assert(!my->partial);
743 fr_assert(rcode > 0);
744 fr_assert((size_t) rcode < size);
745
746 if (fr_bio_dedup_buffer_save(my, buffer, size, rcode) < 0) return fr_bio_error(OOM);
747
748 /*
749 * Reset the write routine, so that if the application tries any more writes, the data
750 * gets written first.
751 */
752 my->bio.write = fr_bio_dedup_write_data;
753 return rcode;
754}
755
756/*
757 * There is no fr_bio_dedup_rewrite(), packets are never re-written by this bio.
758 */
759
760/** Expire an entry when its timer fires.
761 *
762 * @todo - expire items from the pending list, too
763 */
765{
766 fr_bio_dedup_t *my = talloc_get_type_abort(uctx, fr_bio_dedup_t);
768 fr_time_t expires;
769
770 fr_assert(my->first != NULL);
771 fr_assert(fr_rb_first(&my->rb) == my->first);
772
773 my->first = NULL;
774
775 /*
776 * Expire all entries which are within 10ms of "now". That way we don't reset the event many
777 * times in short succession.
778 *
779 * @todo - also expire entries on the pending list?
780 */
781 expires = fr_time_add(now, fr_time_delta_from_msec(10));
782
783 while ((item = fr_rb_first(&my->rb)) != NULL) {
784 if (fr_time_gt(item->expires, expires)) break;
785
787 }
788
790}
791
792/** Write raw data to the bio.
793 *
794 * This function is largely a duplicate of fr_bio_dedup_respond(). Except due to the BIO API, it can be
795 * passed a NULL buffer (for flushing the BIOs), and it can't be passed a #fr_bio_dedup_entry_t, and instead
796 * has to be passed a "void *packet_ctx".
797 *
798 * The caller is free to ignore this function,
799 */
800static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
801{
802 ssize_t rcode;
804 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
805 fr_bio_t *next;
806
807 fr_assert(!my->partial);
808
809 /*
810 * There must be a next bio.
811 */
812 next = fr_bio_next(&my->bio);
813 fr_assert(next != NULL);
814
815 /*
816 * The caller is trying to flush partial data. But we don't have any partial data, so just call
817 * the next bio to flush it.
818 */
819 if (!buffer) {
820 return next->write(next, packet_ctx, NULL, size);
821 }
822
823 /*
824 * Write out the packet. If there's an error, OR we wrote nothing, return.
825 *
826 * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
827 * the caller to know that the write didn't succeed, and the caller takes care of managing the
828 * current packet. So there's no need for us to do that.
829 */
830 rcode = next->write(next, packet_ctx, buffer, size);
831 if (rcode <= 0) return rcode;
832
833 /*
834 * We need the item pointer to mark this entry as blocked. If that doesn't exist, then we try
835 * really hard to write out the un-tracked data.
836 */
837 item = NULL;
838 if (my->get_item) item = my->get_item(bio, packet_ctx);
839 if ((size_t) rcode == size) {
841 return rcode;
842 }
843
844 if (!item) return fr_bio_dedup_blocked_data(my, buffer, size, rcode);
845
846 fr_assert(item->reply_ctx == packet_ctx);
847 fr_assert(item->reply == buffer);
848 fr_assert(item->reply_size == size);
849
850 return fr_bio_dedup_blocked(my, item, rcode);
851}
852
853static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
854{
855 ssize_t rcode;
857 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
858 fr_bio_t *next;
859
860 /*
861 * There must be a next bio.
862 */
863 next = fr_bio_next(&my->bio);
864 fr_assert(next != NULL);
865
866 /*
867 * Read the packet. If error or nothing, return immediately.
868 */
869 rcode = next->read(next, packet_ctx, buffer, size);
870 if (rcode <= 0) return rcode;
871
872 /*
873 * Get a free item
874 */
875 item = fr_bio_dedup_list_pop_head(&my->free);
876 fr_assert(item != NULL);
877
878 fr_assert(item->my == my);
880 .my = my,
881 .packet_ctx = packet_ctx,
882 .packet = buffer,
883 .packet_size = (size_t) rcode,
885 };
886
887 /*
888 * See if we want to receive this packet. If this isn't
889 * something we need to receive, then we just discard it.
890 *
891 * The "receive" function is responsible for looking in a local dedup tree to see if there's a
892 * cached reply. It's also responsible for calling the fr_bio_retry_respond() function to send
893 * a duplicate reply, and then return "don't receive" this packet.
894 *
 895 * The application can also call fr_bio_dedup_entry_extend() in order to extend the lifetime of a
896 * packet which has a cached response.
897 *
898 * If there's an active packet, then the receive() function should do whatever it needs to do in
899 * order to update the application for a duplicate packet. And then return "don't receive" for
900 * this packet.
901 *
902 * If we're NOT going to process this packet, then the item we just popped needs to get inserted
903 * back into the free list.
904 *
905 * The caller should cancel any conflicting packets by calling fr_bio_dedup_entry_cancel(). Note
906 * that for sanity, we don't re-use the previous #fr_bio_dedup_entry_t.
907 */
908 if (!my->receive(bio, item, packet_ctx)) {
910 fr_bio_dedup_list_insert_head(&my->free, item);
911 return 0;
912 }
913
914 fr_bio_dedup_list_insert_tail(&my->active, item);
915
916 return rcode;
917}
918
919static int8_t _entry_cmp(void const *one, void const *two)
920{
921 fr_bio_dedup_entry_t const *a = one;
922 fr_bio_dedup_entry_t const *b = two;
923
924 fr_assert(a->packet);
925 fr_assert(b->packet);
926
927 return fr_time_cmp(a->expires, b->expires);
928}
929
930/** Cancel one item.
931 *
932 * @param bio the binary IO handler
933 * @param item the dedup context from #fr_bio_dedup_respond_t
934 */
943
944/** Extend the expiry time for an entry
945 *
946 * @param bio the binary IO handler
947 * @param item the dedup context from #fr_bio_dedup_respond_t
948 * @param expires the new expiry time
949 * @return
950 * - <0 error
951 * - 0 success
952 */
954{
955 fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
956
957 switch (item->state) {
959 return 0;
960
962 break;
963
964 /*
 965 * Partially written or pending replies aren't in the expiry tree. We can just change their
966 * expiry time and be done.
967 */
970 item->expires = expires;
971 return 0;
972
975 fr_assert(0);
976 return fr_bio_error(GENERIC);
977 }
978
979 /*
980 * Shortening the lifetime is OK. If the caller does something dumb like set expiry to a time in
981 * the past, well... that's their problem.
982 */
983 fr_assert(fr_time_lteq(expires, fr_time()));
984
985 /*
986 * Change places in the tree.
987 */
988 item->expires = expires;
989 (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
990 (void) fr_rb_insert(&my->rb, item);
991
992 /*
993 * If we're not changing the first item, we don't need to change the timers.
994 *
995 * Otherwise we clear the "first" flag, so that the reset timer function will change the timer
996 * value.
997 */
998 if (my->first != item) return 0;
999
1000 my->first = NULL;
1001
1003}
1004
1005
1006/** Remove the dedup cache
1007 *
1008 */
1010{
1013
1014 talloc_const_free(my->ev);
1015
1016 /*
1017 * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
1018 * entries will be deleted when the memory is freed.
1019 */
1020 while ((item = fr_rb_iter_init_inorder(&iter, &my->rb)) != NULL) {
1022 my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1023 }
1024
1025#ifndef NDEBUG
1026 my->ev = NULL;
1027 my->first = NULL;
1028#endif
1029
1030 return 0;
1031}
1032
1033/** Allocate a #fr_bio_dedup_t
1034 *
1035 */
1036fr_bio_t *fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved,
1037 fr_bio_dedup_receive_t receive,
1038 fr_bio_dedup_release_t release,
1039 fr_bio_dedup_get_item_t get_item,
1040 fr_bio_dedup_config_t const *cfg,
1041 fr_bio_t *next)
1042{
1043 size_t i;
1045 fr_bio_dedup_entry_t *items;
1046
1047 fr_assert(cfg->el);
1048
1049 /*
1050 * Limit to reasonable values.
1051 */
1052 if (!max_saved) return NULL;
1053 if (max_saved > 65536) return NULL;
1054
1055 my = talloc_zero(ctx, fr_bio_dedup_t);
1056 if (!my) return NULL;
1057
1058 /*
1059 * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1060 * and better reuse of data structures.
1061 */
1062 items = talloc_array(my, fr_bio_dedup_entry_t, max_saved);
1063 if (!items) return NULL;
1064
1065 /*
1066 * Insert the entries into the free list in order.
1067 */
1068 fr_bio_dedup_list_init(&my->free);
1069
1070 for (i = 0; i < max_saved; i++) {
1071 items[i].my = my;
1072 items[i].state = FR_BIO_DEDUP_STATE_FREE;
1073 fr_bio_dedup_list_insert_tail(&my->free, &items[i]);
1074 }
1075
1076 fr_bio_dedup_list_init(&my->active);
1077 fr_bio_dedup_list_init(&my->pending);
1078
1079 (void) fr_rb_inline_init(&my->rb, fr_bio_dedup_entry_t, node, _entry_cmp, NULL);
1080
1081 my->receive = receive;
1082 my->release = release;
1083 my->get_item = get_item;
1084
1085 my->el = cfg->el;
1086 my->config = *cfg;
1087
1088 my->bio.write = fr_bio_dedup_write;
1089 my->bio.read = fr_bio_dedup_read;
1090
1091 fr_bio_chain(&my->bio, next);
1092
1093 talloc_set_destructor(my, fr_bio_dedup_destructor);
1094
1095 return (fr_bio_t *) my;
1096}
static int const char char buffer[256]
Definition acutest.h:576
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:116
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:115
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:130
#define fr_bio_error(_x)
Definition base.h:192
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:69
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition buf.c:114
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition buf.c:81
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition buf.h:151
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:322
#define UNUSED
Definition build.h:315
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
Definition dedup.c:214
fr_bio_dedup_release_t release
called to release a packet
Definition dedup.c:151
static void fr_bio_dedup_timer_reset_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Definition dedup.c:168
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
Definition dedup.c:510
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
Definition dedup.c:953
fr_bio_dedup_config_t config
Definition dedup.c:129
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
Definition dedup.c:153
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition dedup.c:571
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
Definition dedup.c:935
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
Definition dedup.c:705
fr_bio_dedup_entry_t * partial
Definition dedup.c:148
fr_event_timer_t const * ev
Definition dedup.c:131
fr_event_list_t * el
Definition dedup.c:125
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
Definition dedup.c:1036
static int fr_bio_dedup_destructor(fr_bio_dedup_t *my)
Remove the dedup cache.
Definition dedup.c:1009
static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
Definition dedup.c:764
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
Definition dedup.c:433
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
Definition dedup.c:652
fr_rb_tree_t rb
expire list
Definition dedup.c:127
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
Definition dedup.c:180
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
Definition dedup.c:150
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
Definition dedup.c:740
static int8_t _entry_cmp(void const *one, void const *two)
Definition dedup.c:919
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
Definition dedup.c:800
fr_bio_dedup_entry_t * first
Definition dedup.c:140
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
Definition dedup.c:531
fr_bio_buf_t buffer
Definition dedup.c:155
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
Definition dedup.c:73
fr_bio_dedup_state_t
Definition dedup.c:87
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
Definition dedup.c:92
@ FR_BIO_DEDUP_STATE_FREE
Definition dedup.c:88
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
Definition dedup.c:89
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
Definition dedup.c:93
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
Definition dedup.c:90
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
Definition dedup.c:91
static int fr_bio_dedup_timer_reset(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
Definition dedup.c:333
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition dedup.c:853
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
Definition dedup.c:376
void * uctx
user-writable context
Definition dedup.c:97
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release the packet (timeout, or cancelled by the application)
Definition dedup.h:91
fr_event_list_t * el
event list
Definition dedup.h:37
fr_bio_dedup_t * my
so we can get to it from the event timer callback
Definition dedup.c:114
fr_time_t expires
when this entry expires
Definition dedup.c:116
size_t packet_size
size of the cached packet data
Definition dedup.c:100
fr_bio_dedup_state_t state
which tree or list this item is in
Definition dedup.c:117
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
Definition dedup.h:42
void * packet_ctx
packet_ctx for dedup purposes
Definition dedup.c:98
uint8_t * reply
reply cached by the application
Definition dedup.c:102
size_t reply_size
size of the cached reply
Definition dedup.c:103
void * reply_ctx
reply ctx
Definition dedup.c:101
uint8_t * packet
cached packet data for finding duplicates
Definition dedup.c:99
fr_rb_node_t dedup
user managed dedup node
Definition dedup.c:105
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx)
Callback on read to see if we should receive the packet.
Definition dedup.h:78
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
Definition dedup.h:93
fr_bio_dedup_release_reason_t
Definition dedup.h:58
@ FR_BIO_DEDUP_WRITE_ERROR
Definition dedup.h:61
@ FR_BIO_DEDUP_CANCELLED
Definition dedup.h:60
@ FR_BIO_DEDUP_EXPIRED
Definition dedup.h:59
Definition dedup.c:96
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1152
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1122
#define fr_event_timer_at(...)
Definition event.h:250
fr_bio_shutdown & my
Definition fd_errno.h:59
free(array)
Stores all information relating to an event list.
Definition event.c:411
A timer event.
Definition event.c:102
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:122
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
static size_t used
#define fr_assert(_expr)
Definition rad_assert.h:38
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:898
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition rb.c:722
void * fr_rb_first(fr_rb_tree_t *tree)
Definition rb.c:786
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition rb.h:180
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:224
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition time.h:173
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
"server local" time.
Definition time.h:69
static fr_event_list_t * el