The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
dedup.c
Go to the documentation of this file.
1 /*
2  * This program is is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: ab4555a4d2ce7461cc77a460269fd40343d29961 $
19  * @file lib/bio/dedup.c
20  * @brief Binary IO abstractions for deduping packets.
21  *
22  * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
23  */
24 
25 #include <freeradius-devel/bio/bio_priv.h>
26 #include <freeradius-devel/bio/null.h>
27 #include <freeradius-devel/bio/buf.h>
28 #include <freeradius-devel/util/rb.h>
29 #include <freeradius-devel/util/dlist.h>
30 
31 #define _BIO_DEDUP_PRIVATE
32 #include <freeradius-devel/bio/dedup.h>
33 
34 typedef struct fr_bio_dedup_list_s fr_bio_dedup_list_t;
35 typedef struct fr_bio_dedup_s fr_bio_dedup_t;
36 
37 /*
38  * There is substantial similarity between this code and the
39  * "retry" bio. Any fixes which are done here should be checked
40  * there, and vice versa.
41  */
42 
43 /*
44  * Define type-safe wrappers for head and entry definitions.
45  */
46 FR_DLIST_TYPES(fr_bio_dedup_list)
47 
48 typedef enum {
50  FR_BIO_DEDUP_STATE_ACTIVE, //!< Received, but not replied.
51  FR_BIO_DEDUP_STATE_PENDING, //!< Have a reply, but we're trying to write it out.
52  FR_BIO_DEDUP_STATE_REPLIED, //!< Replied, and waiting for it to expire.
53  FR_BIO_DEDUP_STATE_PARTIAL, //!< Partially written
54  FR_BIO_DEDUP_STATE_CANCELLED, //!< Partially written, and then cancelled.
56 
58  void *uctx;
59  void *packet_ctx;
60  uint8_t *packet; //!< cached packet data for finding duplicates
61  size_t packet_size; //!< size of the cached packet data
62  void *reply_ctx; //!< reply ctx
63  uint8_t *reply; //!< reply cached by the application
64  size_t reply_size; //!< size of the cached reply
65 
66  union {
67  struct {
68  fr_rb_node_t node; //!< for the expiry timers
69  };
70  FR_DLIST_ENTRY(fr_bio_dedup_list) entry; //!< for the free list
71  };
72 
73  fr_bio_dedup_t *my; //!< so we can get to it from the event timer callback
74 
75  fr_time_t expires; //!< when this entry expires
76  fr_bio_dedup_state_t state; //!< which tree or list this item is in
77 };
78 
79 FR_DLIST_FUNCS(fr_bio_dedup_list, fr_bio_dedup_entry_t, entry)
80 
83 
85 
86  fr_rb_tree_t rb; //!< expire list
87 
89 
91 
92  /*
93  * The "first" entry is cached here so that we can detect when it changes. The insert / delete
94  * code can just do its work without worrying about timers. And then when the tree manipulation
95  * is done, call the fr_bio_dedup_reset_timer() function to reset (or not) the timer.
96  *
97  * The timer is set for is when the first packet expires.
98  */
100 
101  /*
102  * Cache a partial write when IO is blocked.
103  *
104  * When the IO is blocked, we can still expire old entries, unlike the "retry" BIOs. This is
105  * because we're not resending packets, we're just cleaning up *sent* packets when they expire.
106  */
108 
109  fr_bio_dedup_receive_t receive; //!< called when we receive a potentially new packet
110  fr_bio_dedup_release_t release; //!< called to release a packet
111 
112  fr_bio_dedup_get_item_t get_item; //!< turn a packet_ctx into a #fr_bio_dedup_entry_t
113 
115 
116  FR_DLIST_HEAD(fr_bio_dedup_list) active; //!< received but not yet replied
117  FR_DLIST_HEAD(fr_bio_dedup_list) pending; //!< trying to write when the socket is blocked.
118  FR_DLIST_HEAD(fr_bio_dedup_list) free; //!< free list
119 };
120 
121 static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
122 static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
126 
128 {
129  if (my->first != item) return;
130 
131  my->first = NULL;
132  (void) fr_bio_dedup_reset_timer(my);
133 }
134 
135 /** Move an item from active to replied.
136  *
137  * Note that we don't update any timers. The caller is responsible for that.
138  */
140 {
141  if (item->state == FR_BIO_DEDUP_STATE_REPLIED) return;
142 
144 
145  (void) fr_bio_dedup_list_remove(&my->active, item);
146 
147  /*
148  * Now that we have a reply, set the default expiry time. The caller can always call
149  * fr_bio_dedup_entry_extend() to change the expiry time.
150  */
152 
153  /*
154  * This should never fail.
155  */
156  (void) fr_rb_insert(&my->rb, item);
158 }
159 
160 /** Resend a reply when we receive a duplicate request.
161  *
162  * This function should be called by the respond() callback to re-send a duplicate reply.
163  *
164  * It can also be called by the application when it first has a response to the request.
165  *
166  * @param bio the binary IO handler
167  * @param item the dedup context from #fr_bio_dedup_sent_t
168  * @return
169  * - <0 on error
170  * - 0 for "wrote no data"
171  * - >0 for "wrote data".
172  */
174 {
175  ssize_t rcode;
176  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
177  fr_bio_t *next;
178 
179  if (!item->reply || !item->reply_size) return 0;
180 
181  switch (item->state) {
182  /*
183  * Send a first reply if we can.
184  */
186  /*
187  * If we're not writing to the socket, just insert the packet into the pending list.
188  */
189  if (my->bio.write != fr_bio_dedup_write) {
190  (void) fr_bio_dedup_list_remove(&my->active, item);
191  fr_bio_dedup_list_insert_tail(&my->pending, item);
192 
195  return item->reply_size;
196  }
197 
198  /*
199  * The socket is writable, go do that.
200  */
201  break;
202 
203  /*
204  * Send a duplicate reply.
205  */
207  if (my->bio.write == fr_bio_dedup_write) break;
208 
209  /*
210  * The socket is blocked. Save the packet in the pending queue.
211  */
212  move_to_pending:
213  fr_rb_remove_by_inline_node(&my->rb, &item->node);
214 
215  save_in_pending:
216  /*
217  * We could update the timer for pending packets. However, that's more complicated.
218  *
219  * The packets will expire when the pending queue is flushed, OR when the application
220  * cancels the pending packet.
221  */
223 
224  fr_bio_dedup_list_insert_tail(&my->pending, item);
226  return item->reply_size;
227 
228  /*
229  * We're already trying to write this entry, don't do anything else.
230  */
232  fr_assert(my->partial != NULL);
233  FALL_THROUGH;
234 
236  return fr_bio_error(IO_WOULD_BLOCK);
237 
240  fr_assert(0);
241  return fr_bio_error(GENERIC);
242  }
243 
244  /*
245  * There must be a next bio.
246  */
247  next = fr_bio_next(&my->bio);
248  fr_assert(next != NULL);
249 
250  /*
251  * Write out the packet, if everything is OK, return.
252  */
253  rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
254  if ((size_t) rcode == item->reply_size) {
256  return rcode;
257  }
258 
259  /*
260  * Can't write anything, be sad.
261  */
262  if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) {
263  if (item->state == FR_BIO_DEDUP_STATE_ACTIVE) {
264  (void) fr_bio_dedup_list_remove(&my->active, item);
265  goto save_in_pending;
266  }
267 
269  goto move_to_pending;
270  }
271 
272  /*
273  * There's an error writing the packet. Release it, and move the item to the free list.
274  *
275  * Note that we don't bother resetting the timer. There's no point in changing the timer when
276  * the bio is likely dead.
277  */
278  if (rcode < 0) {
280  return rcode;
281  }
282 
283  /*
284  * We are writing item->reply, and that's blocked. Save the partial packet for later.
285  */
286  return fr_bio_dedup_blocked(my, item, rcode);
287 }
288 
289 /** Reset the timer after changing the rb tree.
290  *
291  */
293 {
294  fr_bio_dedup_entry_t *first;
295 
296  /*
297  * Nothing to do, don't set any timers.
298  */
299  first = fr_rb_first(&my->rb);
300  if (!first) {
301  talloc_const_free(my->ev);
302  my->ev = NULL;
303  my->first = NULL;
304  return 0;
305  }
306 
307  /*
308  * We don't care about partially written packets. The timer remains set even when we have a
309  * partial outgoing packet, because we can expire entries which aren't partially written.
310  *
311  * However, the partially written packet MUST NOT be in the expiry tree.
312  *
313  * We also don't care about the pending list. The application will either cancel the item, or
314  * the socket will become writable, and the item will get handled then.
315  */
316  fr_assert(first != my->partial);
317 
318  /*
319  * The timer is already set correctly, we're done.
320  */
321  if (first == my->first) return 0;
322 
323  /*
324  * Update the timer. This should never fail.
325  */
326  if (fr_event_timer_at(my, my->el, &my->ev, first->expires, fr_bio_dedup_timer, my) < 0) return -1;
327 
328  my->first = first;
329  return 0;
330 }
331 
332 /** Release an entry back to the free list.
333  *
334  */
336 {
337  my->release((fr_bio_t *) my, item, reason);
338 
339  switch (item->state) {
340  /*
341  * Cancel an active item, just nuke it.
342  */
344  fr_bio_dedup_list_remove(&my->active, item);
345  break;
346 
347  /*
348  * We already replied, remove it from the expiry tree.
349  *
350  * We only update the timer if the caller isn't already expiring the entry.
351  */
353  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
354 
356  break;
357 
358  /*
359  * It was pending another write, so we just discard the write.
360  */
362  fr_bio_dedup_list_remove(&my->active, item);
363  break;
364 
365  /*
366  * Don't free it. Just set its state to cancelled.
367  */
369  fr_assert(my->partial == item);
371  return;
372 
375  fr_assert(0);
376  return;
377  }
378 
379 #ifndef NDEBUG
380  item->packet = NULL;
381 #endif
382  item->uctx = NULL;
383  item->packet_ctx = NULL;
384 
385  fr_assert(my->first != item);
386  fr_bio_dedup_list_insert_head(&my->free, item);
387 }
388 
389 /** Flush any packets in the pending queue.
390  *
391  */
393 {
394  ssize_t rcode, out_rcode;
396  fr_bio_t *next;
397  fr_time_t now;
398 
399  /*
400  * We can only flush the pending list when any previous partial packet has been written.
401  */
402  fr_assert(!my->partial);
403 
404  /*
405  * Nothing in the list, we're done.
406  */
407  if (fr_bio_dedup_list_num_elements(&my->pending) == 0) return 0;
408 
409  /*
410  * There must be a next bio.
411  */
412  next = fr_bio_next(&my->bio);
413  fr_assert(next != NULL);
414 
415  now = fr_time();
416  out_rcode = 0;
417 
418  /*
419  * Write out any pending packets.
420  */
421  while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
423 
424  /*
425  * It's already expired, don't bother replying.
426  */
427  if (fr_time_lteq(item->expires, now)) {
429  continue;
430  }
431 
432  /*
433  * Write the entry to the next bio.
434  */
435  rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
436  if (rcode <= 0) return rcode; /* @todo - update timer if we've written one packet */
437 
438  /*
439  * We've written the entire packet, move it to the expiry list.
440  */
441  if ((size_t) rcode == item->reply_size) {
442  (void) fr_bio_dedup_list_remove(&my->pending, item);
443  (void) fr_rb_insert(&my->rb, item);
445  continue;
446  }
447 
448  fr_bio_dedup_blocked(my, item, rcode);
449 
450  out_rcode = fr_bio_error(IO_WOULD_BLOCK);
451  break;
452  }
453 
454  /*
455  * We may need to update the timer if we've removed the first entry from the tree, or added a new
456  * first entry.
457  */
458  if (!my->first || (my->first != fr_rb_first(&my->rb))) {
459  my->first = NULL;
460  (void) fr_bio_dedup_reset_timer(my);
461  }
462 
463  return out_rcode;
464 }
465 
466 /** Save partially written data to our local buffer.
467  *
468  */
469 static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
470 {
471  /*
472  * (re)-alloc the buffer for partial writes.
473  */
474  if (!my->buffer.start ||
475  (size > fr_bio_buf_size(&my->buffer))) {
476  if (fr_bio_buf_alloc(my, &my->buffer, size) < 0) return -1;
477  }
478 
479  fr_assert(fr_bio_buf_used(&my->buffer) == 0);
480  fr_assert(my->buffer.read == my->buffer.start);
481 
482  fr_bio_buf_write(&my->buffer, buffer + rcode, size - rcode);
483 
484  return 0;
485 }
486 
487 /** Write data from our local buffer to the next bio.
488  *
489  */
491 {
492  size_t used;
493  ssize_t rcode;
494  fr_bio_t *next;
495 
496  fr_assert(my->buffer.start);
497 
498  used = fr_bio_buf_used(&my->buffer);
499  fr_assert(used > 0);
500 
501  /*
502  * There must be a next bio.
503  */
504  next = fr_bio_next(&my->bio);
505  fr_assert(next != NULL);
506 
507  rcode = next->write(next, NULL, my->buffer.read, used);
508  if (rcode <= 0) return rcode;
509 
510  my->buffer.read += rcode;
511 
512  /*
513  * Still data in the buffer. We can't send more packets until we finish writing this one.
514  */
515  if (fr_bio_buf_used(&my->buffer) > 0) return 0;
516 
517  /*
518  * We're done. Reset the buffer and clean up our cached partial packet.
519  */
520  fr_bio_buf_reset(&my->buffer);
521 
522  return rcode;
523 }
524 
525 /** There's a partial packet written. Write all of that one first, before writing another packet.
526  *
527  * The packet can either be cancelled, or IO blocked. In either case, we must write this packet before
528  * we can write another one.
529  */
530 static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
531 {
532  ssize_t rcode;
533  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
535 
536  fr_assert(my->partial != NULL);
537  fr_assert(my->buffer.start);
538 
540  (item->state == FR_BIO_DEDUP_STATE_CANCELLED));
541 
542  rcode = fr_bio_dedup_buffer_write(my);
543  if (rcode <= 0) return rcode;
544 
545  my->partial = NULL;
546 
547  /*
548  * Partial writes are removed from the expiry tree until they're fully written. When they're
549  * written, either add it back to the tree if it's still operational, or add it to the free list
550  * if it has been cancelled.
551  */
552  if (item->state == FR_BIO_DEDUP_STATE_PARTIAL) {
553 
554  /*
555  * See if we have to clean up this entry. If so, do it now. That avoids another bounce
556  * through the event loop.
557  */
558  if (fr_time_lteq(item->expires, fr_time())) {
560 
561  } else {
562  /*
563  * We've changed the tree, so update the timer. fr_bio_dedup_write() only
564  * updates the timer on successful write.
565  */
567  (void) fr_rb_insert(&my->rb, item);
568  }
569  (void) fr_bio_dedup_reset_timer(my);
570 
571  } else {
572  /*
573  * The item was cancelled, add it to the free list.
574  */
575 #ifndef NDEBUG
576  item->packet = NULL;
577 #endif
578  item->uctx = NULL;
579  item->packet_ctx = NULL;
580 
581  item->state = FR_BIO_DEDUP_STATE_FREE;
582  fr_bio_dedup_list_insert_head(&my->free, item);
583  }
584 
585  /*
586  * Flush any packets which were pending during the blocking period.
587  */
588  rcode = fr_bio_dedup_flush_pending(my);
589  if (rcode < 0) return rcode;
590 
591  /*
592  * Unlike the retry BIO, we don't retry writes for items in the RB tree. Those packets have already
593  * been written.
594  */
595 
596  /*
597  * Try to write the packet which we were given.
598  */
599  my->bio.write = fr_bio_dedup_write;
600  return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
601 }
602 
603 /** The write is blocked.
604  *
605  * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
606  * unblocked!
607  *
608  * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
609  * _last_ item in the RB tree.
610  */
612 {
613  fr_assert(!my->partial);
614  fr_assert(rcode > 0);
615  fr_assert((size_t) rcode < item->reply_size);
616 
617  if (fr_bio_dedup_buffer_save(my, item->reply, item->reply_size, rcode) < 0) return fr_bio_error(GENERIC);
618 
619  switch (item->state) {
621  (void) fr_bio_dedup_list_remove(&my->active, item);
622  break;
623 
624  /*
625  * We cannot expire this entry, so remove it from the expiration tree. That step lets us
626  * expire other entries.
627  */
629  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
631  break;
632 
633  /*
634  * We tried to write a pending packet and got blocked.
635  */
637  fr_assert(fr_bio_dedup_list_head(&my->pending) == item);
638  (void) fr_bio_dedup_list_remove(&my->pending, item);
639  break;
640 
641  /*
642  * None of these states should be possible.
643  */
647  fr_assert(0);
648  return fr_bio_error(GENERIC);
649  }
650 
651  my->partial = item;
653 
654  /*
655  * Reset the write routine, so that if the application tries any more writes, the partial entry
656  * gets written first.
657  */
658  my->bio.write = fr_bio_dedup_write_partial;
659  return rcode;
660 }
661 
662 /** There's a partial block of data written. Write all of that data first, before writing another packet.
663  */
664 static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
665 {
666  ssize_t rcode;
667  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
668 
669  fr_assert(!my->partial);
670 
671  /*
672  * Flush out any partly written data.
673  */
674  rcode = fr_bio_dedup_buffer_write(my);
675  if (rcode <= 0) return rcode;
676 
677  /*
678  * Flush any packets which were pending during the blocking period.
679  */
680  rcode = fr_bio_dedup_flush_pending(my);
681  if (rcode < 0) return rcode;
682 
683  /*
684  * Try to write the packet which we were given.
685  */
686  my->bio.write = fr_bio_dedup_write;
687  return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
688 }
689 
690 
691 /** The write is blocked, but we don't have "item".
692  *
693  * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
694  * unblocked!
695  *
696  * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
697  * _last_ item in the RB tree.
698  */
699 static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
700 {
701  fr_assert(!my->partial);
702  fr_assert(rcode > 0);
703  fr_assert((size_t) rcode < size);
704 
705  if (fr_bio_dedup_buffer_save(my, buffer, size, rcode) < 0) return fr_bio_error(GENERIC);
706 
707  /*
708  * Reset the write routine, so that if the application tries any more writes, the data
709  * gets written first.
710  */
711  my->bio.write = fr_bio_dedup_write_data;
712  return rcode;
713 }
714 
715 /*
716  * There is no fr_bio_dedup_rewrite(), packets are never re-written by this bio.
717  */
718 
719 /** Expire an entry when its timer fires.
720  *
721  * @todo - expire items from the pending list, too
722  */
723 static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
724 {
725  fr_bio_dedup_t *my = talloc_get_type_abort(uctx, fr_bio_dedup_t);
727  fr_time_t expires;
728 
729  fr_assert(my->first != NULL);
730  fr_assert(fr_rb_first(&my->rb) == my->first);
731 
732  my->first = NULL;
733 
734  /*
735  * Expire all entries which are within 10ms of "now". That way we don't reset the event many
736  * times in short succession.
737  *
738  * @todo - also expire entries on the pending list?
739  */
740  expires = fr_time_add(now, fr_time_delta_from_msec(10));
741 
742  while ((item = fr_rb_first(&my->rb)) != NULL) {
743  if (fr_time_gt(item->expires, expires)) break;
744 
746  }
747 
748  (void) fr_bio_dedup_reset_timer(my);
749 }
750 
751 /** Write raw data to the bio.
752  *
753  * This function is largely a duplicate of fr_bio_dedup_respond(). Except due to the BIO API, it can be
754  * passed a NULL buffer (for flushing the BIOs), and it can't be passed a #fr_bio_dedup_entry_t, and instead
755  * has to be passed a "void *packet_ctx".
756  *
757  * The caller is free to ignore this function,
758  */
759 static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
760 {
761  ssize_t rcode;
763  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
764  fr_bio_t *next;
765 
766  fr_assert(!my->partial);
767 
768  /*
769  * There must be a next bio.
770  */
771  next = fr_bio_next(&my->bio);
772  fr_assert(next != NULL);
773 
774  /*
775  * The caller is trying to flush partial data. But we don't have any partial data, so just call
776  * the next bio to flush it.
777  */
778  if (!buffer) {
779  return next->write(next, packet_ctx, NULL, size);
780  }
781 
782  /*
783  * Write out the packet. If there's an error, OR we wrote nothing, return.
784  *
785  * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
786  * the caller to know that the write didn't succeed, and the caller takes care of managing the
787  * current packet. So there's no need for us to do that.
788  */
789  rcode = next->write(next, packet_ctx, buffer, size);
790  if (rcode <= 0) return rcode;
791 
792  /*
793  * We need the item pointer to mark this entry as blocked. If that doesn't exist, then we try
794  * really hard to write out the un-tracked data.
795  */
796  item = NULL;
797  if (my->get_item) item = my->get_item(bio, packet_ctx);
798  if ((size_t) rcode == size) {
799  if (item) fr_bio_dedup_replied(my, item);
800  return rcode;
801  }
802 
803  if (!item) return fr_bio_dedup_blocked_data(my, buffer, size, rcode);
804 
805  fr_assert(item->reply_ctx == packet_ctx);
806  fr_assert(item->reply == buffer);
807  fr_assert(item->reply_size == size);
808 
809  return fr_bio_dedup_blocked(my, item, rcode);
810 }
811 
812 static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
813 {
814  ssize_t rcode;
816  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
817  fr_bio_t *next;
818 
819  /*
820  * There must be a next bio.
821  */
822  next = fr_bio_next(&my->bio);
823  fr_assert(next != NULL);
824 
825  /*
826  * Read the packet. If error or nothing, return immediately.
827  */
828  rcode = next->read(next, packet_ctx, buffer, size);
829  if (rcode <= 0) return rcode;
830 
831  /*
832  * Get a free item
833  */
834  item = fr_bio_dedup_list_pop_head(&my->free);
835  fr_assert(item != NULL);
836 
837  fr_assert(item->my == my);
839  .my = my,
840  .packet_ctx = packet_ctx,
841  .packet = buffer,
842  .packet_size = (size_t) rcode,
843  .state = FR_BIO_DEDUP_STATE_ACTIVE,
844  };
845 
846  /*
847  * See if we want to respond to this packet. If this isn't something we respond to, then we just
848  * discard it.
849  *
850  * The "respond" function is responsible for looking in a local dedup tree to see if there's a
851  * cached reply. It's also responsible for calling the fr_bio_retry_respond() function to send
852  * any duplicate reply.
853  *
854  * If we're NOT going to reply to this packet, then the item we just popped needs to get inserted
855  * back into the free list.
856  *
857  * The caller should potentially cancel any conflicting packets via fr_bio_dedup_entry_cancel(),
858  * and potentially also write a duplicate reply via fr_bio_dedup_respond().
859  */
860  if (!my->receive(bio, item, packet_ctx, buffer, rcode)) {
861  item->state = FR_BIO_DEDUP_STATE_FREE;
862  fr_bio_dedup_list_insert_head(&my->free, item);
863  return 0;
864  }
865 
866  fr_bio_dedup_list_insert_tail(&my->active, item);
867 
868  return rcode;
869 }
870 
871 static int8_t _entry_cmp(void const *one, void const *two)
872 {
873  fr_bio_dedup_entry_t const *a = one;
874  fr_bio_dedup_entry_t const *b = two;
875 
876  fr_assert(a->packet);
877  fr_assert(b->packet);
878 
879  return fr_time_cmp(a->expires, b->expires);
880 }
881 
882 /** Cancel one item.
883  *
884  * @param bio the binary IO handler
885  * @param item the dedup context from #fr_bio_dedup_respond_t
886  */
888 {
889  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
890 
892 
894 }
895 
896 /** Extend the expiry time for an entry
897  *
898  * @param bio the binary IO handler
899  * @param item the dedup context from #fr_bio_dedup_respond_t
900  * @param expires the new expiry time
901  * @return
902  * - <0 error
903  * - 0 success
904  */
906 {
907  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
908 
909  switch (item->state) {
911  return 0;
912 
914  break;
915 
916  /*
917  * Partially written or pending replies aren't in the expiry tree. We can just change their
918  * expiry time and be done.
919  */
922  item->expires = expires;
923  return 0;
924 
927  fr_assert(0);
928  return fr_bio_error(GENERIC);
929  }
930 
931  /*
932  * Shortening the lifetime is OK. If the caller does something dumb like set expiry to a time in
933  * the past, well... that's their problem.
934  */
935  fr_assert(fr_time_lteq(expires, fr_time()));
936 
937  /*
938  * Change places in the tree.
939  */
940  item->expires = expires;
941  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
942  (void) fr_rb_insert(&my->rb, item);
943 
944  /*
945  * If we're not changing the first item, we don't need to change the timers.
946  *
947  * Otherwise we clear the "first" flag, so that the reset timer function will change the timer
948  * value.
949  */
950  if (my->first != item) return 0;
951 
952  my->first = NULL;
953 
954  return fr_bio_dedup_reset_timer(my);
955 }
956 
957 
958 /** Remove the dedup cache
959  *
960  */
962 {
965 
966  talloc_const_free(my->ev);
967 
968  /*
969  * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
970  * entries will be deleted when the memory is freed.
971  */
972  while ((item = fr_rb_iter_init_inorder(&iter, &my->rb)) != NULL) {
974  }
975 
976 #ifndef NDEBUG
977  my->ev = NULL;
978  my->first = NULL;
979 #endif
980 
981  return 0;
982 }
983 
984 /** Allocate a #fr_bio_dedup_t
985  *
986  */
987 fr_bio_t *fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved,
988  fr_bio_dedup_receive_t receive,
989  fr_bio_dedup_release_t release,
990  fr_bio_dedup_get_item_t get_item,
991  fr_bio_dedup_config_t const *cfg,
992  fr_bio_t *next)
993 {
994  size_t i;
995  fr_bio_dedup_t *my;
996  fr_bio_dedup_entry_t *items;
997 
998  fr_assert(cfg->el);
999 
1000  /*
1001  * Limit to reasonable values.
1002  */
1003  if (!max_saved) return NULL;
1004  if (max_saved > 65536) return NULL;
1005 
1006  my = talloc_zero(ctx, fr_bio_dedup_t);
1007  if (!my) return NULL;
1008 
1009  /*
1010  * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1011  * and better reuse of data structures.
1012  */
1013  items = talloc_array(my, fr_bio_dedup_entry_t, max_saved);
1014  if (!items) return NULL;
1015 
1016  /*
1017  * Insert the entries into the free list in order.
1018  */
1019  fr_bio_dedup_list_init(&my->free);
1020 
1021  for (i = 0; i < max_saved; i++) {
1022  items[i].my = my;
1023  items[i].state = FR_BIO_DEDUP_STATE_FREE;
1024  fr_bio_dedup_list_insert_tail(&my->free, &items[i]);
1025  }
1026 
1027  fr_bio_dedup_list_init(&my->active);
1028  fr_bio_dedup_list_init(&my->pending);
1029 
1030  (void) fr_rb_inline_init(&my->rb, fr_bio_dedup_entry_t, node, _entry_cmp, NULL);
1031 
1032  my->receive = receive;
1033  my->release = release;
1034  my->get_item = get_item;
1035 
1036  my->el = cfg->el;
1037  my->config = *cfg;
1038 
1039  my->bio.write = fr_bio_dedup_write;
1040  my->bio.read = fr_bio_dedup_read;
1041 
1042  fr_bio_chain(&my->bio, next);
1043 
1044  talloc_set_destructor(my, fr_bio_dedup_destructor);
1045 
1046  return (fr_bio_t *) my;
1047 }
static int const char char buffer[256]
Definition: acutest.h:574
fr_bio_write_t _CONST write
write to the underlying bio
Definition: base.h:107
fr_bio_read_t _CONST read
read from the underlying bio
Definition: base.h:106
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition: base.h:121
#define fr_bio_error(_x)
Definition: base.h:184
Definition: base.h:103
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition: bio_priv.h:57
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition: buf.c:114
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition: buf.c:81
uint8_t * start
start of the buffer
Definition: buf.h:30
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition: buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition: buf.h:61
uint8_t * read
where in the buffer reads are taken from
Definition: buf.h:33
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition: buf.h:151
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define UNUSED
Definition: build.h:313
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
Definition: dedup.c:173
fr_bio_dedup_release_t release
called to release a packet
Definition: dedup.c:110
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
Definition: dedup.c:469
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
Definition: dedup.c:905
fr_bio_dedup_config_t config
Definition: dedup.c:88
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
Definition: dedup.c:112
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition: dedup.c:530
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
Definition: dedup.c:887
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
Definition: dedup.c:664
fr_bio_dedup_entry_t * partial
Definition: dedup.c:107
fr_event_timer_t const * ev
Definition: dedup.c:90
fr_event_list_t * el
Definition: dedup.c:84
static int fr_bio_dedup_destructor(fr_bio_dedup_t *my)
Remove the dedup cache.
Definition: dedup.c:961
static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
Definition: dedup.c:723
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
Definition: dedup.c:392
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
Definition: dedup.c:611
fr_rb_tree_t rb
expire list
Definition: dedup.c:86
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
Definition: dedup.c:139
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
Definition: dedup.c:987
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
Definition: dedup.c:109
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
Definition: dedup.c:699
static int8_t _entry_cmp(void const *one, void const *two)
Definition: dedup.c:871
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
Definition: dedup.c:759
fr_bio_dedup_entry_t * first
Definition: dedup.c:99
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
Definition: dedup.c:490
static void fr_bio_dedup_reset_timer_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Definition: dedup.c:127
fr_bio_buf_t buffer
Definition: dedup.c:114
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
Definition: dedup.c:34
static int fr_bio_dedup_reset_timer(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
Definition: dedup.c:292
fr_bio_dedup_state_t
Definition: dedup.c:48
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
Definition: dedup.c:53
@ FR_BIO_DEDUP_STATE_FREE
Definition: dedup.c:49
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
Definition: dedup.c:50
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
Definition: dedup.c:54
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
Definition: dedup.c:51
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
Definition: dedup.c:52
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition: dedup.c:812
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
Definition: dedup.c:335
void * uctx
user-writable context
Definition: dedup.c:58
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release the packet (timeout, or cancelled by the application)
Definition: dedup.h:91
fr_event_list_t * el
event list
Definition: dedup.h:37
fr_bio_dedup_t * my
so we can get to it from the event timer callback
Definition: dedup.c:73
fr_time_t expires
when this entry expires
Definition: dedup.c:75
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx, const void *buffer, size_t size)
Callback on read to see if we should receive the packet.
Definition: dedup.h:78
fr_time_delta_t lifetime
default lifetime of dedup entry
Definition: dedup.h:39
size_t packet_size
size of the cached packet data
Definition: dedup.c:61
fr_bio_dedup_state_t state
which tree or list this item is in
Definition: dedup.c:76
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
Definition: dedup.h:42
void * packet_ctx
packet_ctx for dedup purposes
Definition: dedup.c:59
uint8_t * reply
reply cached by the application
Definition: dedup.c:63
size_t reply_size
size of the cached reply
Definition: dedup.c:64
void * reply_ctx
reply ctx
Definition: dedup.c:62
uint8_t * packet
cached packet data for finding duplicates
Definition: dedup.c:60
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
Definition: dedup.h:93
fr_bio_dedup_release_reason_t
Definition: dedup.h:56
@ FR_BIO_DEDUP_WRITE_ERROR
Definition: dedup.h:59
@ FR_BIO_DEDUP_CANCELLED
Definition: dedup.h:58
@ FR_BIO_DEDUP_EXPIRED
Definition: dedup.h:57
Definition: dedup.c:57
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition: dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition: dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition: dlist.h:1152
#define fr_event_timer_at(...)
Definition: event.h:250
free(array)
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition: lst.c:122
typedef FR_DLIST_HEAD(map_list) map_list_t
Given these are used in so many places, it's more friendly to have a proper type.
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
unsigned long int size_t
Definition: merged_model.c:25
static size_t used
static fr_bio_t * bio
Definition: radclient-ng.c:86
void * fr_rb_first(fr_rb_tree_t *tree)
Definition: rb.c:780
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition: rb.c:718
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition: rb.c:624
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:818
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition: rb.h:180
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red black tree structure.
Definition: rb.h:73
fr_assert(0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:212
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:573
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition: time.h:173
#define fr_time_lteq(_a, _b)
Definition: time.h:240
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:914
"server local" time.
Definition: time.h:69
static fr_event_list_t * el