The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
pool.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * @file pool.c
19 * @brief Handle pools of connections (threads, sockets, etc.)
20 * @note This API must be used by all modules in the public distribution that
21 * maintain pools of connections.
22 *
23 * @copyright 2012 The FreeRADIUS server project
24 * @copyright 2012 Alan DeKok (aland@deployingradius.com)
25 */
26RCSID("$Id: 185e43b6f392295097c6327b91955835522520ad $")
27
28#define LOG_PREFIX pool->log_prefix
29
30#include <freeradius-devel/server/main_config.h>
31#include <freeradius-devel/server/modpriv.h>
32#include <freeradius-devel/server/trigger.h>
33
34#include <freeradius-devel/util/debug.h>
35
36#include <freeradius-devel/util/misc.h>
37
38
40
/* Forward declarations for static functions defined further down in this file. */
41static int connection_check(fr_pool_t *pool, request_t *request);
/* Default-value callback referenced by the "max" entry of pool_config. */
42static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule);
43
44/** An individual connection within the connection pool
45 *
46 * Defines connection counters, timestamps, and holds a pointer to the
47 * connection handle itself.
48 *
49 * @see fr_pool_t
50 */
52 fr_pool_connection_t *prev; //!< Previous connection in list.
53 fr_pool_connection_t *next; //!< Next connection in list.
54 fr_heap_index_t heap_id; //!< For the next connection heap.
55
56 fr_time_t created; //!< Time connection was created.
57 fr_time_t last_reserved; //!< Last time the connection was reserved.
58
59 fr_time_t last_released; //!< Time the connection was released.
60
61 uint32_t num_uses; //!< Number of times the connection has been reserved.
62 uint64_t number; //!< Unique ID assigned when the connection is created,
63 //!< these will monotonically increase over the
64 //!< lifetime of the connection pool.
65 void *connection; //!< Pointer to whatever the module uses for a connection
66 //!< handle.
67 bool in_use; //!< Whether the connection is currently reserved.
68
69 bool needs_reconnecting; //!< Reconnect this connection before use.
70
71#ifdef PTHREAD_DEBUG
72 pthread_t pthread_id; //!< When 'in_use == true'.
73#endif
74};
75
76/** A connection pool
77 *
78 * Defines the configuration of the connection pool, all the counters and
79 * timestamps related to the connection pool, the mutex that stops multiple
80 * threads leaving the pool in an inconsistent state, and the callbacks
81 * required to open, close and check the status of connections within the pool.
82 *
83 * @see connection
84 */
85struct fr_pool_s {
86 int ref; //!< Reference counter to prevent connection
87 //!< pool being freed multiple times.
88 uint32_t start; //!< Number of initial connections.
89 uint32_t min; //!< Minimum number of concurrent connections to keep open.
90 uint32_t max; //!< Maximum number of concurrent connections to allow.
91 uint32_t max_pending; //!< Max number of pending connections to allow.
 //!< When 0, the pending window is bounded by max instead.
92 uint32_t spare; //!< Number of spare connections to try to maintain.
 //!< Pending connections and open connections that are
 //!< not active are both counted as spare.
93 uint64_t max_uses; //!< Maximum number of times a connection can be used
94 //!< before being closed. The limit is only applied when > 0.
95 uint32_t pending_window; //!< Sliding window of pending connections.
 //!< Reset to 1 when opening a connection fails, and
 //!< incremented after each successful open (up to max,
 //!< or max_pending when that is non-zero).
96
97 fr_time_delta_t retry_delay; //!< How long to delay re-opening after a failed open.
98 fr_time_delta_t cleanup_interval; //!< Initial timer for how often we sweep the pool
99 //!< for free connections. (0 is infinite).
100 fr_time_delta_t delay_interval; //!< When we next do a cleanup. Initialized to
101 //!< cleanup_interval, and increase from there based
102 //!< on the delay.
103 fr_time_delta_t lifetime; //!< How long a connection can be open before being
104 //!< closed (irrespective of whether it's idle or not).
 //!< Only enforced when positive.
105 fr_time_delta_t idle_timeout; //!< How long a connection can be idle before
106 //!< being closed.
107 fr_time_delta_t connect_timeout; //!< New connection timeout, enforced by the create
108 //!< callback.
109
110 bool spread; //!< If true we spread requests over the connections,
111 //!< using the connection released longest ago, first.
112
113 fr_heap_t *heap; //!< Heap of connections that are not currently in use.
 //!< The next connection to reserve is taken from the top.
114
115 fr_pool_connection_t *head; //!< Start of the connection list.
116 fr_pool_connection_t *tail; //!< End of the connection list.
117
118 pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
119 //!< modifications in threaded mode.
120 pthread_cond_t done_spawn; //!< Threads that need to ensure no spawning is in progress,
121 //!< should block on this condition if pending != 0.
122 pthread_cond_t done_reconnecting; //!< Before calling the create callback, threads should
123 //!< block on this condition if reconnecting == true.
124
125 CONF_SECTION const *cs; //!< Configuration section holding the section of parsed
126 //!< config file that relates to this pool.
127 void *opaque; //!< Pointer to context data that will be passed to callbacks.
128
129 char const *log_prefix; //!< Log prefix to prepend to all log messages created
130 //!< by the connection pool code.
131
132 bool triggers_enabled; //!< Whether we call the trigger functions.
133
134 char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
135 //!< fired by the connection pool code.
136 fr_pair_list_t trigger_args; //!< Arguments to make available in connection pool triggers.
137
138 fr_time_delta_t held_trigger_min; //!< If a connection is held for less than the specified
139 //!< period, fire a trigger.
140 fr_time_delta_t held_trigger_max; //!< If a connection is held for longer than the specified
141 //!< period, fire a trigger.
142
143 fr_pool_connection_create_t create; //!< Function used to create new connections.
144 fr_pool_connection_alive_t alive; //!< Function used to check status of connections.
145
146 fr_pool_reconnect_t reconnect; //!< Called during connection pool reconnect.
147
148 fr_pool_state_t state; //!< Stats and state of the connection pool.
149};
150
151static const conf_parser_t pool_config[] = {
152 { FR_CONF_OFFSET("start", fr_pool_t, start), .dflt = "0" },
153 { FR_CONF_OFFSET("min", fr_pool_t, min), .dflt = "0" },
154 { FR_CONF_OFFSET("max", fr_pool_t, max), .dflt_func = max_dflt },
155 { FR_CONF_OFFSET("max_pending", fr_pool_t, max_pending), .dflt = "0" },
156 { FR_CONF_OFFSET("spare", fr_pool_t, spare), .dflt = "3" },
157 { FR_CONF_OFFSET("uses", fr_pool_t, max_uses), .dflt = "0" },
158 { FR_CONF_OFFSET("lifetime", fr_pool_t, lifetime), .dflt = "0" },
159 { FR_CONF_OFFSET("cleanup_interval", fr_pool_t, cleanup_interval), .dflt = "30" },
160 { FR_CONF_OFFSET("idle_timeout", fr_pool_t, idle_timeout), .dflt = "60" },
161 { FR_CONF_OFFSET("connect_timeout", fr_pool_t, connect_timeout), .dflt = "3.0" },
162 { FR_CONF_OFFSET("held_trigger_min", fr_pool_t, held_trigger_min), .dflt = "0.0" },
163 { FR_CONF_OFFSET("held_trigger_max", fr_pool_t, held_trigger_max), .dflt = "0.5" },
164 { FR_CONF_OFFSET("retry_delay", fr_pool_t, retry_delay), .dflt = "1" },
165 { FR_CONF_OFFSET("spread", fr_pool_t, spread), .dflt = "no" },
167};
168
169static int max_dflt(CONF_PAIR **out, UNUSED void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
170{
171 char *strvalue;
172
173 strvalue = talloc_asprintf(NULL, "%u", main_config->max_workers);
174 *out = cf_pair_alloc(cs, rule->name1, strvalue, T_OP_EQ, T_BARE_WORD, quote);
175 talloc_free(strvalue);
176
177 return 0;
178}
179
180/** Order connections by reserved most recently
181 */
182static int8_t last_reserved_cmp(void const *one, void const *two)
183{
184 fr_pool_connection_t const *a = one, *b = two;
185
186 return fr_time_cmp(a->last_reserved, b->last_reserved);
187}
188
189/** Order connections by released longest ago
190 */
191static int8_t last_released_cmp(void const *one, void const *two)
192{
193 fr_pool_connection_t const *a = one, *b = two;
194
195 return fr_time_cmp(a->last_released, b->last_released);
196}
197
198/** Removes a connection from the connection list
199 *
200 * @note Must be called with the mutex held.
201 *
202 * @param[in] pool to modify.
203 * @param[in] this Connection to delete.
204 */
206{
207 if (this->prev) {
208 fr_assert(pool->head != this);
209 this->prev->next = this->next;
210 } else {
211 fr_assert(pool->head == this);
212 pool->head = this->next;
213 }
214 if (this->next) {
215 fr_assert(pool->tail != this);
216 this->next->prev = this->prev;
217 } else {
218 fr_assert(pool->tail == this);
219 pool->tail = this->prev;
220 }
221
222 this->prev = this->next = NULL;
223}
224
225/** Adds a connection to the head of the connection list
226 *
227 * @note Must be called with the mutex held.
228 *
229 * @param[in] pool to modify.
230 * @param[in] this Connection to add.
231 */
233{
234 fr_assert(pool != NULL);
235 fr_assert(this != NULL);
236 fr_assert(pool->head != this);
237 fr_assert(pool->tail != this);
238
239 if (pool->head) {
240 pool->head->prev = this;
241 }
242
243 this->next = pool->head;
244 this->prev = NULL;
245 pool->head = this;
246 if (!pool->tail) {
247 fr_assert(this->next == NULL);
248 pool->tail = this;
249 } else {
250 fr_assert(this->next != NULL);
251 }
252}
253
254/** Send a connection pool trigger.
255 *
256 * @param[in] pool to send trigger for.
257 * @param[in] event trigger name suffix.
258 */
259static inline void fr_pool_trigger(fr_pool_t *pool, char const *event)
260{
261 char name[128];
262
263 fr_assert(pool != NULL);
264 fr_assert(event != NULL);
265
266 if (!pool->triggers_enabled) return;
267
268 snprintf(name, sizeof(name), "%s.%s", pool->trigger_prefix, event);
269 trigger(unlang_interpret_get_thread_default(), pool->cs, NULL, name, true, &pool->trigger_args);
270}
271
272/** Find a connection handle in the connection list
273 *
274 * Walks over the list of connections searching for a specified connection
275 * handle and returns the first connection that contains that pointer.
276 *
277 * @note Will lock mutex and only release mutex if connection handle
278 * is not found, so will usually return with mutex held.
279 * @note Must be called with the mutex free.
280 *
281 * @param[in] pool to search in.
282 * @param[in] conn handle to search for.
283 * @return
284 * - Connection containing the specified handle.
285 * - NULL if no such connection was found.
286 */
288{
290
291 if (!pool || !conn) return NULL;
292
293 pthread_mutex_lock(&pool->mutex);
294
295 /*
296 * FIXME: This loop could be avoided if we passed a 'void
297 * **connection' instead. We could use "offsetof" in
298 * order to find top of the parent structure.
299 */
300 for (this = pool->head; this != NULL; this = this->next) {
301 if (this->connection == conn) {
302#ifdef PTHREAD_DEBUG
303 pthread_t pthread_id;
304
305 pthread_id = pthread_self();
306 fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
307#endif
308
309 fr_assert(this->in_use == true);
310 return this;
311 }
312 }
313
314 pthread_mutex_unlock(&pool->mutex);
315 return NULL;
316}
317
318/** Spawns a new connection
319 *
320 * Spawns a new connection using the create callback, and returns it for
321 * adding to the connection list.
322 *
323 * @note Will call the 'open' trigger.
324 * @note Must be called with the mutex free.
325 *
326 * @param[in] pool to modify.
327 * @param[in] request The current request.
328 * @param[in] now Current time.
329 * @param[in] in_use whether the new connection should be "in_use" or not
330 * @param[in] unlock whether we should unlock the mutex before returning
331 * @return
332 * - New connection struct.
333 * - NULL on error.
334 */
335static fr_pool_connection_t *connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
336{
337 uint64_t number;
338 uint32_t pending_window;
339 TALLOC_CTX *ctx;
340
342 void *conn;
343
344 fr_assert(pool != NULL);
345
346 /*
347 * If we have NO connections, and we've previously failed
348 * opening connections, don't open multiple connections until
349 * we successfully open at least one.
350 */
351 if ((pool->state.num == 0) &&
352 pool->state.pending &&
353 fr_time_gt(pool->state.last_failed, fr_time_wrap(0))) return NULL;
354
355 pthread_mutex_lock(&pool->mutex);
356 fr_assert(pool->state.num <= pool->max);
357
358 /*
359 * Don't spawn too many connections at the same time.
360 */
361 if ((pool->state.num + pool->state.pending) >= pool->max) {
362 pthread_mutex_unlock(&pool->mutex);
363
364 ERROR("Cannot open new connection, already at max");
365 return NULL;
366 }
367
368 /*
369 * If the last attempt failed, wait a bit before
370 * retrying.
371 */
372 if (fr_time_gt(pool->state.last_failed, fr_time_wrap(0)) &&
373 fr_time_gt(fr_time_add(pool->state.last_failed, pool->retry_delay), now)) {
374 bool complain = false;
375
377 complain = true;
378
379 pool->state.last_throttled = now;
380 }
381
382 pthread_mutex_unlock(&pool->mutex);
383
384 if (!fr_rate_limit_enabled() || complain) {
385 ERROR("Last connection attempt failed, waiting %pV seconds before retrying",
387 }
388
389 return NULL;
390 }
391
392 /*
393 * We limit the rate of new connections after a failed attempt.
394 */
395 if (pool->state.pending > pool->pending_window) {
396 pthread_mutex_unlock(&pool->mutex);
397
399 "Cannot open a new connection due to rate limit after failure");
400
401 return NULL;
402 }
403
404 pool->state.pending++;
405 number = pool->state.count++;
406
407 /*
408 * Don't starve out the thread trying to reconnect
409 * the pool, by continuously opening new connections.
410 */
411 while (pool->state.reconnecting) pthread_cond_wait(&pool->done_reconnecting, &pool->mutex);
412
413 /*
414 * The true value for pending_window is the smaller of
415 * free connection slots, or pool->pending_window.
416 */
417 pending_window = (pool->max - pool->state.num);
418 if (pool->pending_window < pending_window) pending_window = pool->pending_window;
419 ROPTIONAL(RDEBUG2, DEBUG2, "Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
420 number, pool->state.pending, pending_window);
421
422 /*
423 * Unlock the mutex while we try to open a new
424 * connection. If there are issues with the back-end,
425 * opening a new connection may take a LONG time. In
426 * that case, we want the other connections to continue
427 * to be used.
428 */
429 pthread_mutex_unlock(&pool->mutex);
430
431 /*
432 * Allocate a new top level ctx for the create callback
433 * to hang its memory off of.
434 */
435 MEM(ctx = talloc_init("connection_ctx"));
436
437 /*
438 * This may take a long time, which prevents other
439 * threads from releasing connections. We don't care
440 * about other threads opening new connections, as we
441 * already have no free connections.
442 */
443 conn = pool->create(ctx, pool->opaque, pool->connect_timeout);
444 if (!conn) {
445 ERROR("Opening connection failed (%" PRIu64 ")", number);
446
447 pthread_mutex_lock(&pool->mutex);
448 pool->state.last_failed = now;
449 pool->pending_window = 1;
450 pool->state.pending--;
451
452 /*
453 * Must be done inside the mutex, reconnect callback
454 * may modify args.
455 */
456 fr_pool_trigger(pool, "fail");
457 pthread_cond_broadcast(&pool->done_spawn);
458 pthread_mutex_unlock(&pool->mutex);
459
460 talloc_free(ctx);
461
462 return NULL;
463 }
464
465 /*
466 * And lock the mutex again while we link the new
467 * connection back into the pool.
468 */
469 pthread_mutex_lock(&pool->mutex);
470
471 this = talloc_zero(pool, fr_pool_connection_t);
472 if (!this) {
473 pool->state.last_failed = now;
474 pool->state.pending--;
475
476 pthread_cond_broadcast(&pool->done_spawn);
477 pthread_mutex_unlock(&pool->mutex);
478
479 ERROR("Memory allocation failed for new connection (%" PRIu64 ")", number);
480
481 talloc_free(ctx);
482
483 return NULL;
484 }
485 MEM(talloc_link_ctx(this, ctx) >= 0);
486
487 this->created = now;
488 this->connection = conn;
489 this->in_use = in_use;
490
491 this->number = number;
492 this->last_reserved = fr_time();
493 this->last_released = this->last_reserved;
494
495 /*
496 * The connection pool is starting up. Insert the
497 * connection into the heap.
498 */
499 if (!in_use) fr_heap_insert(&pool->heap, this);
500
501 connection_link_head(pool, this);
502
503 /*
504 * Do NOT insert the connection into the heap. That's
505 * done when the connection is released.
506 */
507
508 pool->state.num++;
509
510 fr_assert(pool->state.pending > 0);
511 pool->state.pending--;
512
513 /*
514 * We've successfully opened one more connection. Allow
515 * more connections to open in parallel.
516 */
517 if ((pool->pending_window < pool->max) &&
518 ((pool->max_pending == 0) || (pool->pending_window < pool->max_pending))) {
519 pool->pending_window++;
520 }
521
522 pool->state.last_spawned = fr_time();
523 pool->delay_interval = pool->cleanup_interval;
524 pool->state.next_delay = pool->cleanup_interval;
525 pool->state.last_failed = fr_time_wrap(0);
526
527 /*
528 * Must be done inside the mutex, reconnect callback
529 * may modify args.
530 */
531 fr_pool_trigger(pool, "open");
532
533 pthread_cond_broadcast(&pool->done_spawn);
534 if (unlock) pthread_mutex_unlock(&pool->mutex);
535
536 /* coverity[missing_unlock] */
537 return this;
538}
539
540/** Close an existing connection.
541 *
542 * Removes the connection from the list, calls the delete callback to close
543 * the connection, then frees memory allocated to the connection.
544 *
545 * @note Will call the 'close' trigger.
546 * @note Must be called with the mutex held.
547 *
548 * @param[in] pool to modify.
549 * @param[in] this Connection to delete.
550 */
552{
553 /*
554 * If it's in use, release it.
555 */
556 if (this->in_use) {
557#ifdef PTHREAD_DEBUG
558 pthread_t pthread_id = pthread_self();
559 fr_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
560#endif
561
562 this->in_use = false;
563
564 fr_assert(pool->state.active != 0);
565 pool->state.active--;
566
567 } else {
568 /*
569 * Connection isn't used, remove it from the heap.
570 */
571 fr_heap_extract(&pool->heap, this);
572 }
573
574 fr_pool_trigger(pool, "close");
575
576 connection_unlink(pool, this);
577
578 fr_assert(pool->state.num > 0);
579 pool->state.num--;
580 talloc_free(this);
581}
582
583/** Check whether a connection needs to be removed from the pool
584 *
585 * Will verify that the connection is within idle_timeout, max_uses, and
586 * lifetime values. If it is not, the connection will be closed.
587 *
588 * @note Will only close connections not in use.
589 * @note Must be called with the mutex held.
590 *
591 * @param[in] pool to modify.
592 * @param[in] request The current request.
593 * @param[in] this Connection to manage.
594 * @param[in] now Current time.
595 * @return
596 * - 0 if connection was closed.
597 * - 1 if connection handle was left open.
598 */
600{
601 fr_assert(pool != NULL);
602 fr_assert(this != NULL);
603
604 /*
605 * Don't terminate in-use connections
606 */
607 if (this->in_use) return 1;
608
609 if (this->needs_reconnecting) {
610 ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Needs reconnecting",
611 this->number);
612 do_delete:
613 if (pool->state.num <= pool->min) {
614 ROPTIONAL(RDEBUG2, DEBUG2, "You probably need to lower \"min\"");
615 }
616 connection_close_internal(pool, this);
617 return 0;
618 }
619
620 if ((pool->max_uses > 0) &&
621 (this->num_uses >= pool->max_uses)) {
622 ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit max_uses limit",
623 this->number);
624 goto do_delete;
625 }
626
627 if (fr_time_delta_ispos(pool->lifetime) &&
628 (fr_time_lt(fr_time_add(this->created, pool->lifetime), now))) {
629 ROPTIONAL(RDEBUG2, DEBUG2, "Closing expired connection (%" PRIu64 "): Hit lifetime limit",
630 this->number);
631 goto do_delete;
632 }
633
635 (fr_time_lt(fr_time_add(this->last_released, pool->idle_timeout), now))) {
636 ROPTIONAL(RINFO, INFO, "Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %pVs",
637 this->number, fr_box_time_delta(fr_time_sub(now, this->last_released)));
638 goto do_delete;
639 }
640
641 return 1;
642}
643
644
645/** Check whether any connections need to be removed from the pool
646 *
647 * Maintains the number of connections in the pool as per the configuration
648 * parameters for the connection pool.
649 *
650 * @note Will only run checks the first time it's called in a given second,
651 * to throttle connection spawning/closing.
652 * @note Will only close connections not in use.
653 * @note Must be called with the mutex held, will release mutex before returning.
654 *
655 * @param[in] pool to manage.
656 * @param[in] request The current request.
657 * @return 1
658 */
659static int connection_check(fr_pool_t *pool, request_t *request)
660{
661 uint32_t num, spare;
662 fr_time_t now = fr_time();
663 fr_pool_connection_t *this, *next;
664
666 pthread_mutex_unlock(&pool->mutex);
667 return 1;
668 }
669
670 /*
671 * Get "real" number of connections, and count pending
672 * connections as spare.
673 */
674 num = pool->state.num + pool->state.pending;
675 spare = pool->state.pending + (pool->state.num - pool->state.active);
676
677 /*
678 * The other end can close connections. If so, we'll
679 * have fewer than "min". When that happens, open more
680 * connections to enforce "min".
681 *
682 * The code for spawning connections enforces that
683 * num + pending <= max.
684 */
685 if (num < pool->min) {
686 ROPTIONAL(RINFO, INFO, "Need %i more connections to reach min connections (%i)", pool->min - num, pool->min);
687 goto add_connection;
688 }
689
690 /*
691 * On the odd chance that we've opened too many
692 * connections, take care of that.
693 */
694 if (num > pool->max) {
695 /*
696 * Pending connections don't get closed as "spare".
697 */
698 if (pool->state.pending > 0) goto manage_connections;
699
700 /*
701 * Otherwise close one of the connections to
702 * bring us down to "max".
703 */
704 goto close_connection;
705 }
706
707 /*
708 * Now that we've enforced min/max connections, try to
709 * keep the "spare" connections at the correct number.
710 */
711
712 /*
713 * Nothing to do? Go check all of the connections for
714 * timeouts, etc.
715 */
716 if (spare == pool->spare) goto manage_connections;
717
718 /*
719 * Too many spare connections, delete some.
720 */
721 if (spare > pool->spare) {
723
724 /*
725 * Pending connections don't get closed as "spare".
726 */
727 if (pool->state.pending > 0) goto manage_connections;
728
729 /*
730 * Don't close too many connections, even if they
731 * are spare.
732 */
733 if (num <= pool->min) goto manage_connections;
734
735 /*
736 * Too many spares, go close one.
737 */
738
739 close_connection:
740 /*
741 * Don't close connections too often, in order to
742 * prevent flapping. Coverity doesn't notice that
743 * all callers have the lock, so we annotate the issue.
744 */
745 /* coverity[missing_lock] */
746 if (fr_time_lt(now, fr_time_add(pool->state.last_spawned, pool->delay_interval))) goto manage_connections;
747
748 /*
749 * Find a connection to close.
750 */
751 found = NULL;
752 for (this = pool->tail; this != NULL; this = this->prev) {
753 if (this->in_use) continue;
754
755 if (!found || (fr_time_lt(this->last_reserved, found->last_reserved))) found = this;
756 }
757
758 if (!fr_cond_assert(found)) goto done;
759
760 ROPTIONAL(RDEBUG2, DEBUG2, "Closing connection (%" PRIu64 ") as we have too many unused connections",
761 found->number);
762 connection_close_internal(pool, found);
763
764 /*
765 * Decrease the delay for the next time we clean
766 * up.
767 */
771
772 goto manage_connections;
773 }
774
775 /*
776 * Too few connections, open some more.
777 */
778 if (spare < pool->spare) {
779 /*
780 * Don't open too many pending connections.
781 * Again, coverity doesn't realize all callers have the lock,
782 * so we must annotate here as well.
783 */
784 /* coverity[missing_lock] */
785 if (pool->state.pending >= pool->pending_window) goto manage_connections;
786
787 /*
788 * Don't open too many connections, even if we
789 * need more spares.
790 */
791 if (num >= pool->max) goto manage_connections;
792
793 /*
794 * Too few spares, go add one.
795 */
796 ROPTIONAL(RINFO, INFO, "Need %i more connections to reach %i spares", pool->spare - spare, pool->spare);
797
798 add_connection:
799 /*
800 * Only try to open spares if we're not already attempting to open
801 * a connection. Avoids spurious log messages.
802 */
803 pthread_mutex_unlock(&pool->mutex);
804 (void) connection_spawn(pool, request, now, false, true);
805 pthread_mutex_lock(&pool->mutex);
806 goto manage_connections;
807 }
808
809 /*
810 * Pass over all of the connections in the pool, limiting
811 * lifetime, idle time, max requests, etc.
812 */
813manage_connections:
814 for (this = pool->head; this != NULL; this = next) {
815 next = this->next;
816 connection_manage(pool, request, this, now);
817 }
818
819 pool->state.last_checked = now;
820
821done:
822 pthread_mutex_unlock(&pool->mutex);
823
824 return 1;
825}
826
827/** Get a connection from the connection pool
828 *
829 * @note Must be called with the mutex free.
830 *
831 * @param[in] pool to reserve the connection from.
832 * @param[in] request The current request.
833 * @param[in] spawn whether to spawn a new connection
834 * @return
835 * - A pointer to the connection handle.
836 * - NULL on error.
837 */
838static void *connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
839{
840 fr_time_t now;
842
843 if (!pool) return NULL;
844
845 pthread_mutex_lock(&pool->mutex);
846
847 now = fr_time();
848
849 /*
850 * Grab the link with the lowest latency, and check it
851 * for limits. If "connection manage" says the link is
852 * no longer usable, go grab another one.
853 */
854 do {
855 this = fr_heap_peek(pool->heap);
856 if (!this) break;
857 } while (!connection_manage(pool, request, this, now));
858
859 /*
860 * We have a working connection. Extract it from the
861 * heap and use it.
862 */
863 if (this) {
864 fr_heap_extract(&pool->heap, this);
865 goto do_return;
866 }
867
868 if (pool->state.num == pool->max) {
869 bool complain = false;
870
871 /*
872 * Rate-limit complaints.
873 */
875 complain = true;
876 pool->state.last_at_max = now;
877 }
878
879 if (!fr_rate_limit_enabled() || complain) {
880 ERROR("No connections available and at max connection limit");
881
882 /*
883 * Must be done inside the mutex, reconnect callback
884 * may modify args.
885 */
886 fr_pool_trigger(pool, "none");
887 }
888 pthread_mutex_unlock(&pool->mutex);
889
890 return NULL;
891 }
892
893 pthread_mutex_unlock(&pool->mutex);
894
895 if (!spawn) return NULL;
896
897 ROPTIONAL(RDEBUG2, DEBUG2, "%i of %u connections in use. You may need to increase \"spare\"",
898 pool->state.active, pool->state.num);
899
900 /*
901 * Returns unlocked on failure, or locked on success
902 */
903 this = connection_spawn(pool, request, now, true, false);
904 if (!this) return NULL;
905
906do_return:
907 pool->state.active++;
908 this->num_uses++;
909 this->last_reserved = fr_time();
910 this->in_use = true;
911
912#ifdef PTHREAD_DEBUG
913 this->pthread_id = pthread_self();
914#endif
915 pthread_mutex_unlock(&pool->mutex);
916
917 ROPTIONAL(RDEBUG2, DEBUG2, "Reserved connection (%" PRIu64 ")", this->number);
918
919 return this->connection;
920}
921
922/** Enable triggers for a connection pool
923 *
924 * @param[in] pool to enable triggers for.
925 * @param[in] trigger_prefix prefix to prepend to all trigger names. Usually a path
926 * to the module's trigger configuration e.g.
927 * @verbatim modules.<name>.pool @endverbatim
928 * @verbatim <trigger name> @endverbatim is appended to form
929 * the complete path.
930 * @param[in] trigger_args to make available in any triggers executed by the connection pool.
931 * These will usually be fr_pair_t (s) describing the host
932 * associated with the pool.
933 * Trigger args will be copied, input trigger_args should be freed
934 * if necessary.
935 */
936void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
937{
938 pool->triggers_enabled = true;
939
941 MEM(pool->trigger_prefix = trigger_prefix ? talloc_strdup(pool, trigger_prefix) : "");
942
944
945 if (!trigger_args) return;
946
947 MEM(fr_pair_list_copy(pool, &pool->trigger_args, trigger_args) >= 0);
948}
949
950/** Create a new connection pool
951 *
952 * Allocates structures used by the connection pool, initialises the various
953 * configuration options and counters, and sets the callback functions.
954 *
955 * Will also spawn the number of connections specified by the 'start' configuration
956 * option.
957 *
958 * @note Will call the 'start' trigger.
959 *
960 * @param[in] ctx Context to link pool's destruction to.
961 * @param[in] cs pool section.
962 * @param[in] opaque data pointer to pass to callbacks.
963 * @param[in] c Callback to create new connections.
964 * @param[in] a Callback to check the status of connections.
965 * @param[in] log_prefix prefix to prepend to all log messages.
966 * @return
967 * - New connection pool.
968 * - NULL on error.
969 */
970fr_pool_t *fr_pool_init(TALLOC_CTX *ctx,
971 CONF_SECTION const *cs,
972 void *opaque,
974 char const *log_prefix)
975{
976 fr_pool_t *pool = NULL;
977
978 if (!cs || !opaque || !c) return NULL;
979
980 /*
981 * Pool is allocated in the NULL context as
982 * threads are likely to allocate memory
983 * beneath the pool.
984 */
985 MEM(pool = talloc_zero(NULL, fr_pool_t));
987
988 /*
989 * Ensure the pool is freed at the same time
990 * as its parent.
991 */
992 if (ctx && (talloc_link_ctx(ctx, pool) < 0)) {
993 PERROR("%s: Failed linking pool ctx", __FUNCTION__);
994 talloc_free(pool);
995
996 return NULL;
997 }
998
999 pool->cs = cs;
1000 pool->opaque = opaque;
1001 pool->create = c;
1002 pool->alive = a;
1003
1004 pool->head = pool->tail = NULL;
1005
1006 /*
1007 * We keep a heap of connections, sorted by the last time
1008 * we STARTED using them. Newly opened connections
1009 * aren't in the heap. They're only inserted in the list
1010 * once they're released.
1011 *
1012 * We do "most recently started" instead of "most
1013 * recently used", because MRU is done as most recently
1014 * *released*. We want to order connections by
1015 * responsiveness, and MRU prioritizes high latency
1016 * connections.
1017 *
1018 * We want most recently *started*, which gives
1019 * preference to low latency links, and pushes high
1020 * latency links down in the priority heap.
1021 *
1022 * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
1023 */
1024 if (!pool->spread) {
1026 /*
1027 * For some types of connections we need to use a different
1028 * algorithm, because load balancing benefits are secondary
1029 * to maintaining a cache of open connections.
1030 *
1031 * With libcurl's multihandle, connections can only be reused
1032 * if all handles that make up the multihandle are done processing
1033 * their requests.
1034 *
1035 * We can't tell when that's happened using libcurl, and even
1036 * if we could, blocking until all servers had responded
1037 * would have huge cost.
1038 *
1039 * The solution is to order the heap so that the connection that
1040 * was released longest ago is at the top.
1041 *
1042 * That way we maximise time between connection use.
1043 */
1044 } else {
1046 }
1047 if (!pool->heap) {
1048 ERROR("%s: Failed creating connection heap", __FUNCTION__);
1049 error:
1050 fr_pool_free(pool);
1051 return NULL;
1052 }
1053
1054 pool->log_prefix = log_prefix ? talloc_strdup(pool, log_prefix) : "core";
1055 pthread_mutex_init(&pool->mutex, NULL);
1056 pthread_cond_init(&pool->done_spawn, NULL);
1057 pthread_cond_init(&pool->done_reconnecting, NULL);
1058
1059 DEBUG2("Initialising connection pool");
1060
1061 if (cf_section_rules_push(UNCONST(CONF_SECTION *, cs), pool_config) < 0) goto error;
1062 if (cf_section_parse(pool, pool, UNCONST(CONF_SECTION *, cs)) < 0) {
1063 PERROR("Configuration parsing failed");
1064 goto error;
1065 }
1066
1067 /*
1068 * Some simple limits
1069 */
1070 if (pool->max == 0) {
1071 cf_log_err(cs, "Cannot set 'max' to zero");
1072 goto error;
1073 }
1074
1075 /*
1076 * Coverity notices that other uses of max_pending are protected with a mutex,
1077 * and thus thinks it should be locked/unlocked here...but coverity does not
1078 * consider that until this function returns a pointer to a pool, nobody can
1079 * use the pool, so there's no point to doing so.
1080 */
1081 /* coverity[missing_lock] */
1082 pool->pending_window = (pool->max_pending > 0) ? pool->max_pending : pool->max;
1083
1084 if (pool->min > pool->max) {
1085 cf_log_err(cs, "Cannot set 'min' to more than 'max'");
1086 goto error;
1087 }
1088
1089 FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
1090 FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
1091 FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
1092
1093 if (fr_time_delta_ispos(pool->lifetime)) {
1094 FR_TIME_DELTA_COND_CHECK("idle_timeout", pool->idle_timeout,
1096 }
1097
1098 if (fr_time_delta_ispos(pool->idle_timeout)) {
1099 FR_TIME_DELTA_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
1100 }
1101
1102 /*
1103 * Some libraries treat 0.0 as infinite timeout, others treat it
1104 * as instantaneous timeout. Solve the inconsistency by making
1105 * the smallest allowable timeout 100ms.
1106 */
1107 FR_TIME_DELTA_BOUND_CHECK("connect_timeout", pool->connect_timeout, >=, fr_time_delta_from_msec(100));
1108
1109 /*
1110 * Don't open any connections. Instead, force the limits
1111 * to only 1 connection.
1112 *
1113 */
1114 if (check_config) pool->start = pool->min = pool->max = 1;
1115
1116 return pool;
1117}
1118
1120{
1121 uint32_t i;
1123
1124 /*
1125 * Don't spawn any connections
1126 */
1127 if (check_config) return 0;
1128
1129 /*
1130 * Create all of the connections, unless the admin says
1131 * not to.
1132 */
1133 for (i = 0; i < pool->start; i++) {
1134 /*
1135 * Call time() once for each spawn attempt as there
1136 * could be a significant delay.
1137 */
1138 this = connection_spawn(pool, NULL, fr_time(), false, true);
1139 if (!this) {
1140 ERROR("Failed spawning initial connections");
1141 return -1;
1142 }
1143 }
1144
1145 fr_pool_trigger(pool, "start");
1146
1147 return 0;
1148}
1149
1150/** Allocate a new pool using an existing one as a template
1151 *
1152 * @param[in] ctx to allocate new pool in.
1153 * @param[in] pool to copy.
1154 * @param[in] opaque data to pass to connection function.
1155 * @return
1156 * - New connection pool.
1157 * - NULL on error.
1158 */
1159fr_pool_t *fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
1160{
1161 fr_pool_t *copy;
1162
1163 copy = fr_pool_init(ctx, pool->cs, opaque, pool->create, pool->alive, pool->log_prefix);
1164 if (!copy) return NULL;
1165
1166 if (pool->trigger_prefix) fr_pool_enable_triggers(copy, pool->trigger_prefix, &pool->trigger_args);
1167
1168 return copy;
1169}
1170
1171/** Get the number of connections currently in the pool
1172 *
1173 * @param[in] pool to count connections for.
1174 * @return the number of connections in the pool
1175 */
1177{
1178 return &pool->state;
1179}
1180
1181/** Connection pool get timeout
1182 *
1183 * @param[in] pool to get connection timeout for.
1184 * @return the connection timeout configured for the pool.
1185 */
1187{
1188 return pool->connect_timeout;
1189}
1190
1191/** Connection pool get start
1192 *
1193 * @param[in] pool to get connection start for.
1194 * @return the connection start value configured for the pool.
1195 */
1197{
1198 return pool->start;
1199}
1200
1201/** Return the opaque data associated with a connection pool
1202 *
1203 * @param pool to return data for.
1204 * @return opaque data associated with pool.
1205 */
1206void const *fr_pool_opaque(fr_pool_t *pool)
1207{
1208 return pool->opaque;
1209}
1210
1211/** Increment pool reference by one.
1212 *
1213 * @param[in] pool to increment reference counter for.
1214 */
1216{
1217 pool->ref++;
1218}
1219
1220/** Set a reconnection callback for the connection pool
1221 *
1222 * This can be called at any time during the pool's lifecycle.
1223 *
1224 * @param[in] pool to set reconnect callback for.
1225 * @param reconnect callback to call when reconnecting pool's connections.
1226 */
1228{
1229 pool->reconnect = reconnect;
1230}
1231
1232/** Mark connections for reconnection, and spawn at least 'start' connections
1233 *
1234 * @note This call may block whilst waiting for pending connection attempts to complete.
1235 *
1236 * This intended to be called on a connection pool that's in use, to have it reflect
1237 * a configuration change, or because the administrator knows that all connections
1238 * in the pool are inviable and need to be reconnected.
1239 *
1240 * @param[in] pool to reconnect.
1241 * @param[in] request The current request.
1242 * @return
1243 * - 0 On success.
1244 * - -1 If we couldn't create start connections, this may be ignored
1245 * depending on the context in which this function is being called.
1246 */
1248{
1249 uint32_t i;
1251 fr_time_t now;
1252
1253 pthread_mutex_lock(&pool->mutex);
1254
1255 /*
1256 * Pause new spawn attempts (we release the mutex
1257 * during our cond wait).
1258 */
1259 pool->state.reconnecting = true;
1260
1261 /*
1262 * When the loop exits, we'll hold the lock for the pool,
1263 * and we're guaranteed the connection create callback
1264 * will not be using the opaque data.
1265 */
1266 while (pool->state.pending) pthread_cond_wait(&pool->done_spawn, &pool->mutex);
1267
1268 /*
1269 * We want to ensure at least 'start' connections
1270 * have been reconnected. We can't call reconnect
1271 * because, we might get the same connection each
1272 * time we reserve one, so we close 'start'
1273 * connections, and then attempt to spawn them again.
1274 */
1275 for (i = 0; i < pool->start; i++) {
1276 this = fr_heap_peek(pool->heap);
1277 if (!this) break; /* There wasn't 'start' connections available */
1278
1279 connection_close_internal(pool, this);
1280 }
1281
1282 /*
1283 * Mark all remaining connections in the pool as
1284 * requiring reconnection.
1285 */
1286 for (this = pool->head; this; this = this->next) this->needs_reconnecting = true;
1287
1288 /*
1289 * Call the reconnect callback (if one's set)
1290 * This may modify the opaque data associated
1291 * with the pool.
1292 */
1293 if (pool->reconnect) pool->reconnect(pool, pool->opaque);
1294
1295 /*
1296 * Must be done inside the mutex, reconnect callback
1297 * may modify args.
1298 */
1299 fr_pool_trigger(pool, "reconnect");
1300
1301 /*
1302 * Allow new spawn attempts, and wakeup any threads
1303 * waiting to spawn new connections.
1304 */
1305 pool->state.reconnecting = false;
1306 pthread_cond_broadcast(&pool->done_reconnecting);
1307 pthread_mutex_unlock(&pool->mutex);
1308
1309 now = fr_time();
1310
1311 /*
1312 * Now attempt to spawn 'start' connections.
1313 */
1314 for (i = 0; i < pool->start; i++) {
1315 this = connection_spawn(pool, request, now, false, true);
1316 if (!this) return -1;
1317 }
1318
1319 return 0;
1320}
1321
1322/** Delete a connection pool
1323 *
1324 * Closes, unlinks and frees all connections in the connection pool, then frees
1325 * all memory used by the connection pool.
1326 *
1327 * @note Will call the 'stop' trigger.
1328 * @note Must be called with the mutex free.
1329 *
1330 * @param[in,out] pool to delete.
1331 */
1333{
1335
1336 if (!pool) return;
1337
1338 /*
1339 * More modules hold a reference to this pool, don't free
1340 * it yet.
1341 */
1342 if (pool->ref > 0) {
1343 pool->ref--;
1344 return;
1345 }
1346
1347 DEBUG2("Removing connection pool");
1348
1349 pthread_mutex_lock(&pool->mutex);
1350
1351 /*
1352 * Don't loop over the list. Just keep removing the head
1353 * until they're all gone.
1354 */
1355 while ((this = pool->head) != NULL) {
1356 INFO("Closing connection (%" PRIu64 ")", this->number);
1357
1358 connection_close_internal(pool, this);
1359 }
1360
1361 fr_pool_trigger(pool, "stop");
1362
1363 fr_assert(pool->head == NULL);
1364 fr_assert(pool->tail == NULL);
1365 fr_assert(pool->state.num == 0);
1366
1367 pthread_mutex_unlock(&pool->mutex);
1368 pthread_mutex_destroy(&pool->mutex);
1369 pthread_cond_destroy(&pool->done_spawn);
1370 pthread_cond_destroy(&pool->done_reconnecting);
1371
1372 talloc_free(pool);
1373}
1374
1375/** Reserve a connection in the connection pool
1376 *
1377 * Will attempt to find an unused connection in the connection pool, if one is
1378 * found, will mark it as in use, and increment the number of active connections
1379 * and return the connection handle.
1380 *
1381 * If no free connections are found will attempt to spawn a new one, conditional
1382 * on a connection spawning not already being in progress, and not being at the
1383 * 'max' connection limit.
1384 *
1385 * @note fr_pool_connection_release must be called once the caller has finished
1386 * using the connection.
1387 *
1388 * @see fr_pool_connection_release
1389 * @param[in] pool to reserve the connection from.
1390 * @param[in] request The current request.
1391 * @return
1392 * - A pointer to the connection handle.
1393 * - NULL on error.
1394 */
1396{
1397 return connection_get_internal(pool, request, true);
1398}
1399
1400/** Release a connection
1401 *
1402 * Will mark a connection as unused and decrement the number of active
1403 * connections.
1404 *
1405 * @see fr_pool_connection_get
1406 * @param[in] pool to release the connection in.
1407 * @param[in] request The current request.
1408 * @param[in] conn to release.
1409 */
1410void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
1411{
1413 fr_time_delta_t held;
1414 bool trigger_min = false, trigger_max = false;
1415
1416 this = connection_find(pool, conn);
1417 if (!this) return;
1418
1419 this->in_use = false;
1420
1421 /*
1422 * Record when the connection was last released
1423 */
1424 this->last_released = fr_time();
1425 pool->state.last_released = this->last_released;
1426
1427 /*
1428 * This is done inside the mutex to ensure
1429 * updates are atomic.
1430 */
1431 held = fr_time_sub(this->last_released, this->last_reserved);
1432
1433 /*
1434 * Check we've not exceeded out trigger limits
1435 *
1436 * These should only fire once per second.
1437 */
1439 (fr_time_delta_lt(held, pool->held_trigger_min)) &&
1440 (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_min), fr_time_delta_from_sec(1)))) {
1441 trigger_min = true;
1442 pool->state.last_held_min = this->last_released;
1443 }
1444
1446 (fr_time_delta_gt(held, pool->held_trigger_max)) &&
1447 (fr_time_delta_gteq(fr_time_sub(this->last_released, pool->state.last_held_max), fr_time_delta_from_sec(1)))) {
1448 trigger_max = true;
1449 pool->state.last_held_max = this->last_released;
1450 }
1451
1452 /*
1453 * Insert the connection in the heap.
1454 *
1455 * This will either be based on when we *started* using it
1456 * (allowing fast links to be re-used, and slow links to be
1457 * gradually expired), or when we released it (allowing
1458 * the maximum amount of time between connection use).
1459 */
1460 fr_heap_insert(&pool->heap, this);
1461
1462 fr_assert(pool->state.active != 0);
1463 pool->state.active--;
1464
1465 ROPTIONAL(RDEBUG2, DEBUG2, "Released connection (%" PRIu64 ")", this->number);
1466
1467 /*
1468 * We mirror the "spawn on get" functionality by having
1469 * "delete on release". If there are too many spare
1470 * connections, go manage the pool && clean some up.
1471 */
1472 connection_check(pool, request);
1473
1474 if (trigger_min) fr_pool_trigger(pool, "min");
1475 if (trigger_max) fr_pool_trigger(pool, "max");
1476}
1477
1478/** Reconnect a suspected inviable connection
1479 *
1480 * This should be called by the module if it suspects that a connection is
1481 * not viable (e.g. the server has closed it).
1482 *
1483 * When implementing a module that uses the connection pool API, it is advisable
1484 * to pass a pointer to the pointer to the handle (void **conn)
1485 * to all functions which may call reconnect. This is so that if a new handle
1486 * is created and returned, the handle pointer can be updated up the callstack,
1487 * and a function higher up the stack doesn't attempt to use a now invalid
1488 * connection handle.
1489 *
1490 * @note Will free any talloced memory hung off the context of the connection,
1491 * being reconnected.
1492 *
1493 * @warning After calling reconnect the caller *MUST NOT* attempt to use
1494 * the old handle in any other operations, as its memory will have been
1495 * freed.
1496 *
1497 * @see fr_pool_connection_get
1498 * @param[in] pool to reconnect the connection in.
1499 * @param[in] request The current request.
1500 * @param[in] conn to reconnect.
1501 * @return new connection handle if successful else NULL.
1502 */
1503void *fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
1504{
1506
1507 if (!pool || !conn) return NULL;
1508
1509 /*
1510 * If connection_find is successful the pool is now locked
1511 */
1512 this = connection_find(pool, conn);
1513 if (!this) return NULL;
1514
1515 ROPTIONAL(RINFO, INFO, "Deleting inviable connection (%" PRIu64 ")", this->number);
1516
1517 connection_close_internal(pool, this);
1518 connection_check(pool, request); /* Whilst we still have the lock (will release the lock) */
1519
1520 /*
1521 * Return an existing connection or spawn a new one.
1522 */
1523 return connection_get_internal(pool, request, true);
1524}
1525
1526/** Delete a connection from the connection pool.
1527 *
1528 * Resolves the connection handle to a connection, then (if found)
1529 * closes, unlinks and frees that connection.
1530 *
1531 * @note Must be called with the mutex free.
1532 *
1533 * @param[in] pool Connection pool to modify.
1534 * @param[in] request The current request.
1535 * @param[in] conn to delete.
1536 * @return
1537 * - 0 If the connection could not be found.
1538 * - 1 if the connection was deleted.
1539 */
1540int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
1541{
1543
1544 this = connection_find(pool, conn);
1545 if (!this) return 0;
1546
1547 /*
1548 * Record the last time a connection was closed
1549 */
1550 pool->state.last_closed = fr_time();
1551
1552 ROPTIONAL(RINFO, INFO, "Deleting connection (%" PRIu64 ")", this->number);
1553
1554 connection_close_internal(pool, this);
1555 connection_check(pool, request);
1556 return 1;
1557}
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition build.h:168
#define RCSID(id)
Definition build.h:488
#define UNUSED
Definition build.h:318
bool check_config
Definition cf_file.c:61
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
Definition cf_parse.c:1207
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:657
#define FR_INTEGER_BOUND_CHECK(_name, _var, _op, _bound)
Definition cf_parse.h:517
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:280
#define FR_TIME_DELTA_COND_CHECK(_name, _var, _cond, _new)
Definition cf_parse.h:519
#define cf_section_rules_push(_cs, _rule)
Definition cf_parse.h:689
char const * name1
Name of the CONF_ITEM to parse.
Definition cf_parse.h:595
#define FR_TIME_DELTA_BOUND_CHECK(_name, _var, _op, _bound)
Definition cf_parse.h:528
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:594
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_PAIR * cf_pair_alloc(CONF_SECTION *parent, char const *attr, char const *value, fr_token_t op, fr_token_t lhs_quote, fr_token_t rhs_quote)
Allocate a CONF_PAIR.
Definition cf_util.c:1266
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:285
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
unlang_interpret_t * unlang_interpret_get_thread_default(void)
Get the default interpreter for this thread.
Definition interpret.c:2076
#define PERROR(_fmt,...)
Definition log.h:228
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:540
#define RWARN(fmt,...)
Definition log.h:309
#define RINFO(fmt,...)
Definition log.h:308
#define RATE_LIMIT_GLOBAL_ROPTIONAL(_l_request, _l_global, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:675
#define fr_time()
Definition event.c:60
static bool fr_rate_limit_enabled(void)
Whether rate limiting is enabled.
Definition log.h:145
main_config_t const * main_config
Main server configuration.
Definition main_config.c:58
uint32_t max_workers
for the scheduler
unsigned int uint32_t
int fr_pair_list_copy(TALLOC_CTX *ctx, fr_pair_list_t *to, fr_pair_list_t const *from)
Duplicate a list of pairs.
Definition pair.c:2330
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition pair.c:46
fr_pool_reconnect_t reconnect
Called during connection pool reconnect.
Definition pool.c:146
fr_time_delta_t fr_pool_timeout(fr_pool_t *pool)
Connection pool get timeout.
Definition pool.c:1186
static void connection_link_head(fr_pool_t *pool, fr_pool_connection_t *this)
Adds a connection to the head of the connection list.
Definition pool.c:232
int fr_pool_start(fr_pool_t *pool)
Definition pool.c:1119
bool spread
If true we spread requests over the connections, using the connection released longest ago,...
Definition pool.c:110
uint32_t max_pending
Max number of pending connections to allow.
Definition pool.c:91
fr_time_delta_t delay_interval
When we next do a cleanup.
Definition pool.c:100
static int8_t last_released_cmp(void const *one, void const *two)
Order connections by released longest ago.
Definition pool.c:191
fr_time_t last_reserved
Last time the connection was reserved.
Definition pool.c:57
fr_heap_t * heap
For the next connection heap.
Definition pool.c:113
fr_time_delta_t held_trigger_min
If a connection is held for less than the specified period, fire a trigger.
Definition pool.c:138
fr_pool_connection_create_t create
Function used to create new connections.
Definition pool.c:143
uint32_t start
Number of initial connections.
Definition pool.c:88
fr_time_delta_t idle_timeout
How long a connection can be idle before being closed.
Definition pool.c:105
static void connection_unlink(fr_pool_t *pool, fr_pool_connection_t *this)
Removes a connection from the connection list.
Definition pool.c:205
static int connection_check(fr_pool_t *pool, request_t *request)
Check whether any connections need to be removed from the pool.
Definition pool.c:659
void fr_pool_connection_release(fr_pool_t *pool, request_t *request, void *conn)
Release a connection.
Definition pool.c:1410
uint32_t min
Minimum number of concurrent connections to keep open.
Definition pool.c:89
fr_heap_index_t heap_id
For the next connection heap.
Definition pool.c:54
uint64_t number
Unique ID assigned when the connection is created, these will monotonically increase over the lifetim...
Definition pool.c:62
CONF_SECTION const * cs
Configuration section holding the section of parsed config file that relates to this pool.
Definition pool.c:125
void fr_pool_free(fr_pool_t *pool)
Delete a connection pool.
Definition pool.c:1332
static fr_pool_connection_t * connection_find(fr_pool_t *pool, void *conn)
Find a connection handle in the connection list.
Definition pool.c:287
fr_pool_state_t const * fr_pool_state(fr_pool_t *pool)
Get the number of connections currently in the pool.
Definition pool.c:1176
static void fr_pool_trigger(fr_pool_t *pool, char const *event)
Send a connection pool trigger.
Definition pool.c:259
static int max_dflt(CONF_PAIR **out, void *parent, CONF_SECTION *cs, fr_token_t quote, conf_parser_t const *rule)
void fr_pool_ref(fr_pool_t *pool)
Increment pool reference by one.
Definition pool.c:1215
uint32_t pending_window
Sliding window of pending connections.
Definition pool.c:95
static void connection_close_internal(fr_pool_t *pool, fr_pool_connection_t *this)
Close an existing connection.
Definition pool.c:551
char const * trigger_prefix
Prefix to prepend to names of all triggers fired by the connection pool code.
Definition pool.c:134
fr_time_delta_t cleanup_interval
Initial timer for how often we sweep the pool for free connections.
Definition pool.c:98
void * connection
Pointer to whatever the module uses for a connection handle.
Definition pool.c:65
int fr_pool_reconnect(fr_pool_t *pool, request_t *request)
Mark connections for reconnection, and spawn at least 'start' connections.
Definition pool.c:1247
fr_pool_connection_t * tail
End of the connection list.
Definition pool.c:116
fr_pool_t * fr_pool_init(TALLOC_CTX *ctx, CONF_SECTION const *cs, void *opaque, fr_pool_connection_create_t c, fr_pool_connection_alive_t a, char const *log_prefix)
Create a new connection pool.
Definition pool.c:970
bool needs_reconnecting
Reconnect this connection before use.
Definition pool.c:69
fr_pool_t * fr_pool_copy(TALLOC_CTX *ctx, fr_pool_t *pool, void *opaque)
Allocate a new pool using an existing one as a template.
Definition pool.c:1159
static int connection_manage(fr_pool_t *pool, request_t *request, fr_pool_connection_t *this, fr_time_t now)
Check whether a connection needs to be removed from the pool.
Definition pool.c:599
fr_time_delta_t held_trigger_max
If a connection is held for longer than the specified period, fire a trigger.
Definition pool.c:140
fr_pool_connection_alive_t alive
Function used to check status of connections.
Definition pool.c:144
uint32_t num_uses
Number of times the connection has been reserved.
Definition pool.c:61
bool triggers_enabled
Whether we call the trigger functions.
Definition pool.c:132
fr_time_delta_t connect_timeout
New connection timeout, enforced by the create callback.
Definition pool.c:107
int fr_pool_connection_close(fr_pool_t *pool, request_t *request, void *conn)
Delete a connection from the connection pool.
Definition pool.c:1540
uint32_t spare
Number of spare connections to try.
Definition pool.c:92
uint64_t max_uses
Maximum number of times a connection can be used before being closed.
Definition pool.c:93
void * fr_pool_connection_get(fr_pool_t *pool, request_t *request)
Reserve a connection in the connection pool.
Definition pool.c:1395
void const * fr_pool_opaque(fr_pool_t *pool)
Return the opaque data associated with a connection pool.
Definition pool.c:1206
pthread_cond_t done_reconnecting
Before calling the create callback, threads should block on this condition if reconnecting == true.
Definition pool.c:122
fr_time_delta_t lifetime
How long a connection can be open before being closed (irrespective of whether it's idle or not).
Definition pool.c:103
pthread_mutex_t mutex
Mutex used to keep consistent state when making modifications in threaded mode.
Definition pool.c:118
uint32_t max
Maximum number of concurrent connections to allow.
Definition pool.c:90
fr_pool_connection_t * head
Start of the connection list.
Definition pool.c:115
int ref
Reference counter to prevent connection pool being freed multiple times.
Definition pool.c:86
int fr_pool_start_num(fr_pool_t *pool)
Connection pool get start.
Definition pool.c:1196
fr_pool_state_t state
Stats and state of the connection pool.
Definition pool.c:148
void fr_pool_enable_triggers(fr_pool_t *pool, char const *trigger_prefix, fr_pair_list_t *trigger_args)
Enable triggers for a connection pool.
Definition pool.c:936
fr_pair_list_t trigger_args
Arguments to make available in connection pool triggers.
Definition pool.c:136
fr_time_t created
Time connection was created.
Definition pool.c:56
bool in_use
Whether the connection is currently reserved.
Definition pool.c:67
fr_time_delta_t retry_delay
seconds to delay re-open after a failed open.
Definition pool.c:97
static int8_t last_reserved_cmp(void const *one, void const *two)
Order connections by reserved most recently.
Definition pool.c:182
pthread_cond_t done_spawn
Threads that need to ensure no spawning is in progress, should block on this condition if pending !...
Definition pool.c:120
static void * connection_get_internal(fr_pool_t *pool, request_t *request, bool spawn)
Get a connection from the connection pool.
Definition pool.c:838
fr_pool_connection_t * prev
Previous connection in list.
Definition pool.c:52
void fr_pool_reconnect_func(fr_pool_t *pool, fr_pool_reconnect_t reconnect)
Set a reconnection callback for the connection pool.
Definition pool.c:1227
static const conf_parser_t pool_config[]
Definition pool.c:151
fr_pool_connection_t * next
Next connection in list.
Definition pool.c:53
void * opaque
Pointer to context data that will be passed to callbacks.
Definition pool.c:127
static fr_pool_connection_t * connection_spawn(fr_pool_t *pool, request_t *request, fr_time_t now, bool in_use, bool unlock)
Spawns a new connection.
Definition pool.c:335
fr_time_t last_released
Time the connection was released.
Definition pool.c:59
void * fr_pool_connection_reconnect(fr_pool_t *pool, request_t *request, void *conn)
Reconnect a suspected inviable connection.
Definition pool.c:1503
char const * log_prefix
Log prefix to prepend to all log messages created by the connection pool code.
Definition pool.c:129
An individual connection within the connection pool.
Definition pool.c:51
A connection pool.
Definition pool.c:85
fr_time_t last_failed
Last time we tried to spawn a connection but failed.
Definition pool.h:50
fr_time_t last_closed
Last time a connection was closed.
Definition pool.h:57
fr_time_t last_throttled
Last time we refused to spawn a connection because the last connection failed, or we were already spa...
Definition pool.h:51
uint32_t active
Number of currently reserved connections.
Definition pool.h:68
fr_time_t last_released
Last time a connection was released.
Definition pool.h:56
fr_time_delta_t next_delay
The next delay time.
Definition pool.h:62
fr_time_t last_checked
Last time we pruned the connection pool.
Definition pool.h:48
void(* fr_pool_reconnect_t)(fr_pool_t *pool, void *opaque)
Alter the opaque data of a connection pool during reconnection event.
Definition pool.h:85
uint64_t count
Number of connections spawned over the lifetime of the pool.
Definition pool.h:65
fr_time_t last_held_min
Last time we warned about a low latency event.
Definition pool.h:59
fr_time_t last_at_max
Last time we hit the maximum number of allowed connections.
Definition pool.h:54
void *(* fr_pool_connection_create_t)(TALLOC_CTX *ctx, void *opaque, fr_time_delta_t timeout)
Create a new connection handle.
Definition pool.h:111
uint32_t pending
Number of pending open connections.
Definition pool.h:47
fr_time_t last_spawned
Last time we spawned a connection.
Definition pool.h:49
fr_time_t last_held_max
Last time we warned about a high latency event.
Definition pool.h:60
bool reconnecting
We are currently reconnecting the pool.
Definition pool.h:70
uint32_t num
Number of connections in the pool.
Definition pool.h:67
int(* fr_pool_connection_alive_t)(void *opaque, void *connection)
Check a connection handle is still viable.
Definition pool.h:126
#define fr_assert(_expr)
Definition rad_assert.h:37
#define RDEBUG2(fmt,...)
#define DEBUG2(fmt,...)
#define WARN(fmt,...)
static bool done
Definition radclient.c:80
#define INFO(fmt,...)
Definition radict.c:63
static char const * name
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition talloc.c:167
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:253
#define talloc_asprintf
Definition talloc.h:144
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:255
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_delta_lteq(_a, _b)
Definition time.h:286
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_delta_gteq(_a, _b)
Definition time.h:284
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
enum fr_token fr_token_t
@ T_BARE_WORD
Definition token.h:118
@ T_OP_EQ
Definition token.h:81
int trigger(unlang_interpret_t *intp, CONF_SECTION const *cs, CONF_PAIR **trigger_cp, char const *name, bool rate_limit, fr_pair_list_t *args)
Execute a trigger - call an executable to process an event.
Definition trigger.c:155
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
static fr_slen_t parent
Definition pair.h:858
#define fr_box_time_delta(_val)
Definition value.h:366
static size_t char ** out
Definition value.h:1030