/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 6c3f2996bb9624e9043d8e877db66e8179b53787 $
 * @file lib/bio/queue.c
 * @brief Binary IO abstractions for queues of raw packets
 *
 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
 */

#include <freeradius-devel/bio/bio_priv.h>
#include <freeradius-devel/bio/queue.h>
#include <freeradius-devel/bio/null.h>
#include <freeradius-devel/util/dlist.h>

typedef struct fr_bio_queue_list_s fr_bio_queue_list_t;
typedef struct fr_bio_queue_s fr_bio_queue_t;

/*
 *	Define type-safe wrappers for head and entry definitions.
 */
FR_DLIST_TYPES(fr_bio_queue_list)

/*
 *	For delayed writes.
 *
 *	@todo - we can remove the "cancelled" field by setting packet_ctx == my?
 */
struct fr_bio_queue_entry_s {
	void		*packet_ctx;
	void const	*buffer;
	size_t		size;
	size_t		already_written;
	bool		cancelled;

	fr_bio_queue_t	*my;

	FR_DLIST_ENTRY(fr_bio_queue_list) entry;	//!< List entry.
};

FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry)

typedef struct fr_bio_queue_s {
	FR_BIO_COMMON;

	size_t		max_saved;

	fr_bio_queue_saved_t	saved;
	fr_bio_queue_callback_t	sent;
	fr_bio_queue_callback_t	cancel;

	FR_DLIST_HEAD(fr_bio_queue_list) pending;
	FR_DLIST_HEAD(fr_bio_queue_list) free;

	fr_bio_queue_entry_t	array[];
} fr_bio_queue_t;
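
/*
 *	Illustrative sketch (not part of the original file): FR_DLIST_FUNCS()
 *	above generates the type-safe list operations which this file uses to
 *	move entries between the "pending" and "free" lists.
 */
#if 0
static void fr_bio_queue_list_example(fr_bio_queue_t *my, fr_bio_queue_entry_t *item)
{
	fr_bio_queue_list_init(&my->pending);			/* initialise an empty list */

	fr_bio_queue_list_insert_tail(&my->pending, item);	/* pending writes are kept in FIFO order */
	fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 1);

	/*
	 *	Packets are flushed in the order they were queued, after
	 *	which the entry is recycled onto the free list.
	 */
	item = fr_bio_queue_list_pop_head(&my->pending);
	fr_bio_queue_list_insert_head(&my->free, item);
}
#endif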

static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);

/** Forcibly cancel all outstanding packets.
 *
 *  Even partially written ones.  This function is called from
 *  shutdown(), when the destructor is called, or on fatal read / write
 *  errors.
 */
static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
{
	fr_bio_queue_entry_t *item;

	my->bio.read = fr_bio_fail_read;
	my->bio.write = fr_bio_fail_write;

	if (!my->cancel) return;

	if (fr_bio_queue_list_num_elements(&my->pending) == 0) return;

	/*
	 *	Cancel any remaining saved items.
	 */
	while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) {
		my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
		item->cancelled = true;
		fr_bio_queue_list_insert_head(&my->free, item);
	}
}

static int fr_bio_queue_destructor(fr_bio_queue_t *my)
{
	fr_assert(my->cancel);	/* otherwise it would be fr_bio_destructor */

	fr_bio_queue_list_cancel(my);

	return 0;
}

/** Push a packet onto a list.
 *
 */
static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
{
	fr_bio_queue_entry_t *item;

	item = fr_bio_queue_list_pop_head(&my->free);
	if (!item) return fr_bio_error(IO_WOULD_BLOCK);

	/*
	 *	If we're the first entry in the saved list, we can have a partially written packet.
	 *
	 *	Otherwise, we're a subsequent entry, and we cannot have any data which is partially written.
	 */
	fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) ||
		  (already_written == 0));

	item->packet_ctx = packet_ctx;
	item->buffer = buffer;
	item->size = size;
	item->already_written = already_written;
	item->cancelled = false;

	fr_bio_queue_list_insert_tail(&my->pending, item);

	if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item);

	return size;
}
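
/*
 *	Illustrative sketch (not part of the original file): the "saved"
 *	callback above receives the queue entry as its queue_ctx argument,
 *	which is the handle that can later be passed to fr_bio_queue_cancel().
 *	A possible caller-side callback; my_packet_t and its "qe" field are
 *	hypothetical.
 */
#if 0
typedef struct {
	fr_bio_queue_entry_t	*qe;	//!< handle for cancelling the queued write
} my_packet_t;

static void my_saved_cb(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size,
			fr_bio_queue_entry_t *queue_ctx)
{
	my_packet_t *packet = packet_ctx;

	packet->qe = queue_ctx;		/* remember the entry so this packet can be cancelled later */
}
#endif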

/** Write one packet to the next bio.
 *
 *  If it blocks, save the packet and return OK to the caller.
 */
static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
{
	ssize_t rcode;
	fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
	fr_bio_t *next;

	/*
	 *	We can't call the next bio if there's still cached data to flush.
	 */
	fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0);

	next = fr_bio_next(&my->bio);
	fr_assert(next != NULL);

	/*
	 *	Write the data out.  If we write all of it, we're done.
	 */
	rcode = next->write(next, packet_ctx, buffer, size);
	if ((size_t) rcode == size) return rcode;

	if (rcode < 0) {
		/*
		 *	A non-blocking error: return it back up the chain.
		 */
		if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;

		/*
		 *	All other errors are fatal.
		 */
		fr_bio_queue_list_cancel(my);
		return rcode;
	}

	/*
	 *	We were flushing the next buffer, return any data which was written.
	 */
	if (!buffer) return rcode;

	/*
	 *	The next bio wrote a partial packet.  Save the entire packet, and swap the write function to
	 *	save all future packets in the saved list.
	 */
	my->bio.write = fr_bio_queue_write_buffer;

	fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0);

	/*
	 *	This can only error out if the free list has no more entries.
	 */
	return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
}

/** Flush the packet list.
 *
 */
static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
{
	size_t written;
	fr_bio_t *next;

	if (fr_bio_queue_list_num_elements(&my->pending) == 0) {
		my->bio.write = fr_bio_queue_write_next;
		return 0;
	}

	next = fr_bio_next(&my->bio);
	fr_assert(next != NULL);

	/*
	 *	Loop over the saved packets, flushing them to the next bio.
	 */
	written = 0;
	while (written < size) {
		ssize_t rcode;
		fr_bio_queue_entry_t *item;

		/*
		 *	No more saved packets to write: stop.
		 */
		item = fr_bio_queue_list_head(&my->pending);
		if (!item) break;

		/*
		 *	A cancelled item must be partially written.  A cancelled item which has zero bytes
		 *	written should not be in the saved list.
		 */
		fr_assert(!item->cancelled || (item->already_written > 0));

		/*
		 *	Push out however much data we can to the next bio.
		 */
		rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
		if (rcode == 0) break;

		if (rcode < 0) {
			if (rcode == fr_bio_error(IO_WOULD_BLOCK)) break;

			return rcode;
		}

		/*
		 *	Update the written count.
		 */
		written += rcode;
		item->already_written += rcode;

		if (item->already_written < item->size) break;

		/*
		 *	We don't run "sent" callbacks for cancelled items.
		 */
		if (item->cancelled) {
			if (my->cancel) my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
		} else {
			if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size);
		}

		(void) fr_bio_queue_list_pop_head(&my->pending);
#ifndef NDEBUG
		item->buffer = NULL;
		item->packet_ctx = NULL;
		item->size = 0;
		item->already_written = 0;
#endif
		item->cancelled = true;

		fr_bio_queue_list_insert_head(&my->free, item);
	}

	/*
	 *	If we've written all of the saved packets, go back to writing to the "next" bio.
	 */
	if (!fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next;

	return written;
}

/** Write to the packet list buffer.
 *
 *  The special buffer pointer of NULL means flush().  On flush, we write the pending packets to the next
 *  bio, and if that succeeds, go back to "pass through" mode for the buffers.
 */
static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
{
	fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);

	if (!buffer) return fr_bio_queue_write_flush(my, size);

	/*
	 *	This can only error out if the free list has no more entries.
	 */
	return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0);
}
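
/*
 *	Illustrative sketch (not part of the original file): the NULL-buffer
 *	flush described above is how a caller drains the pending queue once
 *	the underlying FD becomes writeable again, per the documentation of
 *	fr_bio_queue_alloc() below.
 */
#if 0
static void my_fd_writeable(fr_bio_t *bio)
{
	ssize_t slen;

	slen = fr_bio_write(bio, NULL, NULL, SIZE_MAX);
	if (slen < 0) {
		/*
		 *	fr_bio_error(IO_WOULD_BLOCK) means the FD is still
		 *	blocked; anything else is a fatal bio error.
		 */
		return;
	}

	/*
	 *	slen is the number of pending bytes which were flushed.  Once
	 *	the queue is empty, the bio returns to "pass through" mode.
	 */
}
#endif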

/** Read one packet from the next bio.
 *
 *  This function does NOT respect packet boundaries.  The caller should use other APIs to determine how big
 *  the "next" packet is.
 *
 *  The caller may buffer the output data itself, or it may use other APIs to do checking.
 */
static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
{
	ssize_t rcode;
	fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
	fr_bio_t *next;

	next = fr_bio_next(&my->bio);
	fr_assert(next != NULL);

	rcode = next->read(next, packet_ctx, buffer, size);
	if (rcode >= 0) return rcode;

	/*
	 *	We didn't read anything, return that.
	 */
	if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;

	/*
	 *	Error reading, which means that we can't write to it, either.  We don't care if the error is
	 *	EOF or anything else.  We just cancel the outstanding packets, and shut ourselves down.
	 */
	fr_bio_queue_list_cancel(my);
	return rcode;
}
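
/*
 *	Illustrative sketch (not part of the original file): a caller-side
 *	read, reacting to the return values documented in fr_bio_queue_alloc()
 *	below.  Assumes fr_bio_read() is the public read entry point.
 */
#if 0
static void my_fd_readable(fr_bio_t *bio, uint8_t *buffer, size_t buflen)
{
	ssize_t slen;

	slen = fr_bio_read(bio, NULL, buffer, buflen);
	if (slen == 0) return;		/* nothing made it through the bio chain; wait for readable */

	if (slen < 0) {
		if (slen == fr_bio_error(IO_WOULD_BLOCK)) return;	/* wait for readable */

		return;			/* fatal error: all pending writes have been cancelled */
	}

	/*
	 *	Process slen bytes.  Note that packet boundaries are NOT
	 *	respected here.
	 */
}
#endif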

/** Shutdown
 *
 *  Cancel / close has to be called before re-init.
 */
static void fr_bio_queue_shutdown(fr_bio_t *bio)
{
	fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);

	fr_bio_queue_list_cancel(my);
}

/** Allocate a packet-based bio.
 *
 *  This bio assumes that each call to fr_bio_write() is for one packet, and only one packet.  If the next bio
 *  returns a partial write, or WOULD BLOCK, then information about the packet is cached.  Subsequent writes
 *  will write the partial data first, and then continue with subsequent writes.
 *
 *  The caller is responsible for not freeing the packet ctx or the packet buffer until either the write has
 *  been performed, or the write has been cancelled.
 *
 *  The read() API makes no provisions for reading complete packets.  It simply returns whatever the next bio
 *  allows.  If instead there is a need to read only complete packets, then the next bio should be
 *  fr_bio_mem_alloc() with a fr_bio_mem_set_verify().
 *
 *  The read() API may return 0.  There may have been data read from an underlying FD, but that data did not
 *  make it through the filters of the "next" bios.  In that case, any underlying FD should be put into a
 *  "wait for readable" state.
 *
 *  The write() API will return a full write, even if the next layer is blocked.  Any underlying FD
 *  should be put into a "wait for writeable" state.  The packet which was supposed to be written has been
 *  cached, and cannot be cancelled, as it is partially written.  The caller should likely start using another
 *  bio for writes.  If the caller continues to use the bio, then any subsequent writes will *always* cache
 *  the packets.  @todo - we need to mark up the bio as "blocked", and then have a write_blocked() API?  ugh.
 *  or just add `bool blocked` and `bool eof` to both read/write bios.
 *
 *  Once the underlying FD has become writeable, the caller should call fr_bio_write(bio, NULL, NULL, SIZE_MAX);
 *  That will cause the pending packets to be flushed.
 *
 *  The write() API may return that it's written a full packet, in which case it's either completely written to
 *  the next bio, or to the pending queue.
 *
 *  The read / write APIs can return WOULD_BLOCK, in which case nothing was done.  Any underlying FD should be
 *  put into a "wait for readable / writeable" state, as appropriate.  Other errors from bios "further down"
 *  the chain can also be returned.
 *
 * @param ctx	the talloc ctx
 * @param max_saved	Maximum number of packets to cache.  Must be in the range 1..2^17
 * @param saved	callback to run when a packet is saved in the pending queue
 * @param sent	callback to run when a packet is sent.
 * @param cancel	callback to run when a packet is cancelled.
 * @param next	the next bio which will perform the underlying reads and writes.
 * @return
 *	- NULL on error, memory allocation failed
 *	- !NULL the bio
 */
fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved,
			     fr_bio_queue_saved_t saved,
			     fr_bio_queue_callback_t sent,
			     fr_bio_queue_callback_t cancel,
			     fr_bio_t *next)
{
	size_t i;
	fr_bio_queue_t *my;

	if (!max_saved) max_saved = 1;
	if (max_saved > (1 << 17)) max_saved = 1 << 17;

	my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) +
						  sizeof(fr_bio_queue_entry_t) * max_saved);
	if (!my) return NULL;

	talloc_set_type(my, fr_bio_queue_t);

	my->max_saved = max_saved;

	fr_bio_queue_list_init(&my->pending);
	fr_bio_queue_list_init(&my->free);

	my->saved = saved;
	my->sent = sent;
	my->cancel = cancel;

	for (i = 0; i < max_saved; i++) {
		my->array[i].my = my;
		my->array[i].cancelled = true;
		fr_bio_queue_list_insert_tail(&my->free, &my->array[i]);
	}

	my->bio.read = fr_bio_queue_read;
	my->bio.write = fr_bio_queue_write_next;
	my->cb.shutdown = fr_bio_queue_shutdown;

	fr_bio_chain(&my->bio, next);

	if (my->cancel) {
		talloc_set_destructor(my, fr_bio_queue_destructor);
	} else {
		talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor);
	}

	return (fr_bio_t *) my;
}
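
/*
 *	Illustrative sketch (not part of the original file): allocating a
 *	queue bio on top of an existing bio.  "fd_bio" and the callbacks are
 *	placeholders; my_saved_cb is sketched after fr_bio_queue_list_push()
 *	above.
 */
#if 0
static fr_bio_t *my_queue_open(TALLOC_CTX *ctx, fr_bio_t *fd_bio)
{
	fr_bio_t *bio;

	bio = fr_bio_queue_alloc(ctx, 32, my_saved_cb, my_sent_cb, my_cancel_cb, fd_bio);
	if (!bio) return NULL;

	/*
	 *	From here on, one fr_bio_write() call per packet.  A return
	 *	equal to the packet size means the packet was either fully
	 *	written to fd_bio, or cached in the pending queue.
	 */
	return bio;
}
#endif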

/** Cancel the write for a packet.
 *
 *  Cancel one of the saved packets, and call the cancel() routine if it exists.
 *
 *  There is no way to cancel all packets.  The caller must find the lowest bio in the chain, and shut it
 *  down, e.g. by closing the socket via fr_bio_fd_close().  That function will take care of walking back up
 *  the chain, and shutting down each bio.
 *
 * @param bio	the #fr_bio_queue_t
 * @param item	the context returned from #fr_bio_queue_saved_t
 * @return
 *	- <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled.
 *	- 0 the packet was cancelled.
 */
int fr_bio_queue_cancel(fr_bio_t *bio, fr_bio_queue_entry_t *item)
{
	fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);

	if (!((item >= &my->array[0]) && (item < &my->array[my->max_saved]))) {
		return -1;
	}

	/*
	 *	Already cancelled, that's a NOOP.
	 */
	if (item->cancelled) return 0;

	/*
	 *	If the item has been partially written, AND we have a working write function, see if we can
	 *	cancel it.
	 */
	if (item->already_written && (my->bio.write != fr_bio_null_write)) {
		ssize_t rcode;
		fr_bio_t *next;

		next = fr_bio_next(bio);
		fr_assert(next != NULL);

		/*
		 *	If the write fails or returns nothing, the item can't be cancelled.
		 */
		rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
		if (rcode <= 0) return -1;

		/*
		 *	If we haven't written the full item, it can't be cancelled.
		 */
		item->already_written += rcode;
		if (item->already_written < item->size) return -1;

		/*
		 *	Else the item has been fully written, and can be safely cancelled.
		 */
	}

	/*
	 *	Remove it from the saved list, and run the cancellation callback.
	 */
	(void) fr_bio_queue_list_remove(&my->pending, item);
	fr_bio_queue_list_insert_head(&my->free, item);

	if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size);

	return 0;
}
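
/*
 *	Illustrative sketch (not part of the original file): cancelling a
 *	packet uses the queue_ctx remembered by the "saved" callback (see the
 *	sketch after fr_bio_queue_list_push() above; my_packet_t is
 *	hypothetical).
 */
#if 0
static void my_packet_cancel(fr_bio_t *bio, my_packet_t *packet)
{
	if (!packet->qe) return;	/* never queued, nothing to cancel */

	if (fr_bio_queue_cancel(bio, packet->qe) < 0) {
		/*
		 *	Partially written and could not be flushed: the
		 *	packet cannot be cancelled.
		 */
		return;
	}

	packet->qe = NULL;
}
#endif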