The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
queue.c
Go to the documentation of this file.
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or (at
 * your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: b10df2426ee30bc074fae4abe9197e3e477b9c76 $
 * @file lib/bio/queue.c
 * @brief Binary IO abstractions for queues of raw packets
 *
 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
 */
24
25#include <freeradius-devel/bio/bio_priv.h>
26#include <freeradius-devel/bio/queue.h>
27#include <freeradius-devel/bio/null.h>
28
29typedef struct fr_bio_queue_list_s fr_bio_queue_list_t;
31
32/*
33 * Define type-safe wrappers for head and entry definitions.
34 */
35FR_DLIST_TYPES(fr_bio_queue_list)
36
37/*
38 * For delayed writes.
39 *
40 * @todo - we can remove the "cancelled" field by setting packet_ctx == my?
41 */
44 void const *buffer;
45 size_t size;
48
50
51 FR_DLIST_ENTRY(fr_bio_queue_list) entry; //!< List entry.
52};
53
54FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry)
55
70
71static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
72
73/** Forcibly cancel all outstanding packets.
74 *
75 * Even partially written ones. This function is called from
76 * shutdown(), when the destructor is called, or on fatal read / write
77 * errors.
78 */
80{
82
83 my->bio.read = fr_bio_fail_read;
84 my->bio.write = fr_bio_fail_write;
85
86 if (!my->cancel) return;
87
88 if (fr_bio_queue_list_num_elements(&my->pending) == 0) return;
89
90 /*
91 * Cancel any remaining saved items.
92 */
93 while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) {
94 my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
95 item->cancelled = true;
96 fr_bio_queue_list_insert_head(&my->free, item);
97 }
98}
99
101{
103
104 fr_assert(my->cancel); /* otherwise it would be fr_bio_destructor */
105
107
108 return 0;
109}
110
111/** Push a packet onto a list.
112 *
113 */
114static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
115{
117
118 item = fr_bio_queue_list_pop_head(&my->free);
119 if (!item) return fr_bio_error(IO_WOULD_BLOCK);
120
121 /*
122 * If we're the first entry in the saved list, we can have a partially written packet.
123 *
124 * Otherwise, we're a subsequent entry, and we cannot have any data which is partially written.
125 */
126 fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) ||
127 (already_written == 0));
128
129 item->packet_ctx = packet_ctx;
130 item->buffer = buffer;
131 item->size = size;
132 item->already_written = already_written;
133 item->cancelled = false;
134
135 fr_bio_queue_list_insert_tail(&my->pending, item);
136
137 if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item);
138
139 return size;
140}
141
142/** Write one packet to the next bio.
143 *
144 * If it blocks, save the packet and return OK to the caller.
145 */
146static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
147{
148 ssize_t rcode;
149 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
150 fr_bio_t *next;
151
152 /*
153 * We can't call the next bio if there's still cached data to flush.
154 */
155 fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0);
156
157 next = fr_bio_next(&my->bio);
158 fr_assert(next != NULL);
159
160 /*
161 * Write the data out. If we write all of it, we're done.
162 */
163 rcode = next->write(next, packet_ctx, buffer, size);
164 if ((size_t) rcode == size) return rcode;
165
166 if (rcode < 0) {
167 /*
168 * IO would block, return it back up the chain.
169 */
170 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
171
172 /*
173 * All other errors are fatal.
174 */
176 return rcode;
177 }
178
179 /*
180 * We were flushing the next buffer, return any data which was written.
181 */
182 if (!buffer) return rcode;
183
184 /*
185 * The next bio wrote a partial packet. Save the entire packet, and swap the write function to
186 * save all future packets in the saved list.
187 */
189
190 fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0);
191
192 /*
193 * This can only error out if the free list has no more entries.
194 */
195 return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
196}
197
198/** Flush the packet list.
199 *
200 */
202{
203 size_t written;
204 fr_bio_t *next;
205
206 if (fr_bio_queue_list_num_elements(&my->pending) == 0) {
208 return 0;
209 }
210
211 next = fr_bio_next(&my->bio);
212 fr_assert(next != NULL);
213
214 /*
215 * Loop over the saved packets, flushing them to the next bio.
216 */
217 written = 0;
218 while (written < size) {
219 ssize_t rcode;
221
222 /*
223 * No more saved packets to write: stop.
224 */
225 item = fr_bio_queue_list_head(&my->pending);
226 if (!item) break;
227
228 /*
229 * A cancelled item must be partially written. A cancelled item which has zero bytes
230 * written should not be in the saved list.
231 */
232 fr_assert(!item->cancelled || (item->already_written > 0));
233
234 /*
235 * Push out however much data we can to the next bio.
236 */
237 rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
238 if (rcode == 0) break;
239
240 if (rcode < 0) {
241 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) break;
242
243 return rcode;
244 }
245
246 /*
247 * Update the written count.
248 */
249 written += rcode;
250 item->already_written += rcode;
251
252 if (item->already_written < item->size) break;
253
254 /*
255 * We don't run "sent" callbacks for cancelled items.
256 */
257 if (item->cancelled) {
258 if (my->cancel) my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
259 } else {
260 if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size);
261 }
262
263 (void) fr_bio_queue_list_pop_head(&my->pending);
264#ifndef NDEBUG
265 item->buffer = NULL;
266 item->packet_ctx = NULL;
267 item->size = 0;
268 item->already_written = 0;
269#endif
270 item->cancelled = true;
271
272 fr_bio_queue_list_insert_head(&my->free, item);
273 }
274
275 /*
276 * If we've written all of the saved packets, go back to writing to the "next" bio.
277 */
278 if (!fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next;
279
280 return written;
281}
282
283/** Write to the packet list buffer.
284 *
285 * The special buffer pointer of NULL means flush(). On flush, we call next->read(), and if that succeeds,
286 * go back to "pass through" mode for the buffers.
287 */
288static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
289{
290 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
291
292 if (!buffer) return fr_bio_queue_write_flush(my, size);
293
294 /*
295 * This can only error out if the free list has no more entries.
296 */
297 return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0);
298}
299
300/** Read one packet from next bio.
301 *
302 * This function does NOT respect packet boundaries. The caller should use other APIs to determine how big
303 * the "next" packet is.
304 *
305 * The caller may buffer the output data itself, or it may use other APIs to do checking.
306 *
307 * The main
308 */
309static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
310{
311 ssize_t rcode;
312 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
313 fr_bio_t *next;
314
315 next = fr_bio_next(&my->bio);
316 fr_assert(next != NULL);
317
318 rcode = next->read(next, packet_ctx, buffer, size);
319 if (rcode >= 0) return rcode;
320
321 /*
322 * We didn't read anything, return that.
323 */
324 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
325
326 /*
327 * Error reading, which means that we can't write to it, either. We don't care if the error is
328 * EOF or anything else. We just cancel the outstanding packets, and shut ourselves down.
329 */
331 return rcode;
332}
333
334/** Shutdown
335 *
336 * Cancel / close has to be called before re-init.
337 */
339{
340 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
341
343
344 return 0;
345}
346
347
348/** Allocate a packet-based bio.
349 *
350 * This bio assumes that each call to fr_bio_write() is for one packet, and only one packet. If the next bio
351 * returns a partial write, or WOULD BLOCK, then information about the packet is cached. Subsequent writes
352 * will write the partial data first, and then continue with subsequent writes.
353 *
354 * The caller is responsible for not freeing the packet ctx or the packet buffer until either the write has
355 * been performed, or the write has been cancelled.
356 *
357 * The read() API makes no provisions for reading complete packets. It simply returns whatever the next bio
358 * allows. If instead there is a need to read only complete packets, then the next bio should be
359 * fr_bio_mem_alloc() with a fr_bio_mem_set_verify()
360 *
361 * The read() API may return 0. There may have been data read from an underlying FD, but that data did not
362 * make it through the filters of the "next" bios. e.g. Any underlying FD should be put into a "wait for
363 * readable" state.
364 *
365 * The write() API will return a full write, even if the next layer is blocked. Any underlying FD
366 * should be put into a "wait for writeable" state. The packet which was supposed to be written has been
367 * cached, and cannot be cancelled as it is partially written. The caller should likely start using another
368 * bio for writes. If the caller continues to use the bio, then any subsequent writes will *always* cache
369 * the packets. @todo - we need to mark up the bio as "blocked", and then have a write_blocked() API? ugh.
370 * or just add `bool blocked` and `bool eof` to both read/write bios
371 *
372 * Once the underlying FD has become writeable, the caller should call fr_bio_write(bio, NULL, NULL, SIZE_MAX);
373 * That will cause the pending packets to be flushed.
374 *
375 * The write() API may return that it's written a full packet, in which case it's either completely written to
376 * the next bio, or to the pending queue.
377 *
378 * The read / write APIs can return WOULD_BLOCK, in which case nothing was done. Any underlying FD should be
379 * put into a "wait for writeable" state. Other errors from bios "further down" the chain can also be
380 * returned.
381 *
382 * @param ctx the talloc ctx
383 * @param max_saved Maximum number of packets to cache. Must be 1..1^17
384 * @param saved callback to run when a packet is saved in the pending queue
385 * @param sent callback to run when a packet is sent.
386 * @param cancel callback to run when a packet is cancelled.
387 * @param next the next bio which will perform the underlying reads and writes.
388 * - NULL on error, memory allocation failed
389 * - !NULL the bio
390 */
391fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved,
395 fr_bio_t *next)
396{
397 size_t i;
399
400 if (!max_saved) max_saved = 1;
401 if (max_saved > (1 << 17)) max_saved = 1 << 17;
402
403 my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) +
405 if (!my) return NULL;
406
407 talloc_set_type(my, fr_bio_queue_t);
408
409 my->max_saved = max_saved;
410
411 fr_bio_queue_list_init(&my->pending);
412 fr_bio_queue_list_init(&my->free);
413
414 my->saved = saved;
415 my->sent = sent;
416 my->cancel = cancel;
417
418 for (i = 0; i < max_saved; i++) {
419 my->array[i].my = my;
420 my->array[i].cancelled = true;
421 fr_bio_queue_list_insert_tail(&my->free, &my->array[i]);
422 }
423
424 my->bio.read = fr_bio_queue_read;
425 my->bio.write = fr_bio_queue_write_next;
426 my->cb.shutdown = fr_bio_queue_shutdown;
427
428 fr_bio_chain(&my->bio, next);
429
430 if (my->cancel) {
431 talloc_set_destructor(my, fr_bio_queue_destructor);
432 } else {
433 talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor);
434 }
435
436 return (fr_bio_t *) my;
437}
438
439/** Cancel the write for a packet.
440 *
441 * Cancel one a saved packets, and call the cancel() routine if it exists.
442 *
443 * There is no way to cancel all packets. The caller must find the lowest bio in the chain, and shutdown it.
444 * e.g. by closing the socket via fr_bio_fd_close(). That function will take care of walking back up the
445 * chain, and shutting down each bio.
446 *
447 * @param bio the #fr_bio_queue_t
448 * @param item The context returned from #fr_bio_queue_saved_t
449 * @return
450 * - <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled.
451 * - 0 the packet was cancelled.
452 */
454{
455 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
456
457 if (!((item >= &my->array[0]) && (item < &my->array[my->max_saved]))) {
458 return -1;
459 }
460
461 /*
462 * Already cancelled, that's a NOOP.
463 */
464 if (item->cancelled) return 0;
465
466 /*
467 * If the item has been partially written, AND we have a working write function, see if we can
468 * cancel it.
469 */
470 if (item->already_written && (my->bio.write != fr_bio_null_write)) {
471 ssize_t rcode;
472 fr_bio_t *next;
473
474 next = fr_bio_next(bio);
475 fr_assert(next != NULL);
476
477 /*
478 * If the write fails or returns nothing, the item can't be cancelled.
479 */
480 rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
481 if (rcode <= 0) return -1;
482
483 /*
484 * If we haven't written the full item, it can't be cancelled.
485 */
486 item->already_written += rcode;
487 if (item->already_written < item->size) return -1;
488
489 /*
490 * Else the item has been fully written, it can be safely cancelled.
491 */
492 }
493
494 /*
495 * Remove it from the saved list, and run the cancellation callback.
496 */
497 (void) fr_bio_queue_list_remove(&my->pending, item);
498 fr_bio_queue_list_insert_head(&my->free, item);
499 item->cancelled = true;
500
501 if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size);
502
503 return 0;
504}
static int const char char buffer[256]
Definition acutest.h:576
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:117
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:116
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:131
#define fr_bio_error(_x)
Definition base.h:200
fr_bio_t * fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_queue_saved_t saved, fr_bio_queue_callback_t sent, fr_bio_queue_callback_t cancel, fr_bio_t *next)
Allocate a packet-based bio.
Definition queue.c:391
fr_bio_queue_callback_t cancel
Definition queue.c:63
static int fr_bio_queue_destructor(fr_bio_queue_t *my)
Definition queue.c:100
static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
Push a packet onto a list.
Definition queue.c:114
int fr_bio_queue_cancel(fr_bio_t *bio, fr_bio_queue_entry_t *item)
Cancel the write for a packet.
Definition queue.c:453
void const * buffer
Definition queue.c:44
fr_bio_queue_callback_t sent
Definition queue.c:62
static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
Flush the packet list.
Definition queue.c:201
struct fr_bio_queue_list_s fr_bio_queue_list_t
Definition queue.c:29
size_t max_saved
Definition queue.c:59
static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Read one packet from next bio.
Definition queue.c:309
fr_bio_queue_t * my
Definition queue.c:49
void * packet_ctx
Definition queue.c:43
struct fr_bio_queue_s fr_bio_queue_t
Definition queue.c:30
fr_bio_queue_saved_t saved
Definition queue.c:61
size_t already_written
Definition queue.c:46
static int fr_bio_queue_shutdown(fr_bio_t *bio)
Shutdown.
Definition queue.c:338
static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write one packet to the next bio.
Definition queue.c:146
static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write to the packet list buffer.
Definition queue.c:288
static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
Forcibly cancel all outstanding packets.
Definition queue.c:79
Definition queue.c:42
void(* fr_bio_queue_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
Definition queue.h:32
struct fr_bio_queue_entry_s fr_bio_queue_entry_t
Definition queue.h:30
void(* fr_bio_queue_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_queue_entry_t *queue_ctx)
Definition queue.h:33
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:84
#define FR_BIO_DESTRUCTOR_COMMON
Define a common destructor pattern.
Definition bio_priv.h:64
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1111
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1097
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1134
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1104
void fr_bio_shutdown & my
Definition fd_errno.h:70
free(array)
int fr_bio_destructor(fr_bio_t *bio)
Free this bio.
Definition base.c:35
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:121
long int ssize_t
unsigned char uint8_t
ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
Definition null.c:39
ssize_t fr_bio_fail_read(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void *buffer, UNUSED size_t size)
Always return error on read.
Definition null.c:47
ssize_t fr_bio_fail_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return an error on write.
Definition null.c:56
#define fr_assert(_expr)
Definition rad_assert.h:37