27 #define LOG_PREFIX trunk->log_prefix
30 # define TALLOC_GET_TYPE_ABORT_NOOP 1
36 #define _TRUNK_PRIVATE 1
37 #include <freeradius-devel/server/trunk.h>
39 #include <freeradius-devel/server/connection.h>
40 #include <freeradius-devel/server/trigger.h>
41 #include <freeradius-devel/util/misc.h>
42 #include <freeradius-devel/util/syserror.h>
43 #include <freeradius-devel/util/table.h>
44 #include <freeradius-devel/util/minmax_heap.h>
46 #ifdef HAVE_STDATOMIC_H
49 # include <freeradius-devel/util/stdatomic.h>
62 #define fr_time test_time
69 #define TRUNK_REQUEST_STATE_LOG_MAX 20
429 #define CONN_TRIGGER(_state) do { \
430 if (trunk->pub.triggers) { \
431 trigger_exec(unlang_interpret_get_thread_default(), \
432 NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
433 "<INVALID>"), true, NULL); \
437 #define CONN_STATE_TRANSITION(_new, _log) \
439 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
440 tconn->pub.conn->id, \
441 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
442 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
443 tconn->pub.state = _new; \
444 CONN_TRIGGER(_new); \
445 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
448 #define CONN_BAD_STATE_TRANSITION(_new) \
450 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
451 tconn->pub.conn->id, \
452 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
453 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
460 #define REQUEST_TRIGGER(_state) do { \
461 if (trunk->pub.triggers) { \
462 trigger_exec(unlang_interpret_get_thread_default(), \
463 NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
464 "<INVALID>"), true, NULL); \
471 #define REQUEST_STATE_TRANSITION(_new) \
473 request_t *request = treq->pub.request; \
474 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
476 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
477 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
478 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
479 treq->pub.state = _new; \
480 REQUEST_TRIGGER(_new); \
482 #define REQUEST_BAD_STATE_TRANSITION(_new) \
484 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
485 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
487 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
488 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
494 #define REQUEST_STATE_TRANSITION(_new) \
496 request_t *request = treq->pub.request; \
497 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
499 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
500 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
501 treq->pub.state = _new; \
503 #define REQUEST_BAD_STATE_TRANSITION(_new) \
505 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
507 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
508 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
516 #define DO_REQUEST_CANCEL(_treq, _reason) \
518 if ((_treq)->pub.trunk->funcs.request_cancel) { \
519 request_t *request = (_treq)->pub.request; \
520 void *_prev = (_treq)->pub.trunk->in_handler; \
521 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
522 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
523 (_treq)->pub.tconn->pub.conn, \
525 fr_table_str_by_value(trunk_cancellation_reasons, \
528 (_treq)->pub.trunk->uctx); \
529 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
530 (_treq)->pub.trunk->in_handler = _prev; \
537 #define DO_REQUEST_CONN_RELEASE(_treq) \
539 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
540 request_t *request = (_treq)->pub.request; \
541 void *_prev = (_treq)->pub.trunk->in_handler; \
542 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
543 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
544 (_treq)->pub.tconn->pub.conn, \
546 (_treq)->pub.trunk->uctx); \
547 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
548 (_treq)->pub.trunk->in_handler = _prev; \
555 #define DO_REQUEST_COMPLETE(_treq) \
557 if ((_treq)->pub.trunk->funcs.request_complete) { \
558 request_t *request = (_treq)->pub.request; \
559 void *_prev = (_treq)->pub.trunk->in_handler; \
560 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
561 (_treq)->pub.request, \
564 (_treq)->pub.trunk->uctx); \
565 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
566 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
567 (_treq)->pub.trunk->in_handler = _prev; \
574 #define DO_REQUEST_FAIL(_treq, _prev_state) \
576 if ((_treq)->pub.trunk->funcs.request_fail) { \
577 request_t *request = (_treq)->pub.request; \
578 void *_prev = (_treq)->pub.trunk->in_handler; \
579 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
580 (_treq)->pub.request, \
583 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
584 (_treq)->pub.trunk->uctx); \
585 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
586 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
587 (_treq)->pub.trunk->in_handler = _prev; \
594 #define DO_REQUEST_FREE(_treq) \
596 if ((_treq)->pub.trunk->funcs.request_free) { \
597 request_t *request = (_treq)->pub.request; \
598 void *_prev = (_treq)->pub.trunk->in_handler; \
599 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
600 (_treq)->pub.request, \
602 (_treq)->pub.trunk->uctx); \
603 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
604 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
605 (_treq)->pub.trunk->in_handler = _prev; \
612 #define DO_REQUEST_MUX(_tconn) \
614 void *_prev = (_tconn)->pub.trunk->in_handler; \
615 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
616 (_tconn)->pub.conn->id, \
617 (_tconn)->pub.trunk->el, \
619 (_tconn)->pub.conn, \
620 (_tconn)->pub.trunk->uctx); \
621 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
622 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
623 (_tconn)->pub.trunk->in_handler = _prev; \
629 #define DO_REQUEST_DEMUX(_tconn) \
631 void *_prev = (_tconn)->pub.trunk->in_handler; \
632 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
633 (_tconn)->pub.conn->id, \
635 (_tconn)->pub.conn, \
636 (_tconn)->pub.trunk->uctx); \
637 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
638 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
639 (_tconn)->pub.trunk->in_handler = _prev; \
645 #define DO_REQUEST_CANCEL_MUX(_tconn) \
647 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
648 void *_prev = (_tconn)->pub.trunk->in_handler; \
649 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
650 (_tconn)->pub.conn->id, \
652 (_tconn)->pub.conn, \
653 (_tconn)->pub.trunk->uctx); \
654 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
655 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
656 (_tconn)->pub.trunk->in_handler = _prev; \
663 #define DO_CONNECTION_ALLOC(_tconn) \
665 void *_prev = trunk->in_handler; \
666 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
668 (_tconn)->pub.trunk->el, \
669 (_tconn)->pub.trunk->conf.conn_conf, \
671 (_tconn)->pub.trunk->uctx); \
672 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
673 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
674 (_tconn)->pub.trunk->in_handler = _prev; \
675 if (!(_tconn)->pub.conn) { \
676 ERROR("Failed creating new connection"); \
677 talloc_free(tconn); \
685 #define DO_CONNECTION_NOTIFY(_tconn, _events) \
687 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
688 void *_prev = (_tconn)->pub.trunk->in_handler; \
689 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
690 (_tconn)->pub.conn->id, \
692 (_tconn)->pub.conn, \
693 (_tconn)->pub.trunk->el, \
694 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
695 (_tconn)->pub.trunk->uctx); \
696 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
697 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
698 (_tconn)->pub.trunk->in_handler = _prev; \
702 #define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
703 #define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
704 #define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
705 #define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
707 #define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
708 #define IS_PROCESSING(_tconn) ((tconn)->pub.state & TRUNK_CONN_PROCESSING)
713 #define REQUEST_EXTRACT_BACKLOG(_treq) \
716 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
717 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
723 #define REQUEST_EXTRACT_PENDING(_treq) \
726 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
727 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
733 #define REQUEST_EXTRACT_PARTIAL(_treq) \
735 fr_assert((_treq)->pub.tconn->partial == treq); \
736 tconn->partial = NULL; \
742 #define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
747 #define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
752 #define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
757 #define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
759 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
760 tconn->cancel_partial = NULL; \
766 #define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
772 #define CONN_REORDER(_tconn) \
775 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
776 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
777 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
778 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
779 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
811 #define CALL_WATCHERS(_trunk, _state) \
813 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
814 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
836 if (entry->
func == watch) {
881 #define TRUNK_STATE_TRANSITION(_new) \
883 DEBUG3("Trunk changed state %s -> %s", \
884 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
885 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
886 CALL_WATCHERS(trunk, _new); \
887 trunk->pub.state = _new; \
1622 RWARN,
WARN,
"Refusing to enqueue requests - "
1623 "No active connections and last event was a connection failure");
1643 uint64_t total_reqs;
1649 RWARN,
WARN,
"Refusing to alloc requests - "
1650 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
1651 "plus %u backlog requests reached",
1731 int states, uint64_t max)
1736 if (max == 0) max = UINT64_MAX;
1738 #define OVER_MAX_CHECK if (++count > max) return (count - 1)
1740 #define DEQUEUE_ALL(_src_list, _state) do { \
1741 while ((treq = fr_dlist_head(_src_list))) { \
1743 fr_assert(treq->pub.state == (_state)); \
1744 trunk_request_enter_unassigned(treq); \
1745 fr_dlist_insert_tail(out, treq); \
1848 if (max == 0) max = UINT64_MAX;
1881 "Failed extracting conn from active heap: %s",
fr_strerror()))
goto done;
1945 "Failed re-inserting conn into active heap: %s",
fr_strerror()))
goto done;
1947 if (moved >= max)
goto done;
2027 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2048 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2070 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2152 "%s cannot be called within a handler", __FUNCTION__))
return;
2172 "Bad state %s after cancellation",
2229 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2253 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2277 "%s can only be called from within request_demux or request_cancel_mux handlers",
2278 __FUNCTION__))
return;
2336 *treq_to_free = NULL;
2371 talloc_free_children(treq);
2390 talloc_free_children(treq);
2476 RWARN,
WARN,
"Refusing to alloc requests - "
2477 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
2478 "plus %u backlog requests reached",
2576 request_t *request,
void *preq,
void *rctx)
2730 request_t *request,
void *preq,
void *rctx,
2744 if (!ignore_limits) {
2792 fr_assert_msg(slog,
"slog list head NULL but element counter was %u",
2795 memset(slog, 0,
sizeof(*slog));
2826 fr_log(log, log_type,
file,
line,
"[%u] %s:%i - in conn %"PRIu64
" in state %s - %s -> %s",
3467 PERROR(
"Failed inserting connection reconnection timer event, halting connection");
3494 bool need_requeue =
false;
3503 need_requeue =
true;
3818 "%s can only be called from within request_cancel_mux handler",
3819 __FUNCTION__))
return -2;
3822 if (!*treq_out)
return 1;
3866 "%s can only be called from within request_mux handler",
3867 __FUNCTION__))
return -2;
3870 if (!*treq_out)
return 1;
3886 "%s cannot be called within a handler", __FUNCTION__))
return;
3888 DEBUG3(
"[%" PRIu64
"] Signalled writable", tconn->
pub.
conn->id);
3904 "%s cannot be called within a handler", __FUNCTION__))
return;
3906 DEBUG3(
"[%" PRIu64
"] Signalled readable", tconn->
pub.
conn->id);
4046 DEBUG3(
"Closing %s connection with no requests",
4089 DEBUG3(
"Rebalancing requests");
4150 DEBUG4(
"Managing trunk");
4222 DEBUG4(
"Not opening connection - Too many (%u) connections in the connecting state",
4234 DEBUG4(
"Not opening connection - Need to be above target for %pVs. It's been %pVs",
4249 DEBUG4(
"Not opening connection - Have %u connections, need %u or below",
4260 DEBUG4(
"Not opening connection - No outstanding requests");
4268 if (conn_count > 0) {
4270 if (average < trunk->
conf.target_req_per_conn) {
4271 DEBUG4(
"Not opening connection - Would leave us below our target requests "
4272 "per connection (now %u, after open %u)",
4301 DEBUG4(
"Not opening connection - Need to wait %pVs before opening another connection. "
4308 DEBUG4(
"Opening connection - Above target requests per connection (now %u, target %u)",
4320 DEBUG4(
"Not closing connection - Need to be below target for %pVs. It's been %pVs",
4329 DEBUG4(
"Not closing connection - No connections to close!");
4333 if ((trunk->
conf.
min > 0) && ((conn_count - 1) < trunk->
conf.
min)) {
4334 DEBUG4(
"Not closing connection - Have %u connections, need %u or above",
4340 DEBUG4(
"Closing connection - No outstanding requests");
4352 if (conn_count == 1) {
4353 DEBUG4(
"Not closing connection - Would leave connections "
4354 "and there are still %u outstanding requests", req_count);
4364 DEBUG4(
"Not closing connection - Would leave us above our target requests per connection "
4365 "(now %u, after close %u)",
ROUND_UP_DIV(req_count, conn_count), average);
4369 DEBUG4(
"Closing connection - Below target requests per connection (now %u, target %u)",
4374 DEBUG4(
"Not closing connection - Need to wait %pVs before closing another connection. "
4390 DEBUG4(
"Not closing remaining connection - last event was a failure");
4459 PERROR(
"Failed inserting trunk management event");
4478 #define COUNT_BY_STATE(_state, _list) \
4480 if (conn_state & (_state)) { \
4482 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4483 count += trunk_request_count_by_connection(tconn, req_state); \
4527 uint64_t req_per_conn = 0;
4570 if (conn_count == 0) {
4575 if (req_count == 0) {
4592 }
else if (req_per_conn < trunk->
conf.target_req_per_conn) {
4603 if (conn_count_out) *conn_count_out = conn_count;
4604 if (req_count_out) *req_count_out = req_count;
4613 return req_per_conn;
4678 #define RECONNECT_BY_STATE(_state, _list) \
4680 if (states & (_state)) { \
4682 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4683 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4723 for (i = 0; i < trunk->
conf.
start; i++) {
4724 DEBUG(
"[%i] Starting initial connection", i);
4735 PERROR(
"Failed inserting trunk management event");
4752 DEBUG3(
"Connection management enabled");
4763 DEBUG3(
"Connection management disabled");
4774 PERROR(
"Failed inserting trunk management event");
4795 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4808 DEBUG4(
"Trunk free %p", trunk);
4887 char const *log_prefix,
void const *
uctx,
bool delay_start)
4899 trunk->
log_prefix = talloc_strdup(trunk, log_prefix);
4901 memcpy(&trunk->
funcs, funcs,
sizeof(trunk->
funcs));
4945 DEBUG4(
"Trunk allocated %p", trunk);
4957 #ifndef TALLOC_GET_TYPE_ABORT_NOOP
4969 (void) talloc_get_type_abort(trunk,
trunk_t);
4975 #define IO_FUNC_VERIFY(_func) \
4976 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
4985 #define TRUNK_TCONN_CHECKS(_tconn, _state) \
4987 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
4988 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
4989 fr_fatal_assert_msg(_state == _tconn->pub.state, \
4990 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
4993 #define TCONN_DLIST_VERIFY(_dlist, _state) \
4995 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
4996 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
4997 trunk_connection_verify(file, line, tconn); \
4998 TRUNK_TCONN_CHECKS(tconn, _state); \
5002 #define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5004 fr_minmax_heap_verify(file, line, trunk->_heap); \
5005 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5006 trunk_connection_verify(file, line, tconn); \
5007 TRUNK_TCONN_CHECKS(tconn, _state); \
5039 #define TCONN_TREQ_CHECKS(_treq, _state) \
5041 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5042 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5043 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5044 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5045 fr_fatal_assert_msg(_state == _treq->pub.state, \
5046 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5049 #define TREQ_DLIST_VERIFY(_dlist, _state) \
5051 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5052 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5053 trunk_request_verify(file, line, treq); \
5054 TCONN_TREQ_CHECKS(treq, _state); \
5058 #define TREQ_HEAP_VERIFY(_heap, _state) \
5060 fr_heap_iter_t _iter; \
5061 fr_heap_verify(file, line, tconn->_heap); \
5062 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5064 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5065 trunk_request_verify(file, line, treq); \
5066 TCONN_TREQ_CHECKS(treq, _state); \
5070 #define TREQ_OPTION_VERIFY(_option, _state) \
5072 if (tconn->_option) { \
5073 trunk_request_verify(file, line, tconn->_option); \
5074 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5092 #ifdef WITH_VERIFY_PTR
5100 #define TCONN_DLIST_SEARCH(_dlist) \
5102 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5103 if (ptr == tconn) { \
5104 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5107 if (trunk_connection_search(tconn, ptr)) { \
5108 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5114 #define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5116 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5117 if (ptr == tconn) { \
5118 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5121 if (trunk_connection_search(tconn, ptr)) { \
5122 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5145 #define TREQ_DLIST_SEARCH(_dlist) \
5147 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5148 if (ptr == treq) { \
5149 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5152 if (trunk_request_search(treq, ptr)) { \
5153 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5159 #define TREQ_HEAP_SEARCH(_heap) \
5161 fr_heap_iter_t _iter; \
5162 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5164 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5165 if (ptr == treq) { \
5166 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5169 if (trunk_request_search(treq, ptr)) { \
5170 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5176 #define TREQ_OPTION_SEARCH(_option) \
5178 if (tconn->_option) { \
5179 if (ptr == tconn->_option) { \
5180 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5183 if (trunk_request_search(tconn->_option, ptr)) { \
5184 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
@ CONNECTION_STATE_FAILED
Connection has failed.
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
@ CONNECTION_STATE_CLOSED
Connection has been closed.
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
@ CONNECTION_STATE_INIT
Init state, sets up connection.
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
fr_dcursor_eval_t void const * uctx
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
#define fr_dlist_verify(_head)
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Head of a doubly linked list.
Entry in a doubly linked list.
#define fr_event_timer_in(...)
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
unsigned int fr_heap_index_t
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
#define FR_HEAP_VERIFY(_heap)
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Track when a log message was last repeated.
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Stores all information relating to an event list.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
unsigned int fr_minmax_heap_iter_t
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
static fr_event_list_t * events
static int8_t request_prioritise(void const *one, void const *two)
static size_t min(size_t x, size_t y)
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
static fr_time_t test_time_base
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
init
Enter the EAP-IDENTITY state.
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define ATOMIC_VAR_INIT(value)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a table indexed by bit position.
An element in an arbitrarily ordered array of name to num mappings.
#define talloc_get_type_abort_const
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
#define fr_time_gteq(_a, _b)
#define fr_time_delta_wrap(_time)
#define fr_time_wrap(_time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
#define fr_time_lt(_a, _b)
bool trunk_search(trunk_t *trunk, void *ptr)
static atomic_uint_fast64_t request_counter
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
static size_t trunk_req_trigger_names_len
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
fr_dlist_head_t cancel
Requests in the cancel state.
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
fr_heap_t * pending
Requests waiting to be sent.
trunk_conf_t conf
Trunk common configuration.
static size_t trunk_connection_states_len
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
trunk_connection_t * tconn
The request was associated with.
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
#define IN_HANDLER(_trunk)
static fr_table_num_ordered_t const trunk_connection_states[]
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
void * uctx
User data to pass to the function.
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
trunk_request_state_t to
What state we transitioned to.
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
trunk_io_funcs_t funcs
I/O functions.
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
int trunk_start(trunk_t *trunk)
Start the trunk running.
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
fr_event_timer_t const * manage_ev
Periodic connection management event.
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
trunk_watch_t func
Function to call when a trunk enters.
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
fr_dlist_head_t closed
Connections that have closed.
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
int line
Line change occurred on.
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
#define CONN_STATE_TRANSITION(_new, _log)
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
static size_t trunk_connection_events_len
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
bool oneshot
Remove the function after it's called once.
bool started
Has the trunk been started.
static size_t trunk_states_len
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
trunk_connection_event_t events
The current events we expect to be notified on.
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
struct trunk_request_pub_s pub
Public fields in the trunk request.
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
bool enabled
Whether the watch entry is enabled.
fr_time_t last_freed
Last time this request was freed.
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
struct trunk_pub_s pub
Public fields in the trunk connection.
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
#define REQUEST_BAD_STATE_TRANSITION(_new)
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
static fr_table_num_ordered_t const trunk_connection_events[]
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
struct trunk_request_s trunk_request_t
void * in_handler
Which handler we're inside.
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
static fr_table_num_ordered_t const trunk_states[]
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
#define IS_SERVICEABLE(_tconn)
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
#define IS_PROCESSING(_tconn)
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
fr_dlist_head_t sent
Sent request.
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that is has been partially sent.
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
fr_dlist_head_t connecting
Connections which are not yet in the open state.
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
#define TRUNK_STATE_TRANSITION(_new)
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the count number of connections in the specified states.
#define IN_REQUEST_DEMUX(_trunk)
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
char const * function
State change occurred in.
static size_t trunk_request_states_len
fr_dlist_head_t init
Connections which have not yet started connecting.
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
trunk_request_t * partial
Partially written request.
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
fr_minmax_heap_t * active
Connections which can service requests.
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
char const * log_prefix
What to prepend to messages.
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
static conf_parser_t const trunk_config_request[]
fr_dlist_head_t full
Connections which have too many outstanding requests.
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
static fr_table_num_ordered_t const trunk_request_states[]
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
uint64_t id
Trunk request ID.
uint64_t sent_count
The number of requests that have been sent using this connection.
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the the init state.
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
fr_dlist_head_t log
State change log.
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
trunk_request_state_t from
What state we transitioned from.
fr_dlist_head_t cancel_sent
Sent cancellation request.
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
fr_heap_t * backlog
The request backlog.
#define IN_REQUEST_CANCEL_MUX(_trunk)
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
#define CONN_BAD_STATE_TRANSITION(_new)
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
bool sent
Trunk request has been sent at least once.
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
bool trunk_request_search(trunk_request_t *treq, void *ptr)
fr_dlist_t entry
List entry.
static conf_parser_t const trunk_config_connection[]
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
static size_t trunk_cancellation_reasons_len
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
static size_t trunk_conn_trigger_names_len
fr_event_list_t * el
Event list used by this trunk and the connection.
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
#define IN_REQUEST_MUX(_trunk)
fr_dlist_head_t free_requests
Requests in the unassigned state.
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
fr_dlist_t entry
Entry in the linked list.
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Associates request queues with a connection.
Trace state machine changes for a particular request.
Main trunk management handle.
An entry in a trunk watch function list.
uint16_t max
Maximum number of connections in the trunk.
uint32_t max_req_per_conn
Maximum requests per connection.
trunk_t *_CONST trunk
Trunk this request belongs to.
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
uint16_t min
Shouldn't let connections drop below this number.
#define TRUNK_REQUEST_STATE_ALL
All request states.
void *_CONST rctx
Resume ctx of the module.
trunk_t *_CONST trunk
Trunk this connection belongs to.
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
@ TRUNK_CONN_CONNECTING
Connection is connecting.
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
@ TRUNK_CONN_INIT
In the initial state.
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
request_t *_CONST request
The request that we're writing the data on behalf of.
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
trunk_connection_state_t _CONST state
What state the connection is in.
size_t req_pool_size
The size of the talloc pool allocated with the treq.
uint64_t max_uses
The maximum time a connection can be used.
fr_time_delta_t lifetime
Time between reconnects.
uint16_t connecting
Maximum number of connections that can be in the connecting state.
uint64_t _CONST req_alloc_reused
How many requests were reused.
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
fr_time_t _CONST last_failed
Last time a connection failed.
trunk_request_state_t _CONST state
Which list the request is now located in.
trunk_connection_t *_CONST tconn
Connection this request belongs to.
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
fr_time_t _CONST last_read_success
Last time we read a response.
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
uint16_t start
How many connections to start.
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
#define TRUNK_CONN_ALL
All connection states.
fr_heap_cmp_t request_prioritise
Ordering function for requests.
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
trunk_cancel_reason_t
Reasons for a request being cancelled.
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
uint64_t _CONST req_alloc_new
How many requests we've allocated.
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
connection_t *_CONST conn
The underlying connection.
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
@ TRUNK_STATE_IDLE
Trunk has no connections.
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
@ TRUNK_ENQUEUE_FAIL
General failure.
@ TRUNK_ENQUEUE_OK
Operation was successful.
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
void *_CONST preq
Data for the muxer to write to the connection.
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
fr_time_t _CONST last_connected
Last time a connection connected.
trunk_request_cancel_mux_t request_cancel_mux
!< Read one or more requests from a connection.
trunk_request_state_t
Used for sanity checks and to simplify freeing.
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
@ TRUNK_REQUEST_STATE_INIT
Initial state.
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancel.
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
trunk_state_t _CONST state
Current state of the trunk.
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Common configuration parameters for a trunk.
Public fields for the trunk connection.
I/O functions to pass to trunk_alloc.
Public fields for the trunk.
Public fields for the trunk request.
static fr_event_list_t * el
char const * fr_strerror(void)
Get the last library error.
#define fr_box_time_delta(_val)
static size_t char ** out