The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
control.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 5f3de7e110fe4b8d0ec8f5b1041450db4d9dc70b $
19 *
20 * @brief Control-plane signaling
21 * @file io/control.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: 5f3de7e110fe4b8d0ec8f5b1041450db4d9dc70b $")
26
27#include <freeradius-devel/io/control.h>
28#include <freeradius-devel/io/ring_buffer.h>
29#include <freeradius-devel/util/strerror.h>
30#include <freeradius-devel/util/syserror.h>
31#include <freeradius-devel/util/misc.h>
32#include <freeradius-devel/util/rand.h>
33
34#include <fcntl.h>
35#include <string.h>
36#include <sys/event.h>
37
38#define FR_CONTROL_MAX_TYPES (32)
39
40/*
41 * Debugging, mainly for channel_test
42 */
43#if 0
44#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
45#else
46#define MPRINT(...)
47#endif
48
49/**
50 * Status of control messages
51 */
53 FR_CONTROL_MESSAGE_FREE = 0, //!< the message is free
54 FR_CONTROL_MESSAGE_USED, //!< the message is used (set only by originator)
55 FR_CONTROL_MESSAGE_DONE //!< the message is done (set only by receiver)
57
58
59/**
60 * The header for the control message
61 */
62typedef struct {
63 fr_control_message_status_t status; //!< status of this message
64 uint32_t id; //!< ID of this message
65 size_t data_size; //!< size of the data we're sending
67
68
69typedef struct {
70 uint32_t id; //!< id of this callback
71 void *ctx; //!< context for the callback
72 fr_control_callback_t callback; //!< the function to call
74
75
76/**
77 * The control structure.
78 */
80 fr_event_list_t *el; //!< our event list
81
82 fr_atomic_queue_t *aq; //!< destination AQ
83
84 int pipe[2]; //!< our pipes
85
86 bool same_thread; //!< are the two ends in the same thread
87
89};
90
91static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
92{
93 fr_control_t *c = talloc_get_type_abort(uctx, fr_control_t);
94 ssize_t i, num;
95 fr_time_t now;
96 char read_buffer[256];
97 uint8_t data[256];
98
99 num = read(fd, read_buffer, sizeof(read_buffer));
100 if (num <= 0) return;
101
102 now = fr_time();
103
104 for (i = 0; i < num; i++) {
105 uint32_t id = 0;
106 size_t message_size;
107
108 message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data));
109 if (!message_size) return;
110
111 if (id >= FR_CONTROL_MAX_TYPES) continue;
112
113 if (!c->type[id].callback) continue;
114
115 c->type[id].callback(c->type[id].ctx, data, message_size, now);
116 }
117}
118
119/** Free a control structure
120 *
121 * This function really only calls the underlying "garbage collect".
122 *
123 * @param[in] c the control structure
124 */
126{
127 (void) talloc_get_type_abort(c, fr_control_t);
128
129#ifndef NDEBUG
130 (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
131#endif
132 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
133
134 close(c->pipe[0]);
135 close(c->pipe[1]);
136
137 return 0;
138}
139
140/** Create a control-plane signaling path.
141 *
142 * @param[in] ctx the talloc context
143 * @param[in] el the event list for the control socket
144 * @param[in] aq the atomic queue where we will be pushing message data
145 * @return
146 * - NULL on error
147 * - fr_control_t on success
148 */
150{
151 fr_control_t *c;
152
153 c = talloc_zero(ctx, fr_control_t);
154 if (!c) {
155 fr_strerror_const("Failed allocating memory");
156 return NULL;
157 }
158 c->el = el;
159 c->aq = aq;
160
161 if (pipe((int *) &c->pipe) < 0) {
162 talloc_free(c);
163 fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno));
164 return NULL;
165 }
166 talloc_set_destructor(c, _control_free);
167
168 /*
169 * We don't want reads from the pipe to be blocking.
170 */
171 (void) fcntl(c->pipe[0], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
172 (void) fcntl(c->pipe[1], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
173
174 if (fr_event_fd_insert(c, NULL, el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) {
175 talloc_free(c);
176 fr_strerror_const_push("Failed adding FD to event list control socket");
177 return NULL;
178 }
179
180#ifndef NDEBUG
181 (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
182#endif
183
184 return c;
185}
186
187
188/** Clean up messages in a control-plane buffer
189 *
190 * Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE,
191 * and mark them FR_CONTROL_MESSAGE_FREE.
192 *
193 * @param[in] c the fr_control_t
194 * @param[in] rb the callers ring buffer for message allocation.
195 * @return
196 * - <0 there are still messages used
197 * - 0 the control list is empty.
198 */
200{
201 while (true) {
202 size_t room, message_size;
204
205 (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room);
206 if (room == 0) break;
207
208 fr_assert(m != NULL);
209 fr_assert(room >= sizeof(*m));
210
212
213 if (m->status != FR_CONTROL_MESSAGE_DONE) break;
214
216
217 /*
218 * Each message is aligned to a 64-byte boundary,
219 * for cache contention issues.
220 */
221 message_size = sizeof(*m);
222 message_size += m->data_size;
223 message_size += 63;
224 message_size &= ~(size_t) 63;
225 fr_ring_buffer_free(rb, message_size);
226 }
227
228 /*
229 * Maybe we failed to garbage collect everything?
230 */
231 if (fr_ring_buffer_used(rb) > 0) {
232 fr_strerror_const("Data still in control buffers");
233 return -1;
234 }
235
236 return 0;
237}
238
239
240/** Allocate a control message
241 *
242 * @param[in] c the control structure
243 * @param[in] rb the callers ring buffer for message allocation.
244 * @param[in] id the ident of this message.
245 * @param[in] data the data to write to the control plane
246 * @param[in] data_size the size of the data to write to the control plane.
247 * @return
248 * - NULL on error
249 * - fr_message_t on success
250 */
252{
253 size_t message_size;
255 uint8_t *p;
256
257 message_size = sizeof(*m);
258 message_size += data_size;
259 message_size += 63;
260 message_size &= ~(size_t) 63;
261
262 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
263 if (!m) {
264 (void) fr_control_gc(c, rb);
265 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
266 if (!m) {
267 fr_strerror_const_push("Failed allocating from ring buffer");
268 return NULL;
269 }
270 }
271
273 m->id = id;
274 m->data_size = data_size;
275
276 p = (uint8_t *) m;
277 memcpy(p + sizeof(*m), data, data_size);
278
279 return m;
280
281}
282
283
284/** Push a control-plane message
285 *
286 * This function is called ONLY from the originating thread.
287 *
288 * @param[in] c the control structure
289 * @param[in] rb the callers ring buffer for message allocation.
290 * @param[in] id the ident of this message.
291 * @param[in] data the data to write to the control plane
292 * @param[in] data_size the size of the data to write to the control plane.
293 * @return
294 * - -2 on ring buffer full
295 * - <0 on error
296 * - 0 on success
297 */
299{
301
302 (void) talloc_get_type_abort(c, fr_control_t);
303
304 MPRINT("CONTROL push aq %p\n", c->aq);
305
306 /*
307 * Get a message. If we can't get one, do garbage
308 * collection. Get another, and if that fails, we're
309 * done.
310 */
311 m = fr_control_message_alloc(c, rb, id, data, data_size);
312 if (!m) {
313 (void) fr_control_gc(c, rb);
314 m = fr_control_message_alloc(c, rb, id, data, data_size);
315 if (!m) {
316 fr_strerror_const("Failed allocating after GC");
317 return -2;
318 }
319 }
320
321 if (!fr_atomic_queue_push(c->aq, m)) {
323 fr_strerror_const("Failed pushing message to atomic queue.");
324 return -1;
325 }
326
327 return 0;
328}
329
330/** Send a control-plane message
331 *
332 * This function is called ONLY from the originating thread.
333 *
334 * @param[in] c the control structure
335 * @param[in] rb the callers ring buffer for message allocation.
336 * @param[in] id the ident of this message.
337 * @param[in] data the data to write to the control plane
338 * @param[in] data_size the size of the data to write to the control plane.
339 * @return
340 * - <0 on error
341 * - 0 on success
342 */
344{
345 (void) talloc_get_type_abort(c, fr_control_t);
346
347 if (c->same_thread) {
348 if (!c->type[id].callback) return -1;
349
350 c->type[id].callback(c->type[id].ctx, data, data_size, fr_time());
351 return 0;
352 }
353
354 if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1;
355
356 while (write(c->pipe[1], ".", 1) == 0) {
357 /* nothing */
358 }
359
360 return 0;
361}
362
363
364/** Pop control-plane message
365 *
366 * This function is called ONLY from the receiving thread.
367 *
368 * @param[in] aq the recipients atomic queue for control-plane messages
369 * @param[out] p_id the ident of this message.
370 * @param[in,out] data where the data is stored
371 * @param[in] data_size the size of the buffer where we store the data.
372 * @return
373 * - <0 the size of the data we need to read the next message
374 * - 0 this kevent is not for us.
375 * - >0 the amount of data we've read
376 */
378{
379 uint8_t *p;
381
382 MPRINT("CONTROL pop aq %p\n", aq);
383
384 if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0;
385
386 fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u",
388
389 /*
390 * There isn't enough room to store the data, die.
391 */
392 if (data_size < m->data_size) {
393 fr_strerror_printf("Allocation size should be at least %zd", m->data_size);
394 return -(m->data_size);
395 }
396
397 p = (uint8_t *) m;
398 data_size = m->data_size;
399 memcpy(data, p + sizeof(*m), data_size);
400
402 *p_id = m->id;
403 return data_size;
404}
405
406
407/** Register a callback for an ID
408 *
409 * @param[in] c the control structure
410 * @param[in] id the ident of this message.
411 * @param[in] ctx the context for the callback
412 * @param[in] callback the callback function
413 * @return
414 * - <0 on error
415 * - 0 on success
416 */
418{
419 (void) talloc_get_type_abort(c, fr_control_t);
420
421 if (id >= FR_CONTROL_MAX_TYPES) {
422 fr_strerror_printf("Failed adding unknown ID %d", id);
423 return -1;
424 }
425
426 /*
427 * Re-registering the same thing is OK.
428 */
429 if ((c->type[id].ctx == ctx) &&
430 (c->type[id].callback == callback)) {
431 return 0;
432 }
433
434 if (c->type[id].callback != NULL) {
435 fr_strerror_const("Callback is already set");
436 return -1;
437 }
438
439 c->type[id].id = id;
440 c->type[id].ctx = ctx;
441 c->type[id].callback = callback;
442
443 return 0;
444}
445
446/** Delete a callback for an ID
447 *
448 * @param[in] c the control structure
449 * @param[in] id the ident of this message.
450 * @return
451 * - <0 on error
452 * - 0 on success
453 */
455{
456 (void) talloc_get_type_abort(c, fr_control_t);
457
458 if (id >= FR_CONTROL_MAX_TYPES) {
459 fr_strerror_printf("Failed adding unknown ID %d", id);
460 return -1;
461 }
462
463 if (c->type[id].callback == NULL) return 0;
464
465 c->type[id].id = 0;
466 c->type[id].ctx = NULL;
467 c->type[id].callback = NULL;
468
469 return 0;
470}
471
473{
474 c->same_thread = true;
475 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
476 close(c->pipe[0]);
477 close(c->pipe[1]);
478
479 /*
480 * Nothing more to do now that everything is gone.
481 */
482 talloc_set_destructor(c, NULL);
483
484 return 0;
485}
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:483
#define UNUSED
Definition build.h:315
void(* fr_control_callback_t)(void *ctx, void const *data, size_t data_size, fr_time_t now)
Definition control.h:45
static fr_ring_buffer_t * rb
static fr_atomic_queue_t * aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_event_fd_insert(...)
Definition event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:62
#define MPRINT(...)
Definition control.c:46
fr_atomic_queue_t * aq
destination AQ
Definition control.c:82
int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb)
Clean up messages in a control-plane buffer.
Definition control.c:199
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition control.c:377
#define FR_CONTROL_MAX_TYPES
Definition control.c:38
static fr_control_message_t * fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Allocate a control message.
Definition control.c:251
static int _control_free(fr_control_t *c)
Free a control structure.
Definition control.c:125
bool same_thread
are the two ends in the same thread
Definition control.c:86
fr_event_list_t * el
our event list
Definition control.c:80
void * ctx
context for the callback
Definition control.c:71
fr_control_message_status_t status
status of this message
Definition control.c:63
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int pipe[2]
our pipes
Definition control.c:84
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Push a control-plane message.
Definition control.c:298
fr_control_callback_t callback
the function to call
Definition control.c:72
uint32_t id
id of this callback
Definition control.c:70
static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Definition control.c:91
fr_control_ctx_t type[FR_CONTROL_MAX_TYPES]
callbacks
Definition control.c:88
size_t data_size
size of the data we're sending
Definition control.c:65
fr_control_message_status_t
Status of control messages.
Definition control.c:52
@ FR_CONTROL_MESSAGE_DONE
the message is done (set only by receiver)
Definition control.c:55
@ FR_CONTROL_MESSAGE_FREE
the message is free
Definition control.c:53
@ FR_CONTROL_MESSAGE_USED
the message is used (set only by originator)
Definition control.c:54
uint32_t id
ID of this message.
Definition control.c:64
int fr_control_same_thread(fr_control_t *c)
Definition control.c:472
int fr_control_callback_delete(fr_control_t *c, uint32_t id)
Delete a callback for an ID.
Definition control.c:454
The header for the control message.
Definition control.c:62
The control structure.
Definition control.c:79
talloc_free(reap)
int fr_event_fd_unarmour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Unarmour an FD.
Definition event.c:1372
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1260
int fr_event_fd_armour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Armour an FD.
Definition event.c:1342
Stores all information relating to an event list.
Definition event.c:411
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
#define fr_assert(_expr)
Definition rad_assert.h:38
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define FD_CLOEXEC
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
"server local" time.
Definition time.h:69
close(uq->fd)
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1265