The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
pool.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * @file pool.c
19  * @brief Handle pools of connections (threads, sockets, etc.)
20  * @note This API must be used by all modules in the public distribution that
21  * maintain pools of connections.
22  *
23  * @copyright 2012 The FreeRADIUS server project
24  * @copyright 2012 Alan DeKok (aland@deployingradius.com)
25  */
26 RCSID("$Id: a2a3bf4d2eee17a3c5460f4c6233b8043adf1dba $")
27 
28 #define LOG_PREFIX pool->log_prefix
29 
30 #include <freeradius-devel/server/base.h>
31 #include <freeradius-devel/server/modpriv.h>
32 #include <freeradius-devel/util/debug.h>
33 
34 #include <freeradius-devel/util/heap.h>
35 #include <freeradius-devel/util/misc.h>
36 
37 #include <time.h>
38 
40 
41 static int connection_check(fr_pool_t *pool, request_t *request);
42 static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule);
43 
44 /** An individual connection within the connection pool
45  *
46  * Defines connection counters, timestamps, and holds a pointer to the
47  * connection handle itself.
48  *
49  * @see fr_pool_t
50  */
52  fr_pool_connection_t *prev; //!< Previous connection in list.
53  fr_pool_connection_t *next; //!< Next connection in list.
54  fr_heap_index_t heap_id; //!< For the next connection heap.
55 
56  fr_time_t created; //!< Time connection was created.
57  fr_time_t last_reserved; //!< Last time the connection was reserved.
58 
59  fr_time_t last_released; //!< Time the connection was released.
60 
61  uint32_t num_uses; //!< Number of times the connection has been reserved.
62  uint64_t number; //!< Unique ID assigned when the connection is created,
63  //!< these will monotonically increase over the
64  //!< lifetime of the connection pool.
65  void *connection; //!< Pointer to whatever the module uses for a connection
66  //!< handle.
67  bool in_use; //!< Whether the connection is currently reserved.
68 
69  bool needs_reconnecting; //!< Reconnect this connection before use.
70 
71 #ifdef PTHREAD_DEBUG
72  pthread_t pthread_id; //!< When 'in_use == true'.
73 #endif
74 };
75 
76 /** A connection pool
77  *
78  * Defines the configuration of the connection pool, all the counters and
79  * timestamps related to the connection pool, the mutex that stops multiple
80  * threads leaving the pool in an inconsistent state, and the callbacks
81  * required to open, close and check the status of connections within the pool.
82  *
83  * @see fr_connection
84  */
85 struct fr_pool_s {
86  int ref; //!< Reference counter to prevent connection
87  //!< pool being freed multiple times.
88  uint32_t start; //!< Number of initial connections.
89  uint32_t min; //!< Minimum number of concurrent connections to keep open.
90  uint32_t max; //!< Maximum number of concurrent connections to allow.
91  uint32_t max_pending; //!< Max number of pending connections to allow.
92  uint32_t spare; //!< Number of spare connections to try.
93  uint64_t max_uses; //!< Maximum number of times a connection can be used
94  //!< before being closed.
95  uint32_t pending_window; //!< Sliding window of pending connections.
96 
97  fr_time_delta_t retry_delay; //!< seconds to delay re-open after a failed open.
98  fr_time_delta_t cleanup_interval; //!< Initial timer for how often we sweep the pool
99  //!< for free connections. (0 is infinite).
100  fr_time_delta_t delay_interval; //!< When we next do a cleanup. Initialized to
101  //!< cleanup_interval, and increase from there based
102  //!< on the delay.
103  fr_time_delta_t lifetime; //!< How long a connection can be open before being
104  //!< closed (irrespective of whether it's idle or not).
105  fr_time_delta_t idle_timeout; //!< How long a connection can be idle before
106  //!< being closed.
107  fr_time_delta_t connect_timeout; //!< New connection timeout, enforced by the create
108  //!< callback.
109 
110  bool spread; //!< If true we spread requests over the connections,
111  //!< using the connection released longest ago, first.
112 
113  fr_heap_t *heap; //!< For the next connection heap
114 
115  fr_pool_connection_t *head; //!< Start of the connection list.
116  fr_pool_connection_t *tail; //!< End of the connection list.
117 
118  pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
119  //!< modifications in threaded mode.
120  pthread_cond_t done_spawn; //!< Threads that need to ensure no spawning is in progress,
121  //!< should block on this condition if pending != 0.
122  pthread_cond_t done_reconnecting; //!< Before calling the create callback, threads should
123  //!< block on this condition if reconnecting == true.
124 
125  CONF_SECTION const *cs; //!< Configuration section holding the section of parsed
126  //!< config file that relates to this pool.
127  void *opaque; //!< Pointer to context data that will be passed to callbacks.
128 
129  char const *log_prefix; //!< Log prefix to prepend to all log messages created
130  //!< by the connection pool code.
131 
132  bool triggers_enabled; //!< Whether we call the trigger functions.
133 
134  char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
135  //!< fired by the connection pool code.
136  fr_pair_list_t trigger_args; //!< Arguments to make available in connection pool triggers.
137 
138  fr_time_delta_t held_trigger_min; //!< If a connection is held for less than the specified
139  //!< period, fire a trigger.
140  fr_time_delta_t held_trigger_max; //!< If a connection is held for longer than the specified
141  //!< period, fire a trigger.
142 
143  fr_pool_connection_create_t create; //!< Function used to create new connections.
144  fr_pool_connection_alive_t alive; //!< Function used to check status of connections.
145 
146  fr_pool_reconnect_t reconnect; //!< Called during connection pool reconnect.
147 
148  fr_pool_state_t state; //!< Stats and state of the connection pool.
149 };
150 
151 static const conf_parser_t pool_config[] = {
152  { FR_CONF_OFFSET("start", fr_pool_t, start), .dflt = "0" },
153  { FR_CONF_OFFSET("min", fr_pool_t, min), .dflt = "0" },
154  { FR_CONF_OFFSET("max", fr_pool_t, max), .dflt_func = max_dflt },
155  { FR_CONF_OFFSET("max_pending", fr_pool_t, max_pending), .dflt = "0" },
156  { FR_CONF_OFFSET("spare", fr_pool_t, spare), .dflt = "3" },
157  { FR_CONF_OFFSET("uses", fr_pool_t, max_uses), .dflt = "0" },
158  { FR_CONF_OFFSET("lifetime", fr_pool_t, lifetime), .dflt = "0" },
159  { FR_CONF_OFFSET("cleanup_interval", fr_pool_t, cleanup_interval), .dflt = "30" },
160  { FR_CONF_OFFSET("idle_timeout", fr_pool_t, idle_timeout), .dflt = "60" },
161  { FR_CONF_OFFSET("connect_timeout", fr_pool_t, connect_timeout), .dflt = "3.0" },
162  { FR_CONF_OFFSET("held_trigger_min", fr_pool_t, held_trigger_min), .dflt = "0.0" },
163  { FR_CONF_OFFSET("held_trigger_max", fr_pool_t, held_trigger_max), .dflt = "0.5" },
164  { FR_CONF_OFFSET("retry_delay", fr_pool_t, retry_delay), .dflt = "1" },
165  { FR_CONF_OFFSET("spread", fr_pool_t, spread), .dflt = "no" },
167 };
168 
169 static int max_dflt(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
170 {
171  char *strvalue;
172 
173  strvalue = talloc_asprintf(NULL, "%u", main_config->max_workers);
174  *out = cf_pair_alloc(cs, rule->name1, strvalue, T_OP_EQ, T_BARE_WORD, quote);
175  talloc_free(strvalue);
176 
177  return 0;
178 }
179 
180 /** Order connections by reserved most recently
181  */
182 static int8_t last_reserved_cmp(void const *one, void const *two)
183 {
184  fr_pool_connection_t const *a = one, *b = two;
185 
186  return fr_time_cmp(a->last_reserved, b->last_reserved);
187 }
188 
189 /** Order connections by released longest ago
190  */
191 static int8_t last_released_cmp(void const *one, void const *two)
192 {
193  fr_pool_connection_t const *a = one, *b = two;
194 
195  return fr_time_cmp(a->last_released, b->last_released);
196 }
197 
198 /** Removes a connection from the connection list
199  *
200  * @note Must be called with the mutex held.
201  *
202  * @param[in] pool to modify.
203  * @param[in] this Connection to delete.
204  */
206 {
207  if (this->prev) {
208  fr_assert(pool->head != this);
209  this->prev->next = this->next;
210  } else {
211  fr_assert(pool->head == this);
212  pool->head = this->next;
213  }
214  if (this->next) {
215  fr_assert(pool->tail != this);
216  this->next->prev = this->prev;
217  } else {
218  fr_assert(pool->tail == this);
219  pool->tail = this->prev;
220  }
221 
222  this->prev = this->next = NULL;
223 }
224 
225 /** Adds a connection to the head of the connection list
226  *
227  * @note Must be called with the mutex held.
228  *
229  * @param[in] pool to modify.
230  * @param[in] this Connection to add.
231  */
233 {
234  fr_assert(pool != NULL);
235  fr_assert(this != NULL);
236  fr_assert(pool->head != this);
237  fr_assert(pool->tail != this);
238 
239  if (pool->head) {
240  pool->head->prev = this;
241  }
242 
243  this->next = pool->head;
244  this->prev = NULL;
245  pool->head = this;
246  if (!pool->tail) {
247  fr_assert(this->next == NULL);
248  pool->tail = this;
249  } else {
250  fr_assert(this->next != NULL);
251  }
252 }
253 
254 /** Send a connection pool trigger.
255  *
256  * @param[in] pool to send trigger for.
257  * @param[in] event trigger name suffix.
258  */
259 static inline void fr_pool_trigger_exec(fr_pool_t *pool, char const *event)
260 {
261  char name[128];
262 
263  fr_assert(pool != NULL);
264  fr_assert(event != NULL);
265 
266  if (!pool->triggers_enabled) return;
267 
268  snprintf(name, sizeof(name), "%s.%s", pool->trigger_prefix, event);
270 }
271 
272 /** Find a connection handle in the connection list
273  *
274  * Walks over the list of connections searching for a specified connection
275  * handle and returns the first connection that contains that pointer.
276  *
277  * @note Will lock mutex and only release mutex if connection handle
278  * is not found, so will usually return will mutex held.
279  * @note Must be called with the mutex free.
280  *
281  * @param[in] pool to search in.
282  * @param[in] conn handle to search for.
283  * @return
284  * - Connection containing the specified handle.
285  * - NULL if non if connection was found.
286  */
288 {
289  fr_pool_connection_t *this;
290 
291  if (!pool || !conn) return NULL;
292 
293  pthread_mutex_lock(&pool->mutex);
294 
295  /*
296  * FIXME: This loop could be avoided if we passed a 'void
297  * **connection' instead. We could use "offsetof" in
298  * order to find top of the parent structure.
299  */
300  for (this = pool->head; this != NULL; this = this->next) {
301  if (this->connection == conn) {
302 #ifdef PTHREAD_DEBUG
303  pthread_t pthread_id;
304 
305  pthread_id = pthread_self();
306  fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
307 #endif
308 
309  fr_assert(this->in_use == true);
310  return this;
311  }
312  }
313 
314  pthread_mutex_unlock(&pool->mutex);
315  return NULL;
316 }
317 
318 /** Spawns a new connection
319  *
320  * Spawns a new connection using the create callback, and returns it for
321  * adding to the connection list.
322  *
323  * @note Will call the 'open' trigger.
324  * @note Must be called with the mutex free.
325  *
326  * @param[in] pool to modify.
327  * @param[in] request The current request.
328  * @param[in] now Current time.
329  * @param[in] in_use whether the new connection should be "in_use" or not
330  * @param[in] unlock whether we should unlock the mutex before returning
331  * @return
332  * - New connection struct.
333  * - NULL on error.
334  */
335 static fr_pool_connection_t *connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
336 {
337  uint64_t number;
338  uint32_t pending_window;
339  TALLOC_CTX *ctx;
340 
341  fr_pool_connection_t *this;
342  void *conn;
343 
344  fr_assert(pool != NULL);
345 
346  /*
347  * If we have NO connections, and we've previously failed
348  * opening connections, don't open multiple connections until
349  * we successfully open at least one.
350  */
351  if ((pool->state.num == 0) &&
352  pool->state.pending &&
353  fr_time_gt(pool->state.last_failed, fr_time_wrap(0))) return NULL;
354 
355  pthread_mutex_lock(&pool->mutex);
356  fr_assert(pool->state.num <= pool->max);
357 
358  /*
359  * Don't spawn too many connections at the same time.
360  */
361  if ((pool->state.num + pool->state.pending) >= pool->max) {
362  pthread_mutex_unlock(&pool->mutex);
363 
364  ERROR("Cannot open new connection, already at max");
365  return NULL;
366  }
367 
368  /*
369  * If the last attempt failed, wait a bit before
370  * retrying.
371  */
372  if (fr_time_gt(pool->state.last_failed, fr_time_wrap(0)) &&
373  fr_time_gt(fr_time_add(pool->state.last_failed, pool->retry_delay), now)) {
374  bool complain = false;
375 
377  complain = true;
378 
379  pool->state.last_throttled = now;
380  }
381 
382  pthread_mutex_unlock(&pool->mutex);
383 
384  if (!fr_rate_limit_enabled() || complain) {
385  ERROR("Last connection attempt failed, waiting %pV seconds before retrying",
387  }
388 
389  return NULL;
390  }
391 
392  /*
393  * We limit the rate of new connections after a failed attempt.
394  */
395  if (pool->state.pending > pool->pending_window) {
396  pthread_mutex_unlock(&pool->mutex);
397 
399  "Cannot open a new connection due to rate limit after failure");
400 
401  return NULL;
402  }
403 
404  pool->state.pending++;
405  number = pool->state.count++;
406 
407  /*
408  * Don't starve out the thread trying to reconnect
409  * the pool, by continuously opening new connections.
410  */
411  while (pool->state.reconnecting) pthread_cond_wait(&pool->done_reconnecting, &pool->mutex);
412 
413  /*
414  * The true value for pending_window is the smaller of
415  * free connection slots, or pool->pending_window.
416  */
417  pending_window = (pool->max - pool->state.num);
418  if (pool->pending_window < pending_window) pending_window = pool->pending_window;
419  ROPTIONAL(RDEBUG2, DEBUG2, "Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
420  number, pool->state.pending, pending_window);
421 
422  /*
423  * Unlock the mutex while we try to open a new
424  * connection. If there are issues with the back-end,
425  * opening a new connection may take a LONG time. In
426  * that case, we want the other connections to continue
427  * to be used.
428  */
429  pthread_mutex_unlock(&pool->mutex);
430 
431  /*
432  * Allocate a new top level ctx for the create callback
433  * to hang its memory off of.
434  */
435  ctx = talloc_init("fr_connection_ctx");
436  if (!ctx) return NULL;
437 
438  /*
439  * This may take a long time, which prevents other
440  * threads from releasing connections. We don't care
441  * about other threads opening new connections, as we
442  * already have no free connections.
443  */
444  conn = pool->create(ctx, pool->opaque, pool->connect_timeout);
445  if (!conn) {
446  ERROR("Opening connection failed (%" PRIu64 ")", number);
447 
448  pool->state.last_failed = now;
449  pthread_mutex_lock(&pool->mutex);
450  pool->pending_window = 1;
451  pool->state.pending--;
452 
453  /*
454  * Must be done inside the mutex, reconnect callback
455  * may modify args.
456  */
457  fr_pool_trigger_exec(pool, "fail");
458  pthread_cond_broadcast(&pool->done_spawn);
459  pthread_mutex_unlock(&pool->mutex);
460 
461  talloc_free(ctx);
462 
463  return NULL;
464  }
465 
466  /*
467  * And lock the mutex again while we link the new
468  * connection back into the pool.
469  */
470  pthread_mutex_lock(&pool->mutex);
471 
472  this = talloc_zero(pool, fr_pool_connection_t);
473  if (!this) {
474  pthread_cond_broadcast(&pool->done_spawn);
475  pthread_mutex_unlock(&pool->mutex);
476 
477  talloc_free(ctx);
478 
479  return NULL;
480  }
481  talloc_link_ctx(this, ctx);
482 
483  this->created = now;
484  this->connection = conn;
485  this->in_use = in_use;
486 
487  this->number = number;
488  this->last_reserved = fr_time();
489  this->last_released = this->last_reserved;
490 
491  /*
492  * The connection pool is starting up. Insert the
493  * connection into the heap.
494  */
495  if (!in_use) fr_heap_insert(&pool->heap, this);
496 
497  connection_link_head(pool, this);
498 
499  /*
500  * Do NOT insert the connection into the heap. That's
501  * done when the connection is released.
502  */
503 
504  pool->state.num++;
505 
506  fr_assert(pool->state.pending > 0);
507  pool->state.pending--;
508 
509  /*
510  * We've successfully opened one more connection. Allow
511  * more connections to open in parallel.
512  */
513  if ((pool->pending_window < pool->max) &&
514  ((pool->max_pending == 0) || (pool->pending_window < pool->max_pending))) {
515  pool->pending_window++;
516  }
517 
518  pool->state.last_spawned = fr_time();
519  pool->delay_interval = pool->cleanup_interval;
520  pool->state.next_delay = pool->cleanup_interval;
521  pool->state.last_failed = fr_time_wrap(0);
522 
523  /*
524  * Must be done inside the mutex, reconnect callback
525  * may modify args.
526  */
527  fr_pool_trigger_exec(pool, "open");
528 
529  pthread_cond_broadcast(&pool->done_spawn);
530  if (unlock) pthread_mutex_unlock(&pool->mutex);
531 
532  /* coverity[missing_unlock] */
533  return this;
534 }
535 
536 /** Close an existing connection.
537  *
538  * Removes the connection from the list, calls the delete callback to close
539  * the connection, then frees memory allocated to the connection.
540  *
541  * @note Will call the 'close' trigger.
542  * @note Must be called with the mutex held.
543  *
544  * @param[in] pool to modify.
545  * @param[in] this Connection to delete.
546  */
548 {
549  /*
550  * If it's in use, release it.
551  */
552  if (this->in_use) {
553 #ifdef PTHREAD_DEBUG
554  pthread_t pthread_id = pthread_self();
555  fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
556 #endif
557 
558  this->in_use = false;
559 
560  fr_assert(pool->state.active != 0);
561  pool->state.active--;
562 
563  } else {
564  /*
565  * Connection isn't used, remove it from the heap.
566  */
567  fr_heap_extract(&pool->heap, this);
568  }
569 
570  fr_pool_trigger_exec(pool, "close");
571 
572  connection_unlink(pool, this);
573 
574  fr_assert(pool->state.num > 0);
575  pool->state.num--;
576  talloc_free(this);
577 }
578 
579 /** Check whether a connection needs to be removed from the pool
580  *
581  * Will verify that the connection is within idle_timeout, max_uses, and
582  * lifetime values. If it is not, the connection will be closed.
583  *
584  * @note Will only close connections not in use.
585  * @note Must be called with the mutex held.
586  *
587  * @param[in] pool to modify.
588  * @param[in] request The current request.
589  * @param[in] this Connection to manage.
590  * @param[in] now Current time.
591  * @return
592  * - 0 if connection was closed.
593  * - 1 if connection handle was left open.
594  */
595 static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now)
596 {
597  fr_assert(pool != NULL);
598  fr_assert(this != NULL);
599 
600  /*
601  * Don't terminated in-use connections
602  */
603  if (this->in_use) return 1;
604 
605  if (this->needs_reconnecting) {
606  ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Needs reconnecting",
607  this->number);
608  do_delete:
609  if (pool->state.num <= pool->min) {
610  ROPTIONAL(RDEBUG2, DEBUG2, "You probably need to lower \"min\"");
611  }
612  connection_close_internal(pool, this);
613  return 0;
614  }
615 
616  if ((pool->max_uses > 0) &&
617  (this->num_uses >= pool->max_uses)) {
618  ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit max_uses limit",
619  this->number);
620  goto do_delete;
621  }
622 
623  if (fr_time_delta_ispos(pool->lifetime) &&
624  (fr_time_lt(fr_time_add(this->created, pool->lifetime), now))) {
625  ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit lifetime limit",
626  this->number);
627  goto do_delete;
628  }
629 
630  if (fr_time_delta_ispos(pool->idle_timeout) &&
631  (fr_time_lt(fr_time_add(this->last_released, pool->idle_timeout), now))) {
632  ROPTIONAL(RINFO, INFO, "Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %pVs",
633  this->number, fr_box_time_delta(fr_time_sub(now, this->last_released)));
634  goto do_delete;
635  }
636 
637  return 1;
638 }
639 
640 
641 /** Check whether any connections need to be removed from the pool
642  *
643  * Maintains the number of connections in the pool as per the configuration
644  * parameters for the connection pool.
645  *
646  * @note Will only run checks the first time it's called in a given second,
647  * to throttle connection spawning/closing.
648  * @note Will only close connections not in use.
649  * @note Must be called with the mutex held, will release mutex before returning.
650  *
651  * @param[in] pool to manage.
652  * @param[in] request The current request.
653  * @return 1
654  */
655 static int connection_check(fr_pool_t *pool, request_t *request)
656 {
657  uint32_t num, spare;
658  fr_time_t now = fr_time();
659  fr_pool_connection_t *this, *next;
660 
662  pthread_mutex_unlock(&pool->mutex);
663  return 1;
664  }
665 
666  /*
667  * Get "real" number of connections, and count pending
668  * connections as spare.
669  */
670  num = pool->state.num + pool->state.pending;
671  spare = pool->state.pending + (pool->state.num - pool->state.active);
672 
673  /*
674  * The other end can close connections. If so, we'll
675  * have fewer than "min". When that happens, open more
676  * connections to enforce "min".
677  *
678  * The code for spawning connections enforces that
679  * num + pending <= max.
680  */
681  if (num < pool->min) {
682  ROPTIONAL(RINFO, INFO, "Need %i more connections to reach min connections (%i)", pool->min - num, pool->min);
683  goto add_connection;
684  }
685 
686  /*
687  * On the odd chance that we've opened too many
688  * connections, take care of that.
689  */
690  if (num > pool->max) {
691  /*
692  * Pending connections don't get closed as "spare".
693  */
694  if (pool->state.pending > 0) goto manage_connections;
695 
696  /*
697  * Otherwise close one of the connections to
698  * bring us down to "max".
699  */
700  goto close_connection;
701  }
702 
703  /*
704  * Now that we've enforced min/max connections, try to
705  * keep the "spare" connections at the correct number.
706  */
707 
708  /*
709  * Nothing to do? Go check all of the connections for
710  * timeouts, etc.
711  */
712  if (spare == pool->spare) goto manage_connections;
713 
714  /*
715  * Too many spare connections, delete some.
716  */
717  if (spare > pool->spare) {
718  fr_pool_connection_t *found;
719 
720  /*
721  * Pending connections don't get closed as "spare".
722  */
723  if (pool->state.pending > 0) goto manage_connections;
724 
725  /*
726  * Don't close too many connections, even they
727  * are spare.
728  */
729  if (num <= pool->min) goto manage_connections;
730 
731  /*
732  * Too many spares, go close one.
733  */
734 
735  close_connection:
736  /*
737  * Don't close connections too often, in order to
738  * prevent flapping. Coverity doesn't notice that
739  * all callers have the lock, so we annotate the issue.
740  */
741  /* coverity[missing_lock] */
742  if (fr_time_lt(now, fr_time_add(pool->state.last_spawned, pool->delay_interval))) goto manage_connections;
743 
744  /*
745  * Find a connection to close.
746  */
747  found = NULL;
748  for (this = pool->tail; this != NULL; this = this->prev) {
749  if (this->in_use) continue;
750 
751  if (!found || (fr_time_lt(this->last_reserved, found->last_reserved))) found = this;
752  }
753 
754  if (!fr_cond_assert(found)) goto done;
755 
756  ROPTIONAL(RDEBUG2, DEBUG2, "Closing connection (%" PRIu64 ") as we have too many unused connections",
757  found->number);
758  connection_close_internal(pool, found);
759 
760  /*
761  * Decrease the delay for the next time we clean
762  * up.
763  */
767 
768  goto manage_connections;
769  }
770 
771  /*
772  * Too few connections, open some more.
773  */
774  if (spare < pool->spare) {
775  /*
776  * Don't open too many pending connections.
777  * Again, coverity doesn't realize all callers have the lock,
778  * so we must annotate here as well.
779  */
780  /* coverity[missing_lock] */
781  if (pool->state.pending >= pool->pending_window) goto manage_connections;
782 
783  /*
784  * Don't open too many connections, even if we
785  * need more spares.
786  */
787  if (num >= pool->max) goto manage_connections;
788 
789  /*
790  * Too few spares, go add one.
791  */
792  ROPTIONAL(RINFO, INFO, "Need %i more connections to reach %i spares", pool->spare - spare, pool->spare);
793 
794  add_connection:
795  /*
796  * Only try to open spares if we're not already attempting to open
797  * a connection. Avoids spurious log messages.
798  */
799  pthread_mutex_unlock(&pool->mutex);
800  (void) connection_spawn(pool, request, now, false, true);
801  pthread_mutex_lock(&pool->mutex);
802  goto manage_connections;
803  }
804 
805  /*
806  * Pass over all of the connections in the pool, limiting
807  * lifetime, idle time, max requests, etc.
808  */
809 manage_connections:
810  for (this = pool->head; this != NULL; this = next) {
811  next = this->next;
812  connection_manage(pool, request, this, now);
813  }
814 
815  pool->state.last_checked = now;
816 
817 done:
818  pthread_mutex_unlock(&pool->mutex);
819 
820  return 1;
821 }
822 
823 /** Get a connection from the connection pool
824  *
825  * @note Must be called with the mutex free.
826  *
827  * @param[in] pool to reserve the connection from.
828  * @param[in] request The current request.
829  * @param[in] spawn whether to spawn a new connection
830  * @return
831  * - A pointer to the connection handle.
832  * - NULL on error.
833  */
834 static void *connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
835 {
836  fr_time_t now;
837  fr_pool_connection_t *this;
838 
839  if (!pool) return NULL;
840 
841  pthread_mutex_lock(&pool->mutex);
842 
843  now = fr_time();
844 
845  /*
846  * Grab the link with the lowest latency, and check it
847  * for limits. If "connection manage" says the link is
848  * no longer usable, go grab another one.
849  */
850  do {
851  this = fr_heap_peek(pool->heap);
852  if (!this) break;
853  } while (!connection_manage(pool, request, this, now));
854 
855  /*
856  * We have a working connection. Extract it from the
857  * heap and use it.
858  */
859  if (this) {
860  fr_heap_extract(&pool->heap, this);
861  goto do_return;
862  }
863 
864  if (pool->state.num == pool->max) {
865  bool complain = false;
866 
867  /*
868  * Rate-limit complaints.
869  */
871  complain = true;
872  pool->state.last_at_max = now;
873  }
874 
875  pthread_mutex_unlock(&pool->mutex);
876  if (!fr_rate_limit_enabled() || complain) {
877  ERROR("No connections available and at max connection limit");
878  /*
879  * Must be done inside the mutex, reconnect callback
880  * may modify args.
881  */
882  fr_pool_trigger_exec(pool, "none");
883  }
884 
885  return NULL;
886  }
887 
888  pthread_mutex_unlock(&pool->mutex);
889 
890  if (!spawn) return NULL;
891 
892  ROPTIONAL(RDEBUG2, DEBUG2, "%i of %u connections in use. You may need to increase \"spare\"",
893  pool->state.active, pool->state.num);
894 
895  /*
896  * Returns unlocked on failure, or locked on success
897  */
898  this = connection_spawn(pool, request, now, true, false);
899  if (!this) return NULL;
900 
901 do_return:
902  pool->state.active++;
903  this->num_uses++;
904  this->last_reserved = fr_time();
905  this->in_use = true;
906 
907 #ifdef PTHREAD_DEBUG
908  this->pthread_id = pthread_self();
909 #endif
910  pthread_mutex_unlock(&pool->mutex);
911 
912  ROPTIONAL(RDEBUG2, DEBUG2, "Reserved connection (%" PRIu64 ")", this->number);
913 
914  return this->connection;
915 }
916 
917 /** Enable triggers for a connection pool
918  *
919  * @param[in] pool to enable triggers for.
920  * @param[in] trigger_prefix prefix to prepend to all trigger names. Usually a path
921  * to the module's trigger configuration .e.g.
922  * @verbatim modules.<name>.pool @endverbatim
923  * @verbatim <trigger name> @endverbatim is appended to form
924  * the complete path.
925  * @param[in] trigger_args to make available in any triggers executed by the connection pool.
926  * These will usually be fr_pair_t (s) describing the host
927  * associated with the pool.
928  * Trigger args will be copied, input trigger_args should be freed
929  * if necessary.
930  */
931 void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
932 {
933  pool->triggers_enabled = true;
934 
936  MEM(pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "");
937 
939 
940  if (!trigger_args) return;
941 
942  MEM(fr_pair_list_copy(pool, &pool->trigger_args, trigger_args) >= 0);
943 }
944 
945 /** Create a new connection pool
946  *
947  * Allocates structures used by the connection pool, initialises the various
948  * configuration options and counters, and sets the callback functions.
949  *
950  * Will also spawn the number of connections specified by the 'start' configuration
951  * option.
952  *
953  * @note Will call the 'start' trigger.
954  *
955  * @param[in] ctx Context to link pool's destruction to.
956  * @param[in] cs pool section.
957  * @param[in] opaque data pointer to pass to callbacks.
958  * @param[in] c Callback to create new connections.
959  * @param[in] a Callback to check the status of connections.
960  * @param[in] log_prefix prefix to prepend to all log messages.
961  * @return
962  * - New connection pool.
963  * - NULL on error.
964  */
965 fr_pool_t *fr_pool_init(TALLOC_CTX *ctx,
966  CONF_SECTION const *cs,
967  void *opaque,
969  char const *log_prefix)
970 {
971  fr_pool_t *pool = NULL;
972 
973  if (!cs || !opaque || !c) return NULL;
974 
975  /*
976  * Pool is allocated in the NULL context as
977  * threads are likely to allocate memory
978  * beneath the pool.
979  */
980  MEM(pool = talloc_zero(NULL, fr_pool_t));
982 
983  /*
984  * Ensure the pool is freed at the same time
985  * as its parent.
986  */
987  if (ctx && (talloc_link_ctx(ctx, pool) < 0)) {
988  PERROR("%s: Failed linking pool ctx", __FUNCTION__);
989  talloc_free(pool);
990 
991  return NULL;
992  }
993 
994  pool->cs = cs;
995  pool->opaque = opaque;
996  pool->create = c;
997  pool->alive = a;
998 
999  pool->head = pool->tail = NULL;
1000 
1001  /*
1002  * We keep a heap of connections, sorted by the last time
1003  * we STARTED using them. Newly opened connections
1004  * aren't in the heap. They're only inserted in the list
1005  * once they're released.
1006  *
1007  * We do "most recently started" instead of "most
1008  * recently used", because MRU is done as most recently
1009  * *released*. We want to order connections by
1010  * responsiveness, and MRU prioritizes high latency
1011  * connections.
1012  *
1013  * We want most recently *started*, which gives
1014  * preference to low latency links, and pushes high
1015  * latency links down in the priority heap.
1016  *
1017  * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
1018  */
1019  if (!pool->spread) {
1021  /*
1022  * For some types of connections we need to used a different
1023  * algorithm, because load balancing benefits are secondary
1024  * to maintaining a cache of open connections.
1025  *
1026  * With libcurl's multihandle, connections can only be reused
1027  * if all handles that make up the multhandle are done processing
1028  * their requests.
1029  *
1030  * We can't tell when that's happened using libcurl, and even
1031  * if we could, blocking until all servers had responded
1032  * would have huge cost.
1033  *
1034  * The solution is to order the heap so that the connection that
1035  * was released longest ago is at the top.
1036  *
1037  * That way we maximise time between connection use.
1038  */
1039  } else {
1041  }
1042  if (!pool->heap) {
1043  ERROR("%s: Failed creating connection heap", __FUNCTION__);
1044  error:
1045  fr_pool_free(pool);
1046  return NULL;
1047  }
1048 
1049  pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
1050  pthread_mutex_init(&pool->mutex, NULL);
1051  pthread_cond_init(&pool->done_spawn, NULL);
1052  pthread_cond_init(&pool->done_reconnecting, NULL);
1053 
1054  DEBUG2("Initialising connection pool");
1055 
1056  if (cf_section_rules_push(UNCONST(CONF_SECTION *, cs), pool_config) < 0) goto error;
1057  if (cf_section_parse(pool, pool, UNCONST(CONF_SECTION *, cs)) < 0) {
1058  PERROR("Configuration parsing failed");
1059  goto error;
1060  }
1061 
1062  /*
1063  * Some simple limits
1064  */
1065  if (pool->max == 0) {
1066  cf_log_err(cs, "Cannot set 'max' to zero");
1067  goto error;
1068  }
1069 
1070  /*
1071  * Coverity notices that other uses of max_pending are protected with a mutex,
1072  * and thus thinks it should be locked/unlocked here...but coverity does not
1073  * consider that until this function returns a pointer to a pool, nobody can
1074  * use the pool, so there's no point to doing so.
1075  */
1076  /* coverity[missing_lock] */
1077  pool->pending_window = (pool->max_pending > 0) ? pool->max_pending : pool->max;
1078 
1079  if (pool->min > pool->max) {
1080  cf_log_err(cs, "Cannot set 'min' to more than 'max'");
1081  goto error;
1082  }
1083 
1084  FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1085  FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1086  FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1087 
1088  if (fr_time_delta_ispos(pool->lifetime)) {
1089  FR_TIME_DELTA_COND_CHECK("idle_timeout", pool->idle_timeout,
1091  }
1092 
1093  if (fr_time_delta_ispos(pool->idle_timeout)) {
1094  FR_TIME_DELTA_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1095  }
1096 
1097  /*
1098  * Some libraries treat 0.0 as infinite timeout, others treat it
1099  * as instantaneous timeout. Solve the inconsistency by making
1100  * the smallest allowable timeout 100ms.
1101  */
1102  FR_TIME_DELTA_BOUND_CHECK("connect_timeout", pool->connect_timeout, >=, fr_time_delta_from_msec(100));
1103 
1104  /*
1105  * Don't open any connections. Instead, force the limits
1106  * to only 1 connection.
1107  *
1108  */
1109  if (check_config) pool->start = pool->min = pool->max = 1;
1110 
1111  return pool;
1112 }
1113 
1115 {
1116  uint32_t i;
1117  fr_pool_connection_t *this;
1118 
1119  /*
1120  * Don't spawn any connections
1121  */
1122  if (check_config) return 0;
1123 
1124  /*
1125  * Create all of the connections, unless the admin says
1126  * not to.
1127  */
1128  for (i = 0; i < pool->start; i++) {
1129  /*
1130  * Call time() once for each spawn attempt as there
1131  * could be a significant delay.
1132  */
1133  this = connection_spawn(pool, NULL, fr_time(), false, true);
1134  if (!this) {
1135  ERROR("Failed spawning initial connections");
1136  return -1;
1137  }
1138  }
1139 
1140  fr_pool_trigger_exec(pool, "start");
1141 
1142  return 0;
1143 }
1144 
1145 /** Allocate a new pool using an existing one as a template
1146  *
1147  * @param[in] ctx to allocate new pool in.
1148  * @param[in] pool to copy.
1149  * @param[in] opaque data to pass to connection function.
1150  * @return
1151  * - New connection pool.
1152  * - NULL on error.
1153  */
1154 fr_pool_t *fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
1155 {
1156  fr_pool_t *copy;
1157 
1158  copy = fr_pool_init(ctx, pool->cs, opaque, pool->create, pool->alive, pool->log_prefix);
1159  if (!copy) return NULL;
1160 
1161  if (pool->trigger_prefix) fr_pool_enable_triggers(copy, pool->trigger_prefix, &pool->trigger_args);
1162 
1163  return copy;
1164 }
1165 
1166 /** Get the number of connections currently in the pool
1167  *
1168  * @param[in] pool to count connections for.
1169  * @return the number of connections in the pool
1170  */
1172 {
1173  return &pool->state;
1174 }
1175 
1176 /** Connection pool get timeout
1177  *
1178  * @param[in] pool to get connection timeout for.
1179  * @return the connection timeout configured for the pool.
1180  */
1182 {
1183  return pool->connect_timeout;
1184 }
1185 
1186 /** Connection pool get start
1187  *
1188  * @param[in] pool to get connection start for.
1189  * @return the connection start value configured for the pool.
1190  */
1192 {
1193  return pool->start;
1194 }
1195 
1196 /** Return the opaque data associated with a connection pool
1197  *
1198  * @param pool to return data for.
1199  * @return opaque data associated with pool.
1200  */
1201 void const *fr_pool_opaque(fr_pool_t *pool)
1202 {
1203  return pool->opaque;
1204 }
1205 
1206 /** Increment pool reference by one.
1207  *
1208  * @param[in] pool to increment reference counter for.
1209  */
1211 {
1212  pool->ref++;
1213 }
1214 
1215 /** Set a reconnection callback for the connection pool
1216  *
1217  * This can be called at any time during the pool's lifecycle.
1218  *
1219  * @param[in] pool to set reconnect callback for.
1220  * @param reconnect callback to call when reconnecting pool's connections.
1221  */
1223 {
1224  pool->reconnect = reconnect;
1225 }
1226 
1227 /** Mark connections for reconnection, and spawn at least 'start' connections
1228  *
1229  * @note This call may block whilst waiting for pending connection attempts to complete.
1230  *
1231  * This intended to be called on a connection pool that's in use, to have it reflect
1232  * a configuration change, or because the administrator knows that all connections
1233  * in the pool are inviable and need to be reconnected.
1234  *
1235  * @param[in] pool to reconnect.
1236  * @param[in] request The current request.
1237  * @return
1238  * - 0 On success.
1239  * - -1 If we couldn't create start connections, this may be ignored
1240  * depending on the context in which this function is being called.
1241  */
1243 {
1244  uint32_t i;
1245  fr_pool_connection_t *this;
1246  fr_time_t now;
1247 
1248  pthread_mutex_lock(&pool->mutex);
1249 
1250  /*
1251  * Pause new spawn attempts (we release the mutex
1252  * during our cond wait).
1253  */
1254  pool->state.reconnecting = true;
1255 
1256  /*
1257  * When the loop exits, we'll hold the lock for the pool,
1258  * and we're guaranteed the connection create callback
1259  * will not be using the opaque data.
1260  */
1261  while (pool->state.pending) pthread_cond_wait(&pool->done_spawn, &pool->mutex);
1262 
1263  /*
1264  * We want to ensure at least 'start' connections
1265  * have been reconnected. We can't call reconnect
1266  * because, we might get the same connection each
1267  * time we reserve one, so we close 'start'
1268  * connections, and then attempt to spawn them again.
1269  */
1270  for (i = 0; i < pool->start; i++) {
1271  this = fr_heap_peek(pool->heap);
1272  if (!this) break; /* There wasn't 'start' connections available */
1273 
1274  connection_close_internal(pool, this);
1275  }
1276 
1277  /*
1278  * Mark all remaining connections in the pool as
1279  * requiring reconnection.
1280  */
1281  for (this = pool->head; this; this = this->next) this->needs_reconnecting = true;
1282 
1283  /*
1284  * Call the reconnect callback (if one's set)
1285  * This may modify the opaque data associated
1286  * with the pool.
1287  */
1288  if (pool->reconnect) pool->reconnect(pool, pool->opaque);
1289 
1290  /*
1291  * Must be done inside the mutex, reconnect callback
1292  * may modify args.
1293  */
1294  fr_pool_trigger_exec(pool, "reconnect");
1295 
1296  /*
1297  * Allow new spawn attempts, and wakeup any threads
1298  * waiting to spawn new connections.
1299  */
1300  pool->state.reconnecting = false;
1301  pthread_cond_broadcast(&pool->done_reconnecting);
1302  pthread_mutex_unlock(&pool->mutex);
1303 
1304  now = fr_time();
1305 
1306  /*
1307  * Now attempt to spawn 'start' connections.
1308  */
1309  for (i = 0; i < pool->start; i++) {
1310  this = connection_spawn(pool, request, now, false, true);
1311  if (!this) return -1;
1312  }
1313 
1314  return 0;
1315 }
1316 
1317 /** Delete a connection pool
1318  *
1319  * Closes, unlinks and frees all connections in the connection pool, then frees
1320  * all memory used by the connection pool.
1321  *
1322  * @note Will call the 'stop' trigger.
1323  * @note Must be called with the mutex free.
1324  *
1325  * @param[in,out] pool to delete.
1326  */
1328 {
1329  fr_pool_connection_t *this;
1330 
1331  if (!pool) return;
1332 
1333  /*
1334  * More modules hold a reference to this pool, don't free
1335  * it yet.
1336  */
1337  if (pool->ref > 0) {
1338  pool->ref--;
1339  return;
1340  }
1341 
1342  DEBUG2("Removing connection pool");
1343 
1344  pthread_mutex_lock(&pool->mutex);
1345 
1346  /*
1347  * Don't loop over the list. Just keep removing the head
1348  * until they're all gone.
1349  */
1350  while ((this = pool->head) != NULL) {
1351  INFO("Closing connection (%" PRIu64 ")", this->number);
1352 
1353  connection_close_internal(pool, this);
1354  }
1355 
1356  fr_pool_trigger_exec(pool, "stop");
1357 
1358  fr_assert(pool->head == NULL);
1359  fr_assert(pool->tail == NULL);
1360  fr_assert(pool->state.num == 0);
1361 
1362  pthread_mutex_unlock(&pool->mutex);
1363  pthread_mutex_destroy(&pool->mutex);
1364  pthread_cond_destroy(&pool->done_spawn);
1365  pthread_cond_destroy(&pool->done_reconnecting);
1366 
1367  talloc_free(pool);
1368 }
1369 
1370 /** Reserve a connection in the connection pool
1371  *
1372  * Will attempt to find an unused connection in the connection pool, if one is
1373  * found, will mark it as in in use increment the number of active connections
1374  * and return the connection handle.
1375  *
1376  * If no free connections are found will attempt to spawn a new one, conditional
1377  * on a connection spawning not already being in progress, and not being at the
1378  * 'max' connection limit.
1379  *
1380  * @note fr_pool_connection_release must be called once the caller has finished
1381  * using the connection.
1382  *
1383  * @see fr_pool_connection_release
1384  * @param[in] pool to reserve the connection from.
1385  * @param[in] request The current request.
1386  * @return
1387  * - A pointer to the connection handle.
1388  * - NULL on error.
1389  */
1391 {
1392  return connection_get_internal(pool, request, true);
1393 }
1394 
1395 /** Release a connection
1396  *
1397  * Will mark a connection as unused and decrement the number of active
1398  * connections.
1399  *
1400  * @see fr_pool_connection_get
1401  * @param[in] pool to release the connection in.
1402  * @param[in] request The current request.
1403  * @param[in] conn to release.
1404  */
1405 void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
1406 {
1407  fr_pool_connection_t *this;
1408  fr_time_delta_t held;
1409  bool trigger_min = false, trigger_max = false;
1410 
1411  this = connection_find(pool, conn);
1412  if (!this) return;
1413 
1414  this->in_use = false;
1415 
1416  /*
1417  * Record when the connection was last released
1418  */
1419  this->last_released = fr_time();
1420  pool->state.last_released = this->last_released;
1421 
1422  /*
1423  * This is done inside the mutex to ensure
1424  * updates are atomic.
1425  */
1426  held = fr_time_sub(this->last_released, this->last_reserved);
1427 
1428  /*
1429  * Check we've not exceeded out trigger limits
1430  *
1431  * These should only fire once per second.
1432  */
1434  (fr_time_delta_lt(held, pool->held_trigger_min)) &&
1435  (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_min), fr_time_delta_from_sec(1)))) {
1436  trigger_min = true;
1437  pool->state.last_held_min = this->last_released;
1438  }
1439 
1441  (fr_time_delta_gt(held, pool->held_trigger_max)) &&
1442  (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_max), fr_time_delta_from_sec(1)))) {
1443  trigger_max = true;
1444  pool->state.last_held_max = this->last_released;
1445  }
1446 
1447  /*
1448  * Insert the connection in the heap.
1449  *
1450  * This will either be based on when we *started* using it
1451  * (allowing fast links to be re-used, and slow links to be
1452  * gradually expired), or when we released it (allowing
1453  * the maximum amount of time between connection use).
1454  */
1455  fr_heap_insert(&pool->heap, this);
1456 
1457  fr_assert(pool->state.active != 0);
1458  pool->state.active--;
1459 
1460  ROPTIONAL(RDEBUG2, DEBUG2, "Released connection (%" PRIu64 ")", this->number);
1461 
1462  /*
1463  * We mirror the "spawn on get" functionality by having
1464  * "delete on release". If there are too many spare
1465  * connections, go manage the pool && clean some up.
1466  */
1467  connection_check(pool, request);
1468 
1469  if (trigger_min) fr_pool_trigger_exec(pool, "min");
1470  if (trigger_max) fr_pool_trigger_exec(pool, "max");
1471 }
1472 
1473 /** Reconnect a suspected inviable connection
1474  *
1475  * This should be called by the module if it suspects that a connection is
1476  * not viable (e.g. the server has closed it).
1477  *
1478  * When implementing a module that uses the connection pool API, it is advisable
1479  * to pass a pointer to the pointer to the handle (void **conn)
1480  * to all functions which may call reconnect. This is so that if a new handle
1481  * is created and returned, the handle pointer can be updated up the callstack,
1482  * and a function higher up the stack doesn't attempt to use a now invalid
1483  * connection handle.
1484  *
1485  * @note Will free any talloced memory hung off the context of the connection,
1486  * being reconnected.
1487  *
1488  * @warning After calling reconnect the caller *MUST NOT* attempt to use
1489  * the old handle in any other operations, as its memory will have been
1490  * freed.
1491  *
1492  * @see fr_pool_connection_get
1493  * @param[in] pool to reconnect the connection in.
1494  * @param[in] request The current request.
1495  * @param[in] conn to reconnect.
1496  * @return new connection handle if successful else NULL.
1497  */
1498 void *fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
1499 {
1500  fr_pool_connection_t *this;
1501 
1502  if (!pool || !conn) return NULL;
1503 
1504  /*
1505  * If connection_find is successful the pool is now locked
1506  */
1507  this = connection_find(pool, conn);
1508  if (!this) return NULL;
1509 
1510  ROPTIONAL(RINFO, INFO, "Deleting inviable connection (%" PRIu64 ")", this->number);
1511 
1512  connection_close_internal(pool, this);
1513  connection_check(pool, request); /* Whilst we still have the lock (will release the lock) */
1514 
1515  /*
1516  * Return an existing connection or spawn a new one.
1517  */
1518  return connection_get_internal(pool, request, true);
1519 }
1520 
1521 /** Delete a connection from the connection pool.
1522  *
1523  * Resolves the connection handle to a connection, then (if found)
1524  * closes, unlinks and frees that connection.
1525  *
1526  * @note Must be called with the mutex free.
1527  *
1528  * @param[in] pool Connection pool to modify.
1529  * @param[in] request The current request.
1530  * @param[in] conn to delete.
1531  * @return
1532  * - 0 If the connection could not be found.
1533  * - 1 if the connection was deleted.
1534  */
1535 int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
1536 {
1537  fr_pool_connection_t *this;
1538 
1539  this = connection_find(pool, conn);
1540  if (!this) return 0;
1541 
1542  /*
1543  * Record the last time a connection was closed
1544  */
1545  pool->state.last_closed = fr_time();
1546 
1547  ROPTIONAL(RINFO, INFO, "Deleting connection (%" PRIu64 ")", this->number);
1548 
1549  connection_close_internal(pool, this);
1550  connection_check(pool, request);
1551  return 1;
1552 }
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition: build.h:165
#define RCSID(id)
Definition: build.h:444
#define UNUSED
Definition: build.h:313
bool check_config
Definition: cf_file.c:66
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
Definition: cf_parse.c:985
#define CONF_PARSER_TERMINATOR
Definition: cf_parse.h:626
#define FR_INTEGER_BOUND_CHECK(_name, _var, _op, _bound)
Definition: cf_parse.h:486
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition: cf_parse.h:268
#define FR_TIME_DELTA_COND_CHECK(_name, _var, _cond, _new)
Definition: cf_parse.h:488
#define cf_section_rules_push(_cs, _rule)
Definition: cf_parse.h:658
char const * name1
Name of the CONF_ITEM to parse.
Definition: cf_parse.h:564
#define FR_TIME_DELTA_BOUND_CHECK(_name, _var, _op, _bound)
Definition: cf_parse.h:497
Defines a CONF_PAIR to C data type mapping.
Definition: cf_parse.h:563
Configuration AVP similar to a fr_pair_t.
Definition: cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition: cf_priv.h:89
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition: cf_util.c:1220
#define cf_log_err(_cf, _fmt,...)
Definition: cf_util.h:265
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:137
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition: heap.c:239
unsigned int fr_heap_index_t
Definition: heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition: heap.h:136
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition: heap.h:115
The main heap structure.
Definition: heap.h:66
unlang_interpret_t * unlang_interpret_get_thread_default(void)
Get the default interpreter for this thread.
Definition: interpret.c:1768
#define PERROR(_fmt,...)
Definition: log.h:228
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define RWARN(fmt,...)
Definition: log.h:297
#define RINFO(fmt,...)
Definition: log.h:296
#define RATE_LIMIT_GLOBAL_ROPTIONAL(_l_request, _l_global, _fmt,...)
Rate limit messages using a global limiting entry.
Definition: log.h:663
talloc_free(reap)
static bool fr_rate_limit_enabled(void)
Whether rate limiting is enabled.
Definition: log.h:148
main_config_t const * main_config
Main server configuration.
Definition: main_config.c:69
uint32_t max_workers
for the scheduler
Definition: main_config.h:151
unsigned int uint32_t
Definition: merged_model.c:33
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition: pair.c:2316
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition: pair.c:46
fr_pool_reconnect_t reconnect
Called during connection pool reconnect.
Definition: pool.c:146
fr_time_delta_t fr_pool_timeout(fr_pool_t *pool)
Connection pool get timeout.
Definition: pool.c:1181
static void connection_link_head(fr_pool_t *pool, fr_pool_connection_t *this)
Adds a connection to the head of the connection list.
Definition: pool.c:232
int fr_pool_start(fr_pool_t *pool)
Definition: pool.c:1114
bool spread
If true we spread requests over the connections, using the connection released longest ago,...
Definition: pool.c:110
uint32_t max_pending
Max number of pending connections to allow.
Definition: pool.c:91
fr_time_delta_t delay_interval
When we next do a cleanup.
Definition: pool.c:100
static int8_t last_released_cmp(void const *one, void const *two)
Order connections by released longest ago.
Definition: pool.c:191
static void * connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
Get a connection from the connection pool.
Definition: pool.c:834
static fr_pool_connection_t * connection_find(fr_pool_t *pool, void *conn)
Find a connection handle in the connection list.
Definition: pool.c:287
fr_time_t last_reserved
Last time the connection was reserved.
Definition: pool.c:57
fr_heap_t * heap
For the next connection heap.
Definition: pool.c:113
fr_time_delta_t held_trigger_min
If a connection is held for less than the specified period, fire a trigger.
Definition: pool.c:138
fr_pool_connection_create_t create
Function used to create new connections.
Definition: pool.c:143
uint32_t start
Number of initial connections.
Definition: pool.c:88
fr_time_delta_t idle_timeout
How long a connection can be idle before being closed.
Definition: pool.c:105
static void connection_unlink(fr_pool_t *pool, fr_pool_connection_t *this)
Removes a connection from the connection list.
Definition: pool.c:205
static int connection_check(fr_pool_t *pool, request_t *request)
Check whether any connections need to be removed from the pool.
Definition: pool.c:655
void const * fr_pool_opaque(fr_pool_t *pool)
Return the opaque data associated with a connection pool.
Definition: pool.c:1201
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition: pool.c:1405
uint32_t min
Minimum number of concurrent connections to keep open.
Definition: pool.c:89
fr_heap_index_t heap_id
For the next connection heap.
Definition: pool.c:54
static fr_pool_connection_t * connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
Spawns a new connection.
Definition: pool.c:335
uint64_t number
Unique ID assigned when the connection is created, these will monotonically increase over the lifetim...
Definition: pool.c:62
CONF_SECTION const * cs
Configuration section holding the section of parsed config file that relates to this pool.
Definition: pool.c:125
void fr_pool_free(fr_pool_t *pool)
Delete a connection pool.
Definition: pool.c:1327
static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
void fr_pool_ref(fr_pool_t *pool)
Increment pool reference by one.
Definition: pool.c:1210
uint32_t pending_window
Sliding window of pending connections.
Definition: pool.c:95
static void connection_close_internal(fr_pool_t *pool, fr_pool_connection_t *this)
Close an existing connection.
Definition: pool.c:547
char const * trigger_prefix
Prefix to prepend to names of all triggers fired by the connection pool code.
Definition: pool.c:134
fr_time_delta_t cleanup_interval
Initial timer for how often we sweep the pool for free connections.
Definition: pool.c:98
void * connection
Pointer to whatever the module uses for a connection handle.
Definition: pool.c:65
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition: pool.c:1171
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition: pool.c:1242
fr_pool_connection_t * tail
End of the connection list.
Definition: pool.c:116
bool needs_reconnecting
Reconnect this connection before use.
Definition: pool.c:69
static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now)
Check whether a connection needs to be removed from the pool.
Definition: pool.c:595
fr_time_delta_t held_trigger_max
If a connection is held for longer than the specified period, fire a trigger.
Definition: pool.c:140
fr_pool_connection_alive_t alive
Function used to check status of connections.
Definition: pool.c:144
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition: pool.c:1390
uint32_t num_uses
Number of times the connection has been reserved.
Definition: pool.c:61
bool triggers_enabled
Whether we call the trigger functions.
Definition: pool.c:132
fr_time_delta_t connect_timeout
New connection timeout, enforced by the create callback.
Definition: pool.c:107
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition: pool.c:1535
uint32_t spare
Number of spare connections to try.
Definition: pool.c:92
static void fr_pool_trigger_exec(fr_pool_t *pool, char const *event)
Send a connection pool trigger.
Definition: pool.c:259
uint64_t max_uses
Maximum number of times a connection can be used before being closed.
Definition: pool.c:93
pthread_cond_t done_reconnecting
Before calling the create callback, threads should block on this condition if reconnecting == true.
Definition: pool.c:122
fr_time_delta_t lifetime
How long a connection can be open before being closed (irrespective of whether it's idle or not).
Definition: pool.c:103
pthread_mutex_t mutex
Mutex used to keep consistent state when making modifications in threaded mode.
Definition: pool.c:118
uint32_t max
Maximum number of concurrent connections to allow.
Definition: pool.c:90
fr_pool_connection_t * head
Start of the connection list.
Definition: pool.c:115
int ref
Reference counter to prevent connection pool being freed multiple times.
Definition: pool.c:86
int fr_pool_start_num(fr_pool_t *pool)
Connection pool get start.
Definition: pool.c:1191
fr_pool_state_t state
Stats and state of the connection pool.
Definition: pool.c:148
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition: pool.c:931
fr_pair_list_t trigger_args
Arguments to make available in connection pool triggers.
Definition: pool.c:136
fr_time_t created
Time connection was created.
Definition: pool.c:56
bool in_use
Whether the connection is currently reserved.
Definition: pool.c:67
fr_time_delta_t retry_delay
seconds to delay re-open after a failed open.
Definition: pool.c:97
static int8_t last_reserved_cmp(void const *one, void const *two)
Order connections by reserved most recently.
Definition: pool.c:182
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition: pool.c:965
fr_pool_t * fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
Allocate a new pool using an existing one as a template.
Definition: pool.c:1154
pthread_cond_t done_spawn
Threads that need to ensure no spawning is in progress, should block on this condition if pending !...
Definition: pool.c:120
fr_pool_connection_t * prev
Previous connection in list.
Definition: pool.c:52
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition: pool.c:1222
static const conf_parser_t pool_config[]
Definition: pool.c:151
fr_pool_connection_t * next
Next connection in list.
Definition: pool.c:53
void * opaque
Pointer to context data that will be passed to callbacks.
Definition: pool.c:127
fr_time_t last_released
Time the connection was released.
Definition: pool.c:59
void * fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
Reconnect a suspected inviable connection.
Definition: pool.c:1498
char const * log_prefix
Log prefix to prepend to all log messages created by the connection pool code.
Definition: pool.c:129
An individual connection within the connection pool.
Definition: pool.c:51
A connection pool.
Definition: pool.c:85
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition: pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition: pool.h:57
fr_time_t last_throttled
Last time we refused to spawn a connection because the last connection failed, or we were already spa...
Definition: pool.h:51
uint32_t active
Number of currently reserved connections.
Definition: pool.h:68
fr_time_t last_released
Last time a connection was released.
Definition: pool.h:56
fr_time_delta_t next_delay
The next delay time.
Definition: pool.h:62
fr_time_t last_checked
Last time we pruned the connection pool.
Definition: pool.h:48
void(* fr_pool_reconnect_t)(fr_pool_t *pool, void *opaque)
Alter the opaque data of a connection pool during reconnection event.
Definition: pool.h:85
uint64_t count
Number of connections spawned over the lifetime of the pool.
Definition: pool.h:65
fr_time_t last_held_min
Last time we warned about a low latency event.
Definition: pool.h:59
fr_time_t last_at_max
Last time we hit the maximum number of allowed connections.
Definition: pool.h:54
void *(* fr_pool_connection_create_t)(TALLOC_CTX *ctx, void *opaque, fr_time_delta_t timeout)
Create a new connection handle.
Definition: pool.h:111
uint32_t pending
Number of pending open connections.
Definition: pool.h:47
fr_time_t last_spawned
Last time we spawned a connection.
Definition: pool.h:49
fr_time_t last_held_max
Last time we warned about a high latency event.
Definition: pool.h:60
bool reconnecting
We are currently reconnecting the pool.
Definition: pool.h:70
uint32_t num
Number of connections in the pool.
Definition: pool.h:67
int(* fr_pool_connection_alive_t)(void *opaque, void *connection)
Check a connection handle is still viable.
Definition: pool.h:126
static bool done
Definition: radclient.c:80
#define RDEBUG2(fmt,...)
Definition: radclient.h:54
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
static char const * name
static size_t min(size_t x, size_t y)
Definition: sbuff.c:135
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:689
if(!subtype_vp) goto fail
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
char * talloc_typed_strdup(TALLOC_CTX *ctx, char const *p)
Call talloc_strdup, setting the type on the new chunk correctly.
Definition: talloc.c:333
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition: talloc.c:167
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:212
Simple time functions.
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:573
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition: time.h:255
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition: time.h:154
#define fr_time_delta_lt(_a, _b)
Definition: time.h:283
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition: time.h:588
#define fr_time_delta_wrap(_time)
Definition: time.h:152
#define fr_time_wrap(_time)
Definition: time.h:145
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_delta_lteq(_a, _b)
Definition: time.h:284
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
#define fr_time_delta_gteq(_a, _b)
Definition: time.h:282
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
#define fr_time_lt(_a, _b)
Definition: time.h:239
#define fr_time_delta_gt(_a, _b)
Definition: time.h:281
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:914
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
enum fr_token fr_token_t
@ T_BARE_WORD
Definition: token.h:120
@ T_OP_EQ
Definition: token.h:83
int trigger_exec(unlang_interpret_t *intp, CONF_SECTION const *cs, char const *name, bool rate_limit, fr_pair_list_t *args)
Execute a trigger - call an executable to process an event.
Definition: trigger.c:280
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
Definition: pair_inline.c:113
static fr_slen_t parent
Definition: pair.h:844
#define fr_box_time_delta(_val)
Definition: value.h:336
static size_t char ** out
Definition: value.h:984