The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
message.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 986864d4ac3fcfcc45754b9048f12929e9f54319 $
19  *
20  * @brief Messages for inter-thread communication
21  * @file io/message.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: 986864d4ac3fcfcc45754b9048f12929e9f54319 $")
26 
27 #include <freeradius-devel/io/message.h>
28 #include <freeradius-devel/util/strerror.h>
29 
30 #include <string.h>
31 
32 /*
33  * Debugging, mainly for message_set_test
34  */
35 #if 0
36 #define MPRINT(...) fprintf(stderr, __VA_ARGS__)
37 #else
38 #define MPRINT(...)
39 #endif
40 
41 #define MSG_ARRAY_SIZE (16)
42 
43 #define CACHE_ALIGN(_x) do { _x += 63; _x &= ~(size_t) 63; } while (0)
44 
45 /** A Message set, composed of message headers and ring buffer data.
46  *
47  * A message set is intended to send short-lived messages. The
48  * message headers are fixed in size, and allocated from an array
49  * which is treated like a circular buffer. Message bodies (i.e. raw
50  * packets) are variable in size, and live in a separate ring buffer.
51  *
52  * The message set starts off with a small array of message headers,
53  * and a small ring buffer. If an array/buffer fills up, a new one
54  * is allocated at double the size of the previous one.
55  *
56  * The array / buffers are themselves kept in fixed-size arrays, of
57  * MSG_ARRAY_SIZE. The reason is that the memory for fr_message_set_t
58  * should be contiguous, and not in a linked list scattered in
59  * memory.
60  *
61  * The originator allocates a message, and sends it to a recipient.
62  * The recipient (usually in another thread) uses the message, and
63  * marks it as FR_MESSAGE_DONE. The originator then asynchronously
64  * cleans up the message.
65  *
66  * This asynchronous cleanup is done via self-clocking. If there is
67  * no need to clean up the messages, it isn't done. Only when we run
68  * out of space to store messages (or packets) is the cleanup done.
69  *
70  * This cleanup latency ensures that we don't have cache line
71  * bouncing, where the originator sends the message, and then while
72  * the recipient is reading it... thrashes the cache line with
73  * checks for "are you done? Are you done?"
74  *
75  * If there is more than one used entry in either array, we then try
76  * to coalesce the buffers on cleanup. If we discover that one array
77  * is empty, we discard it, and move the used array entries into its
78  * place.
79  *
80  * This process ensures that we don't have too many buffers in
81  * progress. It is better to have a few large buffers than many
82  * small ones.
83  *
84  * MSG_ARRAY_SIZE is defined to be large (16 doublings) to allow for
85  * the edge case where messages are stuck for long periods of time.
86  *
87  * With an initial message array size of 64, this gives us room for
88  * 2M packets, if *all* of the mr_array entries have packets stuck in
89  * them that aren't cleaned up for extended periods of time.
90  *
91  * @todo Add a flag for UDP-style protocols, where we can put the
92  * message into the ring buffer. This helps with locality of
93  * reference, and removes the need to track two separate things.
94  */
96  int mr_current; //!< current used message ring entry
97  int mr_max; //!< max used message ring entry
98 
99  size_t message_size; //!< size of the callers message, including fr_message_t
100 
101  int mr_cleaned; //!< where we last cleaned
102 
103  int rb_current; //!< current used ring buffer entry
104  int rb_max; //!< max used ring buffer entry
105 
106  size_t max_allocation; //!< maximum allocation size
107 
109  int freed;
110 
111  fr_ring_buffer_t *mr_array[MSG_ARRAY_SIZE]; //!< array of message arrays
112 
113  fr_ring_buffer_t *rb_array[MSG_ARRAY_SIZE]; //!< array of ring buffers
114 };
115 
116 
117 /** Create a message set, with one initial message ring and one initial data ring buffer
118  *
119  * @param[in] ctx the context for talloc
120  * @param[in] num_messages size of the initial message array. MUST be a power of 2. Values below 8 are raised to 8.
121  * @param[in] message_size the size of each message, INCLUDING fr_message_t, which MUST be at the start of the struct. At most 1024; rounded up to a multiple of 64.
122  * @param[in] ring_buffer_size size of the initial ring buffer. MUST be a power of 2. Half of it becomes the largest reservation allowed.
123  * @return
124  * - NULL on error
125  * - newly allocated fr_message_set_t on success
126  */
127 fr_message_set_t *fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
128 {
129  fr_message_set_t *ms;
130 
131  /*
132  * Raise too-small requests to the minimum of 8, then reject any count which is not a power of 2.
133  */
134  if (num_messages < 8) num_messages = 8;
135 
136  if ((num_messages & (num_messages - 1)) != 0) {
137  fr_strerror_const("Number of messages must be a power of 2");
138  return NULL;
139  }
140 
141  if (message_size < sizeof(fr_message_t)) {
142  fr_strerror_printf("Message size must be at least %zu", sizeof(fr_message_t)); /* sizeof() is a size_t, so %zu and not %zd */
143  return NULL;
144  }
145 
146  if (message_size > 1024) {
147  fr_strerror_const("Message size must be no larger than 1024");
148  return NULL;
149  }
150 
151  ms = talloc_zero(ctx, fr_message_set_t);
152  if (!ms) {
153  fr_strerror_const("Failed allocating memory");
154  return NULL;
155  }
156 
157  CACHE_ALIGN(message_size); /* round up to the next multiple of 64 bytes */
158  ms->message_size = message_size;
159 
160  ms->rb_array[0] = fr_ring_buffer_create(ms, ring_buffer_size); /* first ring buffer for packet data */
161  if (!ms->rb_array[0]) {
162  talloc_free(ms);
163  return NULL;
164  }
165  ms->rb_max = 0;
166 
167  ms->mr_array[0] = fr_ring_buffer_create(ms, num_messages * message_size); /* first message ring, sized for num_messages aligned headers */
168  if (!ms->mr_array[0]) {
169  talloc_free(ms);
170  return NULL;
171  }
172 
173  ms->max_allocation = ring_buffer_size / 2; /* fr_message_reserve() refuses anything larger than this */
174 
175  return ms;
176 }
177 
178 
179 /** Mark a message as done
180  *
181  * Note that this call is usually done from a thread OTHER than the
182  * originator of the message. As such, the message is NOT actually
183  * freed. Instead, it is just marked as freed.
184  *
185  * @param[in] m the message to mark as done.
186  * @return
187  * - <0 on error
188  * - 0 on success
189  */
191 {
194 
195  /*
196  * Mark a message as freed. The originator will take
197  * care of cleaning it up.
198  */
199  if (m->status == FR_MESSAGE_USED) {
200  m->status = FR_MESSAGE_DONE;
201  return 0;
202  }
203 
204  /*
205  * This message was localized, so we can free it via
206  * talloc.
207  */
208  if (m->status == FR_MESSAGE_LOCALIZED) {
209  talloc_free(m);
210  return 0;
211  }
212 
213  /*
214  * A catastrophic error.
215  */
216  fr_assert(0 == 1);
217 
218  fr_strerror_const("Failed marking message as done");
219  return -1;
220 }
221 
222 
223 /** Localize a message by copying it to local storage
224  *
225  * This function "localizes" a message by copying it to local
226  * storage. In the case where the recipient of a message has to sit
227  * on it for a while, that blocks the originator from cleaning up the
228  * message. The recipient can then copy the message to local
229  * storage, so that the originator can clean it up.
230  *
231  * The localized message is marked as FR_MESSAGE_LOCALIZED, so that
232  * the recipient can call the normal fr_message_done() function to
233  * free it.
234  *
235  * @param[in] ctx the talloc context to use for localization
236  * @param[in] m the message to be localized
237  * @param[in] message_size the size of the message, including the fr_message_t
238  * @return
239  * - NULL on allocation error
240  * - a newly localized message
241  */
242 fr_message_t *fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
243 {
244  fr_message_t *l;
245 
246  if (m->status != FR_MESSAGE_USED) {
247  fr_strerror_const("Cannot localize message unless it is in use");
248  return NULL;
249  }
250 
251  if (message_size <= sizeof(fr_message_t)) {
252  fr_strerror_const("Message size is too small");
253  return NULL;
254  }
255 
256  l = talloc_memdup(ctx, m, message_size);
257  if (!l) {
258  nomem:
259  fr_strerror_const("Failed allocating memory");
260  return NULL;
261  }
262 
263  l->data = NULL;
264 
265  if (l->data_size) {
266  l->data = talloc_memdup(l, m->data, l->data_size);
267  if (!l->data) {
268  talloc_free(l);
269  goto nomem;
270  }
271  }
272 
274 
275  /*
276  * After this change, "m" should not be used for
277  * anything.
278  */
279  m->status = FR_MESSAGE_DONE;
280 
281  /*
282  * Now clean up the other fields of the newly localized
283  * message.
284  */
285  l->rb = NULL;
286  l->rb_size = 0;
287 
288  return l;
289 }
290 
291 
292 /** Clean up messages in a message ring.
293  *
294  * Find the oldest messages which are marked FR_MESSAGE_DONE,
295  * and mark them FR_MESSAGE_FREE.
296  *
297  * FIXME: If we care, track which ring buffer is in use, and how
298  * many contiguous chunks we can free. Then, free the chunks at
299  * once, instead of piecemeal. Realistically tho... this will
300  * probably make little difference.
301  *
302  * @param[in] ms the message set
303  * @param[in] mr the message ring
304  * @param[in] max_to_clean maximum number of messages to clean at a time.
305  */
306 static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
307 {
308  int messages_cleaned = 0;
309  size_t size;
310  fr_message_t *m;
311 
312  while (true) {
313  (void) fr_ring_buffer_start(mr, (uint8_t **) &m, &size);
314  if (size == 0) break;
315 
316  fr_assert(m != NULL);
317  fr_assert(size >= ms->message_size);
318 
320  if (m->status != FR_MESSAGE_DONE) break;
321 
322  messages_cleaned++;
323  m->status = FR_MESSAGE_FREE;
324  ms->freed++;
325 
326  if (m->rb) {
327  (void) fr_ring_buffer_free(m->rb, m->rb_size);
328 #ifndef NDEBUG
329  memset(m, 0, ms->message_size);
330 #endif
331  }
332 
334 
335  if (messages_cleaned >= max_to_clean) break;
336  }
337 
338  MPRINT("CLEANED %d (%p) left\n", messages_cleaned, mr);
339  return messages_cleaned;
340 }
341 
342 
343 /** Garbage collect "done" messages.
344  *
345  * Called only from the originating thread. We also clean a limited
346  * number of messages at a time, so that we don't have sudden latency
347  * spikes when cleaning 1M messages.
348  *
349  * @param[in] ms the message set
350  * @param[in] max_to_clean the maximum number of messages to clean
351  */
352 static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
353 {
354  int i;
355  int arrays_freed, arrays_used, empty_slot;
356  int largest_free_slot;
357  int total_cleaned;
358  size_t largest_free_size;
359 
360  /*
361  * Clean up "done" messages.
362  */
363  total_cleaned = 0;
364 
365  /*
366  * Garbage collect the smaller buffers first.
367  */
368  for (i = 0; i <= ms->mr_max; i++) {
369  int cleaned;
370 
371  cleaned = fr_message_ring_gc(ms, ms->mr_array[i], max_to_clean - total_cleaned);
372  total_cleaned += cleaned;
373  fr_assert(total_cleaned <= max_to_clean);
374 
375  /*
376  * Stop when we've reached our GC limit.
377  */
378  if (total_cleaned == max_to_clean) break;
379  }
380 
381  /*
382  * Couldn't GC anything. Don't do more work.
383  */
384  if (total_cleaned == 0) return;
385 
386  arrays_freed = 0;
387  arrays_used = 0;
388 
389  /*
390  * Keep the two largest message buffers (used or not),
391  * and free all smaller ones which are empty.
392  */
393  for (i = ms->mr_max; i >= 0; i--) {
394  fr_assert(ms->mr_array[i] != NULL);
395 
396  if (arrays_used < 2) {
397  MPRINT("\tleaving entry %d alone\n", i);
398  arrays_used++;
399  continue;
400  }
401 
402  /*
403  * If the message ring buffer is empty, check if
404  * we should perhaps delete it.
405  */
406  if (fr_ring_buffer_used(ms->mr_array[i]) == 0) {
407  MPRINT("\tfreeing entry %d\n", i);
408  TALLOC_FREE(ms->mr_array[i]);
409  arrays_freed++;
410  continue;
411  }
412 
413  MPRINT("\tstill in use entry %d\n", i);
414  }
415 
416  /*
417  * Some entries have been freed. We need to coalesce the
418  * remaining entries.
419  */
420  if (arrays_freed) {
421  MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
422 
423  empty_slot = -1;
424 
425  /*
426  * Pack the rb array by moving used entries to
427  * the bottom of the array.
428  */
429  for (i = 0; i <= ms->mr_max; i++) {
430  int j;
431 
432  /*
433  * Skip over empty entries, but set
434  * "empty_slot" to the first empty on we
435  * found.
436  */
437  if (!ms->mr_array[i]) {
438  if (empty_slot < 0) empty_slot = i;
439 
440  continue;
441  }
442 
443  /*
444  * This array entry is used, but there is
445  * no empty slot to put it into. Ignore
446  * it, and continue
447  */
448  if (empty_slot < 0) continue;
449 
450  fr_assert(ms->mr_array[empty_slot] == NULL);
451 
452  ms->mr_array[empty_slot] = ms->mr_array[i];
453  ms->mr_array[i] = NULL;
454 
455  /*
456  * Find the next empty slot which is
457  * greater than the one we just used.
458  */
459  for (j = empty_slot + 1; j <= i; j++) {
460  if (!ms->mr_array[j]) {
461  empty_slot = j;
462  break;
463  }
464  }
465  }
466 
467  /*
468  * Lower max, and set current to the largest
469  * array, whether or not it's used.
470  */
471  ms->mr_max -= arrays_freed;
472  ms->mr_current = ms->mr_max;
473 
474 #ifndef NDEBUG
475  MPRINT("NUM RB ARRAYS NOW %d\n", ms->mr_max + 1);
476  for (i = 0; i <= ms->mr_max; i++) {
477  MPRINT("\t%d %p\n", i, ms->mr_array[i]);
478  fr_assert(ms->mr_array[i] != NULL);
479  }
480 #endif
481  }
482 
483  /*
484  * And now we do the same thing for the ring buffers.
485  * Except that freeing the messages above also cleaned up
486  * the contents of each ring buffer, so all we need to do
487  * is find the largest empty ring buffer.
488  *
489  * We do this by keeping the two largest ring buffers
490  * (used or not), and then freeing all smaller ones which
491  * are empty.
492  */
493  arrays_used = 0;
494  arrays_freed = 0;
495  MPRINT("TRYING TO FREE ARRAYS %d\n", ms->rb_max);
496  for (i = ms->rb_max; i >= 0; i--) {
497  fr_assert(ms->rb_array[i] != NULL);
498 
499  if (arrays_used < 2) {
500  MPRINT("\tleaving entry %d alone\n", i);
501  arrays_used++;
502  continue;
503  }
504 
505  if (fr_ring_buffer_used(ms->rb_array[i]) == 0) {
506  MPRINT("\tfreeing entry %d\n", i);
507  TALLOC_FREE(ms->rb_array[i]);
508  arrays_freed++;
509  continue;
510  }
511 
512  MPRINT("\tstill in use entry %d\n", i);
513  }
514 
515  /*
516  * Pack the array entries back down.
517  */
518  if (arrays_freed > 0) {
519  MPRINT("TRYING TO PACK from %d free arrays out of %d\n", arrays_freed, ms->rb_max + 1);
520 
521  empty_slot = -1;
522 
523  /*
524  * Pack the rb array by moving used entries to
525  * the bottom of the array.
526  */
527  for (i = 0; i <= ms->rb_max; i++) {
528  int j;
529 
530  /*
531  * Skip over empty entries, but set
532  * "empty_slot" to the first empty on we
533  * found.
534  */
535  if (!ms->rb_array[i]) {
536  if (empty_slot < 0) empty_slot = i;
537 
538  continue;
539  }
540 
541  /*
542  * This array entry is used, but there is
543  * no empty slot to put it into. Ignore
544  * it, and continue
545  */
546  if (empty_slot < 0) continue;
547 
548  fr_assert(ms->rb_array[empty_slot] == NULL);
549 
550  ms->rb_array[empty_slot] = ms->rb_array[i];
551  ms->rb_array[i] = NULL;
552 
553  /*
554  * Find the next empty slot which is
555  * greater than the one we just used.
556  */
557  for (j = empty_slot + 1; j <= i; j++) {
558  if (!ms->rb_array[j]) {
559  empty_slot = j;
560  break;
561  }
562  }
563  }
564 
565  /*
566  * Lower max, and set current to the largest
567  * array, whether or not it's used.
568  */
569  ms->rb_max -= arrays_freed;
570  ms->rb_current = ms->rb_max;
571 
572 #ifndef NDEBUG
573  MPRINT("NUM RB ARRAYS NOW %d\n", ms->rb_max + 1);
574  for (i = 0; i <= ms->rb_max; i++) {
575  MPRINT("\t%d %p\n", i, ms->rb_array[i]);
576  fr_assert(ms->rb_array[i] != NULL);
577  }
578 #endif
579  }
580 
581  /*
582  * Set the current ring buffer to the one with the
583  * largest free space in it.
584  *
585  * This is different from the allocation strategy for
586  * messages.
587  */
588  if (!fr_cond_assert(ms->rb_array[ms->rb_max] != NULL)) return;
589 
590  largest_free_slot = ms->rb_max;
591  largest_free_size = (fr_ring_buffer_size(ms->rb_array[ms->rb_max]) -
593 
594  for (i = 0; i < ms->rb_max; i++) {
595  size_t free_size;
596 
597  fr_assert(ms->rb_array[i] != NULL);
598 
599  free_size = (fr_ring_buffer_size(ms->rb_array[i]) -
600  fr_ring_buffer_used(ms->rb_array[i]));
601  if (largest_free_size < free_size) {
602  largest_free_slot = i;
603  largest_free_size = free_size;
604  }
605  }
606 
607  ms->rb_current = largest_free_slot;
608  fr_assert(ms->rb_current >= 0);
609  fr_assert(ms->rb_current <= ms->rb_max);
610 }
611 
612 /** Allocate a message from a message ring.
613  *
614  * The newly allocated message is zeroed.
615  *
616  * @param[in] ms the message set
617  * @param[in] mr the message ring to allocate from
618  * @param[in] clean whether to clean the message ring
619  * @return
620  * - NULL on failed allocation
621  * - fr_message_t* on successful allocation.
622  */
624 {
625  fr_message_t *m;
626 
627  /*
628  * We're at the start of a buffer with data, and there's
629  * no room. Do a quick check to see if we can free up
630  * the oldest entry. If not, return.
631  *
632  * Otherwise, fall through to allocating a entry, of
633  * which there must now be at least one free one.
634  *
635  * This check results in a small amount of cache line
636  * thrashing. But if the buffer is full, it's likely
637  * that the oldest entry can be freed. If not, we have a
638  * small amount of cache thrashing, which should be
639  * extremely rare.
640  */
641  if (clean) {
642  if (fr_message_ring_gc(ms, mr, 4) == 0) {
643  fr_strerror_const("No free memory after GC attempt");
644  return NULL;
645  }
646 
647  /*
648  * Else we cleaned up some entries in this array.
649  * Go allocate a message.
650  */
651  }
652 
653  /*
654  * Grab a new message from the underlying ring buffer.
655  */
657  if (!m) return NULL;
658 
659 #ifndef NDEBUG
660  memset(m, 0, ms->message_size);
661 #endif
662  m->status = FR_MESSAGE_USED;
663  return m;
664 }
665 
666 /** Allocate a fr_message_t, WITHOUT a ring buffer.
667  *
668  * @param[in] ms the message set
669  * @param[out] p_cleaned a flag to indicate if we cleaned the message array
670  * @return
671  * - NULL on error
672  * - fr_message_t* on success
673  */
675 {
676  int i;
677  fr_message_t *m;
678  fr_ring_buffer_t *mr;
679 
680  ms->allocated++;
681  *p_cleaned = false;
682 
683  /*
684  * Grab the current message array. In the general case,
685  * there's room, so we grab a message and go find a ring
686  * buffer.
687  */
688  mr = ms->mr_array[ms->mr_current];
690  if (m) {
691  memset(m, 0, ms->message_size);
692  m->status = FR_MESSAGE_USED;
693  MPRINT("ALLOC normal\n");
694  return m;
695  }
696 
697  MPRINT("CLEANING UP (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
698  ms->allocated - ms->freed);
699 
700  /*
701  * Else the buffer is full. Do a global cleanup.
702  */
703  fr_message_gc(ms, 128);
704  *p_cleaned = true;
705 
706  /*
707  * If we're lucky, the cleanup has given us a new
708  * "current" buffer, which is empty. If so, use it.
709  */
710  mr = ms->mr_array[ms->mr_current];
711  m = fr_message_ring_alloc(ms, mr, true);
712  if (m) {
713  MPRINT("ALLOC after cleanup\n");
714  return m;
715  }
716 
717  /*
718  * We've tried two allocations, and both failed. Brute
719  * force over all arrays, trying to allocate one
720  * somewhere... anywhere. We start from the largest
721  * array, because that is the one we want to use the
722  * most.
723  *
724  * We want to avoid allocations in the smallest array,
725  * because that array will quickly wrap, and will cause
726  * us to do cleanups more often. That also lets old
727  * entries in the smallest array age out, so that we can
728  * free the smallest arrays.
729  */
730  for (i = ms->mr_max; i >= 0; i--) {
731  mr = ms->mr_array[i];
732 
733  m = fr_message_ring_alloc(ms, mr, true);
734  if (m) {
735  ms->mr_current = i;
736  MPRINT("ALLOC from changed ring buffer\n");
737  MPRINT("SET MR to changed %d\n", ms->mr_current);
738  return m;
739  }
740  }
741 
742  /*
743  * All of the arrays are full. If we don't have
744  * room to allocate another array, we're dead.
745  */
746  if ((ms->mr_max + 1) >= MSG_ARRAY_SIZE) {
747  fr_strerror_const("All message arrays are full");
748  return NULL;
749  }
750 
751  /*
752  * Allocate another message ring, double the size
753  * of the previous maximum.
754  */
756  if (!mr) {
757  fr_strerror_const_push("Failed allocating ring buffer");
758  return NULL;
759  }
760 
761  /*
762  * Set the new one as current for all new
763  * allocations, allocate a message, and go try to
764  * reserve room for the raw packet data.
765  */
766  ms->mr_max++;
767  ms->mr_current = ms->mr_max;
768  ms->mr_array[ms->mr_max] = mr;
769 
770  MPRINT("SET MR to doubled %d\n", ms->mr_current);
771 
772  /*
773  * And we should now have an entirely empty message ring.
774  */
775  m = fr_message_ring_alloc(ms, mr, false);
776  if (!m) return NULL;
777 
778  MPRINT("ALLOC after doubled message ring\n");
779 
780  return m;
781 }
782 
783 
784 /** Get a ring buffer for a message
785  *
786  * @param[in] ms the message set
787  * @param[in] m the message
788  * @param[in] cleaned_up whether the message set was partially garbage collected
789  * @return
790  * - NULL on error, and m is deallocated
791  * - m on success
792  */
794  bool cleaned_up)
795 {
796  int i;
798 
799  /*
800  * And... we go through a bunch of hoops, all over again.
801  */
802  m->rb = ms->rb_array[ms->rb_current];
803  fr_assert(m->rb != NULL);
804  m->data = fr_ring_buffer_reserve(m->rb, m->rb_size);
805  if (m->data) return m;
806 
807  /*
808  * When the simple allocation fails, ensure we don't do
809  * the cleanup twice in one allocation.
810  */
811  if (!cleaned_up) {
812  /*
813  * If we run out of room in the current ring
814  * buffer, AND it's our only one, then just
815  * double it in size.
816  */
817  if (ms->rb_max == 0) goto alloc_rb;
818 
819  /*
820  * We're using multiple ring buffers, and we
821  * haven't already done a cleanup. Force a
822  * cleanup.
823  */
824  MPRINT("CLEANED UP BECAUSE OF RING BUFFER (%zd - %zd = %zd)\n", ms->allocated, ms->freed,
825  ms->allocated - ms->freed);
826 
827  fr_message_gc(ms, 128);
828 
829  /*
830  * Try to allocate the packet from the newly current ring buffer.
831  */
832  m->rb = ms->rb_array[ms->rb_current];
833  fr_assert(m->rb != NULL);
834  m->data = fr_ring_buffer_reserve(m->rb, m->rb_size);
835  if (m->data) return m;
836 
837  MPRINT("CLEANUP RING BUFFER FAILED\n");
838  }
839 
840  /*
841  * We've tried two allocations, and both failed. Brute
842  * force over all arrays, trying to allocate one
843  * somewhere... anywhere. We start from the largest
844  * array, because that is the one we want to use the
845  * most.
846  *
847  * We want to avoid allocations in the smallest array,
848  * because that array will quickly wrap, and will cause
849  * us to do cleanups more often. That also lets old
850  * entries in the smallest array age out, so that we can
851  * free the smallest arrays.
852  */
853  for (i = ms->rb_max; i >= 0; i--) {
854  m->rb = ms->rb_array[i];
855  fr_assert(m->rb != NULL);
856  m->data = fr_ring_buffer_reserve(m->rb, m->rb_size);
857  if (m->data) {
858  MPRINT("MOVED TO RING BUFFER %d\n", i);
859  ms->rb_current = i;
860  return m;
861  }
862  }
863 
864  /*
865  * All of the arrays are full. If we don't have
866  * room to allocate another array, we're dead.
867  */
868  if ((ms->rb_max + 1) >= MSG_ARRAY_SIZE) {
869  fr_strerror_const("Message arrays are full");
870  goto cleanup;
871  }
872 
873 alloc_rb:
874  /*
875  * Allocate another message ring, double the size
876  * of the previous maximum.
877  */
879  if (!rb) {
880  fr_strerror_const_push("Failed allocating ring buffer");
881  goto cleanup;
882  }
883 
884  MPRINT("RING BUFFER DOUBLES\n");
885 
886  /*
887  * Set the new one as current for all new
888  * allocations, allocate a message, and go try to
889  * reserve room for the raw packet data.
890  */
891  ms->rb_max++;
892  ms->rb_current = ms->rb_max;
893  ms->rb_array[ms->rb_current] = rb;
894 
895  /*
896  * And we should now have an entirely empty message ring.
897  */
898  m->rb = rb;
899  m->data = fr_ring_buffer_reserve(m->rb, m->rb_size);
900  if (m->data) return m;
901 
902 cleanup:
903  MPRINT("OUT OF MEMORY\n");
904 
905  m->rb = NULL;
906  m->status = FR_MESSAGE_DONE;
907  return NULL;
908 }
909 
910 
911 /** Reserve a message
912  *
913  * A later call to fr_message_alloc() will allocate the correct
914  * packet ring buffer size. This call just allocates a message
915  * header, and reserves space for the packet.
916  *
917  * If the caller later decides that the message is not needed, he
918  * should call fr_message_free() to free the message.
919  *
920  * We assume that the caller will call fr_message_reserve(), and then
921  * almost immediately fr_message_alloc(). Multiple calls in series
922  * to fr_message_reserve() MUST NOT be done. The caller could also
923  * just call fr_ring_buffer_alloc(m->rb, size) if they wanted, and
924  * then update m->data_size by hand...
925  *
926  * The message is returned
927  *
928  * @param[in] ms the message set
929  * @param[in] reserve_size to reserve
930  * @return
931  * - NULL on error
932  * - fr_message_t* on success
933  */
935 {
936  bool cleaned_up;
937  fr_message_t *m;
938 
939  (void) talloc_get_type_abort(ms, fr_message_set_t);
940 
941  if (reserve_size > ms->max_allocation) {
942  fr_strerror_printf("Cannot reserve %zd > max allocation %zd\n", reserve_size, ms->max_allocation);
943  return NULL;
944  }
945 
946  /*
947  * Allocate a bare message.
948  */
949  m = fr_message_get_message(ms, &cleaned_up);
950  if (!m) {
951  MPRINT("Failed to reserve message\n");
952  return NULL;
953  }
954 
955  /*
956  * If the caller is not allocating any packet data, just
957  * return the empty message.
958  */
959  if (!reserve_size) return m;
960 
961  /*
962  * We leave m->data_size as zero, and m->rb_size as the
963  * reserved size. This indicates that the message has
964  * reserved room for the packet data, but nothing has
965  * been allocated.
966  */
968  m->rb_size = reserve_size;
969 
970  return fr_message_get_ring_buffer(ms, m, cleaned_up);
971 }
972 
973 /** Allocate packet data for a message
974  *
975  * The caller will normally call fr_message_reserve() before calling
976  * this function, and pass the resulting message 'm' here. If 'm' is
977  * NULL, however, this function will call fr_message_reserve() of
978  * 'actual_packet_size'. This capability is there for callers who
979  * know the size of the message in advance.
980  *
981  * @param[in] ms the message set
982  * @param[in] m the message to allocate packet data for
983  * @param[in] actual_packet_size to use
984  * @return
985  * - NULL on error, and input message m is left alone
986  * - fr_message_t* on success. Will always be input message m.
987  */
988 fr_message_t *fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
989 {
990  uint8_t *p;
991  size_t reserve_size;
992 
993  (void) talloc_get_type_abort(ms, fr_message_set_t);
994 
995  /* m is NOT talloc'd */
996 
997  if (!m) {
998  m = fr_message_reserve(ms, actual_packet_size); /* will cache align it */
999  if (!m) return NULL;
1000  }
1001 
1003  fr_assert(m->rb != NULL);
1004  fr_assert(m->data != NULL);
1005  fr_assert(m->data_size == 0);
1006  fr_assert(m->rb_size >= actual_packet_size);
1007 
1008  /*
1009  * No data to send? Just send a bare message;
1010  */
1011  if (actual_packet_size == 0) {
1012  m->data = NULL;
1013  m->rb = NULL;
1014  m->data_size = m->rb_size = 0;
1015  return m;
1016  }
1017 
1018  reserve_size = actual_packet_size;
1020 
1022  fr_assert(p != NULL);
1023  if (!p) {
1024  fr_strerror_const_push("Failed allocating from ring buffer");
1025  return NULL;
1026  }
1027 
1028  fr_assert(p == m->data);
1029 
1030  m->data_size = actual_packet_size;
1031  m->rb_size = reserve_size;
1032 
1033  return m;
1034 }
1035 
1036 /** Allocate packet data for a message, and reserve a new message
1037  *
1038  * This function allocates a previously reserved message, and then
1039  * reserves a new message.
1040  *
1041  * The application should call fr_message_reserve() with a large
1042  * buffer, and then read data into the buffer. If the buffer
1043  * contains multiple packets, the application should call
1044  * fr_message_alloc_reserve() repeatedly to allocate the full
1045  * packets, while reserving room for the partial packet.
1046  *
1047  * When the application determines that there is only one full
1048  * packet, and one partial packet in the buffer, it should call this
1049  * function with actual_packet_size, and a large reserve_size. The
1050  * partial packet will be reserved. If the ring buffer is full, the
1051  * partial packet will be copied to a new ring buffer.
1052  *
1053  * When the application determines that there are multiple full
1054  * packets in the buffer, it should call this function with
1055  * actual_packet_size for each buffer, and reserve_size which
1056  * reserves all of the data in the buffer. i.e. the full packets +
1057  * partial packets, which should start off as the original
1058  * reserve_size.
1059  *
1060  * The application should call this function to allocate each packet,
1061  * while decreasing reserve_size by each actual_packet_size that was
1062  * allocated. Once there is only one full and a partial packet in
1063  * the buffer, it should use a large reserve_size, as above.
1064  *
1065  * The application could just always call this function with a large
1066  * reserve_size, at the cost of substantially more memcpy()s.
1067  *
1068  * @param[in] ms the message set
1069  * @param[in] m the message to allocate packet data for
1070  * @param[in] actual_packet_size to use
1071  * @param[in] leftover "dirty" bytes in the buffer
1072  * @param[in] reserve_size to reserve for new message
1073  * @return
1074  * - NULL on error, and input message m is left alone
1075  * - fr_message_t* on success. Will always be a new message.
1076  */
1078  size_t leftover, size_t reserve_size)
1079 {
1080  bool cleaned_up;
1081  uint8_t *p;
1082  fr_message_t *m2;
1083  size_t m_rb_size, align_size;
1084 
1085  (void) talloc_get_type_abort(ms, fr_message_set_t);
1086 
1087  align_size = actual_packet_size;
1088  CACHE_ALIGN(align_size);
1089 
1090  /* m is NOT talloc'd */
1091 
1093  fr_assert(m->rb != NULL);
1094  fr_assert(m->data != NULL);
1095  fr_assert(m->rb_size >= actual_packet_size);
1096 
1097  p = fr_ring_buffer_alloc(m->rb, align_size);
1098  fr_assert(p != NULL);
1099  if (!p) {
1100  fr_strerror_const_push("Failed allocating from ring buffer");
1101  return NULL;
1102  }
1103 
1104  fr_assert(p == m->data);
1105 
1106  m_rb_size = m->rb_size; /* for ring buffer cleanups */
1107 
1108  m->data_size = actual_packet_size;
1109  m->rb_size = align_size;
1110 
1111  /*
1112  * If we've allocated all of the reserved ring buffer
1113  * data, then just make a brand new reservation.
1114  *
1115  * This will be automatically cache aligned.
1116  */
1117  if (!leftover) return fr_message_reserve(ms, reserve_size);
1118 
1119  /*
1120  * Allocate a new message.
1121  */
1122  m2 = fr_message_get_message(ms, &cleaned_up);
1123  if (!m2) return NULL;
1124 
1125  /*
1126  * Ensure that there's enough room to shift the next
1127  * packet, so that it's cache aligned. Moving small
1128  * amounts of memory is likely faster than having two
1129  * CPUs fight over the same cache lines.
1130  */
1131  reserve_size += (align_size - actual_packet_size);
1133 
1134  /*
1135  * Track how much data there is in the packet.
1136  */
1137  m2->rb = m->rb;
1138  m2->data_size = leftover;
1139  m2->rb_size = reserve_size;
1140 
1141  /*
1142  * Try to extend the reservation. If we can do it,
1143  * return.
1144  */
1146  if (m2->data) {
1147  /*
1148  * The next packet pointer doesn't point to the
1149  * actual data after the current packet. Move
1150  * the next packet to match up with the ring
1151  * buffer allocation.
1152  */
1153  if (m2->data != (m->data + actual_packet_size)) {
1154  memmove(m2->data, m->data + actual_packet_size, leftover);
1155  }
1156  return m2;
1157  }
1158 
1159  /*
1160  * We failed reserving more memory at the end of the
1161  * current ring buffer.
1162  *
1163  * Reserve data from a new ring buffer. If it doesn't
1164  * succeed, ensure that the old message will properly
1165  * clean up the old ring buffer.
1166  */
1167  if (!fr_message_get_ring_buffer(ms, m2, false)) {
1168  m->rb_size = m_rb_size;
1169  return NULL;
1170  }
1171 
1172  /*
1173  * If necessary, copy the remaining data from the old
1174  * buffer to the new one.
1175  */
1176  if (m2->data != (m->data + actual_packet_size)) {
1177  memmove(m2->data, m->data + actual_packet_size, leftover);
1178  }
1179 
1180  /*
1181  * The messages are in different ring buffers. We've
1182  * aligned m->rb_size above for the current packet, but
1183  * there's no subsequent message to clean up this
1184  * reservation. Re-extend the current message to its
1185  * original size, so that cleaning it up will clean up the ring buffer.
1186  */
1187  if (m2->rb != m->rb) {
1188  m->rb_size = m_rb_size;
1189  return m2;
1190  }
1191 
1192  /*
1193  * If we've managed to allocate the next message in the
1194  * current ring buffer, then it really should have
1195  * wrapped around. In which case, re-extend the current
1196  * message as above.
1197  */
1198  if (m2->data < m->data) {
1199  m->rb_size = m_rb_size;
1200  return m2;
1201  }
1202 
1203  return m2;
1204 }
1205 
1206 /** Count the number of used messages
1207  *
1208  * @param[in] ms the message set
1209  * @return
1210  * - number of used messages
1211  */
1213 {
1214  int i, used;
1215 
1216  (void) talloc_get_type_abort(ms, fr_message_set_t);
1217 
1218  used = 0;
1219  for (i = 0; i <= ms->mr_max; i++) {
1220  fr_ring_buffer_t *mr;
1221 
1222  mr = ms->mr_array[i];
1223 
1224  used += fr_ring_buffer_used(mr) / ms->message_size;
1225  }
1226 
1227  return used;
1228 }
1229 
1230 /** Garbage collect the message set.
1231  *
1232  * This function should ONLY be called just before freeing the
1233  * message set. It is intended only for debugging, and will cause
1234  * huge latency spikes if used at run time.
1235  *
1236  * @param[in] ms the message set
1237  */
1239 {
1240  int i;
1241 
1242  (void) talloc_get_type_abort(ms, fr_message_set_t);
1243 
1244  /*
1245  * Manually clean up each message ring.
1246  */
1247  for (i = 0; i <= ms->mr_max; i++) {
1248  (void) fr_message_ring_gc(ms, ms->mr_array[i], ~0);
1249  }
1250 
1251  /*
1252  * And then do one last pass to clean up the arrays.
1253  */
1254  fr_message_gc(ms, 1 << 24);
1255 }
1256 
1257 /** Print debug information about the message set.
1258  *
1259  * @param[in] ms the message set
1260  * @param[in] fp the FILE where the messages are printed.
1261  */
1263 {
1264  int i;
1265 
1266  (void) talloc_get_type_abort(ms, fr_message_set_t);
1267 
1268  fprintf(fp, "message arrays = %d\t(current %d)\n", ms->mr_max + 1, ms->mr_current);
1269  fprintf(fp, "ring buffers = %d\t(current %d)\n", ms->rb_max + 1, ms->rb_current);
1270 
1271  for (i = 0; i <= ms->mr_max; i++) {
1272  fr_ring_buffer_t *mr = ms->mr_array[i];
1273 
1274  fprintf(fp, "messages[%d] =\tsize %zu, used %zu\n",
1276  }
1277 
1278  for (i = 0; i <= ms->rb_max; i++) {
1279  fprintf(fp, "ring buffer[%d] =\tsize %zu, used %zu\n",
1281  }
1282 }
#define RCSID(id)
Definition: build.h:444
static fr_ring_buffer_t * rb
Definition: control_test.c:51
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:137
talloc_free(reap)
unsigned char uint8_t
Definition: merged_model.c:30
static fr_message_t * fr_message_get_message(fr_message_set_t *ms, bool *p_cleaned)
Allocate a fr_message_t, WITHOUT a ring buffer.
Definition: message.c:674
fr_ring_buffer_t * rb_array[MSG_ARRAY_SIZE]
array of ring buffers
Definition: message.c:113
size_t message_size
size of the callers message, including fr_message_t
Definition: message.c:99
int mr_current
current used message ring entry
Definition: message.c:96
#define MPRINT(...)
Definition: message.c:38
static fr_message_t * fr_message_ring_alloc(fr_message_set_t *ms, fr_ring_buffer_t *mr, bool clean)
Allocate a message from a message ring.
Definition: message.c:623
fr_ring_buffer_t * mr_array[MSG_ARRAY_SIZE]
array of message arrays
Definition: message.c:111
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
static void fr_message_gc(fr_message_set_t *ms, int max_to_clean)
Garbage collect "done" messages.
Definition: message.c:352
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition: message.c:242
int rb_max
max used ring buffer entry
Definition: message.c:104
static fr_message_t * fr_message_get_ring_buffer(fr_message_set_t *ms, fr_message_t *m, bool cleaned_up)
Get a ring buffer for a message.
Definition: message.c:793
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition: message.c:934
#define MSG_ARRAY_SIZE
Definition: message.c:41
int fr_message_set_messages_used(fr_message_set_t *ms)
Count the number of used messages.
Definition: message.c:1212
void fr_message_set_debug(fr_message_set_t *ms, FILE *fp)
Print debug information about the message set.
Definition: message.c:1262
#define CACHE_ALIGN(_x)
Definition: message.c:43
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition: message.c:1238
int mr_max
max used message ring entry
Definition: message.c:97
int mr_cleaned
where we last cleaned
Definition: message.c:101
static int fr_message_ring_gc(fr_message_set_t *ms, fr_ring_buffer_t *mr, int max_to_clean)
Clean up messages in a message ring.
Definition: message.c:306
size_t max_allocation
maximum allocation size
Definition: message.c:106
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition: message.c:1077
int rb_current
current used ring buffer entry
Definition: message.c:103
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
fr_ring_buffer_t * rb
pointer to the ring buffer
Definition: message.h:48
size_t rb_size
cache-aligned size in the ring buffer
Definition: message.h:51
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
@ FR_MESSAGE_USED
Definition: message.h:39
@ FR_MESSAGE_LOCALIZED
Definition: message.h:40
@ FR_MESSAGE_DONE
Definition: message.h:41
@ FR_MESSAGE_FREE
Definition: message.h:38
fr_message_status_t status
free, used, done, etc.
Definition: message.h:45
static size_t reserve_size
static size_t used
static bool cleanup
Definition: radsniff.c:60
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
uint8_t * fr_ring_buffer_reserve(fr_ring_buffer_t *rb, size_t size)
Reserve room in the ring buffer.
Definition: ring_buffer.c:119
int fr_ring_buffer_free(fr_ring_buffer_t *rb, size_t size_to_free)
Mark data as free.
Definition: ring_buffer.c:304
size_t fr_ring_buffer_used(fr_ring_buffer_t *rb)
Get the amount of data used in a ring buffer.
Definition: ring_buffer.c:437
size_t fr_ring_buffer_size(fr_ring_buffer_t *rb)
Get the size of the ring buffer.
Definition: ring_buffer.c:423
int fr_ring_buffer_start(fr_ring_buffer_t *rb, uint8_t **p_start, size_t *p_size)
Get a pointer to the data at the start of the ring buffer.
Definition: ring_buffer.c:464
uint8_t * fr_ring_buffer_alloc(fr_ring_buffer_t *rb, size_t size)
Mark data as allocated.
Definition: ring_buffer.c:196
fr_assert(0)
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223