The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
trunk.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 3e1c563ed3f975272113350dc96c4cb201406f43 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
44
45#ifdef HAVE_STDATOMIC_H
46# include <stdatomic.h>
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
49# endif
50#else
51# include <freeradius-devel/util/stdatomic.h>
52#endif
53
/* Source of unique trunk request IDs.  Atomic as trunks may be driven from
 * multiple threads.  Starts at 1 — presumably so 0 can mean "no ID"; TODO confirm. */
static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
55
#ifdef TESTING_TRUNK
/** Deterministic time source used by the trunk unit tests
 *
 * Returns the harness-controlled fake clock (test_time_base — declared on a
 * line not visible here, TODO confirm) so tests can advance time manually.
 */
static fr_time_t test_time(void)
{
	return test_time_base;
}

/* Redirect all fr_time() calls in this translation unit to the fake clock */
#define fr_time test_time
#endif
66
67#ifndef NDEBUG
68/** The maximum number of state logs to record per request
69 *
70 */
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
72
73/** Trace state machine changes for a particular request
74 *
75 */
76typedef struct {
77 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
78 fr_dlist_t entry; //!< Entry in the linked list.
79 trunk_request_state_t from; //!< What state we transitioned from.
80 trunk_request_state_t to; //!< What state we transitioned to.
81
82 trunk_connection_t *tconn; //!< The request was associated with.
83 ///< Pointer may now be invalid, do no de-reference.
84
85 uint64_t tconn_id; //!< If the treq was associated with a connection
86 ///< the connection ID.
87 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
88 ///< the connection state at the time of the
89 ///< state transition.
90
91 char const *function; //!< State change occurred in.
92 int line; //!< Line change occurred on.
94#endif
95
96/** Wraps a normal request
97 *
98 */
100 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
101 ///< This *MUST* be the first field in this
102 ///< structure.
103
104 uint64_t id; //!< Trunk request ID.
105
106 fr_heap_index_t heap_id; //!< Used to track the request conn->pending heap.
107
108 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
109 ///< or trunk->backlog request.
110
111 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
112
113 fr_time_t last_freed; //!< Last time this request was freed.
114
115 bool bound_to_conn; //!< Fail the request if there's an attempt to
116 ///< re-enqueue it.
117
118 bool sent; //!< Trunk request has been sent at least once.
119 ///< Used so that re-queueing doesn't increase trunk
120 ///< `sent` count.
121
122#ifndef NDEBUG
123 fr_dlist_head_t log; //!< State change log.
124#endif
125};
126
127
128/** Associates request queues with a connection
129 *
130 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
131 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
132 */
134 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
135 ///< This *MUST* be the first field in this
136 ///< structure.
137
138 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
139 ///< heap.
140
141 fr_dlist_t entry; //!< Used to track the connection in the connecting,
142 ///< full and failed lists.
143
144 /** @name State
145 * @{
146 */
147 trunk_connection_event_t events; //!< The current events we expect to be notified on.
148 /** @} */
149
150 /** @name Request lists
151 * @{
152 */
153 fr_heap_t *pending; //!< Requests waiting to be sent.
154
155 trunk_request_t *partial; //!< Partially written request.
156
157 fr_dlist_head_t sent; //!< Sent request.
158
159 fr_dlist_head_t reapable; //!< Idle request.
160
161 fr_dlist_head_t cancel; //!< Requests in the cancel state.
162
163 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
164
165 fr_dlist_head_t cancel_sent; //!< Sent cancellation request.
166 /** @} */
167
168 /** @name Statistics
169 * @{
170 */
171 uint64_t sent_count; //!< The number of requests that have been sent using
172 ///< this connection.
173 /** @} */
174
175 /** @name Timers
176 * @{
177 */
178 fr_timer_t *lifetime_ev; //!< Maximum time this connection can be open.
179 /** @} */
180};
181
182/** An entry in a trunk watch function list
183 *
184 */
185typedef struct trunk_watch_entry_s {
186 fr_dlist_t entry; //!< List entry.
187 trunk_watch_t func; //!< Function to call when a trunk enters
188 ///< the state this list belongs to
189 bool oneshot; //!< Remove the function after it's called once.
190 bool enabled; //!< Whether the watch entry is enabled.
191 void *uctx; //!< User data to pass to the function.
193
194/** Map connection states to trigger names
195 *
196 * Must stay in the same order as #trunk_connection_state_t
197 */
199 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
200 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
201 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
202 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
203 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
204 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
205 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
206 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
207 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
208 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
209};
211
212/** Main trunk management handle
213 *
214 */
215struct trunk_s {
216 struct trunk_pub_s pub; //!< Public fields in the trunk connection.
217 ///< This *MUST* be the first field in this
218 ///< structure.
219
220 char const *log_prefix; //!< What to prepend to messages.
221
222 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
223
224 trunk_conf_t conf; //!< Trunk common configuration.
225
226 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
227 ///< enqueued.
228
229 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
230 ///< immediately assign to a connection.
231
232 /** @name Connection lists
233 *
234 * A connection must always be in exactly one of these lists
235 * or trees.
236 *
237 * @{
238 */
239 fr_dlist_head_t init; //!< Connections which have not yet started
240 ///< connecting.
241
242 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
243
244 fr_minmax_heap_t *active; //!< Connections which can service requests.
245
246 fr_dlist_head_t full; //!< Connections which have too many outstanding
247 ///< requests.
248
249 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
250 ///< inactive by the API client.
251
252 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
253 ///< inactive by the API client, which the trunk
254 ///< manager is draining to close.
255
256 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
257
258 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
259 ///< shutdown, reconnection or failure.
260
261 fr_dlist_head_t draining; //!< Connections that will be freed once all their
262 ///< requests are complete, but can be reactivated.
263
264 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
265 ///< requests are complete.
266
267 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
268 //!< the next call to trunk_manage.
269 //!< This prevents connections from being freed
270 //!< whilst we're inside callbacks.
271 /** @} */
272
273 /** @name Callbacks
274 * @{
275 */
276 trunk_io_funcs_t funcs; //!< I/O functions.
277
278 void *in_handler; //!< Which handler we're inside.
279
280 void *uctx; //!< Uctx data to pass to alloc.
281
282 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
283
284 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
285 /** @} */
286
287 /** @name Timers
288 * @{
289 */
290 fr_timer_t *manage_ev; //!< Periodic connection management event.
291 /** @} */
292
293 /** @name Log rate limiting entries
294 * @{
295 */
296 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
297
298 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
299 /** @} */
300
301 /** @name State
302 * @{
303 */
304 bool freeing; //!< Trunk is being freed, don't spawn new
305 ///< connections or re-enqueue.
306
307 bool started; //!< Has the trunk been started.
308
309 bool managing_connections; //!< Whether the trunk is allowed to manage
310 ///< (open/close) connections.
311
312 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
313 /** @} */
314
315 fr_pair_list_t *trigger_args; //!< Passed to trigger
316
317 bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]; //!< Record that a specific trigger is undefined.
318
320};
321
322int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule);
323
325 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
326 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
327 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
328 { FR_CONF_OFFSET("triggers", trunk_conf_t, req_triggers), .func = trunk_trigger_cf_parse },
329
331};
332
334 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
335 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
336
338};
339
340#ifndef TRUNK_TESTS
342 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
343 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
344 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
345 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
346 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
347 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
348 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
349
350 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
351 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
352
353 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
354
355 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
356
357 { FR_CONF_OFFSET("backlog_on_failed_conn", trunk_conf_t, backlog_on_failed_conn), },
358
359 { FR_CONF_OFFSET("triggers", trunk_conf_t, conn_triggers), .func = trunk_trigger_cf_parse },
360
361 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
362 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
363
365};
366#endif
367
368#ifndef NDEBUG
369/** Map request states to trigger names
370 *
371 * Must stay in the same order as #trunk_connection_state_t
372 */
374 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
375 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
376 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
377 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
378 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
379 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
380 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
381 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
382 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
383 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
384 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
385 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
386 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
387};
389#endif
390
392 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
393 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
394 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
395 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
396 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
397 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
398 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
399 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
400 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
401 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
402 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
403 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
404 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
405};
407
409 { L("IDLE"), TRUNK_STATE_IDLE },
410 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
411 { L("PENDING"), TRUNK_STATE_PENDING }
412};
414
416 { L("INIT"), TRUNK_CONN_INIT },
417 { L("HALTED"), TRUNK_CONN_HALTED },
418 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
419 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
420 { L("CLOSED"), TRUNK_CONN_CLOSED },
421 { L("FULL"), TRUNK_CONN_FULL },
422 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
423 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
424 { L("DRAINING"), TRUNK_CONN_DRAINING },
425 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
426};
428
430 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
431 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
432 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
433 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
434};
436
438 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
439 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
440 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
441 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
442};
444
/** Fire the trigger associated with a connection state change (if configured)
 *
 * Maps the state bit to an array index with fr_high_bit_pos(), resolves the
 * trigger name via trunk_conn_trigger_names, and fires it.  If trigger()
 * returns -1 (trigger not defined) the corresponding trunk->trigger_undef
 * slot is set so we never attempt that trigger again.
 *
 * Expects a local variable `trunk` (trunk_t *) to be in scope.
 */
#define CONN_TRIGGER(_state) do { \
	uint8_t idx = fr_high_bit_pos(_state); \
	if (trunk->conf.conn_triggers && !trunk->trigger_undef[idx]) { \
		if (trigger(unlang_interpret_get_thread_default(), trunk->conf.conn_trigger_cs, \
			    &trunk->trigger_cp[idx], \
			    fr_table_str_by_value(trunk_conn_trigger_names, _state, \
						  "<INVALID>"), true, trunk->trigger_args) == -1) { \
			trunk->trigger_undef[idx] = true; \
		} \
	} \
} while (0)
456
/** Record a trunk connection state transition
 *
 * Logs old -> new state using the supplied log macro, assigns the new state,
 * fires the matching connection trigger, and recalculates the request to
 * connection ratio.  Expects `tconn` and `trunk` locals to be in scope.
 */
#define CONN_STATE_TRANSITION(_new, _log) \
do { \
	_log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
	     tconn->pub.conn->id, \
	     fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
	     fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
	tconn->pub.state = _new; \
	CONN_TRIGGER(_new); \
	trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
} while (0)
467
/** Assert (and return from the enclosing void function) on an invalid connection state transition
 *
 * Note the embedded `return;` — this macro may only be used inside functions
 * returning void.
 */
#define CONN_BAD_STATE_TRANSITION(_new) \
do { \
	if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
				tconn->pub.conn->id, \
				fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
				fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
} while (0)
475
476#ifndef NDEBUG
477void trunk_request_state_log_entry_add(char const *function, int line,
478 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
479
/** Fire the trigger associated with a request state change (if configured)
 *
 * Expects a local variable `trunk` to be in scope.
 * NOTE(review): this macro is defined only in the !NDEBUG section, and the
 * release-build REQUEST_STATE_TRANSITION does not invoke it — confirm that
 * request triggers firing only in debug builds is intended.
 */
#define REQUEST_TRIGGER(_state) do { \
	if (trunk->conf.req_triggers) { \
		trigger(unlang_interpret_get_thread_default(), \
			trunk->conf.req_trigger_cs, NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
			"<INVALID>"), true, trunk->trigger_args); \
	} \
} while (0)
487
/** Record a request state transition and log appropriate output
 *
 * Debug-build variant: logs the transition (per-request RDEBUG3 when a
 * request_t is associated, global DEBUG3 otherwise), appends an entry to the
 * request's state log, assigns the new state and fires the request trigger.
 * Expects a local `treq` (and via REQUEST_TRIGGER, `trunk`) to be in scope.
 */
#define REQUEST_STATE_TRANSITION(_new) \
do { \
	request_t *request = treq->pub.request; \
	ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
		  treq->id, \
		  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
		  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
	trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
	treq->pub.state = _new; \
	REQUEST_TRIGGER(_new); \
} while (0)
/** Assert on an invalid request state transition (debug build)
 *
 * Dumps the request's state log first to aid diagnosis.  Contains an
 * embedded `return;` — usable only inside functions returning void.
 */
#define REQUEST_BAD_STATE_TRANSITION(_new) \
do { \
	trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
	if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
				treq->id, \
				fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
				fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
} while (0)
510#else
/** Record a request state transition
 *
 * Release-build variant: logs the transition and assigns the new state, but
 * keeps no per-request state log.
 * NOTE(review): unlike the debug variant this does not call REQUEST_TRIGGER,
 * so request triggers do not fire in release builds — confirm intended.
 */
#define REQUEST_STATE_TRANSITION(_new) \
do { \
	request_t *request = treq->pub.request; \
	ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
		  treq->id, \
		  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
		  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
	treq->pub.state = _new; \
} while (0)
/** Assert on an invalid request state transition (release build)
 *
 * Contains an embedded `return;` — usable only inside functions returning void.
 */
#define REQUEST_BAD_STATE_TRANSITION(_new) \
do { \
	if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
				treq->id, \
				fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
				fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
} while (0)
530#endif
531
532
/** Call the cancel callback if set
 *
 * Records the callback in trunk->in_handler for the duration of the call so
 * the IN_* re-entrancy predicates work, then restores the previous handler.
 */
#define DO_REQUEST_CANCEL(_treq, _reason) \
do { \
	if ((_treq)->pub.trunk->funcs.request_cancel) { \
		request_t *request = (_treq)->pub.request; \
		void *_prev = (_treq)->pub.trunk->in_handler; \
		(_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
		ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
			  (_treq)->pub.tconn->pub.conn, \
			  (_treq)->pub.preq, \
			  fr_table_str_by_value(trunk_cancellation_reasons, \
						(_reason), \
						"<INVALID>"), \
			  (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
553
/** Call the "conn_release" callback (if set)
 *
 * Notifies the API client that the request is being released from its
 * connection.  Brackets the call with in_handler save/restore.
 */
#define DO_REQUEST_CONN_RELEASE(_treq) \
do { \
	if ((_treq)->pub.trunk->funcs.request_conn_release) { \
		request_t *request = (_treq)->pub.request; \
		void *_prev = (_treq)->pub.trunk->in_handler; \
		(_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
		ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
			  (_treq)->pub.tconn->pub.conn, \
			  (_treq)->pub.preq, \
			  (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
571
/** Call the complete callback (if set)
 *
 * Hands the decoded result (rctx) back to the API client.  Brackets the call
 * with in_handler save/restore.
 */
#define DO_REQUEST_COMPLETE(_treq) \
do { \
	if ((_treq)->pub.trunk->funcs.request_complete) { \
		request_t *request = (_treq)->pub.request; \
		void *_prev = (_treq)->pub.trunk->in_handler; \
		ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
			  (_treq)->pub.request, \
			  (_treq)->pub.preq, \
			  (_treq)->pub.rctx, \
			  (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
		(_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
590
/** Call the fail callback (if set)
 *
 * Notifies the API client of request failure, passing the state the request
 * was in when it failed.  Brackets the call with in_handler save/restore.
 */
#define DO_REQUEST_FAIL(_treq, _prev_state) \
do { \
	if ((_treq)->pub.trunk->funcs.request_fail) { \
		request_t *request = (_treq)->pub.request; \
		void *_prev = (_treq)->pub.trunk->in_handler; \
		ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
			  (_treq)->pub.request, \
			  (_treq)->pub.preq, \
			  (_treq)->pub.rctx, \
			  fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
			  (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
		(_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
610
/** Call the free callback (if set)
 *
 * Gives the API client a chance to free the protocol request (preq) it
 * allocated.  Brackets the call with in_handler save/restore.
 */
#define DO_REQUEST_FREE(_treq) \
do { \
	if ((_treq)->pub.trunk->funcs.request_free) { \
		request_t *request = (_treq)->pub.request; \
		void *_prev = (_treq)->pub.trunk->in_handler; \
		ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
			  (_treq)->pub.request, \
			  (_treq)->pub.preq, \
			  (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
		(_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
		(_treq)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
628
/** Write one or more requests to a connection
 *
 * Calls the mandatory request_mux I/O callback.  Brackets the call with
 * in_handler save/restore so IN_REQUEST_MUX works during the callback.
 */
#define DO_REQUEST_MUX(_tconn) \
do { \
	void *_prev = (_tconn)->pub.trunk->in_handler; \
	DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
	       (_tconn)->pub.conn->id, \
	       (_tconn)->pub.trunk->el, \
	       (_tconn), \
	       (_tconn)->pub.conn, \
	       (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
	(_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = _prev; \
} while(0)
645
/** Read one or more requests from a connection
 *
 * Calls the mandatory request_demux I/O callback.  Brackets the call with
 * in_handler save/restore so IN_REQUEST_DEMUX works during the callback.
 *
 * Fix: the DEBUG3 message previously omitted the event list (el) argument
 * that is actually passed to request_demux; now logged for consistency with
 * DO_REQUEST_MUX.
 */
#define DO_REQUEST_DEMUX(_tconn) \
do { \
	void *_prev = (_tconn)->pub.trunk->in_handler; \
	DEBUG3("[%" PRIu64 "] Calling request_demux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
	       (_tconn)->pub.conn->id, \
	       (_tconn)->pub.trunk->el, \
	       (_tconn), \
	       (_tconn)->pub.conn, \
	       (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
	(_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = _prev; \
} while(0)
661
/** Write one or more cancellation requests to a connection
 *
 * Calls the optional request_cancel_mux I/O callback.  Brackets the call with
 * in_handler save/restore so IN_REQUEST_CANCEL_MUX works during the callback.
 *
 * Fix: the DEBUG3 message previously omitted the event list (el) argument
 * that is actually passed to request_cancel_mux; now logged for consistency
 * with DO_REQUEST_MUX.
 */
#define DO_REQUEST_CANCEL_MUX(_tconn) \
do { \
	if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
		void *_prev = (_tconn)->pub.trunk->in_handler; \
		DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
		       (_tconn)->pub.conn->id, \
		       (_tconn)->pub.trunk->el, \
		       (_tconn), \
		       (_tconn)->pub.conn, \
		       (_tconn)->pub.trunk->uctx); \
		(_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
		(_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
		(_tconn)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
679
/** Allocate a new connection
 *
 * Calls the mandatory connection_alloc callback to create the underlying
 * connection_t.  On failure, frees the trunk connection wrapper and returns
 * -1 from the enclosing function — so this macro may only be used in
 * functions returning int.
 *
 * NOTE(review): unlike most wrappers here it also references bare `trunk`
 * and `tconn` locals (not just the _tconn parameter), which must be in scope.
 */
#define DO_CONNECTION_ALLOC(_tconn) \
do { \
	void *_prev = trunk->in_handler; \
	DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
	       (_tconn), \
	       (_tconn)->pub.trunk->el, \
	       (_tconn)->pub.trunk->conf.conn_conf, \
	       trunk->log_prefix, \
	       (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
	(_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = _prev; \
	if (!(_tconn)->pub.conn) { \
		ERROR("Failed creating new connection"); \
		talloc_free(tconn); \
		return -1; \
	} \
} while(0)
701
/** Change what events the connection should be notified about
 *
 * Calls the optional connection_notify callback so the API client can adjust
 * which I/O events (read/write) it registers for.  Brackets the call with
 * in_handler save/restore.
 */
#define DO_CONNECTION_NOTIFY(_tconn, _events) \
do { \
	if ((_tconn)->pub.trunk->funcs.connection_notify) { \
		void *_prev = (_tconn)->pub.trunk->in_handler; \
		DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
		       (_tconn)->pub.conn->id, \
		       (_tconn), \
		       (_tconn)->pub.conn, \
		       (_tconn)->pub.trunk->el, \
		       fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
		       (_tconn)->pub.trunk->uctx); \
		(_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
		(_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
		(_tconn)->pub.trunk->in_handler = _prev; \
	} \
} while(0)
721
/* Re-entrancy predicates: true while we're inside the named I/O callback.
 * in_handler is set/restored by the DO_* wrapper macros above. */
#define IN_HANDLER(_trunk)		(((_trunk)->in_handler) != NULL)
#define IN_REQUEST_MUX(_trunk)		(((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
#define IN_REQUEST_DEMUX(_trunk)	(((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
#define IN_REQUEST_CANCEL_MUX(_trunk)	(((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))

/* Connection state class tests (state sets defined in trunk.h) */
#define IS_SERVICEABLE(_tconn)		((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
#define IS_PROCESSING(_tconn)		((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
729
/** Remove the current request from the backlog
 *
 * Soft-fails (breaks out of the do/while) with an assert if the heap extract
 * reports an error.
 */
#define REQUEST_EXTRACT_BACKLOG(_treq) \
do { \
	int _ret; \
	_ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
	if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
} while (0)
739
/** Remove the current request from the pending list
 *
 * Soft-fails (breaks out of the do/while) with an assert if the heap extract
 * reports an error.
 */
#define REQUEST_EXTRACT_PENDING(_treq) \
do { \
	int _ret; \
	_ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
	if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
} while (0)
749
/** Remove the current request from the partial slot
 *
 * Asserts that the request is the connection's current partial request
 * before clearing the slot.
 *
 * Fix: made the macro hygienic — it previously referenced bare `treq` and
 * `tconn` locals instead of the _treq parameter, so it only worked when
 * invoked with a variable literally named treq.  Behavior is unchanged at
 * existing call sites.
 */
#define REQUEST_EXTRACT_PARTIAL(_treq) \
do { \
	fr_assert((_treq)->pub.tconn->partial == (_treq)); \
	(_treq)->pub.tconn->partial = NULL; \
} while (0)
758
/** Remove the current request from the sent list
 *
 * Fix: made the macro hygienic — it previously referenced bare `tconn` and
 * `treq` locals instead of the _treq parameter.  Behavior is unchanged at
 * existing call sites (where tconn == treq->pub.tconn).
 */
#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&(_treq)->pub.tconn->sent, (_treq))
763
/** Remove the current request from the reapable list
 *
 * Fix: made the macro hygienic — it previously referenced bare `tconn` and
 * `treq` locals instead of the _treq parameter.  Behavior is unchanged at
 * existing call sites (where tconn == treq->pub.tconn).
 */
#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&(_treq)->pub.tconn->reapable, (_treq))
768
/** Remove the current request from the cancel list
 *
 * Fix: made the macro hygienic — it previously referenced bare `tconn` and
 * `treq` locals instead of the _treq parameter.  Behavior is unchanged at
 * existing call sites (where tconn == treq->pub.tconn).
 */
#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&(_treq)->pub.tconn->cancel, (_treq))
773
/** Remove the current request from the cancel_partial slot
 *
 * Asserts that the request is the connection's current partial cancellation
 * request before clearing the slot.
 *
 * Fix: made the macro hygienic — it previously referenced bare `treq` and
 * `tconn` locals instead of the _treq parameter.  Behavior is unchanged at
 * existing call sites.
 */
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
do { \
	fr_assert((_treq)->pub.tconn->cancel_partial == (_treq)); \
	(_treq)->pub.tconn->cancel_partial = NULL; \
} while (0)
782
/** Remove the current request from the cancel sent list
 *
 * Fix: made the macro hygienic — it previously referenced bare `tconn` and
 * `treq` locals instead of the _treq parameter.  Behavior is unchanged at
 * existing call sites (where tconn == treq->pub.tconn).
 */
#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&(_treq)->pub.tconn->cancel_sent, (_treq))
787
/** Reorder the connections in the active heap
 *
 * Re-inserts the connection so its position reflects its current load.
 * A single-element heap needs no reordering; only ACTIVE connections may be
 * in the heap.  fr_heap_extract will also error out if heap_id is bad — no
 * need for a separate assert on that.
 */
#define CONN_REORDER(_tconn) \
do { \
	int _ret; \
	if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
	if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
	_ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
	if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
	fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
} while (0)
801
802DIAG_OFF(unused-function)
803
/** Generate typed accessor wrappers around one of the trunk's connection dlists
 *
 * Expands to four static inline functions:
 *   trunk_list_<list>_add()    — insert at the head of the list
 *   trunk_list_<list>_peek()   — return (without removing) the tail element
 *   trunk_list_<list>_pop()    — remove and return the head element
 *   trunk_list_<list>_remove() — unlink a specific element
 * Note _peek looks at the tail while _pop takes from the head — peek shows
 * the oldest insertion, pop yields the newest.
 */
#define FR_TRUNK_LIST_FUNC(_list,_type) \
static inline CC_HINT(nonnull, always_inline) void trunk_list_ ## _list ## _add(trunk_t *trunk, _type *arg) \
{ \
	fr_dlist_insert_head(&trunk->_list, arg); \
} \
static inline CC_HINT(nonnull, always_inline) _type *trunk_list_ ## _list ##_peek(trunk_t *trunk) \
{ \
	return fr_dlist_tail(&trunk->_list); \
} \
static inline CC_HINT(nonnull, always_inline) _type *trunk_list_ ## _list ##_pop(trunk_t *trunk) \
{ \
	return fr_dlist_pop_head(&trunk->_list); \
} \
static inline CC_HINT(nonnull, always_inline) void trunk_list_ ## _list ##_remove(trunk_t *trunk, _type *arg) \
{ \
	fr_dlist_remove(&trunk->_list, arg); \
}
821
825FR_TRUNK_LIST_FUNC(inactive_draining, trunk_connection_t)
827
828DIAG_ON(unused-function)
829
830/** Call a list of watch functions associated with a state
831 *
832 */
834{
835 /*
836 * Nested watcher calls are not allowed
837 * and shouldn't be possible because of
838 * deferred signal processing.
839 */
840 fr_assert(trunk->next_watcher == NULL);
841
842 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
843 trunk_watch_entry_t *entry = trunk->next_watcher;
844 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
845
846 if (!entry->enabled) continue;
847 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
848
849 entry->func(trunk, trunk->pub.state, state, entry->uctx);
850
851 if (oneshot) talloc_free(entry);
852 }
853 trunk->next_watcher = NULL;
854}
855
/** Call the state change watch functions
 *
 * Fast path: does nothing when no watchers are registered for the state,
 * avoiding the function call.
 */
#define CALL_WATCHERS(_trunk, _state) \
do { \
	if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
	trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
} while(0)
864
/** Remove a watch function from a trunk state list
 *
 * @param[in] trunk		The trunk to remove the watcher from.
 * @param[in] state		to remove the watch from.
 * @param[in] watch		Function to remove.
 * @return
 *	- 0 if the function was removed successfully.
 *	- -1 if the function wasn't present in the watch list.
 *	- -2 if an invalid state was passed.
 */
/* NOTE(review): the signature line was stripped by the doc extraction —
 * presumably int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch);
 * confirm against the public header. */
{
	trunk_watch_entry_t	*entry = NULL;
	fr_dlist_head_t		*list;

	if (state >= TRUNK_STATE_MAX) return -2;

	list = &trunk->watch[state];
	while ((entry = fr_dlist_next(list, entry))) {
		if (entry->func == watch) {
			/*
			 *	If we're removing the entry trunk_watch_call's
			 *	cursor currently points at, advance the cursor
			 *	so its iteration remains valid.
			 */
			if (trunk->next_watcher == entry) {
				trunk->next_watcher = fr_dlist_remove(list, entry);
			} else {
				fr_dlist_remove(list, entry);
			}
			talloc_free(entry);
			return 0;
		}
	}

	return -1;
}
897
/** Add a watch entry to the trunk state list
 *
 * @param[in] trunk	The trunk to add the watcher to.
 * @param[in] state	to watch for.
 * @param[in] watch	Function to add.
 * @param[in] oneshot	Should this watcher only be run once.
 * @param[in] uctx	Context to pass to function.
 * @return
 *	- NULL if an invalid state is passed.
 *	- A new watch entry handle on success.
 */
/* NOTE(review): the first line of the signature was stripped by the doc
 * extraction; only the parameter continuation below survived — confirm the
 * full prototype against upstream trunk.c. */
						  trunk_watch_t watch, bool oneshot, void const *uctx)
{
	trunk_watch_entry_t	*entry;
	fr_dlist_head_t		*list;

	if (state >= TRUNK_STATE_MAX) return NULL;

	list = &trunk->watch[state];
	MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));

	entry->func = watch;
	entry->oneshot = oneshot;
	entry->enabled = true;
	/* memcpy rather than assignment to strip const without a cast */
	memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
	fr_dlist_insert_tail(list, entry);

	return entry;
}
928
/** Transition the trunk to a new state, logging the change and notifying watchers
 *
 * NOTE: CALL_WATCHERS() runs *before* trunk->pub.state is updated, so
 * watchers observe trunk->pub.state as the old state while receiving
 * _new as the state being entered (see trunk_watch_call()).
 */
#define TRUNK_STATE_TRANSITION(_new) \
do { \
	DEBUG3("Trunk changed state %s -> %s", \
	       fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
	       fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
	CALL_WATCHERS(trunk, _new); \
	trunk->pub.state = _new; \
} while (0)
937
938static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
939static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
948
949static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
950 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
951
952static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
953static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
954static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
955static inline void trunk_connection_readable(trunk_connection_t *tconn);
956static inline void trunk_connection_writable(trunk_connection_t *tconn);
964
965static void trunk_rebalance(trunk_t *trunk);
966static void trunk_manage(trunk_t *trunk, fr_time_t now);
967static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx);
968static void trunk_backlog_drain(trunk_t *trunk);
969
970/** Compare two protocol requests
971 *
972 * Allows protocol requests to be prioritised with a function
973 * specified by the API client. Defaults to by pointer address
974 * if no function is specified.
975 *
976 * @param[in] a treq to compare to b.
977 * @param[in] b treq to compare to a.
978 * @return
979 * - +1 if a > b.
980 * - 0 if a == b.
981 * - -1 if a < b.
982 */
static int8_t _trunk_request_prioritise(void const *a, void const *b)
{
	/* NOTE(review): the declarations of treq_a/treq_b (talloc-checked casts
	 * of a/b to trunk_request_t const *) were stripped by the doc
	 * extraction — restore from upstream trunk.c. */

	fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);

	/* Delegate ordering to the API client's prioritise callback */
	return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
}
992
/** Remove a request from all connection lists
 *
 * A common function used by init, fail, complete state functions to disassociate
 * a request from a connection in preparation for freeing or reassignment.
 *
 * Despite its unassuming name, this function is *the* place to put calls to
 * functions which need to be called when the number of requests associated with
 * a connection changes.
 *
 * Trunk requests will always be passed to this function before they're removed
 * from a connection, even if the requests are being freed.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): the doc extraction stripped hyperlinked lines from this body
 * (function signature, the switch's case labels and per-state dequeue calls,
 * FALL_THROUGH, and the resource-release/event-update helper calls) —
 * consult upstream trunk.c before modifying. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		return;	/* Not associated with connection */

		break;

		break;

		break;

		break;

		break;

		break;

		break;

	default:
		fr_assert(0);
		break;
	}

	/*
	 *	If the request wasn't associated with a
	 *	connection, then there's nothing more
	 *	to do.
	 */
	if (!tconn) return;

	{
		request_t *request = treq->pub.request;

		ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
			  tconn->pub.conn->name, treq->id);
	}
	/*
	 *	Release any connection specific resources the
	 *	treq holds.
	 */

	switch (tconn->pub.state){
	case TRUNK_CONN_FULL:
		trunk_connection_auto_unfull(tconn);		/* Check if we can switch back to active */
		if (tconn->pub.state == TRUNK_CONN_FULL) break;	/* Only fallthrough if conn is now active */

	case TRUNK_CONN_ACTIVE:
		CONN_REORDER(tconn);
		break;

	default:
		break;
	}

	treq->pub.tconn = NULL;

	/*
	 *	Request removed from the connection
	 *	see if we need up deregister I/O events.
	 */
}
1092
/** Transition a request to the unassigned state, in preparation for re-assignment
 *
 * @note treq->tconn may be inviable after calling
 *	if treq->conn and connection_signals_pause are not used.
 *	This is due to call to trunk_request_remove_from_conn.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, case labels, per-state extraction calls and the
 * final state-assignment line were stripped by the doc extraction — consult
 * upstream trunk.c. */
{
	trunk_t	*trunk = treq->pub.trunk;

	switch (treq->pub.state) {
		return;

		break;

		break;

	default:
	}

}
1126
/** Transition a request to the backlog state, adding it to the backlog of the trunk
 *
 * @note treq->tconn and treq may be inviable after calling
 *	if treq->conn and connection_signals_pause are not used.
 *	This is due to call to trunk_manage.
 *
 * @param[in] treq	to trigger a state change for.
 * @param[in] new	Whether this is a new request.
 */
/* NOTE(review): signature, case labels, the state-assignment and the trailing
 * low-latency trunk_manage() condition were stripped by the doc extraction —
 * consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	switch (treq->pub.state) {
		break;

		break;

		break;

	default:
	}

	fr_heap_insert(&trunk->backlog, treq);	/* Insert into the backlog heap */

	/*
	 *	A new request has entered the trunk.
	 *	Re-calculate request/connection ratios.
	 */
	if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);

	/*
	 *	To reduce latency, if there's no connections
	 *	in the connecting state, call the trunk manage
	 *	function immediately.
	 *
	 *	Likewise, if there's draining connections
	 *	which could be moved back to active call
	 *	the trunk manage function.
	 *
	 *	Remember requests only enter the backlog if
	 *	there's no connections which can service them.
	 */
	}
}
1184
/** Transition a request to the pending state, adding it to the backlog of an active connection
 *
 * All trunk requests being added to a connection get passed to this function.
 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
 *
 * @note treq->tconn and treq may be inviable after calling
 *	if treq->conn and connection_signals_pause is not used.
 *	This is due to call to trunk_connection_event_update.
 *
 * @param[in] treq	to trigger a state change for.
 * @param[in] tconn	to enqueue the request on.
 * @param[in] new	Whether this is a new request.
 */
/* NOTE(review): signature, case labels, state macros and the auto-full /
 * event-update helper calls were stripped by the doc extraction — consult
 * upstream trunk.c. */
{
	trunk_t	*trunk = treq->pub.trunk;

	fr_assert(tconn->pub.trunk == trunk);
	fr_assert(IS_PROCESSING(tconn));

	switch (treq->pub.state) {
		fr_assert(!treq->pub.tconn);
		break;

		fr_assert(!treq->pub.tconn);
		break;

	case TRUNK_REQUEST_STATE_CANCEL:	/* Moved from another connection */
		break;

	default:
	}

	/*
	 *	Assign the new connection first this first so
	 *	it appears in the state log.
	 */
	treq->pub.tconn = tconn;


	{
		request_t *request = treq->pub.request;

		ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
			  tconn->pub.conn->name, treq->id);
	}
	fr_heap_insert(&tconn->pending, treq);

	/*
	 *	A new request has entered the trunk.
	 *	Re-calculate request/connection ratios.
	 */
	if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);

	/*
	 *	Check if we need to automatically transition the
	 *	connection to full.
	 */

	/*
	 *	Reorder the connection in the heap now it has an
	 *	additional request.
	 */
	if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);

	/*
	 *	We have a new request, see if we need to register
	 *	for I/O events.
	 */
}
1264
/** Transition a request to the partial state, indicating that is has been partially sent
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, the pending-heap extract and the state macro were
 * stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
	case TRUNK_REQUEST_STATE_PENDING:	/* All requests go through pending, even requeued ones */
		break;

	default:
	}

	/* Only one in-flight partial write is allowed per connection */
	fr_assert(!tconn->partial);
	tconn->partial = treq;

}
1290
/** Transition a request to the sent state, indicating that it's been sent in its entirety
 *
 * @note treq->tconn and treq may be inviable after calling
 *	if treq->conn and connection_signals_pause is not used.
 *	This is due to call to trunk_connection_event_update.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, case labels, the state macro, the max_uses
 * reconnect signal and the event-update call were stripped by the doc
 * extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

		break;

	default:
	}

	fr_dlist_insert_tail(&tconn->sent, treq);

	/*
	 *	Update the connection's sent stats if this is the
	 *	first time this request is being sent.
	 */
	if (!treq->sent) {
		trunk->pub.last_write_success = fr_time();

		tconn->sent_count++;
		treq->sent = true;

		/*
		 *	Enforces max_uses
		 */
		if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
			DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
		}
	}

	/*
	 *	We just sent a request, we probably need
	 *	to tell the event loop we want to be
	 *	notified if there's data available.
	 */
}
1349
/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
 *
 * @note Largely a replica of trunk_request_enter_sent.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, case labels, the state macro, the max_uses
 * reconnect signal and the event-update call were stripped by the doc
 * extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

		break;

	default:
	}

	fr_dlist_insert_tail(&tconn->reapable, treq);

	if (!treq->sent) {
		tconn->sent_count++;
		treq->sent = true;

		if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
			DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
		}
	}

}
1391
/** Transition a request to the cancel state, placing it in a connection's cancellation list
 *
 * If a request_cancel_send callback is provided, that callback will
 * be called periodically for requests which were cancelled due to
 * a signal.
 *
 * The request_cancel_send callback will dequeue cancelled requests
 * and inform a remote server that the result is no longer required.
 *
 * A request must enter this state before being added to the backlog
 * of another connection if it's been sent or partially sent.
 *
 * @note treq->tconn and treq may be inviable after calling
 *	if treq->conn and connection_signals_pause is not used.
 *	This is due to call to trunk_connection_event_update.
 *
 * @param[in] treq	to trigger a state change for.
 * @param[in] reason	Why the request was cancelled.
 *			Should be one of:
 *			- TRUNK_CANCEL_REASON_SIGNAL request cancelled
 *			because of a signal from the interpreter.
 *			- TRUNK_CANCEL_REASON_MOVE request cancelled
 *			because the connection failed and it needs
 *			to be assigned to a new connection.
 *			- TRUNK_CANCEL_REASON_REQUEUE request cancelled
 *			as it needs to be resent on the same connection.
 */
/* NOTE(review): signature, case labels, state macro and the event-update
 * call were stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

		break;

		break;

	default:
	}

	fr_dlist_insert_tail(&tconn->cancel, treq);
	treq->cancel_reason = reason;

	DO_REQUEST_CANCEL(treq, reason);

	/*
	 *	Our treq is no longer bound to an actual
	 *	request_t *, as we can't guarantee the
	 *	lifetime of the original request_t *.
	 */
	if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;

	/*
	 *	Register for I/O write events if we need to.
	 */
}
1461
/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
 *
 * The request_demux function is then responsible for signalling
 * that the cancel request is complete when the remote server
 * acknowledges the cancellation request.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, extra assertions, the cancel-list extract and the
 * state macro were stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
	case TRUNK_REQUEST_STATE_CANCEL:	/* The only valid state cancel_sent can be reached from */
		break;

	default:
	}

	/* Only one partially written cancellation is allowed per connection */
	fr_assert(!tconn->cancel_partial);
	tconn->cancel_partial = treq;
}
1492
/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
 *
 * The request_demux function is then responsible for signalling
 * that the cancel request is complete when the remote server
 * acknowledges the cancellation request.
 *
 * @note treq->tconn and treq may be inviable after calling
 *	if treq->conn and connection_signals_pause is not used.
 *	This is due to call to trunk_connection_event_update.
 *
 * @param[in] treq	to trigger a state change for.
 */
/* NOTE(review): signature, extra assertions, case labels, state macro and the
 * event-update call were stripped by the doc extraction — consult upstream
 * trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

		break;

	default:
	}

	fr_dlist_insert_tail(&tconn->cancel_sent, treq);

	/*
	 *	De-register for I/O write events
	 *	and register the read events
	 *	to drain the cancel ACKs.
	 */
}
1537
/** Cancellation was acked, the request is complete, free it
 *
 * The API client will not be informed, as the original request_t *
 * will likely have been freed by this point.
 *
 * @note treq will be inviable after a call to this function.
 *	treq->tconn may be inviable after calling
 *	if treq->conn and connection_signals_pause is not used.
 *	This is due to call to trunk_request_remove_from_conn.
 *
 * @param[in] treq	to mark as complete.
 */
/* NOTE(review): signature, case labels, the remove-from-conn and state-macro
 * calls were stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
	if (!fr_cond_assert(!treq->pub.request)) return;	/* Only a valid state for request_t * which have been cancelled */

	switch (treq->pub.state) {
		break;

	default:
	}


	trunk_request_free(&treq);	/* Free the request */
}
1572
/** Request completed successfully, inform the API client and free the request
 *
 * @note treq will be inviable after a call to this function.
 *	treq->tconn may also be inviable due to call to
 *	trunk_request_remove_from_conn.
 *
 * @param[in] treq	to mark as complete.
 */
/* NOTE(review): signature, case labels and the remove-from-conn/state-macro
 * calls were stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

	default:
	}

	DO_REQUEST_COMPLETE(treq);
	trunk_request_free(&treq);	/* Free the request */
}
1603
/** Request failed, inform the API client and free the request
 *
 * @note treq will be inviable after a call to this function.
 *	treq->tconn may also be inviable due to call to
 *	trunk_request_remove_from_conn.
 *
 * @param[in] treq	to mark as failed.
 */
/* NOTE(review): signature, case labels and the remove-from-conn/state-macro
 * calls were stripped by the doc extraction — consult upstream trunk.c. */
{
	trunk_connection_t	*tconn = treq->pub.tconn;
	trunk_t			*trunk = treq->pub.trunk;
	trunk_request_state_t	prev = treq->pub.state;	/* Preserved so the fail callback sees the pre-failure state */

	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;

	switch (treq->pub.state) {
		break;

	default:
		break;
	}

	DO_REQUEST_FAIL(treq, prev);
	trunk_request_free(&treq);	/* Free the request */
}
1634
/** Check to see if a trunk request can be enqueued
 *
 * @param[out] tconn_out	Connection the request may be enqueued on.
 * @param[in] trunk		To enqueue requests on.
 * @param[in] request		associated with the treq (if any).
 * @return
 *	- TRUNK_ENQUEUE_OK			caller should enqueue request on provided tconn.
 *	- TRUNK_ENQUEUE_IN_BACKLOG		Request should be queued in the backlog.
 *	- TRUNK_ENQUEUE_NO_CAPACITY		Unable to enqueue request as we have no spare
 *						connections or backlog space.
 *	- TRUNK_ENQUEUE_DST_UNAVAILABLE		Can't enqueue because the destination is
 *						unreachable.
 */
/* NOTE(review): the first line of the signature, the ROPTIONAL() warning
 * prefixes and the return statements were stripped by the doc extraction —
 * consult upstream trunk.c. */
						  request_t *request)
{
	trunk_connection_t	*tconn;
	/*
	 *	If we have an active connection then
	 *	return that.
	 */
	tconn = fr_minmax_heap_min_peek(trunk->active);
	if (tconn) {
		*tconn_out = tconn;
		return TRUNK_ENQUEUE_OK;
	}

	/*
	 *	Unlike the connection pool, we don't need
	 *	to drive any internal processes by feeding
	 *	it requests.
	 *
	 *	If the last event to occur was a failure
	 *	we refuse to enqueue new requests until
	 *	one or more connections comes online.
	 */
	if (!trunk->conf.backlog_on_failed_conn &&
	    fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
	    fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
		       RWARN, WARN, "Refusing to enqueue requests - "
		       "No active connections and last event was a connection failure");

	}


	/*
	 *	Only enforce if we're limiting maximum
	 *	number of connections, and maximum
	 *	number of requests per connection.
	 *
	 *	The alloc function also checks this
	 *	which is why this is only done for
	 *	debug builds.
	 */
	if (trunk->conf.max_req_per_conn && trunk->conf.max) {
		uint64_t limit;

		limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
		if (limit > 0) {
			uint64_t total_reqs;

			total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
			if (total_reqs >= (limit + trunk->conf.max_backlog)) {
				RWARN, WARN, "Refusing to alloc requests - "
				"Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
				"plus %u backlog requests reached",
				limit, trunk->conf.max, trunk->conf.max_req_per_conn,
				trunk->conf.max_backlog);
			}
		}
	}

}
1714
/** Enqueue a request which has never been assigned to a connection or was previously cancelled
 *
 * @param[in] treq	to re enqueue.  Must have been removed
 *			from its existing connection with
 *			#trunk_connection_requests_dequeue.
 * @return
 *	- TRUNK_ENQUEUE_OK			Request was re-enqueued.
 *	- TRUNK_ENQUEUE_NO_CAPACITY		Request enqueueing failed because we're at capacity.
 *	- TRUNK_ENQUEUE_DST_UNAVAILABLE		Enqueuing failed for some reason.
 *						Usually because the connection to the resource is down.
 */
/* NOTE(review): the signature, the TRUNK_ENQUEUE_IN_BACKLOG case label and
 * the connection pause/resume + signal_writable calls inside the
 * always_writable branch were stripped by the doc extraction — consult
 * upstream trunk.c.  As listed, both branches of the always_writable if are
 * identical, which is almost certainly an extraction artefact. */
{
	trunk_t			*trunk = treq->pub.trunk;
	trunk_connection_t	*tconn = NULL;
	trunk_enqueue_t		ret;

	/*
	 *	Must *NOT* still be assigned to another connection
	 */
	fr_assert(!treq->pub.tconn);

	ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
	switch (ret) {
	case TRUNK_ENQUEUE_OK:
		if (trunk->conf.always_writable) {
			trunk_request_enter_pending(treq, tconn, false);
		} else {
			trunk_request_enter_pending(treq, tconn, false);
		}
		break;

		/*
		 *	No more connections and request
		 *	is already in the backlog.
		 *
		 *	Signal our caller it should stop
		 *	trying to drain the backlog.
		 */
		trunk_request_enter_backlog(treq, false);
		break;

	default:
		break;
	}

	return ret;
}
1768
/** Shift requests in the specified states onto new connections
 *
 * This function will blindly dequeue any requests in the specified state and get
 * them back to the unassigned state, cancelling any sent or partially sent requests.
 *
 * This function does not check that dequeuing a request in a particular state is a
 * sane or sensible thing to do, that's up to the caller!
 *
 * @param[out] out	A list to insert the newly dequeued and unassigned
 *			requests into.
 * @param[in] tconn	to dequeue requests from.
 * @param[in] states	Dequeue request in these states.
 * @param[in] max	The maximum number of requests to dequeue. 0 for unlimited.
 */
/* NOTE(review): this body is heavily truncated by the doc extraction —
 * the first signature line, several DEQUEUE_ALL invocations and the bodies
 * of the per-state if blocks (cancel signalling, unassign + list insertion,
 * connection hold/release) are missing.  Consult upstream trunk.c before
 * modifying anything here. */
			   int states, uint64_t max)
{
	trunk_request_t	*treq;
	uint64_t		count = 0;

	if (max == 0) max = UINT64_MAX;

#define OVER_MAX_CHECK if (++count > max) return (count - 1)

#define DEQUEUE_ALL(_src_list, _state) do { \
	while ((treq = fr_dlist_head(_src_list))) { \
		OVER_MAX_CHECK; \
		fr_assert(treq->pub.state == (_state)); \
		trunk_request_enter_unassigned(treq); \
		fr_dlist_insert_tail(out, treq); \
	} } while (0)

	/*
	 *	Don't need to do anything with
	 *	cancellation requests.
	 */
	if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,

	/*
	 *	...same with cancel inform
	 */

	/*
	 *	....same with cancel partial
	 */
		treq = tconn->cancel_partial;
		if (treq) {
		}
	}

	/*
	 *	...and pending.
	 */
	if (states & TRUNK_REQUEST_STATE_PENDING) {
		while ((treq = fr_heap_peek(tconn->pending))) {
		}
	}

	/*
	 *	Cancel partially sent requests
	 */
	if (states & TRUNK_REQUEST_STATE_PARTIAL) {
		treq = tconn->partial;
		if (treq) {

			/*
			 *	Don't allow the connection to change state whilst
			 *	we're draining requests from it.
			 */
		}
	}

	/*
	 *	Cancel sent requests
	 */
	if (states & TRUNK_REQUEST_STATE_SENT) {
		/*
		 *	Don't allow the connection to change state whilst
		 *	we're draining requests from it.
		 */
		while ((treq = fr_dlist_head(&tconn->sent))) {

		}
	}

	return count;
}
1882
1883/** Remove requests in specified states from a connection, attempting to distribute them to new connections
1884 *
1885 * @param[in] tconn To remove requests from.
1886 * @param[in] states One or more states or'd together.
1887 * @param[in] max The maximum number of requests to dequeue.
1888 * 0 for unlimited.
1889 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1890 * If false bound requests will not be moved.
1891 *
1892 * @return the number of requests re-queued.
1893 */
/* NOTE(review): the doc extraction stripped several hyperlinked lines from
 * this body (the connection hold/release pair, the bound-request failure
 * call, the IN_BACKLOG / NO_CAPACITY / DST_UNAVAILABLE case labels and the
 * enter_failed call) — consult upstream trunk.c before modifying. */
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
{
	trunk_t			*trunk = tconn->pub.trunk;
	fr_dlist_head_t		to_process;
	trunk_request_t		*treq = NULL;
	uint64_t		moved = 0;

	if (max == 0) max = UINT64_MAX;

	fr_dlist_talloc_init(&to_process, trunk_request_t, entry);

	/*
	 *	Prevent the connection changing state whilst we're
	 *	working with it.
	 *
	 *	There's a user callback that can be called by
	 *	trunk_request_enqueue_existing which can reconnect
	 *	the connection.
	 */

	/*
	 *	Remove non-cancelled requests from the connection
	 */
	moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);

	/*
	 *	Prevent requests being requeued on the same trunk
	 *	connection, which would break rebalancing.
	 *
	 *	This is a bit of a hack, but nothing should test
	 *	for connection/list consistency in this code,
	 *	and if something is added later, it'll be flagged
	 *	by the tests.
	 */
	if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
		int ret;

		ret = fr_minmax_heap_extract(trunk->active, tconn);
		if (!fr_cond_assert_msg(ret == 0,
					"Failed extracting conn from active heap: %s", fr_strerror())) goto done;

	}

	/*
	 *	Loop over all the requests we gathered and
	 *	redistribute them to new connections.
	 */
	while ((treq = fr_dlist_next(&to_process, treq))) {
		trunk_request_t *prev;

		prev = fr_dlist_remove(&to_process, treq);

		/*
		 *	Attempts to re-queue a request
		 *	that's bound to a connection
		 *	results in a failure.
		 */
		if (treq->bound_to_conn) {
			if (fail_bound || !IS_SERVICEABLE(tconn)) {
			} else {
				trunk_request_enter_pending(treq, tconn, false);
			}
			goto next;
		}

		switch (trunk_request_enqueue_existing(treq)) {
		case TRUNK_ENQUEUE_OK:
			break;

		/*
		 *	A connection failed, and
		 *	there's no other connections
		 *	available to deal with the
		 *	load, it's been placed back
		 *	in the backlog.
		 */
			break;

		/*
		 *	If we fail to re-enqueue then
		 *	there's nothing to do except
		 *	fail the request.
		 */
		case TRUNK_ENQUEUE_FAIL:
			break;
		}
	next:
		treq = prev;
	}

	/*
	 *	Add the connection back into the active list
	 */
	if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
		int ret;

		ret = fr_minmax_heap_insert(trunk->active, tconn);
		if (!fr_cond_assert_msg(ret == 0,
					"Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
	}
	if (moved >= max) goto done;

	/*
	 *	Deal with the cancelled requests specially we can't
	 *	queue them up again as they were only valid on that
	 *	specific connection.
	 *
	 *	We just need to run them to completion which, as
	 *	they should already be in the unassigned state,
	 *	just means freeing them.
	 */
	moved += trunk_connection_requests_dequeue(&to_process, tconn,
						   states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
	while ((treq = fr_dlist_next(&to_process, treq))) {
		trunk_request_t *prev;

		prev = fr_dlist_remove(&to_process, treq);
		trunk_request_free(&treq);
		treq = prev;
	}

done:

	/*
	 *	Always re-calculate the request/connection
	 *	ratio at the end.
	 *
	 *	This avoids having the state transition
	 *	functions do it.
	 *
	 *	The ratio would be wrong when they calculated
	 *	it anyway, because a bunch of requests are
	 *	dequeued from the connection and temporarily
	 *	cease to exist from the perspective of the
	 *	trunk_requests_per_connection code.
	 */
	trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);

	return moved;
}
2041
/** Move requests off of a connection and requeue elsewhere
 *
 * @note We don't re-queue on draining or draining to free, as requests should have already been
 *	moved off of the connection.  It's also dangerous as the trunk management code main
 *	clean up a connection in this state when it's run on re-queue, and then the caller
 *	may try and access a now freed connection.
 *
 * @param[in] tconn	to move requests off of.
 * @param[in] states	Only move requests in this state.
 * @param[in] max	The maximum number of requests to dequeue. 0 for unlimited.
 * @param[in] fail_bound	If true causes any requests bound to the connection to fail.
 *			If false bound requests will not be moved.
 * @return The number of requests requeued.
 */
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
{
	switch (tconn->pub.state) {
	case TRUNK_CONN_ACTIVE:
	case TRUNK_CONN_FULL:
		/* NOTE(review): an additional case label (likely TRUNK_CONN_INACTIVE)
		 * was stripped here by the doc extraction — confirm upstream. */
		return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);

	default:
		return 0;
	}
}
2068
/** Signal a partial write
 *
 * Where there's high load, and the outbound write buffer is full
 *
 * @param[in] treq	to signal state change for.
 */
/* NOTE(review): signature, half of the IN_REQUEST_MUX assertion, the case
 * label and the enter_partial call were stripped by the doc extraction —
 * consult upstream trunk.c. */
{
	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;

		     "%s can only be called from within request_mux handler", __FUNCTION__)) return;

	switch (treq->pub.state) {
		break;

	default:
		return;
	}
}
2091
/** Signal that the request was written to a connection successfully
 *
 * @param[in] treq	to signal state change for.
 */
/* NOTE(review): signature, half of the IN_REQUEST_MUX assertion, the case
 * labels and the enter_sent call were stripped by the doc extraction —
 * consult upstream trunk.c. */
{
	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;

		     "%s can only be called from within request_mux handler", __FUNCTION__)) return;

	switch (treq->pub.state) {
		break;

	default:
		return;
	}
}
2113
/** Signal that the request was written to a connection successfully, but no response is expected
 *
 * @param[in] treq	to signal state change for.
 */
/* NOTE(review): signature, half of the IN_REQUEST_MUX assertion, the case
 * labels and the enter_reapable call were stripped by the doc extraction —
 * consult upstream trunk.c. */
{
	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;

		     "%s can only be called from within request_mux handler", __FUNCTION__)) return;

	switch (treq->pub.state) {
		break;

	default:
		return;
	}
}
2135
/** Signal that a trunk request is complete
 *
 * The API client will be informed that the request is now complete.
 */
/* NOTE(review): signature, the per-connection last_read_success update,
 * additional case labels and the enter_complete call were stripped by the
 * doc extraction — consult upstream trunk.c. */
{
	trunk_t	*trunk = treq->pub.trunk;

	if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;

	/*
	 *	We assume that if the request is being signalled
	 *	as complete from the demux function, that it was
	 *	a successful read.
	 *
	 *	If this assumption turns out to be incorrect
	 *	then we need to add an argument to signal_complete
	 *	to indicate if this is a successful read.
	 */
	if (IN_REQUEST_DEMUX(trunk)) {
		trunk_connection_t *tconn = treq->pub.tconn;

		trunk->pub.last_read_success = fr_time();
	}

	switch (treq->pub.state) {
	case TRUNK_REQUEST_STATE_PENDING:	/* Got immediate response, i.e. cached */
		break;

	default:
		return;
	}
}
2173
2174/** Signal that a trunk request failed
2175 *
2176 * The API client will be informed that the request has failed.
2177 */
2179{
2180 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2181
2183}
2184
2185/** Cancel a trunk request
2186 *
2187 * treq can be in any state, but requests to cancel if the treq is not in
2188 * the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state will be ignored.
2189 *
2190 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2191 * is now inviable as it's being cancelled.
2192 *
2193 * The free function however, is called, and that should be used to perform necessary
2194 * cleanup.
2195 *
2196 * @param[in] treq to signal state change for.
2197 */
2199{
2200 trunk_t *trunk;
2201
2202 /*
2203 * Ensure treq hasn't been freed
2204 */
2205 (void)talloc_get_type_abort(treq, trunk_request_t);
2206
2207 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2208
2210 "%s cannot be called within a handler", __FUNCTION__)) return;
2211
2212 trunk = treq->pub.trunk;
2213
2214 switch (treq->pub.state) {
2215 /*
2216 * We don't call the complete or failed callbacks
2217 * as the request and rctx are no longer viable.
2218 */
2221 {
2222 trunk_connection_t *tconn = treq->pub.tconn;
2223
2224 /*
2225 * Don't allow connection state changes
2226 */
2230 "Bad state %s after cancellation",
2231 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2233 return;
2234 }
2235 /*
2236 * No cancel muxer. We're done.
2237 *
2238 * If we do have a cancel mux function,
2239 * the next time this connection becomes
2240 * writable, we'll call the cancel mux
2241 * function.
2242 *
2243 * We don't run the complete or failed
2244 * callbacks here as the request is
2245 * being cancelled.
2246 */
2247 if (!trunk->funcs.request_cancel_mux) {
2249 trunk_request_free(&treq);
2250 }
2252 }
2253 break;
2254
2255 /*
2256 * We're already in the process of cancelling a
2257 * request, so ignore duplicate signals.
2258 */
2263 break;
2264
2265 /*
2266 * For any other state, we just release the request
2267 * from its current connection and free it.
2268 */
2269 default:
2271 trunk_request_free(&treq);
2272 break;
2273 }
2274}
2275
2276/** Signal a partial cancel write
2277 *
2278 * Where there's high load, and the outbound write buffer is full
2279 *
2280 * @param[in] treq to signal state change for.
2281 */
2283{
2284 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2285
2287 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2288
2289 switch (treq->pub.state) {
2292 break;
2293
2294 default:
2295 return;
2296 }
2297}
2298
2299/** Signal that a remote server has been notified of the cancellation
2300 *
2301 * Called from request_cancel_mux to indicate that the datastore has been informed
2302 * that the response is no longer needed.
2303 *
2304 * @param[in] treq to signal state change for.
2305 */
2307{
2308 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2309
2311 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2312
2313 switch (treq->pub.state) {
2317 break;
2318
2319 default:
2320 break;
2321 }
2322}
2323
2324/** Signal that a remote server acked our cancellation
2325 *
2326 * Called from request_demux to indicate that it got an ack for the cancellation.
2327 *
2328 * @param[in] treq to signal state change for.
2329 */
2331{
2332 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2333
2335 "%s can only be called from within request_demux or request_cancel_mux handlers",
2336 __FUNCTION__)) return;
2337
2338 switch (treq->pub.state) {
2340 /*
2341 * This is allowed, as we may not need to wait
2342 * for the database to ACK our cancellation
2343 * request.
2344 *
2345 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2346 * is not allowed here, as that'd mean we'd half
2347 * written the cancellation request out to the
2348 * socket, and then decided to abandon it.
2349 *
2350 * That'd leave the socket in an unusable state.
2351 */
2354 break;
2355
2356 default:
2357 break;
2358 }
2359}
2360
2361/** If the trunk request is freed then update the target requests
2362 *
2363 * gperftools showed calling the request free function directly was slightly faster
2364 * than using talloc_free.
2365 *
2366 * @param[in] treq_to_free request.
2367 */
2369{
2370 trunk_request_t *treq = *treq_to_free;
2371 trunk_t *trunk;
2372
2373 if (unlikely(!treq)) return;
2374
2375 trunk = treq->pub.trunk;
2376
2377 /*
2378 * The only valid states a trunk request can be
2379 * freed from.
2380 */
2381 switch (treq->pub.state) {
2387 break;
2388
2389 default:
2390 if (!fr_cond_assert(0)) return;
2391 }
2392
2393 /*
2394 * Zero out the pointer to prevent double frees
2395 */
2396 *treq_to_free = NULL;
2397
2398 /*
2399 * Call the API client callback to free
2400 * any associated memory.
2401 */
2402 DO_REQUEST_FREE(treq);
2403
2404 /*
2405 * Update the last above/below target stats
2406 * We only do this when we alloc or free
2407 * connections, or on connection
2408 * state changes.
2409 */
2410 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2411
2412 /*
2413 * This tracks the total number of requests
2414 * allocated and not freed or returned to
2415 * the free list.
2416 */
2417 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2418
2419 /*
2420 * No cleanup delay, means cleanup immediately
2421 */
2424
2425#ifndef NDEBUG
2426 /*
2427 * Ensure anything parented off the treq
2428 * is freed. We do this to trigger
2429 * the destructors for the log entries.
2430 */
2431 talloc_free_children(treq);
2432
2433 /*
2434 * State log should now be empty as entries
2435 * remove themselves from the dlist
2436 * on free.
2437 */
2439 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2440#endif
2441
2442 talloc_free(treq);
2443 return;
2444 }
2445
2446 /*
2447 * Ensure anything parented off the treq
2448 * is freed.
2449 */
2450 talloc_free_children(treq);
2451
2452#ifndef NDEBUG
2453 /*
2454 * State log should now be empty as entries
2455 * remove themselves from the dlist
2456 * on free.
2457 */
2459 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2460#endif
2461
2462 /*
2463 *
2464 * Return the trunk request back to the init state.
2465 */
2466 *treq = (trunk_request_t){
2467 .pub = {
2469 .trunk = treq->pub.trunk,
2470 },
2471 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2472 .last_freed = fr_time(),
2473#ifndef NDEBUG
2474 .log = treq->log /* Keep the list head, to save reinitialisation */
2475#endif
2476 };
2477
2478
2479 /*
2480 * Insert at the head, so that we can free
2481 * requests that have been unused for N
2482 * seconds from the tail.
2483 */
2484 trunk_list_free_requests_add(trunk, treq);
2485
2486}
2487
2488/** Actually free the trunk request
2489 *
2490 */
2492{
2493 trunk_t *trunk = treq->pub.trunk;
2494
2495 switch (treq->pub.state) {
2498 break;
2499
2500 default:
2501 fr_assert(0);
2502 break;
2503 }
2504
2505 trunk_list_free_requests_remove(trunk, treq);
2506
2507 return 0;
2508}
2509
2510/** (Pre-)Allocate a new trunk request
2511 *
2512 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2513 * request will be a talloc pool, which can be used to hold the preq.
2514 *
2515 * @note Do not use MEM to check the result of this allocated as it may fail for
2516 * non-fatal reasons.
2517 *
2518 * @param[in] trunk to add request to.
2519 * @param[in] request to wrap in a trunk request (treq).
2520 * @return
2521 * - A newly allocated request.
2522 * - NULL if too many requests are allocated.
2523 */
2525{
2526 trunk_request_t *treq;
2527
2528 /*
2529 * The number of treqs currently allocated
2530 * exceeds the maximum number allowed.
2531 */
2532 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2533 uint64_t limit;
2534
2535 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2536 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2538 RWARN, WARN, "Refusing to alloc requests - "
2539 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2540 "plus %u backlog requests reached",
2541 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2542 trunk->conf.max_backlog);
2543 return NULL;
2544 }
2545 }
2546
2547 /*
2548 * Re-use a recently freed request, which might have some
2549 * better cache locality than getting a request from the tail.
2550 *
2551 * If we can't do that, just allocate a new one.
2552 */
2553 treq = trunk_list_free_requests_pop(trunk);
2554 if (treq) {
2556 fr_assert(treq->pub.trunk == trunk);
2557 fr_assert(treq->pub.tconn == NULL);
2560 trunk->pub.req_alloc_reused++;
2561 } else {
2563 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2564 talloc_set_destructor(treq, _trunk_request_free);
2565
2566 *treq = (trunk_request_t){
2567 .pub = {
2569 .trunk = trunk
2570 },
2571 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2572 };
2573 trunk->pub.req_alloc_new++;
2574#ifndef NDEBUG
2576#endif
2577 }
2578
2579 trunk->pub.req_alloc++;
2581 /* heap_id - initialised when treq inserted into pending */
2582 /* list - empty */
2583 /* preq - populated later */
2584 /* rctx - populated later */
2585 treq->pub.request = request;
2586
2587 return treq;
2588}
2589
2590/** Enqueue a request that needs data written to the trunk
2591 *
2592 * When a request_t * needs to make an asynchronous request to an external datastore
2593 * it should call this function, specifying a preq (protocol request) containing
2594 * the data necessary to request information from the external datastore, and an
2595 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2596 *
2597 * After a treq is successfully enqueued it will either be assigned immediately
2598 * to the pending queue of a connection, or if no connections are available,
2599 * (depending on the trunk configuration) the treq will be placed in the trunk's
2600 * global backlog.
2601 *
2602 * After receiving a positive return code from this function the caller should
2603 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2604 * (trunk connection) and treq state changes to be called.
2605 *
2606 * When a tconn becomes writable (or the trunk is configured to be always writable)
2607 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2608 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2609 * is also responsible for tracking the outbound requests to allow the
2610 * #trunk_request_demux_t callback to match inbound responses with the original
2611 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2612 * it signals what state the treq should enter next using one of the
2613 * trunk_request_signal_* functions.
2614 *
2615 * When a tconn becomes readable the user specified #trunk_request_demux_t
2616 * callback is called to process any responses, match them with the original treq.
2617 * and signal what state they should enter next using one of the
2618 * trunk_request_signal_* functions.
2619 *
2620 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2621 * is NULL, a new treq will be allocated.
2622 * Otherwise treq should point to memory allocated
2623 * with trunk_request_alloc.
2624 * @param[in] trunk to enqueue request on.
2625 * @param[in] request to enqueue.
2626 * @param[in] preq Protocol request to write out. Will be freed when
2627 * treq is freed. Should ideally be parented by the
2628 * treq if possible.
2629 * Use #trunk_request_alloc for pre-allocation of
2630 * the treq.
2631 * @param[in] rctx The resume context to write any result to.
2632 * @return
2633 * - TRUNK_ENQUEUE_OK.
2634 * - TRUNK_ENQUEUE_IN_BACKLOG.
2635 * - TRUNK_ENQUEUE_NO_CAPACITY.
2636 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2637 * - TRUNK_ENQUEUE_FAIL
2638 */
2640 request_t *request, void *preq, void *rctx)
2641{
2642 trunk_connection_t *tconn = NULL;
2643 trunk_request_t *treq;
2644 trunk_enqueue_t ret;
2645
2646 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2647 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2648
2649 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2650 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2651
2652 /*
2653 * If delay_start was set, we may need
2654 * to insert the timer for the connection manager.
2655 */
2656 if (unlikely(!trunk->started)) {
2657 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2658 }
2659
2660 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2661 switch (ret) {
2662 case TRUNK_ENQUEUE_OK:
2663 if (*treq_out) {
2664 treq = *treq_out;
2665 } else {
2666 *treq_out = treq = trunk_request_alloc(trunk, request);
2667 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2668 }
2669 treq->pub.preq = preq;
2670 treq->pub.rctx = rctx;
2671 if (trunk->conf.always_writable) {
2673 trunk_request_enter_pending(treq, tconn, true);
2676 } else {
2677 trunk_request_enter_pending(treq, tconn, true);
2678 }
2679 break;
2680
2682 if (*treq_out) {
2683 treq = *treq_out;
2684 } else {
2685 *treq_out = treq = trunk_request_alloc(trunk, request);
2686 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2687 }
2688 treq->pub.preq = preq;
2689 treq->pub.rctx = rctx;
2690 trunk_request_enter_backlog(treq, true);
2691 break;
2692
2693 default:
2694 /*
2695 * If a trunk request was provided
2696 * populate the preq and rctx fields
2697 * so that if it's freed with
2698 * trunk_request_free, the free
2699 * function works as intended.
2700 */
2701 if (*treq_out) {
2702 treq = *treq_out;
2703 treq->pub.preq = preq;
2704 treq->pub.rctx = rctx;
2705 }
2706 return ret;
2707 }
2708
2709 return ret;
2710}
2711
2712/** Re-enqueue a request on the same connection
2713 *
2714 * If the treq has been sent, we assume that we're being signalled to requeue
2715 * because something outside of the trunk API has determined that a retransmission
2716 * is required. The easiest way to perform that retransmission is to clean up
2717 * any tracking information for the request, and then requeue it for transmission.
2718 *
2719 * If re-queueing fails, the request will enter the fail state. It should not be
2720 * accessed if this occurs.
2721 *
2722 * @param[in] treq to requeue (retransmit).
2723 * @return
2724 * - TRUNK_ENQUEUE_OK.
2725 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2726 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2727 */
2729{
2730 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2731
2732 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2733
2734 if (!IS_PROCESSING(tconn)) {
2737 }
2738
2739 switch (treq->pub.state) {
2745 trunk_request_enter_pending(treq, tconn, false);
2746 if (treq->pub.trunk->conf.always_writable) {
2748 }
2750 break;
2751
2752 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2753 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2754 break;
2755
2756 default:
2758 return TRUNK_ENQUEUE_FAIL;
2759 }
2760
2761 return TRUNK_ENQUEUE_OK;
2762}
2763
2764/** Enqueue additional requests on a specific connection
2765 *
2766 * This may be used to create a series of requests on a single connection, or to generate
2767 * in-band status checks.
2768 *
2769 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2770 * must be able to handle multiple calls to its muxer gracefully.
2771 *
2772 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2773 * is NULL, a new treq will be allocated.
2774 * Otherwise treq should point to memory allocated
2775 * with trunk_request_alloc.
2776 * @param[in] tconn to enqueue request on.
2777 * @param[in] request to enqueue.
2778 * @param[in] preq Protocol request to write out. Will be freed when
2779 * treq is freed. Should ideally be parented by the
2780 * treq if possible.
2781 * Use #trunk_request_alloc for pre-allocation of
2782 * the treq.
2783 * @param[in] rctx The resume context to write any result to.
2784 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2785 * checks through even if the connection is at capacity.
2786 * Will also allow enqueuing on "inactive", "draining",
2787 * "draining-to-free" connections.
2788 * @return
2789 * - TRUNK_ENQUEUE_OK.
2790 * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2791 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2792 */
2794 request_t *request, void *preq, void *rctx,
2795 bool ignore_limits)
2796{
2797 trunk_request_t *treq;
2798 trunk_t *trunk = tconn->pub.trunk;
2799
2800 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2801 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2802
2804
2805 /*
2806 * Limits check
2807 */
2808 if (!ignore_limits) {
2809 if (trunk->conf.max_req_per_conn &&
2812
2814 }
2815
2816 if (*treq_out) {
2817 treq = *treq_out;
2818 } else {
2819 *treq_out = treq = trunk_request_alloc(trunk, request);
2820 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2821 }
2822
2823 treq->pub.preq = preq;
2824 treq->pub.rctx = rctx;
2825 treq->bound_to_conn = true; /* Don't let the request be transferred */
2826
2827 if (trunk->conf.always_writable) {
2829 trunk_request_enter_pending(treq, tconn, true);
2832 } else {
2833 trunk_request_enter_pending(treq, tconn, true);
2834 }
2835
2836 return TRUNK_ENQUEUE_OK;
2837}
2838
2839#ifndef NDEBUG
2840/** Used for sanity checks to ensure all log entries have been freed
2841 *
2842 */
2844{
2845 fr_dlist_remove(slog->log_head, slog);
2846
2847 return 0;
2848}
2849
2850void trunk_request_state_log_entry_add(char const *function, int line,
2852{
2853 trunk_request_state_log_t *slog = NULL;
2854
2856 slog = fr_dlist_head(&treq->log);
2857 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2858 fr_dlist_num_elements(&treq->log));
2859 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2860 memset(slog, 0, sizeof(*slog));
2861 } else {
2862 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2863 talloc_set_destructor(slog, _state_log_entry_free);
2864 }
2865
2866 slog->log_head = &treq->log;
2867 slog->from = treq->pub.state;
2868 slog->to = new;
2869 slog->function = function;
2870 slog->line = line;
2871 if (treq->pub.tconn) {
2872 slog->tconn = treq->pub.tconn;
2873 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2874 slog->tconn_state = treq->pub.tconn->pub.state;
2875 }
2876
2877 fr_dlist_insert_tail(&treq->log, slog);
2878
2879}
2880
2881void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2882 trunk_request_t const *treq)
2883{
2884 trunk_request_state_log_t *slog = NULL;
2885
2886 int i;
2887
2888 for (slog = fr_dlist_head(&treq->log), i = 0;
2889 slog;
2890 slog = fr_dlist_next(&treq->log, slog), i++) {
2891 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2892 i, slog->function, slog->line,
2893 slog->tconn_id,
2895 slog->tconn_state, "<INVALID>") : "none",
2896 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2897 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2898 }
2899}
2900#endif
2901
2902/** Return the count number of connections in the specified states
2903 *
2904 * @param[in] trunk to retrieve counts for.
2905 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2906 * @return The number of connections in the specified states.
2907 */
2909{
2910 uint16_t count = 0;
2911
2912 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2913 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2914 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2915 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2916 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2918 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2919 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2921
2922 return count;
2923}
2924
2925/** Return the count number of requests associated with a trunk connection
2926 *
2927 * @param[in] tconn to return request count for.
2928 * @param[in] req_state One or more request states or'd together.
2929 *
2930 * @return The number of requests in the specified states, associated with a tconn.
2931 */
2933{
2934 uint32_t count = 0;
2935
2937 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2938 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2940 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2941 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2943
2944 return count;
2945}
2946
2947/** Automatically mark a connection as full
2948 *
2949 * @param[in] tconn to potentially mark as full.
2950 */
2952{
2953 trunk_t *trunk = tconn->pub.trunk;
2955
2956 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2957
2958 /*
2959 * Enforces max_req_per_conn
2960 */
2961 if (trunk->conf.max_req_per_conn > 0) {
2964 }
2965}
2966
2967/** Return whether a trunk connection should currently be considered full
2968 *
2969 * @param[in] tconn to check.
2970 * @return
2971 * - true if the connection is full.
2972 * - false if the connection is not full.
2973 */
2975{
2976 trunk_t *trunk = tconn->pub.trunk;
2978
2979 /*
2980 * Enforces max_req_per_conn
2981 */
2983 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2984
2985 return true;
2986}
2987
2988/** Automatically mark a connection as active or reconnect it
2989 *
2990 * @param[in] tconn to potentially mark as active or reconnect.
2991 */
2993{
2994 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2995
2996 /*
2997 * Enforces max_req_per_conn
2998 */
3000}
3001
3002/** A connection is readable. Call the request_demux function to read pending requests
3003 *
3004 */
3006{
3007 trunk_t *trunk = tconn->pub.trunk;
3008
3009 DO_REQUEST_DEMUX(tconn);
3010}
3011
3012/** A connection is writable. Call the request_mux function to write pending requests
3013 *
3014 */
3016{
3017 trunk_t *trunk = tconn->pub.trunk;
3018
3019 /*
3020 * Call the cancel_sent function (if we have one)
3021 * to inform a backend datastore we no longer
3022 * care about the result
3023 */
3027 DO_REQUEST_CANCEL_MUX(tconn);
3028 }
3032 DO_REQUEST_MUX(tconn);
3033}
3034
3035/** Update the registrations for I/O events we're interested in
3036 *
3037 */
3039{
3040 trunk_t *trunk = tconn->pub.trunk;
3042
3043 switch (tconn->pub.state) {
3044 /*
3045 * We only register I/O events if the trunk connection is
3046 * in one of these states.
3047 *
3048 * For the other states the trunk shouldn't be processing
3049 * requests.
3050 */
3051 case TRUNK_CONN_ACTIVE:
3052 case TRUNK_CONN_FULL:
3057 /*
3058 * If the connection is always writable,
3059 * then we don't care about write events.
3060 */
3061 if (!trunk->conf.always_writable &&
3065 (trunk->funcs.request_cancel_mux ?
3069 }
3070
3073 (trunk->funcs.request_cancel_mux ?
3076 }
3077 break;
3078
3079 default:
3080 break;
3081 }
3082
3083 if (tconn->events != events) {
3084 /*
3085 * There may be a fatal error which results
3086 * in the connection being freed.
3087 *
3088 * Stop that from happening until after
3089 * we're done using it.
3090 */
3093 tconn->events = events;
3095 }
3096}
3097
3098/** Remove a trunk connection from whichever list it's currently in
3099 *
3100 * @param[in] tconn to remove.
3101 */
3103{
3104 trunk_t *trunk = tconn->pub.trunk;
3105
3106 switch (tconn->pub.state) {
3107 case TRUNK_CONN_ACTIVE:
3108 {
3109 int ret;
3110
3111 ret = fr_minmax_heap_extract(trunk->active, tconn);
3112 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3113 }
3114 return;
3115
3116 case TRUNK_CONN_INIT:
3117 fr_dlist_remove(&trunk->init, tconn);
3118 break;
3119
3121 fr_dlist_remove(&trunk->connecting, tconn);
3122 return;
3123
3124 case TRUNK_CONN_CLOSED:
3125 fr_dlist_remove(&trunk->closed, tconn);
3126 return;
3127
3128 case TRUNK_CONN_FULL:
3129 trunk_list_full_remove(trunk, tconn);
3130 return;
3131
3133 trunk_list_inactive_remove(trunk, tconn);
3134 return;
3135
3137 trunk_list_inactive_draining_remove(trunk, tconn);
3138 return;
3139
3141 trunk_list_draining_remove(trunk, tconn);
3142 return;
3143
3145 fr_dlist_remove(&trunk->draining_to_free, tconn);
3146 return;
3147
3148 case TRUNK_CONN_HALTED:
3149 return;
3150 }
3151}
3152
3153/** Transition a connection to the full state
3154 *
3155 * Called whenever a trunk connection is at the maximum number of requests.
3156 * Removes the connection from the connected heap, and places it in the full list.
3157 */
3159{
3160 trunk_t *trunk = tconn->pub.trunk;
3161
3162 switch (tconn->pub.state) {
3163 case TRUNK_CONN_ACTIVE:
3165 break;
3166
3167 default:
3169 }
3170
3171 trunk_list_full_add(trunk, tconn);
3173}
3174
3175/** Transition a connection to the inactive state
3176 *
3177 * Called whenever the API client wants to stop new requests being enqueued
3178 * on a trunk connection.
3179 */
3181{
3182 trunk_t *trunk = tconn->pub.trunk;
3183
3184 switch (tconn->pub.state) {
3185 case TRUNK_CONN_ACTIVE:
3186 case TRUNK_CONN_FULL:
3188 break;
3189
3190 default:
3192 }
3193
3194 trunk_list_inactive_add(trunk, tconn);
3196}
3197
3198/** Transition a connection to the inactive-draining state
3199 *
3200 * Called whenever the trunk manager wants to drain an inactive connection
3201 * of its requests.
3202 */
3204{
3205 trunk_t *trunk = tconn->pub.trunk;
3206
3207 switch (tconn->pub.state) {
3211 break;
3212
3213 default:
3215 }
3216
3217 trunk_list_inactive_draining_add(trunk, tconn);
3219
3220 /*
3221 * Immediately re-enqueue all pending
3222 * requests, so the connection is drained
3223 * quicker.
3224 */
3226}
3227
3228/** Transition a connection to the draining state
3229 *
3230 * Removes the connection from the active heap so it won't be assigned any new
3231 * connections.
3232 */
3234{
3235 trunk_t *trunk = tconn->pub.trunk;
3236
3237 switch (tconn->pub.state) {
3238 case TRUNK_CONN_ACTIVE:
3239 case TRUNK_CONN_FULL:
3243 break;
3244
3245 default:
3247 }
3248
3249 trunk_list_draining_add(trunk, tconn);
3251
3252 /*
3253 * Immediately re-enqueue all pending
3254 * requests, so the connection is drained
3255 * quicker.
3256 */
3258}
3259
3260/** Transition a connection to the draining-to-reconnect state
3261 *
3262 * Removes the connection from the active heap so it won't be assigned any new
3263 * connections.
3264 */
3266{
3267 trunk_t *trunk = tconn->pub.trunk;
3268
3270
3271 switch (tconn->pub.state) {
3272 case TRUNK_CONN_ACTIVE:
3273 case TRUNK_CONN_FULL:
3278 break;
3279
3280 default:
3282 }
3283
3284 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3286
3287 /*
3288 * Immediately re-enqueue all pending
3289 * requests, so the connection is drained
3290 * quicker.
3291 */
3293}
3294
3295
3296/** Transition a connection back to the active state
3297 *
3298 * This should only be called on a connection which is in the full state,
3299 * inactive state, draining state or connecting state.
3300 */
3302{
3303 trunk_t *trunk = tconn->pub.trunk;
3304 int ret;
3305
3306 switch (tconn->pub.state) {
3307 case TRUNK_CONN_FULL:
3312 break;
3313
3314 case TRUNK_CONN_INIT:
3318 break;
3319
3320 default:
3322 }
3323
3324 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3325 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3327 return;
3328 }
3329
3331
3332 /*
3333 * Reorder the connections
3334 */
3335 CONN_REORDER(tconn);
3336
3337 /*
3338 * Rebalance requests
3339 */
3340 trunk_rebalance(trunk);
3341
3342 /*
3343 * We place requests into the backlog
3344 * because there were no connections
3345 * available to handle them.
3346 *
3347 * If a connection has become active
3348 * chances are those backlogged requests
3349 * can now be enqueued, so try and do
3350 * that now.
3351 *
3352 * If there's requests sitting in the
3353 * backlog indefinitely, it's because
3354 * they were inserted there erroneously
3355 * when there were active connections
3356 * which could have handled them.
3357 */
3358 trunk_backlog_drain(trunk);
3359}
3360
3361/** Connection transitioned to the init state
3362 *
3363 * Reflect the connection state change in the lists we use to track connections.
3364 *
3365 * @note This function is only called from the connection API as a watcher.
3366 *
3367 * @param[in] conn The connection which changes state.
3368 * @param[in] prev The state the connection was in.
3369 * @param[in] state The state the connection is now in.
3370 * @param[in] uctx The trunk_connection_t wrapping the connection.
3371 */
3375 void *uctx)
3376{
3377 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3378 trunk_t *trunk = tconn->pub.trunk;
3379
3380 switch (tconn->pub.state) {
3381 case TRUNK_CONN_HALTED:
3382 break;
3383
3384 case TRUNK_CONN_CLOSED:
3386 break;
3387
3388 default:
3390 }
3391
3392 fr_dlist_insert_head(&trunk->init, tconn);
3394}
3395
3396/** Connection transitioned to the connecting state
3397 *
3398 * Reflect the connection state change in the lists we use to track connections.
3399 *
3400 * @note This function is only called from the connection API as a watcher.
3401 *
3402 * @param[in] conn The connection which changes state.
3403 * @param[in] prev The state the connection was in.
3404 * @param[in] state The state the connection is now in.
3405 * @param[in] uctx The trunk_connection_t wrapping the connection.
3406 */
3410 void *uctx)
3411{
3412 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3413 trunk_t *trunk = tconn->pub.trunk;
3414
3415 switch (tconn->pub.state) {
3416 case TRUNK_CONN_INIT:
3417 case TRUNK_CONN_CLOSED:
3419 break;
3420
3421 default:
3423 }
3424
3425 /*
3426 * If a connection just entered the
3427 * connecting state, it should have
3428 * no requests associated with it.
3429 */
3431
3432 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3434}
3435
3436/** Connection transitioned to the shutdown state
3437 *
3438 * If we're not already in the draining-to-free state, transition there now.
3439 *
3440 * The idea is that if something signalled the connection to shutdown, we need
3441 * to reflect that by dequeuing any pending requests, not accepting new ones,
3442 * and waiting for the existing requests to complete.
3443 *
3444 * @note This function is only called from the connection API as a watcher.
3445 *
3446 * @param[in] conn The connection which changes state.
3447 * @param[in] prev The state the connection was in.
3448 * @param[in] state The state the connection is now in.
3449 * @param[in] uctx The trunk_connection_t wrapping the connection.
3450 */
3454 void *uctx)
3455{
3456 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3457
3458 switch (tconn->pub.state) {
3459 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3460 return;
3461
3462 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3463 case TRUNK_CONN_FULL:
3467 break;
3468
3469 case TRUNK_CONN_INIT:
3471 case TRUNK_CONN_CLOSED:
3472 case TRUNK_CONN_HALTED:
3474 }
3475
3477}
3478
3479/** Trigger a reconnection of the trunk connection
3480 *
3481 * @param[in] tl timer list the timer was inserted into.
3482 * @param[in] now Current time.
3483 * @param[in] uctx The tconn.
3484 */
3486{
3487 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3488
3490}
3491
3492/** Connection transitioned to the connected state
3493 *
3494 * Reflect the connection state change in the lists we use to track connections.
3495 *
3496 * @note This function is only called from the connection API as a watcher.
3497 *
3498 * @param[in] conn The connection which changes state.
3499 * @param[in] prev The state the connection was in.
3500 * @param[in] state The state the connection is now in.
3501 * @param[in] uctx The trunk_connection_t wrapping the connection.
3502 */
3506 void *uctx)
3507{
3508 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3509 trunk_t *trunk = tconn->pub.trunk;
3510
3511 /*
3512 * If a connection was just connected, it should only
3513 * have a pending list of requests. This state is found
3514 * in the rlm_radius module, which starts a new trunk,
3515 * and then immediately enqueues a request onto it. The
3516 * alternative for rlm_radius is to keep its own queue
3517 * of pending requests before the trunk is fully
3518 * initialized. And then enqueue them onto the trunk
3519 * when the trunk is connected.
3520 *
3521 * It's instead easier (and makes more sense) to allow
3522 * the trunk to accept packets into its queue. If there
3523 * are no connections within a period of time, then the
3524 * requests will retry, or will time out.
3525 */
3527
3528 /*
3529 * Set here, as the active state can
3530 * be transitioned to from full and
3531 * draining too.
3532 */
3533 trunk->pub.last_connected = fr_time();
3534
3535 /*
3536 * Insert a timer to reconnect the
3537 * connection periodically.
3538 */
3539 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3540 if (fr_timer_in(tconn, trunk->el->tl, &tconn->lifetime_ev,
3541 trunk->conf.lifetime, false, _trunk_connection_lifetime_expire, tconn) < 0) {
3542 PERROR("Failed inserting connection reconnection timer event, halting connection");
3544 return;
3545 }
3546 }
3547
3549}
3550
3551/** Connection failed after it was connected
3552 *
3553 * Reflect the connection state change in the lists we use to track connections.
3554 *
3555 * @note This function is only called from the connection API as a watcher.
3556 *
3557 * @param[in] conn The connection which changes state.
3558 * @param[in] prev The state the connection was in.
3559 * @param[in] state The state the connection is now in.
3560 * @param[in] uctx The trunk_connection_t wrapping the connection.
3561 */
3565 void *uctx)
3566{
3567 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3568 trunk_t *trunk = tconn->pub.trunk;
3569 bool need_requeue = false;
3570
3571 switch (tconn->pub.state) {
3572 case TRUNK_CONN_ACTIVE:
3573 case TRUNK_CONN_FULL:
3578 need_requeue = true;
3580 break;
3581
3582 case TRUNK_CONN_INIT: /* Initialisation failed */
3586 break;
3587
3588 case TRUNK_CONN_CLOSED:
3589 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3591 }
3592
3593 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3595
3596 /*
3597 * Now *AFTER* the connection has been
3598 * removed from the active pool,
3599 * re-enqueue the requests.
3600 */
3601 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3602
3603 /*
3604 * There should be no requests left on this
3605 * connection. They should have all been
3606 * moved off or failed.
3607 */
3609
3610 /*
3611 * Clear statistics and flags
3612 */
3613 tconn->sent_count = 0;
3614
3615 /*
3616 * Remove the reconnect event
3617 */
3619
3620 /*
3621 * Remove the I/O events
3622 */
3624}
3625
3626/** Connection failed
3627 *
3628 * @param[in] conn The connection which changes state.
3629 * @param[in] prev The state the connection was in.
3630 * @param[in] state The state the connection is now in.
3631 * @param[in] uctx The trunk_connection_t wrapping the connection.
3632 */
3634 connection_state_t prev,
3636 void *uctx)
3637{
3638 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3639 trunk_t *trunk = tconn->pub.trunk;
3640
3641 /*
3642 * Need to set this first as it
3643 * determines whether requests are
3644 * re-queued or fail outright.
3645 */
3646 trunk->pub.last_failed = fr_time();
3647
3648 /*
3649 * Failed in the init state, transition the
3650 * connection to closed, else we get an
3651 * INIT -> INIT transition which triggers
3652 * an assert.
3653 */
3654 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3655
3656 /*
3657 * See what the state of the trunk is
3658 * if there are no connections that could
3659 * potentially accept requests in the near
3660 * future, then fail all the requests in the
3661 * trunk backlog.
3662 */
3663 if ((prev == CONNECTION_STATE_CONNECTED) &&
3668}
3669
3670/** Connection transitioned to the halted state
3671 *
3672 * Remove the connection from all lists, as it's likely about to be freed.
3673 *
3674 * Setting the trunk back to the init state ensures that if the code is ever
3675 * refactored and #connection_signal_reconnect is used after a connection
3676 * is halted, then everything is maintained in a valid state.
3677 *
3678 * @note This function is only called from the connection API as a watcher.
3679 *
3680 * @param[in] conn The connection which changes state.
3681 * @param[in] prev The state the connection was in.
3682 * @param[in] state The state the connection is now in.
3683 * @param[in] uctx The trunk_connection_t wrapping the connection.
3684 */
3688 void *uctx)
3689{
3690 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3691 trunk_t *trunk = tconn->pub.trunk;
3692
3693 switch (tconn->pub.state) {
3694 case TRUNK_CONN_INIT:
3695 case TRUNK_CONN_CLOSED:
3697 break;
3698
3699 default:
3701 }
3702
3703 /*
3704 * It began life in the halted state,
3705 * and will end life in the halted state.
3706 */
3708
3709 /*
3710 * There should be no requests left on this
3711 * connection. They should have all been
3712 * moved off or failed.
3713 */
3715
3716 /*
3717 * And free the connection...
3718 */
3719 if (trunk->in_handler) {
3720 /*
3721 * ...later.
3722 */
3723 fr_dlist_insert_tail(&trunk->to_free, tconn);
3724 return;
3725 }
3726 talloc_free(tconn);
3727}
3728
3729/** Free a connection
3730 *
3731 * Enforces orderly free order of children of the tconn
3732 */
3734{
3736 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3737
3738 /*
3739 * Loop over all the requests we gathered
3740 * and transition them to the failed state,
3741 * freeing them.
3742 *
3743 * Usually, requests will be re-queued when
3744 * a connection enters the closed state,
3745 * but in this case because the whole trunk
3746 * is being freed, we don't bother, and
3747 * just signal to the API client that the
3748 * requests failed.
3749 */
3750 if (tconn->pub.trunk->freeing) {
3751 fr_dlist_head_t to_fail;
3752 trunk_request_t *treq = NULL;
3753
3754 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3755
3756 /*
3757 * Remove requests from this connection
3758 */
3760 while ((treq = fr_dlist_next(&to_fail, treq))) {
3761 trunk_request_t *prev;
3762
3763 prev = fr_dlist_remove(&to_fail, treq);
3765 treq = prev;
3766 }
3767 }
3768
3769 /*
3770 * Ensure we're not signalled by the connection
3771 * as it processes its backlog of state changes,
3772 * as we are about to be freed.
3773 */
3781
3782 /*
3783 * This may return -1, indicating the free was deferred
3784 * this is fine. It just means the conn will be freed
3785 * after all the handlers have exited.
3786 */
3787 (void)talloc_free(tconn->pub.conn);
3788 tconn->pub.conn = NULL;
3789
3790 return 0;
3791}
3792
3793/** Attempt to spawn a new connection
3794 *
3795 * Calls the API client's alloc() callback to create a new connection_t,
3796 * then inserts the connection into the 'connecting' list.
3797 *
3798 * @param[in] trunk to spawn connection in.
3799 * @param[in] now The current time.
3800 */
3802{
3803 trunk_connection_t *tconn;
3804
3805
3806 /*
3807 * Call the API client's callback to create
3808 * a new connection_t.
3809 */
3810 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3811 tconn->pub.trunk = trunk;
3812 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3813
3814 /*
3815 * Allocate a new connection_t or fail.
3816 */
3817 DO_CONNECTION_ALLOC(tconn);
3818
3820 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3824
3825 /*
3826 * OK, we have the connection, now setup watch
3827 * points so we know when it changes state.
3828 *
3829 * This lets us automatically move the tconn
3830 * between the different lists in the trunk
3831 * with minimum extra code.
3832 */
3834 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3835
3837 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3838
3840 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3841
3843 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3844
3846 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3847
3849 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3850
3852 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3853
3854 talloc_set_destructor(tconn, _trunk_connection_free);
3855
3856 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3857
3858 trunk->pub.last_open = now;
3859
3860 return 0;
3861}
3862
3863/** Pop a cancellation request off a connection's cancellation queue
3864 *
3865 * The request we return is advanced by the request moving out of the
3866 * cancel state and into the cancel_sent or cancel_complete state.
3867 *
3868 * One of these signalling functions must be called after the request
3869 * has been popped:
3870 *
3871 * - #trunk_request_signal_cancel_sent
3872 * The remote datastore has been informed, but we need to wait for acknowledgement.
3873 * The #trunk_request_demux_t callback must handle the acks calling
3874 * #trunk_request_signal_cancel_complete when an ack is received.
3875 *
3876 * - #trunk_request_signal_cancel_complete
3877 * The request was cancelled and we don't need to wait, clean it up immediately.
3878 *
3879 * @param[out] treq_out to process
3880 * @param[in] tconn Connection to drain cancellation request from.
3881 * @return
3882 * - 1 if no more requests.
3883 * - 0 if a new request was written to treq_out.
3884 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3885 * memory or requests associated with the connection.
3886 * - -2 if called outside of the cancel muxer.
3887 */
3889{
3890 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3891
3893 "%s can only be called from within request_cancel_mux handler",
3894 __FUNCTION__)) return -2;
3895
3896 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3897 if (!*treq_out) return 1;
3898
3899 return 0;
3900}
3901
3902/** Pop a request off a connection's pending queue
3903 *
3904 * The request we return is advanced by the request moving out of the partial or
3905 * pending states, when the mux function signals us.
3906 *
3907 * If the same request is returned again and again, it means the muxer isn't actually
3908 * doing anything with the request we returned, and it's an error in the muxer code.
3909 *
3910 * One of these signalling functions must be used after the request has been popped:
3911 *
3912 * - #trunk_request_signal_complete
3913 * The request was completed. Either we got a synchronous response, or we knew the
3914 * response without contacting an external server (cache).
3915 *
3916 * - #trunk_request_signal_fail
3917 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3918 *
3919 * - #trunk_request_signal_partial
3920 * Wrote part of a request. This request will be returned on the next call to this
3921 * function so that the request_mux function can finish writing it. Only useful
3922 * for stream type connections. Datagram type connections cannot have partial
3923 * writes.
3924 *
3925 * - #trunk_request_signal_sent Successfully sent a request.
3926 *
3927 * @param[out] treq_out to process
3928 * @param[in] tconn to pop a request from.
3929 * @return
3930 * - 1 if no more requests.
3931 * - 0 if a new request was written to treq_out.
3932 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3933 * memory or requests associated with the connection.
3934 * - -2 if called outside of the muxer.
3935 */
3937{
3938 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3939
3941 "%s can only be called from within request_mux handler",
3942 __FUNCTION__)) return -2;
3943
3944 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3945 if (!*treq_out) return 1;
3946
3947 return 0;
3948}
3949
3950/** Signal that a trunk connection is writable
3951 *
3952 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3953 *
3954 * @param[in] tconn to signal.
3955 */
3957{
3958 trunk_t *trunk = tconn->pub.trunk;
3959
3960 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3961 "%s cannot be called within a handler", __FUNCTION__)) return;
3962
3963 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3964
3966}
3967
3968/** Signal that a trunk connection is readable
3969 *
3970 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3971 *
3972 * @param[in] tconn to signal.
3973 */
3975{
3976 trunk_t *trunk = tconn->pub.trunk;
3977
3978 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3979 "%s cannot be called within a handler", __FUNCTION__)) return;
3980
3981 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3982
3984}
3985
3986/** Signal a trunk connection cannot accept more requests
3987 *
3988 * @param[in] tconn to signal.
3989 */
3991{
3992 /* Can be called anywhere */
3993
3994 switch (tconn->pub.state) {
3995 case TRUNK_CONN_ACTIVE:
3996 case TRUNK_CONN_FULL:
3998 break;
3999
4002 break;
4003
4004 default:
4005 return;
4006 }
4007}
4008
4009/** Signal a trunk connection is no longer full
4010 *
4011 * @param[in] tconn to signal.
4012 */
4014{
4015 switch (tconn->pub.state) {
4016 case TRUNK_CONN_FULL:
4017 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
4018 break;
4019
4021 /*
4022 * Do the appropriate state transition based on
4023 * how many requests the trunk connection is
4024 * currently servicing.
4025 */
4026 if (trunk_connection_is_full(tconn)) {
4028 break;
4029 }
4031 break;
4032
4033 /*
4034 * Unsetting the active flag just moves
4035 * the connection back to the normal
4036 * draining state.
4037 */
4038 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
4040 break;
4041
4042 default:
4043 return;
4044 }
4045}
4046
4047/** Signal a trunk connection is no longer viable
4048 *
4049 * @param[in] tconn to signal.
4050 * @param[in] reason the connection is being reconnected.
4051 */
4056
4057/** Standard I/O read function
4058 *
4059 * Underlying FD is now readable, so call the trunk to read any pending requests
4060 * from this connection.
4061 *
4062 * @param[in] el The event list signalling.
4063 * @param[in] fd that's now readable.
4064 * @param[in] flags describing the read event.
4065 * @param[in] uctx The trunk connection handle (tconn).
4066 */
4068{
4069 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4070
4072}
4073
4074/** Standard I/O write function
4075 *
4076 * Underlying FD is now writable, so call the trunk to write any pending requests
4077 * to this connection.
4078 *
4079 * @param[in] el The event list signalling.
4080 * @param[in] fd that's now writable.
4081 * @param[in] flags describing the write event.
4082 * @param[in] uctx The trunk connection handle (tconn).
4083 */
4085{
4086 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4087
4089}
4090
4091
4092/** Returns true if the trunk connection is in one of the specified states
4093 *
4094 * @param[in] tconn To check state for.
4095 * @param[in] state to check
4096 * @return
4097 * - True if trunk connection is in a particular state.
4098 * - False if trunk connection is not in a particular state.
4099 */
4101{
4102 return (bool)(tconn->pub.state & state);
4103}
4104
4105/** Close connections in a particular connection list if they have no requests associated with them
4106 *
4107 * @param[in] trunk containing connections we want to close.
4108 * @param[in] head of list of connections to examine.
4109 */
4111{
4112 trunk_connection_t *tconn = NULL;
4113
4114 while ((tconn = fr_dlist_next(head, tconn))) {
4115 trunk_connection_t *prev;
4116
4118
4119 prev = fr_dlist_prev(head, tconn);
4120
4121 DEBUG3("Closing %s connection with no requests",
4123 /*
4124 * Close the connection as gracefully
4125 * as possible by signalling it should
4126 * shutdown.
4127 *
4128 * The connection, should, if serviced
4129 * correctly by the underlying library,
4130 * automatically transition to halted after
4131 * all pending reads/writes are
4132 * complete at which point we'll be informed
4133 * and free our tconn wrapper.
4134 */
4136 tconn = prev;
4137 }
4138}
4139
4140/** Rebalance connections across active trunk members when a new connection becomes active
4141 *
4142 * We don't have any visibility into the connection prioritisation algorithm
4143 * it's essentially a black box.
4144 *
4145 * We can however determine when the correct level of requests per connection
4146 * has been reached, by dequeuing and requeuing requests up until the point
4147 * where the connection that just had a request dequeued, receives the same
4148 * request back.
4149 *
4150 * @param[in] trunk The trunk to rebalance.
4151 */
4152static void trunk_rebalance(trunk_t *trunk)
4153{
4155
4157
4158 /*
4159 * Only rebalance if the top and bottom of
4160 * the heap are not equal.
4161 */
4162 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4163
4164 DEBUG3("Rebalancing requests");
4165
4166 /*
4167 * Keep requeuing requests from the connection
4168 * at the bottom of the heap until the
4169 * connection at the top is shifted from that
4170 * position.
4171 */
4172 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4174 TRUNK_REQUEST_STATE_PENDING, 1, false));
4175}
4176
4177/** Implements the algorithm we use to manage requests per connection levels
4178 *
4179 * This is executed periodically using a timer event, and opens/closes
4180 * connections.
4181 *
4182 * The aim is to try and keep the request per connection level in a sweet spot,
4183 * where there's enough outstanding work for the connection/pipelining to work
4184 * efficiently, but not so much so that we encounter increased latency.
4185 *
4186 * In the request enqueue and dequeue functions we record every time the
4187 * average number of requests per connection goes above the target count
4188 * and record every time the average number of requests per connection goes
4189 * below the target count.
4190 *
4191 * This may sound expensive, but in all cases we're just summing counters.
4192 * CPU time required does not increase with additional requests, only with
4193 * large numbers of connections.
4194 *
4195 * If we do encounter scaling issues, we can always maintain the counters
4196 * as aggregates as an optimisation later.
4197 *
4198 * If when the management function runs, the trunk was above the target
4199 * most recently, we:
4200 * - Return if we've been in this state for a shorter period than 'open_delay'.
4201 * - Return if we're at max.
4202 * - Return if opening a new connection will take us below the load target.
4203 * - Return if we last opened a connection within 'open_delay'.
4204 * - Otherwise we attempt to open a new connection.
4205 *
4206 * If the trunk was below the target most recently, we:
4207 * - Return if we've been in this state for a shorter period than 'close_delay'.
4208 * - Return if we're at min.
4209 * - Return if we have no connections.
4210 * - Close a connection if min is 0, and we have no outstanding
4211 * requests. Then return.
4212 * - Return if closing a new connection will take us above the load target.
4213 * - Return if we last closed a connection within 'closed_delay'.
4214 * - Otherwise we move a connection to draining state.
4215 */
4216static void trunk_manage(trunk_t *trunk, fr_time_t now)
4217{
4218 trunk_connection_t *tconn = NULL;
4219 trunk_request_t *treq;
4220 uint32_t average = 0;
4221 uint32_t req_count;
4222 uint16_t conn_count;
4223 trunk_state_t new_state;
4224
4225 DEBUG4("Managing trunk");
4226
4227 /*
4228 * Cleanup requests in our request cache which
4229 * have been reapable for too long.
4230 */
4231 while ((treq = trunk_list_free_requests_peek(trunk)) &&
4233
4234 /*
4235 * If we have idle connections, then close them.
4236 */
4239 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4240
4241 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4242 tconn;
4243 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4244 /*
4245 * The connection has outstanding requests without replies, don't do anything.
4246 */
4247 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4248
4249 /*
4250 * The connection was last active after the idle cutoff time, don't do anything.
4251 */
4252 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4253
4254 /*
4255 * This connection has been inactive since before the idle timeout. Drain it,
4256 * and free it.
4257 *
4258 * This also extracts the connection from the minmax heap, which invalidates the
4259 * iterator, so we stop iterating over it.
4260 */
4262 break;
4263 }
4264 }
4265
4266 /*
4267 * Free any connections which have drained
4268 * and we didn't reactivate during the last
4269 * round of management.
4270 */
4274
4275 /*
4276 * Process deferred connection freeing
4277 */
4278 if (!trunk->in_handler) fr_dlist_talloc_free(&trunk->to_free);
4279
4280 /*
4281 * Update the state of the trunk
4282 */
4284 new_state = TRUNK_STATE_ACTIVE;
4285 } else {
4286 /*
4287 * INIT / CONNECTING / FULL mean connections will become active
4288 * so the trunk is PENDING
4289 */
4294 }
4295
4296 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4297
4298 /*
4299 * A trunk can be signalled to not proactively
4300 * manage connections if a destination is known
4301 * to be unreachable, and doing so would result
4302 * in spurious connections still being opened.
4303 *
4304 * We still run other connection management
4305 * functions and just short circuit the function
4306 * here.
4307 */
4308 if (!trunk->managing_connections) return;
4309
4310 /*
4311 * We're above the target requests per connection
4312 * spawn more connections!
4313 */
4315 /*
4316 * If connecting is provided, check we
4317 * wouldn't have too many connections in
4318 * the connecting state.
4319 *
4320 * This is a throttle in the case of transitory
4321 * load spikes, or a backend becoming
4322 * unavailable.
4323 */
4324 if ((trunk->conf.connecting > 0) &&
4326 trunk->conf.connecting)) {
4327 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4328 trunk->conf.connecting);
4329 return;
4330 }
4331
4332 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4333
4334 /*
4335 * Only apply hysteresis if we have at least
4336 * one available connection.
4337 */
4338 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4339 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4342 return; /* too soon */
4343 }
4344
4345 /*
4346 * We don't consider 'draining' connections
4347 * in the max calculation, as if we do
4348 * determine that we need to spawn a new
4349 * request, then we'd move all 'draining'
4350 * connections to active before spawning
4351 * any new connections.
4352 */
4353 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4354 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4355 conn_count, trunk->conf.max);
4356 return;
4357 }
4358
4359 /*
4360 * We consider requests pending on all connections
4361 * and the trunk's backlog as that's the current count
4362 * load.
4363 */
4364 if (!req_count) {
4365 DEBUG4("Not opening connection - No outstanding requests");
4366 return;
4367 }
4368
4369 /*
4370 * Do the n+1 check, i.e. if we open one connection
4371 * will that take us below our target threshold.
4372 */
4373 if (conn_count > 0) {
4374 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4375 if (average < trunk->conf.target_req_per_conn) {
4376 DEBUG4("Not opening connection - Would leave us below our target requests "
4377 "per connection (now %u, after open %u)",
4378 ROUND_UP_DIV(req_count, conn_count), average);
4379 return;
4380 }
4381 } else {
4382 (void)trunk_connection_spawn(trunk, now);
4383 return;
4384 }
4385
4386 /*
4387 * If we've got a connection in the draining list
4388 * move it back into the active list if we've
4389 * been requested to add a connection back in.
4390 */
4391 tconn = fr_dlist_head(&trunk->draining);
4392 if (tconn) {
4393 if (trunk_connection_is_full(tconn)) {
4395 } else {
4397 }
4398 return;
4399 }
4400
4401 /*
4402 * Implement delay if there's no connections that
4403 * could be immediately re-activated.
4404 */
4405 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4406 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4407 "It's been %pVs",
4410 return;
4411 }
4412
4413 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4414 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4415 /* last_open set by trunk_connection_spawn */
4416 (void)trunk_connection_spawn(trunk, now);
4417 }
4418
4419 /*
4420 * We're below the target requests per connection.
4421 * Free some connections...
4422 */
4423 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4424 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4425 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4428 return; /* too soon */
4429 }
4430
4431 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4432
4433 if (!conn_count) {
4434 DEBUG4("Not closing connection - No connections to close!");
4435 return;
4436 }
4437
4438 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4439 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4440 conn_count, trunk->conf.min);
4441 return;
4442 }
4443
4444 if (!req_count) {
4445 DEBUG4("Closing connection - No outstanding requests");
4446 goto close;
4447 }
4448
4449 /*
4450 * The minimum number of connections must be set
4451 * to zero for this to work.
4452 * min == 0, no requests, close all the connections.
4453 * This is useful for backup databases, when
4454 * maintaining the connection would lead to lots of
4455 * log file churn.
4456 */
4457 if (conn_count == 1) {
4458 DEBUG4("Not closing connection - Would leave connections "
4459 "and there are still %u outstanding requests", req_count);
4460 return;
4461 }
4462
4463 /*
4464 * Do the n-1 check, i.e. if we close one connection
4465 * will that take us above our target threshold.
4466 */
4467 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4468 if (average > trunk->conf.target_req_per_conn) {
4469 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4470 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4471 return;
4472 }
4473
4474 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4475 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4476
4477 close:
4478 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4479 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4480 "It's been %pVs",
4483 return;
4484 }
4485
4486 /*
4487 * If the last event on the trunk was a connection failure and
4488 * there is only one connection, this may well be a reconnect
4489 * attempt after a failure - and needs to persist otherwise
4490 * the last event will be a failure and no new connection will
4491 * be made, leading to no new requests being enqueued.
4492 */
4493 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4494 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4495 DEBUG4("Not closing remaining connection - last event was a failure");
4496 return;
4497 }
4498
4499 /*
4500 * Inactive connections get counted in the
4501 * set of viable connections, but are likely
4502 * to be congested or dead, so we drain
4503 * (and possibly eventually free) those first.
4504 */
4505 if ((tconn = trunk_list_inactive_peek(trunk))) {
4506 /*
4507 * If the connection has no requests associated
4508 * with it then immediately free.
4509 */
4511 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4512 } else {
4514 }
4515 /*
4516 * It is possible to have too may connecting
4517 * connections when the connections are
4518 * taking a while to open and the number
4519 * of requests decreases.
4520 */
4521 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4522 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4523
4524 /*
4525 * Finally if there are no "connecting"
4526 * connections to close, and no "inactive"
4527 * connections, start draining "active"
4528 * connections.
4529 */
4530 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4531 /*
4532 * If the connection has no requests associated
4533 * with it then immediately free.
4534 */
4536 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4537 } else {
4539 }
4540 }
4541
4542 trunk->pub.last_closed = now;
4543
4544 return;
4545 }
4546}
4547
4548/** Event to periodically call the connection management function
4549 *
4550 * @param[in] tl this event belongs to.
4551 * @param[in] now current time.
4552 * @param[in] uctx The trunk.
4553 */
4554static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
4555{
4556 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4557
4558 trunk_manage(trunk, now);
4559
4561 if (fr_timer_in(trunk, tl, &trunk->manage_ev, trunk->conf.manage_interval,
4562 false, _trunk_timer, trunk) < 0) {
4563 PERROR("Failed inserting trunk management event");
4564 /* Not much we can do, hopefully the trunk will be freed soon */
4565 }
4566 }
4567}
4568
4569/** Return a count of requests on a connection in a specific state
4570 *
4571 * @param[in] trunk to retrieve counts for.
4572 * @param[in] conn_state One or more connection states or'd together.
4573 * @param[in] req_state One or more request states or'd together.
4574 * @return The number of requests in a particular state, on connection in a particular state.
4575 */
4576uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4577{
4578 uint64_t count = 0;
4579 trunk_connection_t *tconn = NULL;
4581
4582#define COUNT_BY_STATE(_state, _list) \
4583do { \
4584 if (conn_state & (_state)) { \
4585 tconn = NULL; \
4586 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4587 count += trunk_request_count_by_connection(tconn, req_state); \
4588 } \
4589 } \
4590} while (0)
4591
4592 if (conn_state & TRUNK_CONN_ACTIVE) {
4593 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4594 tconn;
4595 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4596 count += trunk_request_count_by_connection(tconn, req_state);
4597 }
4598 }
4599
4602 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4605
4607
4608 return count;
4609}
4610
4611/** Update timestamps for when we last had a transition from above target to below target or vice versa
4612 *
4613 * Should be called on every time a connection or request is allocated or freed.
4614 *
4615 * @param[out] conn_count_out How many connections we considered.
4616 * @param[out] req_count_out How many requests we considered.
4617 * @param[in] trunk to operate on.
4618 * @param[in] now The current time.
4619 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4620 * has changed.
4621 * @return
4622 * - 0 if the average couldn't be calculated (no requests or no connections).
4623 * - The average number of requests per connection.
4624 */
4625static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4626 trunk_t *trunk, fr_time_t now,
4627 NDEBUG_UNUSED bool verify)
4628{
4629 uint32_t req_count = 0;
4630 uint16_t conn_count = 0;
4631 uint64_t req_per_conn = 0;
4632
4634
4635 /*
4636 * No need to update these as the trunk is being freed
4637 */
4638 if (trunk->freeing) goto done;
4639
4640 /*
4641 * Count all connections except draining and draining to free.
4642 *
4643 * Omitting these connection states artificially raises the
4644 * request to connection ratio, so that we can preemptively spawn
4645 * new connections.
4646 *
4647 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4648 * the trunk management code has enough hysteresis to not
4649 * immediately reactivate the connection.
4650 *
4651 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4652 * management code should spawn a new connection to takes its place.
4653 *
4654 * Connections placed in the DRAINING_TO_FREE state are being
4655 * closed preemptively to deal with bugs on the server we're
4656 * talking to, or misconfigured firewalls which are trashing
4657 * TCP/UDP connection states.
4658 */
4663
4664 /*
4665 * Requests on all connections
4666 */
4667 req_count = trunk_request_count_by_state(trunk,
4670
4671 /*
4672 * No connections, but we do have requests
4673 */
4674 if (conn_count == 0) {
4675 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4676 goto done;
4677 }
4678
4679 if (req_count == 0) {
4680 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4681 goto done;
4682 }
4683
4684 /*
4685 * Calculate the req_per_conn
4686 */
4687 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4688 if (req_per_conn > trunk->conf.target_req_per_conn) {
4689 above_target:
4690 /*
4691 * Edge - Below target to above target (too many requests per conn - spawn more)
4692 *
4693 * The equality check is correct here as both values start at 0.
4694 */
4696 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4697 below_target:
4698 /*
4699 * Edge - Above target to below target (too few requests per conn - close some)
4700 *
4701 * The equality check is correct here as both values start at 0.
4702 */
4704 }
4705
4706done:
4707 if (conn_count_out) *conn_count_out = conn_count;
4708 if (req_count_out) *req_count_out = req_count;
4709
4710 /*
4711 * Check we haven't missed a call to trunk_requests_per_connection
4712 */
4713 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4714
4715 trunk->last_req_per_conn = req_per_conn;
4716
4717 return req_per_conn;
4718}
4719
4720/** Drain the backlog of as many requests as possible
4721 *
4722 * @param[in] trunk To drain backlog requests for.
4723 */
4724static void trunk_backlog_drain(trunk_t *trunk)
4725{
4726	trunk_request_t *treq;
4727
	/* Nothing queued — nothing to drain */
4728	if (fr_heap_num_elements(trunk->backlog) == 0) return;
4729
4730	/*
4731	 *	If it's always writable, this isn't
4732	 *	really a noteworthy event.
4733	 */
4734	if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4735
	/*
	 * NOTE(review): original lines 4752, 4761, 4763 and 4766–4767 are
	 * missing from this extraction.  They held the case labels and
	 * statements for the remaining trunk_enqueue_t results (the
	 * "signal to stop" case before 'break', the failure case's call
	 * that moves the request to the failed state, and the final case
	 * before 'return').  Confirm against upstream trunk.c before
	 * modifying the switch below.
	 */
4736	/*
4737	 *	Do *NOT* add an artificial limit
4738	 *	here. We rely on all available
4739	 *	connections entering the full
4740	 *	state and transitioning back to
4741	 *	active in order to drain the
4742	 *	backlog.
4743	 */
4744	while ((treq = fr_heap_peek(trunk->backlog))) {
4745		switch (trunk_request_enqueue_existing(treq)) {
4746		case TRUNK_ENQUEUE_OK:
4747			continue;
4748
4749		/*
4750		 *	Signal to stop
4751		 */
4753			break;
4754
4755		/*
4756		 *	Failed enqueueing the request,
4757		 *	have it enter the failed state
4758		 *	which will free it and
4759		 *	re-enliven the yielded request.
4760		 */
4762		case TRUNK_ENQUEUE_FAIL:
4764			continue;
4765
4768			return;
4769		}
4770	}
4771}
4772
4773/** Force the trunk to re-establish its connections
4774 *
4775 * @param[in] trunk to signal.
4776 * @param[in] states One or more states or'd together.
4777 * @param[in] reason Why the connections are being signalled to reconnect.
4778 */
4779void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4780{
4781
4782#define RECONNECT_BY_STATE(_state, _list) \
4783do { \
4784 if (states & (_state)) { \
4785 size_t i; \
4786 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4787 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4788 } \
4789 } \
4790} while (0)
4791
4792 /*
4793 * Connections in the 'connecting' state
4794 * may re-enter that state, so we need to
4795 * be careful not to enter an infinite
4796 * loop, as we iterate over the list
4797 * again and again.
4798 */
4800
4801 if (states & TRUNK_CONN_ACTIVE) {
4802 trunk_connection_t *tconn;
4803 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4804 }
4805
4813}
4814
4815/** Start the trunk running
4816 *
4817 */
4819{
4820 uint16_t i;
4821
4822 if (unlikely(trunk->started)) return 0;
4823
4824 /*
4825 * Spawn the initial set of connections
4826 */
4827 for (i = 0; i < trunk->conf.start; i++) {
4828 DEBUG("[%i] Starting initial connection", i);
4829 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4830 }
4831
4832 /*
4833 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4834 * less than the idle timeout, update the management interval.
4835 */
4839 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4840 }
4841
4843 /*
4844 * Insert the event timer to manage
4845 * the interval between managing connections.
4846 */
4847 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, trunk->conf.manage_interval,
4848 false, _trunk_timer, trunk) < 0) {
4849 PERROR("Failed inserting trunk management event");
4850 return -1;
4851 }
4852 }
4853 trunk->started = true;
4854 trunk->managing_connections = true;
4855
4856 return 0;
4857}
4858
4859/** Allow the trunk to open and close connections in response to load
4860 *
4861 */
4863{
4864 if (!trunk->started || trunk->managing_connections) return;
4865
4866 DEBUG3("Connection management enabled");
4867 trunk->managing_connections = true;
4868}
4869
4870/** Stop the trunk from opening and closing connections in response to load
4871 *
4872 */
4874{
4875 if (!trunk->started || !trunk->managing_connections) return;
4876
4877 DEBUG3("Connection management disabled");
4878 trunk->managing_connections = false;
4879}
4880
4881/** Schedule a trunk management event for the next time the event loop is executed
4882 */
4884{
4885 if (!trunk->started || !trunk->managing_connections) return 0;
4886
4887 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, fr_time_delta_wrap(0),
4888 false, _trunk_timer, trunk) < 0) {
4889 PERROR("Failed inserting trunk management event");
4890 return -1;
4891 }
4892
4893 return 0;
4894}
4895
4896/** Order connections by queue depth
4897 *
4898 */
4899static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4900{
4903
4906
4907 /*
4908 * Add a fudge factor of 1 to reduce spurious rebalancing
4909 */
4910 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4911}
4912
4913/** Free a trunk, gracefully closing all connections.
4914 *
4915 */
4916static int _trunk_free(trunk_t *trunk)
4917{
4918 trunk_connection_t *tconn;
4919 trunk_request_t *treq;
4920 trunk_watch_entry_t *watch;
4921 size_t i;
4922
4923 DEBUG4("Trunk free %p", trunk);
4924
4925 trunk->freeing = true; /* Prevent re-enqueuing */
4926
4927 /*
4928 * We really don't want this firing after
4929 * we've freed everything.
4930 */
4932
4933 /*
4934 * Now free the connections in each of the lists.
4935 *
4936 * Each time a connection is freed it removes itself from the list
4937 * its in, which means the head should keep advancing automatically.
4938 */
4939 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4940 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4941 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4942 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4943 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4944 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4945 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4946 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4947 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4948
4949 /*
4950 * Process any deferred connection frees
4951 */
4953
4954 /*
4955 * Free any requests left in the backlog
4956 */
4957 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4958
4959 /*
4960 * Free any requests in our request cache
4961 */
4962 while ((treq = trunk_list_free_requests_peek(trunk))) talloc_free(treq);
4963
4964 /*
4965 * Free any entries in the watch lists
4966 */
4967 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4968 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4969 }
4970
4971 return 0;
4972}
4973
4974/** Allocate a new collection of connections
4975 *
4976 * This function should be called first to allocate a new trunk connection.
4977 *
4978 * After the trunk has been allocated, #trunk_request_alloc and
4979 * #trunk_request_enqueue should be used to allocate memory for trunk
4980 * requests, and pass a preq (protocol request) to the trunk for
4981 * processing.
4982 *
4983 * The trunk will then asynchronously process the request, writing the result
4984 * to a specified rctx. See #trunk_request_enqueue for more details.
4985 *
4986 * @note Trunks may not be shared between multiple threads under any circumstances.
4987 *
4988 * @param[in] ctx To use for any memory allocations. Must be thread local.
4989 * @param[in] el to use for I/O and timer events.
4990 * @param[in] funcs Callback functions.
4991 * @param[in] conf Common user configurable parameters.
4992 * @param[in] log_prefix To prepend to global messages.
4993 * @param[in] uctx User data to pass to the alloc function.
4994 * @param[in] delay_start If true, then we will not spawn any connections
4995 * until the first request is enqueued.
4996 * @param[in] trigger_args Pairs to pass to trigger requests, if triggers are enabled.
4997 * @return
4998 * - New trunk handle on success.
4999 * - NULL on error.
5000 */
5002 trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
5003 char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
5004{
5005 trunk_t *trunk;
5006 size_t i;
5007
5008 /*
5009 * Check we have the functions we need
5010 */
5011 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
5012
5013 MEM(trunk = talloc_zero(ctx, trunk_t));
5014 trunk->el = el;
5015 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
5016 trunk->trigger_args = trigger_args;
5017
5018 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
5019 if (!trunk->funcs.connection_prioritise) {
5021 }
5023
5024 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
5025
5026 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
5027 talloc_set_destructor(trunk, _trunk_free);
5028
5029 /*
5030 * Free request list...
5031 */
5033
5034 /*
5035 * Request backlog queue
5036 */
5038 trunk_request_t, heap_id, 0));
5039
5040 /*
5041 * Connection queues and trees
5042 */
5044 trunk_connection_t, heap_id, 0));
5054
5055 /*
5056 * Watch lists
5057 */
5058 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5060 }
5061
5062 DEBUG4("Trunk allocated %p", trunk);
5063
5064 if (!delay_start) {
5065 if (trunk_start(trunk) < 0) {
5066 talloc_free(trunk);
5067 return NULL;
5068 }
5069 }
5070
5071 return trunk;
5072}
5073
5074/** Check for a module trigger section when parsing the `triggers` option.
5075 *
5076 */
5077int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
5078{
5081
5082 if (cf_pair_parse_value(ctx, out, parent, ci, rule)< 0) return -1;
5083
5084 /*
5085 * If the parent section of the `triggers` option contains a trigger
5086 * section then store it as the module CONF SECTION for the appropriate
5087 * trigger group.
5088 */
5089 if (cf_section_find(cs, "trigger", NULL)) {
5090 if (strcmp(cf_section_name(cs), "request") == 0) {
5091 conf->req_trigger_cs = cs;
5092 } else {
5093 conf->conn_trigger_cs = cs;
5094 }
5095 }
5096
5097 return 0;
5098}
5099
5100#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5101/** Verify a trunk
5102 *
5103 * A trunk has some number of connections, which each have some number of requests. The connections and
5104 * requests are in differing kinds of containers depending on their state and how they are used, and may
5105 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5106 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5107 * foo's children.
5108 */
5109void trunk_verify(char const *file, int line, trunk_t *trunk)
5110{
5111 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5112 (void) talloc_get_type_abort(trunk, trunk_t);
5113
5114 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5115 _fr_dlist_verify(file, line, &trunk->watch[i]);
5116 }
5117
5118#define IO_FUNC_VERIFY(_func) \
5119 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
5120
5121 /*
5122 * Only a few of the function pointers *must* be non-NULL..
5123 */
5125 IO_FUNC_VERIFY(connection_prioritise);
5127
5128#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5129do { \
5130 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5131 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5132 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5133 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5134} while (0)
5135
5136#define TCONN_DLIST_VERIFY(_dlist, _state) \
5137do { \
5138 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5139 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5140 trunk_connection_verify(file, line, tconn); \
5141 TRUNK_TCONN_CHECKS(tconn, _state); \
5142 } \
5143} while (0)
5144
5145#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5146do {\
5147 fr_minmax_heap_verify(file, line, trunk->_heap); \
5148 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5149 trunk_connection_verify(file, line, tconn); \
5150 TRUNK_TCONN_CHECKS(tconn, _state); \
5151 }} \
5152} while (0)
5153
5154 fr_dlist_verify(&(trunk->free_requests));
5155 FR_HEAP_VERIFY(trunk->backlog);
5156
5163 /* TCONN_DLIST_VERIFY(failed, ???); */
5168}
5169
5171{
5172 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5173 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5174
5175 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5176
5177 /*
5178 * shouldn't be both in heap and on list--but it doesn't look like moves
5179 * to active heap wipe the dlist pointers.
5180 */
5181
5182#define TCONN_TREQ_CHECKS(_treq, _state) \
5183do { \
5184 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5185 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5186 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5187 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5188 fr_fatal_assert_msg(_state == _treq->pub.state, \
5189 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5190} while (0)
5191
5192#define TREQ_DLIST_VERIFY(_dlist, _state) \
5193do { \
5194 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5195 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5196 trunk_request_verify(file, line, treq); \
5197 TCONN_TREQ_CHECKS(treq, _state); \
5198 } \
5199} while (0)
5200
5201#define TREQ_HEAP_VERIFY(_heap, _state) \
5202do { \
5203 fr_heap_iter_t _iter; \
5204 fr_heap_verify(file, line, tconn->_heap); \
5205 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5206 treq; \
5207 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5208 trunk_request_verify(file, line, treq); \
5209 TCONN_TREQ_CHECKS(treq, _state); \
5210 } \
5211} while (0)
5212
5213#define TREQ_OPTION_VERIFY(_option, _state) \
5214do { \
5215 if (tconn->_option) { \
5216 trunk_request_verify(file, line, tconn->_option); \
5217 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5218 } \
5219} while (0)
5220
5221 /* verify associated requests */
5228}
5229
/** Verify the consistency of a single trunk request
 *
 * Fatal-asserts that the pointer is non-NULL and that it is a talloc chunk
 * of type trunk_request_t.  When built WITH_VERIFY_PTR, also runs
 * request_verify() on the attached request_t, if any.
 *
 * @param[in] file	of the caller, for diagnostic messages.
 * @param[in] line	of the caller, for diagnostic messages.
 * @param[in] treq	to verify.
 */
5230void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5231{
5232	fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
	/* Aborts the process if treq is not a talloc chunk of the expected type */
5233	(void) talloc_get_type_abort(treq, trunk_request_t);
5234
5235#ifdef WITH_VERIFY_PTR
	/* Only verify the request if one is actually associated with this treq */
5236	if (treq->pub.request) request_verify(file, line, treq->pub.request);
5237#endif
5238}
5239
5240
5241bool trunk_search(trunk_t *trunk, void *ptr)
5242{
5243#define TCONN_DLIST_SEARCH(_dlist) \
5244do { \
5245 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5246 if (ptr == tconn) { \
5247 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5248 return true; \
5249 } \
5250 if (trunk_connection_search(tconn, ptr)) { \
5251 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5252 return true; \
5253 } \
5254 } \
5255} while (0)
5256
5257#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5258do { \
5259 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5260 if (ptr == tconn) { \
5261 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5262 return true; \
5263 } \
5264 if (trunk_connection_search(tconn, ptr)) { \
5265 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5266 return true; \
5267 } \
5268 }}\
5269} while (0)
5270
5272 TCONN_DLIST_SEARCH(connecting);
5274 TCONN_DLIST_SEARCH(full);
5275 TCONN_DLIST_SEARCH(inactive);
5276 TCONN_DLIST_SEARCH(inactive_draining);
5277 TCONN_DLIST_SEARCH(failed);
5278 TCONN_DLIST_SEARCH(closed);
5279 TCONN_DLIST_SEARCH(draining);
5280 TCONN_DLIST_SEARCH(draining_to_free);
5281 TCONN_DLIST_SEARCH(to_free);
5282
5283 return false;
5284}
5285
5287{
5288#define TREQ_DLIST_SEARCH(_dlist) \
5289do { \
5290 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5291 if (ptr == treq) { \
5292 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5293 return true; \
5294 } \
5295 if (trunk_request_search(treq, ptr)) { \
5296 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5297 return true; \
5298 } \
5299 } \
5300} while (0)
5301
5302#define TREQ_HEAP_SEARCH(_heap) \
5303do { \
5304 fr_heap_iter_t _iter; \
5305 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5306 treq; \
5307 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5308 if (ptr == treq) { \
5309 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5310 return true; \
5311 } \
5312 if (trunk_request_search(treq, ptr)) { \
5313 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5314 return true; \
5315 } \
5316 } \
5317} while (0)
5318
5319#define TREQ_OPTION_SEARCH(_option) \
5320do { \
5321 if (tconn->_option) { \
5322 if (ptr == tconn->_option) { \
5323 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5324 return true; \
5325 } \
5326 if (trunk_request_search(tconn->_option, ptr)) { \
5327 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5328 return true; \
5329 } \
5330 } \
5331} while (0)
5332
5333 /* search associated requests */
5334 TREQ_HEAP_SEARCH(pending);
5335 TREQ_DLIST_SEARCH(sent);
5336 TREQ_DLIST_SEARCH(cancel);
5337 TREQ_DLIST_SEARCH(cancel_sent);
5338 TREQ_OPTION_SEARCH(partial);
5339 TREQ_OPTION_SEARCH(cancel_partial);
5340
5341 return false;
5342}
5343
5345{
5346 return treq->pub.preq == ptr;
5347}
5348#endif
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:40
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:228
#define NDEBUG_UNUSED
Definition build.h:347
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:343
#define DIAG_ON(_x)
Definition build.h:487
#define unlikely(_x)
Definition build.h:407
#define UNUSED
Definition build.h:336
#define NUM_ELEMENTS(_t)
Definition build.h:358
#define DIAG_OFF(_x)
Definition build.h:486
int cf_pair_parse_value(TALLOC_CTX *ctx, void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Parses a CONF_PAIR into a C data type.
Definition cf_parse.c:213
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:657
cf_parse_t func
Override default parsing behaviour for the specified type with a custom parsing function.
Definition cf_parse.h:611
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:280
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:334
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:309
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:423
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:594
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1029
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:685
char const * cf_section_name(CONF_SECTION const *cs)
Return name2 if set, else name1.
Definition cf_util.c:1199
#define cf_parent(_cf)
Definition cf_util.h:98
connection_state_t
Definition connection.h:47
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:56
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition connection.h:48
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:57
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:54
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:51
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:52
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:55
connection_reason_t
Definition connection.h:84
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:158
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:186
#define MEM(x)
Definition debug.h:46
#define DEBUG(fmt,...)
Definition dhcpclient.c:38
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:717
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static void fr_dlist_talloc_free(fr_dlist_head_t *head)
Free all items in a doubly linked list (with talloc)
Definition dlist.h:892
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:570
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:654
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:513
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
#define fr_dlist_verify(_head)
Definition dlist.h:737
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:257
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:320
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:537
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:540
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RWARN(fmt,...)
Definition log.h:309
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:618
Track when a log message was last repeated.
Definition log.h:559
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
fr_log_type_t
Definition log.h:51
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:211
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:449
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1151
#define fr_assert(_expr)
Definition rad_assert.h:37
#define RDEBUG(fmt,...)
#define DEBUG2(fmt,...)
#define WARN(fmt,...)
static bool done
Definition radclient.c:80
#define INFO(fmt,...)
Definition radict.c:63
static fr_event_list_t * events
Definition radsniff.c:58
static rs_t * conf
Definition radsniff.c:52
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:483
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:330
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:521
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:543
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:466
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:321
static fr_time_t test_time(void)
Definition slab_tests.c:43
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:155
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:93
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:110
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:204
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:49
A timer event.
Definition timer.c:83
#define FR_TIMER_DELETE(_ev_p)
Definition timer.h:103
#define FR_TIMER_DELETE_RETURN(_ev_p)
Definition timer.h:110
#define fr_timer_in(...)
Definition timer.h:87
#define FR_TIMER_DISARM(_ev)
Definition timer.h:91
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5241
static atomic_uint_fast64_t request_counter
Definition trunk.c:54
CONF_PAIR * trigger_cp[NUM_ELEMENTS(trunk_conn_trigger_names)]
Cached trigger CONF_PAIRs.
Definition trunk.c:319
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3301
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:792
static size_t trunk_req_trigger_names_len
Definition trunk.c:388
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3888
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:161
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4883
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:762
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3451
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:159
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:153
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:224
static size_t trunk_connection_states_len
Definition trunk.c:427
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:767
trunk_connection_t * tconn
The request was associated with.
Definition trunk.c:82
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4067
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:298
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5109
fr_timer_t * manage_ev
Periodic connection management event.
Definition trunk.c:290
#define IN_HANDLER(_trunk)
Definition trunk.c:722
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:415
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4779
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4084
void * uctx
User data to pass to the function.
Definition trunk.c:191
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1198
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:1007
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:296
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:80
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:983
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4216
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3733
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:276
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:261
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:777
int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
Check for a module trigger section when parsing the triggers option.
Definition trunk.c:5077
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4818
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2075
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2178
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2306
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition trunk.c:3265
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:187
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3974
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:614
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2524
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3685
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:733
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:138
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:258
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:282
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:833
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1550
int line
Line change occurred on.
Definition trunk.c:92
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3203
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:457
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4625
static size_t trunk_connection_events_len
Definition trunk.c:443
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3633
bool oneshot
Remove the function after it's called once.
Definition trunk.c:189
bool started
Has the trunk been started.
Definition trunk.c:307
static size_t trunk_states_len
Definition trunk.c:413
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition trunk.c:2932
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:312
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:575
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as full.
Definition trunk.c:2951
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3102
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:71
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:3015
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:147
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:909
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4916
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:256
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4152
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4724
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:536
#define FR_TRUNK_LIST_FUNC(_list, _type)
Definition trunk.c:804
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4899
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:100
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:163
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2056
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:190
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:113
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:557
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:772
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2974
struct trunk_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:216
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:111
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:502
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2793
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3562
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:437
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2639
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1101
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:278
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:304
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:408
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:3005
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:727
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2728
#define IS_PROCESSING(_tconn)
Definition trunk.c:728
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3233
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:373
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition trunk.c:108
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4110
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2330
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1648
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:632
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:753
fr_dlist_head_t sent
Sent request.
Definition trunk.c:157
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that is has been partially sent.
Definition trunk.c:1269
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:178
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3936
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:242
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:929
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2198
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2850
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3801
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:875
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4554
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:134
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
Definition trunk.c:1356
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the count number of connections in the specified states.
Definition trunk.c:2908
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:724
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:594
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1419
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1726
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:309
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:683
char const * function
State change occurred in.
Definition trunk.c:91
static size_t trunk_request_states_len
Definition trunk.c:406
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:239
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:77
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1470
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3503
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
Allocate a new collection of connections.
Definition trunk.c:5001
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:267
trunk_request_t * partial
Partially written request.
Definition trunk.c:155
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1612
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:244
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:341
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1581
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1299
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:665
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3158
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2368
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:649
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1783
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2491
char const * log_prefix
What to prepend to messages.
Definition trunk.c:220
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:743
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3485
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:3038
static conf_parser_t const trunk_config_request[]
Definition trunk.c:324
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:246
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1136
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:391
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3407
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:198
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:264
uint64_t id
Trunk request ID.
Definition trunk.c:104
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:171
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3372
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:705
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:249
bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]
Record that a specific trigger is undefined.
Definition trunk.c:317
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4873
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:4013
fr_dlist_head_t log
State change log.
Definition trunk.c:123
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition trunk.c:85
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:141
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1505
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3180
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:79
fr_pair_list_t * trigger_args
Passed to trigger.
Definition trunk.c:315
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition trunk.c:165
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4862
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition trunk.c:252
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3990
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2843
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5170
fr_heap_t * backlog
The request backlog.
Definition trunk.c:229
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:725
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5230
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4576
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2282
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2096
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:280
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5286
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:468
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition trunk.c:106
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:491
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:284
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition trunk.c:1894
bool sent
Trunk request has been sent at least once.
Definition trunk.c:118
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2140
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2992
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:4052
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3956
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5344
fr_dlist_t entry
List entry.
Definition trunk.c:186
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:333
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:87
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:115
static size_t trunk_cancellation_reasons_len
Definition trunk.c:435
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:429
static size_t trunk_conn_trigger_names_len
Definition trunk.c:210
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:222
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2881
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:723
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:226
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4100
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:786
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:78
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2118
Associates request queues with a connection.
Definition trunk.c:133
Wraps a normal request.
Definition trunk.c:99
Trace state machine changes for a particular request.
Definition trunk.c:76
Main trunk management handle.
Definition trunk.c:215
An entry in a trunk watch function list.
Definition trunk.c:185
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:232
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:241
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:321
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:352
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition trunk.h:282
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:230
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:196
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:358
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:380
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:742
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:267
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:360
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:311
fr_time_delta_t idle_timeout
how long a connection can remain idle for
Definition trunk.h:251
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:372
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:270
uint64_t max_uses
The maximum time a connection can be used.
Definition trunk.h:247
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:249
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:234
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:335
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:245
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:319
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:350
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:376
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:354
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:738
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:323
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:308
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:378
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:256
uint16_t start
How many connections to start.
Definition trunk.h:228
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:260
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:214
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:272
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:744
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:329
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:333
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:253
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:374
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:314
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:729
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:264
trunk_enqueue_t
Definition trunk.h:149
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:154
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:155
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:151
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:152
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:150
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:356
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:237
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:317
trunk_request_cancel_mux_t request_cancel_mux
!< Read one or more requests from a connection.
Definition trunk.h:751
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:171
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:174
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:166
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:163
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:186
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancel.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:189
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:169
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:173
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:338
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:305
Common configuration parameters for a trunk.
Definition trunk.h:225
Public fields for the trunk connection.
Definition trunk.h:371
I/O functions to pass to trunk_alloc.
Definition trunk.h:737
Public fields for the trunk.
Definition trunk.h:301
Public fields for the trunk request.
Definition trunk.h:349
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:420
static fr_slen_t parent
Definition pair.h:858
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:553
#define fr_box_time_delta(_val)
Definition value.h:366
int nonnull(2, 5))
static size_t char ** out
Definition value.h:1030