The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
dedup.c
Go to the documentation of this file.
1 /*
2  * This program is is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: fbf88c1a785fa0f8894f5029d0e2871a8c49f745 $
19  * @file lib/bio/dedup.c
20  * @brief Binary IO abstractions for deduping packets.
21  *
22  * The dedup BIO receives packets from the network, and allows for deduplication of replies. The actual
23  * deduplication tree / table has to be maintained by the calling application, as packet dedup is very
24  * protocol-specific. The purpose of the dedup BIO is to abstract all of the common support functions around
25  * this requirement.
26  *
27  * When packets are read() from the next bio, the #fr_bio_dedup_receive_t callback is run. It tells the BIO
28  * whether or not the packet should be received, and whether or not the packet should be returned to the
29  * caller. The receive callback is also passed a #fr_bio_dedup_entry_t pointer, where the packet_ctx, packet,
30  * and size are already filled out. This entry is used to correlate requests and replies.
31  *
32  * When packets are write() to the network, the #fr_bio_dedup_get_item_t callback is called to get the
33  * previously cached #fr_bio_dedup_entry_t pointer. This is because there is no generic way to get an
34  * additional context to this BIO via the write() routine. i.e. the packet_ctx for write() may include things
35  * like src/dst ip/port, and therefore can't always be an #fr_bio_dedup_entry_t. The caller should associate
36  * the #fr_bio_dedup_entry_t with the packet_ctx for the reply. The get_item() routine can then return the entry.
37  *
38  * For simplicity, the next bio should be a memory one. That way the read() can read more than one packet if
39  * necessary. And the write() can cache a partial packet if it blocks.
40  *
41  * The entry needs to be cached in order to maintain the internal tracking used by the dedup BIO.
42  *
43  * On client retransmit, the #fr_bio_dedup_receive_t callback is run, just as if it is a new packet. The
44  * dedup BIO does not know if the received data is a new packet until the #fr_bio_dedup_receive_t callback
45  * says so. On duplicate client request, the #fr_bio_dedup_receive_t callback can call fr_bio_dedup_respond()
46  * to send a duplicate reply. That call bypasses the normal dedup stack, and writes directly to the next bio.
47  *
48  * The calling application can also call fr_bio_dedup_respond() as soon as it has a reply. i.e. skip the BIO
49  * write() call. That works, and is safe.
50  *
51  * The dedup BIO tracks a number of lists / trees internally. Packets which are received but which have no
52  * reply are in an "active" list. Packets which have a reply are in an "expired" RB tree, where a timer is
53  * set to expire packets. If a write() call results in a partial write, that packet is put into a "partially
54  * written" state. If multiple calls to write() are done when writing is blocked, the replies are put into a
55  * "pending" state.
56  *
57  * The calling application can always call fr_bio_dedup_cancel() to cancel or expire a packet. This call is
58  * safe, and can be made at any time, no matter what state the packet is in.
59  *
60  * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
61  */
62 
63 #include <freeradius-devel/bio/bio_priv.h>
64 #include <freeradius-devel/bio/null.h>
65 #include <freeradius-devel/bio/buf.h>
66 #include <freeradius-devel/util/rb.h>
67 #include <freeradius-devel/util/dlist.h>
68 
69 #define _BIO_DEDUP_PRIVATE
70 #include <freeradius-devel/bio/dedup.h>
71 
72 typedef struct fr_bio_dedup_list_s fr_bio_dedup_list_t;
73 typedef struct fr_bio_dedup_s fr_bio_dedup_t;
74 
75 /*
76  * There is substantial similarity between this code and the
77  * "retry" bio. Any fixes which are done here should be checked
78  * there, and vice versa.
79  */
80 
81 /*
82  * Define type-safe wrappers for head and entry definitions.
83  */
84 FR_DLIST_TYPES(fr_bio_dedup_list)
85 
86 typedef enum {
88  FR_BIO_DEDUP_STATE_ACTIVE, //!< Received, but not replied.
89  FR_BIO_DEDUP_STATE_PENDING, //!< Have a reply, but we're trying to write it out.
90  FR_BIO_DEDUP_STATE_REPLIED, //!< Replied, and waiting for it to expire.
91  FR_BIO_DEDUP_STATE_PARTIAL, //!< Partially written
92  FR_BIO_DEDUP_STATE_CANCELLED, //!< Partially written, and then cancelled.
94 
96  void *uctx;
97  void *packet_ctx;
98  uint8_t *packet; //!< cached packet data for finding duplicates
99  size_t packet_size; //!< size of the cached packet data
100  void *reply_ctx; //!< reply ctx
101  uint8_t *reply; //!< reply cached by the application
102  size_t reply_size; //!< size of the cached reply
103 
104  fr_rb_node_t dedup; //!< user managed dedup node
105 
106  union {
107  struct {
108  fr_rb_node_t node; //!< for the expiry timers
109  };
110  FR_DLIST_ENTRY(fr_bio_dedup_list) entry; //!< for the free list
111  };
112 
113  fr_bio_dedup_t *my; //!< so we can get to it from the event timer callback
114 
115  fr_time_t expires; //!< when this entry expires
116  fr_bio_dedup_state_t state; //!< which tree or list this item is in
117 };
118 
119 FR_DLIST_FUNCS(fr_bio_dedup_list, fr_bio_dedup_entry_t, entry)
120 
123 
125 
126  fr_rb_tree_t rb; //!< expire list
127 
129 
131 
132  /*
133  * The "first" entry is cached here so that we can detect when it changes. The insert / delete
134  * code can just do its work without worrying about timers. And then when the tree manipulation
135  * is done, call the fr_bio_dedup_timer_reset() function to reset (or not) the timer.
136  *
137  * The timer is set for is when the first packet expires.
138  */
140 
141  /*
142  * Cache a partial write when IO is blocked.
143  *
144  * When the IO is blocked, we can still expire old entries, unlike the "retry" BIOs. This is
145  * because we're not resending packets, we're just cleaning up *sent* packets when they expire.
146  */
148 
149  fr_bio_dedup_receive_t receive; //!< called when we receive a potentially new packet
150  fr_bio_dedup_release_t release; //!< called to release a packet
151 
152  fr_bio_dedup_get_item_t get_item; //!< turn a packet_ctx into a #fr_bio_dedup_entry_t
153 
155 
156  FR_DLIST_HEAD(fr_bio_dedup_list) active; //!< received but not yet replied
157  FR_DLIST_HEAD(fr_bio_dedup_list) pending; //!< trying to write when the socket is blocked.
158  FR_DLIST_HEAD(fr_bio_dedup_list) free; //!< free list
159 };
160 
161 static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
162 static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
166 
168 {
169  if (my->first != item) return;
170 
171  my->first = NULL;
173 }
174 
175 /** Move an item from active to replied.
176  *
177  * Note that we don't update any timers. The caller is responsible for that.
178  */
180 {
181  if (item->state == FR_BIO_DEDUP_STATE_REPLIED) return;
182 
184 
185  (void) fr_bio_dedup_list_remove(&my->active, item);
186 
187  /*
188  * Now that we have a reply, set the default expiry time. The caller can always call
189  * fr_bio_dedup_entry_extend() to change the expiry time.
190  */
191  item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
192 
193  /*
194  * This should never fail.
195  */
196  (void) fr_rb_insert(&my->rb, item);
198 }
199 
200 /** Resend a reply when we receive a duplicate request.
201  *
202  * This function should be called by the respond() callback to re-send a duplicate reply.
203  *
204  * It can also be called by the application when it first has a response to the request.
205  *
206  * @param bio the binary IO handler
207  * @param item the dedup context from #fr_bio_dedup_sent_t
208  * @return
209  * - <0 on error
210  * - 0 for "wrote no data"
211  * - >0 for "wrote data".
212  */
214 {
	/*
	 *	NOTE(review): this extraction is missing several original source lines, including the
	 *	"case FR_BIO_DEDUP_STATE_*:" labels of the switch below.  The per-state control flow
	 *	cannot be fully verified from this view; confirm against the upstream dedup.c.
	 */
215  ssize_t rcode;
216  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
217  fr_bio_t *next;
218 
219  if (!item->reply || !item->reply_size) return 0;
220 
221  switch (item->state) {
222  /*
223  * Send a first reply if we can.
224  */
226  /*
227  * If we're not writing to the socket, just insert the packet into the pending list.
228  */
229  if (my->bio.write != fr_bio_dedup_write) {
230  (void) fr_bio_dedup_list_remove(&my->active, item);
231  fr_bio_dedup_list_insert_tail(&my->pending, item);
232 
234  item->expires = fr_time_add_time_delta(fr_time(), my->config.lifetime);
235  return item->reply_size;
236  }
237 
238  /*
239  * The socket is writable, go do that.
240  */
241  break;
242 
243  /*
244  * Send a duplicate reply.
245  */
247  if (my->bio.write == fr_bio_dedup_write) break;
248 
249  /*
250  * The socket is blocked. Save the packet in the pending queue.
251  */
252  move_to_pending:
253  fr_rb_remove_by_inline_node(&my->rb, &item->node);
254 
255  save_in_pending:
256  /*
257  * We could update the timer for pending packets. However, that's more complicated.
258  *
259  * The packets will be expired when the pending queue is flushed, OR when the application
260  * cancels the pending packet.
261  */
263 
264  fr_bio_dedup_list_insert_tail(&my->pending, item);
266  return item->reply_size;
267 
268  /*
269  * We're already trying to write this entry, don't do anything else.
270  */
272  fr_assert(my->partial != NULL);
273  FALL_THROUGH;
274 
276  return fr_bio_error(IO_WOULD_BLOCK);
277 
280  fr_assert(0);
281  return fr_bio_error(GENERIC);
282  }
283 
284  /*
285  * There must be a next bio.
286  */
287  next = fr_bio_next(&my->bio);
288  fr_assert(next != NULL);
289 
290  /*
291  * Write out the packet, if everything is OK, return.
292  */
293  rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
294  if ((size_t) rcode == item->reply_size) {
296  return rcode;
297  }
298 
299  /*
300  * Can't write anything, be sad.
301  */
302  if ((rcode == 0) || (rcode == fr_bio_error(IO_WOULD_BLOCK))) {
303  if (item->state == FR_BIO_DEDUP_STATE_ACTIVE) {
304  (void) fr_bio_dedup_list_remove(&my->active, item);
305  goto save_in_pending;
306  }
307 
309  goto move_to_pending;
310  }
311 
312  /*
313  * There's an error writing the packet. Release it, and move the item to the free list.
314  *
315  * Note that we don't bother resetting the timer. There's no point in changing the timer when
316  * the bio is likely dead.
317  */
318  if (rcode < 0) {
320  return rcode;
321  }
322 
323  /*
324  * We are writing item->reply, and that's blocked. Save the partial packet for later.
325  */
326  return fr_bio_dedup_blocked(my, item, rcode);
327 }
328 
329 /** Reset the timer after changing the rb tree.
330  *
 *	Sets (or clears) the expiry timer so that it fires when the earliest-expiring entry in the
 *	tree expires; caches that entry in my->first so redundant timer updates are avoided.
 *	NOTE(review): the function signature line is missing from this extraction; call sites
 *	(e.g. fr_bio_dedup_entry_extend()) use its int return value.
331  */
333 {
334  fr_bio_dedup_entry_t *first;
335 
336  /*
337  * Nothing to do, don't set any timers.
338  */
339  first = fr_rb_first(&my->rb);
340  if (!first) {
341  talloc_const_free(my->ev);
342  my->ev = NULL;
343  my->first = NULL;
344  return 0;
345  }
346 
347  /*
348  * We don't care about partially written packets. The timer remains set even when we have a
349  * partial outgoing packet, because we can expire entries which aren't partially written.
350  *
351  * However, the partially written packet MUST NOT be in the expiry tree.
352  *
353  * We also don't care about the pending list. The application will either cancel the item, or
354  * the socket will become writable, and the item will get handled then.
355  */
356  fr_assert(first != my->partial);
357 
358  /*
359  * The timer is already set correctly, we're done.
360  */
361  if (first == my->first) return 0;
362 
363  /*
364  * Update the timer. This should never fail.
365  */
366  if (fr_event_timer_at(my, my->el, &my->ev, first->expires, fr_bio_dedup_timer, my) < 0) return -1;
367 
368  my->first = first;
369  return 0;
370 }
371 
372 /** Release an entry back to the free list.
373  *
 *	Runs the application's release callback, removes the entry from whichever container holds
 *	it, and returns it to the free list.  A partially-written entry is NOT freed here; it is
 *	only marked cancelled and freed once the partial write completes.
 *	NOTE(review): the signature line and the "case" labels of the switch are missing from this
 *	extraction; confirm the state handling against the upstream dedup.c.
374  */
376 {
377  my->release((fr_bio_t *) my, item, reason);
378 
379  switch (item->state) {
380  /*
381  * Cancel an active item, just nuke it.
382  */
384  fr_bio_dedup_list_remove(&my->active, item);
385  break;
386 
387  /*
388  * We already replied, remove it from the expiry tree.
389  *
390  * We only update the timer if the caller isn't already expiring the entry.
391  */
393  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
394 
396  break;
397 
398  /*
399  * It was pending another write, so we just discard the write.
400  */
	/*
	 *	NOTE(review): the comment above says "pending", but this removes from the "active"
	 *	list — possibly should be &my->pending; the case label is missing here, so confirm
	 *	against upstream before changing anything.
	 */
402  fr_bio_dedup_list_remove(&my->active, item);
403  break;
404 
405  /*
406  * Don't free it. Just set its state to cancelled.
407  */
409  fr_assert(my->partial == item);
411  return;
412 
415  fr_assert(0);
416  return;
417  }
418 
419 #ifndef NDEBUG
420  item->packet = NULL;
421 #endif
422  item->uctx = NULL;
423  item->packet_ctx = NULL;
424 
425  fr_assert(my->first != item);
426  fr_bio_dedup_list_insert_head(&my->free, item);
427 }
428 
429 /** Flush any packets in the pending queue.
430  *
 *	Writes as many pending replies as possible to the next bio.  Already-expired entries are
 *	dropped; fully written entries move to the expiry tree; a partial write stops the loop and
 *	the remainder is saved via fr_bio_dedup_blocked().
 *	NOTE(review): the signature line and the declaration of "item" are missing from this
 *	extraction.
431  */
433 {
434  ssize_t rcode, out_rcode;
436  fr_bio_t *next;
437  fr_time_t now;
438 
439  /*
440  * We can only flush the pending list when any previous partial packet has been written.
441  */
442  fr_assert(!my->partial);
443 
444  /*
445  * Nothing in the list, we're done.
446  */
447  if (fr_bio_dedup_list_num_elements(&my->pending) == 0) return 0;
448 
449  /*
450  * There must be a next bio.
451  */
452  next = fr_bio_next(&my->bio);
453  fr_assert(next != NULL);
454 
455  now = fr_time();
456  out_rcode = 0;
457 
458  /*
459  * Write out any pending packets.
460  */
461  while ((item = fr_bio_dedup_list_pop_head(&my->pending)) != NULL) {
463 
464  /*
465  * It's already expired, don't bother replying.
466  */
467  if (fr_time_lteq(item->expires, now)) {
469  continue;
470  }
471 
472  /*
473  * Write the entry to the next bio.
474  */
475  rcode = next->write(next, item->reply_ctx, item->reply, item->reply_size);
476  if (rcode <= 0) return rcode; /* @todo - update timer if we've written one packet */
477 
478  /*
479  * We've written the entire packet, move it to the expiry list.
480  */
481  if ((size_t) rcode == item->reply_size) {
	/*
	 *	NOTE(review): "item" was already popped from the pending list by the loop condition,
	 *	so this remove looks redundant — possibly the loop originally used list_head()
	 *	instead of pop_head().  Confirm against upstream.
	 */
482  (void) fr_bio_dedup_list_remove(&my->pending, item);
483  (void) fr_rb_insert(&my->rb, item);
485  continue;
486  }
487 
488  fr_bio_dedup_blocked(my, item, rcode);
489 
490  out_rcode = fr_bio_error(IO_WOULD_BLOCK);
491  break;
492  }
493 
494  /*
495  * We may need to update the timer if we've removed the first entry from the tree, or added a new
496  * first entry.
497  */
498  if (!my->first || (my->first != fr_rb_first(&my->rb))) {
499  my->first = NULL;
501  }
502 
503  return out_rcode;
504 }
505 
506 /** Save partially written data to our local buffer.
507  *
508  */
509 static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
510 {
511  /*
512  * (re)-alloc the buffer for partial writes.
513  */
514  if (!my->buffer.start ||
515  (size > fr_bio_buf_size(&my->buffer))) {
516  if (fr_bio_buf_alloc(my, &my->buffer, size) < 0) return -1;
517  }
518 
519  fr_assert(fr_bio_buf_used(&my->buffer) == 0);
520  fr_assert(my->buffer.read == my->buffer.start);
521 
522  fr_bio_buf_write(&my->buffer, buffer + rcode, size - rcode);
523 
524  return 0;
525 }
526 
527 /** Write data from our local buffer to the next bio.
528  *
 *	Drains the saved partial packet.  Returns <=0 when blocked or on error, 0 while data is
 *	still left in the buffer, and the last write's positive rcode once the buffer is empty.
 *	NOTE(review): the signature line is missing from this extraction; per the call sites this
 *	takes the fr_bio_dedup_t pointer and returns ssize_t.
529  */
531 {
532  size_t used;
533  ssize_t rcode;
534  fr_bio_t *next;
535 
536  fr_assert(my->buffer.start);
537 
538  used = fr_bio_buf_used(&my->buffer);
539  fr_assert(used > 0);
540 
541  /*
542  * There must be a next bio.
543  */
544  next = fr_bio_next(&my->bio);
545  fr_assert(next != NULL);
546 
547  rcode = next->write(next, NULL, my->buffer.read, used);
548  if (rcode <= 0) return rcode;
549 
550  my->buffer.read += rcode;
551 
552  /*
553  * Still data in the buffer. We can't send more packets until we finish writing this one.
554  */
555  if (fr_bio_buf_used(&my->buffer) > 0) return 0;
556 
557  /*
558  * We're done. Reset the buffer and clean up our cached partial packet.
559  */
560  fr_bio_buf_reset(&my->buffer);
561 
562  return rcode;
563 }
564 
565 /** There's a partial packet written. Write all of that one first, before writing another packet.
566  *
567  * The packet can either be cancelled, or IO blocked. In either case, we must write this packet before
568  * we can write another one.
 *
 *	NOTE(review): several original lines are missing from this extraction (the first half of
 *	the state assertion, the expiry/timer-reset calls in the PARTIAL branch, and the
 *	pending-flush call near the end).  Confirm against upstream before relying on details.
569  */
570 static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
571 {
572  ssize_t rcode;
573  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
574  fr_bio_dedup_entry_t *item = my->partial;
575 
576  fr_assert(my->partial != NULL);
577  fr_assert(my->buffer.start);
578 
580  (item->state == FR_BIO_DEDUP_STATE_CANCELLED));
581 
582  rcode = fr_bio_dedup_buffer_write(my);
583  if (rcode <= 0) return rcode;
584 
585  my->partial = NULL;
586 
587  /*
588  * Partial writes are removed from the expiry tree until they're fully written. When they're
589  * written, either add it back to the tree if it's still operational, or add it to the free list
590  * if it has been cancelled.
591  */
592  if (item->state == FR_BIO_DEDUP_STATE_PARTIAL) {
593 
594  /*
595  * See if we have to clean up this entry. If so, do it now. That avoids another bounce
596  * through the event loop.
597  */
598  if (fr_time_lteq(item->expires, fr_time())) {
600 
601  } else {
602  /*
603  * We've changed the tree, so update the timer. fr_bio_dedup_write() only
604  * updates the timer on successful write.
605  */
607  (void) fr_rb_insert(&my->rb, item);
608  }
610 
611  } else {
612  /*
613  * The item was cancelled, add it to the free list.
614  */
615 #ifndef NDEBUG
616  item->packet = NULL;
617 #endif
618  item->uctx = NULL;
619  item->packet_ctx = NULL;
620 
621  item->state = FR_BIO_DEDUP_STATE_FREE;
622  fr_bio_dedup_list_insert_head(&my->free, item);
623  }
624 
625  /*
626  * Flush any packets which were pending during the blocking period.
627  */
629  if (rcode < 0) return rcode;
630 
631  /*
632  * Unlike the retry BIO, we don't retry writes for items in the RB tree. Those packets have already
633  * been written.
634  */
635 
636  /*
637  * Try to write the packet which we were given.
638  */
639  my->bio.write = fr_bio_dedup_write;
640  return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
641 }
642 
643 /** The write is blocked.
644  *
645  * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
646  * unblocked!
647  *
648  * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
649  * _last_ item in the RB tree.
 *
 *	NOTE(review): the signature line and the switch "case" labels are missing from this
 *	extraction; per callers this takes (my, item, rcode) and returns ssize_t.
650  */
652 {
653  fr_assert(!my->partial);
654  fr_assert(rcode > 0);
655  fr_assert((size_t) rcode < item->reply_size);
656 
657  if (fr_bio_dedup_buffer_save(my, item->reply, item->reply_size, rcode) < 0) return fr_bio_error(OOM);
658 
659  switch (item->state) {
661  (void) fr_bio_dedup_list_remove(&my->active, item);
662  break;
663 
664  /*
665  * We cannot expire this entry, so remove it from the expiration tree. That step lets us
666  * expire other entries.
667  */
669  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
671  break;
672 
673  /*
674  * We tried to write a pending packet and got blocked.
675  */
677  fr_assert(fr_bio_dedup_list_head(&my->pending) == item);
678  (void) fr_bio_dedup_list_remove(&my->pending, item);
679  break;
680 
681  /*
682  * None of these states should be possible.
683  */
687  fr_assert(0);
688  return fr_bio_error(GENERIC);
689  }
690 
691  my->partial = item;
693 
694  /*
695  * Reset the write routine, so that if the application tries any more writes, the partial entry
696  * gets written first.
697  */
698  my->bio.write = fr_bio_dedup_write_partial;
699  return rcode;
700 }
701 
702 /** There's a partial block of data written. Write all of that data first, before writing another packet.
 *
 *	Installed as the bio write routine by fr_bio_dedup_blocked_data().  Drains the saved
 *	untracked data, then restores the normal write routine and retries the caller's packet.
 *	NOTE(review): one line appears missing before the "if (rcode < 0)" check below —
 *	presumably the pending-flush call which reassigns rcode; confirm against upstream.
703  */
704 static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
705 {
706  ssize_t rcode;
707  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
708 
709  fr_assert(!my->partial);
710 
711  /*
712  * Flush out any partly written data.
713  */
714  rcode = fr_bio_dedup_buffer_write(my);
715  if (rcode <= 0) return rcode;
716 
717  /*
718  * Flush any packets which were pending during the blocking period.
719  */
721  if (rcode < 0) return rcode;
722 
723  /*
724  * Try to write the packet which we were given.
725  */
726  my->bio.write = fr_bio_dedup_write;
727  return fr_bio_dedup_write(bio, packet_ctx, buffer, size);
728 }
729 
730 
731 /** The write is blocked, but we don't have "item".
732  *
733  * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
734  * unblocked!
735  *
736  * Do NOT free the timer. We can still expire old entries. This newly written entry usually ends up as the
737  * _last_ item in the RB tree.
 *
 *	NOTE(review): the signature line is missing from this extraction; per the caller in
 *	fr_bio_dedup_write() this takes (my, buffer, size, rcode) and returns ssize_t.
738  */
740 {
741  fr_assert(!my->partial);
742  fr_assert(rcode > 0);
743  fr_assert((size_t) rcode < size);
744 
745  if (fr_bio_dedup_buffer_save(my, buffer, size, rcode) < 0) return fr_bio_error(OOM);
746 
747  /*
748  * Reset the write routine, so that if the application tries any more writes, the data
749  * gets written first.
750  */
751  my->bio.write = fr_bio_dedup_write_data;
752  return rcode;
753 }
754 
755 /*
756  * There is no fr_bio_dedup_rewrite(), packets are never re-written by this bio.
757  */
758 
759 /** Expire an entry when its timer fires.
760  *
761  * @todo - expire items from the pending list, too
 *
 *	NOTE(review): the signature line (declared earlier as fr_bio_dedup_timer(el, now, uctx)),
 *	the declaration of "item", and the release/timer-reset calls inside and after the loop are
 *	missing from this extraction.
762  */
764 {
765  fr_bio_dedup_t *my = talloc_get_type_abort(uctx, fr_bio_dedup_t);
767  fr_time_t expires;
768 
769  fr_assert(my->first != NULL);
770  fr_assert(fr_rb_first(&my->rb) == my->first);
771 
772  my->first = NULL;
773 
774  /*
775  * Expire all entries which are within 10ms of "now". That way we don't reset the event many
776  * times in short succession.
777  *
778  * @todo - also expire entries on the pending list?
779  */
780  expires = fr_time_add(now, fr_time_delta_from_msec(10));
781 
782  while ((item = fr_rb_first(&my->rb)) != NULL) {
783  if (fr_time_gt(item->expires, expires)) break;
784 
786  }
787 
789 }
790 
791 /** Write raw data to the bio.
792  *
793  * This function is largely a duplicate of fr_bio_dedup_respond(). Except due to the BIO API, it can be
794  * passed a NULL buffer (for flushing the BIOs), and it can't be passed a #fr_bio_dedup_entry_t, and instead
795  * has to be passed a "void *packet_ctx".
796  *
797  * The caller is free to ignore this function.
 *
 *	NOTE(review): the declaration of "item" and one line inside the "wrote everything" branch
 *	(presumably moving the item to the replied state) are missing from this extraction.
798  */
799 static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
800 {
801  ssize_t rcode;
803  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
804  fr_bio_t *next;
805 
806  fr_assert(!my->partial);
807 
808  /*
809  * There must be a next bio.
810  */
811  next = fr_bio_next(&my->bio);
812  fr_assert(next != NULL);
813 
814  /*
815  * The caller is trying to flush partial data. But we don't have any partial data, so just call
816  * the next bio to flush it.
817  */
818  if (!buffer) {
819  return next->write(next, packet_ctx, NULL, size);
820  }
821 
822  /*
823  * Write out the packet. If there's an error, OR we wrote nothing, return.
824  *
825  * Note that we don't mark the socket as blocked if the next bio didn't write anything. We want
826  * the caller to know that the write didn't succeed, and the caller takes care of managing the
827  * current packet. So there's no need for us to do that.
828  */
829  rcode = next->write(next, packet_ctx, buffer, size);
830  if (rcode <= 0) return rcode;
831 
832  /*
833  * We need the item pointer to mark this entry as blocked. If that doesn't exist, then we try
834  * really hard to write out the un-tracked data.
835  */
836  item = NULL;
837  if (my->get_item) item = my->get_item(bio, packet_ctx);
838  if ((size_t) rcode == size) {
840  return rcode;
841  }
842 
843  if (!item) return fr_bio_dedup_blocked_data(my, buffer, size, rcode);
844 
845  fr_assert(item->reply_ctx == packet_ctx);
846  fr_assert(item->reply == buffer);
847  fr_assert(item->reply_size == size);
848 
849  return fr_bio_dedup_blocked(my, item, rcode);
850 }
851 
/** Read one packet from the next bio and run the application's receive callback on it.
 *
 *	A free tracking entry is popped, filled with the packet data, and handed to my->receive().
 *	If the callback declines the packet, the entry goes back on the free list and 0 is
 *	returned; otherwise the entry is appended to the "active" list.
 *
 *	NOTE(review): the declaration of "item" and the opening of the struct assignment are
 *	missing from this extraction.
 */
852 static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
853 {
854  ssize_t rcode;
856  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
857  fr_bio_t *next;
858 
859  /*
860  * There must be a next bio.
861  */
862  next = fr_bio_next(&my->bio);
863  fr_assert(next != NULL);
864 
865  /*
866  * Read the packet. If error or nothing, return immediately.
867  */
868  rcode = next->read(next, packet_ctx, buffer, size);
869  if (rcode <= 0) return rcode;
870 
871  /*
872  * Get a free item
873  */
874  item = fr_bio_dedup_list_pop_head(&my->free);
875  fr_assert(item != NULL);
876 
877  fr_assert(item->my == my);
879  .my = my,
880  .packet_ctx = packet_ctx,
881  .packet = buffer,
882  .packet_size = (size_t) rcode,
883  .state = FR_BIO_DEDUP_STATE_ACTIVE,
884  };
885 
886  /*
887  * See if we want to receive this packet. If this isn't
888  * something we need to receive, then we just discard it.
889  *
890  * The "receive" function is responsible for looking in a local dedup tree to see if there's a
891  * cached reply. It's also responsible for calling the fr_bio_retry_respond() function to send
892  * a duplicate reply, and then return "don't receive" this packet.
893  *
894  * The application can also call fr_bio_dedup_entry_extend() in order to extend the lifetime of a
895  * packet which has a cached response.
896  *
897  * If there's an active packet, then the receive() function should do whatever it needs to do in
898  * order to update the application for a duplicate packet. And then return "don't receive" for
899  * this packet.
900  *
901  * If we're NOT going to process this packet, then the item we just popped needs to get inserted
902  * back into the free list.
903  *
904  * The caller should cancel any conflicting packets by calling fr_bio_dedup_entry_cancel(). Note
905  * that for sanity, we don't re-use the previous #fr_bio_dedup_entry_t.
906  */
907  if (!my->receive(bio, item, packet_ctx)) {
908  item->state = FR_BIO_DEDUP_STATE_FREE;
909  fr_bio_dedup_list_insert_head(&my->free, item);
910  return 0;
911  }
912 
913  fr_bio_dedup_list_insert_tail(&my->active, item);
914 
915  return rcode;
916 }
917 
918 static int8_t _entry_cmp(void const *one, void const *two)
919 {
920  fr_bio_dedup_entry_t const *a = one;
921  fr_bio_dedup_entry_t const *b = two;
922 
923  fr_assert(a->packet);
924  fr_assert(b->packet);
925 
926  return fr_time_cmp(a->expires, b->expires);
927 }
928 
929 /** Cancel one item.
930  *
931  * @param bio the binary IO handler
932  * @param item the dedup context from #fr_bio_dedup_respond_t
 *
 *	NOTE(review): the signature line and the body's release / timer-reset calls are missing
 *	from this extraction (only the local declaration survives).
933  */
935 {
936  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
937 
939 
941 }
942 
943 /** Extend the expiry time for an entry
944  *
945  * @param bio the binary IO handler
946  * @param item the dedup context from #fr_bio_dedup_respond_t
947  * @param expires the new expiry time
948  * @return
949  * - <0 error
950  * - 0 success
 *
 *	NOTE(review): the signature line and the switch "case" labels are missing from this
 *	extraction; confirm the state handling against the upstream dedup.c.
951  */
953 {
954  fr_bio_dedup_t *my = talloc_get_type_abort(bio, fr_bio_dedup_t);
955 
956  switch (item->state) {
958  return 0;
959 
961  break;
962 
963  /*
964  * Partially written or pending replies aren't in the expiry tree. We can just change their
965  * expiry time and be done.
966  */
969  item->expires = expires;
970  return 0;
971 
974  fr_assert(0);
975  return fr_bio_error(GENERIC);
976  }
977 
978  /*
979  * Shortening the lifetime is OK. If the caller does something dumb like set expiry to a time in
980  * the past, well... that's their problem.
981  */
	/*
	 *	NOTE(review): this assertion requires expires <= now, which would fire in the normal
	 *	case of extending the expiry into the future, and contradicts the comment above.
	 *	It looks inverted — confirm against upstream before changing.
	 */
982  fr_assert(fr_time_lteq(expires, fr_time()));
983 
984  /*
985  * Change places in the tree.
986  */
987  item->expires = expires;
988  (void) fr_rb_remove_by_inline_node(&my->rb, &item->node);
989  (void) fr_rb_insert(&my->rb, item);
990 
991  /*
992  * If we're not changing the first item, we don't need to change the timers.
993  *
994  * Otherwise we clear the "first" flag, so that the reset timer function will change the timer
995  * value.
996  */
997  if (my->first != item) return 0;
998 
999  my->first = NULL;
1000 
1001  return fr_bio_dedup_timer_reset(my);
1002 }
1003 
1004 
1005 /** Remove the dedup cache
1006  *
 *	Talloc destructor: frees the expiry timer and runs the release callback (with
 *	FR_BIO_DEDUP_CANCELLED) for every entry still in the expiry tree.  The containers are not
 *	updated, as all memory is freed along with "my".
 *	NOTE(review): the signature line and the item / iterator declarations are missing from
 *	this extraction.
1007  */
1009 {
1012 
1013  talloc_const_free(my->ev);
1014 
1015  /*
1016  * Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
1017  * entries will be deleted when the memory is freed.
1018  */
1019  while ((item = fr_rb_iter_init_inorder(&iter, &my->rb)) != NULL) {
1021  my->release((fr_bio_t *) my, item, FR_BIO_DEDUP_CANCELLED);
1022  }
1023 
1024 #ifndef NDEBUG
1025  my->ev = NULL;
1026  my->first = NULL;
1027 #endif
1028 
1029  return 0;
1030 }
1031 
1032 /** Allocate a #fr_bio_dedup_t
1033  *
1034  */
1035 fr_bio_t *fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved,
1036  fr_bio_dedup_receive_t receive,
1037  fr_bio_dedup_release_t release,
1038  fr_bio_dedup_get_item_t get_item,
1039  fr_bio_dedup_config_t const *cfg,
1040  fr_bio_t *next)
1041 {
1042  size_t i;
1043  fr_bio_dedup_t *my;
1044  fr_bio_dedup_entry_t *items;
1045 
1046  fr_assert(cfg->el);
1047 
1048  /*
1049  * Limit to reasonable values.
1050  */
1051  if (!max_saved) return NULL;
1052  if (max_saved > 65536) return NULL;
1053 
1054  my = talloc_zero(ctx, fr_bio_dedup_t);
1055  if (!my) return NULL;
1056 
1057  /*
1058  * Allocate everything up front, to get better locality of reference, less memory fragmentation,
1059  * and better reuse of data structures.
1060  */
1061  items = talloc_array(my, fr_bio_dedup_entry_t, max_saved);
1062  if (!items) return NULL;
1063 
1064  /*
1065  * Insert the entries into the free list in order.
1066  */
1067  fr_bio_dedup_list_init(&my->free);
1068 
1069  for (i = 0; i < max_saved; i++) {
1070  items[i].my = my;
1071  items[i].state = FR_BIO_DEDUP_STATE_FREE;
1072  fr_bio_dedup_list_insert_tail(&my->free, &items[i]);
1073  }
1074 
1075  fr_bio_dedup_list_init(&my->active);
1076  fr_bio_dedup_list_init(&my->pending);
1077 
1078  (void) fr_rb_inline_init(&my->rb, fr_bio_dedup_entry_t, node, _entry_cmp, NULL);
1079 
1080  my->receive = receive;
1081  my->release = release;
1082  my->get_item = get_item;
1083 
1084  my->el = cfg->el;
1085  my->config = *cfg;
1086 
1087  my->bio.write = fr_bio_dedup_write;
1088  my->bio.read = fr_bio_dedup_read;
1089 
1090  fr_bio_chain(&my->bio, next);
1091 
1092  talloc_set_destructor(my, fr_bio_dedup_destructor);
1093 
1094  return (fr_bio_t *) my;
1095 }
static int const char char buffer[256]
Definition: acutest.h:574
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition: base.h:130
#define fr_bio_error(_x)
Definition: base.h:192
Definition: base.h:112
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition: bio_priv.h:69
int fr_bio_buf_alloc(TALLOC_CTX *ctx, fr_bio_buf_t *bio_buf, size_t size)
Definition: buf.c:114
ssize_t fr_bio_buf_write(fr_bio_buf_t *bio_buf, const void *buffer, size_t size)
Definition: buf.c:81
static size_t fr_bio_buf_used(fr_bio_buf_t const *bio_buf)
Definition: buf.h:73
static void fr_bio_buf_reset(fr_bio_buf_t *bio_buf)
Definition: buf.h:61
static size_t fr_bio_buf_size(fr_bio_buf_t const *bio_buf)
Definition: buf.h:151
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define UNUSED
Definition: build.h:313
next
Definition: dcursor.h:178
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
fr_dcursor_iter_t iter
Definition: dcursor.h:147
return item
Definition: dcursor.h:553
ssize_t fr_bio_dedup_respond(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Resend a reply when we receive a duplicate request.
Definition: dedup.c:213
fr_bio_dedup_release_t release
called to release a packet
Definition: dedup.c:150
static void fr_bio_dedup_timer_reset_item(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Definition: dedup.c:167
static int fr_bio_dedup_buffer_save(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
Save partially written data to our local buffer.
Definition: dedup.c:509
int fr_bio_dedup_entry_extend(fr_bio_t *bio, fr_bio_dedup_entry_t *item, fr_time_t expires)
Extend the expiry time for an entry.
Definition: dedup.c:952
fr_bio_dedup_config_t config
Definition: dedup.c:128
fr_bio_dedup_get_item_t get_item
turn a packet_ctx into a fr_bio_dedup_entry_t
Definition: dedup.c:152
static ssize_t fr_bio_dedup_write_partial(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial packet written.
Definition: dedup.c:570
void fr_bio_dedup_entry_cancel(fr_bio_t *bio, fr_bio_dedup_entry_t *item)
Cancel one item.
Definition: dedup.c:934
static ssize_t fr_bio_dedup_write_data(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
There's a partial block of data written.
Definition: dedup.c:704
fr_bio_dedup_entry_t * partial
Definition: dedup.c:147
fr_event_timer_t const * ev
Definition: dedup.c:130
fr_event_list_t * el
Definition: dedup.c:124
static int fr_bio_dedup_destructor(fr_bio_dedup_t *my)
Remove the dedup cache.
Definition: dedup.c:1008
static void fr_bio_dedup_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx)
Expire an entry when its timer fires.
Definition: dedup.c:763
static ssize_t fr_bio_dedup_flush_pending(fr_bio_dedup_t *my)
Flush any packets in the pending queue.
Definition: dedup.c:432
static ssize_t fr_bio_dedup_blocked(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, ssize_t rcode)
The write is blocked.
Definition: dedup.c:651
fr_rb_tree_t rb
expire list
Definition: dedup.c:126
static void fr_bio_dedup_replied(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item)
Move an item from active to replied.
Definition: dedup.c:179
fr_bio_t * fr_bio_dedup_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_dedup_receive_t receive, fr_bio_dedup_release_t release, fr_bio_dedup_get_item_t get_item, fr_bio_dedup_config_t const *cfg, fr_bio_t *next)
Allocate a fr_bio_dedup_t.
Definition: dedup.c:1035
fr_bio_dedup_receive_t receive
called when we receive a potentially new packet
Definition: dedup.c:149
static ssize_t fr_bio_dedup_blocked_data(fr_bio_dedup_t *my, uint8_t const *buffer, size_t size, ssize_t rcode)
The write is blocked, but we don't have "item".
Definition: dedup.c:739
static int8_t _entry_cmp(void const *one, void const *two)
Definition: dedup.c:918
static ssize_t fr_bio_dedup_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write raw data to the bio.
Definition: dedup.c:799
fr_bio_dedup_entry_t * first
Definition: dedup.c:139
static ssize_t fr_bio_dedup_buffer_write(fr_bio_dedup_t *my)
Write data from our local buffer to the next bio.
Definition: dedup.c:530
fr_bio_buf_t buffer
Definition: dedup.c:154
struct fr_bio_dedup_list_s fr_bio_dedup_list_t
Definition: dedup.c:72
fr_bio_dedup_state_t
Definition: dedup.c:86
@ FR_BIO_DEDUP_STATE_PARTIAL
Partially written.
Definition: dedup.c:91
@ FR_BIO_DEDUP_STATE_FREE
Definition: dedup.c:87
@ FR_BIO_DEDUP_STATE_ACTIVE
Received, but not replied.
Definition: dedup.c:88
@ FR_BIO_DEDUP_STATE_CANCELLED
Partially written, and then cancelled.
Definition: dedup.c:92
@ FR_BIO_DEDUP_STATE_PENDING
Have a reply, but we're trying to write it out.
Definition: dedup.c:89
@ FR_BIO_DEDUP_STATE_REPLIED
Replied, and waiting for it to expire.
Definition: dedup.c:90
static int fr_bio_dedup_timer_reset(fr_bio_dedup_t *my)
Reset the timer after changing the rb tree.
Definition: dedup.c:332
static ssize_t fr_bio_dedup_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Definition: dedup.c:852
static void fr_bio_dedup_release(fr_bio_dedup_t *my, fr_bio_dedup_entry_t *item, fr_bio_dedup_release_reason_t reason)
Release an entry back to the free list.
Definition: dedup.c:375
void * uctx
user-writable context
Definition: dedup.c:96
void(* fr_bio_dedup_release_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, fr_bio_dedup_release_reason_t reason)
Callback on release the packet (timeout, or cancelled by the application)
Definition: dedup.h:91
fr_event_list_t * el
event list
Definition: dedup.h:37
fr_bio_dedup_t * my
so we can get to it from the event timer callback
Definition: dedup.c:113
fr_time_t expires
when this entry expires
Definition: dedup.c:115
size_t packet_size
size of the cached packet data
Definition: dedup.c:99
fr_bio_dedup_state_t state
which tree or list this item is in
Definition: dedup.c:116
struct fr_bio_dedup_entry_s fr_bio_dedup_entry_t
Definition: dedup.h:42
void * packet_ctx
packet_ctx for dedup purposes
Definition: dedup.c:97
uint8_t * reply
reply cached by the application
Definition: dedup.c:101
size_t reply_size
size of the cached reply
Definition: dedup.c:102
void * reply_ctx
reply ctx
Definition: dedup.c:100
uint8_t * packet
cached packet data for finding duplicates
Definition: dedup.c:98
fr_bio_dedup_entry_t *(* fr_bio_dedup_get_item_t)(fr_bio_t *bio, void *packet_ctx)
Definition: dedup.h:93
fr_rb_node_t dedup
user managed dedup node
Definition: dedup.c:104
bool(* fr_bio_dedup_receive_t)(fr_bio_t *bio, fr_bio_dedup_entry_t *dedup_ctx, void *packet_ctx)
Callback on read to see if we should receive the packet.
Definition: dedup.h:78
fr_bio_dedup_release_reason_t
Definition: dedup.h:58
@ FR_BIO_DEDUP_WRITE_ERROR
Definition: dedup.h:61
@ FR_BIO_DEDUP_CANCELLED
Definition: dedup.h:60
@ FR_BIO_DEDUP_EXPIRED
Definition: dedup.h:59
Definition: dedup.c:95
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition: dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition: dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition: dlist.h:1152
#define fr_event_timer_at(...)
Definition: event.h:250
fr_bio_shutdown & my
Definition: fd_errno.h:59
free(array)
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
typedef FR_DLIST_HEAD(map_list) map_list_t
Given these are used in so many places, it's more friendly to have a proper type.
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
unsigned long int size_t
Definition: merged_model.c:25
static size_t used
void * fr_rb_first(fr_rb_tree_t *tree)
Definition: rb.c:786
void fr_rb_iter_delete_inorder(fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition: rb.c:898
void * fr_rb_remove_by_inline_node(fr_rb_tree_t *tree, fr_rb_node_t *node)
Remove an entry from the tree, using the node structure, without freeing the data.
Definition: rb.c:722
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:824
#define fr_rb_inline_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree.
Definition: rb.h:180
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red black tree structure.
Definition: rb.h:73
fr_assert(0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:224
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:575
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
Definition: time.h:173
#define fr_time_lteq(_a, _b)
Definition: time.h:240
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:916
"server local" time.
Definition: time.h:69
static fr_event_list_t * el