The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
queue.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 6c3f2996bb9624e9043d8e877db66e8179b53787 $
19 * @file lib/bio/queue.c
20 * @brief Binary IO abstractions for queues of raw packets
21 *
22 * @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
23 */
24
25#include <freeradius-devel/bio/bio_priv.h>
26#include <freeradius-devel/bio/queue.h>
27#include <freeradius-devel/bio/null.h>
28#include <freeradius-devel/util/dlist.h>
29
30typedef struct fr_bio_queue_list_s fr_bio_queue_list_t;
32
33/*
34 * Define type-safe wrappers for head and entry definitions.
35 */
36FR_DLIST_TYPES(fr_bio_queue_list)
37
38/*
39 * For delayed writes.
40 *
41 * @todo - we can remove the "cancelled" field by setting packet_ctx == my?
42 */
45 void const *buffer;
46 size_t size;
49
51
52 FR_DLIST_ENTRY(fr_bio_queue_list) entry; //!< List entry.
53};
54
55FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry)
56
71
72static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
73
74/** Forcibly cancel all outstanding packets.
75 *
76 * Even partially written ones. This function is called from
77 * shutdown(), when the destructor is called, or on fatal read / write
78 * errors.
79 */
81{
83
84 my->bio.read = fr_bio_fail_read;
85 my->bio.write = fr_bio_fail_write;
86
87 if (!my->cancel) return;
88
89 if (fr_bio_queue_list_num_elements(&my->pending) == 0) return;
90
91 /*
92 * Cancel any remaining saved items.
93 */
94 while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) {
95 my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
96 item->cancelled = true;
97 fr_bio_queue_list_insert_head(&my->free, item);
98 }
99}
100
102{
103 fr_assert(my->cancel); /* otherwise it would be fr_bio_destructor */
104
106
107 return 0;
108}
109
110/** Push a packet onto a list.
111 *
112 */
113static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
114{
116
117 item = fr_bio_queue_list_pop_head(&my->free);
118 if (!item) return fr_bio_error(IO_WOULD_BLOCK);
119
120 /*
121 * If we're the first entry in the saved list, we can have a partially written packet.
122 *
123 * Otherwise, we're a subsequent entry, and we cannot have any data which is partially written.
124 */
125 fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) ||
126 (already_written == 0));
127
128 item->packet_ctx = packet_ctx;
129 item->buffer = buffer;
130 item->size = size;
131 item->already_written = already_written;
132 item->cancelled = false;
133
134 fr_bio_queue_list_insert_tail(&my->pending, item);
135
136 if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item);
137
138 return size;
139}
140
141/** Write one packet to the next bio.
142 *
143 * If it blocks, save the packet and return OK to the caller.
144 */
145static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
146{
147 ssize_t rcode;
148 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
149 fr_bio_t *next;
150
151 /*
152 * We can't call the next bio if there's still cached data to flush.
153 */
154 fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0);
155
156 next = fr_bio_next(&my->bio);
157 fr_assert(next != NULL);
158
159 /*
160 * Write the data out. If we write all of it, we're done.
161 */
162 rcode = next->write(next, packet_ctx, buffer, size);
163 if ((size_t) rcode == size) return rcode;
164
165 if (rcode < 0) {
166 /*
167 * A non-blocking error: return it back up the chain.
168 */
169 if (rcode != fr_bio_error(IO_WOULD_BLOCK)) return rcode;
170
171 /*
172 * All other errors are fatal.
173 */
175 return rcode;
176 }
177
178 /*
179 * We were flushing the next buffer, return any data which was written.
180 */
181 if (!buffer) return rcode;
182
183 /*
184 * The next bio wrote a partial packet. Save the entire packet, and swap the write function to
185 * save all future packets in the saved list.
186 */
188
189 fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0);
190
191 /*
192 * This can only error out if the free list has no more entries.
193 */
194 return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
195}
196
197/** Flush the packet list.
198 *
199 */
201{
202 size_t written;
203 fr_bio_t *next;
204
205 if (fr_bio_queue_list_num_elements(&my->pending) == 0) {
207 return 0;
208 }
209
210 next = fr_bio_next(&my->bio);
211 fr_assert(next != NULL);
212
213 /*
214 * Loop over the saved packets, flushing them to the next bio.
215 */
216 written = 0;
217 while (written < size) {
218 ssize_t rcode;
220
221 /*
222 * No more saved packets to write: stop.
223 */
224 item = fr_bio_queue_list_head(&my->pending);
225 if (!item) break;
226
227 /*
228 * A cancelled item must be partially written. A cancelled item which has zero bytes
229 * written should not be in the saved list.
230 */
231 fr_assert(!item->cancelled || (item->already_written > 0));
232
233 /*
234 * Push out however much data we can to the next bio.
235 */
236 rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
237 if (rcode == 0) break;
238
239 if (rcode < 0) {
240 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) break;
241
242 return rcode;
243 }
244
245 /*
246 * Update the written count.
247 */
248 written += rcode;
249 item->already_written += rcode;
250
251 if (item->already_written < item->size) break;
252
253 /*
254 * We don't run "sent" callbacks for cancelled items.
255 */
256 if (item->cancelled) {
257 if (my->cancel) my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
258 } else {
259 if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size);
260 }
261
262 (void) fr_bio_queue_list_pop_head(&my->pending);
263#ifndef NDEBUG
264 item->buffer = NULL;
265 item->packet_ctx = NULL;
266 item->size = 0;
267 item->already_written = 0;
268#endif
269 item->cancelled = true;
270
271 fr_bio_queue_list_insert_head(&my->free, item);
272 }
273
274 /*
275 * If we've written all of the saved packets, go back to writing to the "next" bio.
276 */
277 if (fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next;
278
279 return written;
280}
281
282/** Write to the packet list buffer.
283 *
284 * The special buffer pointer of NULL means flush(). On flush, we call next->read(), and if that succeeds,
285 * go back to "pass through" mode for the buffers.
286 */
287static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
288{
289 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
290
291 if (!buffer) return fr_bio_queue_write_flush(my, size);
292
293 /*
294 * This can only error out if the free list has no more entries.
295 */
296 return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0);
297}
298
299/** Read one packet from next bio.
300 *
301 * This function does NOT respect packet boundaries. The caller should use other APIs to determine how big
302 * the "next" packet is.
303 *
304 * The caller may buffer the output data itself, or it may use other APIs to do checking.
305 *
306 * The main
307 */
308static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
309{
310 int rcode;
311 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
312 fr_bio_t *next;
313
314 next = fr_bio_next(&my->bio);
315 fr_assert(next != NULL);
316
317 rcode = next->read(next, packet_ctx, buffer, size);
318 if (rcode >= 0) return rcode;
319
320 /*
321 * We didn't read anything, return that.
322 */
323 if (rcode == fr_bio_error(IO_WOULD_BLOCK)) return rcode;
324
325 /*
326 * Error reading, which means that we can't write to it, either. We don't care if the error is
327 * EOF or anything else. We just cancel the outstanding packets, and shut ourselves down.
328 */
330 return rcode;
331}
332
333/** Shutdown
334 *
335 * Cancel / close has to be called before re-init.
336 */
338{
339 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
340
342}
343
344
345/** Allocate a packet-based bio.
346 *
347 * This bio assumes that each call to fr_bio_write() is for one packet, and only one packet. If the next bio
348 * returns a partial write, or WOULD BLOCK, then information about the packet is cached. Subsequent writes
349 * will write the partial data first, and then continue with subsequent writes.
350 *
351 * The caller is responsible for not freeing the packet ctx or the packet buffer until either the write has
352 * been performed, or the write has been cancelled.
353 *
354 * The read() API makes no provisions for reading complete packets. It simply returns whatever the next bio
355 * allows. If instead there is a need to read only complete packets, then the next bio should be
356 * fr_bio_mem_alloc() with a fr_bio_mem_set_verify()
357 *
358 * The read() API may return 0. There may have been data read from an underlying FD, but that data did not
359 * make it through the filters of the "next" bios. e.g. Any underlying FD should be put into a "wait for
360 * readable" state.
361 *
362 * The write() API will return a full write, even if the next layer is blocked. Any underlying FD
363 * should be put into a "wait for writeable" state. The packet which was supposed to be written has been
364 * cached, and cannot be cancelled as it is partially written. The caller should likely start using another
365 * bio for writes. If the caller continues to use the bio, then any subsequent writes will *always* cache
366 * the packets. @todo - we need to mark up the bio as "blocked", and then have a write_blocked() API? ugh.
367 * or just add `bool blocked` and `bool eof` to both read/write bios
368 *
369 * Once the underlying FD has become writeable, the caller should call fr_bio_write(bio, NULL, NULL, SIZE_MAX);
370 * That will cause the pending packets to be flushed.
371 *
372 * The write() API may return that it's written a full packet, in which case it's either completely written to
373 * the next bio, or to the pending queue.
374 *
375 * The read / write APIs can return WOULD_BLOCK, in which case nothing was done. Any underlying FD should be
376 * put into a "wait for writeable" state. Other errors from bios "further down" the chain can also be
377 * returned.
378 *
379 * @param ctx the talloc ctx
380 * @param max_saved Maximum number of packets to cache. Must be 1..2^17
381 * @param saved callback to run when a packet is saved in the pending queue
382 * @param sent callback to run when a packet is sent.
383 * @param cancel callback to run when a packet is cancelled.
384 * @param next the next bio which will perform the underlying reads and writes.
385 * - NULL on error, memory allocation failed
386 * - !NULL the bio
387 */
388fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved,
392 fr_bio_t *next)
393{
394 size_t i;
396
397 if (!max_saved) max_saved = 1;
398 if (max_saved > (1 << 17)) max_saved = 1 << 17;
399
400 my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) +
402 if (!my) return NULL;
403
404 talloc_set_type(my, fr_bio_queue_t);
405
406 my->max_saved = max_saved;
407
408 fr_bio_queue_list_init(&my->pending);
409 fr_bio_queue_list_init(&my->free);
410
411 my->saved = saved;
412 my->sent = sent;
413 my->cancel = cancel;
414
415 for (i = 0; i < max_saved; i++) {
416 my->array[i].my = my;
417 my->array[i].cancelled = true;
418 fr_bio_queue_list_insert_tail(&my->free, &my->array[i]);
419 }
420
421 my->bio.read = fr_bio_queue_read;
422 my->bio.write = fr_bio_queue_write_next;
423 my->cb.shutdown = fr_bio_queue_shutdown;
424
425 fr_bio_chain(&my->bio, next);
426
427 if (my->cancel) {
428 talloc_set_destructor(my, fr_bio_queue_destructor);
429 } else {
430 talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor);
431 }
432
433 return (fr_bio_t *) my;
434}
435
436/** Cancel the write for a packet.
437 *
438 * Cancel one a saved packets, and call the cancel() routine if it exists.
439 *
440 * There is no way to cancel all packets. The caller must find the lowest bio in the chain, and shutdown it.
441 * e.g. by closing the socket via fr_bio_fd_close(). That function will take care of walking back up the
442 * chain, and shutting down each bio.
443 *
444 * @param bio the #fr_bio_queue_t
445 * @param item The context returned from #fr_bio_queue_saved_t
446 * @return
447 * - <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled.
448 * - 0 the packet was cancelled.
449 */
451{
452 fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
453
454 if (!(item >= &my->array[0]) && (item < &my->array[my->max_saved])) {
455 return -1;
456 }
457
458 /*
459 * Already cancelled, that's a NOOP.
460 */
461 if (item->cancelled) return 0;
462
463 /*
464 * If the item has been partially written, AND we have a working write function, see if we can
465 * cancel it.
466 */
467 if (item->already_written && (my->bio.write != fr_bio_null_write)) {
468 ssize_t rcode;
469 fr_bio_t *next;
470
471 next = fr_bio_next(bio);
472 fr_assert(next != NULL);
473
474 /*
475 * If the write fails or returns nothing, the item can't be cancelled.
476 */
477 rcode = next->write(next, item->packet_ctx, ((uint8_t const *) item->buffer) + item->already_written, item->size - item->already_written);
478 if (rcode <= 0) return -1;
479
480 /*
481 * If we haven't written the full item, it can't be cancelled.
482 */
483 item->already_written += rcode;
484 if (item->already_written < item->size) return -1;
485
486 /*
487 * Else the item has been fully written, it can be safely cancelled.
488 */
489 }
490
491 /*
492 * Remove it from the saved list, and run the cancellation callback.
493 */
494 (void) fr_bio_queue_list_remove(&my->pending, item);
495 fr_bio_queue_list_insert_head(&my->free, item);
496
497 if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size);
498
499 return 0;
500}
static int const char char buffer[256]
Definition acutest.h:576
fr_bio_write_t _CONST write
write to the underlying bio
Definition base.h:116
fr_bio_read_t _CONST read
read from the underlying bio
Definition base.h:115
static fr_bio_t * fr_bio_next(fr_bio_t *bio)
Definition base.h:130
#define fr_bio_error(_x)
Definition base.h:192
fr_bio_t * fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, fr_bio_queue_saved_t saved, fr_bio_queue_callback_t sent, fr_bio_queue_callback_t cancel, fr_bio_t *next)
Allocate a packet-based bio.
Definition queue.c:388
fr_bio_queue_callback_t cancel
Definition queue.c:64
static void fr_bio_queue_shutdown(fr_bio_t *bio)
Shutdown.
Definition queue.c:337
static int fr_bio_queue_destructor(fr_bio_queue_t *my)
Definition queue.c:101
static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
Push a packet onto a list.
Definition queue.c:113
int fr_bio_queue_cancel(fr_bio_t *bio, fr_bio_queue_entry_t *item)
Cancel the write for a packet.
Definition queue.c:450
void const * buffer
Definition queue.c:45
fr_bio_queue_callback_t sent
Definition queue.c:63
static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
Flush the packet list.
Definition queue.c:200
struct fr_bio_queue_list_s fr_bio_queue_list_t
Definition queue.c:30
size_t max_saved
Definition queue.c:60
static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
Read one packet from next bio.
Definition queue.c:308
fr_bio_queue_t * my
Definition queue.c:50
void * packet_ctx
Definition queue.c:44
struct fr_bio_queue_s fr_bio_queue_t
Definition queue.c:31
fr_bio_queue_saved_t saved
Definition queue.c:62
size_t already_written
Definition queue.c:47
static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write one packet to the next bio.
Definition queue.c:145
static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
Write to the packet list buffer.
Definition queue.c:287
static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
Forcibly cancel all outstanding packets.
Definition queue.c:80
Definition queue.c:43
void(* fr_bio_queue_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size)
Definition queue.h:32
struct fr_bio_queue_entry_s fr_bio_queue_entry_t
Definition queue.h:30
void(* fr_bio_queue_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, fr_bio_queue_entry_t *queue_ctx)
Definition queue.h:33
static void fr_bio_chain(fr_bio_t *first, fr_bio_t *second)
Chain one bio after another.
Definition bio_priv.h:69
#define FR_DLIST_TYPES(_name)
Define type specific wrapper structs for dlists.
Definition dlist.h:1129
#define FR_DLIST_ENTRY(_name)
Expands to the type name used for the entry wrapper structure.
Definition dlist.h:1115
#define FR_DLIST_FUNCS(_name, _element_type, _element_entry)
Define type specific wrapper functions for dlists.
Definition dlist.h:1152
#define FR_DLIST_HEAD(_name)
Expands to the type name used for the head wrapper structure.
Definition dlist.h:1122
fr_bio_shutdown & my
Definition fd_errno.h:59
free(array)
int fr_bio_destructor(fr_bio_t *bio)
Free this bio.
Definition base.c:34
static void * item(fr_lst_t const *lst, fr_lst_index_t idx)
Definition lst.c:122
long int ssize_t
unsigned char uint8_t
ssize_t fr_bio_null_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
Definition null.c:39
ssize_t fr_bio_fail_read(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void *buffer, UNUSED size_t size)
Always return error on read.
Definition null.c:47
ssize_t fr_bio_fail_write(UNUSED fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size)
Always return 0 on write.
Definition null.c:56
#define fr_assert(_expr)
Definition rad_assert.h:38