All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
connection.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * @file connection.c
19  * @brief Handle pools of connections (threads, sockets, etc.)
20  * @note This API must be used by all modules in the public distribution that
21  * maintain pools of connections.
22  *
23  * @copyright 2012 The FreeRADIUS server project
24  * @copyright 2012 Alan DeKok <aland@deployingradius.com>
25  */
26 RCSID("$Id: 70fd8457ca7d2755a25a0eb935b043cd001dba0e $")
27 
28 #include <freeradius-devel/radiusd.h>
29 #include <freeradius-devel/heap.h>
30 #include <freeradius-devel/modpriv.h>
31 #include <freeradius-devel/rad_assert.h>
32 
34 
36 
37 #ifndef NDEBUG
38 #ifdef HAVE_PTHREAD_H
39 /* #define PTHREAD_DEBUG (1) */
40 #endif
41 #endif
42 
43 /** An individual connection within the connection pool
44  *
45  * Defines connection counters, timestamps, and holds a pointer to the
46  * connection handle itself.
47  *
48  * @see fr_connection_pool_t
49  */
50 struct fr_connection {
51  fr_connection_t *prev; //!< Previous connection in the pool's doubly linked list.
52  fr_connection_t *next; //!< Next connection in the pool's doubly linked list.
53 
54  time_t created; //!< Time connection was created (compared against the pool lifetime).
55  struct timeval last_reserved; //!< Last time the connection was reserved; also set when it is opened.
56 
57  struct timeval last_released; //!< Time the connection was released; starts equal to last_reserved.
58 
59  uint32_t num_uses; //!< Number of times the connection has been reserved.
60  uint64_t number; //!< Unique ID assigned when the connection is created,
61  //!< these will monotonically increase over the
62  //!< lifetime of the connection pool.
63  void *connection; //!< Pointer to whatever the module uses for a connection
64  //!< handle (the value returned by the create callback).
65  bool in_use; //!< Whether the connection is currently reserved.
66 
67  int heap; //!< Heap bookkeeping field; its offset is passed to fr_heap_create().
68 
69  bool needs_reconnecting; //!< If set, the connection is closed the next time it is checked while idle.
70 
71 #ifdef PTHREAD_DEBUG
72  pthread_t pthread_id; //!< Thread that reserved the connection, valid when 'in_use == true'.
73 #endif
74 };
75 
76 /** A connection pool
77  *
78  * Defines the configuration of the connection pool, all the counters and
79  * timestamps related to the connection pool, the mutex that stops multiple
80  * threads leaving the pool in an inconsistent state, and the callbacks
81  * required to open, close and check the status of connections within the pool.
82  *
83  * @see fr_connection
84  */
86  int ref; //!< Reference counter to prevent connection
87  //!< pool being freed multiple times.
88  uint32_t start; //!< Number of initial connections.
89  uint32_t min; //!< Minimum number of concurrent connections to keep open.
90  uint32_t max; //!< Maximum number of concurrent connections to allow.
91  uint32_t spare; //!< Number of spare (idle) connections to try to keep available.
92  uint32_t retry_delay; //!< Seconds to delay re-open after a failed open.
93  uint32_t cleanup_interval; //!< Initial timer for how often we sweep the pool
94  //!< for free connections. (0 is infinite).
95  int delay_interval; //!< When we next do a cleanup. Initialized to
96  //!< cleanup_interval, and increase from there based
97  //!< on the delay.
98  uint64_t max_uses; //!< Maximum number of times a connection can be used
99  //!< before being closed (0 means no limit).
100  uint32_t max_pending; //!< Cap on simultaneously pending opens; set to 1 after a failed open, raised by one per success.
101  uint32_t lifetime; //!< How long a connection can be open before being
102  //!< closed, idle or not (0 means no limit).
103  uint32_t idle_timeout; //!< How long a connection can be idle before
104  //!< being closed (0 means no limit).
105  struct timeval connect_timeout; //!< New connection timeout, enforced by the create
106  //!< callback.
107 
108  bool spread; //!< If true we spread requests over the connections,
109  //!< using the connection released longest ago, first.
110 
111  fr_heap_t *heap; //!< Heap of connections that are not in use; reservations take from its top.
112 
113  fr_connection_t *head; //!< Start of the connection list.
114  fr_connection_t *tail; //!< End of the connection list.
115 
116 #ifdef HAVE_PTHREAD_H
117  pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
118  //!< modifications in threaded mode.
119  pthread_cond_t done_spawn; //!< Threads that need to ensure no spawning is in progress,
120  //!< should block on this condition if pending != 0.
121  pthread_cond_t done_reconnecting; //!< Before calling the create callback, threads should
122  //!< block on this condition if reconnecting == true.
123 #endif
124 
125  CONF_SECTION *cs; //!< Configuration section holding the section of parsed
126  //!< config file that relates to this pool.
127  void *opaque; //!< Pointer to context data that will be passed to callbacks.
128 
129  char const *log_prefix; //!< Log prefix to prepend to all log messages created
130  //!< by the connection pool code.
131 
132  char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
133  //!< fired by the connection pool code.
134 
135  fr_connection_create_t create; //!< Function used to create new connections.
136  fr_connection_alive_t alive; //!< Function used to check status of connections.
137  fr_connection_pool_reconnect_t reconnect; //!< Called during connection pool reconnect.
138 
139  fr_connection_pool_state_t state; //!< Stats and state of the connection pool.
140 };
141 
/*
 * Locking wrappers used throughout this file. With pthreads available they
 * map straight onto the pthread calls; without pthreads they expand to
 * nothing, so their arguments are never evaluated.
 */
142 #ifdef HAVE_PTHREAD_H
143 # define PTHREAD_MUTEX_LOCK pthread_mutex_lock
144 # define PTHREAD_MUTEX_UNLOCK pthread_mutex_unlock
145 # define PTHREAD_COND_BROADCAST pthread_cond_broadcast
146 #else
147 # define PTHREAD_MUTEX_LOCK(_x)
148 # define PTHREAD_MUTEX_UNLOCK(_x)
149 # define PTHREAD_COND_BROADCAST(_x)
150 #endif
151 
/*
 * Parser table mapping the pool's configuration items onto
 * fr_connection_pool_t fields, together with their default values.
 *
 * "cleanup_delay" and "cleanup_interval" both store into cleanup_interval;
 * only "cleanup_interval" carries a default.
 *
 * NOTE(review): listing line 165 is absent from this view. Presumably it is
 * the entry that terminates the table - confirm against the original file.
 */
152 static const CONF_PARSER connection_config[] = {
153  { FR_CONF_OFFSET("start", PW_TYPE_INTEGER, fr_connection_pool_t, start), .dflt = "5" },
154  { FR_CONF_OFFSET("min", PW_TYPE_INTEGER, fr_connection_pool_t, min), .dflt = "5" },
155  { FR_CONF_OFFSET("max", PW_TYPE_INTEGER, fr_connection_pool_t, max), .dflt = "10" },
156  { FR_CONF_OFFSET("spare", PW_TYPE_INTEGER, fr_connection_pool_t, spare), .dflt = "3" },
157  { FR_CONF_OFFSET("uses", PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), .dflt = "0" },
158  { FR_CONF_OFFSET("lifetime", PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), .dflt = "0" },
159  { FR_CONF_OFFSET("cleanup_delay", PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval) },
160  { FR_CONF_OFFSET("cleanup_interval", PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), .dflt = "30" },
161  { FR_CONF_OFFSET("idle_timeout", PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), .dflt = "60" },
162  { FR_CONF_OFFSET("connect_timeout", PW_TYPE_TIMEVAL, fr_connection_pool_t, connect_timeout), .dflt = "3.0" },
163  { FR_CONF_OFFSET("retry_delay", PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), .dflt = "1" },
164  { FR_CONF_OFFSET("spread", PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), .dflt = "no" },
166 };
167 
168 /** Order connections by reserved most recently
169  */
170 static int last_reserved_cmp(void const *one, void const *two)
171 {
172  fr_connection_t const *a = one;
173  fr_connection_t const *b = two;
174 
175  if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1;
176  if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1;
177 
178  if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1;
179  if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1;
180 
181  return 0;
182 }
183 
184 /** Order connections by released longest ago
185  */
186 static int last_released_cmp(void const *one, void const *two)
187 {
188  fr_connection_t const *a = one;
189  fr_connection_t const *b = two;
190 
191  if (b->last_released.tv_sec < a->last_released.tv_sec) return -1;
192  if (b->last_released.tv_sec > a->last_released.tv_sec) return +1;
193 
194  if (b->last_released.tv_usec < a->last_released.tv_usec) return -1;
195  if (b->last_released.tv_usec > a->last_released.tv_usec) return +1;
196 
197  return 0;
198 }
199 
200 /** Removes a connection from the connection list
201  *
 * Repoints the neighbouring entries (or the pool's head/tail when the
 * connection sits at either end of the list) and clears the connection's
 * own prev/next pointers. The connection itself is not freed and no pool
 * counters are changed here.
 *
202  * @note Must be called with the mutex held.
 * @note The signature line of this function is absent from this listing.
203  *
204  * @param[in,out] pool to modify.
205  * @param[in] this Connection to delete.
206  */
208 {
 /* Detach from the predecessor, or move the list head forward. */
209  if (this->prev) {
210  rad_assert(pool->head != this);
211  this->prev->next = this->next;
212  } else {
213  rad_assert(pool->head == this);
214  pool->head = this->next;
215  }
 /* Detach from the successor, or move the list tail backward. */
216  if (this->next) {
217  rad_assert(pool->tail != this);
218  this->next->prev = this->prev;
219  } else {
220  rad_assert(pool->tail == this);
221  pool->tail = this->prev;
222  }
223 
224  this->prev = this->next = NULL;
225 }
226 
227 /** Adds a connection to the head of the connection list
228  *
 * The connection becomes the new pool->head; if the list was empty it
 * becomes pool->tail as well.
 *
229  * @note Must be called with the mutex held.
 * @note The signature line of this function is absent from this listing.
230  *
231  * @param[in,out] pool to modify.
232  * @param[in] this Connection to add.
233  */
235 {
236  rad_assert(pool != NULL);
237  rad_assert(this != NULL);
238  rad_assert(pool->head != this);
239  rad_assert(pool->tail != this);
240 
241  if (pool->head) {
242  pool->head->prev = this;
243  }
244 
245  this->next = pool->head;
246  this->prev = NULL;
247  pool->head = this;
 /* An empty list has no tail yet, so the new entry is also the tail. */
248  if (!pool->tail) {
249  rad_assert(this->next == NULL);
250  pool->tail = this;
251  } else {
252  rad_assert(this->next != NULL);
253  }
254 }
255 
256 /** Send a connection pool trigger.
257  *
258  * @param[in] pool to send trigger for.
259  * @param[in] name_suffix trigger name suffix.
260  */
261 static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
262 {
263  char name[64];
264  rad_assert(pool != NULL);
265  rad_assert(name_suffix != NULL);
266  snprintf(name, sizeof(name), "%s.%s", pool->trigger_prefix, name_suffix);
267  exec_trigger(NULL, pool->cs, name, true);
268 }
269 
270 /** Find a connection handle in the connection list
271  *
272  * Walks over the list of connections searching for a specified connection
273  * handle and returns the first connection that contains that pointer.
274  *
275  * @note Will lock the mutex and only release it if the connection handle
276  * is not found, so will usually return with the mutex held.
 * @note If pool or conn is NULL, returns NULL without touching the mutex.
277  * @note Must be called with the mutex free.
 * @note The signature line of this function is absent from this listing.
278  *
279  * @param[in] pool to search in.
280  * @param[in] conn handle to search for.
281  * @return
282  * - Connection containing the specified handle (mutex still held).
283  * - NULL if no connection was found.
284  */
286 {
287  fr_connection_t *this;
288 
289  if (!pool || !conn) return NULL;
290 
291  PTHREAD_MUTEX_LOCK(&pool->mutex);
292 
293  /*
294  * FIXME: This loop could be avoided if we passed a 'void
295  * **connection' instead. We could use "offsetof" in
296  * order to find top of the parent structure.
297  */
298  for (this = pool->head; this != NULL; this = this->next) {
299  if (this->connection == conn) {
300 #ifdef PTHREAD_DEBUG
301  pthread_t pthread_id;
302 
303  pthread_id = pthread_self();
304  rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
305 #endif
306 
307  rad_assert(this->in_use == true);
 /* Found: return WITHOUT unlocking, the caller owns the mutex now. */
308  return this;
309  }
310  }
311 
312  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
313  return NULL;
314 }
315 
316 /** Spawns a new connection
317  *
318  * Spawns a new connection using the create callback, and returns it for
319  * adding to the connection list.
320  *
321  * @note Will call the 'open' trigger.
322  * @note Must be called with the mutex free.
323  *
324  * @param[in] pool to modify.
325  * @param[in] now Current time.
326  * @param[in] in_use whether the new connection should be "in_use" or not
327  * @return
328  * - New connection struct.
329  * - NULL on error.
330  */
331 static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
332 {
333  uint64_t number;
334  uint32_t max_pending;
335  TALLOC_CTX *ctx;
336 
337  fr_connection_t *this;
338  void *conn;
339 
340  rad_assert(pool != NULL);
341 
342  /*
343  * If we have NO connections, and we've previously failed
344  * opening connections, don't open multiple connections until
345  * we successfully open at least one.
346  */
347  if ((pool->state.num == 0) && pool->state.pending && pool->state.last_failed) return NULL;
348 
349  PTHREAD_MUTEX_LOCK(&pool->mutex);
350  rad_assert(pool->state.num <= pool->max);
351 
352  /*
353  * Don't spawn too many connections at the same time.
354  */
355  if ((pool->state.num + pool->state.pending) >= pool->max) {
356  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
357 
358  ERROR("%s: Cannot open new connection, already at max", pool->log_prefix);
359  return NULL;
360  }
361 
362  /*
363  * If the last attempt failed, wait a bit before
364  * retrying.
365  */
366  if (pool->state.last_failed && ((pool->state.last_failed + pool->retry_delay) > now)) {
367  bool complain = false;
368 
369  if (pool->state.last_throttled != now) {
370  complain = true;
371 
372  pool->state.last_throttled = now;
373  }
374 
375  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
376 
377  if (!RATE_LIMIT_ENABLED || complain) {
378  ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
379  pool->log_prefix, pool->retry_delay);
380  }
381 
382  return NULL;
383  }
384 
385  /*
386  * We limit the rate of new connections after a failed attempt.
387  */
388  if (pool->state.pending > pool->max_pending) {
389  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
390  RATE_LIMIT(WARN("%s: Cannot open a new connection due to rate limit after failure",
391  pool->log_prefix));
392  return NULL;
393  }
394 
395  pool->state.pending++;
396  number = pool->state.count++;
397 
398  /*
399  * Don't starve out the thread trying to reconnect
400  * the pool, by continuously opening new connections.
401  */
402 #ifdef HAVE_PTHREAD_H
403  while (pool->state.reconnecting) pthread_cond_wait(&pool->done_reconnecting, &pool->mutex);
404 #endif
405 
406  /*
407  * Unlock the mutex while we try to open a new
408  * connection. If there are issues with the back-end,
409  * opening a new connection may take a LONG time. In
410  * that case, we want the other connections to continue
411  * to be used.
412  */
413  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
414 
415  /*
416  * The true value for max_pending is the smaller of
417  * free connection slots, or pool->max_pending.
418  */
419  max_pending = (pool->max - pool->state.num);
420  if (pool->max_pending < max_pending) max_pending = pool->max_pending;
421  INFO("%s: Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
422  pool->log_prefix, number, pool->state.pending, max_pending);
423 
424  /*
425  * Allocate a new top level ctx for the create callback
426  * to hang its memory off of.
427  */
428  ctx = talloc_init("fr_connection_ctx");
429  if (!ctx) return NULL;
430 
431  /*
432  * This may take a long time, which prevents other
433  * threads from releasing connections. We don't care
434  * about other threads opening new connections, as we
435  * already have no free connections.
436  */
437  conn = pool->create(ctx, pool->opaque, &pool->connect_timeout);
438  if (!conn) {
439  ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, number);
440 
441  pool->state.last_failed = now;
442  PTHREAD_MUTEX_LOCK(&pool->mutex);
443  pool->max_pending = 1;
444  pool->state.pending--;
445 
446  PTHREAD_COND_BROADCAST(&pool->done_spawn);
447  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
448 
449  talloc_free(ctx);
450 
451  return NULL;
452  }
453 
454  /*
455  * And lock the mutex again while we link the new
456  * connection back into the pool.
457  */
458  PTHREAD_MUTEX_LOCK(&pool->mutex);
459 
460  this = talloc_zero(pool, fr_connection_t);
461  if (!this) {
462  PTHREAD_COND_BROADCAST(&pool->done_spawn);
463  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
464 
465  talloc_free(ctx);
466 
467  return NULL;
468  }
469  fr_talloc_link_ctx(this, ctx);
470 
471  this->created = now;
472  this->connection = conn;
473  this->in_use = in_use;
474 
475  this->number = number;
476  gettimeofday(&this->last_reserved, NULL);
477  this->last_released = this->last_reserved;
478 
479  /*
480  * The connection pool is starting up. Insert the
481  * connection into the heap.
482  */
483  if (!in_use) fr_heap_insert(pool->heap, this);
484 
485  fr_connection_link_head(pool, this);
486 
487  /*
488  * Do NOT insert the connection into the heap. That's
489  * done when the connection is released.
490  */
491 
492  pool->state.num++;
493 
494  rad_assert(pool->state.pending > 0);
495  pool->state.pending--;
496 
497  /*
498  * We've successfully opened one more connection. Allow
499  * more connections to open in parallel.
500  */
501  if (pool->max_pending < pool->max) pool->max_pending++;
502 
503  pool->state.last_spawned = time(NULL);
504  pool->delay_interval = pool->cleanup_interval;
505  pool->state.next_delay = pool->cleanup_interval;
506  pool->state.last_failed = 0;
507 
508  PTHREAD_COND_BROADCAST(&pool->done_spawn);
509  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
510 
511  fr_connection_exec_trigger(pool, "open");
512 
513  return this;
514 }
515 
516 /** Close an existing connection.
517  *
518  * Releases the connection if it is in use (otherwise removes it from the
519  * heap), unlinks it from the list, then frees it with talloc_free().
 *
 * NOTE(review): no close/delete callback is invoked directly here; closing
 * the underlying handle presumably happens through talloc destructors on
 * the memory linked to the connection - confirm.
520  *
521  * @note Will call the 'close' trigger.
522  * @note Must be called with the mutex held.
 * @note The signature line of this function is absent from this listing.
523  *
524  * @param[in,out] pool to modify.
525  * @param[in,out] this Connection to delete.
526  */
528 {
529  /*
530  * If it's in use, release it.
531  */
532  if (this->in_use) {
533 #ifdef PTHREAD_DEBUG
534  pthread_t pthread_id = pthread_self();
535  rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
536 #endif
537 
538  this->in_use = false;
539 
540  rad_assert(pool->state.active != 0);
541  pool->state.active--;
542 
543  } else {
544  /*
545  * Connection isn't used, remove it from the heap.
546  */
547  fr_heap_extract(pool->heap, this);
548  }
549 
 /* The trigger is fired while the mutex is still held by the caller. */
550  fr_connection_exec_trigger(pool, "close");
551 
552  fr_connection_unlink(pool, this);
553 
554  rad_assert(pool->state.num > 0);
555  pool->state.num--;
556  talloc_free(this);
557 }
558 
559 /** Check whether a connection needs to be removed from the pool
560  *
561  * Will verify that the connection is within idle_timeout, max_uses, and
562  * lifetime values. If it is not, the connection will be closed.
 * A connection flagged needs_reconnecting is closed as well. The checks
 * run in this order: needs_reconnecting, max_uses, lifetime, idle_timeout.
563  *
564  * @note Will only close connections not in use.
565  * @note Must be called with the mutex held.
 * @note Part of the signature of this function is absent from this listing.
566  *
567  * @param[in,out] pool to modify.
568  * @param[in,out] this Connection to manage.
569  * @param[in] now Current time.
570  * @return
571  * - 0 if connection was closed.
572  * - 1 if connection handle was left open.
573  */
575  fr_connection_t *this,
576  time_t now)
577 {
578  rad_assert(pool != NULL);
579  rad_assert(this != NULL);
580 
581  /*
582  * Don't terminate in-use connections
583  */
584  if (this->in_use) return 1;
585 
586  if (this->needs_reconnecting) {
587  DEBUG2("%s: Closing expired connection (%" PRIu64 "): Needs reconnecting", pool->log_prefix,
588  this->number);
 /* Shared exit for every "close this connection" decision below. */
589  do_delete:
590  if (pool->state.num <= pool->min) {
591  DEBUG2("%s: You probably need to lower \"min\"", pool->log_prefix);
592  }
593  fr_connection_close_internal(pool, this);
594  return 0;
595  }
596 
 /* A limit of 0 disables the corresponding check. */
597  if ((pool->max_uses > 0) &&
598  (this->num_uses >= pool->max_uses)) {
599  DEBUG2("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix,
600  this->number);
601  goto do_delete;
602  }
603 
604  if ((pool->lifetime > 0) &&
605  ((this->created + pool->lifetime) < now)) {
606  DEBUG2("%s: Closing expired connection (%" PRIu64 "): Hit lifetime limit",
607  pool->log_prefix, this->number);
608  goto do_delete;
609  }
610 
 /* NOTE(review): the idle time is cast to int but printed with %u. */
611  if ((pool->idle_timeout > 0) &&
612  ((this->last_released.tv_sec + pool->idle_timeout) < now)) {
613  INFO("%s: Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %u seconds",
614  pool->log_prefix, this->number, (int) (now - this->last_released.tv_sec));
615  goto do_delete;
616  }
617 
618  return 1;
619 }
620 
621 
622 /** Check whether any connections need to be removed from the pool
623  *
624  * Maintains the number of connections in the pool as per the configuration
625  * parameters for the connection pool.
 *
 * Per call this opens at most one new connection, closes at most one
 * surplus idle connection, and then applies the per-connection limits to
 * every connection in the list.
626  *
627  * @note Will only run checks the first time it's called in a given second,
628  * to throttle connection spawning/closing.
629  * @note Will only close connections not in use.
630  * @note Must be called with the mutex held, will release mutex before
631  * returning.
 * @note The signature line of this function is absent from this listing.
632  *
633  * @param[in,out] pool to manage.
634  * @return 1
635  */
637 {
638  uint32_t spawn, idle, extra;
639  time_t now = time(NULL);
640  fr_connection_t *this, *next;
641 
642  if (pool->state.last_checked == now) {
643  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
644  return 1;
645  }
646 
647  /*
648  * Some idle connections are OK, if they're within the
649  * configured "spare" range. Any extra connections
650  * outside of that range can be closed.
651  */
652  idle = pool->state.num - pool->state.active;
653  if (idle <= pool->spare) {
654  extra = 0;
655  } else {
656  extra = idle - pool->spare;
657  }
658 
659  /*
660  * The other end can close connections. If so, we'll
661  * have fewer than "min". When that happens, open more
662  * connections to enforce "min".
663  */
664  if ((pool->state.num + pool->state.pending) <= pool->min) {
665  spawn = pool->min - (pool->state.num + pool->state.pending);
666  extra = 0;
667 
668  /*
669  * If we're about to create more than "max",
670  * don't create more.
671  */
672  } else if ((pool->state.num + pool->state.pending) >= pool->max) {
673  /*
674  * Ensure we don't spawn more connections. If
675  * there are extra idle connections, we can
676  * delete all of them.
677  */
678  spawn = 0;
679  /* leave extra alone from above */
680 
681  /*
682  * min < num < max
683  *
684  * AND we don't have enough idle connections.
685  * Open some more.
686  */
687  } else if (idle <= pool->spare) {
688  /*
689  * Not enough spare connections. Spawn a few.
690  * But cap the pool size at "max"
691  */
692  spawn = pool->spare - idle;
693  extra = 0;
694 
695  if ((pool->state.num + pool->state.pending + spawn) > pool->max) {
696  spawn = pool->max - (pool->state.num + pool->state.pending);
697  }
698 
699  /*
700  * min < num < max
701  *
702  * We have more than enough idle connections, AND
703  * some are pending. Don't open or close any.
704  */
705  } else if (pool->state.pending) {
706  spawn = 0;
707  extra = 0;
708 
709  /*
710  * We have too many idle connections, but closing
711  * some would take us below "min", so we only
712  * close enough to take us to "min".
713  */
714  } else if ((pool->min + extra) >= pool->state.num) {
715  spawn = 0;
716  extra = pool->state.num - pool->min;
717 
718  } else {
719  /*
720  * Closing the "extra" connections won't take us
721  * below "min". It's therefore safe to close
722  * them all.
723  */
724  spawn = 0;
725  /* leave extra alone from above */
726  }
727 
728  /*
729  * Only try to open spares if we're not already attempting to open
730  * a connection. Avoids spurious log messages.
 *
 * The mutex is dropped around the call because the spawn function
 * takes the mutex itself.
 *
 * NOTE(review): a single connection is opened here no matter how
 * large "spawn" is; the remainder is left for later calls.
731  */
732  if (spawn) {
733  INFO("%s: Need %i more connections to reach %i spares",
734  pool->log_prefix, spawn, pool->spare);
735  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
736  fr_connection_spawn(pool, now, false); /* ignore return code */
737  PTHREAD_MUTEX_LOCK(&pool->mutex);
738  }
739 
740  /*
741  * We haven't spawned connections in a while, and there
742  * are too many spare ones. Close the one which has been
743  * unused for the longest.
744  */
745  if (extra && (now >= (pool->state.last_spawned + pool->delay_interval))) {
746  fr_connection_t *found;
747 
 /* Pick the idle connection with the oldest last_reserved time. */
748  found = NULL;
749  for (this = pool->tail; this != NULL; this = this->prev) {
750  if (this->in_use) continue;
751 
752  if (!found ||
753  timercmp(&this->last_reserved, &found->last_reserved, <)) {
754  found = this;
755  }
756  }
757 
758  rad_assert(found != NULL);
759 
760  INFO("%s: Closing connection (%" PRIu64 "), from %d unused connections", pool->log_prefix,
761  found->number, extra);
762  fr_connection_close_internal(pool, found);
763 
764  /*
765  * Halve the increment (never below 1) and add it to the
766  * cleanup delay: the delay grows, but by smaller steps.
767  */
768  pool->state.next_delay >>= 1;
769  if (pool->state.next_delay == 0) pool->state.next_delay = 1;
770  pool->delay_interval += pool->state.next_delay;
771  }
772 
773  /*
774  * Pass over all of the connections in the pool, limiting
775  * lifetime, idle time, max requests, etc.
 *
 * "next" is saved first because the current entry may be freed.
776  */
777  for (this = pool->head; this != NULL; this = next) {
778  next = this->next;
779  fr_connection_manage(pool, this, now);
780  }
781 
782  pool->state.last_checked = now;
783  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
784 
785  return 1;
786 }
787 
788 /** Get a connection from the connection pool
789  *
790  * @note Must be called with the mutex free.
791  *
792  * @param[in,out] pool to reserve the connection from.
793  * @param[in] spawn whether to spawn a new connection
794  * @return
795  * - A pointer to the connection handle.
796  * - NULL on error.
797  */
798 static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
799 {
800  time_t now;
801  fr_connection_t *this;
802 
803  if (!pool) return NULL;
804 
805  PTHREAD_MUTEX_LOCK(&pool->mutex);
806 
807  now = time(NULL);
808 
809  /*
810  * Grab the link with the lowest latency, and check it
811  * for limits. If "connection manage" says the link is
812  * no longer usable, go grab another one.
813  */
814  do {
815  this = fr_heap_peek(pool->heap);
816  if (!this) break;
817  } while (!fr_connection_manage(pool, this, now));
818 
819  /*
820  * We have a working connection. Extract it from the
821  * heap and use it.
822  */
823  if (this) {
824  fr_heap_extract(pool->heap, this);
825  goto do_return;
826  }
827 
828  /*
829  * We don't have a connection. Try to open a new one.
830  */
831  rad_assert(pool->state.active == pool->state.num);
832 
833  if (pool->state.num == pool->max) {
834  bool complain = false;
835 
836  /*
837  * Rate-limit complaints.
838  */
839  if (pool->state.last_at_max != now) {
840  complain = true;
841  pool->state.last_at_max = now;
842  }
843 
844  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
845  if (!RATE_LIMIT_ENABLED || complain) {
846  ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
847  }
848 
849  return NULL;
850  }
851 
852  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
853 
854  if (!spawn) return NULL;
855 
856  DEBUG2("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix,
857  pool->state.active, pool->state.num);
858  this = fr_connection_spawn(pool, now, true); /* MY connection! */
859  if (!this) return NULL;
860  PTHREAD_MUTEX_LOCK(&pool->mutex);
861 
862 do_return:
863  pool->state.active++;
864  this->num_uses++;
865  gettimeofday(&this->last_reserved, NULL);
866  this->in_use = true;
867 
868 #ifdef PTHREAD_DEBUG
869  this->pthread_id = pthread_self();
870 #endif
871  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
872 
873  DEBUG2("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
874 
875  return this->connection;
876 }
877 
878 /** Create a new connection pool
879  *
880  * Allocates structures used by the connection pool, initialises the various
881  * configuration options and counters, and sets the callback functions.
882  *
883  * Will also spawn the number of connections specified by the 'start'
884  * configuration options.
 *
 * When check_config is set no connections are opened; start, min and max
 * are forced to 1 and the pool is returned as is.
885  *
886  * @note Will call the 'start' trigger.
 * @note Several lines of this function (parts of the signature, and the
 * body of the error path) are absent from this listing.
887  *
888  * @param[in] ctx Context to link pool's destruction to.
889  * @param[in] cs pool section.
890  * @param[in] opaque data pointer to pass to callbacks.
891  * @param[in] c Callback to create new connections.
892  * @param[in] a Callback to check the status of connections.
893  * @param[in] log_prefix prefix to prepend to all log messages.
894  * @param[in] trigger_prefix prefix to prepend to all trigger names.
895  * @return
896  * - New connection pool.
897  * - NULL on error.
898  */
900  CONF_SECTION *cs,
901  void *opaque,
904  char const *log_prefix,
905  char const *trigger_prefix)
906 {
907  uint32_t i;
908  fr_connection_pool_t *pool;
909  fr_connection_t *this;
910  time_t now;
911 
 /* The alive callback "a" is optional; cs, opaque and c are required. */
912  if (!cs || !opaque || !c) return NULL;
913 
914  now = time(NULL);
915 
916  /*
917  * Pool is allocated in the NULL context as
918  * threads are likely to allocate memory
919  * beneath the pool.
920  */
921  pool = talloc_zero(NULL, fr_connection_pool_t);
922  if (!pool) return NULL;
923 
924  /*
925  * Ensure the pool is freed at the same time
926  * as its parent.
927  */
928  if (fr_talloc_link_ctx(ctx, pool) < 0) {
929  talloc_free(pool);
930 
931  return NULL;
932  }
933 
934  pool->cs = cs;
935  pool->opaque = opaque;
936  pool->create = c;
937  pool->alive = a;
938 
939  pool->head = pool->tail = NULL;
940 
941  /*
942  * We keep a heap of connections, sorted by the last time
943  * we STARTED using them. Newly opened connections
944  * aren't in the heap. They're only inserted in the list
945  * once they're released.
946  *
947  * We do "most recently started" instead of "most
948  * recently used", because MRU is done as most recently
949  * *released*. We want to order connections by
950  * responsiveness, and MRU prioritizes high latency
951  * connections.
952  *
953  * We want most recently *started*, which gives
954  * preference to low latency links, and pushes high
955  * latency links down in the priority heap.
956  *
957  * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
 *
 * NOTE(review): pool->spread is tested here, but the pool was
 * zero-allocated above and the configuration is only parsed further
 * down, so spread looks to be always false at this point and the
 * last_released_cmp branch unreachable. Confirm, and consider
 * creating the heap after the configuration has been parsed.
958  */
959  if (!pool->spread) {
960  pool->heap = fr_heap_create(last_reserved_cmp, offsetof(fr_connection_t, heap));
961  /*
962  * For some types of connections we need to use a different
963  * algorithm, because load balancing benefits are secondary
964  * to maintaining a cache of open connections.
965  *
966  * With libcurl's multihandle, connections can only be reused
967  * if all handles that make up the multihandle are done processing
968  * their requests.
969  *
970  * We can't tell when that's happened using libcurl, and even
971  * if we could, blocking until all servers had responded
972  * would have huge cost.
973  *
974  * The solution is to order the heap so that the connection that
975  * was released longest ago is at the top.
976  *
977  * That way we maximise time between connection use.
978  */
979  } else {
980  pool->heap = fr_heap_create(last_released_cmp, offsetof(fr_connection_t, heap));
981  }
982  if (!pool->heap) {
983  talloc_free(pool);
984  return NULL;
985  }
986 
 /* Prefixes are copied into the pool; the fallbacks are string literals. */
987  pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
988  pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "";
989 
990 #ifdef HAVE_PTHREAD_H
991  pthread_mutex_init(&pool->mutex, NULL);
992  pthread_cond_init(&pool->done_spawn, NULL);
993  pthread_cond_init(&pool->done_reconnecting, NULL);
994 #endif
995 
996  DEBUG2("%s: Initialising connection pool", pool->log_prefix);
997 
998  if (cf_section_parse(cs, pool, connection_config) < 0) goto error;
999 
1000  /*
1001  * Some simple limits
1002  */
1003  if (pool->max == 0) {
1004  cf_log_err_cs(cs, "Cannot set 'max' to zero");
1005  goto error;
1006  }
1007  pool->max_pending = pool->max; /* can open all connections now */
1008 
1009  if (pool->min > pool->max) {
1010  cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
1011  goto error;
1012  }
1013 
1014  FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1015  FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1016  FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1017 
1018  if (pool->lifetime > 0) {
1019  FR_INTEGER_COND_CHECK("idle_timeout", pool->idle_timeout, (pool->idle_timeout <= pool->lifetime), 0);
1020  }
1021 
1022  if (pool->idle_timeout > 0) {
1023  FR_INTEGER_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1024  }
1025 
1026  /*
1027  * Some libraries treat 0.0 as infinite timeout, others treat it
1028  * as instantaneous timeout. Solve the inconsistency by making
1029  * the smallest allowable timeout 100ms.
1030  */
1031  FR_TIMEVAL_BOUND_CHECK("connect_timeout", &pool->connect_timeout, >=, 0, 100000);
1032 
1033  /*
1034  * Don't open any connections. Instead, force the limits
1035  * to only 1 connection.
1036  *
1037  */
1038  if (check_config) {
1039  pool->start = pool->min = pool->max = 1;
1040  return pool;
1041  }
1042 
1043  /*
1044  * Create all of the connections, unless the admin says
1045  * not to.
 *
 * A single failed open aborts the whole pool creation.
1046  */
1047  for (i = 0; i < pool->start; i++) {
1048  this = fr_connection_spawn(pool, now, false);
1049  if (!this) {
 /*
  * NOTE(review): listing line 1051 is absent from this view. As
  * shown, the error path returns NULL without releasing the pool;
  * confirm against the original file.
  */
1050  error:
1052  return NULL;
1053  }
1054  }
1055 
1056  fr_connection_exec_trigger(pool, "start");
1057 
1058  return pool;
1059 }
1060 
1061 /** Allocate a new pool using an existing one as a template
1062  *
1063  * @param ctx to allocate new pool in.
1064  * @param pool to copy.
1065  * @param opaque data to pass to connection function.
1066  * @return
1067  * - New connection pool.
1068  * - NULL on error.
1069  */
1071 {
1072  return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create,
1073  pool->alive, pool->log_prefix, pool->trigger_prefix);
1074 }
1075 
1076 /** Get the stats and state of the connection pool
1077  *
1078  * @param[in] pool to get the state for.
1079  * @return a pointer to the pool's state structure.
1080  */
1082 {
1083  return &pool->state;
1084 }
1085 
1086 /** Connection pool get timeout
1087  *
1088  * @param[in] pool to get connection timeout for.
1089  * @return the connection timeout configured for the pool.
1090  */
1092 {
1093  return pool->connect_timeout;
1094 }
1095 
1096 /** Return the opaque data associated with a connection pool
1097  *
1098  * @param pool to return data for.
1099  * @return opaque data associated with pool.
1100  */
1102 {
1103  return pool->opaque;
1104 }
1105 
1106 /** Increment pool reference by one.
1107  *
1108  * @param[in] pool to increment reference counter for.
1109  */
1111 {
1112  pool->ref++;
1113 }
1114 
1115 /** Set a reconnection callback for the connection pool
1116  *
1117  * This can be called at any time during the pool's lifecycle.
1118  *
1119  * @param[in] pool to set reconnect callback for.
1120  * @param reconnect callback to call when reconnecting pool's connections.
1121  */
1123 {
1124  pool->reconnect = reconnect;
1125 }
1126 
1127 /** Mark connections for reconnection, and spawn at least 'start' connections
1128  *
1129  * @note This call may block whilst waiting for pending connection attempts to complete.
1130  *
1131  * This is intended to be called on a connection pool that's in use, to have it reflect
1132  * a configuration change, or because the administrator knows that all connections
1133  * in the pool are inviable and need to be reconnected.
1134  *
1135  * @param[in] pool to reconnect.
1136  * @return
1137  * - 0 On success.
1138  * - -1 If we couldn't create start connections, this may be ignored
1139  * depending on the context in which this function is being called.
1140  */
1142 {
1143  uint32_t i;
1144  fr_connection_t *this;
1145  time_t now;
1146 
1147  PTHREAD_MUTEX_LOCK(&pool->mutex);
1148 
1149  /*
1150  * Pause new spawn attempts (we release the mutex
1151  * during our cond wait).
1152  */
1153  pool->state.reconnecting = true;
1154 
1155 #ifdef HAVE_PTHREAD_H
1156  /*
1157  * When the loop exits, we'll hold the lock for the pool,
1158  * and we're guaranteed the connection create callback
1159  * will not be using the opaque data.
1160  */
1161  while (pool->state.pending) pthread_cond_wait(&pool->done_spawn, &pool->mutex);
1162 #endif
1163 
1164  /*
1165  * We want to ensure at least 'start' connections
1166  * have been reconnected. We can't call reconnect
1167  * because we might get the same connection each
1168  * time we reserve one, so we close 'start'
1169  * connections, and then attempt to spawn them again.
1170  */
1171  for (i = 0; i < pool->start; i++) {
1172  this = fr_heap_peek(pool->heap);
1173  if (!this) break; /* There wasn't 'start' connections available */
1174 
1175  fr_connection_close_internal(pool, this);
1176  }
1177 
1178  /*
1179  * Mark all remaining connections in the pool as
1180  * requiring reconnection.
1181  */
1182  for (this = pool->head; this; this = this->next) this->needs_reconnecting = true;
1183 
1184  /*
1185  * Call the reconnect callback (if one's set)
1186  * This may modify the opaque data associated
1187  * with the pool.
1188  */
1189  if (pool->reconnect) pool->reconnect(pool->opaque);
1190 
1191 #ifdef HAVE_PTHREAD_H
1192  /*
1193  * Allow new spawn attempts, and wakeup any threads
1194  * waiting to spawn new connections.
1195  */
1196  pool->state.reconnecting = false;
1197  PTHREAD_COND_BROADCAST(&pool->done_reconnecting);
1198  PTHREAD_MUTEX_UNLOCK(&pool->mutex);
1199 #endif
1200 
1201  fr_connection_exec_trigger(pool, "reconnect");
1202 
1203  now = time(NULL);
1204 
1205  /*
1206  * Now attempt to spawn 'start' connections.
1207  */
1208  for (i = 0; i < pool->start; i++) {
1209  this = fr_connection_spawn(pool, now, false);
1210  if (!this) return -1;
1211  }
1212 
1213  return 0;
1214 }
1215 
1216 /** Delete a connection pool
1217  *
1218  * Closes, unlinks and frees all connections in the connection pool, then frees
1219  * all memory used by the connection pool.
1220  *
1221  * @note Will call the 'stop' trigger.
1222  * @note Must be called with the mutex free.
1223  *
1224  * @param[in,out] pool to delete.
1225  */
1227 {
1228  fr_connection_t *this;
1229 
1230  if (!pool) return;
1231 
1232  /*
1233  * More modules hold a reference to this pool, don't free
1234  * it yet.
1235  */
1236  if (pool->ref > 0) {
1237  pool->ref--;
1238  return;
1239  }
1240 
1241  DEBUG2("%s: Removing connection pool", pool->log_prefix);
1242 
1243  PTHREAD_MUTEX_LOCK(&pool->mutex);
1244 
1245  /*
1246  * Don't loop over the list. Just keep removing the head
1247  * until they're all gone.
1248  */
1249  while ((this = pool->head) != NULL) {
1250  INFO("%s: Closing connection (%" PRIu64 ")", pool->log_prefix, this->number);
1251 
1252  fr_connection_close_internal(pool, this);
1253  }
1254 
1255  fr_heap_delete(pool->heap);
1256 
1257  fr_connection_exec_trigger(pool, "stop");
1258 
1259  rad_assert(pool->head == NULL);
1260  rad_assert(pool->tail == NULL);
1261  rad_assert(pool->state.num == 0);
1262 
1263 #ifdef HAVE_PTHREAD_H
1264  pthread_mutex_destroy(&pool->mutex);
1265  pthread_cond_destroy(&pool->done_spawn);
1266  pthread_cond_destroy(&pool->done_reconnecting);
1267 #endif
1268 
1269  talloc_free(pool);
1270 }
1271 
1272 /** Reserve a connection in the connection pool
1273  *
1274  * Will attempt to find an unused connection in the connection pool, if one is
1275  * found, will mark it as in use, increment the number of active connections
1276  * and return the connection handle.
1277  *
1278  * If no free connections are found will attempt to spawn a new one, conditional
1279  * on a connection spawning not already being in progress, and not being at the
1280  * 'max' connection limit.
1281  *
1282  * @note fr_connection_release must be called once the caller has finished
1283  * using the connection.
1284  *
1285  * @see fr_connection_release
1286  * @param[in,out] pool to reserve the connection from.
1287  * @return
1288  * - A pointer to the connection handle.
1289  * - NULL on error.
1290  */
1292 {
1293  return fr_connection_get_internal(pool, true);
1294 }
1295 
1296 /** Release a connection
1297  *
1298  * Will mark a connection as unused and decrement the number of active
1299  * connections.
1300  *
1301  * @see fr_connection_get
1302  * @param[in,out] pool to release the connection in.
1303  * @param[in,out] conn to release.
1304  */
1306 {
1307  fr_connection_t *this;
1308 
1309  this = fr_connection_find(pool, conn);
1310  if (!this) return;
1311 
1312  this->in_use = false;
1313 
1314  /*
1315  * Record when the connection was last released
1316  */
1317  gettimeofday(&this->last_released, NULL);
1318  pool->state.last_released = this->last_released;
1319 
1320  /*
1321  * Insert the connection in the heap.
1322  *
1323  * This will either be based on when we *started* using it
1324  * (allowing fast links to be re-used, and slow links to be
1325  * gradually expired), or when we released it (allowing
1326  * the maximum amount of time between connection use).
1327  */
1328  fr_heap_insert(pool->heap, this);
1329 
1330  rad_assert(pool->state.active != 0);
1331  pool->state.active--;
1332 
1333  DEBUG2("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
1334 
1335  /*
1336  * We mirror the "spawn on get" functionality by having
1337  * "delete on release". If there are too many spare
1338  * connections, go manage the pool && clean some up.
1339  */
1341 }
1342 
1343 /** Reconnect a suspected inviable connection
1344  *
1345  * This should be called by the module if it suspects that a connection is
1346  * not viable (e.g. the server has closed it).
1347  *
1348  * When implementing a module that uses the connection pool API, it is advisable
1349  * to pass a pointer to the pointer to the handle (void **conn)
1350  * to all functions which may call reconnect. This is so that if a new handle
1351  * is created and returned, the handle pointer can be updated up the callstack,
1352  * and a function higher up the stack doesn't attempt to use a now invalid
1353  * connection handle.
1354  *
1355  * @note Will free any talloced memory hung off the context of the connection,
1356  * being reconnected.
1357  *
1358  * @warning After calling reconnect the caller *MUST NOT* attempt to use
1359  * the old handle in any other operations, as its memory will have been
1360  * freed.
1361  *
1362  * @see fr_connection_get
1363  * @param[in,out] pool to reconnect the connection in.
1364  * @param[in,out] conn to reconnect.
1365  * @return new connection handle if successful else NULL.
1366  */
1368 {
1369  fr_connection_t *this;
1370 
1371  if (!pool || !conn) return NULL;
1372 
1373  /*
1374  * If fr_connection_find is successful the pool is now locked
1375  */
1376  this = fr_connection_find(pool, conn);
1377  if (!this) return NULL;
1378 
1379  INFO("%s: Deleting inviable connection (%" PRIu64 ")", pool->log_prefix, this->number);
1380 
1381  fr_connection_close_internal(pool, this);
1382  fr_connection_pool_check(pool); /* Whilst we still have the lock (will release the lock) */
1383 
1384  /*
1385  * Return an existing connection or spawn a new one.
1386  */
1387  return fr_connection_get_internal(pool, true);
1388 }
1389 
1390 /** Delete a connection from the connection pool.
1391  *
1392  * Resolves the connection handle to a connection, then (if found)
1393  * closes, unlinks and frees that connection.
1394  *
1395  * @note Must be called with the mutex free.
1396  *
1397  * @param[in,out] pool Connection pool to modify.
1398  * @param[in] conn to delete.
1399  * @return
1400  * - 0 If the connection could not be found.
1401  * - 1 if the connection was deleted.
1402  */
1404 {
1405  fr_connection_t *this;
1406 
1407  this = fr_connection_find(pool, conn);
1408  if (!this) return 0;
1409 
1410  /*
1411  * Record the last time a connection was closed
1412  */
1413  gettimeofday(&pool->state.last_closed, NULL);
1414 
1415  INFO("%s: Deleting connection (%" PRIu64 ")", pool->log_prefix, this->number);
1416 
1417  fr_connection_close_internal(pool, this);
1419  return 1;
1420 }
#define PTHREAD_MUTEX_UNLOCK(_x)
Definition: connection.c:148
uint32_t cleanup_interval
Initial timer for how often we sweep the pool for free connections.
Definition: connection.c:93
void fr_connection_pool_free(fr_connection_pool_t *pool)
Delete a connection pool.
Definition: connection.c:1226
#define pthread_mutex_init(_x, _y)
Definition: rlm_eap.h:75
static int last_reserved_cmp(void const *one, void const *two)
Order connections by reserved most recently.
Definition: connection.c:170
Time value (struct timeval), only for config items.
Definition: radius.h:55
uint32_t lifetime
How long a connection can be open before being closed (irrespective of whether it's idle or not)...
Definition: connection.c:101
struct timeval last_reserved
Last time the connection was reserved.
Definition: connection.c:55
void exec_trigger(REQUEST *request, CONF_SECTION *cs, char const *name, bool quench) CC_HINT(nonnull(3))
Execute a trigger - call an executable to process an event.
Definition: exec.c:686
int(* fr_connection_alive_t)(void *opaque, void *connection)
Check a connection handle is still viable.
Definition: connection.h:113
bool needs_reconnecting
Reconnect this connection before use.
Definition: connection.c:69
#define INFO(fmt,...)
Definition: log.h:143
static char const * name
static int last_released_cmp(void const *one, void const *two)
Order connections by released longest ago.
Definition: connection.c:186
void * fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
Reconnect a suspected inviable connection.
Definition: connection.c:1367
int heap
For the next connection heap.
Definition: connection.c:67
#define RATE_LIMIT_ENABLED
True if rate limiting is enabled.
Definition: log.h:366
#define CONF_PARSER_TERMINATOR
Definition: conffile.h:289
static void fr_connection_link_head(fr_connection_pool_t *pool, fr_connection_t *this)
Adds a connection to the head of the connection list.
Definition: connection.c:234
void const * fr_connection_pool_opaque(fr_connection_pool_t *pool)
Return the opaque data associated with a connection pool.
Definition: connection.c:1101
static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this)
Close an existing connection.
Definition: connection.c:527
char const * log_prefix
Log prefix to prepend to all log messages created by the connection pool code.
Definition: connection.c:129
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:686
uint32_t spare
Number of spare connections to try.
Definition: connection.c:91
uint32_t start
Number of initial connections.
Definition: connection.c:88
#define FR_INTEGER_BOUND_CHECK(_name, _var, _op, _bound)
Definition: conffile.h:222
int fr_heap_extract(fr_heap_t *hp, void *data)
Definition: heap.c:147
uint64_t number
Unique ID assigned when the connection is created, these will monotonically increase over the lifetim...
Definition: connection.c:60
fr_heap_t * heap
For the next connection heap.
Definition: connection.c:111
int ref
Reference counter to prevent connection pool being freed multiple times.
Definition: connection.c:86
bool reconnecting
We are currently reconnecting the pool.
Definition: connection.h:58
Defines a CONF_PAIR to C data type mapping.
Definition: conffile.h:267
int delay_interval
When we next do a cleanup.
Definition: connection.c:95
uint32_t num_uses
Number of times the connection has been reserved.
Definition: connection.c:59
void * fr_heap_peek(fr_heap_t *hp)
Definition: heap.c:207
time_t last_checked
Last time we pruned the connection pool.
Definition: connection.h:39
fr_connection_t * tail
End of the connection list.
Definition: connection.c:114
uint64_t max_uses
Maximum number of times a connection can be used before being closed.
Definition: connection.c:98
#define FR_TIMEVAL_BOUND_CHECK(_name, _var, _op, _bound_sec, _bound_usec)
Definition: conffile.h:224
#define rad_assert(expr)
Definition: rad_assert.h:38
#define PTHREAD_COND_BROADCAST(_x)
Definition: connection.c:149
uint32_t min
Minimum number of concurrent connections to keep open.
Definition: connection.c:89
int fr_heap_insert(fr_heap_t *hp, void *data)
Definition: heap.c:92
struct timeval last_released
Time the connection was released.
Definition: connection.c:57
bool in_use
Whether the connection is currently reserved.
Definition: connection.c:65
fr_connection_pool_state_t state
Stats and state of the connection pool.
Definition: connection.c:139
An individual connection within the connection pool.
Definition: connection.c:50
void(* fr_connection_pool_reconnect_t)(void *opaque)
Alter the opaque data of a connection pool during reconnection event.
Definition: connection.h:72
fr_connection_alive_t alive
Function used to check status of connections.
Definition: connection.c:136
static const CONF_PARSER connection_config[]
Definition: connection.c:152
struct timeval last_released
Last time a connection was released.
Definition: connection.h:47
#define DEBUG2(fmt,...)
Definition: log.h:176
int cf_section_parse(CONF_SECTION *, void *base, CONF_PARSER const *variables)
Parse a configuration section into user-supplied variables.
Definition: conffile.c:2234
Definition: heap.c:16
void * opaque
Pointer to context data that will be passed to callbacks.
Definition: connection.c:127
static fr_connection_t * fr_connection_find(fr_connection_pool_t *pool, void *conn)
Find a connection handle in the connection list.
Definition: connection.c:285
struct timeval fr_connection_pool_timeout(fr_connection_pool_t *pool)
Connection pool get timeout.
Definition: connection.c:1091
fr_connection_pool_state_t const * fr_connection_pool_state(fr_connection_pool_t *pool)
Get the number of connections currently in the pool.
Definition: connection.c:1081
time_t last_spawned
Last time we spawned a connection.
Definition: connection.h:40
void fr_heap_delete(fr_heap_t *hp)
Definition: heap.c:36
void void cf_log_err_cs(CONF_SECTION const *cs, char const *fmt,...) CC_HINT(format(printf
static int fr_connection_pool_check(fr_connection_pool_t *pool)
Check whether any connections need to be removed from the pool.
Definition: connection.c:636
A truth value.
Definition: radius.h:56
time_t created
Time connection was created.
Definition: connection.c:54
32 Bit unsigned integer.
Definition: radius.h:34
uint32_t active
Number of currently reserved connections.
Definition: connection.h:56
64 Bit unsigned integer.
Definition: radius.h:51
char const * trigger_prefix
Prefix to prepend to names of all triggers fired by the connection pool code.
Definition: connection.c:132
fr_connection_t * prev
Previous connection in list.
Definition: connection.c:51
static void * fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
Get a connection from the connection pool.
Definition: connection.c:798
uint32_t idle_timeout
How long a connection can be idle before being closed.
Definition: connection.c:103
static int fr_connection_manage(fr_connection_pool_t *pool, fr_connection_t *this, time_t now)
Check whether a connection needs to be removed from the pool.
Definition: connection.c:574
time_t last_failed
Last time we tried to spawn a connection but failed.
Definition: connection.h:41
#define PTHREAD_MUTEX_LOCK(_x)
Definition: connection.c:147
time_t last_throttled
Last time we refused to spawn a connection because the last connection failed, or we were already spa...
Definition: connection.h:42
static fr_connection_t * fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
Spawns a new connection.
Definition: connection.c:331
CONF_SECTION * cs
Configuration section holding the section of parsed config file that relates to this pool...
Definition: connection.c:125
#define FR_CONF_OFFSET(_n, _t, _s, _f)
Definition: conffile.h:168
uint32_t num
Number of connections in the pool.
Definition: connection.h:55
A connection pool.
Definition: connection.c:85
void fr_connection_release(fr_connection_pool_t *pool, void *conn)
Release a connection.
Definition: connection.c:1305
fr_connection_pool_t * fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque)
Allocate a new pool using an existing one as a template.
Definition: connection.c:1070
#define FR_INTEGER_COND_CHECK(_name, _var, _cond, _new)
Definition: conffile.h:214
struct timeval connect_timeout
New connection timeout, enforced by the create callback.
Definition: connection.c:105
#define WARN(fmt,...)
Definition: log.h:144
#define RATE_LIMIT(_x)
Rate limit messages.
Definition: log.h:380
time_t last_at_max
Last time we hit the maximum number of allowed connections.
Definition: connection.h:45
uint32_t retry_delay
seconds to delay re-open after a failed open.
Definition: connection.c:92
void * connection
Pointer to whatever the module uses for a connection handle.
Definition: connection.c:63
void * fr_connection_get(fr_connection_pool_t *pool)
Reserve a connection in the connection pool.
Definition: connection.c:1291
bool spread
If true we spread requests over the connections, using the connection released longest ago...
Definition: connection.c:108
uint32_t max
Maximum number of concurrent connections to allow.
Definition: connection.c:90
int fr_talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link a parent and a child context, so the child is freed before the parent.
Definition: misc.c:105
struct timeval last_closed
Last time a connection was closed.
Definition: connection.h:48
fr_heap_t * fr_heap_create(fr_heap_cmp_t cmp, size_t offset)
Definition: heap.c:44
bool check_config
Definition: conffile.c:45
static void fr_connection_unlink(fr_connection_pool_t *pool, fr_connection_t *this)
Removes a connection from the connection list.
Definition: connection.c:207
void fr_connection_pool_reconnect_func(fr_connection_pool_t *pool, fr_connection_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition: connection.c:1122
fr_connection_pool_reconnect_t reconnect
Called during connection pool reconnect.
Definition: connection.c:137
uint64_t count
Number of connections spawned over the lifetime of the pool.
Definition: connection.h:53
int fr_connection_close(fr_connection_pool_t *pool, void *conn)
Delete a connection from the connection pool.
Definition: connection.c:1403
fr_connection_pool_t * fr_connection_pool_init(TALLOC_CTX *ctx, CONF_SECTION *cs, void *opaque, fr_connection_create_t c, fr_connection_alive_t a, char const *log_prefix, char const *trigger_prefix)
Create a new connection pool.
Definition: connection.c:899
void *(* fr_connection_create_t)(TALLOC_CTX *ctx, void *opaque, struct timeval const *timeout)
Create a new connection handle.
Definition: connection.h:98
#define pthread_mutex_destroy(_x)
Definition: rlm_eap.h:76
#define RCSID(id)
Definition: build.h:135
int next_delay
The next delay time.
Definition: connection.h:50
char * talloc_typed_strdup(void const *t, char const *p)
Call talloc strdup, setting the type on the new chunk correctly.
Definition: missing.c:588
fr_connection_create_t create
Function used to create new connections.
Definition: connection.c:135
void fr_connection_pool_ref(fr_connection_pool_t *pool)
Increment pool reference by one.
Definition: connection.c:1110
static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
Send a connection pool trigger.
Definition: connection.c:261
uint32_t pending
Number of pending open connections.
Definition: connection.h:38
#define ERROR(fmt,...)
Definition: log.h:145
int fr_connection_pool_reconnect(fr_connection_pool_t *pool)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition: connection.c:1141
fr_connection_t * next
Next connection in list.
Definition: connection.c:52
uint32_t max_pending
Max number of connections to open.
Definition: connection.c:100
fr_connection_t * head
Start of the connection list.
Definition: connection.c:113