The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
control.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 5f3de7e110fe4b8d0ec8f5b1041450db4d9dc70b $
19  *
20  * @brief Control-plane signaling
21  * @file io/control.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: 5f3de7e110fe4b8d0ec8f5b1041450db4d9dc70b $")
26 
27 #include <freeradius-devel/io/control.h>
28 #include <freeradius-devel/io/ring_buffer.h>
29 #include <freeradius-devel/util/strerror.h>
30 #include <freeradius-devel/util/syserror.h>
31 #include <freeradius-devel/util/misc.h>
32 #include <freeradius-devel/util/rand.h>
33 
34 #include <fcntl.h>
35 #include <string.h>
36 #include <sys/event.h>
37 
38 #define FR_CONTROL_MAX_TYPES (32)
39 
40 /*
41  * Debugging, mainly for channel_test
42  */
43 #if 0
44 #define MPRINT(...) fprintf(stderr, __VA_ARGS__)
45 #else
46 #define MPRINT(...)
47 #endif
48 
/**
 * Status of control messages
 *
 * The status lives in the message header and records which side
 * currently owns the message.
 */
typedef enum {
	FR_CONTROL_MESSAGE_FREE = 0,			//!< the message is free
	FR_CONTROL_MESSAGE_USED,			//!< the message is used (set only by originator)
	FR_CONTROL_MESSAGE_DONE				//!< the message is done (set only by receiver)
} fr_control_message_status_t;
57 
58 
59 /**
60  * The header for the control message
61  */
62 typedef struct {
63  fr_control_message_status_t status; //!< status of this message
64  uint32_t id; //!< ID of this message
65  size_t data_size; //!< size of the data we're sending
67 
68 
69 typedef struct {
70  uint32_t id; //!< id of this callback
71  void *ctx; //!< context for the callback
72  fr_control_callback_t callback; //!< the function to call
74 
75 
76 /**
77  * The control structure.
78  */
79 struct fr_control_s {
80  fr_event_list_t *el; //!< our event list
81 
82  fr_atomic_queue_t *aq; //!< destination AQ
83 
84  int pipe[2]; //!< our pipes
85 
86  bool same_thread; //!< are the two ends in the same thread
87 
89 };
90 
91 static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
92 {
93  fr_control_t *c = talloc_get_type_abort(uctx, fr_control_t);
94  ssize_t i, num;
95  fr_time_t now;
96  char read_buffer[256];
97  uint8_t data[256];
98 
99  num = read(fd, read_buffer, sizeof(read_buffer));
100  if (num <= 0) return;
101 
102  now = fr_time();
103 
104  for (i = 0; i < num; i++) {
105  uint32_t id = 0;
106  size_t message_size;
107 
108  message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data));
109  if (!message_size) return;
110 
111  if (id >= FR_CONTROL_MAX_TYPES) continue;
112 
113  if (!c->type[id].callback) continue;
114 
115  c->type[id].callback(c->type[id].ctx, data, message_size, now);
116  }
117 }
118 
119 /** Free a control structure
120  *
121  * This function really only calls the underlying "garbage collect".
122  *
123  * @param[in] c the control structure
124  */
126 {
127  (void) talloc_get_type_abort(c, fr_control_t);
128 
129 #ifndef NDEBUG
130  (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
131 #endif
132  (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
133 
134  close(c->pipe[0]);
135  close(c->pipe[1]);
136 
137  return 0;
138 }
139 
140 /** Create a control-plane signaling path.
141  *
142  * @param[in] ctx the talloc context
143  * @param[in] el the event list for the control socket
144  * @param[in] aq the atomic queue where we will be pushing message data
145  * @return
146  * - NULL on error
147  * - fr_control_t on success
148  */
150 {
151  fr_control_t *c;
152 
153  c = talloc_zero(ctx, fr_control_t);
154  if (!c) {
155  fr_strerror_const("Failed allocating memory");
156  return NULL;
157  }
158  c->el = el;
159  c->aq = aq;
160 
161  if (pipe((int *) &c->pipe) < 0) {
162  talloc_free(c);
163  fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno));
164  return NULL;
165  }
166  talloc_set_destructor(c, _control_free);
167 
168  /*
169  * We don't want reads from the pipe to be blocking.
170  */
171  (void) fcntl(c->pipe[0], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
172  (void) fcntl(c->pipe[1], F_SETFL, O_NONBLOCK | FD_CLOEXEC);
173 
174  if (fr_event_fd_insert(c, NULL, el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) {
175  talloc_free(c);
176  fr_strerror_const_push("Failed adding FD to event list control socket");
177  return NULL;
178  }
179 
180 #ifndef NDEBUG
181  (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
182 #endif
183 
184  return c;
185 }
186 
187 
188 /** Clean up messages in a control-plane buffer
189  *
190  * Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE,
191  * and mark them FR_CONTROL_MESSAGE_FREE.
192  *
193  * @param[in] c the fr_control_t
194  * @param[in] rb the callers ring buffer for message allocation.
195  * @return
196  * - <0 there are still messages used
197  * - 0 the control list is empty.
198  */
200 {
201  while (true) {
202  size_t room, message_size;
204 
205  (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room);
206  if (room == 0) break;
207 
208  fr_assert(m != NULL);
209  fr_assert(room >= sizeof(*m));
210 
212 
213  if (m->status != FR_CONTROL_MESSAGE_DONE) break;
214 
216 
217  /*
218  * Each message is aligned to a 64-byte boundary,
219  * for cache contention issues.
220  */
221  message_size = sizeof(*m);
222  message_size += m->data_size;
223  message_size += 63;
224  message_size &= ~(size_t) 63;
225  fr_ring_buffer_free(rb, message_size);
226  }
227 
228  /*
229  * Maybe we failed to garbage collect everything?
230  */
231  if (fr_ring_buffer_used(rb) > 0) {
232  fr_strerror_const("Data still in control buffers");
233  return -1;
234  }
235 
236  return 0;
237 }
238 
239 
240 /** Allocate a control message
241  *
242  * @param[in] c the control structure
243  * @param[in] rb the callers ring buffer for message allocation.
244  * @param[in] id the ident of this message.
245  * @param[in] data the data to write to the control plane
246  * @param[in] data_size the size of the data to write to the control plane.
247  * @return
248  * - NULL on error
249  * - fr_message_t on success
250  */
252 {
253  size_t message_size;
255  uint8_t *p;
256 
257  message_size = sizeof(*m);
258  message_size += data_size;
259  message_size += 63;
260  message_size &= ~(size_t) 63;
261 
262  m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
263  if (!m) {
264  (void) fr_control_gc(c, rb);
265  m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
266  if (!m) {
267  fr_strerror_const_push("Failed allocating from ring buffer");
268  return NULL;
269  }
270  }
271 
273  m->id = id;
274  m->data_size = data_size;
275 
276  p = (uint8_t *) m;
277  memcpy(p + sizeof(*m), data, data_size);
278 
279  return m;
280 
281 }
282 
283 
284 /** Push a control-plane message
285  *
286  * This function is called ONLY from the originating thread.
287  *
288  * @param[in] c the control structure
289  * @param[in] rb the callers ring buffer for message allocation.
290  * @param[in] id the ident of this message.
291  * @param[in] data the data to write to the control plane
292  * @param[in] data_size the size of the data to write to the control plane.
293  * @return
294  * - -2 on ring buffer full
295  * - <0 on error
296  * - 0 on success
297  */
298 int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
299 {
301 
302  (void) talloc_get_type_abort(c, fr_control_t);
303 
304  MPRINT("CONTROL push aq %p\n", c->aq);
305 
306  /*
307  * Get a message. If we can't get one, do garbage
308  * collection. Get another, and if that fails, we're
309  * done.
310  */
311  m = fr_control_message_alloc(c, rb, id, data, data_size);
312  if (!m) {
313  (void) fr_control_gc(c, rb);
314  m = fr_control_message_alloc(c, rb, id, data, data_size);
315  if (!m) {
316  fr_strerror_const("Failed allocating after GC");
317  return -2;
318  }
319  }
320 
321  if (!fr_atomic_queue_push(c->aq, m)) {
323  fr_strerror_const("Failed pushing message to atomic queue.");
324  return -1;
325  }
326 
327  return 0;
328 }
329 
330 /** Send a control-plane message
331  *
332  * This function is called ONLY from the originating thread.
333  *
334  * @param[in] c the control structure
335  * @param[in] rb the callers ring buffer for message allocation.
336  * @param[in] id the ident of this message.
337  * @param[in] data the data to write to the control plane
338  * @param[in] data_size the size of the data to write to the control plane.
339  * @return
340  * - <0 on error
341  * - 0 on success
342  */
343 int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
344 {
345  (void) talloc_get_type_abort(c, fr_control_t);
346 
347  if (c->same_thread) {
348  if (!c->type[id].callback) return -1;
349 
350  c->type[id].callback(c->type[id].ctx, data, data_size, fr_time());
351  return 0;
352  }
353 
354  if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1;
355 
356  while (write(c->pipe[1], ".", 1) == 0) {
357  /* nothing */
358  }
359 
360  return 0;
361 }
362 
363 
364 /** Pop control-plane message
365  *
366  * This function is called ONLY from the receiving thread.
367  *
368  * @param[in] aq the recipients atomic queue for control-plane messages
369  * @param[out] p_id the ident of this message.
370  * @param[in,out] data where the data is stored
371  * @param[in] data_size the size of the buffer where we store the data.
372  * @return
373  * - <0 the size of the data we need to read the next message
374  * - 0 this kevent is not for us.
375  * - >0 the amount of data we've read
376  */
378 {
379  uint8_t *p;
381 
382  MPRINT("CONTROL pop aq %p\n", aq);
383 
384  if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0;
385 
386  fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u",
388 
389  /*
390  * There isn't enough room to store the data, die.
391  */
392  if (data_size < m->data_size) {
393  fr_strerror_printf("Allocation size should be at least %zd", m->data_size);
394  return -(m->data_size);
395  }
396 
397  p = (uint8_t *) m;
398  data_size = m->data_size;
399  memcpy(data, p + sizeof(*m), data_size);
400 
402  *p_id = m->id;
403  return data_size;
404 }
405 
406 
407 /** Register a callback for an ID
408  *
409  * @param[in] c the control structure
410  * @param[in] id the ident of this message.
411  * @param[in] ctx the context for the callback
412  * @param[in] callback the callback function
413  * @return
414  * - <0 on error
415  * - 0 on success
416  */
418 {
419  (void) talloc_get_type_abort(c, fr_control_t);
420 
421  if (id >= FR_CONTROL_MAX_TYPES) {
422  fr_strerror_printf("Failed adding unknown ID %d", id);
423  return -1;
424  }
425 
426  /*
427  * Re-registering the same thing is OK.
428  */
429  if ((c->type[id].ctx == ctx) &&
430  (c->type[id].callback == callback)) {
431  return 0;
432  }
433 
434  if (c->type[id].callback != NULL) {
435  fr_strerror_const("Callback is already set");
436  return -1;
437  }
438 
439  c->type[id].id = id;
440  c->type[id].ctx = ctx;
441  c->type[id].callback = callback;
442 
443  return 0;
444 }
445 
446 /** Delete a callback for an ID
447  *
448  * @param[in] c the control structure
449  * @param[in] id the ident of this message.
450  * @return
451  * - <0 on error
452  * - 0 on success
453  */
455 {
456  (void) talloc_get_type_abort(c, fr_control_t);
457 
458  if (id >= FR_CONTROL_MAX_TYPES) {
459  fr_strerror_printf("Failed adding unknown ID %d", id);
460  return -1;
461  }
462 
463  if (c->type[id].callback == NULL) return 0;
464 
465  c->type[id].id = 0;
466  c->type[id].ctx = NULL;
467  c->type[id].callback = NULL;
468 
469  return 0;
470 }
471 
473 {
474  c->same_thread = true;
475  (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
476  close(c->pipe[0]);
477  close(c->pipe[1]);
478 
479  /*
480  * Nothing more to do now that everything is gone.
481  */
482  talloc_set_destructor(c, NULL);
483 
484  return 0;
485 }
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
Definition: atomic_queue.c:215
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Definition: atomic_queue.c:148
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:444
#define UNUSED
Definition: build.h:313
void(* fr_control_callback_t)(void *ctx, void const *data, size_t data_size, fr_time_t now)
Definition: control.h:45
static fr_ring_buffer_t * rb
Definition: control_test.c:51
static fr_atomic_queue_t * aq
Definition: control_test.c:47
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:208
#define fr_event_fd_insert(...)
Definition: event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition: event.h:62
#define MPRINT(...)
Definition: control.c:46
fr_atomic_queue_t * aq
destination AQ
Definition: control.c:82
int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb)
Clean up messages in a control-plane buffer.
Definition: control.c:199
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
Pop control-plane message.
Definition: control.c:377
#define FR_CONTROL_MAX_TYPES
Definition: control.c:38
static int _control_free(fr_control_t *c)
Free a control structure.
Definition: control.c:125
static fr_control_message_t * fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Allocate a control message.
Definition: control.c:251
bool same_thread
are the two ends in the same thread
Definition: control.c:86
fr_event_list_t * el
our event list
Definition: control.c:80
void * ctx
context for the callback
Definition: control.c:71
fr_control_message_status_t status
status of this message
Definition: control.c:63
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition: control.c:417
int pipe[2]
our pipes
Definition: control.c:84
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Push a control-plane message.
Definition: control.c:298
fr_control_callback_t callback
the function to call
Definition: control.c:72
uint32_t id
id of this callback
Definition: control.c:70
static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Definition: control.c:91
fr_control_ctx_t type[FR_CONTROL_MAX_TYPES]
callbacks
Definition: control.c:88
size_t data_size
size of the data we're sending
Definition: control.c:65
fr_control_message_status_t
Status of control messages.
Definition: control.c:52
@ FR_CONTROL_MESSAGE_DONE
the message is done (set only by receiver)
Definition: control.c:55
@ FR_CONTROL_MESSAGE_FREE
the message is free
Definition: control.c:53
@ FR_CONTROL_MESSAGE_USED
the message is used (set only by originator)
Definition: control.c:54
uint32_t id
ID of this message.
Definition: control.c:64
int fr_control_same_thread(fr_control_t *c)
Definition: control.c:472
int fr_control_callback_delete(fr_control_t *c, uint32_t id)
Delete a callback for an ID.
Definition: control.c:454
The header for the control message.
Definition: control.c:62
The control structure.
Definition: control.c:79
talloc_free(reap)
int fr_event_fd_unarmour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Unarmour an FD.
Definition: event.c:1365
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1253
int fr_event_fd_armour(fr_event_list_t *el, int fd, fr_event_filter_t filter, uintptr_t armour)
Armour an FD.
Definition: event.c:1335
Stores all information relating to an event list.
Definition: event.c:411
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
unsigned long int size_t
Definition: merged_model.c:25
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free,.
Definition: ring_buffer.c:304
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
Definition: ring_buffer.c:437
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
Definition: ring_buffer.c:464
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
Definition: ring_buffer.c:196
#define FD_CLOEXEC
fr_assert(0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
"server local" time.
Definition: time.h:69
close(uq->fd)
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1259