27#define LOG_PREFIX trunk->log_prefix
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
39#include <freeradius-devel/server/connection.h>
40#include <freeradius-devel/server/trigger.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/table.h>
44#include <freeradius-devel/util/minmax_heap.h>
46#ifdef HAVE_STDATOMIC_H
48# ifndef ATOMIC_VAR_INIT
49# define ATOMIC_VAR_INIT(_x) (_x)
52# include <freeradius-devel/util/stdatomic.h>
65#define fr_time test_time
72#define TRUNK_REQUEST_STATE_LOG_MAX 20
433#define CONN_TRIGGER(_state) do { \
434 if (trunk->pub.triggers) { \
435 trigger_exec(unlang_interpret_get_thread_default(), \
436 NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
437 "<INVALID>"), true, NULL); \
441#define CONN_STATE_TRANSITION(_new, _log) \
443 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
444 tconn->pub.conn->id, \
445 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
446 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
447 tconn->pub.state = _new; \
448 CONN_TRIGGER(_new); \
449 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
452#define CONN_BAD_STATE_TRANSITION(_new) \
454 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
455 tconn->pub.conn->id, \
456 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
457 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
464#define REQUEST_TRIGGER(_state) do { \
465 if (trunk->pub.triggers) { \
466 trigger_exec(unlang_interpret_get_thread_default(), \
467 NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
468 "<INVALID>"), true, NULL); \
475#define REQUEST_STATE_TRANSITION(_new) \
477 request_t *request = treq->pub.request; \
478 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
480 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
481 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
482 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
483 treq->pub.state = _new; \
484 REQUEST_TRIGGER(_new); \
486#define REQUEST_BAD_STATE_TRANSITION(_new) \
488 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
489 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
491 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
492 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
498#define REQUEST_STATE_TRANSITION(_new) \
500 request_t *request = treq->pub.request; \
501 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
503 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
504 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
505 treq->pub.state = _new; \
507#define REQUEST_BAD_STATE_TRANSITION(_new) \
509 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
511 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
512 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
520#define DO_REQUEST_CANCEL(_treq, _reason) \
522 if ((_treq)->pub.trunk->funcs.request_cancel) { \
523 request_t *request = (_treq)->pub.request; \
524 void *_prev = (_treq)->pub.trunk->in_handler; \
525 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
526 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
527 (_treq)->pub.tconn->pub.conn, \
529 fr_table_str_by_value(trunk_cancellation_reasons, \
532 (_treq)->pub.trunk->uctx); \
533 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
534 (_treq)->pub.trunk->in_handler = _prev; \
541#define DO_REQUEST_CONN_RELEASE(_treq) \
543 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
544 request_t *request = (_treq)->pub.request; \
545 void *_prev = (_treq)->pub.trunk->in_handler; \
546 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
547 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
548 (_treq)->pub.tconn->pub.conn, \
550 (_treq)->pub.trunk->uctx); \
551 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
552 (_treq)->pub.trunk->in_handler = _prev; \
559#define DO_REQUEST_COMPLETE(_treq) \
561 if ((_treq)->pub.trunk->funcs.request_complete) { \
562 request_t *request = (_treq)->pub.request; \
563 void *_prev = (_treq)->pub.trunk->in_handler; \
564 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
565 (_treq)->pub.request, \
568 (_treq)->pub.trunk->uctx); \
569 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
570 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
571 (_treq)->pub.trunk->in_handler = _prev; \
578#define DO_REQUEST_FAIL(_treq, _prev_state) \
580 if ((_treq)->pub.trunk->funcs.request_fail) { \
581 request_t *request = (_treq)->pub.request; \
582 void *_prev = (_treq)->pub.trunk->in_handler; \
583 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
584 (_treq)->pub.request, \
587 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
588 (_treq)->pub.trunk->uctx); \
589 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
590 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
591 (_treq)->pub.trunk->in_handler = _prev; \
598#define DO_REQUEST_FREE(_treq) \
600 if ((_treq)->pub.trunk->funcs.request_free) { \
601 request_t *request = (_treq)->pub.request; \
602 void *_prev = (_treq)->pub.trunk->in_handler; \
603 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
604 (_treq)->pub.request, \
606 (_treq)->pub.trunk->uctx); \
607 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
608 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
609 (_treq)->pub.trunk->in_handler = _prev; \
616#define DO_REQUEST_MUX(_tconn) \
618 void *_prev = (_tconn)->pub.trunk->in_handler; \
619 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
620 (_tconn)->pub.conn->id, \
621 (_tconn)->pub.trunk->el, \
623 (_tconn)->pub.conn, \
624 (_tconn)->pub.trunk->uctx); \
625 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
626 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
627 (_tconn)->pub.trunk->in_handler = _prev; \
633#define DO_REQUEST_DEMUX(_tconn) \
635 void *_prev = (_tconn)->pub.trunk->in_handler; \
636 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
637 (_tconn)->pub.conn->id, \
639 (_tconn)->pub.conn, \
640 (_tconn)->pub.trunk->uctx); \
641 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
642 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
643 (_tconn)->pub.trunk->in_handler = _prev; \
649#define DO_REQUEST_CANCEL_MUX(_tconn) \
651 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
652 void *_prev = (_tconn)->pub.trunk->in_handler; \
653 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
654 (_tconn)->pub.conn->id, \
656 (_tconn)->pub.conn, \
657 (_tconn)->pub.trunk->uctx); \
658 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
659 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
660 (_tconn)->pub.trunk->in_handler = _prev; \
667#define DO_CONNECTION_ALLOC(_tconn) \
669 void *_prev = trunk->in_handler; \
670 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
672 (_tconn)->pub.trunk->el, \
673 (_tconn)->pub.trunk->conf.conn_conf, \
675 (_tconn)->pub.trunk->uctx); \
676 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
677 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
678 (_tconn)->pub.trunk->in_handler = _prev; \
679 if (!(_tconn)->pub.conn) { \
680 ERROR("Failed creating new connection"); \
681 talloc_free(tconn); \
689#define DO_CONNECTION_NOTIFY(_tconn, _events) \
691 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
692 void *_prev = (_tconn)->pub.trunk->in_handler; \
693 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
694 (_tconn)->pub.conn->id, \
696 (_tconn)->pub.conn, \
697 (_tconn)->pub.trunk->el, \
698 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
699 (_tconn)->pub.trunk->uctx); \
700 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
701 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
702 (_tconn)->pub.trunk->in_handler = _prev; \
706#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
707#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
708#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
709#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
711#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
712#define IS_PROCESSING(_tconn) ((tconn)->pub.state & TRUNK_CONN_PROCESSING)
717#define REQUEST_EXTRACT_BACKLOG(_treq) \
720 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
721 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
727#define REQUEST_EXTRACT_PENDING(_treq) \
730 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
731 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
737#define REQUEST_EXTRACT_PARTIAL(_treq) \
739 fr_assert((_treq)->pub.tconn->partial == treq); \
740 tconn->partial = NULL; \
746#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
751#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
756#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
761#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
763 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
764 tconn->cancel_partial = NULL; \
770#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
776#define CONN_REORDER(_tconn) \
779 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
780 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
781 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
782 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
783 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
815#define CALL_WATCHERS(_trunk, _state) \
817 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
818 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
840 if (entry->
func == watch) {
879 memcpy(&entry->
uctx, &uctx,
sizeof(entry->
uctx));
885#define TRUNK_STATE_TRANSITION(_new) \
887 DEBUG3("Trunk changed state %s -> %s", \
888 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
889 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
890 CALL_WATCHERS(trunk, _new); \
891 trunk->pub.state = _new; \
1629 RWARN,
WARN,
"Refusing to enqueue requests - "
1630 "No active connections and last event was a connection failure");
1650 uint64_t total_reqs;
1656 RWARN,
WARN,
"Refusing to alloc requests - "
1657 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
1658 "plus %u backlog requests reached",
1738 int states, uint64_t max)
1743 if (max == 0) max = UINT64_MAX;
1745#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1747#define DEQUEUE_ALL(_src_list, _state) do { \
1748 while ((treq = fr_dlist_head(_src_list))) { \
1750 fr_assert(treq->pub.state == (_state)); \
1751 trunk_request_enter_unassigned(treq); \
1752 fr_dlist_insert_tail(out, treq); \
1855 if (max == 0) max = UINT64_MAX;
1888 "Failed extracting conn from active heap: %s",
fr_strerror()))
goto done;
1952 "Failed re-inserting conn into active heap: %s",
fr_strerror()))
goto done;
1954 if (moved >= max)
goto done;
2034 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2055 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2077 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2164 "%s cannot be called within a handler", __FUNCTION__))
return;
2184 "Bad state %s after cancellation",
2241 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2265 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2289 "%s can only be called from within request_demux or request_cancel_mux handlers",
2290 __FUNCTION__))
return;
2348 *treq_to_free = NULL;
2383 talloc_free_children(treq);
2402 talloc_free_children(treq);
2488 RWARN,
WARN,
"Refusing to alloc requests - "
2489 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
2490 "plus %u backlog requests reached",
2588 request_t *request,
void *preq,
void *rctx)
2694 if (treq->
pub.
trunk->conf.always_writable) {
2742 request_t *request,
void *preq,
void *rctx,
2756 if (!ignore_limits) {
2804 fr_assert_msg(slog,
"slog list head NULL but element counter was %u",
2807 memset(slog, 0,
sizeof(*slog));
2838 fr_log(log, log_type,
file,
line,
"[%u] %s:%i - in conn %"PRIu64
" in state %s - %s -> %s",
3489 PERROR(
"Failed inserting connection reconnection timer event, halting connection");
3516 bool need_requeue =
false;
3525 need_requeue =
true;
3840 "%s can only be called from within request_cancel_mux handler",
3841 __FUNCTION__))
return -2;
3844 if (!*treq_out)
return 1;
3888 "%s can only be called from within request_mux handler",
3889 __FUNCTION__))
return -2;
3892 if (!*treq_out)
return 1;
3908 "%s cannot be called within a handler", __FUNCTION__))
return;
3910 DEBUG3(
"[%" PRIu64
"] Signalled writable", tconn->
pub.
conn->id);
3926 "%s cannot be called within a handler", __FUNCTION__))
return;
3928 DEBUG3(
"[%" PRIu64
"] Signalled readable", tconn->
pub.
conn->id);
4068 DEBUG3(
"Closing %s connection with no requests",
4111 DEBUG3(
"Rebalancing requests");
4172 DEBUG4(
"Managing trunk");
4272 DEBUG4(
"Not opening connection - Too many (%u) connections in the connecting state",
4284 DEBUG4(
"Not opening connection - Need to be above target for %pVs. It's been %pVs",
4299 DEBUG4(
"Not opening connection - Have %u connections, need %u or below",
4310 DEBUG4(
"Not opening connection - No outstanding requests");
4318 if (conn_count > 0) {
4320 if (average < trunk->
conf.target_req_per_conn) {
4321 DEBUG4(
"Not opening connection - Would leave us below our target requests "
4322 "per connection (now %u, after open %u)",
4351 DEBUG4(
"Not opening connection - Need to wait %pVs before opening another connection. "
4358 DEBUG4(
"Opening connection - Above target requests per connection (now %u, target %u)",
4370 DEBUG4(
"Not closing connection - Need to be below target for %pVs. It's been %pVs",
4379 DEBUG4(
"Not closing connection - No connections to close!");
4383 if ((trunk->
conf.
min > 0) && ((conn_count - 1) < trunk->
conf.
min)) {
4384 DEBUG4(
"Not closing connection - Have %u connections, need %u or above",
4390 DEBUG4(
"Closing connection - No outstanding requests");
4402 if (conn_count == 1) {
4403 DEBUG4(
"Not closing connection - Would leave connections "
4404 "and there are still %u outstanding requests", req_count);
4414 DEBUG4(
"Not closing connection - Would leave us above our target requests per connection "
4415 "(now %u, after close %u)",
ROUND_UP_DIV(req_count, conn_count), average);
4419 DEBUG4(
"Closing connection - Below target requests per connection (now %u, target %u)",
4424 DEBUG4(
"Not closing connection - Need to wait %pVs before closing another connection. "
4440 DEBUG4(
"Not closing remaining connection - last event was a failure");
4509 PERROR(
"Failed inserting trunk management event");
4528#define COUNT_BY_STATE(_state, _list) \
4530 if (conn_state & (_state)) { \
4532 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4533 count += trunk_request_count_by_connection(tconn, req_state); \
4577 uint64_t req_per_conn = 0;
4620 if (conn_count == 0) {
4625 if (req_count == 0) {
4642 }
else if (req_per_conn < trunk->
conf.target_req_per_conn) {
4653 if (conn_count_out) *conn_count_out = conn_count;
4654 if (req_count_out) *req_count_out = req_count;
4663 return req_per_conn;
4728#define RECONNECT_BY_STATE(_state, _list) \
4730 if (states & (_state)) { \
4732 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4733 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4773 for (i = 0; i < trunk->
conf.
start; i++) {
4774 DEBUG(
"[%i] Starting initial connection", i);
4795 PERROR(
"Failed inserting trunk management event");
4812 DEBUG3(
"Connection management enabled");
4823 DEBUG3(
"Connection management disabled");
4834 PERROR(
"Failed inserting trunk management event");
4855 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4868 DEBUG4(
"Trunk free %p", trunk);
4947 char const *log_prefix,
void const *uctx,
bool delay_start)
4959 trunk->
log_prefix = talloc_strdup(trunk, log_prefix);
4961 memcpy(&trunk->
funcs, funcs,
sizeof(trunk->
funcs));
4969 memcpy(&trunk->
uctx, &uctx,
sizeof(trunk->
uctx));
5005 DEBUG4(
"Trunk allocated %p", trunk);
5017#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5029 (void) talloc_get_type_abort(trunk,
trunk_t);
5035#define IO_FUNC_VERIFY(_func) \
5036 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
5045#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5047 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5048 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5049 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5050 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5053#define TCONN_DLIST_VERIFY(_dlist, _state) \
5055 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5056 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5057 trunk_connection_verify(file, line, tconn); \
5058 TRUNK_TCONN_CHECKS(tconn, _state); \
5062#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5064 fr_minmax_heap_verify(file, line, trunk->_heap); \
5065 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5066 trunk_connection_verify(file, line, tconn); \
5067 TRUNK_TCONN_CHECKS(tconn, _state); \
5099#define TCONN_TREQ_CHECKS(_treq, _state) \
5101 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5102 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5103 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5104 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5105 fr_fatal_assert_msg(_state == _treq->pub.state, \
5106 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5109#define TREQ_DLIST_VERIFY(_dlist, _state) \
5111 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5112 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5113 trunk_request_verify(file, line, treq); \
5114 TCONN_TREQ_CHECKS(treq, _state); \
5118#define TREQ_HEAP_VERIFY(_heap, _state) \
5120 fr_heap_iter_t _iter; \
5121 fr_heap_verify(file, line, tconn->_heap); \
5122 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5124 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5125 trunk_request_verify(file, line, treq); \
5126 TCONN_TREQ_CHECKS(treq, _state); \
5130#define TREQ_OPTION_VERIFY(_option, _state) \
5132 if (tconn->_option) { \
5133 trunk_request_verify(file, line, tconn->_option); \
5134 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5152#ifdef WITH_VERIFY_PTR
5160#define TCONN_DLIST_SEARCH(_dlist) \
5162 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5163 if (ptr == tconn) { \
5164 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5167 if (trunk_connection_search(tconn, ptr)) { \
5168 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5174#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5176 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5177 if (ptr == tconn) { \
5178 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5181 if (trunk_connection_search(tconn, ptr)) { \
5182 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5205#define TREQ_DLIST_SEARCH(_dlist) \
5207 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5208 if (ptr == treq) { \
5209 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5212 if (trunk_request_search(treq, ptr)) { \
5213 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5219#define TREQ_HEAP_SEARCH(_heap) \
5221 fr_heap_iter_t _iter; \
5222 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5224 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5225 if (ptr == treq) { \
5226 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5229 if (trunk_request_search(treq, ptr)) { \
5230 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5236#define TREQ_OPTION_SEARCH(_option) \
5238 if (tconn->_option) { \
5239 if (ptr == tconn->_option) { \
5240 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5243 if (trunk_request_search(tconn->_option, ptr)) { \
5244 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
@ CONNECTION_STATE_FAILED
Connection has failed.
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
@ CONNECTION_STATE_CLOSED
Connection has been closed.
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
@ CONNECTION_STATE_INIT
Init state, sets up connection.
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
static size_t min(size_t x, size_t y)
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
#define fr_dlist_verify(_head)
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Head of a doubly linked list.
Entry in a doubly linked list.
#define fr_event_timer_in(...)
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
unsigned int fr_heap_index_t
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
#define FR_HEAP_VERIFY(_heap)
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Track when a log message was last repeated.
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Stores all information relating to an event list.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
static int8_t request_prioritise(void const *one, void const *two)
static fr_event_list_t * events
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
static fr_time_t test_time_base
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define ATOMIC_VAR_INIT(value)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a table indexed by bit position.
An element in an arbitrarily ordered array of name to num mappings.
#define talloc_get_type_abort_const
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
#define fr_time_gteq(_a, _b)
#define fr_time_delta_wrap(_time)
#define fr_time_wrap(_time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
#define fr_time_lt(_a, _b)
#define fr_time_delta_gt(_a, _b)
bool trunk_search(trunk_t *trunk, void *ptr)
static atomic_uint_fast64_t request_counter
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
static size_t trunk_req_trigger_names_len
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
fr_dlist_head_t cancel
Requests in the cancel state.
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
fr_heap_t * pending
Requests waiting to be sent.
trunk_conf_t conf
Trunk common configuration.
static size_t trunk_connection_states_len
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
trunk_connection_t * tconn
The request was associated with.
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
#define IN_HANDLER(_trunk)
static fr_table_num_ordered_t const trunk_connection_states[]
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
void * uctx
User data to pass to the function.
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
trunk_request_state_t to
What state we transitioned to.
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
trunk_io_funcs_t funcs
I/O functions.
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
int trunk_start(trunk_t *trunk)
Start the trunk running.
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
fr_event_timer_t const * manage_ev
Periodic connection management event.
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
trunk_watch_t func
Function to call when a trunk enters.
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
fr_dlist_head_t closed
Connections that have closed.
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
int line
Line change occurred on.
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
#define CONN_STATE_TRANSITION(_new, _log)
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
static size_t trunk_connection_events_len
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
bool oneshot
Remove the function after it's called once.
bool started
Has the trunk been started.
static size_t trunk_states_len
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
trunk_connection_event_t events
The current events we expect to be notified on.
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
struct trunk_request_pub_s pub
Public fields in the trunk request.
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
bool enabled
Whether the watch entry is enabled.
fr_time_t last_freed
Last time this request was freed.
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
struct trunk_pub_s pub
Public fields in the trunk connection.
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
#define REQUEST_BAD_STATE_TRANSITION(_new)
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
static fr_table_num_ordered_t const trunk_connection_events[]
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
struct trunk_request_s trunk_request_t
void * in_handler
Which handler we're inside.
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
static fr_table_num_ordered_t const trunk_states[]
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
#define IS_SERVICEABLE(_tconn)
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
#define IS_PROCESSING(_tconn)
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
fr_dlist_head_t sent
Sent request.
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that is has been partially sent.
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
fr_dlist_head_t connecting
Connections which are not yet in the open state.
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
#define TRUNK_STATE_TRANSITION(_new)
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the count number of connections in the specified states.
#define IN_REQUEST_DEMUX(_trunk)
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
char const * function
State change occurred in.
static size_t trunk_request_states_len
fr_dlist_head_t init
Connections which have not yet started connecting.
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
trunk_request_t * partial
Partially written request.
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
fr_minmax_heap_t * active
Connections which can service requests.
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
char const * log_prefix
What to prepend to messages.
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
static conf_parser_t const trunk_config_request[]
fr_dlist_head_t full
Connections which have too many outstanding requests.
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
static fr_table_num_ordered_t const trunk_request_states[]
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
uint64_t id
Trunk request ID.
uint64_t sent_count
The number of requests that have been sent using this connection.
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the the init state.
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
fr_dlist_head_t log
State change log.
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
trunk_request_state_t from
What state we transitioned from.
fr_dlist_head_t cancel_sent
Sent cancellation request.
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
fr_heap_t * backlog
The request backlog.
#define IN_REQUEST_CANCEL_MUX(_trunk)
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
#define CONN_BAD_STATE_TRANSITION(_new)
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
bool sent
Trunk request has been sent at least once.
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
bool trunk_request_search(trunk_request_t *treq, void *ptr)
fr_dlist_t entry
List entry.
static conf_parser_t const trunk_config_connection[]
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
static size_t trunk_cancellation_reasons_len
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
static size_t trunk_conn_trigger_names_len
fr_event_list_t * el
Event list used by this trunk and the connection.
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
#define IN_REQUEST_MUX(_trunk)
fr_dlist_head_t free_requests
Requests in the unassigned state.
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
fr_dlist_t entry
Entry in the linked list.
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Associates request queues with a connection.
Trace state machine changes for a particular request.
Main trunk management handle.
An entry in a trunk watch function list.
uint16_t max
Maximum number of connections in the trunk.
uint32_t max_req_per_conn
Maximum requests per connection.
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
trunk_t *_CONST trunk
Trunk this request belongs to.
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
uint16_t min
Shouldn't let connections drop below this number.
#define TRUNK_REQUEST_STATE_ALL
All request states.
void *_CONST rctx
Resume ctx of the module.
trunk_t *_CONST trunk
Trunk this connection belongs to.
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
@ TRUNK_CONN_CONNECTING
Connection is connecting.
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
@ TRUNK_CONN_INIT
In the initial state.
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
request_t *_CONST request
The request that we're writing the data on behalf of.
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
fr_time_delta_t idle_timeout
how long a connection can remain idle for
trunk_connection_state_t _CONST state
What state the connection is in.
size_t req_pool_size
The size of the talloc pool allocated with the treq.
uint64_t max_uses
The maximum time a connection can be used.
fr_time_delta_t lifetime
Time between reconnects.
uint16_t connecting
Maximum number of connections that can be in the connecting state.
uint64_t _CONST req_alloc_reused
How many requests were reused.
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
fr_time_t _CONST last_failed
Last time a connection failed.
trunk_request_state_t _CONST state
Which list the request is now located in.
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
trunk_connection_t *_CONST tconn
Connection this request belongs to.
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
fr_time_t _CONST last_read_success
Last time we read a response.
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
fr_time_t _CONST last_read_success
Last time we read from the connection.
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
uint16_t start
How many connections to start.
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
#define TRUNK_CONN_ALL
All connection states.
fr_heap_cmp_t request_prioritise
Ordering function for requests.
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
trunk_cancel_reason_t
Reasons for a request being cancelled.
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
uint64_t _CONST req_alloc_new
How many requests we've allocated.
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
connection_t *_CONST conn
The underlying connection.
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
@ TRUNK_STATE_IDLE
Trunk has no connections.
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
@ TRUNK_ENQUEUE_FAIL
General failure.
@ TRUNK_ENQUEUE_OK
Operation was successful.
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
void *_CONST preq
Data for the muxer to write to the connection.
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
fr_time_t _CONST last_connected
Last time a connection connected.
trunk_request_cancel_mux_t request_cancel_mux
!< Read one or more requests from a connection.
trunk_request_state_t
Used for sanity checks and to simplify freeing.
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
@ TRUNK_REQUEST_STATE_INIT
Initial state.
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancel.
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
trunk_state_t _CONST state
Current state of the trunk.
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Common configuration parameters for a trunk.
Public fields for the trunk connection.
I/O functions to pass to trunk_alloc.
Public fields for the trunk.
Public fields for the trunk request.
static fr_event_list_t * el
char const * fr_strerror(void)
Get the last library error.
#define fr_box_time_delta(_val)
static size_t char ** out