1#include <freeradius-devel/util/acutest.h>
2#include <freeradius-devel/util/acutest_helpers.h>
3#include <freeradius-devel/util/syserror.h>
29#define DEBUG_LVL_SET if (acutest_verbose_level_ >= 3) fr_debug_lvl = L_DBG_LVL_4 + 1
35 int fd = *(talloc_get_type_abort(conn->h,
int));
53 slen = write(fd, &preq,
sizeof(preq));
55 if (slen == 0)
return;
56 if (slen < (
ssize_t)
sizeof(preq)) abort();
67 int fd = *(talloc_get_type_abort(conn->h,
int));
87 slen = write(fd, &preq,
sizeof(preq));
92 if (slen == 0)
return;
93 if (slen < (
ssize_t)
sizeof(preq)) abort();
102 int fd = *(talloc_get_type_abort(conn->h,
int));
107 slen = read(fd, &preq,
sizeof(preq));
108 if (slen <= 0)
break;
125 if (preq->
freed)
continue;
152 UNUSED int fd_errno,
void *uctx)
176 int fd = *(talloc_get_type_abort(conn->h,
int));
234 if (stats) stats->
failed++;
245 our_preq->
freed =
true;
246 if (stats) stats->
freed++;
254 int *our_h = talloc_get_type_abort(uctx,
int);
256 static size_t to_write;
262 slen = read(fd,
buff,
sizeof(
buff));
263 if (slen <= 0)
return;
268 slen = write(our_h[1],
buff, (
size_t)to_write);
269 if (slen < 0)
return;
271 if (slen < (
ssize_t)to_write) {
274 printf(
"%s - Partial write %zu bytes left\n", __FUNCTION__, to_write);
285 int *our_h = talloc_get_type_abort(h,
int);
287 talloc_free_children(our_h);
300 int *our_h = talloc_get_type_abort(h,
int);
318 h = talloc_array(conn,
int, 2);
319 socketpair(AF_UNIX, SOCK_STREAM, 0, h);
332 char const *log_prefix,
UNUSED void *uctx)
337 memset(&cstat, 0,
sizeof(cstat));
484 h = talloc_array(conn,
int, 2);
485 socketpair(AF_UNIX, SOCK_STREAM, 0, h);
494 char const *log_prefix,
void *uctx)
546 if (tconn == NULL)
return;
572 char const *log_prefix,
void *uctx)
627 if (tconn == NULL)
return;
784 TEST_CASE(
"cancellation via trunk free - TRUNK_REQUEST_STATE_BACKLOG");
792 TEST_CASE(
"cancellation via signal - TRUNK_REQUEST_STATE_BACKLOG");
808 TEST_CASE(
"cancellation via trunk free - TRUNK_REQUEST_STATE_PARTIAL");
832 TEST_CASE(
"cancellation via signal - TRUNK_REQUEST_STATE_PARTIAL");
857 TEST_CASE(
"cancellation via trunk free - TRUNK_REQUEST_STATE_SENT");
879 TEST_CASE(
"cancellation via signal - TRUNK_REQUEST_STATE_SENT");
903 TEST_CASE(
"cancellation via trunk free - TRUNK_REQUEST_STATE_CANCEL_PARTIAL");
934 TEST_CASE(
"cancellation via trunk free - TRUNK_REQUEST_STATE_CANCEL_SENT");
964 TEST_CASE(
"trunk free after TRUNK_REQUEST_STATE_CANCEL_COMPLETE");
1032 TEST_CASE(
"TRUNK_REQUEST_STATE_PARTIAL -> TRUNK_REQUEST_STATE_SENT");
1053 TEST_CASE(
"TRUNK_REQUEST_STATE_CANCEL_PARTIAL -> TRUNK_REQUEST_STATE_CANCEL_SENT");
1096 .backlog_on_failed_conn =
true
1116 TEST_CASE(
"dequeue on reconnect - TRUNK_REQUEST_STATE_PENDING");
1141 TEST_CASE(
"cancel on reconnect - TRUNK_REQUEST_STATE_PARTIAL");
1184 TEST_CASE(
"cancel on reconnect - TRUNK_REQUEST_STATE_SENT");
1222 TEST_CASE(
"free on reconnect - TRUNK_REQUEST_STATE_CANCEL");
1263 TEST_CASE(
"free on reconnect - TRUNK_REQUEST_STATE_CANCEL_PARTIAL");
1320 TEST_CASE(
"free on reconnect - TRUNK_REQUEST_STATE_CANCEL_SENT");
1409 TEST_CASE(
"C1 connecting, !max_req_per_conn - Enqueue MUST NOT spawn");
1422 TEST_CASE(
"C1 active, !max_req_per_conn - Enqueue MUST NOT spawn");
1453 printf(
"Rebalance %p\n", preq);
1467 TEST_CASE(
"C2 connected, R0 - Signal inactive");
1475 TEST_CASE(
"C1 connected, C2 inactive, R3 - Enqueued");
1484 TEST_CASE(
"C2 active, R3 - Signal active, should balance");
1494#define ALLOC_REQ(_id) \
1496 treq_##_id = trunk_request_alloc(trunk, NULL); \
1497 preq_##_id = talloc_zero(ctx, test_proto_request_t); \
1498 preq_##_id->treq = treq_##_id; \
1499 preq_##_id->priority = next_prio++; \
1511 .max_req_per_conn = 2,
1512 .target_req_per_conn = 2,
1516 trunk_request_t *treq_a = NULL, *treq_b = NULL, *treq_c = NULL, *treq_d = NULL, *treq_e = NULL;
1533 TEST_CASE(
"C0, R1 - Enqueue should spawn");
1552 TEST_CASE(
"C1 connecting, R2 - MUST NOT spawn");
1558 TEST_CASE(
"C1 connecting, R3 - MUST NOT spawn");
1564 TEST_CASE(
"C1 connecting, R4 - MUST NOT spawn");
1570 TEST_CASE(
"C1 connecting, R5 - MUST NOT spawn, NO CAPACITY");
1582 TEST_CASE(
"C1 active, R4 - Check pending 2");
1593 TEST_CASE(
"C1 active, R4 - Check sent 2");
1625 TEST_CASE(
"C1 active, R0 - Check complete 2, pending 0");
1671 .max_req_per_conn = 0,
1672 .target_req_per_conn = 2,
1689 memset(&stats, 0,
sizeof(stats));
1695 TEST_CASE(
"C0, R1 - Enqueue should spawn");
1708 TEST_CASE(
"C1 connecting, R2 - MUST NOT spawn");
1722 TEST_CASE(
"C1 connected, R3 - should spawn");
1744 TEST_CASE(
"C1 connected, C2 connecting, R2 - MUST NOT spawn");
1779 TEST_CASE(
"C0, R1 - Enqueue should spawn");
1791 TEST_CASE(
"C1 connecting, R2 - MUST NOT spawn");
1805 TEST_CASE(
"C1 connected, R3 - should spawn");
1832 .max_req_per_conn = 0,
1833 .target_req_per_conn = 0,
1834 .req_pool_headers = 1,
1838 size_t i = 0, requests = 100000;
1839 fr_time_t enqueue_start, enqueue_stop, io_start, io_stop;
1853 memset(&stats, 0,
sizeof(stats));
1880 for (i = 0; i < requests; i++) {
1890 enqueue_time =
fr_time_sub(enqueue_stop, enqueue_start);
1892 INFO(
"Enqueue time %pV (%u rps) (%"PRIu64
"/%"PRIu64
")",
1910 INFO(
"I/O time %pV (%u rps)",
1917 INFO(
"Total time %pV (%u rps)",
static int acutest_verbose_level_
#define TEST_CHECK_LEN(_got, _exp)
#define CC_NO_UBSAN(_sanitize)
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER; use it when you don't really care about the ordering, you just want an ordering.
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
@ CONNECTION_FAILED
Connection is being reconnected because it failed.
Holds a complete set of functions for a connection.
void fr_talloc_fault_setup(void)
Register talloc fault handlers.
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
#define fr_event_fd_insert(...)
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions.
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
void fr_event_list_set_time_func(fr_event_list_t *el, fr_event_time_source_t func)
Override event list time source.
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
uint64_t fr_event_list_num_timers(fr_event_list_t *el)
Return the number of timer events currently scheduled.
fr_event_list_t * fr_event_list_alloc(TALLOC_CTX *ctx, fr_event_status_cb_t status, void *status_uctx)
Initialise a new event list.
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Stores all information relating to an event list.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
int fr_nonblock(UNUSED int fd)
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
static const trunk_io_funcs_t io_funcs
static fr_event_list_t * events
uint64_t connection_get_num_timed_out(connection_t const *conn)
Return the number of times this connection has timed out whilst connecting.
uint64_t connection_get_num_reconnected(connection_t const *conn)
Return the number of times we've attempted to establish or re-establish this connection.
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
int connection_signal_on_fd(connection_t *conn, int fd)
Setup the connection to change states to connected or failed based on I/O events.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
static char buff[sizeof("18446744073709551615")+3]
static fr_time_t test_time_base
#define fr_time()
Allow us to arbitrarily manipulate time.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
static TALLOC_CTX * talloc_init_const(char const *name)
Allocate a top level chunk with a constant name.
static fr_time_t fr_time_add_time_delta(fr_time_t a, fr_time_delta_t b)
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
static fr_time_delta_t fr_time_delta_from_nsec(int64_t nsec)
#define fr_time_sub(_a, _b)
Subtract one time from another.
A time delta, a difference in time measured in nanoseconds.
A management API for bonding multiple connections together.
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the number of requests associated with a trunk connection.
struct trunk_request_pub_s pub
Public fields in the trunk request.
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
struct trunk_pub_s pub
Public fields in the trunk.
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
fr_dlist_head_t connecting
Connections which are not yet in the open state.
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
fr_minmax_heap_t * active
Connections which can service requests.
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Associates request queues with a connection.
Main trunk management handle.
#define TRUNK_VERIFY(_trunk)
#define TRUNK_REQUEST_STATE_ALL
All request states.
@ TRUNK_CONN_CONNECTING
Connection is connecting.
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
uint64_t _CONST req_alloc_reused
How many requests were reused.
trunk_request_state_t _CONST state
Which list the request is now located in.
trunk_connection_t *_CONST tconn
Connection this request belongs to.
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
#define TRUNK_CONN_ALL
All connection states.
trunk_cancel_reason_t
Reasons for a request being cancelled.
uint64_t _CONST req_alloc_new
How many requests we've allocated.
connection_t *_CONST conn
The underlying connection.
@ TRUNK_ENQUEUE_OK
Operation was successful.
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
void *_CONST preq
Data for the muxer to write to the connection.
trunk_request_cancel_mux_t request_cancel_mux
Read one or more requests from a connection.
trunk_request_state_t
Used for sanity checks and to simplify freeing.
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Common configuration parameters for a trunk.
I/O functions to pass to trunk_alloc.
trunk_request_t * treq
Trunk request.
static void test_demux(UNUSED fr_event_list_t *el, UNUSED trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
uint64_t cancelled
Count of tests in this run that were cancelled.
uint64_t freed
Count of tests in this run that were freed.
static connection_t * test_setup_socket_pair_connection_alloc(trunk_connection_t *tconn, fr_event_list_t *el, connection_conf_t const *conn_conf, char const *log_prefix, UNUSED void *uctx)
static void test_enqueue_and_io_speed(void)
bool cancelled
Seen by the cancelled callback.
static void _conn_io_error(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, UNUSED int fd_errno, void *uctx)
static void _conn_io_loopback(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Whenever the second socket in a socket pair is readable, read all pending data, and write it back.
static void test_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
static void test_socket_pair_alloc_then_connect_timeout(void)
static void test_cancel_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
static void _conn_io_write(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
static void test_partial_to_complete_states(void)
static connection_t * test_setup_socket_pair_1s_timeout_connection_alloc(trunk_connection_t *tconn, fr_event_list_t *el, UNUSED connection_conf_t const *conf, char const *log_prefix, void *uctx)
static void test_request_free(UNUSED request_t *request, void *preq, void *uctx)
static void test_socket_pair_alloc_then_free(void)
bool failed
Seen by the failed callback.
static void test_request_fail(UNUSED request_t *request, void *preq, UNUSED void *rctx, UNUSED trunk_request_state_t state, void *uctx)
static void test_connection_rebalance_requests(void)
static connection_state_t _conn_init_no_signal(void **h_out, connection_t *conn, UNUSED void *uctx)
static connection_t * test_setup_socket_pair_1s_reconnection_delay_alloc(trunk_connection_t *tconn, fr_event_list_t *el, UNUSED connection_conf_t const *conn_conf, char const *log_prefix, void *uctx)
static void test_connection_levels_max(void)
static void test_socket_pair_alloc_then_reconnect_check_delay(void)
bool freed
Seen by the free callback.
uint64_t failed
Count of tests in this run that failed.
static void _conn_io_read(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
bool completed
Seen by the complete callback.
static void _conn_notify(trunk_connection_t *tconn, connection_t *conn, fr_event_list_t *el, trunk_connection_event_t notify_on, UNUSED void *uctx)
int priority
Priority of request.
static void test_socket_pair_alloc_then_reconnect_then_free(void)
static trunk_t * test_setup_trunk(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_conf_t *conf, bool with_cancel_mux, void *uctx)
static connection_state_t _conn_open(fr_event_list_t *el, void *h, UNUSED void *uctx)
Insert I/O handlers that loop any data back round.
static void test_enqueue_cancellation_points(void)
static void test_requeue_on_reconnect(void)
static void test_connection_start_on_enqueue(void)
static int8_t test_preq_cmp(void const *a, void const *b)
bool signal_partial
Muxer should signal that this request is partially written.
static void test_request_complete(UNUSED request_t *request, void *preq, UNUSED void *rctx, void *uctx)
static void test_request_cancel(UNUSED connection_t *conn, void *preq, UNUSED trunk_cancel_reason_t reason, void *uctx)
static void test_enqueue_basic(void)
uint64_t completed
Count of tests in this run that completed.
static void test_connection_levels_alternating_edges(void)
static void _conn_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
static connection_state_t _conn_init(void **h_out, connection_t *conn, UNUSED void *uctx)
Allocate a basic socket pair.
bool signal_cancel_partial
Muxer should signal that this request is partially cancelled.
static fr_event_list_t * el
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
#define fr_box_time_delta(_val)