27 #define LOG_PREFIX trunk->log_prefix
30 # define TALLOC_GET_TYPE_ABORT_NOOP 1
36 #define _TRUNK_PRIVATE 1
37 #include <freeradius-devel/server/trunk.h>
39 #include <freeradius-devel/server/connection.h>
40 #include <freeradius-devel/server/trigger.h>
41 #include <freeradius-devel/util/misc.h>
42 #include <freeradius-devel/util/syserror.h>
43 #include <freeradius-devel/util/table.h>
44 #include <freeradius-devel/util/minmax_heap.h>
46 #ifdef HAVE_STDATOMIC_H
49 # include <freeradius-devel/util/stdatomic.h>
62 #define fr_time test_time
69 #define FR_TRUNK_REQUEST_STATE_LOG_MAX 20
419 #define CONN_TRIGGER(_state) do { \
420 if (trunk->pub.triggers) { \
421 trigger_exec(unlang_interpret_get_thread_default(), \
422 NULL, fr_table_str_by_value(fr_trunk_conn_trigger_names, _state, \
423 "<INVALID>"), true, NULL); \
427 #define CONN_STATE_TRANSITION(_new, _log) \
429 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
430 tconn->pub.conn->id, \
431 fr_table_str_by_value(fr_trunk_connection_states, tconn->pub.state, "<INVALID>"), \
432 fr_table_str_by_value(fr_trunk_connection_states, _new, "<INVALID>")); \
433 tconn->pub.state = _new; \
434 CONN_TRIGGER(_new); \
435 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
438 #define CONN_BAD_STATE_TRANSITION(_new) \
440 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
441 tconn->pub.conn->id, \
442 fr_table_str_by_value(fr_trunk_connection_states, tconn->pub.state, "<INVALID>"), \
443 fr_table_str_by_value(fr_trunk_connection_states, _new, "<INVALID>"))) return; \
450 #define REQUEST_TRIGGER(_state) do { \
451 if (trunk->pub.triggers) { \
452 trigger_exec(unlang_interpret_get_thread_default(), \
453 NULL, fr_table_str_by_value(fr_trunk_req_trigger_names, _state, \
454 "<INVALID>"), true, NULL); \
461 #define REQUEST_STATE_TRANSITION(_new) \
463 request_t *request = treq->pub.request; \
464 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
466 fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
467 fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>")); \
468 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
469 treq->pub.state = _new; \
470 REQUEST_TRIGGER(_new); \
472 #define REQUEST_BAD_STATE_TRANSITION(_new) \
474 fr_trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
475 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
477 fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
478 fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>"))) return; \
484 #define REQUEST_STATE_TRANSITION(_new) \
486 request_t *request = treq->pub.request; \
487 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
489 fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
490 fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>")); \
491 treq->pub.state = _new; \
493 #define REQUEST_BAD_STATE_TRANSITION(_new) \
495 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
497 fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
498 fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>"))) return; \
506 #define DO_REQUEST_CANCEL(_treq, _reason) \
508 if ((_treq)->pub.trunk->funcs.request_cancel) { \
509 request_t *request = (_treq)->pub.request; \
510 void *_prev = (_treq)->pub.trunk->in_handler; \
511 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
512 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
513 (_treq)->pub.tconn->pub.conn, \
515 fr_table_str_by_value(fr_trunk_cancellation_reasons, \
518 (_treq)->pub.trunk->uctx); \
519 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
520 (_treq)->pub.trunk->in_handler = _prev; \
527 #define DO_REQUEST_CONN_RELEASE(_treq) \
529 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
530 request_t *request = (_treq)->pub.request; \
531 void *_prev = (_treq)->pub.trunk->in_handler; \
532 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
533 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
534 (_treq)->pub.tconn->pub.conn, \
536 (_treq)->pub.trunk->uctx); \
537 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
538 (_treq)->pub.trunk->in_handler = _prev; \
545 #define DO_REQUEST_COMPLETE(_treq) \
547 if ((_treq)->pub.trunk->funcs.request_complete) { \
548 request_t *request = (_treq)->pub.request; \
549 void *_prev = (_treq)->pub.trunk->in_handler; \
550 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
551 (_treq)->pub.request, \
554 (_treq)->pub.trunk->uctx); \
555 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
556 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
557 (_treq)->pub.trunk->in_handler = _prev; \
564 #define DO_REQUEST_FAIL(_treq, _prev_state) \
566 if ((_treq)->pub.trunk->funcs.request_fail) { \
567 request_t *request = (_treq)->pub.request; \
568 void *_prev = (_treq)->pub.trunk->in_handler; \
569 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
570 (_treq)->pub.request, \
573 fr_table_str_by_value(fr_trunk_request_states, (_prev_state), "<INVALID>"), \
574 (_treq)->pub.trunk->uctx); \
575 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
576 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
577 (_treq)->pub.trunk->in_handler = _prev; \
584 #define DO_REQUEST_FREE(_treq) \
586 if ((_treq)->pub.trunk->funcs.request_free) { \
587 request_t *request = (_treq)->pub.request; \
588 void *_prev = (_treq)->pub.trunk->in_handler; \
589 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
590 (_treq)->pub.request, \
592 (_treq)->pub.trunk->uctx); \
593 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
594 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
595 (_treq)->pub.trunk->in_handler = _prev; \
602 #define DO_REQUEST_MUX(_tconn) \
604 void *_prev = (_tconn)->pub.trunk->in_handler; \
605 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
606 (_tconn)->pub.conn->id, \
607 (_tconn)->pub.trunk->el, \
609 (_tconn)->pub.conn, \
610 (_tconn)->pub.trunk->uctx); \
611 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
612 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
613 (_tconn)->pub.trunk->in_handler = _prev; \
619 #define DO_REQUEST_DEMUX(_tconn) \
621 void *_prev = (_tconn)->pub.trunk->in_handler; \
622 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
623 (_tconn)->pub.conn->id, \
625 (_tconn)->pub.conn, \
626 (_tconn)->pub.trunk->uctx); \
627 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
628 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
629 (_tconn)->pub.trunk->in_handler = _prev; \
635 #define DO_REQUEST_CANCEL_MUX(_tconn) \
637 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
638 void *_prev = (_tconn)->pub.trunk->in_handler; \
639 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
640 (_tconn)->pub.conn->id, \
642 (_tconn)->pub.conn, \
643 (_tconn)->pub.trunk->uctx); \
644 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
645 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
646 (_tconn)->pub.trunk->in_handler = _prev; \
653 #define DO_CONNECTION_ALLOC(_tconn) \
655 void *_prev = trunk->in_handler; \
656 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
658 (_tconn)->pub.trunk->el, \
659 (_tconn)->pub.trunk->conf.conn_conf, \
661 (_tconn)->pub.trunk->uctx); \
662 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
663 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
664 (_tconn)->pub.trunk->in_handler = _prev; \
665 if (!(_tconn)->pub.conn) { \
666 ERROR("Failed creating new connection"); \
667 talloc_free(tconn); \
675 #define DO_CONNECTION_NOTIFY(_tconn, _events) \
677 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
678 void *_prev = (_tconn)->pub.trunk->in_handler; \
679 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
680 (_tconn)->pub.conn->id, \
682 (_tconn)->pub.conn, \
683 (_tconn)->pub.trunk->el, \
684 fr_table_str_by_value(fr_trunk_connection_events, (_events), "<INVALID>"), \
685 (_tconn)->pub.trunk->uctx); \
686 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
687 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
688 (_tconn)->pub.trunk->in_handler = _prev; \
/*
 *	Predicates on trunk->in_handler.
 *
 *	The DO_* callback wrappers in this file store the address of the
 *	callback they are about to invoke in in_handler, and put the previous
 *	value back once the callback returns.  These macros read that field.
 *
 *	IN_HANDLER		- some callback is currently recorded.
 *	IN_REQUEST_MUX		- request_mux is set, and is the recorded callback.
 *	IN_REQUEST_DEMUX	- request_demux is set, and is the recorded callback.
 *	IN_REQUEST_CANCEL_MUX	- request_cancel_mux is set, and is the recorded callback.
 *
 *	The "is set" test stops a NULL callback slot from comparing equal to
 *	a NULL in_handler.
 */
#define IN_HANDLER(_trunk)		(NULL != (_trunk)->in_handler)
#define IN_REQUEST_MUX(_trunk)		((_trunk)->funcs.request_mux && ((void *)(_trunk)->funcs.request_mux == (_trunk)->in_handler))
#define IN_REQUEST_DEMUX(_trunk)	((_trunk)->funcs.request_demux && ((void *)(_trunk)->funcs.request_demux == (_trunk)->in_handler))
#define IN_REQUEST_CANCEL_MUX(_trunk)	((_trunk)->funcs.request_cancel_mux && ((void *)(_trunk)->funcs.request_cancel_mux == (_trunk)->in_handler))
/** Non-zero when the connection's state shares a bit with FR_TRUNK_CONN_SERVICEABLE
 *
 * @param[in] _tconn	Trunk connection to test.
 */
#define IS_SERVICEABLE(_tconn)		((_tconn)->pub.state & FR_TRUNK_CONN_SERVICEABLE)

/** Non-zero when the connection's state shares a bit with FR_TRUNK_CONN_PROCESSING
 *
 * Tests the connection passed as the argument.  The expansion must not
 * name a bare `tconn`, otherwise the macro silently tests whatever
 * variable of that name is in scope at the call site (or fails to
 * compile when there is none).
 *
 * @param[in] _tconn	Trunk connection to test.
 */
#define IS_PROCESSING(_tconn)		((_tconn)->pub.state & FR_TRUNK_CONN_PROCESSING)
703 #define REQUEST_EXTRACT_BACKLOG(_treq) \
706 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
707 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
713 #define REQUEST_EXTRACT_PENDING(_treq) \
716 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
717 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
723 #define REQUEST_EXTRACT_PARTIAL(_treq) \
725 fr_assert((_treq)->pub.tconn->partial == treq); \
726 tconn->partial = NULL; \
/** Remove a request from the sent list of the connection it is assigned to
 *
 * The list is reached through the request itself, as REQUEST_EXTRACT_PENDING
 * does, so the macro does not depend on locals named `tconn` and `treq`
 * existing at the call site.
 *
 * @param[in] _treq	Request to remove; (_treq)->pub.tconn must be set.
 */
#define REQUEST_EXTRACT_SENT(_treq)	fr_dlist_remove(&(_treq)->pub.tconn->sent, (_treq))

/** Remove a request from the cancel list of the connection it is assigned to
 *
 * @param[in] _treq	Request to remove; (_treq)->pub.tconn must be set.
 */
#define REQUEST_EXTRACT_CANCEL(_treq)	fr_dlist_remove(&(_treq)->pub.tconn->cancel, (_treq))
742 #define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
744 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
745 tconn->cancel_partial = NULL; \
/** Remove a request from the cancel_sent list of the connection it is assigned to
 *
 * The list is reached through the request itself, so the macro does not
 * depend on locals named `tconn` and `treq` existing at the call site.
 *
 * @param[in] _treq	Request to remove; (_treq)->pub.tconn must be set.
 */
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)	fr_dlist_remove(&(_treq)->pub.tconn->cancel_sent, (_treq))
757 #define CONN_REORDER(_tconn) \
760 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
761 if (!fr_cond_assert((_tconn)->pub.state == FR_TRUNK_CONN_ACTIVE)) break; \
762 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
763 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
764 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
796 #define CALL_WATCHERS(_trunk, _state) \
798 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
799 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
821 if (entry->
func == watch) {
860 memcpy(&entry->
uctx, &uctx,
sizeof(entry->
uctx));
866 #define TRUNK_STATE_TRANSITION(_new) \
868 DEBUG3("Trunk changed state %s -> %s", \
869 fr_table_str_by_value(fr_trunk_states, trunk->pub.state, "<INVALID>"), \
870 fr_table_str_by_value(fr_trunk_states, _new, "<INVALID>")); \
871 CALL_WATCHERS(trunk, _new); \
872 trunk->pub.state = _new; \
1552 RWARN,
WARN,
"Refusing to enqueue requests - "
1553 "No active connections and last event was a connection failure");
1573 uint64_t total_reqs;
1577 if (total_reqs >= limit) {
1579 RWARN,
WARN,
"Refusing to alloc requests - "
1580 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
1660 int states, uint64_t max)
1665 if (max == 0) max = UINT64_MAX;
1667 #define OVER_MAX_CHECK if (++count > max) return (count - 1)
1669 #define DEQUEUE_ALL(_src_list, _state) do { \
1670 while ((treq = fr_dlist_head(_src_list))) { \
1672 fr_assert(treq->pub.state == (_state)); \
1673 trunk_request_enter_unassigned(treq); \
1674 fr_dlist_insert_tail(out, treq); \
1778 if (max == 0) max = UINT64_MAX;
1811 "Failed extracting conn from active heap: %s",
fr_strerror()))
goto done;
1875 "Failed re-inserting conn into active heap: %s",
fr_strerror()))
goto done;
1877 if (moved >= max)
goto done;
1957 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
1978 "%s can only be called from within request_mux handler", __FUNCTION__))
return;
2059 "%s cannot be called within a handler", __FUNCTION__))
return;
2079 "Bad state %s after cancellation",
2136 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2160 "%s can only be called from within request_cancel_mux handler", __FUNCTION__))
return;
2184 "%s can only be called from within request_demux or request_cancel_mux handlers",
2185 __FUNCTION__))
return;
2243 *treq_to_free = NULL;
2278 talloc_free_children(treq);
2297 talloc_free_children(treq);
2383 RWARN,
WARN,
"Refusing to alloc requests - "
2384 "Limit of %"PRIu64
" (max = %u * per_connection_max = %u) "
2482 request_t *request,
void *preq,
void *rctx)
2630 request_t *request,
void *preq,
void *rctx,
2644 if (!ignore_limits) {
2692 fr_assert_msg(slog,
"slog list head NULL but element counter was %u",
2695 memset(slog, 0,
sizeof(*slog));
2726 fr_log(log, log_type,
file,
line,
"[%u] %s:%i - in conn %"PRIu64
" in state %s - %s -> %s",
3363 PERROR(
"Failed inserting connection reconnection timer event, halting connection");
3390 bool need_requeue =
false;
3399 need_requeue =
true;
3713 "%s can only be called from within request_cancel_mux handler",
3714 __FUNCTION__))
return -2;
3717 if (!*treq_out)
return 1;
3761 "%s can only be called from within request_mux handler",
3762 __FUNCTION__))
return -2;
3765 if (!*treq_out)
return 1;
3781 "%s cannot be called within a handler", __FUNCTION__))
return;
3783 DEBUG3(
"[%" PRIu64
"] Signalled writable", tconn->
pub.
conn->id);
3799 "%s cannot be called within a handler", __FUNCTION__))
return;
3801 DEBUG3(
"[%" PRIu64
"] Signalled readable", tconn->
pub.
conn->id);
3941 DEBUG3(
"Closing %s connection with no requests",
3984 DEBUG3(
"Rebalancing requests");
4045 DEBUG4(
"Managing trunk");
4117 DEBUG4(
"Not opening connection - Too many (%u) connections in the connecting state",
4129 DEBUG4(
"Not opening connection - Need to be above target for %pVs. It's been %pVs",
4144 DEBUG4(
"Not opening connection - Have %u connections, need %u or below",
4155 DEBUG4(
"Not opening connection - No outstanding requests");
4163 if (conn_count > 0) {
4165 if (average < trunk->
conf.target_req_per_conn) {
4166 DEBUG4(
"Not opening connection - Would leave us below our target requests "
4167 "per connection (now %u, after open %u)",
4196 DEBUG4(
"Not opening connection - Need to wait %pVs before opening another connection. "
4203 DEBUG4(
"Opening connection - Above target requests per connection (now %u, target %u)",
4215 DEBUG4(
"Not closing connection - Need to be below target for %pVs. It's been %pVs",
4224 DEBUG4(
"Not closing connection - No connections to close!");
4228 if ((trunk->
conf.
min > 0) && ((conn_count - 1) < trunk->
conf.
min)) {
4229 DEBUG4(
"Not closing connection - Have %u connections, need %u or above",
4235 DEBUG4(
"Closing connection - No outstanding requests");
4247 if (conn_count == 1) {
4248 DEBUG4(
"Not closing connection - Would leave connections "
4249 "and there are still %u outstanding requests", req_count);
4259 DEBUG4(
"Not closing connection - Would leave us above our target requests per connection "
4260 "(now %u, after close %u)",
ROUND_UP_DIV(req_count, conn_count), average);
4264 DEBUG4(
"Closing connection - Below target requests per connection (now %u, target %u)",
4269 DEBUG4(
"Not closing connection - Need to wait %pVs before closing another connection. "
4341 PERROR(
"Failed inserting trunk management event");
4360 #define COUNT_BY_STATE(_state, _list) \
4362 if (conn_state & (_state)) { \
4364 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4365 count += fr_trunk_request_count_by_connection(tconn, req_state); \
4409 uint64_t req_per_conn = 0;
4452 if (conn_count == 0) {
4457 if (req_count == 0) {
4474 }
else if (req_per_conn < trunk->
conf.target_req_per_conn) {
4485 if (conn_count_out) *conn_count_out = conn_count;
4486 if (req_count_out) *req_count_out = req_count;
4495 return req_per_conn;
4560 #define RECONNECT_BY_STATE(_state, _list) \
4562 if (states & (_state)) { \
4564 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4565 fr_connection_signal_reconnect(((fr_trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4605 for (i = 0; i < trunk->
conf.
start; i++) {
4606 DEBUG(
"[%i] Starting initial connection", i);
4617 PERROR(
"Failed inserting trunk management event");
4634 DEBUG3(
"Connection management enabled");
4645 DEBUG3(
"Connection management disabled");
4656 PERROR(
"Failed inserting trunk management event");
4677 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4690 DEBUG4(
"Trunk free %p", trunk);
4769 char const *log_prefix,
void const *uctx,
bool delay_start)
4781 trunk->
log_prefix = talloc_strdup(trunk, log_prefix);
4783 memcpy(&trunk->
funcs, funcs,
sizeof(trunk->
funcs));
4791 memcpy(&trunk->
uctx, &uctx,
sizeof(trunk->
uctx));
4827 DEBUG4(
"Trunk allocated %p", trunk);
4839 #ifndef TALLOC_GET_TYPE_ABORT_NOOP
4851 (void) talloc_get_type_abort(trunk,
fr_trunk_t);
/** Fatal assertion that a callback slot in trunk->funcs is populated
 *
 * Expands to a single fr_fatal_assert_msg() call, so `trunk`, `file` and
 * `line` must all be in scope where the macro is used.
 *
 * @param[in] _func	Member of trunk->funcs to check.  Also stringified
 *			into the failure message.
 *
 * NOTE(review): the "[%i}" in the message has mismatched brackets and looks
 * like a typo for "[%i]".  Left as is because the same spelling is used by
 * the other CONSISTENCY_CHECK_FAILED messages in this file - confirm before
 * changing any of them.
 */
4857 #define IO_FUNC_VERIFY(_func) \
4858 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
4867 #define TRUNK_TCONN_CHECKS(_tconn, _state) \
4869 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
4870 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
4871 fr_fatal_assert_msg(_state == _tconn->pub.state, \
4872 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
4875 #define TCONN_DLIST_VERIFY(_dlist, _state) \
4877 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
4878 fr_dlist_foreach(&(trunk->_dlist), fr_trunk_connection_t, tconn) { \
4879 fr_trunk_connection_verify(file, line, tconn); \
4880 TRUNK_TCONN_CHECKS(tconn, _state); \
4884 #define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
4886 fr_minmax_heap_verify(file, line, trunk->_heap); \
4887 fr_minmax_heap_foreach(trunk->_heap, fr_trunk_connection_t, tconn) { \
4888 fr_trunk_connection_verify(file, line, tconn); \
4889 TRUNK_TCONN_CHECKS(tconn, _state); \
4921 #define TCONN_TREQ_CHECKS(_treq, _state) \
4923 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
4924 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
4925 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
4926 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
4927 fr_fatal_assert_msg(_state == _treq->pub.state, \
4928 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
4931 #define TREQ_DLIST_VERIFY(_dlist, _state) \
4933 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
4934 fr_dlist_foreach(&(tconn->_dlist), fr_trunk_request_t, treq) { \
4935 fr_trunk_request_verify(file, line, treq); \
4936 TCONN_TREQ_CHECKS(treq, _state); \
4940 #define TREQ_HEAP_VERIFY(_heap, _state) \
4942 fr_heap_iter_t _iter; \
4943 fr_heap_verify(file, line, tconn->_heap); \
4944 for (fr_trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
4946 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
4947 fr_trunk_request_verify(file, line, treq); \
4948 TCONN_TREQ_CHECKS(treq, _state); \
4952 #define TREQ_OPTION_VERIFY(_option, _state) \
4954 if (tconn->_option) { \
4955 fr_trunk_request_verify(file, line, tconn->_option); \
4956 TCONN_TREQ_CHECKS(tconn->_option, _state); \
4974 #ifdef WITH_VERIFY_PTR
4982 #define TCONN_DLIST_SEARCH(_dlist) \
4984 fr_dlist_foreach(&(trunk->_dlist), fr_trunk_connection_t, tconn) { \
4985 if (ptr == tconn) { \
4986 fr_fprintf(stderr, "fr_trunk_search: tconn %p on " #_dlist "\n", ptr); \
4989 if (fr_trunk_connection_search(tconn, ptr)) { \
4990 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
4996 #define TCONN_MINMAX_HEAP_SEARCH(_heap) \
4998 fr_minmax_heap_foreach(trunk->_heap, fr_trunk_connection_t, tconn) { \
4999 if (ptr == tconn) { \
5000 fr_fprintf(stderr, "fr_trunk_search: tconn %p on " #_heap "\n", ptr); \
5003 if (fr_trunk_connection_search(tconn, ptr)) { \
5004 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5027 #define TREQ_DLIST_SEARCH(_dlist) \
5029 fr_dlist_foreach(&(tconn->_dlist), fr_trunk_request_t, treq) { \
5030 if (ptr == treq) { \
5031 fr_fprintf(stderr, "fr_trunk_search: treq %p on " #_dlist "\n", ptr); \
5034 if (fr_trunk_request_search(treq, ptr)) { \
5035 fr_fprintf(stderr, "fr_trunk_search: preq %p found on " #_dlist, ptr); \
5041 #define TREQ_HEAP_SEARCH(_heap) \
5043 fr_heap_iter_t _iter; \
5044 for (fr_trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5046 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5047 if (ptr == treq) { \
5048 fr_fprintf(stderr, "fr_trunk_search: treq %p in " #_heap "\n", ptr); \
5051 if (fr_trunk_request_search(treq, ptr)) { \
5052 fr_fprintf(stderr, "fr_trunk_search: preq %p found in " #_heap, ptr); \
5058 #define TREQ_OPTION_SEARCH(_option) \
5060 if (tconn->_option) { \
5061 if (ptr == tconn->_option) { \
5062 fr_fprintf(stderr, "fr_trunk_search: treq %p is " #_option "\n", ptr); \
5065 if (fr_trunk_request_search(tconn->_option, ptr)) { \
5066 fr_fprintf(stderr, "fr_trunk_search: preq %p found in " #_option, ptr); \
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
#define CONF_PARSER_TERMINATOR
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Defines a CONF_PAIR to C data type mapping.
@ FR_CONNECTION_STATE_CLOSED
Connection has been closed.
@ FR_CONNECTION_STATE_HALTED
The connection is in a halted state.
@ FR_CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
@ FR_CONNECTION_STATE_FAILED
Connection has failed.
@ FR_CONNECTION_STATE_INIT
Init state, sets up connection.
@ FR_CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
@ FR_CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
#define fr_dlist_verify(_head)
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Head of a doubly linked list.
Entry in a doubly linked list.
#define fr_event_timer_in(...)
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
unsigned int fr_heap_index_t
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
#define FR_HEAP_VERIFY(_heap)
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Track when a log message was last repeated.
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Stores all information relating to an event list.
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
unsigned int fr_minmax_heap_iter_t
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
static fr_event_list_t * events
static int8_t request_prioritise(void const *one, void const *two)
static size_t min(size_t x, size_t y)
void fr_connection_signals_pause(fr_connection_t *conn)
Pause processing of deferred signals.
int fr_connection_del_watch_pre(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch)
Remove a watch function from a pre list.
fr_connection_watch_entry_t * fr_connection_add_watch_post(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
void fr_connection_signal_halt(fr_connection_t *conn)
Shuts down a connection ungracefully.
void fr_connection_signals_resume(fr_connection_t *conn)
Resume processing of deferred signals.
void fr_connection_signal_init(fr_connection_t *conn)
Asynchronously signal a halted connection to start.
fr_connection_watch_entry_t * fr_connection_add_watch_pre(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
void fr_connection_signal_shutdown(fr_connection_t *conn)
Shuts down a connection gracefully.
int fr_connection_del_watch_post(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch)
Remove a watch function from a post list.
void fr_connection_signal_reconnect(fr_connection_t *conn, fr_connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
static fr_time_t test_time_base
if(!subtype_vp) goto fail
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
init
Enter the EAP-IDENTITY state.
#define fr_time()
Allow us to arbitrarily manipulate time.
#define atomic_fetch_add_explicit(object, operand, order)
#define ATOMIC_VAR_INIT(value)
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
An element in a table indexed by bit position.
An element in an arbitrarily ordered array of name to num mappings.
#define talloc_get_type_abort_const
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
#define fr_time_gteq(_a, _b)
#define fr_time_delta_wrap(_time)
#define fr_time_wrap(_time)
#define fr_time_lteq(_a, _b)
#define fr_time_delta_ispos(_a)
#define fr_time_add(_a, _b)
Add a time/time delta together.
#define fr_time_gt(_a, _b)
#define fr_time_sub(_a, _b)
Subtract one time from another.
#define fr_time_lt(_a, _b)
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
static atomic_uint_fast64_t request_counter
fr_dlist_head_t free_requests
Requests in the unassigned state.
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
static void _trunk_connection_on_connecting(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
int fr_trunk_start(fr_trunk_t *trunk)
Start the trunk running.
static void trunk_request_enter_unassigned(fr_trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
static fr_table_num_indexed_bit_pos_t const fr_trunk_req_trigger_names[]
Map request states to trigger names.
void fr_trunk_request_free(fr_trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
void fr_trunk_connection_manage_stop(fr_trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
static void trunk_connection_close_if_empty(fr_trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
void * uctx
User data to pass to the function.
void fr_trunk_request_signal_cancel_sent(fr_trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
static fr_table_num_ordered_t const fr_trunk_connection_states[]
conf_parser_t const fr_trunk_config[]
Config parser definitions to populate a fr_trunk_conf_t.
#define IN_HANDLER(_trunk)
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
fr_dlist_head_t connecting
Connections which are not yet in the open state.
static void trunk_request_enter_cancel_complete(fr_trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
fr_heap_index_t heap_id
Used to track the request in the conn->pending heap.
struct fr_trunk_pub_s pub
Public fields in the trunk.
void fr_trunk_reconnect(fr_trunk_t *trunk, int states, fr_connection_reason_t reason)
Force the trunk to re-establish its connections.
static int _trunk_free(fr_trunk_t *trunk)
Free a trunk, gracefully closing all connections.
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
void fr_trunk_connection_signal_active(fr_trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
static int _trunk_connection_free(fr_trunk_connection_t *tconn)
Free a connection.
void fr_trunk_request_signal_cancel(fr_trunk_request_t *treq)
Cancel a trunk request.
#define TREQ_OPTION_SEARCH(_option)
static void trunk_request_enter_sent(fr_trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
fr_trunk_connection_t * tconn
The connection the request was associated with.
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
fr_trunk_enqueue_t fr_trunk_request_enqueue_on_conn(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
fr_heap_t * backlog
The request backlog.
struct fr_trunk_connection_pub_s pub
Public fields in the trunk connection.
#define CONN_STATE_TRANSITION(_new, _log)
fr_trunk_request_t * fr_trunk_request_alloc(fr_trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
fr_minmax_heap_t * active
Connections which can service requests.
static void _trunk_connection_on_closed(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection failed after it was connected.
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
fr_dlist_head_t closed
Connections that have closed.
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
fr_trunk_watch_entry_t * fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
static conf_parser_t const fr_trunk_config_request[]
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
static void _trunk_connection_on_shutdown(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
static fr_table_num_ordered_t const fr_trunk_cancellation_reasons[]
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, fr_trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
bool oneshot
Remove the function after it's called once.
static conf_parser_t const fr_trunk_config_connection[]
static void trunk_connection_auto_unfull(fr_trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
int fr_trunk_connection_pop_cancellation(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
void fr_trunk_request_signal_sent(fr_trunk_request_t *treq)
Signal that the request was written to a connection successfully.
fr_trunk_enqueue_t fr_trunk_request_enqueue(fr_trunk_request_t **treq_out, fr_trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
static fr_trunk_enqueue_t trunk_request_enqueue_existing(fr_trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
static int _trunk_request_free(fr_trunk_request_t *treq)
Actually free the trunk request.
int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch)
Remove a watch function from a trunk state list.
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
uint32_t fr_trunk_request_count_by_connection(fr_trunk_connection_t const *tconn, int req_state)
Return the number of requests associated with a trunk connection.
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
struct fr_trunk_watch_entry_s fr_trunk_watch_entry_t
An entry in a trunk watch function list.
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
bool fr_trunk_connection_search(fr_trunk_connection_t *tconn, void *ptr)
static void _trunk_connection_on_failed(fr_connection_t *conn, fr_connection_state_t prev, fr_connection_state_t state, void *uctx)
Connection failed.
#define REQUEST_BAD_STATE_TRANSITION(_new)
static void trunk_backlog_drain(fr_trunk_t *trunk)
Drain the backlog of as many requests as possible.
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
static bool trunk_connection_is_full(fr_trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
fr_trunk_request_state_t from
What state we transitioned from.
static void trunk_request_remove_from_conn(fr_trunk_request_t *treq)
Remove a request from all connection lists.
#define TCONN_DLIST_SEARCH(_dlist)
fr_trunk_connection_event_t events
The current events we expect to be notified on.
static size_t fr_trunk_states_len
static int trunk_connection_spawn(fr_trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
#define IS_SERVICEABLE(_tconn)
static void trunk_rebalance(fr_trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
fr_dlist_t entry
Entry in the linked list.
static void trunk_request_enter_complete(fr_trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
#define IS_PROCESSING(_tconn)
#define RECONNECT_BY_STATE(_state, _list)
uint64_t fr_trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
fr_dlist_head_t cancel
Requests in the cancel state.
uint16_t fr_trunk_connection_count_by_state(fr_trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
fr_trunk_io_funcs_t funcs
I/O functions.
fr_dlist_head_t cancel_sent
Sent cancellation request.
void fr_trunk_request_signal_complete(fr_trunk_request_t *treq)
Signal that a trunk request is complete.
bool fr_trunk_request_search(fr_trunk_request_t *treq, void *ptr)
fr_trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
static fr_table_num_ordered_t const fr_trunk_connection_events[]
uint64_t fr_trunk_request_count_by_state(fr_trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
static void trunk_connection_writable(fr_trunk_connection_t *tconn)
A connection is writable.
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
fr_trunk_enqueue_t fr_trunk_request_requeue(fr_trunk_request_t *treq)
Re-enqueue a request on the same connection.
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
#define TRUNK_STATE_TRANSITION(_new)
void fr_trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, fr_trunk_request_t const *treq)
void * uctx
Uctx data to pass to alloc.
static void _trunk_connection_on_init(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the init state.
static void trunk_connection_auto_full(fr_trunk_connection_t *tconn)
Automatically mark a connection as inactive.
fr_trunk_request_t * partial
Partially written request.
static void trunk_connection_enter_active(fr_trunk_connection_t *tconn)
Transition a connection back to the active state.
fr_event_timer_t const * manage_ev
Periodic connection management event.
static fr_table_num_ordered_t const fr_trunk_request_states[]
static void trunk_request_enter_cancel_partial(fr_trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
static void trunk_connection_enter_draining(fr_trunk_connection_t *tconn)
Transition a connection to the draining state.
void fr_trunk_verify(char const *file, int line, fr_trunk_t *trunk)
Verify a trunk.
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
#define IN_REQUEST_DEMUX(_trunk)
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
fr_trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
void fr_trunk_request_signal_partial(fr_trunk_request_t *treq)
Signal a partial write.
static void trunk_request_enter_backlog(fr_trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, fr_trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
char const * function
State change occurred in.
static void trunk_manage(fr_trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
static void trunk_connection_enter_inactive(fr_trunk_connection_t *tconn)
Transition a connection to the inactive state.
static size_t fr_trunk_req_trigger_names_len
fr_trunk_t * fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_trunk_io_funcs_t const *funcs, fr_trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
int fr_trunk_connection_pop_request(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
static fr_trunk_enqueue_t trunk_request_check_enqueue(fr_trunk_connection_t **tconn_out, fr_trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
fr_trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
void * in_handler
Which handler we're inside.
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
static void trunk_request_enter_cancel_sent(fr_trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
fr_dlist_head_t full
Connections which have too many outstanding requests.
static size_t fr_trunk_conn_trigger_names_len
void fr_trunk_connection_signal_inactive(fr_trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
static size_t fr_trunk_cancellation_reasons_len
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
void fr_trunk_connection_signal_readable(fr_trunk_connection_t *tconn)
Signal that a trunk connection is readable.
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_connection_readable(fr_trunk_connection_t *tconn)
A connection is readable.
void fr_trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
fr_trunk_request_t * cancel_partial
Partially written cancellation request.
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
char const * log_prefix
What to prepend to messages.
int line
Line change occurred on.
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
#define TREQ_DLIST_VERIFY(_dlist, _state)
static void trunk_connection_enter_full(fr_trunk_connection_t *tconn)
Transition a connection to the full state.
static fr_table_num_ordered_t const fr_trunk_states[]
uint64_t sent_count
The number of requests that have been sent using this connection.
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
#define TREQ_HEAP_VERIFY(_heap, _state)
static int _state_log_entry_free(fr_trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
struct fr_trunk_request_pub_s pub
Public fields in the trunk request.
static uint64_t trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
void fr_trunk_request_signal_fail(fr_trunk_request_t *treq)
Signal that a trunk request failed.
struct fr_trunk_request_s fr_trunk_request_t
static void trunk_request_enter_cancel(fr_trunk_request_t *treq, fr_trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
bool started
Has the trunk been started.
void fr_trunk_request_verify(char const *file, int line, fr_trunk_request_t *treq)
#define FR_TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
static size_t fr_trunk_connection_states_len
fr_trunk_request_state_t to
What state we transitioned to.
static void _trunk_connection_on_connected(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the connected state.
#define IN_REQUEST_CANCEL_MUX(_trunk)
static size_t fr_trunk_connection_events_len
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
fr_time_t last_freed
Last time this request was freed.
void trunk_request_state_log_entry_add(char const *function, int line, fr_trunk_request_t *treq, fr_trunk_request_state_t new)
uint64_t id
Trunk request ID.
static void trunk_request_enter_pending(fr_trunk_request_t *treq, fr_trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
fr_dlist_head_t log
State change log.
#define COUNT_BY_STATE(_state, _list)
#define TREQ_OPTION_VERIFY(_option, _state)
fr_dlist_t entry
List entry.
fr_trunk_watch_t func
Function to call when a trunk enters the state this list belongs to.
fr_dlist_head_t watch[FR_TRUNK_STATE_MAX]
To be called when trunk changes state.
#define CONN_BAD_STATE_TRANSITION(_new)
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
void fr_trunk_connection_signal_reconnect(fr_trunk_connection_t *tconn, fr_connection_reason_t reason)
Signal a trunk connection is no longer viable.
void fr_trunk_connection_signal_writable(fr_trunk_connection_t *tconn)
Signal that a trunk connection is writable.
fr_heap_t * pending
Requests waiting to be sent.
void fr_trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
static fr_table_num_indexed_bit_pos_t const fr_trunk_conn_trigger_names[]
Map connection states to trigger names.
static void _trunk_connection_on_halted(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the halted state.
void fr_trunk_connection_verify(char const *file, int line, fr_trunk_connection_t *tconn)
static void trunk_request_enter_failed(fr_trunk_request_t *treq)
Request failed, inform the API client and free the request.
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
bool fr_trunk_connection_in_state(fr_trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
static void trunk_connection_event_update(fr_trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
static void trunk_connection_enter_draining_to_free(fr_trunk_connection_t *tconn)
Transition a connection to the draining-to-free state.
fr_event_list_t * el
Event list used by this trunk and the connection.
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
void fr_trunk_connection_manage_start(fr_trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
static void trunk_watch_call(fr_trunk_t *trunk, fr_dlist_head_t *list, fr_trunk_state_t state)
Call a list of watch functions associated with a state.
static size_t fr_trunk_request_states_len
void fr_trunk_request_signal_cancel_complete(fr_trunk_request_t *treq)
Signal that a remote server acked our cancellation.
static void trunk_connection_enter_inactive_draining(fr_trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
void fr_trunk_request_signal_cancel_partial(fr_trunk_request_t *treq)
Signal a partial cancel write.
static void trunk_connection_remove(fr_trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
bool fr_trunk_search(fr_trunk_t *trunk, void *ptr)
bool enabled
Whether the watch entry is enabled.
#define IN_REQUEST_MUX(_trunk)
fr_dlist_head_t sent
Sent request.
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
int fr_trunk_connection_manage_schedule(fr_trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
fr_trunk_conf_t conf
Trunk common configuration.
fr_dlist_head_t init
Connections which have not yet started connecting.
static void trunk_request_enter_partial(fr_trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Associates request queues with a connection.
Trace state machine changes for a particular request.
Main trunk management handle.
An entry in a trunk watch function list.
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
fr_time_t _CONST last_connected
Last time a connection connected.
uint64_t _CONST req_alloc_reused
How many requests were reused.
fr_trunk_cancel_reason_t
Reasons for a request being cancelled.
@ FR_TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
@ FR_TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
@ FR_TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
@ FR_TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
fr_trunk_connection_t *_CONST tconn
Connection this request belongs to.
uint32_t max_req_per_conn
Maximum requests per connection.
fr_trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
@ FR_TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
@ FR_TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
@ FR_TRUNK_CONN_HALTED
Halted, ready to be freed.
@ FR_TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
@ FR_TRUNK_CONN_CONNECTING
Connection is connecting.
@ FR_TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
@ FR_TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
@ FR_TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
@ FR_TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
@ FR_TRUNK_CONN_INIT
In the initial state.
uint16_t start
How many connections to start.
void(* fr_trunk_watch_t)(fr_trunk_t *trunk, fr_trunk_state_t prev, fr_trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
fr_trunk_request_state_t
Used for sanity checks and to simplify freeing.
@ FR_TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
@ FR_TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
@ FR_TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
@ FR_TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
@ FR_TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
@ FR_TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
@ FR_TRUNK_REQUEST_STATE_FAILED
The request failed.
@ FR_TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
@ FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
@ FR_TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
@ FR_TRUNK_REQUEST_STATE_INIT
Initial state.
@ FR_TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
#define FR_TRUNK_CONN_ALL
All connection states.
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
uint64_t max_uses
The maximum number of times a connection can be used.
fr_trunk_request_state_t _CONST state
Which list the request is now located in.
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
fr_heap_cmp_t request_prioritise
Ordering function for requests.
request_t *_CONST request
The request that we're writing the data on behalf of.
uint64_t _CONST req_alloc_new
How many requests we've allocated.
@ FR_TRUNK_STATE_IDLE
Trunk has no connections.
@ FR_TRUNK_STATE_ACTIVE
Trunk has active connections.
@ FR_TRUNK_STATE_PENDING
Trunk has connections, but none are active.
fr_trunk_connection_alloc_t connection_alloc
Allocate a new fr_connection_t.
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
fr_trunk_connection_state_t _CONST state
What state the connection is in.
#define FR_TRUNK_REQUEST_STATE_ALL
All request states.
fr_trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ FR_TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ FR_TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ FR_TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
@ FR_TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
fr_trunk_t *_CONST trunk
Trunk this connection belongs to.
fr_connection_t *_CONST conn
The underlying connection.
fr_trunk_t *_CONST trunk
Trunk this request belongs to.
void *_CONST preq
Data for the muxer to write to the connection.
uint16_t connecting
Maximum number of connections that can be in the connecting state.
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
uint16_t max
Maximum number of connections in the trunk.
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
void *_CONST rctx
Resume ctx of the module.
fr_trunk_state_t _CONST state
Current state of the trunk.
fr_time_t _CONST last_read_success
Last time we read a response.
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
size_t req_pool_size
The size of the talloc pool allocated with the treq.
fr_time_t _CONST last_failed
Last time a connection failed.
fr_trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
fr_time_delta_t lifetime
Time between reconnects.
@ FR_TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
@ FR_TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
@ FR_TRUNK_ENQUEUE_OK
Operation was successful.
@ FR_TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
@ FR_TRUNK_ENQUEUE_FAIL
General failure.
uint16_t min
Shouldn't let connections drop below this number.
#define FR_TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Common configuration parameters for a trunk.
Public fields for the trunk connection.
I/O functions to pass to fr_trunk_alloc.
Public fields for the trunk.
Public fields for the trunk request.
static fr_event_list_t * el
char const * fr_strerror(void)
Get the last library error.
#define fr_box_time_delta(_val)
static size_t char ** out