The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
control.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: ffd94466f771b28b23e130216fe8ea5f906eb17d $
19 *
20 * @brief Control-plane signaling
21 * @file io/control.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: ffd94466f771b28b23e130216fe8ea5f906eb17d $")
26
27#include <freeradius-devel/io/control.h>
28#include <freeradius-devel/io/ring_buffer.h>
29#include <freeradius-devel/util/strerror.h>
30#include <freeradius-devel/util/syserror.h>
31#include <freeradius-devel/util/misc.h>
32#include <freeradius-devel/util/rand.h>
33
34#include <fcntl.h>
35#include <string.h>
36#include <sys/event.h>
37
38#define FR_CONTROL_MAX_TYPES (32)
39
40/*
41 * Debugging, mainly for channel_test
42 */
43#if 0
44#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
45#else
46#define MPRINT(...)
47#endif
48
49/**
50 * Status of control messages
51 */
53 FR_CONTROL_MESSAGE_FREE = 0, //!< the message is free
54 FR_CONTROL_MESSAGE_USED, //!< the message is used (set only by originator)
55 FR_CONTROL_MESSAGE_DONE //!< the message is done (set only by receiver)
57
58
59/**
60 * The header for the control message
61 */
62typedef struct {
63 fr_control_message_status_t status; //!< status of this message
64 uint32_t id; //!< ID of this message
65 size_t data_size; //!< size of the data we're sending
67
68
69typedef struct {
70 uint32_t id; //!< id of this callback
71 void *ctx; //!< context for the callback
72 fr_control_callback_t callback; //!< the function to call
74
75
76/**
77 * The control structure.
78 */
80 fr_event_list_t *el; //!< our event list
81
82 fr_atomic_queue_t *aq; //!< destination AQ
83
84 int pipe[2]; //!< our pipes
85
86 bool same_thread; //!< are the two ends in the same thread
87
89};
90
91static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
92{
93 fr_control_t *c = talloc_get_type_abort(uctx, fr_control_t);
94 fr_time_t now;
95 char read_buffer[256];
96 uint8_t data[256];
97 size_t message_size;
98 uint32_t id = 0;
99
100 if (read(fd, read_buffer, sizeof(read_buffer)) <= 0) return;
101
102 now = fr_time();
103
104 while((message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data)))) {
105 if (id >= FR_CONTROL_MAX_TYPES) continue;
106
107 if (!c->type[id].callback) continue;
108
109 c->type[id].callback(c->type[id].ctx, data, message_size, now);
110 }
111}
112
113/** Free a control structure
114 *
115 * This function really only calls the underlying "garbage collect".
116 *
117 * @param[in] c the control structure
118 */
120{
121 (void) talloc_get_type_abort(c, fr_control_t);
122
123#ifndef NDEBUG
124 (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
125#endif
126 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
127
128 close(c->pipe[0]);
129 close(c->pipe[1]);
130
131 return 0;
132}
133
134/** Create a control-plane signaling path.
135 *
136 * @param[in] ctx the talloc context
137 * @param[in] el the event list for the control socket
138 * @param[in] aq the atomic queue where we will be pushing message data
139 * @return
140 * - NULL on error
141 * - fr_control_t on success
142 */
144{
145 fr_control_t *c;
146
147 c = talloc_zero(ctx, fr_control_t);
148 if (!c) {
149 fr_strerror_const("Failed allocating memory");
150 return NULL;
151 }
152 c->el = el;
153 c->aq = aq;
154
155 if (pipe((int *) &c->pipe) < 0) {
156 talloc_free(c);
157 fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno));
158 return NULL;
159 }
160 talloc_set_destructor(c, _control_free);
161
162 /*
163 * We don't want reads from the pipe to be blocking.
164 */
165 (void) fcntl(c->pipe[0], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
166 (void) fcntl(c->pipe[1], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
167
168 if (fr_event_fd_insert(c, NULL, el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) {
169 talloc_free(c);
170 fr_strerror_const_push("Failed adding FD to event list control socket");
171 return NULL;
172 }
173
174#ifndef NDEBUG
175 (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
176#endif
177
178 return c;
179}
180
181
182/** Clean up messages in a control-plane buffer
183 *
184 * Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE,
185 * and mark them FR_CONTROL_MESSAGE_FREE.
186 *
187 * @param[in] c the fr_control_t
188 * @param[in] rb the caller's ring buffer for message allocation.
189 * @return
190 * - <0 there are still messages used
191 * - 0 the control list is empty.
192 */
/* NOTE(review): the function header and the declaration of `m` are not visible in this listing; embedded line numbers 193, 197, 205 and 209 are absent. */
194{
195 while (true) {
196 size_t room, message_size;
198
 /* Point m at the start of the ring buffer; room presumably receives the number of bytes in use there - confirm against fr_ring_buffer_start(). */
199 (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room);
 /* Nothing at the start of the buffer, so there is nothing left to collect. */
200 if (room == 0) break;
201
202 fr_assert(m != NULL);
203 fr_assert(room >= sizeof(*m));
204
206
 /* Stop at the first message which is not DONE; messages after it are not examined. */
207 if (m->status != FR_CONTROL_MESSAGE_DONE) break;
208
210
211 /*
212 * Each message is aligned to a 64-byte boundary,
213 * for cache contention issues.
214 */
 /* Header plus payload, rounded up to the next multiple of 64. */
215 message_size = sizeof(*m);
216 message_size += m->data_size;
217 message_size += 63;
218 message_size &= ~(size_t) 63;
219 fr_ring_buffer_free(rb, message_size);
220 }
221
222 /*
223 * Maybe we failed to garbage collect everything?
224 */
 /* Data still in use means the loop stopped early; report that with -1. */
225 if (fr_ring_buffer_used(rb) > 0) {
226 fr_strerror_const("Data still in control buffers");
227 return -1;
228 }
229
230 return 0;
231}
232
233
234/** Allocate a control message
235 *
236 * @param[in] c the control structure
237 * @param[in] rb the caller's ring buffer for message allocation.
238 * @param[in] id the ident of this message.
239 * @param[in] data the data to write to the control plane
240 * @param[in] data_size the size of the data to write to the control plane.
241 * @return
242 * - NULL on error
243 * - fr_control_message_t on success
244 */
/* NOTE(review): the function header and the declaration of `m` are not visible in this listing; embedded line numbers 245, 248 and 266 are absent. */
246{
247 size_t message_size;
249 uint8_t *p;
250
 /* Header plus payload, rounded up to the next multiple of 64. */
251 message_size = sizeof(*m);
252 message_size += data_size;
253 message_size += 63;
254 message_size &= ~(size_t) 63;
255
256 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
257 if (!m) {
 /* First attempt failed: garbage collect (its result is ignored) and retry exactly once. */
258 (void) fr_control_gc(c, rb);
259 m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
260 if (!m) {
261 fr_strerror_const_push("Failed allocating from ring buffer");
262 return NULL;
263 }
264 }
265
267 m->id = id;
268 m->data_size = data_size;
269
 /* The payload is copied into the bytes immediately following the header. */
270 p = (uint8_t *) m;
271 memcpy(p + sizeof(*m), data, data_size);
272
273 return m;
274
275}
276
277
278/** Push a control-plane message
279 *
280 * This function is called ONLY from the originating thread.
281 *
282 * @param[in] c the control structure
283 * @param[in] rb the caller's ring buffer for message allocation.
284 * @param[in] id the ident of this message.
285 * @param[in] data the data to write to the control plane
286 * @param[in] data_size the size of the data to write to the control plane.
287 * @return
288 * - -2 on ring buffer full
289 * - <0 on error
290 * - 0 on success
291 */
292int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
293{
 /* NOTE(review): the declaration of `m` (embedded line 294) is not visible in this listing. */
295
296 (void) talloc_get_type_abort(c, fr_control_t);
297
298 MPRINT("CONTROL push aq %p\n", c->aq);
299
300 /*
301 * Get a message. The alloc call attempts garbage collection
302 * if there is not enough free space.
303 */
304 m = fr_control_message_alloc(c, rb, id, data, data_size);
305 if (!m) {
 /* -2 is the "ring buffer full" return; this replaces any error text set by the alloc call. */
306 fr_strerror_const("Failed allocating after GC");
307 return -2;
308 }
309
 /* Queue the message pointer for the receiver; the payload itself stays in the ring buffer. */
310 if (!fr_atomic_queue_push(c->aq, m)) {
 /* NOTE(review): embedded line 311 is absent from this listing, so any cleanup of `m` on this path cannot be confirmed here. */
312 fr_strerror_const("Failed pushing message to atomic queue.");
313 return -1;
314 }
315
316 return 0;
317}
318
319/** Send a control-plane message
320 *
321 * This function is called ONLY from the originating thread.
322 *
323 * @param[in] c the control structure
324 * @param[in] rb the callers ring buffer for message allocation.
325 * @param[in] id the ident of this message.
326 * @param[in] data the data to write to the control plane
327 * @param[in] data_size the size of the data to write to the control plane.
328 * @return
329 * - <0 on error
330 * - 0 on success
331 */
332int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
333{
334 ssize_t ret;
335 int delay = 0;
336 (void) talloc_get_type_abort(c, fr_control_t);
337
338 if (c->same_thread) {
339 if (!c->type[id].callback) return -1;
340
341 c->type[id].callback(c->type[id].ctx, data, data_size, fr_time());
342 return 0;
343 }
344
345 if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1;
346
347again:
348 while ((ret = write(c->pipe[1], ".", 1)) == 0) {
349 /* nothing */
350 }
351 if ((ret < 0) && ((errno == EAGAIN))
352#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
353 || (errno == EWOULDBLOCK)
354#endif
355 ) {
356 delay += 10;
357 usleep(delay);
358 goto again;
359 }
360 return 0;
361}
362
363
364/** Pop control-plane message
365 *
366 * This function is called ONLY from the receiving thread.
367 *
368 * @param[in] aq the recipient's atomic queue for control-plane messages
369 * @param[out] p_id the ident of this message.
370 * @param[in,out] data where the data is stored
371 * @param[in] data_size the size of the buffer where we store the data.
372 * @return
373 * - <0 the negated size of the buffer needed to hold the message which was popped
374 * - 0 no message was available in the atomic queue.
375 * - >0 the amount of data we've read
376 */
/* NOTE(review): the function header and the declaration of `m` are not visible in this listing; embedded line numbers 377, 380, 387 and 401 are absent. */
378{
379 uint8_t *p;
381
382 MPRINT("CONTROL pop aq %p\n", aq);
383
 /* Nothing could be popped from the queue: report that with 0. */
384 if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0;
385
386 fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u",
388
389 /*
390 * There isn't enough room to store the data, die.
391 */
 /* NOTE(review): on this path the message has already been removed from the queue and *p_id is not written. */
392 if (data_size < m->data_size) {
393 fr_strerror_printf("Allocation size should be at least %zd", m->data_size);
394 return -(m->data_size);
395 }
396
 /* The payload starts immediately after the header. */
397 p = (uint8_t *) m;
398 data_size = m->data_size;
399 memcpy(data, p + sizeof(*m), data_size);
400
 /* NOTE(review): a message with data_size of zero returns 0 here, which callers cannot tell apart from an empty queue. */
402 *p_id = m->id;
403 return data_size;
404}
405
406
407/** Register a callback for an ID
408 *
409 * @param[in] c the control structure
410 * @param[in] id the ident of this message.
411 * @param[in] ctx the context for the callback
412 * @param[in] callback the callback function
413 * @return
414 * - <0 on error
415 * - 0 on success
416 */
418{
419 (void) talloc_get_type_abort(c, fr_control_t);
420
421 if (id >= FR_CONTROL_MAX_TYPES) {
422 fr_strerror_printf("Failed adding unknown ID %d", id);
423 return -1;
424 }
425
426 /*
427 * Re-registering the same thing is OK.
428 */
429 if ((c->type[id].ctx == ctx) &&
430 (c->type[id].callback == callback)) {
431 return 0;
432 }
433
434 if (c->type[id].callback != NULL) {
435 fr_strerror_const("Callback is already set");
436 return -1;
437 }
438
439 c->type[id].id = id;
440 c->type[id].ctx = ctx;
441 c->type[id].callback = callback;
442
443 return 0;
444}
445
446/** Delete a callback for an ID
447 *
448 * @param[in] c the control structure
449 * @param[in] id the ident of this message.
450 * @return
451 * - <0 on error
452 * - 0 on success
453 */
455{
456 (void) talloc_get_type_abort(c, fr_control_t);
457
458 if (id >= FR_CONTROL_MAX_TYPES) {
459 fr_strerror_printf("Failed adding unknown ID %d", id);
460 return -1;
461 }
462
463 if (c->type[id].callback == NULL) return 0;
464
465 c->type[id].id = 0;
466 c->type[id].ctx = NULL;
467 c->type[id].callback = NULL;
468
469 return 0;
470}
471
473{
474 c->same_thread = true;
475 (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
476 close(c->pipe[0]);
477 close(c->pipe[1]);
478
479 /*
480 * Nothing more to do now that everything is gone.
481 */
482 talloc_set_destructor(c, NULL);
483
484 return 0;
485}
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:485
#define UNUSED
Definition build.h:317
void(* fr_control_callback_t)(void *ctx, void const *data, size_t data_size, fr_time_t now)
Definition control.h:45
static fr_atomic_queue_t ** aq
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:202
#define fr_event_fd_insert(...)
Definition event.h:248
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions.
Definition event.h:84
#define MPRINT(...)
Definition control.c:46
fr_atomic_queue_t * aq
destination AQ
Definition control.c:82
int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb)
Clean up messages in a control-plane buffer.
Definition control.c:193
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition control.c:377
#define FR_CONTROL_MAX_TYPES
Definition control.c:38
static fr_control_message_t * fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Allocate a control message.
Definition control.c:245
static int _control_free(fr_control_t *c)
Free a control structure.
Definition control.c:119
bool same_thread
are the two ends in the same thread
Definition control.c:86
fr_event_list_t * el
our event list
Definition control.c:80
void * ctx
context for the callback
Definition control.c:71
fr_control_message_status_t status
status of this message
Definition control.c:63
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int pipe[2]
our pipes
Definition control.c:84
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:332
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:143
int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Push a control-plane message.
Definition control.c:292
fr_control_callback_t callback
the function to call
Definition control.c:72
uint32_t id
id of this callback
Definition control.c:70
static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Definition control.c:91
fr_control_ctx_t type[FR_CONTROL_MAX_TYPES]
callbacks
Definition control.c:88
size_t data_size
size of the data we're sending
Definition control.c:65
fr_control_message_status_t
Status of control messages.
Definition control.c:52
@ FR_CONTROL_MESSAGE_DONE
the message is done (set only by receiver)
Definition control.c:55
@ FR_CONTROL_MESSAGE_FREE
the message is free
Definition control.c:53
@ FR_CONTROL_MESSAGE_USED
the message is used (set only by originator)
Definition control.c:54
uint32_t id
ID of this message.
Definition control.c:64
int fr_control_same_thread(fr_control_t *c)
Definition control.c:472
int fr_control_callback_delete(fr_control_t *c, uint32_t id)
Delete a callback for an ID.
Definition control.c:454
The header for the control message.
Definition control.c:62
The control structure.
Definition control.c:79
talloc_free(reap)
int fr_event_fd_unarmour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Unarmour an FD.
Definition event.c:1315
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1203
int fr_event_fd_armour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Armour an FD.
Definition event.c:1285
Stores all information relating to an event list.
Definition event.c:377
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
unsigned long int size_t
#define fr_assert(_expr)
Definition rad_assert.h:38
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free.
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
#define FD_CLOEXEC
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
"server local" time.
Definition time.h:69
close(uq->fd)
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1291