27#define LOG_PREFIX trunk->log_prefix
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
45#ifdef HAVE_STDATOMIC_H
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
51# include <freeradius-devel/util/stdatomic.h>
64#define fr_time test_time
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
432#define CONN_TRIGGER(_state) do { \
433 if (trunk->pub.triggers) { \
434 trigger_exec(unlang_interpret_get_thread_default(), \
435 NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
436 "<INVALID>"), true, NULL); \
440#define CONN_STATE_TRANSITION(_new, _log) \
442 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
443 tconn->pub.conn->id, \
444 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
445 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
446 tconn->pub.state = _new; \
447 CONN_TRIGGER(_new); \
448 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
451#define CONN_BAD_STATE_TRANSITION(_new) \
453 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
454 tconn->pub.conn->id, \
455 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
456 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
463#define REQUEST_TRIGGER(_state) do { \
464 if (trunk->pub.triggers) { \
465 trigger_exec(unlang_interpret_get_thread_default(), \
466 NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
467 "<INVALID>"), true, NULL); \
474#define REQUEST_STATE_TRANSITION(_new) \
476 request_t *request = treq->pub.request; \
477 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
479 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
480 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
481 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
482 treq->pub.state = _new; \
483 REQUEST_TRIGGER(_new); \
485#define REQUEST_BAD_STATE_TRANSITION(_new) \
487 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
488 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
490 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
491 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
497#define REQUEST_STATE_TRANSITION(_new) \
499 request_t *request = treq->pub.request; \
500 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
502 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
503 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
504 treq->pub.state = _new; \
506#define REQUEST_BAD_STATE_TRANSITION(_new) \
508 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
510 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
511 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
519#define DO_REQUEST_CANCEL(_treq, _reason) \
521 if ((_treq)->pub.trunk->funcs.request_cancel) { \
522 request_t *request = (_treq)->pub.request; \
523 void *_prev = (_treq)->pub.trunk->in_handler; \
524 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
525 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
526 (_treq)->pub.tconn->pub.conn, \
528 fr_table_str_by_value(trunk_cancellation_reasons, \
531 (_treq)->pub.trunk->uctx); \
532 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
533 (_treq)->pub.trunk->in_handler = _prev; \
540#define DO_REQUEST_CONN_RELEASE(_treq) \
542 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
543 request_t *request = (_treq)->pub.request; \
544 void *_prev = (_treq)->pub.trunk->in_handler; \
545 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
546 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
547 (_treq)->pub.tconn->pub.conn, \
549 (_treq)->pub.trunk->uctx); \
550 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
551 (_treq)->pub.trunk->in_handler = _prev; \
558#define DO_REQUEST_COMPLETE(_treq) \
560 if ((_treq)->pub.trunk->funcs.request_complete) { \
561 request_t *request = (_treq)->pub.request; \
562 void *_prev = (_treq)->pub.trunk->in_handler; \
563 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
564 (_treq)->pub.request, \
567 (_treq)->pub.trunk->uctx); \
568 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
569 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
570 (_treq)->pub.trunk->in_handler = _prev; \
577#define DO_REQUEST_FAIL(_treq, _prev_state) \
579 if ((_treq)->pub.trunk->funcs.request_fail) { \
580 request_t *request = (_treq)->pub.request; \
581 void *_prev = (_treq)->pub.trunk->in_handler; \
582 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
583 (_treq)->pub.request, \
586 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
587 (_treq)->pub.trunk->uctx); \
588 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
589 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
590 (_treq)->pub.trunk->in_handler = _prev; \
597#define DO_REQUEST_FREE(_treq) \
599 if ((_treq)->pub.trunk->funcs.request_free) { \
600 request_t *request = (_treq)->pub.request; \
601 void *_prev = (_treq)->pub.trunk->in_handler; \
602 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
603 (_treq)->pub.request, \
605 (_treq)->pub.trunk->uctx); \
606 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
607 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
608 (_treq)->pub.trunk->in_handler = _prev; \
615#define DO_REQUEST_MUX(_tconn) \
617 void *_prev = (_tconn)->pub.trunk->in_handler; \
618 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
619 (_tconn)->pub.conn->id, \
620 (_tconn)->pub.trunk->el, \
622 (_tconn)->pub.conn, \
623 (_tconn)->pub.trunk->uctx); \
624 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
625 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
626 (_tconn)->pub.trunk->in_handler = _prev; \
632#define DO_REQUEST_DEMUX(_tconn) \
634 void *_prev = (_tconn)->pub.trunk->in_handler; \
635 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
636 (_tconn)->pub.conn->id, \
638 (_tconn)->pub.conn, \
639 (_tconn)->pub.trunk->uctx); \
640 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
641 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
642 (_tconn)->pub.trunk->in_handler = _prev; \
648#define DO_REQUEST_CANCEL_MUX(_tconn) \
650 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
651 void *_prev = (_tconn)->pub.trunk->in_handler; \
652 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
653 (_tconn)->pub.conn->id, \
655 (_tconn)->pub.conn, \
656 (_tconn)->pub.trunk->uctx); \
657 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
658 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
659 (_tconn)->pub.trunk->in_handler = _prev; \
666#define DO_CONNECTION_ALLOC(_tconn) \
668 void *_prev = trunk->in_handler; \
669 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
671 (_tconn)->pub.trunk->el, \
672 (_tconn)->pub.trunk->conf.conn_conf, \
674 (_tconn)->pub.trunk->uctx); \
675 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
676 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
677 (_tconn)->pub.trunk->in_handler = _prev; \
678 if (!(_tconn)->pub.conn) { \
679 ERROR("Failed creating new connection"); \
680 talloc_free(tconn); \
688#define DO_CONNECTION_NOTIFY(_tconn, _events) \
690 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
691 void *_prev = (_tconn)->pub.trunk->in_handler; \
692 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
693 (_tconn)->pub.conn->id, \
695 (_tconn)->pub.conn, \
696 (_tconn)->pub.trunk->el, \
697 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
698 (_tconn)->pub.trunk->uctx); \
699 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
700 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
701 (_tconn)->pub.trunk->in_handler = _prev; \
705#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
706#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
707#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
708#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
710#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
711#define IS_PROCESSING(_tconn) ((tconn)->pub.state & TRUNK_CONN_PROCESSING)
716#define REQUEST_EXTRACT_BACKLOG(_treq) \
719 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
720 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
726#define REQUEST_EXTRACT_PENDING(_treq) \
729 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
730 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
736#define REQUEST_EXTRACT_PARTIAL(_treq) \
738 fr_assert((_treq)->pub.tconn->partial == treq); \
739 tconn->partial = NULL; \
745#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
750#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
755#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
760#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
762 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
763 tconn->cancel_partial = NULL; \
769#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
775#define CONN_REORDER(_tconn) \
778 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
779 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
780 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
781 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
782 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
814#define CALL_WATCHERS(_trunk, _state) \
816 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
817 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
839 if (entry->
func == watch) {
878 memcpy(&entry->
uctx, &uctx,
sizeof(entry->
uctx));
884#define TRUNK_STATE_TRANSITION(_new) \
886 DEBUG3("Trunk changed state %s -> %s", \
887 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
888 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
889 CALL_WATCHERS(trunk, _new); \
890 trunk->pub.state = _new; \
1628 RWARN,
WARN,
"Refusing to enqueue requests - "
1629 "No active connections and last event was a connection failure");
1649 uint64_t total_reqs;
1655 RWARN,
WARN,
"Refusing to alloc requests - "
1656 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
1657 "plus %u backlog requests reached",
1737 int states, uint64_t max)
1742 if (max == 0) max = UINT64_MAX;
1744#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1746#define DEQUEUE_ALL(_src_list, _state) do { \
1747 while ((treq = fr_dlist_head(_src_list))) { \
1749 fr_assert(treq->pub.state == (_state)); \
1750 trunk_request_enter_unassigned(treq); \
1751 fr_dlist_insert_tail(out, treq); \
1854 if (max == 0) max = UINT64_MAX;
1887 "Failed extracting conn from active heap: %s",
fr_strerror()))
goto done;
1951 "Failed re-inserting conn into active heap: %s",
fr_strerror()))
goto done;
1953 if (moved >= max)
goto done;
2033 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2054 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2076 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2163 "%s cannot be called within a handler", __FUNCTION__))
return;
2183 "Bad state %s after cancellation",
2240 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2264 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2288 "%s can only be called from within request_demux or request_cancel_mux handlers",
2289 __FUNCTION__))
return;
2347 *treq_to_free = NULL;
2382 talloc_free_children(treq);
2401 talloc_free_children(treq);
2487 RWARN,
WARN,
"Refusing to alloc requests - "
2488 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
2489 "plus %u backlog requests reached",
2587 request_t *request,
void *preq,
void *rctx)
2693 if (treq->
pub.
trunk->conf.always_writable) {
2741 request_t *request,
void *preq,
void *rctx,
2755 if (!ignore_limits) {
2803 fr_assert_msg(slog,
"slog list head NULL but element counter was %u",
2806 memset(slog, 0,
sizeof(*slog));
2837 fr_log(log, log_type,
file,
line,
"[%u] %s:%i - in conn %"PRIu64
" in state %s - %s -> %s",
3488 PERROR(
"Failed inserting connection reconnection timer event, halting connection");
3515 bool need_requeue =
false;
3524 need_requeue =
true;
3839 "%s can only be called from within request_cancel_mux handler",
3840 __FUNCTION__))
return -2;
3843 if (!*treq_out)
return 1;
3887 "%s can only be called from within request_mux handler",
3888 __FUNCTION__))
return -2;
3891 if (!*treq_out)
return 1;
3907 "%s cannot be called within a handler", __FUNCTION__))
return;
3909 DEBUG3(
"[%" PRIu64
"] Signalled writable", tconn->
pub.
conn->id);
3925 "%s cannot be called within a handler", __FUNCTION__))
return;
3927 DEBUG3(
"[%" PRIu64
"] Signalled readable", tconn->
pub.
conn->id);
4067 DEBUG3(
"Closing %s connection with no requests",
4110 DEBUG3(
"Rebalancing requests");
4171 DEBUG4(
"Managing trunk");
4271 DEBUG4(
"Not opening connection - Too many (%u) connections in the connecting state",
4283 DEBUG4(
"Not opening connection - Need to be above target for %pVs. It's been %pVs",
4298 DEBUG4(
"Not opening connection - Have %u connections, need %u or below",
4309 DEBUG4(
"Not opening connection - No outstanding requests");
4317 if (conn_count > 0) {
4319 if (average < trunk->
conf.target_req_per_conn) {
4320 DEBUG4(
"Not opening connection - Would leave us below our target requests "
4321 "per connection (now %u, after open %u)",
4350 DEBUG4(
"Not opening connection - Need to wait %pVs before opening another connection. "
4357 DEBUG4(
"Opening connection - Above target requests per connection (now %u, target %u)",
4369 DEBUG4(
"Not closing connection - Need to be below target for %pVs. It's been %pVs",
4378 DEBUG4(
"Not closing connection - No connections to close!");
4382 if ((trunk->
conf.
min > 0) && ((conn_count - 1) < trunk->
conf.
min)) {
4383 DEBUG4(
"Not closing connection - Have %u connections, need %u or above",
4389 DEBUG4(
"Closing connection - No outstanding requests");
4401 if (conn_count == 1) {
4402 DEBUG4(
"Not closing connection - Would leave connections "
4403 "and there are still %u outstanding requests", req_count);
4413 DEBUG4(
"Not closing connection - Would leave us above our target requests per connection "
4414 "(now %u, after close %u)",
ROUND_UP_DIV(req_count, conn_count), average);
4418 DEBUG4(
"Closing connection - Below target requests per connection (now %u, target %u)",
4423 DEBUG4(
"Not closing connection - Need to wait %pVs before closing another connection. "
4439 DEBUG4(
"Not closing remaining connection - last event was a failure");
4508 PERROR(
"Failed inserting trunk management event");
4527#define COUNT_BY_STATE(_state, _list) \
4529 if (conn_state & (_state)) { \
4531 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4532 count += trunk_request_count_by_connection(tconn, req_state); \
4576 uint64_t req_per_conn = 0;
4619 if (conn_count == 0) {
4624 if (req_count == 0) {
4641 }
else if (req_per_conn < trunk->
conf.target_req_per_conn) {
4652 if (conn_count_out) *conn_count_out = conn_count;
4653 if (req_count_out) *req_count_out = req_count;
4662 return req_per_conn;
4727#define RECONNECT_BY_STATE(_state, _list) \
4729 if (states & (_state)) { \
4731 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4732 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4772 for (i = 0; i < trunk->
conf.
start; i++) {
4773 DEBUG(
"[%i] Starting initial connection", i);
4794 PERROR(
"Failed inserting trunk management event");
4811 DEBUG3(
"Connection management enabled");
4822 DEBUG3(
"Connection management disabled");
4834 PERROR(
"Failed inserting trunk management event");
4855 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4868 DEBUG4(
"Trunk free %p", trunk);
4947 char const *log_prefix,
void const *uctx,
bool delay_start)
4959 trunk->
log_prefix = talloc_strdup(trunk, log_prefix);
4961 memcpy(&trunk->
funcs, funcs,
sizeof(trunk->
funcs));
4969 memcpy(&trunk->
uctx, &uctx,
sizeof(trunk->
uctx));
5005 DEBUG4(
"Trunk allocated %p", trunk);
5017#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5029 (void) talloc_get_type_abort(trunk,
trunk_t);
5035#define IO_FUNC_VERIFY(_func) \
5036 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
5045#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5047 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5048 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5049 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5050 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5053#define TCONN_DLIST_VERIFY(_dlist, _state) \
5055 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5056 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5057 trunk_connection_verify(file, line, tconn); \
5058 TRUNK_TCONN_CHECKS(tconn, _state); \
5062#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5064 fr_minmax_heap_verify(file, line, trunk->_heap); \
5065 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5066 trunk_connection_verify(file, line, tconn); \
5067 TRUNK_TCONN_CHECKS(tconn, _state); \
5099#define TCONN_TREQ_CHECKS(_treq, _state) \
5101 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5102 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5103 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5104 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5105 fr_fatal_assert_msg(_state == _treq->pub.state, \
5106 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5109#define TREQ_DLIST_VERIFY(_dlist, _state) \
5111 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5112 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5113 trunk_request_verify(file, line, treq); \
5114 TCONN_TREQ_CHECKS(treq, _state); \
5118#define TREQ_HEAP_VERIFY(_heap, _state) \
5120 fr_heap_iter_t _iter; \
5121 fr_heap_verify(file, line, tconn->_heap); \
5122 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5124 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5125 trunk_request_verify(file, line, treq); \
5126 TCONN_TREQ_CHECKS(treq, _state); \
5130#define TREQ_OPTION_VERIFY(_option, _state) \
5132 if (tconn->_option) { \
5133 trunk_request_verify(file, line, tconn->_option); \
5134 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5152#ifdef WITH_VERIFY_PTR
5160#define TCONN_DLIST_SEARCH(_dlist) \
5162 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5163 if (ptr == tconn) { \
5164 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5167 if (trunk_connection_search(tconn, ptr)) { \
5168 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5174#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5176 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5177 if (ptr == tconn) { \
5178 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5181 if (trunk_connection_search(tconn, ptr)) { \
5182 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5205#define TREQ_DLIST_SEARCH(_dlist) \
5207 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5208 if (ptr == treq) { \
5209 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5212 if (trunk_request_search(treq, ptr)) { \
5213 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5219#define TREQ_HEAP_SEARCH(_heap) \
5221 fr_heap_iter_t _iter; \
5222 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5224 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5225 if (ptr == treq) { \
5226 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5229 if (trunk_request_search(treq, ptr)) { \
5230 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5236#define TREQ_OPTION_SEARCH(_option) \
5238 if (tconn->_option) { \
5239 if (ptr == tconn->_option) { \
5240 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5243 if (trunk_request_search(tconn->_option, ptr)) { \
5244 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
@ CONNECTION_STATE_FAILED
Connection has failed.
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
@ CONNECTION_STATE_CLOSED
Connection has been closed.
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
@ CONNECTION_STATE_INIT
Init state, sets up connection.
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
static size_t min(size_t x, size_t y)
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
#define fr_dlist_verify(_head)
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Head of a doubly linked list.
Entry in a doubly linked list.
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
unsigned int fr_heap_index_t
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
#define FR_HEAP_VERIFY(_heap)
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Track when a log message was last repeated.
Stores all information relating to an event list.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
static int8_t request_prioritise(void const *one, void const *two)
static fr_event_list_t * events
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
static fr_time_t test_time_base
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define ATOMIC_VAR_INIT(value)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a table indexed by bit position.
An element in an arbitrarily ordered array of name to num mappings.
#define talloc_get_type_abort_const
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
#define fr_time_gteq(_a, _b)
#define fr_time_delta_wrap(_time)
#define fr_time_wrap(_time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
#define fr_time_lt(_a, _b)
#define fr_time_delta_gt(_a, _b)
#define FR_TIMER_DELETE(_ev_p)
#define FR_TIMER_DELETE_RETURN(_ev_p)
#define FR_TIMER_DISARM(_ev)
bool trunk_search(trunk_t *trunk, void *ptr)
static atomic_uint_fast64_t request_counter
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
static size_t trunk_req_trigger_names_len
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
fr_dlist_head_t cancel
Requests in the cancel state.
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
fr_heap_t * pending
Requests waiting to be sent.
trunk_conf_t conf
Trunk common configuration.
static size_t trunk_connection_states_len
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
trunk_connection_t * tconn
The request was associated with.
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
fr_timer_t * manage_ev
Periodic connection management event.
#define IN_HANDLER(_trunk)
static fr_table_num_ordered_t const trunk_connection_states[]
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
void * uctx
User data to pass to the function.
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
trunk_request_state_t to
What state we transitioned to.
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
trunk_io_funcs_t funcs
I/O functions.
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
int trunk_start(trunk_t *trunk)
Start the trunk running.
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
trunk_watch_t func
Function to call when a trunk enters.
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
fr_dlist_head_t closed
Connections that have closed.
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
int line
Line change occurred on.
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
#define CONN_STATE_TRANSITION(_new, _log)
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
static size_t trunk_connection_events_len
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
bool oneshot
Remove the function after it's called once.
bool started
Has the trunk been started.
static size_t trunk_states_len
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
trunk_connection_event_t events
The current events we expect to be notified on.
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
struct trunk_request_pub_s pub
Public fields in the trunk request.
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
bool enabled
Whether the watch entry is enabled.
fr_time_t last_freed
Last time this request was freed.
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
struct trunk_pub_s pub
Public fields in the trunk connection.
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
#define REQUEST_BAD_STATE_TRANSITION(_new)
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
static fr_table_num_ordered_t const trunk_connection_events[]
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
struct trunk_request_s trunk_request_t
void * in_handler
Which handler we're inside.
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
static fr_table_num_ordered_t const trunk_states[]
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
#define IS_SERVICEABLE(_tconn)
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
#define IS_PROCESSING(_tconn)
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
fr_dlist_head_t sent
Sent request.
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that is has been partially sent.
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
fr_dlist_head_t connecting
Connections which are not yet in the open state.
#define TRUNK_STATE_TRANSITION(_new)
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the count number of connections in the specified states.
#define IN_REQUEST_DEMUX(_trunk)
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
char const * function
State change occurred in.
static size_t trunk_request_states_len
fr_dlist_head_t init
Connections which have not yet started connecting.
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
trunk_request_t * partial
Partially written request.
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
fr_minmax_heap_t * active
Connections which can service requests.
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
char const * log_prefix
What to prepend to messages.
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
static conf_parser_t const trunk_config_request[]
fr_dlist_head_t full
Connections which have too many outstanding requests.
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
static fr_table_num_ordered_t const trunk_request_states[]
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
uint64_t id
Trunk request ID.
uint64_t sent_count
The number of requests that have been sent using this connection.
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the the init state.
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
fr_dlist_head_t log
State change log.
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
trunk_request_state_t from
What state we transitioned from.
fr_dlist_head_t cancel_sent
Sent cancellation request.
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
fr_heap_t * backlog
The request backlog.
#define IN_REQUEST_CANCEL_MUX(_trunk)
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
#define CONN_BAD_STATE_TRANSITION(_new)
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
bool sent
Trunk request has been sent at least once.
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
bool trunk_request_search(trunk_request_t *treq, void *ptr)
fr_dlist_t entry
List entry.
static conf_parser_t const trunk_config_connection[]
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
static size_t trunk_cancellation_reasons_len
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
static size_t trunk_conn_trigger_names_len
fr_event_list_t * el
Event list used by this trunk and the connection.
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
#define IN_REQUEST_MUX(_trunk)
fr_dlist_head_t free_requests
Requests in the unassigned state.
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
fr_dlist_t entry
Entry in the linked list.
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Associates request queues with a connection.
Trace state machine changes for a particular request.
Main trunk management handle.
An entry in a trunk watch function list.
uint16_t max
Maximum number of connections in the trunk.
uint32_t max_req_per_conn
Maximum requests per connection.
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
trunk_t *_CONST trunk
Trunk this request belongs to.
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
uint16_t min
Shouldn't let connections drop below this number.
#define TRUNK_REQUEST_STATE_ALL
All request states.
void *_CONST rctx
Resume ctx of the module.
trunk_t *_CONST trunk
Trunk this connection belongs to.
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
@ TRUNK_CONN_CONNECTING
Connection is connecting.
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
@ TRUNK_CONN_INIT
In the initial state.
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
request_t *_CONST request
The request that we're writing the data on behalf of.
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
fr_time_delta_t idle_timeout
how long a connection can remain idle for
trunk_connection_state_t _CONST state
What state the connection is in.
size_t req_pool_size
The size of the talloc pool allocated with the treq.
uint64_t max_uses
The maximum time a connection can be used.
fr_time_delta_t lifetime
Time between reconnects.
uint16_t connecting
Maximum number of connections that can be in the connecting state.
uint64_t _CONST req_alloc_reused
How many requests were reused.
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
fr_time_t _CONST last_failed
Last time a connection failed.
trunk_request_state_t _CONST state
Which list the request is now located in.
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
trunk_connection_t *_CONST tconn
Connection this request belongs to.
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
fr_time_t _CONST last_read_success
Last time we read a response.
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
fr_time_t _CONST last_read_success
Last time we read from the connection.
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
uint16_t start
How many connections to start.
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
#define TRUNK_CONN_ALL
All connection states.
fr_heap_cmp_t request_prioritise
Ordering function for requests.
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
trunk_cancel_reason_t
Reasons for a request being cancelled.
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
uint64_t _CONST req_alloc_new
How many requests we've allocated.
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
connection_t *_CONST conn
The underlying connection.
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
@ TRUNK_STATE_IDLE
Trunk has no connections.
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
@ TRUNK_ENQUEUE_FAIL
General failure.
@ TRUNK_ENQUEUE_OK
Operation was successful.
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
void *_CONST preq
Data for the muxer to write to the connection.
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
fr_time_t _CONST last_connected
Last time a connection connected.
trunk_request_cancel_mux_t request_cancel_mux
!< Read one or more requests from a connection.
trunk_request_state_t
Used for sanity checks and to simplify freeing.
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
@ TRUNK_REQUEST_STATE_INIT
Initial state.
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancel.
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
trunk_state_t _CONST state
Current state of the trunk.
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Common configuration parameters for a trunk.
Public fields for the trunk connection.
I/O functions to pass to trunk_alloc.
Public fields for the trunk.
Public fields for the trunk request.
static fr_event_list_t * el
char const * fr_strerror(void)
Get the last library error.
#define fr_box_time_delta(_val)
static size_t char ** out