/*
 *	The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
 *	queue.c
 */
1 /*
2  * This program is is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 2a54fd676fb639ff0bad4de5e4f79d42ca17bba1 $
19  * @file lib/bio/queue.c
20  * @brief Binary IO abstractions for queues of raw packets
21  *
22  * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
23  */
24 
25 #include <freeradius-devel/bio/bio_priv.h>
26 #include <freeradius-devel/bio/queue.h>
27 #include <freeradius-devel/bio/null.h>
28 #include <freeradius-devel/util/dlist.h>
29 
30 typedef struct fr_bio_queue_list_s fr_bio_queue_list_t;
31 typedef struct fr_bio_queue_s fr_bio_queue_t;
32 
33 /*
34  * Define type-safe wrappers for head and entry definitions.
35  */
36 FR_DLIST_TYPES(fr_bio_queue_list)
37 
38 /*
39  * For delayed writes.
40  *
41  * @todo - we can remove the "cancelled" field by setting packet_ctx == my?
42  */
44  void *packet_ctx;
45  void const *buffer;
46  size_t size;
48  bool cancelled;
49 
51 
52  FR_DLIST_ENTRY(fr_bio_queue_list) entry; //!< List entry.
53 };
54 
55 FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry)
56 
57 typedef struct fr_bio_queue_s {
59 
60  size_t max_saved;
61 
65 
66  FR_DLIST_HEAD(fr_bio_queue_list) pending;
67  FR_DLIST_HEAD(fr_bio_queue_list) free;
68 
71 
72 static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
73 
74 /** Forcibly cancel all outstanding packets.
75  *
76  * Even partially written ones. This function is called from
77  * shutdown(), when the destructor is called, or on fatal read / write
78  * errors.
79  */
81 {
83 
84  if (!my->cancel) return;
85 
86  if (fr_bio_queue_list_num_elements(&my->pending) == 0) return;
87 
88  /*
89  * Cancel any remaining saved items.
90  */
91  while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) {
92  my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
93  item->cancelled = true;
94  fr_bio_queue_list_insert_head(&my->free, item);
95  }
96 }
97 
99 {
100  fr_assert(my->cancel); /* otherwise it would be fr_bio_destructor */
101 
102  my->bio.write = fr_bio_null_write;
104 
105  return 0;
106 }
107 
108 /** Push a packet onto a list.
109  *
110  */
111 static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
112 {
114 
115  item = fr_bio_queue_list_pop_head(&my->free);
116  if (!item) return fr_bio_error(IO_WOULD_BLOCK);
117 
118  /*
119  * If we're the first entry in the saved list, we can have a partially written packet.
120  *
121  * Otherwise, we're a subsequent entry, and we cannot have any data which is partially written.
122  */
123  fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) ||
124  (already_written == 0));
125 
126  item->packet_ctx = packet_ctx;
127  item->buffer = buffer;
128  item->size = size;
129  item->already_written = already_written;
130  item->cancelled = false;
131 
132  fr_bio_queue_list_insert_tail(&my->pending, item);
133 
134  if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item);
135 
136  return size;
137 }
138 
139 /** Write one packet to the next bio.
140  *
141  * If it blocks, save the packet and return OK to the caller.
142  */
143 static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
144 {
145  ssize_t rcode;
146  fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
147  fr_bio_t *next;
148 
149  /*
150  * We can't call the next bio if there's still cached data to flush.
151  */
152  fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0);
153 
154  next = fr_bio_next(&my->bio);
155  fr_assert(next != NULL);
156 
157  /*
158  * Write the data out. If we write all of it, we're done.
159  */
160  rcode = next->write(next, packet_ctx, buffer, size);
161  if ((size_t) rcode == size) return rcode;
162 
163  if (rcode < 0) {
164  /*
165  * A non-blocking error: return it back up the chain.
166  */
167  if (rcode != fr_bio_error(IO_WOULD_BLOCK)) return rcode;
168 
169  /*
170  * All other errors are fatal.
171  */
172  my->bio.read = fr_bio_eof_read;
173  my->bio.write = fr_bio_null_write;
174 
176  return rcode;
177  }
178 
179  /*
180  * We were flushing the next buffer, return any data which was written.
181  */
182  if (!buffer) return rcode;
183 
184  /*
185  * The next bio wrote a partial packet. Save the entire packet, and swap the write function to
186  * save all future packets in the saved list.
187  */
189 
190  fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0);
191 
192  /*
193  * This can only error out if the free list has no more entries.
194  */
195  return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
196 }
197 
198 /** Flush the packet list.
199  *
200  */
202 {
203  size_t written;
204  fr_bio_t *next;
205 
206  if (fr_bio_queue_list_num_elements(&my->pending) == 0) {
207  my->bio.write = fr_bio_queue_write_next;
208  return 0;
209  }
210 
211  next = fr_bio_next(&my->bio);
212  fr_assert(next != NULL);
213 
214  /*
215  * Loop over the saved packets, flushing them to the next bio.
216  */
217  written = 0;
218  while (written < size) {
219  ssize_t rcode;
221 
222  /*
223  * No more saved packets to write: stop.
224  */
225  item = fr_bio_queue_list_head(&my->pending);
226  if (!item) break;
227 
228  /*
229  * A cancelled item must be partially written. A cancelled item which has zero bytes
230  * written should not be in the saved list.
231  */
232  fr_assert(!item->cancelled || (item->already_written > 0));
233 
234  /*
235  * Push out however much data we can to the next bio.
236  */
237  rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
238  if (rcode == 0) break;
239 
240  if (rcode < 0) {
241  if (rcode == fr_bio_error(IO_WOULD_BLOCK)) break;
242 
243  return rcode;
244  }
245 
246  /*
247  * Update the written count.
248  */
249  written += rcode;
250  item->already_written += rcode;
251 
252  if (item->already_written < item->size) break;
253 
254  /*
255  * We don't run "sent" callbacks for cancelled items.
256  */
257  if (item->cancelled) {
258  if (my->cancel) my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
259  } else {
260  if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size);
261  }
262 
263  (void) fr_bio_queue_list_pop_head(&my->pending);
264 #ifndef NDEBUG
265  item->buffer = NULL;
266  item->packet_ctx = NULL;
267  item->size = 0;
268  item->already_written = 0;
269 #endif
270  item->cancelled = true;
271 
272  fr_bio_queue_list_insert_head(&my->free, item);
273  }
274 
275  /*
276  * If we've written all of the saved packets, go back to writing to the "next" bio.
277  */
278  if (fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next;
279 
280  return written;
281 }
282 
283 /** Write to the packet list buffer.
284  *
285  * The special buffer pointer of NULL means flush(). On flush, we call next->read(), and if that succeeds,
286  * go back to "pass through" mode for the buffers.
287  */
288 static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
289 {
290  fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
291 
292  if (!buffer) return fr_bio_queue_write_flush(my, size);
293 
294  /*
295  * This can only error out if the free list has no more entries.
296  */
297  return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0);
298 }
299 
300 /** Read one packet from next bio.
301  *
302  * This function does NOT respect packet boundaries. The caller should use other APIs to determine how big
303  * the "next" packet is.
304  *
305  * The caller may buffer the output data itself, or it may use other APIs to do checking.
306  *
307  * The main
308  */
309 static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
310 {
311  int rcode;
312  fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
313  fr_bio_t *next;
314 
315  next = fr_bio_next(&my->bio);
316  fr_assert(next != NULL);
317 
318  rcode = next->read(next, packet_ctx, buffer, size);
319  if (rcode >= 0) return rcode;
320 
321  /*
322  * We didn't read anything, return that.
323  */
324  if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
325 
326  /*
327  * Error reading, which means that we can't write to it, either. We don't care if the error is
328  * EOF or anything else. We just cancel the outstanding packets, and shut ourselves down.
329  */
330  my->bio.read = fr_bio_eof_read;
331  my->bio.write = fr_bio_null_write;
332 
334  return rcode;
335 }
336 
337 /** Shutdown
338  *
339  * Cancel / close has to be called before re-init.
340  */
342 {
343  fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
344 
346 
347  my->bio.read = fr_bio_queue_read;
348  my->bio.write = fr_bio_queue_write_next;
349 
350  return 0;
351 }
352 
353 
354 /** Allocate a packet-based bio.
355  *
356  * This bio assumes that each call to fr_bio_write() is for one packet, and only one packet. If the next bio
357  * returns a partial write, or WOULD BLOCK, then information about the packet is cached. Subsequent writes
358  * will write the partial data first, and then continue with subsequent writes.
359  *
360  * The caller is responsible for not freeing the packet ctx or the packet buffer until either the write has
361  * been performed, or the write has been cancelled.
362  *
363  * The read() API makes no provisions for reading complete packets. It simply returns whatever the next bio
364  * allows. If instead there is a need to read only complete packets, then the next bio should be
365  * fr_bio_mem_alloc() with a fr_bio_mem_set_verify()
366  *
367  * The read() API may return 0. There may have been data read from an underlying FD, but that data did not
368  * make it through the filters of the "next" bios. e.g. Any underlying FD should be put into a "wait for
369  * readable" state.
370  *
371  * The write() API will return a full write, even if the next layer is blocked. Any underlying FD
372  * should be put into a "wait for writeable" state. The packet which was supposed to be written has been
373  * cached, and cannot be cancelled as it is partially written. The caller should likely start using another
374  * bio for writes. If the caller continues to use the bio, then any subsequent writes will *always* cache
375  * the packets. @todo - we need to mark up the bio as "blocked", and then have a write_blocked() API? ugh.
376  * or just add `bool blocked` and `bool eof` to both read/write bios
377  *
378  * Once the underlying FD has become writeable, the caller should call fr_bio_write(bio, NULL, NULL, SIZE_MAX);
379  * That will cause the pending packets to be flushed.
380  *
381  * The write() API may return that it's written a full packet, in which case it's either completely written to
382  * the next bio, or to the pending queue.
383  *
384  * The read / write APIs can return WOULD_BLOCK, in which case nothing was done. Any underlying FD should be
385  * put into a "wait for writeable" state. Other errors from bios "further down" the chain can also be
386  * returned.
387  *
388  * @param ctx the talloc ctx
389  * @param max_saved Maximum number of packets to cache. Must be 1..1^17
390  * @param saved callback to run when a packet is saved in the pending queue
391  * @param sent callback to run when a packet is sent.
392  * @param cancel callback to run when a packet is cancelled.
393  * @param next the next bio which will perform the underlying reads and writes.
394  * - NULL on error, memory allocation failed
395  * - !NULL the bio
396  */
397 fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved,
401  fr_bio_t *next)
402 {
403  size_t i;
404  fr_bio_queue_t *my;
405 
406  if (!max_saved) max_saved = 1;
407  if (max_saved > (1 << 17)) max_saved = 1 << 17;
408 
409  my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) +
410  sizeof(fr_bio_queue_entry_t) * max_saved);
411  if (!my) return NULL;
412 
413  talloc_set_type(my, fr_bio_queue_t);
414 
415  my->max_saved = max_saved;
416 
417  fr_bio_queue_list_init(&my->pending);
418  fr_bio_queue_list_init(&my->free);
419 
420  my->saved = saved;
421  my->sent = sent;
422  my->cancel = cancel;
423 
424  for (i = 0; i < max_saved; i++) {
425  my->array[i].my = my;
426  my->array[i].cancelled = true;
427  fr_bio_queue_list_insert_tail(&my->free, &my->array[i]);
428  }
429 
430  my->bio.read = fr_bio_queue_read;
431  my->bio.write = fr_bio_queue_write_next;
432  my->cb.shutdown = fr_bio_queue_shutdown;
433 
434  fr_bio_chain(&my->bio, next);
435 
436  if (my->cancel) {
437  talloc_set_destructor(my, fr_bio_queue_destructor);
438  } else {
439  talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor);
440  }
441 
442  return (fr_bio_t *) my;
443 }
444 
445 /** Cancel the write for a packet.
446  *
447  * Cancel one a saved packets, and call the cancel() routine if it exists.
448  *
449  * There is no way to cancel all packets. The caller must find the lowest bio in the chain, and shutdown it.
450  * e.g. by closing the socket via fr_bio_fd_close(). That function will take care of walking back up the
451  * chain, and shutting down each bio.
452  *
453  * @param bio the #fr_bio_queue_t
454  * @param item The context returned from #fr_bio_queue_saved_t
455  * @return
456  * - <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled.
457  * - 0 the packet was cancelled.
458  */
460 {
461  fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
462 
463  if (!(item >= &my->array[0]) && (item < &my->array[my->max_saved])) {
464  return -1;
465  }
466 
467  /*
468  * Already cancelled, that's a NOOP.
469  */
470  if (item->cancelled) return 0;
471 
472  /*
473  * If the item has been partially written, AND we have a working write function, see if we can
474  * cancel it.
475  */
476  if (item->already_written && (my->bio.write != fr_bio_null_write)) {
477  ssize_t rcode;
478  fr_bio_t *next;
479 
480  next = fr_bio_next(bio);
481  fr_assert(next != NULL);
482 
483  /*
484  * If the write fails or returns nothing, the item can't be cancelled.
485  */
486  rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
487  if (rcode <= 0) return -1;
488 
489  /*
490  * If we haven't written the full item, it can't be cancelled.
491  */
492  item->already_written += rcode;
493  if (item->already_written < item->size) return -1;
494 
495  /*
496  * Else the item has been fully written, it can be safely cancelled.
497  */
498  }
499 
500  /*
501  * Remove it from the saved list, and run the cancellation callback.
502  */
503  (void) fr_bio_queue_list_remove(&my->pending, item);
504  fr_bio_queue_list_insert_head(&my->free, item);
505 
506  if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size);
507 
508  return 0;
509 }
/*
 *	NOTE(review): everything below is Doxygen cross-reference residue from the
 *	HTML export of this file, not part of the original source.  It is fenced
 *	off in a comment and preserved verbatim for reference.
 *
 *	static int const char char buffer[256]
 *	Definition: acutest.h:574
 *	fr_bio_write_t _CONST write
 *	write to the underlying bio
 *	Definition: base.h:107
 *	fr_bio_read_t _CONST read
 *	read from the underlying bio
 *	Definition: base.h:106
 *	static fr_bio_t * fr_bio_next(fr_bio_t *bio)
 *	Definition: base.h:121
 *	#define fr_bio_error(_x)
 *	Definition: base.h:184
 *	Definition: base.h:103
 *	fr_bio_queue_callback_t cancel
 *	Definition: queue.c:64
 *	static int fr_bio_queue_destructor(fr_bio_queue_t *my)
 *	Definition: queue.c:98
 *	static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
 *	Push a packet onto a list.
 *	Definition: queue.c:111
 *	int fr_bio_queue_cancel(fr_bio_t *bio, fr_bio_queue_entry_t *item)
 *	Cancel the write for a packet.
 *	Definition: queue.c:459
 *	void const * buffer
 *	Definition: queue.c:45
 *	fr_bio_queue_callback_t sent
 *	Definition: queue.c:63
 *	static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
 *	Flush the packet list.
 *	Definition: queue.c:201
 *	struct fr_bio_queue_list_s fr_bio_queue_list_t
 *	Definition: queue.c:30
 *	size_t max_saved
 *	Definition: queue.c:60
 *	static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
 *	Read one packet from next bio.
 *	Definition: queue.c:309
 *	fr_bio_queue_t * my
 *	Definition: queue.c:50
 *	void * packet_ctx
 *	Definition: queue.c:44
 *	fr_bio_t * fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_queue_saved_t saved, fr_bio_queue_callback_t sent, fr_bio_queue_callback_t cancel, fr_bio_t *next)
 *	Allocate a packet-based bio.
 *	Definition: queue.c:397
 *	struct fr_bio_queue_s fr_bio_queue_t
 *	Definition: queue.c:31
 *	fr_bio_queue_saved_t saved
 *	Definition: queue.c:62
 *	size_t already_written
 *	Definition: queue.c:47
 *	static int fr_bio_queue_shutdown(fr_bio_t *bio)
 *	Shutdown.
 *	Definition: queue.c:341
 *	static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
 *	Write one packet to the next bio.
 *	Definition: queue.c:143
 *	static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
 *	Write to the packet list buffer.
 *	Definition: queue.c:288
 *	static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
 *	Forcibly cancel all outstanding packets.
 *	Definition: queue.c:80
 *	Definition: queue.c:43
 *	void(* fr_bio_queue_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
 *	Definition: queue.h:32
 *	struct fr_bio_queue_entry_s fr_bio_queue_entry_t
 *	Definition: queue.h:30
 *	void(* fr_bio_queue_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_queue_entry_t *queue_ctx)
 *	Definition: queue.h:33
 *	static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
 *	Chain one bio after another.
 *	Definition: bio_priv.h:57
 *	#define FR_DLIST_TYPES(_name)
 *	Define type specific wrapper structs for dlists.
 *	Definition: dlist.h:1129
 *	#define FR_DLIST_ENTRY(_name)
 *	Expands to the type name used for the entry wrapper structure.
 *	Definition: dlist.h:1115
 *	#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
 *	Define type specific wrapper functions for dlists.
 *	Definition: dlist.h:1152
 *	free(array)
 *	ssize_t fr_bio_eof_read(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void *buffer, UNUSED size_t size)
 *	Always returns EOF on fr_bio_read()
 *	Definition: base.c:49
 *	int fr_bio_destructor(fr_bio_t *bio)
 *	Free this bio.
 *	Definition: base.c:34
 *	static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
 *	Definition: lst.c:122
 *	typedef FR_DLIST_HEAD(map_list) map_list_t
 *	Given these are used in so many places, it's more friendly to have a proper type.
 *	long int ssize_t
 *	Definition: merged_model.c:24
 *	unsigned char uint8_t
 *	Definition: merged_model.c:30
 *	static size_t array[MY_ARRAY_SIZE]
 *	ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
 *	Always return 0 on write.
 *	Definition: null.c:39
 *	static fr_bio_t * bio
 *	Definition: radclient-ng.c:86
 *	fr_assert(0)
 */