The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
control.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $
19 *
20 * @brief Control-plane signaling
21 * @file io/control.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $")
26
27#include <freeradius-devel/io/control.h>
28#include <freeradius-devel/util/strerror.h>
29#include <freeradius-devel/util/syserror.h>
30#include <freeradius-devel/util/misc.h>
31#include <freeradius-devel/util/rand.h>
32
33#include <fcntl.h>
34#include <sys/event.h>
35#include <poll.h>
36
37/*
38 * Debugging, mainly for channel_test
39 */
40#if 0
41#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
42#else
43#define MPRINT(...)
44#endif
45
/**
 * Status of control messages
 *
 * The status is stored in the header of every message placed in a ring
 * buffer, and tells the garbage collector whether the storage can be reused.
 */
typedef enum {
	FR_CONTROL_MESSAGE_FREE = 0,			//!< the message is free
	FR_CONTROL_MESSAGE_USED,			//!< the message is used (set only by originator)
	FR_CONTROL_MESSAGE_DONE				//!< the message is done (set only by receiver)
} fr_control_message_status_t;
54
55
56/**
57 * The header for the control message
58 */
59typedef struct {
60 fr_control_message_status_t status; //!< status of this message
61 uint32_t id; //!< ID of this message
62 size_t data_size; //!< size of the data we're sending
64
65
66typedef struct {
67 uint32_t id; //!< id of this callback
68 void *ctx; //!< context for the callback
69 fr_control_callback_t callback; //!< the function to call
71
72
73/**
74 * The control structure.
75 */
77 fr_event_list_t *el; //!< our event list
78
79 fr_atomic_queue_t *aq; //!< destination AQ
80
81 int pipe[2]; //!< our pipes
82
83 bool same_thread; //!< are the two ends in the same thread
84
85 bool opened; //!< has the control path been opened.
86
87 uint32_t num_callbacks; //!< the size of the callback array
88
89 fr_control_ctx_t type[]; //!< callbacks. Must be at the end of the structure as these
90 ///< are created by allocating a larger memory size.
91};
92
93static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
94{
95 fr_control_t *c = talloc_get_type_abort(uctx, fr_control_t);
96 fr_time_t now;
97 char read_buffer[256];
98 uint8_t data[256];
99 size_t message_size;
100 uint32_t id = 0;
101
102 /*
103 * The presence of data on the pipe fd is just a trigger to pop all
104 * available messages from the atomic queue, so the number of bytes
105 * read is not important.
106 */
107 /* coverity[check_return] */
108 if (read(fd, read_buffer, sizeof(read_buffer)) <= 0) return;
109
110 now = fr_time();
111
112 while((message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data)))) {
113 if (id >= c->num_callbacks) continue;
114
115 if (!c->type[id].callback) continue;
116
117 c->type[id].callback(c->type[id].ctx, data, message_size, now);
118 }
119}
120
121/** Free a control structure
122 *
123 * This function really only calls the underlying "garbage collect".
124 *
125 * @param[in] c the control structure
126 */
128{
129 (void) talloc_get_type_abort(c, fr_control_t);
130
131#ifndef NDEBUG
132 (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
133#endif
134 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
135
136 close(c->pipe[0]);
137 close(c->pipe[1]);
138
139 return 0;
140}
141
142/** Create a control-plane signaling path.
143 *
144 * @param[in] ctx the talloc context
145 * @param[in] el the event list for the control socket
146 * @param[in] aq the atomic queue where we will be pushing message data
147 * @param[in] num_callbacks the initial number of callback entries to allocate.
148 * @return
149 * - NULL on error
150 * - fr_control_t on success
151 */
152fr_control_t *fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
153{
154 fr_control_t *c;
155
156 c = talloc_zero_size(ctx, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * num_callbacks);
157 talloc_set_type(c, fr_control_t);
158 if (!c) {
159 fr_strerror_const("Failed allocating memory");
160 return NULL;
161 }
162 c->el = el;
163 c->aq = aq;
164 c->num_callbacks = num_callbacks;
165
166 return c;
167}
168
169/** Open the control-plane signalling path
170 *
171 * @param[in] c the control-plane to open
172 * @return
173 * - 0 on success
174 * - -1 on failure
175 */
177{
178 if (pipe(c->pipe) < 0) {
179 fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno));
180 return -1;
181 }
182 talloc_set_destructor(c, _control_free);
183
184 /*
185 * We don't want reads from the pipe to be blocking.
186 */
187 (void) fr_nonblock(c->pipe[0]);
188 (void) fr_nonblock(c->pipe[1]);
189 (void) fr_cloexec(c->pipe[0]);
190 (void) fr_cloexec(c->pipe[1]);
191
192 if (fr_event_fd_insert(c, NULL, c->el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) {
193 fr_strerror_const_push("Failed adding FD to event list control socket");
194 return -1;
195 }
196
197#ifndef NDEBUG
198 (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
199#endif
200 c->opened = true;
201
202 return 0;
203}
204
205/** Clean up messages in a control-plane buffer
206 *
207 * Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE,
208 * and mark them FR_CONTROL_MESSAGE_FREE.
209 *
210 * @param[in] c the fr_control_t
211 * @param[in] rb the callers ring buffer for message allocation.
212 * @return
213 * - <0 there are still messages used
214 * - 0 the control list is empty.
215 */
217{
218 while (true) {
219 size_t room, message_size;
221
222 (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room);
223 if (room == 0) break;
224
225 fr_assert(m != NULL);
226 fr_assert(room >= sizeof(*m));
227
229
230 if (m->status != FR_CONTROL_MESSAGE_DONE) break;
231
233
234 /*
235 * Each message is aligned to a 64-byte boundary,
236 * for cache contention issues.
237 */
238 message_size = sizeof(*m);
239 message_size += m->data_size;
240 message_size += 63;
241 message_size &= ~(size_t) 63;
242 fr_ring_buffer_free(rb, message_size);
243 }
244
245 /*
246 * Maybe we failed to garbage collect everything?
247 */
248 if (fr_ring_buffer_used(rb) > 0) {
249 fr_strerror_const("Data still in control buffers");
250 return -1;
251 }
252
253 return 0;
254}
255
256
257/** Allocate a control message
258 *
259 * @param[in] c the control structure
260 * @param[in] rb the callers ring buffer for message allocation.
261 * @param[in] id the ident of this message.
262 * @param[in] data the data to write to the control plane
263 * @param[in] data_size the size of the data to write to the control plane.
264 * @return
265 * - NULL on error
266 * - fr_message_t on success
267 */
269{
270 size_t message_size;
272 uint8_t *p;
273
274 message_size = sizeof(*m);
275 message_size += data_size;
276 message_size += 63;
277 message_size &= ~(size_t) 63;
278
279 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
280 if (!m) {
281 (void) fr_control_gc(c, rb);
282 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
283 if (!m) {
284 fr_strerror_const_push("Failed allocating from ring buffer");
285 return NULL;
286 }
287 }
288
290 m->id = id;
291 m->data_size = data_size;
292
293 p = (uint8_t *) m;
294 memcpy(p + sizeof(*m), data, data_size);
295
296 return m;
297
298}
299
300
301/** Push a control-plane message
302 *
303 * This function is called ONLY from the originating threads.
304 *
305 * @param[in] c the control structure
306 * @param[in] rb the callers ring buffer for message allocation.
307 * @param[in] id the ident of this message.
308 * @param[in] data the data to write to the control plane
309 * @param[in] data_size the size of the data to write to the control plane.
310 * @return
311 * - -2 on ring buffer full
312 * - <0 on error
313 * - 0 on success
314 */
315int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
316{
318
319 (void) talloc_get_type_abort(c, fr_control_t);
320
321 MPRINT("CONTROL push aq %p\n", c->aq);
322
323 /*
324 * Get a message. The alloc call attempts garbage collection
325 * if there is not enough free space.
326 */
327 m = fr_control_message_alloc(c, rb, id, data, data_size);
328 if (!m) {
329 fr_strerror_const_push("Failed allocating after GC");
330 return -2;
331 }
332
333 if (!fr_atomic_queue_push(c->aq, m)) {
335 fr_strerror_const("Failed pushing message to atomic queue.");
336 return -1;
337 }
338
339 return 0;
340}
341
342/** Send a control-plane message
343 *
344 * This function is called ONLY from the originating threads.
345 *
346 * @param[in] c the control structure
347 * @param[in] rb the callers ring buffer for message allocation.
348 * @param[in] id the ident of this message.
349 * @param[in] data the data to write to the control plane
350 * @param[in] data_size the size of the data to write to the control plane.
351 * @return
352 * - <0 on error
353 * - 0 on success
354 */
355int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
356{
357 (void) talloc_get_type_abort(c, fr_control_t);
358
359 if (c->same_thread) {
360 if (!c->type[id].callback) return -1;
361
362 c->type[id].callback(c->type[id].ctx, data, data_size, fr_time());
363 return 0;
364 }
365
366 if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1;
367
368redo:
369 if (write(c->pipe[1], ".", 1) >= 0) return 0;
370
371 if (errno == EINTR) goto redo;
372
373 /*
374 * EAGAIN means that the pipe is full, which means that the other end will eventually
375 * read from it.
376 */
377 if (errno == EAGAIN) return 0;
378
379#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
380 if (errno == EWOULDBLOCK) return 0;
381#endif
382
383 /*
384 * Other error, that's an issue.
385 */
386 return -1;
387}
388
389
390/** Pop control-plane message
391 *
392 * This function is called ONLY from the receiving thread.
393 *
394 * @param[in] aq the recipients atomic queue for control-plane messages
395 * @param[out] p_id the ident of this message.
396 * @param[in,out] data where the data is stored
397 * @param[in] data_size the size of the buffer where we store the data.
398 * @return
399 * - <0 the size of the data we need to read the next message
400 * - 0 this kevent is not for us.
401 * - >0 the amount of data we've read
402 */
404{
405 uint8_t *p;
407
408 MPRINT("CONTROL pop aq %p\n", aq);
409
410 if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0;
411
412 fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u",
414
415 /*
416 * There isn't enough room to store the data, die.
417 */
418 if (data_size < m->data_size) {
419 fr_strerror_printf("Allocation size should be at least %zd", m->data_size);
420 return -(m->data_size);
421 }
422
423 p = (uint8_t *) m;
424 data_size = m->data_size;
425 memcpy(data, p + sizeof(*m), data_size);
426
428 *p_id = m->id;
429 return data_size;
430}
431
432
433/** Register a callback for an ID
434 *
435 * @param[in] c the control structure
436 * @param[in] id the ident of this message.
437 * @param[in] ctx the context for the callback
438 * @param[in] callback the callback function
439 * @return
440 * - <0 on error
441 * - 0 on success
442 */
444{
445 fr_control_t *ctrl = talloc_get_type_abort(*c, fr_control_t);
446
447 if (ctrl->opened) {
448 fr_strerror_printf("Cannot add callbacks after the control is opened");
449 return -1;
450 }
451
452 /*
453 * If there is not enough space in the array of callbacks
454 * re-allocate the fr_control_t with a larger array.
455 */
456 if (id >= ctrl->num_callbacks) {
457 ctrl = talloc_realloc_size(talloc_parent(*c), *c, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * (id + 1));
458 if (!ctrl) {
459 fr_strerror_printf("Failed re-allocating control when registering callback ID %d", id);
460 return -1;
461 }
462 talloc_set_type(ctrl, fr_control_t);
463
464 /*
465 * Zero the additional callback entries.
466 */
467 memset((uint8_t *)ctrl + sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * ctrl->num_callbacks, 0,
468 (id + 1 - ctrl->num_callbacks) * sizeof(fr_control_ctx_t));
469
470 ctrl->num_callbacks = id + 1;
471 *c = ctrl;
472 }
473
474 /*
475 * Re-registering the same thing is OK.
476 */
477 if ((ctrl->type[id].ctx == ctx) &&
478 (ctrl->type[id].callback == callback)) {
479 return 0;
480 }
481
482 if (ctrl->type[id].callback != NULL) {
483 fr_strerror_const("Callback is already set");
484 return -1;
485 }
486
487 ctrl->type[id].id = id;
488 ctrl->type[id].ctx = ctx;
489 ctrl->type[id].callback = callback;
490
491 return 0;
492}
493
494/** Delete a callback for an ID
495 *
496 * @param[in] c the control structure
497 * @param[in] id the ident of this message.
498 * @return
499 * - <0 on error
500 * - 0 on success
501 */
503{
504 (void) talloc_get_type_abort(c, fr_control_t);
505
506 if (id >= c->num_callbacks) {
507 fr_strerror_printf("Failed deleting unknown ID %d", id);
508 return -1;
509 }
510
511 if (c->type[id].callback == NULL) return 0;
512
513 c->type[id].id = 0;
514 c->type[id].ctx = NULL;
515 c->type[id].callback = NULL;
516
517 return 0;
518}
519
521{
522 c->same_thread = true;
523 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
524 close(c->pipe[0]);
525 close(c->pipe[1]);
526
527 /*
528 * Nothing more to do now that everything is gone.
529 */
530 talloc_set_destructor(c, NULL);
531
532 return 0;
533}
534
535/** Wait for a plane control to become readable
536 *
537 * This is a blocking function so only to be used in rare cases such as waiting
538 * for another thread to complete a task before proceeding.
539 *
540 * @param[in] c the control structure.
541 */
543{
544 struct pollfd fd;
545
546 fd.fd = c->pipe[0];
547 fd.events = POLLIN;
548
549 (void) poll(&fd, 1, -1);
550}
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:506
#define UNUSED
Definition build.h:336
void(* fr_control_callback_t)(void *ctx, void const *data, size_t data_size, fr_time_t now)
Definition control.h:45
static fr_atomic_queue_t ** aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define fr_event_fd_insert(...)
Definition event.h:247
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:83
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
#define MPRINT(...)
Definition control.c:43
fr_atomic_queue_t * aq
destination AQ
Definition control.c:79
int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb)
Clean up messages in a control-plane buffer.
Definition control.c:216
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition control.c:403
static fr_control_message_t * fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Allocate a control message.
Definition control.c:268
static int _control_free(fr_control_t *c)
Free a control structure.
Definition control.c:127
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
bool same_thread
are the two ends in the same thread
Definition control.c:83
fr_event_list_t * el
our event list
Definition control.c:77
void * ctx
context for the callback
Definition control.c:68
fr_control_message_status_t status
status of this message
Definition control.c:60
bool opened
has the control path been opened.
Definition control.c:85
int pipe[2]
our pipes
Definition control.c:81
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
fr_control_ctx_t type[]
callbacks.
Definition control.c:89
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Push a control-plane message.
Definition control.c:315
fr_control_callback_t callback
the function to call
Definition control.c:69
uint32_t id
id of this callback
Definition control.c:67
uint32_t num_callbacks
the size of the callback array
Definition control.c:87
static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Definition control.c:93
size_t data_size
size of the data we're sending
Definition control.c:62
fr_control_message_status_t
Status of control messages.
Definition control.c:49
@ FR_CONTROL_MESSAGE_DONE
the message is done (set only by receiver)
Definition control.c:52
@ FR_CONTROL_MESSAGE_FREE
the message is free
Definition control.c:50
@ FR_CONTROL_MESSAGE_USED
the message is used (set only by originator)
Definition control.c:51
uint32_t id
ID of this message.
Definition control.c:61
int fr_control_same_thread(fr_control_t *c)
Definition control.c:520
int fr_control_callback_delete(fr_control_t *c, uint32_t id)
Delete a callback for an ID.
Definition control.c:502
void fr_control_wait(fr_control_t *c)
Wait for a plane control to become readable.
Definition control.c:542
The header for the control message.
Definition control.c:59
The control structure.
Definition control.c:76
int fr_event_fd_unarmour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Unarmour an FD.
Definition event.c:1315
#define fr_time()
Definition event.c:60
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1203
int fr_event_fd_armour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Armour an FD.
Definition event.c:1285
Stores all information relating to an event list.
Definition event.c:377
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
int fr_nonblock(UNUSED int fd)
Definition misc.c:293
int fr_cloexec(UNUSED int fd)
Definition misc.c:331
#define fr_assert(_expr)
Definition rad_assert.h:37
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
"server local" time.
Definition time.h:69
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340