The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
trunk.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 8db873be94162ac501c7405855fd31e5c67bc4e5 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
44
45#ifdef HAVE_STDATOMIC_H
46# include <stdatomic.h>
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
49# endif
50#else
51# include <freeradius-devel/util/stdatomic.h>
52#endif
53
/* Monotonic, process-wide source of trunk request IDs.
 * Starts at 1 — presumably so 0 can serve as an "unset" ID; confirm against users of treq->id. */
54static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
55
56#ifdef TESTING_TRUNK
58
/** Return the test harness's fixed time base instead of the real clock
 *
 * Only compiled under TESTING_TRUNK; fr_time is redefined to this function
 * immediately below so time can be advanced deterministically by tests.
 */
59static fr_time_t test_time(void)
60{
61 return test_time_base;
62}
63
64#define fr_time test_time
65#endif
66
67#ifndef NDEBUG
68/** The maximum number of state logs to record per request
69 *
70 */
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
72
73/** Trace state machine changes for a particular request
74 *
75 */
76typedef struct {
77 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
78 fr_dlist_t entry; //!< Entry in the linked list.
79 trunk_request_state_t from; //!< What state we transitioned from.
80 trunk_request_state_t to; //!< What state we transitioned to.
81
82 trunk_connection_t *tconn; //!< The request was associated with.
83 ///< Pointer may now be invalid, do no de-reference.
84
85 uint64_t tconn_id; //!< If the treq was associated with a connection
86 ///< the connection ID.
87 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
88 ///< the connection state at the time of the
89 ///< state transition.
90
91 char const *function; //!< State change occurred in.
92 int line; //!< Line change occurred on.
94#endif
95
96/** Wraps a normal request
97 *
98 */
100 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
101 ///< This *MUST* be the first field in this
102 ///< structure.
103
104 uint64_t id; //!< Trunk request ID.
105
106 fr_heap_index_t heap_id; //!< Used to track the request conn->pending heap.
107
108 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
109 ///< or trunk->backlog request.
110
111 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
112
113 fr_time_t last_freed; //!< Last time this request was freed.
114
115 bool bound_to_conn; //!< Fail the request if there's an attempt to
116 ///< re-enqueue it.
117
118 bool sent; //!< Trunk request has been sent at least once.
119 ///< Used so that re-queueing doesn't increase trunk
120 ///< `sent` count.
121
122#ifndef NDEBUG
123 fr_dlist_head_t log; //!< State change log.
124#endif
125};
126
127
128/** Associates request queues with a connection
129 *
130 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
131 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
132 */
134 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
135 ///< This *MUST* be the first field in this
136 ///< structure.
137
138 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
139 ///< heap.
140
141 fr_dlist_t entry; //!< Used to track the connection in the connecting,
142 ///< full and failed lists.
143
144 /** @name State
145 * @{
146 */
147 trunk_connection_event_t events; //!< The current events we expect to be notified on.
148 /** @} */
149
150 /** @name Request lists
151 * @{
152 */
153 fr_heap_t *pending; //!< Requests waiting to be sent.
154
155 trunk_request_t *partial; //!< Partially written request.
156
157 fr_dlist_head_t sent; //!< Sent request.
158
159 fr_dlist_head_t reapable; //!< Idle request.
160
161 fr_dlist_head_t cancel; //!< Requests in the cancel state.
162
163 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
164
165 fr_dlist_head_t cancel_sent; //!< Sent cancellation request.
166 /** @} */
167
168 /** @name Statistics
169 * @{
170 */
171 uint64_t sent_count; //!< The number of requests that have been sent using
172 ///< this connection.
173 /** @} */
174
175 /** @name Timers
176 * @{
177 */
178 fr_timer_t *lifetime_ev; //!< Maximum time this connection can be open.
179 /** @} */
180};
181
182/** An entry in a trunk watch function list
183 *
184 */
185typedef struct trunk_watch_entry_s {
186 fr_dlist_t entry; //!< List entry.
187 trunk_watch_t func; //!< Function to call when a trunk enters
188 ///< the state this list belongs to
189 bool oneshot; //!< Remove the function after it's called once.
190 bool enabled; //!< Whether the watch entry is enabled.
191 void *uctx; //!< User data to pass to the function.
193
194/** Map connection states to trigger names
195 *
196 * Must stay in the same order as #trunk_connection_state_t
197 */
199 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
200 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
201 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
202 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
203 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
204 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
205 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
206 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
207 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
208 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
209};
211
212/** Main trunk management handle
213 *
214 */
215struct trunk_s {
216 struct trunk_pub_s pub; //!< Public fields in the trunk connection.
217 ///< This *MUST* be the first field in this
218 ///< structure.
219
220 char const *log_prefix; //!< What to prepend to messages.
221
222 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
223
224 trunk_conf_t conf; //!< Trunk common configuration.
225
226 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
227 ///< enqueued.
228
229 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
230 ///< immediately assign to a connection.
231
232 /** @name Connection lists
233 *
234 * A connection must always be in exactly one of these lists
235 * or trees.
236 *
237 * @{
238 */
239 fr_dlist_head_t init; //!< Connections which have not yet started
240 ///< connecting.
241
242 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
243
244 fr_minmax_heap_t *active; //!< Connections which can service requests.
245
246 fr_dlist_head_t full; //!< Connections which have too many outstanding
247 ///< requests.
248
249 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
250 ///< inactive by the API client.
251
252 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
253 ///< inactive by the API client, which the trunk
254 ///< manager is draining to close.
255
256 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
257
258 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
259 ///< shutdown, reconnection or failure.
260
261 fr_dlist_head_t draining; //!< Connections that will be freed once all their
262 ///< requests are complete, but can be reactivated.
263
264 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
265 ///< requests are complete.
266
267 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
268 //!< the next call to trunk_manage.
269 //!< This prevents connections from being freed
270 //!< whilst we're inside callbacks.
271 /** @} */
272
273 /** @name Callbacks
274 * @{
275 */
276 trunk_io_funcs_t funcs; //!< I/O functions.
277
278 void *in_handler; //!< Which handler we're inside.
279
280 void *uctx; //!< Uctx data to pass to alloc.
281
282 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
283
284 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
285 /** @} */
286
287 /** @name Timers
288 * @{
289 */
290 fr_timer_t *manage_ev; //!< Periodic connection management event.
291 /** @} */
292
293 /** @name Log rate limiting entries
294 * @{
295 */
296 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
297
298 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
299 /** @} */
300
301 /** @name State
302 * @{
303 */
304 bool freeing; //!< Trunk is being freed, don't spawn new
305 ///< connections or re-enqueue.
306
307 bool started; //!< Has the trunk been started.
308
309 bool managing_connections; //!< Whether the trunk is allowed to manage
310 ///< (open/close) connections.
311
312 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
313 /** @} */
314
315 fr_pair_list_t *trigger_args; //!< Passed to trigger
316
317 bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]; //!< Record that a specific trigger is undefined.
318
320};
321
322int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule);
323
325 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
326 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
327 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
328 { FR_CONF_OFFSET("triggers", trunk_conf_t, req_triggers), .func = trunk_trigger_cf_parse },
329
331};
332
334 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
335 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
336
338};
339
340#ifndef TRUNK_TESTS
342 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
343 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
344 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
345 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
346 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
347 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
348 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
349
350 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
351 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
352
353 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
354
355 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
356
357 { FR_CONF_OFFSET("backlog_on_failed_conn", trunk_conf_t, backlog_on_failed_conn), },
358
359 { FR_CONF_OFFSET("triggers", trunk_conf_t, conn_triggers), .func = trunk_trigger_cf_parse },
360
361 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
362 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
363
365};
366#endif
367
368#ifndef NDEBUG
369/** Map request states to trigger names
370 *
371 * Must stay in the same order as #trunk_request_state_t
372 */
374 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
375 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
376 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
377 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
378 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
379 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
380 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
381 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
382 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
383 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
384 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
385 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
386 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
387};
389#endif
390
392 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
393 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
394 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
395 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
396 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
397 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
398 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
399 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
400 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
401 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
402 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
403 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
404 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
405};
407
409 { L("IDLE"), TRUNK_STATE_IDLE },
410 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
411 { L("PENDING"), TRUNK_STATE_PENDING }
412};
414
416 { L("INIT"), TRUNK_CONN_INIT },
417 { L("HALTED"), TRUNK_CONN_HALTED },
418 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
419 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
420 { L("CLOSED"), TRUNK_CONN_CLOSED },
421 { L("FULL"), TRUNK_CONN_FULL },
422 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
423 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
424 { L("DRAINING"), TRUNK_CONN_DRAINING },
425 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
426};
428
430 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
431 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
432 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
433 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
434};
436
438 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
439 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
440 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
441 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
442};
444
/** Fire the trigger associated with a connection state change (if triggers are enabled)
 *
 * Indexes the per-state trigger caches by the high bit position of _state
 * (connection states are one-hot flags).  If trigger() reports failure (-1)
 * the state's slot in trigger_undef is set so we never attempt that trigger
 * again for the lifetime of the trunk.
 *
 * Requires a `trunk` local (or macro argument expansion) in scope.
 */
445#define CONN_TRIGGER(_state) do { \
446 uint8_t idx = fr_high_bit_pos(_state); \
447 if (trunk->conf.conn_triggers && !trunk->trigger_undef[idx]) { \
448 if (trigger(unlang_interpret_get_thread_default(), trunk->conf.conn_trigger_cs, \
449 &trunk->trigger_cp[idx], \
450 fr_table_str_by_value(trunk_conn_trigger_names, _state, \
451 "<INVALID>"), true, trunk->trigger_args) == -1) { \
452 trunk->trigger_undef[idx] = true; \
453 } \
454 } \
455} while (0)
456
/** Log and apply a trunk connection state change
 *
 * Logs old -> new at the level given by _log, updates tconn->pub.state,
 * fires the matching connection trigger, then recalculates the
 * requests-per-connection ratio.
 *
 * Requires `tconn` and `trunk` locals in scope.
 */
457#define CONN_STATE_TRANSITION(_new, _log) \
458do { \
459 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
460 tconn->pub.conn->id, \
461 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
462 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
463 tconn->pub.state = _new; \
464 CONN_TRIGGER(_new); \
465 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
466} while (0)
467
/** Soft-assert on an invalid connection state transition
 *
 * NOTE: expands a bare `return;` when the assertion fails, so this macro is
 * only usable inside functions returning void.  Requires a `tconn` local.
 */
468#define CONN_BAD_STATE_TRANSITION(_new) \
469do { \
470 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
471 tconn->pub.conn->id, \
472 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
473 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
474} while (0)
475
476#ifndef NDEBUG
477void trunk_request_state_log_entry_add(char const *function, int line,
478 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
479
/* Fire the trigger named for a request state (debug builds only).
 * Unlike CONN_TRIGGER there is no undef cache — failures are not remembered.
 * Requires a `trunk` local in scope. */
480#define REQUEST_TRIGGER(_state) do { \
481 if (trunk->conf.req_triggers) { \
482 trigger(unlang_interpret_get_thread_default(), \
483 trunk->conf.req_trigger_cs, NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
484 "<INVALID>"), true, trunk->trigger_args); \
485 } \
486} while (0)
487
488/** Record a request state transition and log appropriate output
489 *
 * Debug variant: also appends an entry to the per-request state log and
 * fires the request-state trigger.  Requires `treq` (and, via
 * REQUEST_TRIGGER, `trunk`) locals in scope.
490 */
491#define REQUEST_STATE_TRANSITION(_new) \
492do { \
493 request_t *request = treq->pub.request; \
494 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
495 treq->id, \
496 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
497 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
498 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
499 treq->pub.state = _new; \
500 REQUEST_TRIGGER(_new); \
501} while (0)
/* Dump the request's state log then soft-assert.
 * NOTE: expands a bare `return;` — only usable in void functions. */
502#define REQUEST_BAD_STATE_TRANSITION(_new) \
503do { \
504 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
505 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
506 treq->id, \
507 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
508 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
509} while (0)
510#else
511/** Record a request state transition
512 *
 * Release variant: no state log, no trigger — just the debug message and
 * the state update.
513 */
514#define REQUEST_STATE_TRANSITION(_new) \
515do { \
516 request_t *request = treq->pub.request; \
517 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
518 treq->id, \
519 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
520 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
521 treq->pub.state = _new; \
522} while (0)
/* Release variant of the invalid-transition soft-assert.
 * NOTE: expands a bare `return;` — only usable in void functions. */
523#define REQUEST_BAD_STATE_TRANSITION(_new) \
524do { \
525 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
526 treq->id, \
527 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
528 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
529} while (0)
530#endif
531
532
533/** Call the cancel callback if set
 *
 * No-op when the trunk has no request_cancel callback.  trunk->in_handler
 * is saved and set to the callback for the duration of the call (so
 * reentrant trunk API calls can detect which handler is executing),
 * then restored.
534 *
535 */
536#define DO_REQUEST_CANCEL(_treq, _reason) \
537do { \
538 if ((_treq)->pub.trunk->funcs.request_cancel) { \
539 request_t *request = (_treq)->pub.request; \
540 void *_prev = (_treq)->pub.trunk->in_handler; \
541 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
542 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
543 (_treq)->pub.tconn->pub.conn, \
544 (_treq)->pub.preq, \
545 fr_table_str_by_value(trunk_cancellation_reasons, \
546 (_reason), \
547 "<INVALID>"), \
548 (_treq)->pub.trunk->uctx); \
549 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
550 (_treq)->pub.trunk->in_handler = _prev; \
551 } \
552} while(0)
553
554/** Call the "conn_release" callback (if set)
 *
 * Notifies the API client that the request is being released from its
 * connection.  in_handler is saved/set/restored around the call, as with
 * the other DO_REQUEST_* wrappers.
555 *
556 */
557#define DO_REQUEST_CONN_RELEASE(_treq) \
558do { \
559 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
560 request_t *request = (_treq)->pub.request; \
561 void *_prev = (_treq)->pub.trunk->in_handler; \
562 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
563 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
564 (_treq)->pub.tconn->pub.conn, \
565 (_treq)->pub.preq, \
566 (_treq)->pub.trunk->uctx); \
567 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
568 (_treq)->pub.trunk->in_handler = _prev; \
569 } \
570} while(0)
571
572/** Call the complete callback (if set)
 *
 * Hands the decoded result (preq/rctx) back to the API client.
 * in_handler is saved/set/restored around the call.
573 *
574 */
575#define DO_REQUEST_COMPLETE(_treq) \
576do { \
577 if ((_treq)->pub.trunk->funcs.request_complete) { \
578 request_t *request = (_treq)->pub.request; \
579 void *_prev = (_treq)->pub.trunk->in_handler; \
580 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
581 (_treq)->pub.request, \
582 (_treq)->pub.preq, \
583 (_treq)->pub.rctx, \
584 (_treq)->pub.trunk->uctx); \
585 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
586 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
587 (_treq)->pub.trunk->in_handler = _prev; \
588 } \
589} while(0)
590
591/** Call the fail callback (if set)
 *
 * Reports the state the request was in when it failed (_prev_state) so the
 * API client can distinguish e.g. never-sent from sent-but-unanswered.
 * in_handler is saved/set/restored around the call.
592 *
593 */
594#define DO_REQUEST_FAIL(_treq, _prev_state) \
595do { \
596 if ((_treq)->pub.trunk->funcs.request_fail) { \
597 request_t *request = (_treq)->pub.request; \
598 void *_prev = (_treq)->pub.trunk->in_handler; \
599 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
600 (_treq)->pub.request, \
601 (_treq)->pub.preq, \
602 (_treq)->pub.rctx, \
603 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
604 (_treq)->pub.trunk->uctx); \
605 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
606 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
607 (_treq)->pub.trunk->in_handler = _prev; \
608 } \
609} while(0)
610
611/** Call the free callback (if set)
 *
 * Gives the API client the chance to free its protocol request (preq)
 * before the trunk request itself is reclaimed.  in_handler is
 * saved/set/restored around the call.
612 *
613 */
614#define DO_REQUEST_FREE(_treq) \
615do { \
616 if ((_treq)->pub.trunk->funcs.request_free) { \
617 request_t *request = (_treq)->pub.request; \
618 void *_prev = (_treq)->pub.trunk->in_handler; \
619 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
620 (_treq)->pub.request, \
621 (_treq)->pub.preq, \
622 (_treq)->pub.trunk->uctx); \
623 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
624 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
625 (_treq)->pub.trunk->in_handler = _prev; \
626 } \
627} while(0)
628
629/** Write one or more requests to a connection
 *
 * Invokes the mandatory request_mux I/O callback.  in_handler is
 * saved/set/restored so reentrant trunk API calls (IN_REQUEST_MUX) can
 * tell which callback is executing.
630 *
631 */
632#define DO_REQUEST_MUX(_tconn) \
633do { \
634 void *_prev = (_tconn)->pub.trunk->in_handler; \
635 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
636 (_tconn)->pub.conn->id, \
637 (_tconn)->pub.trunk->el, \
638 (_tconn), \
639 (_tconn)->pub.conn, \
640 (_tconn)->pub.trunk->uctx); \
641 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
642 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
643 (_tconn)->pub.trunk->in_handler = _prev; \
644} while(0)
645
/** Read one or more requests from a connection
 *
 * Invokes the request_demux I/O callback.  in_handler is saved/set/restored
 * so reentrant trunk API calls (IN_REQUEST_DEMUX) can tell which callback
 * is executing.
 *
 * Fix: the debug message previously omitted the `el` argument even though
 * the callback is invoked with it — now mirrors DO_REQUEST_MUX.
 */
#define DO_REQUEST_DEMUX(_tconn) \
do { \
	void *_prev = (_tconn)->pub.trunk->in_handler; \
	DEBUG3("[%" PRIu64 "] Calling request_demux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
	       (_tconn)->pub.conn->id, \
	       (_tconn)->pub.trunk->el, \
	       (_tconn), \
	       (_tconn)->pub.conn, \
	       (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
	(_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = _prev; \
} while(0)
661
662/** Write one or more cancellation requests to a connection
 *
 * No-op when the trunk has no request_cancel_mux callback.  in_handler is
 * saved/set/restored so reentrant trunk API calls (IN_REQUEST_CANCEL_MUX)
 * can tell which callback is executing.
663 *
664 */
665#define DO_REQUEST_CANCEL_MUX(_tconn) \
666do { \
667 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
668 void *_prev = (_tconn)->pub.trunk->in_handler; \
669 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
670 (_tconn)->pub.conn->id, \
671 (_tconn), \
672 (_tconn)->pub.conn, \
673 (_tconn)->pub.trunk->uctx); \
674 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
675 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
676 (_tconn)->pub.trunk->in_handler = _prev; \
677 } \
678} while(0)
679
/** Allocate a new connection
 *
 * Invokes the mandatory connection_alloc callback and stores the result in
 * (_tconn)->pub.conn.  On failure the half-constructed tconn is freed.
 *
 * Hygiene fix: every reference now goes through the (_tconn) parameter
 * instead of silently depending on `trunk`/`tconn` locals in the caller.
 * (The ERROR/DEBUG3 log macros still expand LOG_PREFIX, which references a
 * `trunk` local — unavoidable here.)
 *
 * NOTE: expands `return -1;` on failure — only usable where the enclosing
 * function returns int.
 */
#define DO_CONNECTION_ALLOC(_tconn) \
do { \
	void *_prev = (_tconn)->pub.trunk->in_handler; \
	DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
	       (_tconn), \
	       (_tconn)->pub.trunk->el, \
	       (_tconn)->pub.trunk->conf.conn_conf, \
	       (_tconn)->pub.trunk->log_prefix, \
	       (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
	(_tconn)->pub.conn = (_tconn)->pub.trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, (_tconn)->pub.trunk->uctx); \
	(_tconn)->pub.trunk->in_handler = _prev; \
	if (!(_tconn)->pub.conn) { \
		ERROR("Failed creating new connection"); \
		talloc_free(_tconn); \
		return -1; \
	} \
} while(0)
701
702/** Change what events the connection should be notified about
 *
 * No-op when the trunk has no connection_notify callback.  in_handler is
 * saved/set/restored around the call, as with the other DO_* wrappers.
703 *
704 */
705#define DO_CONNECTION_NOTIFY(_tconn, _events) \
706do { \
707 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
708 void *_prev = (_tconn)->pub.trunk->in_handler; \
709 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
710 (_tconn)->pub.conn->id, \
711 (_tconn), \
712 (_tconn)->pub.conn, \
713 (_tconn)->pub.trunk->el, \
714 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
715 (_tconn)->pub.trunk->uctx); \
716 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
717 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
718 (_tconn)->pub.trunk->in_handler = _prev; \
719 } \
720} while(0)
721
/* Predicates over trunk->in_handler (set by the DO_* wrappers above):
 * detect whether we're currently inside a particular I/O callback, so the
 * public API can defer or reject reentrant operations. */
722#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
723#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
724#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
725#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
726
/* Connection states are one-hot flags, so set membership is a bitwise AND
 * against the SERVICEABLE / PROCESSING composite masks. */
727#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
728#define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
729
/** Remove the current request from the backlog
 *
 * Soft-asserts (and bails out of the macro) if the heap extraction fails.
 */
#define REQUEST_EXTRACT_BACKLOG(_treq) \
do { \
	int _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
	if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
} while (0)
739
/** Remove the current request from the pending list
 *
 * Soft-asserts (and bails out of the macro) if the heap extraction fails.
 */
#define REQUEST_EXTRACT_PENDING(_treq) \
do { \
	int _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
	if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
} while (0)
749
/** Remove the current request from the partial slot
 *
 * Hygiene fix: operate purely on the _treq parameter instead of the
 * caller's `treq`/`tconn` locals, which the original silently depended on.
 * Behavior is identical for all existing call sites (where
 * tconn == (_treq)->pub.tconn).
 */
#define REQUEST_EXTRACT_PARTIAL(_treq) \
do { \
	fr_assert((_treq)->pub.tconn->partial == (_treq)); \
	(_treq)->pub.tconn->partial = NULL; \
} while (0)
758
/** Remove the current request from the sent list
 *
 * Hygiene fix: uses the _treq parameter rather than the caller's
 * `tconn`/`treq` locals.
 */
#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&(_treq)->pub.tconn->sent, (_treq))
763
/** Remove the current request from the reapable list
 *
 * Hygiene fix: uses the _treq parameter rather than the caller's
 * `tconn`/`treq` locals.
 */
#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&(_treq)->pub.tconn->reapable, (_treq))
768
/** Remove the current request from the cancel list
 *
 * Hygiene fix: uses the _treq parameter rather than the caller's
 * `tconn`/`treq` locals.
 */
#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&(_treq)->pub.tconn->cancel, (_treq))
773
/** Remove the current request from the cancel_partial slot
 *
 * Hygiene fix: operate purely on the _treq parameter instead of the
 * caller's `treq`/`tconn` locals, which the original silently depended on.
 */
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
do { \
	fr_assert((_treq)->pub.tconn->cancel_partial == (_treq)); \
	(_treq)->pub.tconn->cancel_partial = NULL; \
} while (0)
782
/** Remove the current request from the cancel sent list
 *
 * Hygiene fix: uses the _treq parameter rather than the caller's
 * `tconn`/`treq` locals.
 */
#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&(_treq)->pub.tconn->cancel_sent, (_treq))
787
788/** Reorder the connections in the active heap
 *
 * Re-sorts the connection after its request-count/score changes, by
 * extracting it from the minmax heap and re-inserting it.  Skipped when
 * the heap holds a single element (nothing to reorder) or the connection
 * is not in the ACTIVE state (soft-assert).
 *
789 *
790 * fr_heap_extract will also error out if heap_id is bad - no need for assert
791 */
792#define CONN_REORDER(_tconn) \
793do { \
794 int _ret; \
795 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
796 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
797 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
798 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
799 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
800} while (0)
801
802
/** Generate add/peek/pop/remove accessors for a trunk-held request dlist
 *
 * Generated semantics: _add inserts at the head (newest first), _pop removes
 * from the head (most recently added), while _peek returns the *tail*
 * (oldest entry).  NOTE(review): peek and pop deliberately look at opposite
 * ends — for free_requests this lets pop reuse the most recently freed
 * request while peek inspects the oldest for expiry; confirm against callers.
 */
803#define FR_TRUNK_LIST_FUNC(_list) \
804static inline CC_HINT(nonnull, always_inline) void trunk_list_ ## _list ## _add(trunk_t *trunk, trunk_request_t *treq) \
805{ \
806 fr_dlist_insert_head(&trunk->_list, treq); \
807} \
808static inline CC_HINT(nonnull, always_inline) trunk_request_t *trunk_list_ ## _list ##_peek(trunk_t *trunk) \
809{ \
810 return fr_dlist_tail(&trunk->_list); \
811} \
812static inline CC_HINT(nonnull, always_inline) trunk_request_t *trunk_list_ ## _list ##_pop(trunk_t *trunk) \
813{ \
814 return fr_dlist_pop_head(&trunk->_list); \
815} \
816static inline CC_HINT(nonnull, always_inline) void trunk_list_ ## _list ##_remove(trunk_t *trunk, trunk_request_t *treq) \
817{ \
818 fr_dlist_remove(&trunk->_list, treq); \
819}
820
821FR_TRUNK_LIST_FUNC(free_requests)
822
823
824/** Call a list of watch functions associated with a state
825 *
826 */
828{
829 /*
830 * Nested watcher calls are not allowed
831 * and shouldn't be possible because of
832 * deferred signal processing.
833 */
834 fr_assert(trunk->next_watcher == NULL);
835
836 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
837 trunk_watch_entry_t *entry = trunk->next_watcher;
838 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
839
840 if (!entry->enabled) continue;
841 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
842
843 entry->func(trunk, trunk->pub.state, state, entry->uctx);
844
845 if (oneshot) talloc_free(entry);
846 }
847 trunk->next_watcher = NULL;
848}
849
850/** Call the state change watch functions
 *
 * Cheap early-out when no watchers are registered for _state; otherwise
 * delegates to trunk_watch_call.  Requires that _state is a valid index
 * into the watch array (< TRUNK_STATE_MAX).
851 *
852 */
853#define CALL_WATCHERS(_trunk, _state) \
854do { \
855 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
856 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
857} while(0)
858
859/** Remove a watch function from a trunk state list
 860 *
 861 * @param[in] trunk The trunk to remove the watcher from.
 862 * @param[in] state to remove the watch from.
 863 * @param[in] watch Function to remove.
 864 * @return
 865 * - 0 if the function was removed successfully.
 866 * - -1 if the function wasn't present in the watch list.
 867 * - -2 if an invalid state was passed.
 868 */
/* NOTE(review): the function signature (listing line 869) is missing from this
 * extract — parameters are documented above; confirm names against upstream. */
870{
 871	trunk_watch_entry_t *entry = NULL;
 872	fr_dlist_head_t *list;
 873
 874	if (state >= TRUNK_STATE_MAX) return -2;
 875
 876	list = &trunk->watch[state];
 877	while ((entry = fr_dlist_next(list, entry))) {
 878		if (entry->func == watch) {
			/* If we're removing the entry the watcher loop is currently
			 * visiting, advance trunk->next_watcher so trunk_watch_call()
			 * doesn't iterate from freed memory */
 879			if (trunk->next_watcher == entry) {
 880				trunk->next_watcher = fr_dlist_remove(list, entry);
 881			} else {
 882				fr_dlist_remove(list, entry);
 883			}
 884			talloc_free(entry);
 885			return 0;
 886		}
 887	}
 888
 889	return -1;
 890}
891
892/** Add a watch entry to the trunk state list
 893 *
 894 * @param[in] trunk The trunk to add the watcher to.
 895 * @param[in] state to watch for.
 896 * @param[in] watch Function to add.
 897 * @param[in] oneshot Should this watcher only be run once.
 898 * @param[in] uctx Context to pass to function.
 899 * @return
 900 * - NULL if an invalid state is passed.
 901 * - A new watch entry handle on success.
 902 */
/* NOTE(review): the first line of the signature (listing line 903) is missing
 * from this extract; only the trailing parameters below survived. */
904 trunk_watch_t watch, bool oneshot, void const *uctx)
 905{
 906	trunk_watch_entry_t *entry;
 907	fr_dlist_head_t *list;
 908
 909	if (state >= TRUNK_STATE_MAX) return NULL;
 910
 911	list = &trunk->watch[state];
 912	MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
 913
 914	entry->func = watch;
 915	entry->oneshot = oneshot;
 916	entry->enabled = true;
	/* memcpy rather than assignment, presumably to drop the const
	 * qualifier on uctx without an explicit deconst cast — verify */
 917	memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
 918	fr_dlist_insert_tail(list, entry);
 919
 920	return entry;
 921}
922
/** Transition the trunk itself to a new state, logging the change and notifying watchers
 *
 * Watchers run *before* trunk->pub.state is updated, so within a watcher
 * trunk->pub.state still holds the previous state (trunk_watch_call() also
 * passes it as the "old state" argument).
 */
#define TRUNK_STATE_TRANSITION(_new) \
do { \
	DEBUG3("Trunk changed state %s -> %s", \
	       fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
	       fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
	CALL_WATCHERS(trunk, _new); \
	trunk->pub.state = _new; \
} while (0)
931
932static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
933static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
942
943static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
944 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
945
946static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
947static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
948static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
949static inline void trunk_connection_readable(trunk_connection_t *tconn);
950static inline void trunk_connection_writable(trunk_connection_t *tconn);
958
959static void trunk_rebalance(trunk_t *trunk);
960static void trunk_manage(trunk_t *trunk, fr_time_t now);
961static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx);
962static void trunk_backlog_drain(trunk_t *trunk);
963
964/** Compare two protocol requests
 965 *
 966 * Allows protocol requests to be prioritised with a function
 967 * specified by the API client. Defaults to by pointer address
 968 * if no function is specified.
 969 *
 970 * @param[in] a treq to compare to b.
 971 * @param[in] b treq to compare to a.
 972 * @return
 973 * - +1 if a > b.
 974 * - 0 if a == b.
 975 * - -1 if a < b.
 976 */
977static int8_t _trunk_request_prioritise(void const *a, void const *b)
 978{
/* NOTE(review): the declarations of treq_a/treq_b (listing lines 979-980,
 * type-checked casts of a/b to trunk_request_t const *) were dropped from
 * this extract — confirm against upstream. */
 981
 982	fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
 983
	/* Delegate ordering to the API client's request_prioritise callback;
	 * both preqs belong to the same trunk (asserted above) */
 984	return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
 985}
986
987/** Remove a request from all connection lists
 988 *
 989 * A common function used by init, fail, complete state functions to disassociate
 990 * a request from a connection in preparation for freeing or reassignment.
 991 *
 992 * Despite its unassuming name, this function is *the* place to put calls to
 993 * functions which need to be called when the number of requests associated with
 994 * a connection changes.
 995 *
 996 * Trunk requests will always be passed to this function before they're removed
 997 * from a connection, even if the requests are being freed.
 998 *
 999 * @param[in] treq to trigger a state change for.
 1000 */
1002{
 1003	trunk_connection_t *tconn = treq->pub.tconn;
 1004	trunk_t *trunk = treq->pub.trunk;
 1005
	/* treq must either be unassociated, or associated with a connection on this trunk */
 1006	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1007
	/* NOTE(review): the per-state case labels and list/slot removal calls
	 * (listing lines 1009-1037) were dropped from this extract; each arm
	 * removes the treq from the matching per-connection container. */
 1008	switch (treq->pub.state) {
 1010		return;	/* Not associated with connection */
 1011
 1014		break;
 1015
 1018		break;
 1019
 1022		break;
 1023
 1026		break;
 1027
 1030		break;
 1031
 1034		break;
 1035
 1038		break;
 1039
 1040	default:
 1041		fr_assert(0);
 1042		break;
 1043	}
 1044
 1045	/*
 1046	 *	If the request wasn't associated with a
 1047	 *	connection, then there's nothing more
 1048	 *	to do.
 1049	 */
 1050	if (!tconn) return;
 1051
	/* Scoped so ROPTIONAL() can resolve a request_t for per-request logging */
 1052	{
 1053		request_t *request = treq->pub.request;
 1054
 1055		ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
 1056			  tconn->pub.conn->name, treq->id);
 1057	}
 1058	/*
 1059	 *	Release any connection specific resources the
 1060	 *	treq holds.
 1061	 */
 1063
 1064	switch (tconn->pub.state){
 1065	case TRUNK_CONN_FULL:
 1066		trunk_connection_auto_unfull(tconn);		/* Check if we can switch back to active */
 1067		if (tconn->pub.state == TRUNK_CONN_FULL) break;	/* Only fallthrough if conn is now active */
 1069
 1070	case TRUNK_CONN_ACTIVE:
 1071		CONN_REORDER(tconn);
 1072		break;
 1073
 1074	default:
 1075		break;
 1076	}
 1077
 1078	treq->pub.tconn = NULL;
 1079
 1080	/*
 1081	 *	Request removed from the connection
 1082	 *	see if we need to deregister I/O events.
 1083	 */
 1085}
1086
1087/** Transition a request to the unassigned state, in preparation for re-assignment
 1088 *
 1089 * @note treq->tconn may be inviable after calling
 1090 *	if treq->conn and connection_signals_pause are not used.
 1091 *	This is due to call to trunk_request_remove_from_conn.
 1092 *
 1093 * @param[in] treq to trigger a state change for.
 1094 */
1096{
 1097	trunk_t *trunk = treq->pub.trunk;
 1098
	/* NOTE(review): the case labels, the bad-transition handler in the
	 * default arm, and the final transition/removal calls (listing lines
	 * 1100-1118) were dropped from this extract. */
 1099	switch (treq->pub.state) {
 1101		return;
 1102
 1105		break;
 1106
 1112		break;
 1113
 1114	default:
 1116	}
 1117
 1119}
1120
1121/** Transition a request to the backlog state, adding it to the backlog of the trunk
 1122 *
 1123 * @note treq->tconn and treq may be inviable after calling
 1124 *	if treq->conn and connection_signals_pause are not used.
 1125 *	This is due to call to trunk_manage.
 1126 *
 1127 * @param[in] treq to trigger a state change for.
 1128 * @param[in] new Whether this is a new request.
 1129 */
1131{
 1132	trunk_connection_t *tconn = treq->pub.tconn;
 1133	trunk_t *trunk = treq->pub.trunk;
 1134
	/* NOTE(review): case labels and the state-log/transition calls (listing
	 * lines 1136-1152) were dropped from this extract. */
 1135	switch (treq->pub.state) {
 1138		break;
 1139
 1142		break;
 1143
 1146		break;
 1147
 1148	default:
 1150	}
 1151
 1153	fr_heap_insert(&trunk->backlog, treq);	/* Insert into the backlog heap */
 1154
 1155	/*
 1156	 *	A new request has entered the trunk.
 1157	 *	Re-calculate request/connection ratios.
 1158	 */
 1159	if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
 1160
 1161	/*
 1162	 *	To reduce latency, if there's no connections
 1163	 *	in the connecting state, call the trunk manage
 1164	 *	function immediately.
 1165	 *
 1166	 *	Likewise, if there's draining connections
 1167	 *	which could be moved back to active call
 1168	 *	the trunk manage function.
 1169	 *
 1170	 *	Remember requests only enter the backlog if
 1171	 *	there's no connections which can service them.
 1172	 */
	/* NOTE(review): the condition and trunk_manage() call guarded by the
	 * comment above (listing lines 1173-1175) are missing from this extract. */
 1176	}
 1177}
1178
1179/** Transition a request to the pending state, adding it to the backlog of an active connection
 1180 *
 1181 * All trunk requests being added to a connection get passed to this function.
 1182 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
 1183 *
 1184 * @note treq->tconn and treq may be inviable after calling
 1185 *	if treq->conn and connection_signals_pause is not used.
 1186 *	This is due to call to trunk_connection_event_update.
 1187 *
 1188 * @param[in] treq to trigger a state change for.
 1189 * @param[in] tconn to enqueue the request on.
 1190 * @param[in] new Whether this is a new request.
 1191 */
1193{
 1194	trunk_t *trunk = treq->pub.trunk;
 1195
 1196	fr_assert(tconn->pub.trunk == trunk);
 1197	fr_assert(IS_PROCESSING(tconn));
 1198
	/* NOTE(review): the case labels (listing lines 1200-1215) were dropped
	 * from this extract — requests may arrive here from the unassigned,
	 * backlog and cancel states per the surviving inline comments. */
 1199	switch (treq->pub.state) {
 1202		fr_assert(!treq->pub.tconn);
 1203		break;
 1204
 1206		fr_assert(!treq->pub.tconn);
 1208		break;
 1209
 1210	case TRUNK_REQUEST_STATE_CANCEL:	/* Moved from another connection */
 1212		break;
 1213
 1214	default:
 1216	}
 1217
 1218	/*
 1219	 *	Assign the new connection first so
 1220	 *	it appears in the state log.
 1221	 */
 1222	treq->pub.tconn = tconn;
 1223
 1225
	/* Scoped so ROPTIONAL() can resolve a request_t for per-request logging */
 1226	{
 1227		request_t *request = treq->pub.request;
 1228
 1229		ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
 1230			  tconn->pub.conn->name, treq->id);
 1231	}
 1232	fr_heap_insert(&tconn->pending, treq);
 1233
 1234	/*
 1235	 *	A new request has entered the trunk.
 1236	 *	Re-calculate request/connection ratios.
 1237	 */
 1238	if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
 1239
 1240	/*
 1241	 *	Check if we need to automatically transition the
 1242	 *	connection to full.
 1243	 */
 1245
 1246	/*
 1247	 *	Reorder the connection in the heap now it has an
 1248	 *	additional request.
 1249	 */
 1250	if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
 1251
 1252	/*
 1253	 *	We have a new request, see if we need to register
 1254	 *	for I/O events.
 1255	 */
 1257}
1258
1259/** Transition a request to the partial state, indicating that it has been partially sent
 1260 *
 1261 * @param[in] treq to trigger a state change for.
 1262 */
1264{
 1265	trunk_connection_t *tconn = treq->pub.tconn;
 1266	trunk_t *trunk = treq->pub.trunk;
 1267
 1268	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1269
 1270	switch (treq->pub.state) {
 1271	case TRUNK_REQUEST_STATE_PENDING:	/* All requests go through pending, even requeued ones */
 1273		break;
 1274
 1275	default:
 1277	}
 1278
	/* A connection can only have one partially-written request at a time */
 1279	fr_assert(!tconn->partial);
 1280	tconn->partial = treq;
 1281
 1283}
1284
1285/** Transition a request to the sent state, indicating that it's been sent in its entirety
 1286 *
 1287 * @note treq->tconn and treq may be inviable after calling
 1288 *	if treq->conn and connection_signals_pause is not used.
 1289 *	This is due to call to trunk_connection_event_update.
 1290 *
 1291 * @param[in] treq to trigger a state change for.
 1292 */
1294{
 1295	trunk_connection_t *tconn = treq->pub.tconn;
 1296	trunk_t *trunk = treq->pub.trunk;
 1297
 1298	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1299
	/* NOTE(review): case labels (listing lines 1301-1310) were dropped from
	 * this extract — per trunk_request_enter_partial, requests arrive here
	 * from the pending and partial states. */
 1300	switch (treq->pub.state) {
 1303		break;
 1304
 1307		break;
 1308
 1309	default:
 1311	}
 1312
 1314	fr_dlist_insert_tail(&tconn->sent, treq);
 1315
 1316	/*
 1317	 *	Update the connection's sent stats if this is the
 1318	 *	first time this request is being sent.
 1319	 */
 1320	if (!treq->sent) {
 1321		trunk->pub.last_write_success = fr_time();
 1322
 1324		tconn->sent_count++;
 1325		treq->sent = true;
 1326
 1327		/*
 1328		 *	Enforces max_uses
 1329		 */
 1330		if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
 1331			DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
 1333		}
 1334	}
 1335
 1336	/*
 1337	 *	We just sent a request, we probably need
 1338	 *	to tell the event loop we want to be
 1339	 *	notified if there's data available.
 1340	 */
 1342}
1343
1344/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
 1345 *
 1346 * @note Largely a replica of trunk_request_enter_sent.
 1347 *
 1348 * @param[in] treq to trigger a state change for.
 1349 */
1351{
 1352	trunk_connection_t *tconn = treq->pub.tconn;
 1353	trunk_t *trunk = treq->pub.trunk;
 1354
 1355	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1356
 1357	switch (treq->pub.state) {
 1360		break;
 1361
 1364		break;
 1365
 1366	default:
 1368	}
 1369
 1371	fr_dlist_insert_tail(&tconn->reapable, treq);
 1372
	/* NOTE(review): unlike trunk_request_enter_sent, this path does NOT
	 * update trunk->pub.last_write_success — confirm that's intentional. */
 1373	if (!treq->sent) {
 1374		tconn->sent_count++;
 1375		treq->sent = true;
 1376
 1377		if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
 1378			DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
 1380		}
 1381	}
 1382
 1384}
1385
1386/** Transition a request to the cancel state, placing it in a connection's cancellation list
 1387 *
 1388 * If a request_cancel_send callback is provided, that callback will
 1389 * be called periodically for requests which were cancelled due to
 1390 * a signal.
 1391 *
 1392 * The request_cancel_send callback will dequeue cancelled requests
 1393 * and inform a remote server that the result is no longer required.
 1394 *
 1395 * A request must enter this state before being added to the backlog
 1396 * of another connection if it's been sent or partially sent.
 1397 *
 1398 * @note treq->tconn and treq may be inviable after calling
 1399 *	if treq->conn and connection_signals_pause is not used.
 1400 *	This is due to call to trunk_connection_event_update.
 1401 *
 1402 * @param[in] treq to trigger a state change for.
 1403 * @param[in] reason Why the request was cancelled.
 1404 *	Should be one of:
 1405 *	- TRUNK_CANCEL_REASON_SIGNAL request cancelled
 1406 *	because of a signal from the interpreter.
 1407 *	- TRUNK_CANCEL_REASON_MOVE request cancelled
 1408 *	because the connection failed and it needs
 1409 *	to be assigned to a new connection.
 1410 *	- TRUNK_CANCEL_REASON_REQUEUE request cancelled
 1411 *	as it needs to be resent on the same connection.
 1412 */
1414{
 1415	trunk_connection_t *tconn = treq->pub.tconn;
 1416	trunk_t *trunk = treq->pub.trunk;
 1417
 1418	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1419
	/* NOTE(review): case labels (listing lines 1421-1434) were dropped
	 * from this extract. */
 1420	switch (treq->pub.state) {
 1423		break;
 1424
 1427		break;
 1428
 1431		break;
 1432
 1433	default:
 1435	}
 1436
 1438	fr_dlist_insert_tail(&tconn->cancel, treq);
 1439	treq->cancel_reason = reason;
 1440
 1441	DO_REQUEST_CANCEL(treq, reason);
 1442
 1443	/*
 1444	 *	Our treq is no longer bound to an actual
 1445	 *	request_t *, as we can't guarantee the
 1446	 *	lifetime of the original request_t *.
 1447	 */
 1448	if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
 1449
 1450	/*
 1451	 *	Register for I/O write events if we need to.
 1452	 */
 1454}
1455
1456/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
 1457 *
 1458 * The request_demux function is then responsible for signalling
 1459 * that the cancel request is complete when the remote server
 1460 * acknowledges the cancellation request.
 1461 *
 1462 * @param[in] treq to trigger a state change for.
 1463 */
1465{
 1466	trunk_connection_t *tconn = treq->pub.tconn;
 1467	trunk_t *trunk = treq->pub.trunk;
 1468
 1469	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1472
 1473	switch (treq->pub.state) {
 1474	case TRUNK_REQUEST_STATE_CANCEL:	/* The only valid state cancel_partial can be reached from */
 1476		break;
 1477
 1478	default:
 1480	}
 1481
	/* A connection can only have one partially-written cancellation at a time */
 1483	fr_assert(!tconn->cancel_partial);
 1484	tconn->cancel_partial = treq;
 1485}
1486
1487/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
 1488 *
 1489 * The request_demux function is then responsible for signalling
 1490 * that the cancel request is complete when the remote server
 1491 * acknowledges the cancellation request.
 1492 *
 1493 * @note treq->tconn and treq may be inviable after calling
 1494 *	if treq->conn and connection_signals_pause is not used.
 1495 *	This is due to call to trunk_connection_event_update.
 1496 *
 1497 * @param[in] treq to trigger a state change for.
 1498 */
1500{
 1501	trunk_connection_t *tconn = treq->pub.tconn;
 1502	trunk_t *trunk = treq->pub.trunk;
 1503
 1504	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1507
	/* NOTE(review): case labels (listing lines 1509-1514) were dropped
	 * from this extract. */
 1508	switch (treq->pub.state) {
 1511		break;
 1512
 1515		break;
 1516
 1517	default:
 1519	}
 1520
 1522	fr_dlist_insert_tail(&tconn->cancel_sent, treq);
 1523
 1524	/*
 1525	 *	De-register for I/O write events
 1526	 *	and register the read events
 1527	 *	to drain the cancel ACKs.
 1528	 */
 1530}
1531
1532/** Cancellation was acked, the request is complete, free it
 1533 *
 1534 * The API client will not be informed, as the original request_t *
 1535 * will likely have been freed by this point.
 1536 *
 1537 * @note treq will be inviable after a call to this function.
 1538 *	treq->tconn may be inviable after calling
 1539 *	if treq->conn and connection_signals_pause is not used.
 1540 *	This is due to call to trunk_request_remove_from_conn.
 1541 *
 1542 * @param[in] treq to mark as complete.
 1543 */
1545{
 1546	trunk_connection_t *tconn = treq->pub.tconn;
 1547	trunk_t *trunk = treq->pub.trunk;
 1548
 1549	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1550	if (!fr_cond_assert(!treq->pub.request)) return;	/* Only a valid state for request_t * which have been cancelled */
 1551
	/* NOTE(review): case labels and the transition/removal calls (listing
	 * lines 1553-1563) were dropped from this extract. */
 1552	switch (treq->pub.state) {
 1555		break;
 1556
 1557	default:
 1559	}
 1560
 1562
 1564	trunk_request_free(&treq);	/* Free the request */
 1565}
1566
1567/** Request completed successfully, inform the API client and free the request
 1568 *
 1569 * @note treq will be inviable after a call to this function.
 1570 *	treq->tconn may also be inviable due to call to
 1571 *	trunk_request_remove_from_conn.
 1572 *
 1573 * @param[in] treq to mark as complete.
 1574 */
1576{
 1577	trunk_connection_t *tconn = treq->pub.tconn;
 1578	trunk_t *trunk = treq->pub.trunk;
 1579
 1580	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1581
	/* NOTE(review): case labels and the removal/transition calls (listing
	 * lines 1583-1586, 1590, 1593) were dropped from this extract. */
 1582	switch (treq->pub.state) {
 1587		break;
 1588
 1589	default:
 1591	}
 1592
 1594	DO_REQUEST_COMPLETE(treq);
 1595	trunk_request_free(&treq);	/* Free the request */
 1596}
1597
1598/** Request failed, inform the API client and free the request
 1599 *
 1600 * @note treq will be inviable after a call to this function.
 1601 *	treq->tconn may also be inviable due to call to
 1602 *	trunk_request_remove_from_conn.
 1603 *
 1604 * @param[in] treq to mark as failed.
 1605 */
1607{
 1608	trunk_connection_t *tconn = treq->pub.tconn;
 1609	trunk_t *trunk = treq->pub.trunk;
	/* Capture the state *before* the transition so the fail callback
	 * (DO_REQUEST_FAIL below) knows which state the request failed from */
 1610	trunk_request_state_t prev = treq->pub.state;
 1611
 1612	if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
 1613
	/* NOTE(review): case labels (listing lines 1615-1616, 1620) were
	 * dropped from this extract. */
 1614	switch (treq->pub.state) {
 1617		break;
 1618
 1619	default:
 1621		break;
 1622	}
 1623
 1625	DO_REQUEST_FAIL(treq, prev);
 1626	trunk_request_free(&treq);	/* Free the request */
 1627}
1628
1629/** Check to see if a trunk request can be enqueued
 1630 *
 1631 * @param[out] tconn_out Connection the request may be enqueued on.
 1632 * @param[in] trunk To enqueue requests on.
 1633 * @param[in] request associated with the treq (if any).
 1634 * @return
 1635 *	- TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
 1636 *	- TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
 1637 *	- TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
 1638 *	connections or backlog space.
 1639 *	- TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
 1640 *	unreachable.
 1641 */
/* NOTE(review): the first line of the signature (listing line 1642) is missing
 * from this extract; only the trailing parameter below survived. */
1643				  request_t *request)
 1644{
 1645	trunk_connection_t *tconn;
 1646	/*
 1647	 *	If we have an active connection then
 1648	 *	return that.
 1649	 */
	/* The min-heap keeps the least-loaded active connection at the top */
 1650	tconn = fr_minmax_heap_min_peek(trunk->active);
 1651	if (tconn) {
 1652		*tconn_out = tconn;
 1653		return TRUNK_ENQUEUE_OK;
 1654	}
 1655
 1656	/*
 1657	 *	Unlike the connection pool, we don't need
 1658	 *	to drive any internal processes by feeding
 1659	 *	it requests.
 1660	 *
 1661	 *	If the last event to occur was a failure
 1662	 *	we refuse to enqueue new requests until
 1663	 *	one or more connections comes online.
 1664	 */
 1665	if (!trunk->conf.backlog_on_failed_conn &&
 1666	    fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
 1667	    fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
 1669			  RWARN, WARN, "Refusing to enqueue requests - "
 1670			  "No active connections and last event was a connection failure");
 1671
	/* NOTE(review): the ROPTIONAL() first line and the return statement
	 * (listing lines 1668, 1672) are missing — presumably returns
	 * TRUNK_ENQUEUE_DST_UNAVAILABLE per the doc comment; confirm. */
 1673	}
 1674
 1675
 1676	/*
 1677	 *	Only enforce if we're limiting maximum
 1678	 *	number of connections, and maximum
 1679	 *	number of requests per connection.
 1680	 *
 1681	 *	The alloc function also checks this
 1682	 *	which is why this is only done for
 1683	 *	debug builds.
 1684	 */
 1685	if (trunk->conf.max_req_per_conn && trunk->conf.max) {
 1686		uint64_t limit;
 1687
 1688		limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
 1689		if (limit > 0) {
 1690			uint64_t total_reqs;
 1691
 1692			total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
 1694			if (total_reqs >= (limit + trunk->conf.max_backlog)) {
 1696					  RWARN, WARN, "Refusing to alloc requests - "
 1697					  "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
 1698					  "plus %u backlog requests reached",
 1699					  limit, trunk->conf.max, trunk->conf.max_req_per_conn,
 1700					  trunk->conf.max_backlog);
	/* NOTE(review): continuation of the count call, the ROPTIONAL() first
	 * line, and the return (listing lines 1693, 1695, 1701) are missing —
	 * presumably returns TRUNK_ENQUEUE_NO_CAPACITY; confirm. */
 1702			}
 1703		}
 1704	}
 1705
	/* NOTE(review): final return (listing line 1706) missing — presumably
	 * TRUNK_ENQUEUE_IN_BACKLOG per the doc comment; confirm. */
 1707}
1708
1709/** Enqueue a request which has never been assigned to a connection or was previously cancelled
 1710 *
 1711 * @param[in] treq	to re enqueue.  Must have been removed
 1712 *			from its existing connection with
 1713 *			#trunk_connection_requests_dequeue.
 1714 * @return
 1715 *	- TRUNK_ENQUEUE_OK			Request was re-enqueued.
 1716 *	- TRUNK_ENQUEUE_NO_CAPACITY		Request enqueueing failed because we're at capacity.
 1717 *	- TRUNK_ENQUEUE_DST_UNAVAILABLE	Enqueuing failed for some reason.
 1718 *      					Usually because the connection to the resource is down.
 1719 */
1721{
 1722	trunk_t *trunk = treq->pub.trunk;
 1723	trunk_connection_t *tconn = NULL;
 1724	trunk_enqueue_t ret;
 1725
 1726	/*
 1727	 *	Must *NOT* still be assigned to another connection
 1728	 */
 1729	fr_assert(!treq->pub.tconn);
 1730
 1731	ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
 1732	switch (ret) {
 1733	case TRUNK_ENQUEUE_OK:
	/* NOTE(review): both arms call trunk_request_enter_pending() with the
	 * same args; the lines unique to the always_writable arm (listing
	 * lines 1735, 1737-1738 — presumably a pause/writable-signal bracket)
	 * were dropped from this extract. */
 1734		if (trunk->conf.always_writable) {
 1736			trunk_request_enter_pending(treq, tconn, false);
 1739		} else {
 1740			trunk_request_enter_pending(treq, tconn, false);
 1741		}
 1742		break;
 1743
 1745		/*
 1746		 *	No more connections and request
 1747		 *	is already in the backlog.
 1748		 *
 1749		 *	Signal our caller it should stop
 1750		 *	trying to drain the backlog.
 1751		 */
 1753		trunk_request_enter_backlog(treq, false);
 1754		break;
 1755
 1756	default:
 1757		break;
 1758	}
 1759
 1760	return ret;
 1761}
1762
1763/** Shift requests in the specified states onto new connections
 1764 *
 1765 * This function will blindly dequeue any requests in the specified state and get
 1766 * them back to the unassigned state, cancelling any sent or partially sent requests.
 1767 *
 1768 * This function does not check that dequeuing a request in a particular state is a
 1769 * sane or sensible thing to do, that's up to the caller!
 1770 *
 1771 * @param[out] out	A list to insert the newly dequeued and unassigned
 1772 *			requests into.
 1773 * @param[in] tconn	to dequeue requests from.
 1774 * @param[in] states	Dequeue request in these states.
 1775 * @param[in] max	The maximum number of requests to dequeue. 0 for unlimited.
 1776 */
/* NOTE(review): the first line of the signature (listing line 1777) is missing
 * from this extract; only the trailing parameters below survived. */
1778						  int states, uint64_t max)
 1779{
 1780	trunk_request_t		*treq;
 1781	uint64_t		count = 0;
 1782
 1783	if (max == 0) max = UINT64_MAX;
 1784
	/* Returns the number dequeued so far once the cap is exceeded */
 1785#define OVER_MAX_CHECK if (++count > max) return (count - 1)
 1786
	/* Drain _src_list entirely (up to max), moving each treq to the
	 * unassigned state and appending it to the caller's out list */
 1787#define DEQUEUE_ALL(_src_list, _state) do { \
 1788	while ((treq = fr_dlist_head(_src_list))) { \
 1789		OVER_MAX_CHECK; \
 1790		fr_assert(treq->pub.state == (_state)); \
 1791		trunk_request_enter_unassigned(treq); \
 1792		fr_dlist_insert_tail(out, treq); \
 1793	} } while (0)
 1794
 1795	/*
 1796	 *	Don't need to do anything with
 1797	 *	cancellation requests.
 1798	 */
 1799	if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
 1801
 1802	/*
 1803	 *	...same with cancel inform
 1804	 */
	/* NOTE(review): the DEQUEUE_ALL continuations and the cancel-inform /
	 * cancel-partial guard lines (listing lines 1800, 1805-1806, 1811-1812,
	 * 1815-1817) were dropped from this extract. */
 1807
 1808	/*
 1809	 *	....same with cancel partial
 1810	 */
 1813		treq = tconn->cancel_partial;
 1814		if (treq) {
 1818		}
 1819	}
 1820
 1821	/*
 1822	 *	...and pending.
 1823	 */
 1824	if (states & TRUNK_REQUEST_STATE_PENDING) {
 1825		while ((treq = fr_heap_peek(tconn->pending))) {
	/* NOTE(review): loop body (listing lines 1826-1829) dropped from this extract */
 1830		}
 1831	}
 1832
 1833	/*
 1834	 *	Cancel partially sent requests
 1835	 */
 1836	if (states & TRUNK_REQUEST_STATE_PARTIAL) {
 1838		treq = tconn->partial;
 1839		if (treq) {
 1841
 1842			/*
 1843			 *	Don't allow the connection to change state whilst
 1844			 *	we're draining requests from it.
 1845			 */
 1851		}
 1852	}
 1853
 1854	/*
 1855	 *	Cancel sent requests
 1856	 */
 1857	if (states & TRUNK_REQUEST_STATE_SENT) {
 1858		/*
 1859		 *	Don't allow the connection to change state whilst
 1860		 *	we're draining requests from it.
 1861		 */
 1863		while ((treq = fr_dlist_head(&tconn->sent))) {
 1866
 1870		}
 1871	}
 1872
 1873	return count;
 1875}
1876
1877/** Remove requests in specified states from a connection, attempting to distribute them to new connections
 1878 *
 1879 * @param[in] tconn		To remove requests from.
 1880 * @param[in] states		One or more states or'd together.
 1881 * @param[in] max		The maximum number of requests to dequeue.
 1882 *				0 for unlimited.
 1883 * @param[in] fail_bound	If true causes any requests bound to the connection to fail.
 1884 *      			If false bound requests will not be moved.
 1885 *
 1886 * @return the number of requests re-queued.
 1887 */
1888static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
 1889{
 1890	trunk_t			*trunk = tconn->pub.trunk;
 1891	fr_dlist_head_t		to_process;
 1892	trunk_request_t		*treq = NULL;
 1893	uint64_t		moved = 0;
 1894
 1895	if (max == 0) max = UINT64_MAX;
 1896
 1897	fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
 1898
 1899	/*
 1900	 *	Prevent the connection changing state whilst we're
 1901	 *	working with it.
 1902	 *
 1903	 *	There's a user callback that can be called by
 1904	 *	trunk_request_enqueue_existing which can reconnect
 1905	 *	the connection.
 1906	 */
	/* NOTE(review): the state-freeze call guarded by the comment above
	 * (listing line 1907) was dropped from this extract. */
 1908
 1909	/*
 1910	 *	Remove non-cancelled requests from the connection
 1911	 */
 1912	moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
 1913
 1914	/*
 1915	 *	Prevent requests being requeued on the same trunk
 1916	 *	connection, which would break rebalancing.
 1917	 *
 1918	 *	This is a bit of a hack, but nothing should test
 1919	 *	for connection/list consistency in this code,
 1920	 *	and if something is added later, it'll be flagged
 1921	 *	by the tests.
 1922	 */
 1923	if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
 1924		int ret;
 1925
		/* Temporarily pull the connection out of the active heap so the
		 * enqueue path below can't pick it as a target (re-inserted later) */
 1926		ret = fr_minmax_heap_extract(trunk->active, tconn);
 1927		if (!fr_cond_assert_msg(ret == 0,
 1928					"Failed extracting conn from active heap: %s", fr_strerror())) goto done;
 1929
 1930	}
 1931
 1932	/*
 1933	 *	Loop over all the requests we gathered and
 1934	 *	redistribute them to new connections.
 1935	 */
 1936	while ((treq = fr_dlist_next(&to_process, treq))) {
 1937		trunk_request_t *prev;
 1938
 1939		prev = fr_dlist_remove(&to_process, treq);
 1940
 1941		/*
 1942		 *	Attempts to re-queue a request
 1943		 *	that's bound to a connection
 1944		 *	results in a failure.
 1945		 */
 1946		if (treq->bound_to_conn) {
 1947			if (fail_bound || !IS_SERVICEABLE(tconn)) {
	/* NOTE(review): the fail call in this arm (listing line 1948) was
	 * dropped from this extract — presumably enters the failed state. */
 1949			} else {
 1950				trunk_request_enter_pending(treq, tconn, false);
 1951			}
 1952			goto next;
 1953		}
 1954
 1955		switch (trunk_request_enqueue_existing(treq)) {
 1956		case TRUNK_ENQUEUE_OK:
 1957			break;
 1958
 1959		/*
 1960		 *	A connection failed, and
 1961		 *	there's no other connections
 1962		 *	available to deal with the
 1963		 *	load, it's been placed back
 1964		 *	in the backlog.
 1965		 */
 1967			break;
 1968
 1969		/*
 1970		 *	If we fail to re-enqueue then
 1971		 *	there's nothing to do except
 1972		 *	fail the request.
 1973		 */
	/* NOTE(review): the IN_BACKLOG / DST_UNAVAILABLE case labels and the
	 * fail call (listing lines 1966, 1974-1975, 1977) were dropped. */
 1976		case TRUNK_ENQUEUE_FAIL:
 1978			break;
 1979		}
 1980	next:
		/* Iterate safely while removing: resume from the predecessor */
 1981		treq = prev;
 1982	}
 1983
 1984	/*
 1985	 *	Add the connection back into the active list
 1986	 */
 1987	if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
 1988		int ret;
 1989
 1990		ret = fr_minmax_heap_insert(trunk->active, tconn);
 1991		if (!fr_cond_assert_msg(ret == 0,
 1992					"Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
 1993	}
 1994	if (moved >= max) goto done;
 1995
 1996	/*
 1997	 *	Deal with the cancelled requests specially we can't
 1998	 *	queue them up again as they were only valid on that
 1999	 *	specific connection.
 2000	 *
 2001	 *	We just need to run them to completion which, as
 2002	 *	they should already be in the unassigned state,
 2003	 *	just means freeing them.
 2004	 */
 2005	moved += trunk_connection_requests_dequeue(&to_process, tconn,
 2006						   states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
 2007	while ((treq = fr_dlist_next(&to_process, treq))) {
 2008		trunk_request_t *prev;
 2009
 2010		prev = fr_dlist_remove(&to_process, treq);
 2011		trunk_request_free(&treq);
 2012		treq = prev;
 2013	}
 2014
 2015done:
 2016
 2017	/*
 2018	 *	Always re-calculate the request/connection
 2019	 *	ratio at the end.
 2020	 *
 2021	 *	This avoids having the state transition
 2022	 *	functions do it.
 2023	 *
 2024	 *	The ratio would be wrong when they calculated
 2025	 *	it anyway, because a bunch of requests are
 2026	 *	dequeued from the connection and temporarily
 2027	 *	cease to exist from the perspective of the
 2028	 *	trunk_requests_per_connection code.
 2029	 */
 2030	trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
 2031
	/* NOTE(review): the state-unfreeze call matching the freeze at the top
	 * (listing line 2032) was dropped from this extract. */
 2033	return moved;
 2034}
2035
2036/** Move requests off of a connection and requeue elsewhere
 2037 *
 2038 * @note We don't re-queue on draining or draining to free, as requests should have already been
 2039 *	 moved off of the connection.  It's also dangerous as the trunk management code main
 2040 *	 clean up a connection in this state when it's run on re-queue, and then the caller
 2041 *	 may try and access a now freed connection.
 2042 *
 2043 * @param[in] tconn	to move requests off of.
 2044 * @param[in] states	Only move requests in this state.
 2045 * @param[in] max	The maximum number of requests to dequeue. 0 for unlimited.
 2046 * @param[in] fail_bound	If true causes any requests bound to the connection to fail.
 2047 *      			If false bound requests will not be moved.
 2048 * @return The number of requests requeued.
 2049 */
2050uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
 2051{
	/* Only requeue from states where a connection can still hold requests;
	 * see the @note above for why draining states return 0 */
 2052	switch (tconn->pub.state) {
 2053	case TRUNK_CONN_ACTIVE:
 2054	case TRUNK_CONN_FULL:
	/* NOTE(review): one additional case label (listing line 2055) was
	 * dropped from this extract — confirm which state against upstream. */
 2056		return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
 2057
 2058	default:
 2059		return 0;
 2060	}
 2061}
2062
2063/** Signal a partial write
 2064 *
 2065 * Where there's high load, and the outbound write buffer is full
 2066 *
 2067 * @param[in] treq	to signal state change for.
 2068 */
2070{
 2071	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
 2072
	/* NOTE(review): the first line of this assertion (listing line 2073,
	 * presumably an IN_REQUEST_MUX() check) was dropped from this extract. */
 2074				"%s can only be called from within request_mux handler", __FUNCTION__)) return;
 2075
	/* NOTE(review): the case label and transition call (listing lines
	 * 2077-2078) were dropped — only pending requests transition here. */
 2076	switch (treq->pub.state) {
 2079		break;
 2080
 2081	default:
 2082		return;
 2083	}
 2084}
2085
2086/** Signal that the request was written to a connection successfully
 2087 *
 2088 * @param[in] treq	to signal state change for.
 2089 */
2091{
 2092	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
 2093
	/* NOTE(review): the first line of this assertion (listing line 2094,
	 * presumably an IN_REQUEST_MUX() check) was dropped from this extract. */
 2095				"%s can only be called from within request_mux handler", __FUNCTION__)) return;
 2096
	/* NOTE(review): case labels and the transition call (listing lines
	 * 2098-2100) were dropped from this extract. */
 2097	switch (treq->pub.state) {
 2101		break;
 2102
 2103	default:
 2104		return;
 2105	}
 2106}
2107
2108/** Signal that the request was written to a connection successfully, but no response is expected
 2109 *
 2110 * @param[in] treq	to signal state change for.
 2111 */
2113{
 2114	if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
 2115
	/* NOTE(review): the first line of this assertion (listing line 2116,
	 * presumably an IN_REQUEST_MUX() check) was dropped from this extract. */
 2117				"%s can only be called from within request_mux handler", __FUNCTION__)) return;
 2118
	/* NOTE(review): case labels and the transition call (listing lines
	 * 2120-2122) were dropped from this extract. */
 2119	switch (treq->pub.state) {
 2123		break;
 2124
 2125	default:
 2126		return;
 2127	}
 2128}
2129
2130/** Signal that a trunk request is complete
 2131 *
 2132 * The API client will be informed that the request is now complete.
 2133 */
2135{
 2136	trunk_t *trunk = treq->pub.trunk;
 2137
 2138	if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
 2139
 2140	/*
 2141	 *	We assume that if the request is being signalled
 2142	 *	as complete from the demux function, that it was
 2143	 *	a successful read.
 2144	 *
 2145	 *	If this assumption turns out to be incorrect
 2146	 *	then we need to add an argument to signal_complete
 2147	 *	to indicate if this is a successful read.
 2148	 */
 2149	if (IN_REQUEST_DEMUX(trunk)) {
 2150		trunk_connection_t *tconn = treq->pub.tconn;
 2151
 2152		trunk->pub.last_read_success = fr_time();
	/* NOTE(review): a per-connection update using tconn (listing line
	 * 2153) was dropped from this extract. */
 2154	}
 2155
	/* NOTE(review): case labels and the completion transition (listing
	 * lines 2157, 2159-2160) were dropped from this extract. */
 2156	switch (treq->pub.state) {
 2158	case TRUNK_REQUEST_STATE_PENDING:	/* Got immediate response, i.e. cached */
 2161		break;
 2162
 2163	default:
 2164		return;
 2165	}
 2166}
2167
2168/** Signal that a trunk request failed
2169 *
2170 * The API client will be informed that the request has failed.
2171 */
2173{
2174 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2175
2177}
2178
2179/** Cancel a trunk request
2180 *
2181 * treq can be in any state, but requests to cancel if the treq is not in
2182 * the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state will be ignored.
2183 *
2184 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2185 * is now inviable as it's being cancelled.
2186 *
2187 * The free function however, is called, and that should be used to perform necessary
2188 * cleanup.
2189 *
2190 * @param[in] treq to signal state change for.
2191 */
2193{
2194 trunk_t *trunk;
2195
2196 /*
2197 * Ensure treq hasn't been freed
2198 */
2199 (void)talloc_get_type_abort(treq, trunk_request_t);
2200
2201 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2202
2204 "%s cannot be called within a handler", __FUNCTION__)) return;
2205
2206 trunk = treq->pub.trunk;
2207
2208 switch (treq->pub.state) {
2209 /*
2210 * We don't call the complete or failed callbacks
2211 * as the request and rctx are no longer viable.
2212 */
2215 {
2216 trunk_connection_t *tconn = treq->pub.tconn;
2217
2218 /*
2219 * Don't allow connection state changes
2220 */
2224 "Bad state %s after cancellation",
2225 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2227 return;
2228 }
2229 /*
2230 * No cancel muxer. We're done.
2231 *
2232 * If we do have a cancel mux function,
2233 * the next time this connection becomes
2234 * writable, we'll call the cancel mux
2235 * function.
2236 *
2237 * We don't run the complete or failed
2238 * callbacks here as the request is
2239 * being cancelled.
2240 */
2241 if (!trunk->funcs.request_cancel_mux) {
2243 trunk_request_free(&treq);
2244 }
2246 }
2247 break;
2248
2249 /*
2250 * We're already in the process of cancelling a
2251 * request, so ignore duplicate signals.
2252 */
2257 break;
2258
2259 /*
2260 * For any other state, we just release the request
2261 * from its current connection and free it.
2262 */
2263 default:
2265 trunk_request_free(&treq);
2266 break;
2267 }
2268}
2269
2270/** Signal a partial cancel write
2271 *
2272 * Where there's high load, and the outbound write buffer is full
2273 *
2274 * @param[in] treq to signal state change for.
2275 */
2277{
2278 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2279
2281 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2282
2283 switch (treq->pub.state) {
2286 break;
2287
2288 default:
2289 return;
2290 }
2291}
2292
2293/** Signal that a remote server has been notified of the cancellation
2294 *
2295 * Called from request_cancel_mux to indicate that the datastore has been informed
2296 * that the response is no longer needed.
2297 *
2298 * @param[in] treq to signal state change for.
2299 */
2301{
2302 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2303
2305 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2306
2307 switch (treq->pub.state) {
2311 break;
2312
2313 default:
2314 break;
2315 }
2316}
2317
2318/** Signal that a remote server acked our cancellation
2319 *
2320 * Called from request_demux to indicate that it got an ack for the cancellation.
2321 *
2322 * @param[in] treq to signal state change for.
2323 */
2325{
2326 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2327
2329 "%s can only be called from within request_demux or request_cancel_mux handlers",
2330 __FUNCTION__)) return;
2331
2332 switch (treq->pub.state) {
2334 /*
2335 * This is allowed, as we may not need to wait
2336 * for the database to ACK our cancellation
2337 * request.
2338 *
2339 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2340 * is not allowed here, as that'd mean we'd half
2341 * written the cancellation request out to the
2342 * socket, and then decided to abandon it.
2343 *
2344 * That'd leave the socket in an unusable state.
2345 */
2348 break;
2349
2350 default:
2351 break;
2352 }
2353}
2354
2355/** If the trunk request is freed then update the target requests
2356 *
2357 * gperftools showed calling the request free function directly was slightly faster
2358 * than using talloc_free.
2359 *
2360 * @param[in] treq_to_free request.
2361 */
2363{
2364 trunk_request_t *treq = *treq_to_free;
2365 trunk_t *trunk;
2366
2367 if (unlikely(!treq)) return;
2368
2369 trunk = treq->pub.trunk;
2370
2371 /*
2372 * The only valid states a trunk request can be
2373 * freed from.
2374 */
2375 switch (treq->pub.state) {
2381 break;
2382
2383 default:
2384 if (!fr_cond_assert(0)) return;
2385 }
2386
2387 /*
2388 * Zero out the pointer to prevent double frees
2389 */
2390 *treq_to_free = NULL;
2391
2392 /*
2393 * Call the API client callback to free
2394 * any associated memory.
2395 */
2396 DO_REQUEST_FREE(treq);
2397
2398 /*
2399 * Update the last above/below target stats
2400 * We only do this when we alloc or free
2401 * connections, or on connection
2402 * state changes.
2403 */
2404 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2405
2406 /*
2407 * This tracks the total number of requests
2408 * allocated and not freed or returned to
2409 * the free list.
2410 */
2411 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2412
2413 /*
2414 * No cleanup delay, means cleanup immediately
2415 */
2418
2419#ifndef NDEBUG
2420 /*
2421 * Ensure anything parented off the treq
2422 * is freed. We do this to trigger
2423 * the destructors for the log entries.
2424 */
2425 talloc_free_children(treq);
2426
2427 /*
2428 * State log should now be empty as entries
2429 * remove themselves from the dlist
2430 * on free.
2431 */
2433 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2434#endif
2435
2436 talloc_free(treq);
2437 return;
2438 }
2439
2440 /*
2441 * Ensure anything parented off the treq
2442 * is freed.
2443 */
2444 talloc_free_children(treq);
2445
2446#ifndef NDEBUG
2447 /*
2448 * State log should now be empty as entries
2449 * remove themselves from the dlist
2450 * on free.
2451 */
2453 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2454#endif
2455
2456 /*
2457 *
2458 * Return the trunk request back to the init state.
2459 */
2460 *treq = (trunk_request_t){
2461 .pub = {
2463 .trunk = treq->pub.trunk,
2464 },
2465 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2466 .last_freed = fr_time(),
2467#ifndef NDEBUG
2468 .log = treq->log /* Keep the list head, to save reinitialisation */
2469#endif
2470 };
2471
2472
2473 /*
2474 * Insert at the head, so that we can free
2475 * requests that have been unused for N
2476 * seconds from the tail.
2477 */
2478 trunk_list_free_requests_add(trunk, treq);
2479
2480}
2481
2482/** Actually free the trunk request
2483 *
2484 */
2486{
2487 trunk_t *trunk = treq->pub.trunk;
2488
2489 switch (treq->pub.state) {
2492 break;
2493
2494 default:
2495 fr_assert(0);
2496 break;
2497 }
2498
2499 trunk_list_free_requests_remove(trunk, treq);
2500
2501 return 0;
2502}
2503
2504/** (Pre-)Allocate a new trunk request
2505 *
2506 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2507 * request will be a talloc pool, which can be used to hold the preq.
2508 *
2509 * @note Do not use MEM to check the result of this allocated as it may fail for
2510 * non-fatal reasons.
2511 *
2512 * @param[in] trunk to add request to.
2513 * @param[in] request to wrap in a trunk request (treq).
2514 * @return
2515 * - A newly allocated request.
2516 * - NULL if too many requests are allocated.
2517 */
2519{
2520 trunk_request_t *treq;
2521
2522 /*
2523 * The number of treqs currently allocated
2524 * exceeds the maximum number allowed.
2525 */
2526 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2527 uint64_t limit;
2528
2529 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2530 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2532 RWARN, WARN, "Refusing to alloc requests - "
2533 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2534 "plus %u backlog requests reached",
2535 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2536 trunk->conf.max_backlog);
2537 return NULL;
2538 }
2539 }
2540
2541 /*
2542 * Re-use a recently freed request, which might have some
2543 * better cache locality than getting a request from the tail.
2544 *
2545 * If we can't do that, just allocate a new one.
2546 */
2547 treq = trunk_list_free_requests_pop(trunk);
2548 if (treq) {
2550 fr_assert(treq->pub.trunk == trunk);
2551 fr_assert(treq->pub.tconn == NULL);
2554 trunk->pub.req_alloc_reused++;
2555 } else {
2557 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2558 talloc_set_destructor(treq, _trunk_request_free);
2559
2560 *treq = (trunk_request_t){
2561 .pub = {
2563 .trunk = trunk
2564 },
2565 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2566 };
2567 trunk->pub.req_alloc_new++;
2568#ifndef NDEBUG
2570#endif
2571 }
2572
2573 trunk->pub.req_alloc++;
2575 /* heap_id - initialised when treq inserted into pending */
2576 /* list - empty */
2577 /* preq - populated later */
2578 /* rctx - populated later */
2579 treq->pub.request = request;
2580
2581 return treq;
2582}
2583
2584/** Enqueue a request that needs data written to the trunk
2585 *
2586 * When a request_t * needs to make an asynchronous request to an external datastore
2587 * it should call this function, specifying a preq (protocol request) containing
2588 * the data necessary to request information from the external datastore, and an
2589 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2590 *
2591 * After a treq is successfully enqueued it will either be assigned immediately
2592 * to the pending queue of a connection, or if no connections are available,
2593 * (depending on the trunk configuration) the treq will be placed in the trunk's
2594 * global backlog.
2595 *
2596 * After receiving a positive return code from this function the caller should
2597 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2598 * (trunk connection) and treq state changes to be called.
2599 *
2600 * When a tconn becomes writable (or the trunk is configured to be always writable)
2601 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2602 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2603 * is also responsible for tracking the outbound requests to allow the
2604 * #trunk_request_demux_t callback to match inbound responses with the original
2605 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2606 * it signals what state the treq should enter next using one of the
2607 * trunk_request_signal_* functions.
2608 *
2609 * When a tconn becomes readable the user specified #trunk_request_demux_t
2610 * callback is called to process any responses, match them with the original treq.
2611 * and signal what state they should enter next using one of the
2612 * trunk_request_signal_* functions.
2613 *
2614 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2615 * is NULL, a new treq will be allocated.
2616 * Otherwise treq should point to memory allocated
2617 * with trunk_request_alloc.
2618 * @param[in] trunk to enqueue request on.
2619 * @param[in] request to enqueue.
2620 * @param[in] preq Protocol request to write out. Will be freed when
2621 * treq is freed. Should ideally be parented by the
2622 * treq if possible.
2623 * Use #trunk_request_alloc for pre-allocation of
2624 * the treq.
2625 * @param[in] rctx The resume context to write any result to.
2626 * @return
2627 * - TRUNK_ENQUEUE_OK.
2628 * - TRUNK_ENQUEUE_IN_BACKLOG.
2629 * - TRUNK_ENQUEUE_NO_CAPACITY.
2630 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2631 * - TRUNK_ENQUEUE_FAIL
2632 */
2634 request_t *request, void *preq, void *rctx)
2635{
2636 trunk_connection_t *tconn = NULL;
2637 trunk_request_t *treq;
2638 trunk_enqueue_t ret;
2639
2640 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2641 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2642
2643 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2644 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2645
2646 /*
2647 * If delay_start was set, we may need
2648 * to insert the timer for the connection manager.
2649 */
2650 if (unlikely(!trunk->started)) {
2651 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2652 }
2653
2654 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2655 switch (ret) {
2656 case TRUNK_ENQUEUE_OK:
2657 if (*treq_out) {
2658 treq = *treq_out;
2659 } else {
2660 *treq_out = treq = trunk_request_alloc(trunk, request);
2661 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2662 }
2663 treq->pub.preq = preq;
2664 treq->pub.rctx = rctx;
2665 if (trunk->conf.always_writable) {
2667 trunk_request_enter_pending(treq, tconn, true);
2670 } else {
2671 trunk_request_enter_pending(treq, tconn, true);
2672 }
2673 break;
2674
2676 if (*treq_out) {
2677 treq = *treq_out;
2678 } else {
2679 *treq_out = treq = trunk_request_alloc(trunk, request);
2680 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2681 }
2682 treq->pub.preq = preq;
2683 treq->pub.rctx = rctx;
2684 trunk_request_enter_backlog(treq, true);
2685 break;
2686
2687 default:
2688 /*
2689 * If a trunk request was provided
2690 * populate the preq and rctx fields
2691 * so that if it's freed with
2692 * trunk_request_free, the free
2693 * function works as intended.
2694 */
2695 if (*treq_out) {
2696 treq = *treq_out;
2697 treq->pub.preq = preq;
2698 treq->pub.rctx = rctx;
2699 }
2700 return ret;
2701 }
2702
2703 return ret;
2704}
2705
2706/** Re-enqueue a request on the same connection
2707 *
2708 * If the treq has been sent, we assume that we're being signalled to requeue
2709 * because something outside of the trunk API has determined that a retransmission
2710 * is required. The easiest way to perform that retransmission is to clean up
2711 * any tracking information for the request, and the requeue it for transmission.
2712 *
2713 * IF re-queueing fails, the request will enter the fail state. It should not be
2714 * accessed if this occurs.
2715 *
2716 * @param[in] treq to requeue (retransmit).
2717 * @return
2718 * - TRUNK_ENQUEUE_OK.
2719 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2720 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2721 */
2723{
2724 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2725
2726 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2727
2728 if (!IS_PROCESSING(tconn)) {
2731 }
2732
2733 switch (treq->pub.state) {
2739 trunk_request_enter_pending(treq, tconn, false);
2740 if (treq->pub.trunk->conf.always_writable) {
2742 }
2744 break;
2745
2746 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2747 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2748 break;
2749
2750 default:
2752 return TRUNK_ENQUEUE_FAIL;
2753 }
2754
2755 return TRUNK_ENQUEUE_OK;
2756}
2757
2758/** Enqueue additional requests on a specific connection
2759 *
2760 * This may be used to create a series of requests on a single connection, or to generate
2761 * in-band status checks.
2762 *
2763 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2764 * must be able to handle multiple calls to its muxer gracefully.
2765 *
2766 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2767 * is NULL, a new treq will be allocated.
2768 * Otherwise treq should point to memory allocated
2769 * with trunk_request_alloc.
2770 * @param[in] tconn to enqueue request on.
2771 * @param[in] request to enqueue.
2772 * @param[in] preq Protocol request to write out. Will be freed when
2773 * treq is freed. Should ideally be parented by the
2774 * treq if possible.
2775 * Use #trunk_request_alloc for pre-allocation of
2776 * the treq.
2777 * @param[in] rctx The resume context to write any result to.
2778 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2779 * checks through even if the connection is at capacity.
2780 * Will also allow enqueuing on "inactive", "draining",
2781 * "draining-to-free" connections.
2782 * @return
2783 * - TRUNK_ENQUEUE_OK.
2784 * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2785 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2786 */
2788 request_t *request, void *preq, void *rctx,
2789 bool ignore_limits)
2790{
2791 trunk_request_t *treq;
2792 trunk_t *trunk = tconn->pub.trunk;
2793
2794 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2795 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2796
2798
2799 /*
2800 * Limits check
2801 */
2802 if (!ignore_limits) {
2803 if (trunk->conf.max_req_per_conn &&
2806
2808 }
2809
2810 if (*treq_out) {
2811 treq = *treq_out;
2812 } else {
2813 *treq_out = treq = trunk_request_alloc(trunk, request);
2814 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2815 }
2816
2817 treq->pub.preq = preq;
2818 treq->pub.rctx = rctx;
2819 treq->bound_to_conn = true; /* Don't let the request be transferred */
2820
2821 if (trunk->conf.always_writable) {
2823 trunk_request_enter_pending(treq, tconn, true);
2826 } else {
2827 trunk_request_enter_pending(treq, tconn, true);
2828 }
2829
2830 return TRUNK_ENQUEUE_OK;
2831}
2832
2833#ifndef NDEBUG
2834/** Used for sanity checks to ensure all log entries have been freed
2835 *
2836 */
2838{
2839 fr_dlist_remove(slog->log_head, slog);
2840
2841 return 0;
2842}
2843
2844void trunk_request_state_log_entry_add(char const *function, int line,
2846{
2847 trunk_request_state_log_t *slog = NULL;
2848
2850 slog = fr_dlist_head(&treq->log);
2851 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2852 fr_dlist_num_elements(&treq->log));
2853 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2854 memset(slog, 0, sizeof(*slog));
2855 } else {
2856 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2857 talloc_set_destructor(slog, _state_log_entry_free);
2858 }
2859
2860 slog->log_head = &treq->log;
2861 slog->from = treq->pub.state;
2862 slog->to = new;
2863 slog->function = function;
2864 slog->line = line;
2865 if (treq->pub.tconn) {
2866 slog->tconn = treq->pub.tconn;
2867 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2868 slog->tconn_state = treq->pub.tconn->pub.state;
2869 }
2870
2871 fr_dlist_insert_tail(&treq->log, slog);
2872
2873}
2874
2875void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2876 trunk_request_t const *treq)
2877{
2878 trunk_request_state_log_t *slog = NULL;
2879
2880 int i;
2881
2882 for (slog = fr_dlist_head(&treq->log), i = 0;
2883 slog;
2884 slog = fr_dlist_next(&treq->log, slog), i++) {
2885 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2886 i, slog->function, slog->line,
2887 slog->tconn_id,
2889 slog->tconn_state, "<INVALID>") : "none",
2890 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2891 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2892 }
2893}
2894#endif
2895
2896/** Return the count number of connections in the specified states
2897 *
2898 * @param[in] trunk to retrieve counts for.
2899 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2900 * @return The number of connections in the specified states.
2901 */
2903{
2904 uint16_t count = 0;
2905
2906 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2907 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2908 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2909 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2910 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2912 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2913 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2915
2916 return count;
2917}
2918
2919/** Return the count number of requests associated with a trunk connection
2920 *
2921 * @param[in] tconn to return request count for.
2922 * @param[in] req_state One or more request states or'd together.
2923 *
2924 * @return The number of requests in the specified states, associated with a tconn.
2925 */
2927{
2928 uint32_t count = 0;
2929
2931 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2932 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2934 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2935 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2937
2938 return count;
2939}
2940
2941/** Automatically mark a connection as full
2942 *
2943 * @param[in] tconn to potentially mark as full.
2944 */
2946{
2947 trunk_t *trunk = tconn->pub.trunk;
2949
2950 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2951
2952 /*
2953 * Enforces max_req_per_conn
2954 */
2955 if (trunk->conf.max_req_per_conn > 0) {
2958 }
2959}
2960
2961/** Return whether a trunk connection should currently be considered full
2962 *
2963 * @param[in] tconn to check.
2964 * @return
2965 * - true if the connection is full.
2966 * - false if the connection is not full.
2967 */
2969{
2970 trunk_t *trunk = tconn->pub.trunk;
2972
2973 /*
2974 * Enforces max_req_per_conn
2975 */
2977 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2978
2979 return true;
2980}
2981
2982/** Automatically mark a connection as active or reconnect it
2983 *
2984 * @param[in] tconn to potentially mark as active or reconnect.
2985 */
2987{
2988 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2989
2990 /*
2991 * Enforces max_req_per_conn
2992 */
2994}
2995
2996/** A connection is readable. Call the request_demux function to read pending requests
2997 *
2998 */
3000{
3001 trunk_t *trunk = tconn->pub.trunk;
3002
3003 DO_REQUEST_DEMUX(tconn);
3004}
3005
3006/** A connection is writable. Call the request_mux function to write pending requests
3007 *
3008 */
3010{
3011 trunk_t *trunk = tconn->pub.trunk;
3012
3013 /*
3014 * Call the cancel_sent function (if we have one)
3015 * to inform a backend datastore we no longer
3016 * care about the result
3017 */
3021 DO_REQUEST_CANCEL_MUX(tconn);
3022 }
3026 DO_REQUEST_MUX(tconn);
3027}
3028
3029/** Update the registrations for I/O events we're interested in
3030 *
3031 */
3033{
3034 trunk_t *trunk = tconn->pub.trunk;
3036
3037 switch (tconn->pub.state) {
3038 /*
3039 * We only register I/O events if the trunk connection is
3040 * in one of these states.
3041 *
3042 * For the other states the trunk shouldn't be processing
3043 * requests.
3044 */
3045 case TRUNK_CONN_ACTIVE:
3046 case TRUNK_CONN_FULL:
3051 /*
3052 * If the connection is always writable,
3053 * then we don't care about write events.
3054 */
3055 if (!trunk->conf.always_writable &&
3059 (trunk->funcs.request_cancel_mux ?
3063 }
3064
3067 (trunk->funcs.request_cancel_mux ?
3070 }
3071 break;
3072
3073 default:
3074 break;
3075 }
3076
3077 if (tconn->events != events) {
3078 /*
3079 * There may be a fatal error which results
3080 * in the connection being freed.
3081 *
3082 * Stop that from happening until after
3083 * we're done using it.
3084 */
3087 tconn->events = events;
3089 }
3090}
3091
3092/** Remove a trunk connection from whichever list it's currently in
3093 *
3094 * @param[in] tconn to remove.
3095 */
3097{
3098 trunk_t *trunk = tconn->pub.trunk;
3099
3100 switch (tconn->pub.state) {
3101 case TRUNK_CONN_ACTIVE:
3102 {
3103 int ret;
3104
3105 ret = fr_minmax_heap_extract(trunk->active, tconn);
3106 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3107 }
3108 return;
3109
3110 case TRUNK_CONN_INIT:
3111 fr_dlist_remove(&trunk->init, tconn);
3112 break;
3113
3115 fr_dlist_remove(&trunk->connecting, tconn);
3116 return;
3117
3118 case TRUNK_CONN_CLOSED:
3119 fr_dlist_remove(&trunk->closed, tconn);
3120 return;
3121
3122 case TRUNK_CONN_FULL:
3123 fr_dlist_remove(&trunk->full, tconn);
3124 return;
3125
3127 fr_dlist_remove(&trunk->inactive, tconn);
3128 return;
3129
3131 fr_dlist_remove(&trunk->inactive_draining, tconn);
3132 return;
3133
3135 fr_dlist_remove(&trunk->draining, tconn);
3136 return;
3137
3139 fr_dlist_remove(&trunk->draining_to_free, tconn);
3140 return;
3141
3142 case TRUNK_CONN_HALTED:
3143 return;
3144 }
3145}
3146
3147/** Transition a connection to the full state
3148 *
3149 * Called whenever a trunk connection is at the maximum number of requests.
3150 * Removes the connection from the connected heap, and places it in the full list.
3151 */
3153{
3154 trunk_t *trunk = tconn->pub.trunk;
3155
3156 switch (tconn->pub.state) {
3157 case TRUNK_CONN_ACTIVE:
3159 break;
3160
3161 default:
3163 }
3164
3165 fr_dlist_insert_head(&trunk->full, tconn);
3167}
3168
3169/** Transition a connection to the inactive state
3170 *
3171 * Called whenever the API client wants to stop new requests being enqueued
3172 * on a trunk connection.
3173 */
3175{
3176 trunk_t *trunk = tconn->pub.trunk;
3177
3178 switch (tconn->pub.state) {
3179 case TRUNK_CONN_ACTIVE:
3180 case TRUNK_CONN_FULL:
3182 break;
3183
3184 default:
3186 }
3187
3188 fr_dlist_insert_head(&trunk->inactive, tconn);
3190}
3191
3192/** Transition a connection to the inactive-draining state
3193 *
3194 * Called whenever the trunk manager wants to drain an inactive connection
3195 * of its requests.
3196 */
3198{
3199 trunk_t *trunk = tconn->pub.trunk;
3200
3201 switch (tconn->pub.state) {
3205 break;
3206
3207 default:
3209 }
3210
3213
3214 /*
3215 * Immediately re-enqueue all pending
3216 * requests, so the connection is drained
3217 * quicker.
3218 */
3220}
3221
3222/** Transition a connection to the draining state
3223 *
3224 * Removes the connection from the active heap so it won't be assigned any new
3225 * connections.
3226 */
3228{
3229 trunk_t *trunk = tconn->pub.trunk;
3230
3231 switch (tconn->pub.state) {
3232 case TRUNK_CONN_ACTIVE:
3233 case TRUNK_CONN_FULL:
3237 break;
3238
3239 default:
3241 }
3242
3243 fr_dlist_insert_head(&trunk->draining, tconn);
3245
3246 /*
3247 * Immediately re-enqueue all pending
3248 * requests, so the connection is drained
3249 * quicker.
3250 */
3252}
3253
3254/** Transition a connection to the draining-to-reconnect state
3255 *
3256 * Removes the connection from the active heap so it won't be assigned any new
3257 * connections.
3258 */
3260{
3261 trunk_t *trunk = tconn->pub.trunk;
3262
3264
3265 switch (tconn->pub.state) {
3266 case TRUNK_CONN_ACTIVE:
3267 case TRUNK_CONN_FULL:
3272 break;
3273
3274 default:
3276 }
3277
3278 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3280
3281 /*
3282 * Immediately re-enqueue all pending
3283 * requests, so the connection is drained
3284 * quicker.
3285 */
3287}
3288
3289
3290/** Transition a connection back to the active state
3291 *
3292 * This should only be called on a connection which is in the full state,
3293 * inactive state, draining state or connecting state.
3294 */
3296{
3297 trunk_t *trunk = tconn->pub.trunk;
3298 int ret;
3299
3300 switch (tconn->pub.state) {
3301 case TRUNK_CONN_FULL:
3306 break;
3307
3308 case TRUNK_CONN_INIT:
3312 break;
3313
3314 default:
3316 }
3317
3318 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3319 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3321 return;
3322 }
3323
3325
3326 /*
3327 * Reorder the connections
3328 */
3329 CONN_REORDER(tconn);
3330
3331 /*
3332 * Rebalance requests
3333 */
3334 trunk_rebalance(trunk);
3335
3336 /*
3337 * We place requests into the backlog
3338 * because there were no connections
3339 * available to handle them.
3340 *
3341 * If a connection has become active
3342 * chances are those backlogged requests
3343 * can now be enqueued, so try and do
3344 * that now.
3345 *
3346 * If there's requests sitting in the
3347 * backlog indefinitely, it's because
3348 * they were inserted there erroneously
3349 * when there were active connections
3350 * which could have handled them.
3351 */
3352 trunk_backlog_drain(trunk);
3353}
3354
3355/** Connection transitioned to the init state
3356 *
3357 * Reflect the connection state change in the lists we use to track connections.
3358 *
3359 * @note This function is only called from the connection API as a watcher.
3360 *
3361 * @param[in] conn The connection which changes state.
3362 * @param[in] prev The connection is was in.
3363 * @param[in] state The connection is now in.
3364 * @param[in] uctx The trunk_connection_t wrapping the connection.
3365 */
3369 void *uctx)
3370{
3371 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3372 trunk_t *trunk = tconn->pub.trunk;
3373
3374 switch (tconn->pub.state) {
3375 case TRUNK_CONN_HALTED:
3376 break;
3377
3378 case TRUNK_CONN_CLOSED:
3380 break;
3381
3382 default:
3384 }
3385
3386 fr_dlist_insert_head(&trunk->init, tconn);
3388}
3389
3390/** Connection transitioned to the connecting state
3391 *
3392 * Reflect the connection state change in the lists we use to track connections.
3393 *
3394 * @note This function is only called from the connection API as a watcher.
3395 *
3396 * @param[in] conn The connection which changes state.
3397 * @param[in] prev The connection is was in.
3398 * @param[in] state The connection is now in.
3399 * @param[in] uctx The trunk_connection_t wrapping the connection.
3400 */
3404 void *uctx)
3405{
3406 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3407 trunk_t *trunk = tconn->pub.trunk;
3408
3409 switch (tconn->pub.state) {
3410 case TRUNK_CONN_INIT:
3411 case TRUNK_CONN_CLOSED:
3413 break;
3414
3415 default:
3417 }
3418
3419 /*
3420 * If a connection just entered the
3421 * connecting state, it should have
3422 * no requests associated with it.
3423 */
3425
3426 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3428}
3429
3430/** Connection transitioned to the shutdown state
3431 *
3432 * If we're not already in the draining-to-free state, transition there now.
3433 *
3434 * The idea is that if something signalled the connection to shutdown, we need
3435 * to reflect that by dequeuing any pending requests, not accepting new ones,
3436 * and waiting for the existing requests to complete.
3437 *
3438 * @note This function is only called from the connection API as a watcher.
3439 *
3440 * @param[in] conn The connection which changes state.
3441 * @param[in] prev The state the connection was in.
3442 * @param[in] state The state the connection is now in.
3443 * @param[in] uctx The trunk_connection_t wrapping the connection.
3444 */
/* NOTE(review): lines 3445-3447 (signature), 3457-3460, 3464, 3467 and 3470 were
 * dropped by the doc extractor - not compilable as-is. */
3448 void *uctx)
3449{
3450 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3451
3452 switch (tconn->pub.state) {
3453 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3454 return;
3455
3456 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3457 case TRUNK_CONN_FULL:
3461 break;
3462
3463 case TRUNK_CONN_INIT:
3465 case TRUNK_CONN_CLOSED:
3466 case TRUNK_CONN_HALTED:
3468 }
3469
3471}
3472
3473/** Trigger a reconnection of the trunk connection
3474 *
3475 * @param[in] tl timer list the timer was inserted into.
3476 * @param[in] now Current time.
3477 * @param[in] uctx The tconn.
3478 */
/* NOTE(review): presumably _trunk_connection_lifetime_expire (registered via
 * fr_timer_in below at line 3535) - the extractor dropped the signature (3479)
 * and the reconnect signal statement (3483). */
3480{
3481 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3482
3484}
3485
3486/** Connection transitioned to the connected state
3487 *
3488 * Reflect the connection state change in the lists we use to track connections.
3489 *
3490 * @note This function is only called from the connection API as a watcher.
3491 *
3492 * @param[in] conn The connection which changes state.
3493 * @param[in] prev The state the connection was in.
3494 * @param[in] state The state the connection is now in.
3495 * @param[in] uctx The trunk_connection_t wrapping the connection.
3496 */
/* NOTE(review): lines 3497-3499 (signature), 3520, 3537 and 3542 were dropped
 * by the doc extractor. */
3500 void *uctx)
3501{
3502 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3503 trunk_t *trunk = tconn->pub.trunk;
3504
3505 /*
3506 * If a connection was just connected, it should only
3507 * have a pending list of requests. This state is found
3508 * in the rlm_radius module, which starts a new trunk,
3509 * and then immediately enqueues a request onto it. The
3510 * alternative for rlm_radius is to keep its own queue
3511 * of pending requests before the trunk is fully
3512 * initialized. And then enqueue them onto the trunk
3513 * when the trunk is connected.
3514 *
3515 * It's instead easier (and makes more sense) to allow
3516 * the trunk to accept packets into its queue. If there
3517 * are no connections within a period of time, then the
3518 * requests will retry, or will time out.
3519 */
3521
3522 /*
3523 * Set here, as the active state can
3524 * be transitioned to from full and
3525 * draining too.
3526 */
3527 trunk->pub.last_connected = fr_time();
3528
3529 /*
3530 * Insert a timer to reconnect the
3531 * connection periodically.
3532 */
3533 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3534 if (fr_timer_in(tconn, trunk->el->tl, &tconn->lifetime_ev,
3535 trunk->conf.lifetime, false, _trunk_connection_lifetime_expire, tconn) < 0) {
3536 PERROR("Failed inserting connection reconnection timer event, halting connection");
3538 return;
3539 }
3540 }
3541
3543}
3544
3545/** Connection failed after it was connected
3546 *
3547 * Reflect the connection state change in the lists we use to track connections.
3548 *
3549 * @note This function is only called from the connection API as a watcher.
3550 *
3551 * @param[in] conn The connection which changes state.
3552 * @param[in] prev The state the connection was in.
3553 * @param[in] state The state the connection is now in.
3554 * @param[in] uctx The trunk_connection_t wrapping the connection.
3555 */
/* NOTE(review): lines 3556-3558 (signature), 3568-3571, 3573, 3577-3579, 3584,
 * 3588, 3602, 3612 and 3617 were dropped by the doc extractor. */
3559 void *uctx)
3560{
3561 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3562 trunk_t *trunk = tconn->pub.trunk;
3563 bool need_requeue = false;
3564
3565 switch (tconn->pub.state) {
3566 case TRUNK_CONN_ACTIVE:
3567 case TRUNK_CONN_FULL:
3572 need_requeue = true;
3574 break;
3575
3576 case TRUNK_CONN_INIT: /* Initialisation failed */
3580 break;
3581
3582 case TRUNK_CONN_CLOSED:
3583 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3585 }
3586
3587 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3589
3590 /*
3591 * Now *AFTER* the connection has been
3592 * removed from the active pool,
3593 * re-enqueue the requests.
3594 */
3595 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3596
3597 /*
3598 * There should be no requests left on this
3599 * connection. They should have all been
3600 * moved off or failed.
3601 */
3603
3604 /*
3605 * Clear statistics and flags
3606 */
3607 tconn->sent_count = 0;
3608
3609 /*
3610 * Remove the reconnect event
3611 */
3613
3614 /*
3615 * Remove the I/O events
3616 */
3618}
3619
3620/** Connection failed
3621 *
3622 * @param[in] conn The connection which changes state.
3623 * @param[in] prev The state the connection was in.
3624 * @param[in] state The state the connection is now in.
3625 * @param[in] uctx The trunk_connection_t wrapping the connection.
3626 */
/* NOTE(review): lines 3627, 3629 (rest of the signature) and 3658-3661 (the
 * backlog-failure body of the final if) were dropped by the doc extractor. */
3628 connection_state_t prev,
3630 void *uctx)
3631{
3632 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3633 trunk_t *trunk = tconn->pub.trunk;
3634
3635 /*
3636 * Need to set this first as it
3637 * determines whether requests are
3638 * re-queued or fail outright.
3639 */
3640 trunk->pub.last_failed = fr_time();
3641
3642 /*
3643 * Failed in the init state, transition the
3644 * connection to closed, else we get an
3645 * INIT -> INIT transition which triggers
3646 * an assert.
3647 */
3648 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3649
3650 /*
3651 * See what the state of the trunk is
3652 * if there are no connections that could
3653 * potentially accept requests in the near
3654 * future, then fail all the requests in the
3655 * trunk backlog.
3656 */
3657 if ((prev == CONNECTION_STATE_CONNECTED) &&
3662}
3663
3664/** Connection transitioned to the halted state
3665 *
3666 * Remove the connection from all lists, as it's likely about to be freed.
3667 *
3668 * Setting the trunk back to the init state ensures that if the code is ever
3669 * refactored and #connection_signal_reconnect is used after a connection
3670 * is halted, then everything is maintained in a valid state.
3671 *
3672 * @note This function is only called from the connection API as a watcher.
3673 *
3674 * @param[in] conn The connection which changes state.
3675 * @param[in] prev The state the connection was in.
3676 * @param[in] state The state the connection is now in.
3677 * @param[in] uctx The trunk_connection_t wrapping the connection.
3678 */
/* NOTE(review): lines 3679-3681 (signature), 3690, 3694, 3701 and 3708 were
 * dropped by the doc extractor. */
3682 void *uctx)
3683{
3684 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3685 trunk_t *trunk = tconn->pub.trunk;
3686
3687 switch (tconn->pub.state) {
3688 case TRUNK_CONN_INIT:
3689 case TRUNK_CONN_CLOSED:
3691 break;
3692
3693 default:
3695 }
3696
3697 /*
3698 * It began life in the halted state,
3699 * and will end life in the halted state.
3700 */
3702
3703 /*
3704 * There should be no requests left on this
3705 * connection. They should have all been
3706 * moved off or failed.
3707 */
3709
3710 /*
3711 * And free the connection...
3712 */
3713 if (trunk->in_handler) {
3714 /*
3715 * ...later, once all the handlers have exited.
3716 */
3717 fr_dlist_insert_tail(&trunk->to_free, tconn);
3718 return;
3719 }
3720 talloc_free(tconn);
3721}
3722
3723/** Free a connection
3724 *
3725 * Enforces orderly free order of children of the tconn
3726 */
/* NOTE(review): lines 3727 (signature - presumably the talloc destructor
 * registered below at line 3848), 3729, 3753, 3758 and 3768-3774 were dropped
 * by the doc extractor. */
3728{
3730 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3731
3732 /*
3733 * Loop over all the requests we gathered
3734 * and transition them to the failed state,
3735 * freeing them.
3736 *
3737 * Usually, requests will be re-queued when
3738 * a connection enters the closed state,
3739 * but in this case because the whole trunk
3740 * is being freed, we don't bother, and
3741 * just signal to the API client that the
3742 * requests failed.
3743 */
3744 if (tconn->pub.trunk->freeing) {
3745 fr_dlist_head_t to_fail;
3746 trunk_request_t *treq = NULL;
3747
3748 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3749
3750 /*
3751 * Remove requests from this connection
3752 */
3754 while ((treq = fr_dlist_next(&to_fail, treq))) {
3755 trunk_request_t *prev;
3756
3757 prev = fr_dlist_remove(&to_fail, treq);
3759 treq = prev;
3760 }
3761 }
3762
3763 /*
3764 * Ensure we're not signalled by the connection
3765 * as it processes its backlog of state changes,
3766 * as we are about to be freed.
3767 */
3775
3776 /*
3777 * This may return -1, indicating the free was deferred
3778 * this is fine. It just means the conn will be freed
3779 * after all the handlers have exited.
3780 */
3781 (void)talloc_free(tconn->pub.conn);
3782 tconn->pub.conn = NULL;
3783
3784 return 0;
3785}
3786
3787/** Attempt to spawn a new connection
3788 *
3789 * Calls the API client's alloc() callback to create a new connection_t,
3790 * then inserts the connection into the 'connecting' list.
3791 *
3792 * @param[in] trunk to spawn connection in.
3793 * @param[in] now The current time.
3794 */
/* NOTE(review): lines 3795 (signature), 3813, 3815-3817, 3827, 3830, 3833,
 * 3836, 3839, 3842 and 3845 (list inits and connection_add_watch_* calls)
 * were dropped by the doc extractor. */
3796{
3797 trunk_connection_t *tconn;
3798
3799
3800 /*
3801 * Call the API client's callback to create
3802 * a new connection_t.
3803 */
3804 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3805 tconn->pub.trunk = trunk;
3806 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3807
3808 /*
3809 * Allocate a new connection_t or fail.
3810 */
3811 DO_CONNECTION_ALLOC(tconn);
3812
3814 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3818
3819 /*
3820 * OK, we have the connection, now setup watch
3821 * points so we know when it changes state.
3822 *
3823 * This lets us automatically move the tconn
3824 * between the different lists in the trunk
3825 * with minimum extra code.
3826 */
3828 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3829
3831 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3832
3834 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3835
3837 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3838
3840 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3841
3843 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3844
3846 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3847
3848 talloc_set_destructor(tconn, _trunk_connection_free);
3849
3850 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3851
3852 trunk->pub.last_open = now;
3853
3854 return 0;
3855}
3856
3857/** Pop a cancellation request off a connection's cancellation queue
3858 *
3859 * The request we return is advanced by the request moving out of the
3860 * cancel state and into the cancel_sent or cancel_complete state.
3861 *
3862 * One of these signalling functions must be called after the request
3863 * has been popped:
3864 *
3865 * - #trunk_request_signal_cancel_sent
3866 * The remote datastore has been informed, but we need to wait for acknowledgement.
3867 * The #trunk_request_demux_t callback must handle the acks calling
3868 * #trunk_request_signal_cancel_complete when an ack is received.
3869 *
3870 * - #trunk_request_signal_cancel_complete
3871 * The request was cancelled and we don't need to wait, clean it up immediately.
3872 *
3873 * @param[out] treq_out to process
3874 * @param[in] tconn Connection to drain cancellation request from.
3875 * @return
3876 * - 1 if no more requests.
3877 * - 0 if a new request was written to treq_out.
3878 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3879 * memory or requests associated with the connection.
3880 * - -2 if called outside of the cancel muxer.
3881 */
/* NOTE(review): lines 3882 (signature) and 3886 (the IN_HANDLER assert
 * expression completed on line 3887) were dropped by the doc extractor. */
3883{
3884 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3885
3887 "%s can only be called from within request_cancel_mux handler",
3888 __FUNCTION__)) return -2;
3889
3890 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3891 if (!*treq_out) return 1;
3892
3893 return 0;
3894}
3895
3896/** Pop a request off a connection's pending queue
3897 *
3898 * The request we return is advanced by the request moving out of the partial or
3899 * pending states, when the mux function signals us.
3900 *
3901 * If the same request is returned again and again, it means the muxer isn't actually
3902 * doing anything with the request we returned, and it's an error in the muxer code.
3903 *
3904 * One of these signalling functions must be used after the request has been popped:
3905 *
3906 * - #trunk_request_signal_complete
3907 * The request was completed. Either we got a synchronous response, or we knew the
3908 * response without contacting an external server (cache).
3909 *
3910 * - #trunk_request_signal_fail
3911 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3912 *
3913 * - #trunk_request_signal_partial
3914 * Wrote part of a request. This request will be returned on the next call to this
3915 * function so that the request_mux function can finish writing it. Only useful
3916 * for stream type connections. Datagram type connections cannot have partial
3917 * writes.
3918 *
3919 * - #trunk_request_signal_sent Successfully sent a request.
3920 *
3921 * @param[out] treq_out to process
3922 * @param[in] tconn to pop a request from.
3923 * @return
3924 * - 1 if no more requests.
3925 * - 0 if a new request was written to treq_out.
3926 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3927 * memory or requests associated with the connection.
3928 * - -2 if called outside of the muxer.
3929 */
/* NOTE(review): lines 3930 (signature) and 3934 (the IN_HANDLER assert
 * expression completed on line 3935) were dropped by the doc extractor. */
3931{
3932 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3933
3935 "%s can only be called from within request_mux handler",
3936 __FUNCTION__)) return -2;
3937
3938 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3939 if (!*treq_out) return 1;
3940
3941 return 0;
3942}
3943
3944/** Signal that a trunk connection is writable
3945 *
3946 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3947 *
3948 * @param[in] tconn to signal.
3949 */
/* NOTE(review): lines 3950 (signature) and 3959 (the actual writable handling,
 * after the debug message) were dropped by the doc extractor. */
3951{
3952 trunk_t *trunk = tconn->pub.trunk;
3953
3954 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3955 "%s cannot be called within a handler", __FUNCTION__)) return;
3956
3957 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3958
3960}
3961
3962/** Signal that a trunk connection is readable
3963 *
3964 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3965 *
3966 * @param[in] tconn to signal.
3967 */
/* NOTE(review): lines 3968 (signature) and 3977 (the actual readable handling,
 * after the debug message) were dropped by the doc extractor. */
3969{
3970 trunk_t *trunk = tconn->pub.trunk;
3971
3972 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3973 "%s cannot be called within a handler", __FUNCTION__)) return;
3974
3975 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3976
3978}
3979
3980/** Signal a trunk connection cannot accept more requests
3981 *
3982 * @param[in] tconn to signal.
3983 */
/* NOTE(review): lines 3984 (signature), 3991 and 3994-3995 (the state
 * transitions for each case) were dropped by the doc extractor. */
3985{
3986 /* Can be called anywhere */
3987
3988 switch (tconn->pub.state) {
3989 case TRUNK_CONN_ACTIVE:
3990 case TRUNK_CONN_FULL:
3992 break;
3993
3996 break;
3997
3998 default:
3999 return;
4000 }
4001}
4002
4003/** Signal a trunk connection is no longer full
4004 *
4005 * @param[in] tconn to signal.
4006 */
/* NOTE(review): lines 4007 (signature), 4014, 4021, 4024 and 4033 (case
 * labels and transitions) were dropped by the doc extractor. */
4008{
4009 switch (tconn->pub.state) {
4010 case TRUNK_CONN_FULL:
4011 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
4012 break;
4013
4015 /*
4016 * Do the appropriate state transition based on
4017 * how many requests the trunk connection is
4018 * currently servicing.
4019 */
4020 if (trunk_connection_is_full(tconn)) {
4022 break;
4023 }
4025 break;
4026
4027 /*
4028 * Unsetting the active flag just moves
4029 * the connection back to the normal
4030 * draining state.
4031 */
4032 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
4034 break;
4035
4036 default:
4037 return;
4038 }
4039}
4040
4041/** Signal a trunk connection is no longer viable
4042 *
4043 * @param[in] tconn to signal.
4044 * @param[in] reason the connection is being reconnected.
4045 */
/* NOTE(review): lines 4046-4049 (the entire signature and body of this
 * function) were dropped by the doc extractor. */
4050
4051/** Standard I/O read function
4052 *
4053 * Underlying FD in now readable, so call the trunk to read any pending requests
4054 * from this connection.
4055 *
4056 * @param[in] el The event list signalling.
4057 * @param[in] fd that's now readable.
4058 * @param[in] flags describing the read event.
4059 * @param[in] uctx The trunk connection handle (tconn).
4060 */
/* NOTE(review): lines 4061 (signature) and 4065 (the readable-signal call)
 * were dropped by the doc extractor. */
4062{
4063 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4064
4066}
4067
4068/** Standard I/O write function
4069 *
4070 * Underlying FD is now writable, so call the trunk to write any pending requests
4071 * to this connection.
4072 *
4073 * @param[in] el The event list signalling.
4074 * @param[in] fd that's now writable.
4075 * @param[in] flags describing the write event.
4076 * @param[in] uctx The trunk connection handle (tconn).
4077 */
/* NOTE(review): lines 4078 (signature) and 4082 (the writable-signal call)
 * were dropped by the doc extractor. */
4079{
4080 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4081
4083}
4084
4085
4086/** Returns true if the trunk connection is in one of the specified states
4087 *
4088 * @param[in] tconn To check state for.
4089 * @param[in] state to check
4090 * @return
4091 * - True if trunk connection is in a particular state.
4092 * - False if trunk connection is not in a particular state.
4093 */
/* NOTE(review): line 4094 (the function signature) was dropped by the doc
 * extractor. The state argument is treated as a bitmask, so multiple states
 * may be tested at once. */
4095{
4096 return (bool)(tconn->pub.state & state);
4097}
4098
4099/** Close connections in a particular connection list if they have no requests associated with them
4100 *
4101 * @param[in] trunk containing connections we want to close.
4102 * @param[in] head of list of connections to examine.
4103 */
/* NOTE(review): lines 4104 (signature), 4111 (presumably the has-requests
 * check), 4116 and 4129 (the shutdown signal) were dropped by the doc
 * extractor. */
4105{
4106 trunk_connection_t *tconn = NULL;
4107
4108 while ((tconn = fr_dlist_next(head, tconn))) {
4109 trunk_connection_t *prev;
4110
4112
4113 prev = fr_dlist_prev(head, tconn);
4114
4115 DEBUG3("Closing %s connection with no requests",
4117 /*
4118 * Close the connection as gracefully
4119 * as possible by signalling it should
4120 * shutdown.
4121 *
4122 * The connection, should, if serviced
4123 * correctly by the underlying library,
4124 * automatically transition to halted after
4125 * all pending reads/writes are
4126 * complete at which point we'll be informed
4127 * and free our tconn wrapper.
4128 */
4130 tconn = prev;
4131 }
4132}
4133
4134/** Rebalance connections across active trunk members when a new connection becomes active
4135 *
4136 * We don't have any visibility into the connection prioritisation algorithm
4137 * it's essentially a black box.
4138 *
4139 * We can however determine when the correct level of requests per connection
4140 * has been reached, by dequeuing and requeueing requests up until the point
4141 * where the connection that just had a request dequeued, receives the same
4142 * request back.
4143 *
4144 * @param[in] trunk The trunk to rebalance.
4145 */
4146static void trunk_rebalance(trunk_t *trunk)
4147{
/* NOTE(review): lines 4148, 4150 (presumably the declaration/assignment of
 * 'head', the min-heap peek) and 4167 were dropped by the doc extractor. */
4149
4151
4152 /*
4153 * Only rebalance if the top and bottom of
4154 * the heap are not equal.
4155 */
4156 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4157
4158 DEBUG3("Rebalancing requests");
4159
4160 /*
4161 * Keep requeuing requests from the connection
4162 * at the bottom of the heap until the
4163 * connection at the top is shifted from that
4164 * position.
4165 */
4166 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4168 TRUNK_REQUEST_STATE_PENDING, 1, false));
4169}
4170
4171/** Implements the algorithm we use to manage requests per connection levels
4172 *
4173 * This is executed periodically using a timer event, and opens/closes
4174 * connections.
4175 *
4176 * The aim is to try and keep the request per connection level in a sweet spot,
4177 * where there's enough outstanding work for the connection/pipelining to work
4178 * efficiently, but not so much so that we encounter increased latency.
4179 *
4180 * In the request enqueue and dequeue functions we record every time the
4181 * average number of requests per connection goes above the target count
4182 * and record every time the average number of requests per connection goes
4183 * below the target count.
4184 *
4185 * This may sound expensive, but in all cases we're just summing counters.
4186 * CPU time required does not increase with additional requests, only with
4187 * large numbers of connections.
4188 *
4189 * If we do encounter scaling issues, we can always maintain the counters
4190 * as aggregates as an optimisation later.
4191 *
4192 * If when the management function runs, the trunk was above the target
4193 * most recently, we:
4194 * - Return if we've been in this state for a shorter period than 'open_delay'.
4195 * - Return if we're at max.
4196 * - Return if opening a new connection will take us below the load target.
4197 * - Return if we last opened a connection within 'open_delay'.
4198 * - Otherwise we attempt to open a new connection.
4199 *
4200 * If the trunk was below the target most recently, we:
4201 * - Return if we've been in this state for a shorter period than 'close_delay'.
4202 * - Return if we're at min.
4203 * - Return if we have no connections.
4204 * - Close a connection if min is 0, and we have no outstanding
4205 * requests. Then return.
4206 * - Return if closing a new connection will take us above the load target.
4207 * - Return if we last closed a connection within 'closed_delay'.
4208 * - Otherwise we move a connection to draining state.
4209 */
/* NOTE(review): the embedded line numbering in this function is non-contiguous
 * (e.g. 4226, 4231-4232, 4255, 4265-4267, 4277, 4284-4287, 4308, 4319,
 * 4334-4335, 4388-4391, 4402-4403, 4420-4421, 4475-4476, 4504-4508,
 * 4529-4533) - loop bodies, conditions and state transitions were dropped by
 * the doc extractor; consult upstream trunk.c before relying on this listing. */
4210static void trunk_manage(trunk_t *trunk, fr_time_t now)
4211{
4212 trunk_connection_t *tconn = NULL;
4213 trunk_request_t *treq;
4214 uint32_t average = 0;
4215 uint32_t req_count;
4216 uint16_t conn_count;
4217 trunk_state_t new_state;
4218
4219 DEBUG4("Managing trunk");
4220
4221 /*
4222 * Cleanup requests in our request cache which
4223 * have been reapable for too long.
4224 */
4225 while ((treq = trunk_list_free_requests_peek(trunk)) &&
4227
4228 /*
4229 * If we have idle connections, then close them.
4230 */
4233 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4234
4235 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4236 tconn;
4237 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4238 /*
4239 * The connection has outstanding requests without replies, don't do anything.
4240 */
4241 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4242
4243 /*
4244 * The connection was last active after the idle cutoff time, don't do anything.
4245 */
4246 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4247
4248 /*
4249 * This connection has been inactive since before the idle timeout. Drain it,
4250 * and free it.
4251 *
4252 * This also extracts the connection from the minmax heap, which invalidates the
4253 * iterator, so we stop iterating over it.
4254 */
4256 break;
4257 }
4258 }
4259
4260 /*
4261 * Free any connections which have drained
4262 * and we didn't reactivate during the last
4263 * round of management.
4264 */
4268
4269 /*
4270 * Process deferred connection freeing
4271 */
4272 if (!trunk->in_handler) fr_dlist_talloc_free(&trunk->to_free);
4273
4274 /*
4275 * Update the state of the trunk
4276 */
4278 new_state = TRUNK_STATE_ACTIVE;
4279 } else {
4280 /*
4281 * INIT / CONNECTING / FULL mean connections will become active
4282 * so the trunk is PENDING
4283 */
4288 }
4289
4290 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4291
4292 /*
4293 * A trunk can be signalled to not proactively
4294 * manage connections if a destination is known
4295 * to be unreachable, and doing so would result
4296 * in spurious connections still being opened.
4297 *
4298 * We still run other connection management
4299 * functions and just short circuit the function
4300 * here.
4301 */
4302 if (!trunk->managing_connections) return;
4303
4304 /*
4305 * We're above the target requests per connection
4306 * spawn more connections!
4307 */
4309 /*
4310 * If connecting is provided, check we
4311 * wouldn't have too many connections in
4312 * the connecting state.
4313 *
4314 * This is a throttle in the case of transitory
4315 * load spikes, or a backend becoming
4316 * unavailable.
4317 */
4318 if ((trunk->conf.connecting > 0) &&
4320 trunk->conf.connecting)) {
4321 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4322 trunk->conf.connecting);
4323 return;
4324 }
4325
4326 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4327
4328 /*
4329 * Only apply hysteresis if we have at least
4330 * one available connection.
4331 */
4332 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4333 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4336 return; /* too soon */
4337 }
4338
4339 /*
4340 * We don't consider 'draining' connections
4341 * in the max calculation, as if we do
4342 * determine that we need to spawn a new
4343 * request, then we'd move all 'draining'
4344 * connections to active before spawning
4345 * any new connections.
4346 */
4347 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4348 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4349 conn_count, trunk->conf.max);
4350 return;
4351 }
4352
4353 /*
4354 * We consider requests pending on all connections
4355 * and the trunk's backlog as that's the current count
4356 * load.
4357 */
4358 if (!req_count) {
4359 DEBUG4("Not opening connection - No outstanding requests");
4360 return;
4361 }
4362
4363 /*
4364 * Do the n+1 check, i.e. if we open one connection
4365 * will that take us below our target threshold.
4366 */
4367 if (conn_count > 0) {
4368 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4369 if (average < trunk->conf.target_req_per_conn) {
4370 DEBUG4("Not opening connection - Would leave us below our target requests "
4371 "per connection (now %u, after open %u)",
4372 ROUND_UP_DIV(req_count, conn_count), average);
4373 return;
4374 }
4375 } else {
4376 (void)trunk_connection_spawn(trunk, now);
4377 return;
4378 }
4379
4380 /*
4381 * If we've got a connection in the draining list
4382 * move it back into the active list if we've
4383 * been requested to add a connection back in.
4384 */
4385 tconn = fr_dlist_head(&trunk->draining);
4386 if (tconn) {
4387 if (trunk_connection_is_full(tconn)) {
4389 } else {
4391 }
4392 return;
4393 }
4394
4395 /*
4396 * Implement delay if there's no connections that
4397 * could be immediately re-activated.
4398 */
4399 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4400 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4401 "It's been %pVs",
4404 return;
4405 }
4406
4407 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4408 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4409 /* last_open set by trunk_connection_spawn */
4410 (void)trunk_connection_spawn(trunk, now);
4411 }
4412
4413 /*
4414 * We're below the target requests per connection.
4415 * Free some connections...
4416 */
4417 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4418 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4419 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4422 return; /* too soon */
4423 }
4424
4425 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4426
4427 if (!conn_count) {
4428 DEBUG4("Not closing connection - No connections to close!");
4429 return;
4430 }
4431
4432 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4433 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4434 conn_count, trunk->conf.min);
4435 return;
4436 }
4437
4438 if (!req_count) {
4439 DEBUG4("Closing connection - No outstanding requests");
4440 goto close;
4441 }
4442
4443 /*
4444 * The minimum number of connections must be set
4445 * to zero for this to work.
4446 * min == 0, no requests, close all the connections.
4447 * This is useful for backup databases, when
4448 * maintaining the connection would lead to lots of
4449 * log file churn.
4450 */
4451 if (conn_count == 1) {
4452 DEBUG4("Not closing connection - Would leave connections "
4453 "and there are still %u outstanding requests", req_count);
4454 return;
4455 }
4456
4457 /*
4458 * Do the n-1 check, i.e. if we close one connection
4459 * will that take us above our target threshold.
4460 */
4461 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4462 if (average > trunk->conf.target_req_per_conn) {
4463 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4464 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4465 return;
4466 }
4467
4468 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4469 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4470
4471 close:
4472 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4473 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4474 "It's been %pVs",
4477 return;
4478 }
4479
4480 /*
4481 * If the last event on the trunk was a connection failure and
4482 * there is only one connection, this may well be a reconnect
4483 * attempt after a failure - and needs to persist otherwise
4484 * the last event will be a failure and no new connection will
4485 * be made, leading to no new requests being enqueued.
4486 */
4487 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4488 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4489 DEBUG4("Not closing remaining connection - last event was a failure");
4490 return;
4491 }
4492
4493 /*
4494 * Inactive connections get counted in the
4495 * set of viable connections, but are likely
4496 * to be congested or dead, so we drain
4497 * (and possibly eventually free) those first.
4498 */
4499 if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4500 /*
4501 * If the connection has no requests associated
4502 * with it then immediately free.
4503 */
4505 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4506 } else {
4508 }
4509 /*
4510 * It is possible to have too many connecting
4511 * connections when the connections are
4512 * taking a while to open and the number
4513 * of requests decreases.
4514 */
4515 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4516 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4517
4518 /*
4519 * Finally if there are no "connecting"
4520 * connections to close, and no "inactive"
4521 * connections, start draining "active"
4522 * connections.
4523 */
4524 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4525 /*
4526 * If the connection has no requests associated
4527 * with it then immediately free.
4528 */
4530 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4531 } else {
4533 }
4534 }
4535
4536 trunk->pub.last_closed = now;
4537
4538 return;
4539 }
4540}
4541
4542/** Event to periodically call the connection management function
4543 *
4544 * @param[in] tl this event belongs to.
4545 * @param[in] now current time.
4546 * @param[in] uctx The trunk.
4547 */
/* NOTE(review): line 4554 (the condition guarding the timer re-arm, matching
 * the closing brace on 4560) was dropped by the doc extractor. */
4548static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
4549{
4550 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4551
4552 trunk_manage(trunk, now);
4553
4555 if (fr_timer_in(trunk, tl, &trunk->manage_ev, trunk->conf.manage_interval,
4556 false, _trunk_timer, trunk) < 0) {
4557 PERROR("Failed inserting trunk management event");
4558 /* Not much we can do, hopefully the trunk will be freed soon */
4559 }
4560 }
4561}
4562
4563/** Return a count of requests on a connection in a specific state
4564 *
4565 * @param[in] trunk to retrieve counts for.
4566 * @param[in] conn_state One or more connection states or'd together.
4567 * @param[in] req_state One or more request states or'd together.
4568 * @return The number of requests in a particular state, on connection in a particular state.
4569 */
/* NOTE(review): lines 4574 (presumably the minmax-heap iterator declaration),
 * 4593-4595, 4597-4598 and 4600 (further COUNT_BY_STATE invocations) were
 * dropped by the doc extractor. */
4570uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4571{
4572 uint64_t count = 0;
4573 trunk_connection_t *tconn = NULL;
4575
4576#define COUNT_BY_STATE(_state, _list) \
4577do { \
4578 if (conn_state & (_state)) { \
4579 tconn = NULL; \
4580 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4581 count += trunk_request_count_by_connection(tconn, req_state); \
4582 } \
4583 } \
4584} while (0)
4585
4586 if (conn_state & TRUNK_CONN_ACTIVE) {
4587 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4588 tconn;
4589 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4590 count += trunk_request_count_by_connection(tconn, req_state);
4591 }
4592 }
4593
4596 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4599
4601
4602 return count;
4603}
4604
4605/** Update timestamps for when we last had a transition from above target to below target or vice versa
4606 *
4607 * Should be called on every time a connection or request is allocated or freed.
4608 *
4609 * @param[out] conn_count_out How many connections we considered.
4610 * @param[out] req_count_out How many requests we considered.
4611 * @param[in] trunk to operate on.
4612 * @param[in] now The current time.
4613 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4614 * has changed.
4615 * @return
4616 * - 0 if the average couldn't be calculated (no requests or no connections).
4617 * - The average number of requests per connection.
4618 */
4619static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4620 trunk_t *trunk, fr_time_t now,
4621 NDEBUG_UNUSED bool verify)
4622{
4623 uint32_t req_count = 0;
4624 uint16_t conn_count = 0;
4625 uint64_t req_per_conn = 0;
4626
 /* NOTE(review): a declaration/statement (original line 4627) is missing from this extract. */
4628
4629 /*
4630 * No need to update these as the trunk is being freed
4631 */
4632 if (trunk->freeing) goto done;
4633
4634 /*
4635 * Count all connections except draining and draining to free.
4636 *
4637 * Omitting these connection states artificially raises the
4638 * request to connection ratio, so that we can preemptively spawn
4639 * new connections.
4640 *
4641 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4642 * the trunk management code has enough hysteresis to not
4643 * immediately reactivate the connection.
4644 *
4645 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4646 * management code should spawn a new connection to takes its place.
4647 *
4648 * Connections placed in the DRAINING_TO_FREE state are being
4649 * closed preemptively to deal with bugs on the server we're
4650 * talking to, or misconfigured firewalls which are trashing
4651 * TCP/UDP connection states.
4652 */
 /* NOTE(review): the conn_count computation (original lines 4653-4656) is missing from this extract. */
4657
4658 /*
4659 * Requests on all connections
4660 */
4661 req_count = trunk_request_count_by_state(trunk,
 /* NOTE(review): the state-mask arguments (original lines 4662-4663) are missing from this extract. */
4664
4665 /*
4666 * No connections, but we do have requests
4667 */
4668 if (conn_count == 0) {
4669 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4670 goto done;
4671 }
4672
4673 if (req_count == 0) {
4674 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4675 goto done;
4676 }
4677
4678 /*
4679 * Calculate the req_per_conn
4680 */
4681 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4682 if (req_per_conn > trunk->conf.target_req_per_conn) {
4683 above_target:
4684 /*
4685 * Edge - Below target to above target (too many requests per conn - spawn more)
4686 *
4687 * The equality check is correct here as both values start at 0.
4688 */
 /* NOTE(review): the edge-timestamp update (original line 4689) is missing from this extract. */
4690 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4691 below_target:
4692 /*
4693 * Edge - Above target to below target (too few requests per conn - close some)
4694 *
4695 * The equality check is correct here as both values start at 0.
4696 */
 /* NOTE(review): the edge-timestamp update (original line 4697) is missing from this extract. */
4698 }
4699
4700done:
4701 if (conn_count_out) *conn_count_out = conn_count;
4702 if (req_count_out) *req_count_out = req_count;
4703
4704 /*
4705 * Check we haven't missed a call to trunk_requests_per_connection
4706 */
4707 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4708
4709 trunk->last_req_per_conn = req_per_conn;
4710
4711 return req_per_conn;
4712}
4713
4714/** Drain the backlog of as many requests as possible
4715 *
4716 * @param[in] trunk To drain backlog requests for.
4717 */
4718static void trunk_backlog_drain(trunk_t *trunk)
4719{
4720 trunk_request_t *treq;
4721
 /* Nothing queued — nothing to do. */
4722 if (fr_heap_num_elements(trunk->backlog) == 0) return;
4723
4724 /*
4725 * If it's always writable, this isn't
4726 * really a noteworthy event.
4727 */
4728 if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4729
4730 /*
4731 * Do *NOT* add an artificial limit
4732 * here. We rely on all available
4733 * connections entering the full
4734 * state and transitioning back to
4735 * active in order to drain the
4736 * backlog.
4737 */
4738 while ((treq = fr_heap_peek(trunk->backlog))) {
4739 switch (trunk_request_enqueue_existing(treq)) {
4740 case TRUNK_ENQUEUE_OK:
4741 continue;
4742
4743 /*
4744 * Signal to stop
4745 */
 /* NOTE(review): the case label for this branch (original line 4746) is missing from this extract. */
4747 break;
4748
4749 /*
4750 * Failed enqueueing the request,
4751 * have it enter the failed state
4752 * which will free it and
4753 * re-enliven the yielded request.
4754 */
 /* NOTE(review): a case label (original line 4755) and the enter-failed call (4757) are missing from this extract. */
4756 case TRUNK_ENQUEUE_FAIL:
4758 continue;
4759
 /* NOTE(review): the remaining case label(s) (original lines 4760-4761) are missing from this extract. */
4762 return;
4763 }
4764 }
4765}
4766
4767/** Force the trunk to re-establish its connections
4768 *
4769 * @param[in] trunk to signal.
4770 * @param[in] states One or more states or'd together.
4771 * @param[in] reason Why the connections are being signalled to reconnect.
4772 */
4773void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4774{
4775
 /*
  *	Signal every connection in the given dlist to reconnect.
  *	Iterates by element count from the tail, because signalling
  *	reconnect removes the connection from the list as we go.
  */
4776#define RECONNECT_BY_STATE(_state, _list) \
4777do { \
4778 if (states & (_state)) { \
4779 size_t i; \
4780 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4781 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4782 } \
4783 } \
4784} while (0)
4785
4786 /*
4787 * Connections in the 'connecting' state
4788 * may re-enter that state, so we need to
4789 * be careful not to enter an infinite
4790 * loop, as we iterate over the list
4791 * again and again.
4792 */
 /* NOTE(review): a RECONNECT_BY_STATE() invocation (original line 4793) is missing from this extract. */
4794
4795 if (states & TRUNK_CONN_ACTIVE) {
4796 trunk_connection_t *tconn;
4797 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4798 }
4799
 /* NOTE(review): further RECONNECT_BY_STATE() invocations (original lines 4800-4806) are missing from this extract. */
4807}
4808
4809/** Start the trunk running
4810 *
4811 */
 /*
  *	NOTE(review): the function signature (original line 4812,
  *	presumably `int trunk_start(trunk_t *trunk)` per the doc
  *	comment above) is missing from this extract.
  */
4813{
4814 uint16_t i;
4815
 /* Idempotent: starting an already-started trunk is a no-op. */
4816 if (unlikely(trunk->started)) return 0;
4817
4818 /*
4819 * Spawn the initial set of connections
4820 */
4821 for (i = 0; i < trunk->conf.start; i++) {
4822 DEBUG("[%i] Starting initial connection", i);
4823 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4824 }
4825
4826 /*
4827 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4828 * less than the idle timeout, update the management interval.
4829 */
 /* NOTE(review): the condition for this clamp (original lines 4830-4832) is missing from this extract. */
4833 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4834 }
4835
 /* NOTE(review): the condition guarding timer insertion (original line 4836) is missing from this extract. */
4837 /*
4838 * Insert the event timer to manage
4839 * the interval between managing connections.
4840 */
4841 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, trunk->conf.manage_interval,
4842 false, _trunk_timer, trunk) < 0) {
4843 PERROR("Failed inserting trunk management event");
4844 return -1;
4845 }
4846 }
4847 trunk->started = true;
4848 trunk->managing_connections = true;
4849
4850 return 0;
4851}
4852
4853/** Allow the trunk to open and close connections in response to load
4854 *
4855 */
 /*
  *	NOTE(review): the signature (original line 4856) is missing from this
  *	extract — per the doc comment this is presumably
  *	`void trunk_connection_manage_start(trunk_t *trunk)`; verify upstream.
  */
4857{
 /* No-op unless the trunk is started and management is currently disabled. */
4858 if (!trunk->started || trunk->managing_connections) return;
4859
4860 DEBUG3("Connection management enabled");
4861 trunk->managing_connections = true;
4862}
4863
4864/** Stop the trunk from opening and closing connections in response to load
4865 *
4866 */
 /*
  *	NOTE(review): the signature (original line 4867) is missing from this
  *	extract — per the doc comment this is presumably
  *	`void trunk_connection_manage_stop(trunk_t *trunk)`; verify upstream.
  */
4868{
 /* No-op unless the trunk is started and management is currently enabled. */
4869 if (!trunk->started || !trunk->managing_connections) return;
4870
4871 DEBUG3("Connection management disabled");
4872 trunk->managing_connections = false;
4873}
4874
4875/** Schedule a trunk management event for the next time the event loop is executed
4876 */
 /*
  *	NOTE(review): the signature (original line 4877) is missing from this
  *	extract — per the doc comment this schedules a management pass on the
  *	next event-loop iteration (zero-delay timer); verify name upstream.
  */
4878{
4879 if (!trunk->started || !trunk->managing_connections) return 0;
4880
 /* Zero delay fires the management event as soon as the loop runs. */
4881 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, fr_time_delta_wrap(0),
4882 false, _trunk_timer, trunk) < 0) {
4883 PERROR("Failed inserting trunk management event");
4884 return -1;
4885 }
4886
4887 return 0;
4888}
4889
4890/** Order connections by queue depth
4891 *
4892 */
4893static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4894{
 /*
  *	NOTE(review): the declarations/initialisation of a_count and b_count
  *	(original lines 4895-4899) are missing from this extract.
  */
4897
4900
4901 /*
4902 * Add a fudge factor of 1 to reduce spurious rebalancing
4903 */
 /* Returns -1/0/+1 only when the queue depths differ by more than 1. */
4904 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4905}
4906
4907/** Free a trunk, gracefully closing all connections.
4908 *
4909 */
4910static int _trunk_free(trunk_t *trunk)
4911{
4912 trunk_connection_t *tconn;
4913 trunk_request_t *treq;
4914 trunk_watch_entry_t *watch;
4915 size_t i;
4916
4917 DEBUG4("Trunk free %p", trunk);
4918
4919 trunk->freeing = true; /* Prevent re-enqueuing */
4920
4921 /*
4922 * We really don't want this firing after
4923 * we've freed everything.
4924 */
 /* NOTE(review): the timer deletion (original line 4925) is missing from this extract. */
4926
4927 /*
4928 * Now free the connections in each of the lists.
4929 *
4930 * Each time a connection is freed it removes itself from the list
4931 * its in, which means the head should keep advancing automatically.
4932 */
4933 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4934 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4935 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4936 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4937 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4938 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4939 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4940 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4941 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4942
4943 /*
4944 * Process any deferred connection frees
4945 */
 /* NOTE(review): the deferred-free processing statement (original line 4946) is missing from this extract. */
4947
4948 /*
4949 * Free any requests left in the backlog
4950 */
4951 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4952
4953 /*
4954 * Free any requests in our request cache
4955 */
4956 while ((treq = trunk_list_free_requests_peek(trunk))) talloc_free(treq);
4957
4958 /*
4959 * Free any entries in the watch lists
4960 */
4961 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4962 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4963 }
4964
4965 return 0;
4966}
4967
4968/** Allocate a new collection of connections
4969 *
4970 * This function should be called first to allocate a new trunk connection.
4971 *
4972 * After the trunk has been allocated, #trunk_request_alloc and
4973 * #trunk_request_enqueue should be used to allocate memory for trunk
4974 * requests, and pass a preq (protocol request) to the trunk for
4975 * processing.
4976 *
4977 * The trunk will then asynchronously process the request, writing the result
4978 * to a specified rctx. See #trunk_request_enqueue for more details.
4979 *
4980 * @note Trunks may not be shared between multiple threads under any circumstances.
4981 *
4982 * @param[in] ctx To use for any memory allocations. Must be thread local.
4983 * @param[in] el to use for I/O and timer events.
4984 * @param[in] funcs Callback functions.
4985 * @param[in] conf Common user configurable parameters.
4986 * @param[in] log_prefix To prepend to global messages.
4987 * @param[in] uctx User data to pass to the alloc function.
4988 * @param[in] delay_start If true, then we will not spawn any connections
4989 * until the first request is enqueued.
4990 * @param[in] trigger_args Pairs to pass to trigger requests, if triggers are enabled.
4991 * @return
4992 * - New trunk handle on success.
4993 * - NULL on error.
4994 */
 /*
  *	NOTE(review): the first line of the signature (original line 4995,
  *	presumably `trunk_t *trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,`)
  *	is missing from this extract.
  */
4996 trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4997 char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
4998{
4999 trunk_t *trunk;
5000 size_t i;
5001
5002 /*
5003 * Check we have the functions we need
5004 */
5005 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
5006
5007 MEM(trunk = talloc_zero(ctx, trunk_t));
5008 trunk->el = el;
5009 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
5010 trunk->trigger_args = trigger_args;
5011
5012 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
 /* Default prioritiser if the caller didn't supply one. */
5013 if (!trunk->funcs.connection_prioritise) {
 /* NOTE(review): the default assignment (original line 5014) and line 5016 are missing from this extract. */
5015 }
5017
5018 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
5019
5020 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
 /* Graceful teardown of all connections/requests on talloc_free(). */
5021 talloc_set_destructor(trunk, _trunk_free);
5022
5023 /*
5024 * Free request list...
5025 */
 /* NOTE(review): the list initialisation (original line 5026) is missing from this extract. */
5027
5028 /*
5029 * Request backlog queue
5030 */
 /* NOTE(review): the heap allocation call's first line (original line 5031) is missing from this extract. */
5032 trunk_request_t, heap_id, 0));
5033
5034 /*
5035 * Connection queues and trees
5036 */
 /* NOTE(review): most of the connection container allocations (original lines 5037, 5039-5047) are missing from this extract. */
5038 trunk_connection_t, heap_id, 0));
5048
5049 /*
5050 * Watch lists
5051 */
5052 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
 /* NOTE(review): the per-list initialisation (original line 5053) is missing from this extract. */
5054 }
5055
5056 DEBUG4("Trunk allocated %p", trunk);
5057
5058 if (!delay_start) {
5059 if (trunk_start(trunk) < 0) {
5060 talloc_free(trunk);
5061 return NULL;
5062 }
5063 }
5064
5065 return trunk;
5066}
5067
5068/** Check for a module trigger section when parsing the `triggers` option.
5069 *
5070 */
5071int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
5072{
 /*
  *	NOTE(review): the declarations of 'cs' and 'conf' (original lines
  *	5073-5074) are missing from this extract — presumably derived from
  *	the parent of 'ci' and from 'parent' respectively; verify upstream.
  */
5075
 /* Parse the option value itself first; bail on parse failure. */
5076 if (cf_pair_parse_value(ctx, out, parent, ci, rule)< 0) return -1;
5077
5078 /*
5079 * If the parent section of the `triggers` option contains a trigger
5080 * section then store it as the module CONF SECTION for the appropriate
5081 * trigger group.
5082 */
5083 if (cf_section_find(cs, "trigger", NULL)) {
5084 if (strcmp(cf_section_name(cs), "request") == 0) {
5085 conf->req_trigger_cs = cs;
5086 } else {
5087 conf->conn_trigger_cs = cs;
5088 }
5089 }
5090
5091 return 0;
5092}
5093
5094#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5095/** Verify a trunk
5096 *
5097 * A trunk has some number of connections, which each have some number of requests. The connections and
5098 * requests are in differing kinds of containers depending on their state and how they are used, and may
5099 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5100 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5101 * foo's children.
5102 */
5103void trunk_verify(char const *file, int line, trunk_t *trunk)
5104{
5105 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5106 (void) talloc_get_type_abort(trunk, trunk_t);
5107
 /* Verify every watch list's dlist invariants. */
5108 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5109 _fr_dlist_verify(file, line, &trunk->watch[i]);
5110 }
5111
 /* Assert a mandatory I/O callback is set. */
5112#define IO_FUNC_VERIFY(_func) \
5113 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
5114
5115 /*
5116 * Only a few of the function pointers *must* be non-NULL..
5117 */
 /* NOTE(review): IO_FUNC_VERIFY invocations at original lines 5118 and 5120 are missing from this extract. */
5119 IO_FUNC_VERIFY(connection_prioritise);
5121
 /* Check a connection belongs to this trunk and is in the expected state. */
5122#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5123do { \
5124 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5125 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5126 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5127 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5128} while (0)
5129
5130#define TCONN_DLIST_VERIFY(_dlist, _state) \
5131do { \
5132 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5133 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5134 trunk_connection_verify(file, line, tconn); \
5135 TRUNK_TCONN_CHECKS(tconn, _state); \
5136 } \
5137} while (0)
5138
5139#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5140do {\
5141 fr_minmax_heap_verify(file, line, trunk->_heap); \
5142 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5143 trunk_connection_verify(file, line, tconn); \
5144 TRUNK_TCONN_CHECKS(tconn, _state); \
5145 }} \
5146} while (0)
5147
5148 fr_dlist_verify(&(trunk->free_requests));
5149 FR_HEAP_VERIFY(trunk->backlog);
5150
 /* NOTE(review): the TCONN_DLIST_VERIFY/TCONN_MINMAX_HEAP_VERIFY invocations (original lines 5151-5156, 5158-5161) are missing from this extract. */
5157 /* TCONN_DLIST_VERIFY(failed, ???); */
5162}
5163
 /*
  *	NOTE(review): the signature (original line 5164, presumably
  *	`void trunk_connection_verify(char const *file, int line,
  *	trunk_connection_t *tconn)`) is missing from this extract.
  */
5165{
5166 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5167 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5168
5169 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5170
5171 /*
5172 * shouldn't be both in heap and on list--but it doesn't look like moves
5173 * to active heap wipe the dlist pointers.
5174 */
5175
 /* Check a request belongs to this connection/trunk and is in the expected state. */
5176#define TCONN_TREQ_CHECKS(_treq, _state) \
5177do { \
5178 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5179 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5180 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5181 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5182 fr_fatal_assert_msg(_state == _treq->pub.state, \
5183 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5184} while (0)
5185
5186#define TREQ_DLIST_VERIFY(_dlist, _state) \
5187do { \
5188 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5189 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5190 trunk_request_verify(file, line, treq); \
5191 TCONN_TREQ_CHECKS(treq, _state); \
5192 } \
5193} while (0)
5194
5195#define TREQ_HEAP_VERIFY(_heap, _state) \
5196do { \
5197 fr_heap_iter_t _iter; \
5198 fr_heap_verify(file, line, tconn->_heap); \
5199 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5200 treq; \
5201 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5202 trunk_request_verify(file, line, treq); \
5203 TCONN_TREQ_CHECKS(treq, _state); \
5204 } \
5205} while (0)
5206
5207#define TREQ_OPTION_VERIFY(_option, _state) \
5208do { \
5209 if (tconn->_option) { \
5210 trunk_request_verify(file, line, tconn->_option); \
5211 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5212 } \
5213} while (0)
5214
5215 /* verify associated requests */
 /* NOTE(review): the TREQ_*_VERIFY invocations (original lines 5216-5221) are missing from this extract. */
5222}
5223
5224void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5225{
 /* Fatal if the pointer is NULL or isn't a talloc'd trunk_request_t. */
5226 fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5227 (void) talloc_get_type_abort(treq, trunk_request_t);
5228
5229#ifdef WITH_VERIFY_PTR
 /* If a request is associated, recurse into the server request verifier. */
5230 if (treq->pub.request) request_verify(file, line, treq->pub.request);
5231#endif
5232}
5233
5234
 /* Debug helper: report whether 'ptr' is a tconn/treq/preq owned by this trunk. */
5235bool trunk_search(trunk_t *trunk, void *ptr)
5236{
5237#define TCONN_DLIST_SEARCH(_dlist) \
5238do { \
5239 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5240 if (ptr == tconn) { \
5241 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5242 return true; \
5243 } \
5244 if (trunk_connection_search(tconn, ptr)) { \
5245 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5246 return true; \
5247 } \
5248 } \
5249} while (0)
5250
5251#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5252do { \
5253 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5254 if (ptr == tconn) { \
5255 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5256 return true; \
5257 } \
5258 if (trunk_connection_search(tconn, ptr)) { \
5259 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5260 return true; \
5261 } \
5262 }}\
5263} while (0)
5264
 /* NOTE(review): the searches of the init list and the active heap (original lines 5265 and 5267) are missing from this extract. */
5266 TCONN_DLIST_SEARCH(connecting);
5268 TCONN_DLIST_SEARCH(full);
5269 TCONN_DLIST_SEARCH(inactive);
5270 TCONN_DLIST_SEARCH(inactive_draining);
5271 TCONN_DLIST_SEARCH(failed);
5272 TCONN_DLIST_SEARCH(closed);
5273 TCONN_DLIST_SEARCH(draining);
5274 TCONN_DLIST_SEARCH(draining_to_free);
5275 TCONN_DLIST_SEARCH(to_free);
5276
5277 return false;
5278}
5279
 /*
  *	NOTE(review): the signature (original line 5280, presumably
  *	`bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)`)
  *	is missing from this extract.
  */
5281{
5282#define TREQ_DLIST_SEARCH(_dlist) \
5283do { \
5284 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5285 if (ptr == treq) { \
5286 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5287 return true; \
5288 } \
5289 if (trunk_request_search(treq, ptr)) { \
5290 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5291 return true; \
5292 } \
5293 } \
5294} while (0)
5295
5296#define TREQ_HEAP_SEARCH(_heap) \
5297do { \
5298 fr_heap_iter_t _iter; \
5299 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5300 treq; \
5301 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5302 if (ptr == treq) { \
5303 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5304 return true; \
5305 } \
5306 if (trunk_request_search(treq, ptr)) { \
5307 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5308 return true; \
5309 } \
5310 } \
5311} while (0)
5312
5313#define TREQ_OPTION_SEARCH(_option) \
5314do { \
5315 if (tconn->_option) { \
5316 if (ptr == tconn->_option) { \
5317 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5318 return true; \
5319 } \
5320 if (trunk_request_search(tconn->_option, ptr)) { \
5321 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5322 return true; \
5323 } \
5324 } \
5325} while (0)
5326
5327 /* search associated requests */
5328 TREQ_HEAP_SEARCH(pending);
5329 TREQ_DLIST_SEARCH(sent);
5330 TREQ_DLIST_SEARCH(cancel);
5331 TREQ_DLIST_SEARCH(cancel_sent);
5332 TREQ_OPTION_SEARCH(partial);
5333 TREQ_OPTION_SEARCH(cancel_partial);
5334
5335 return false;
5336}
5337
 /*
  *	NOTE(review): the signature (original line 5338, presumably
  *	`static bool trunk_request_search(trunk_request_t *treq, void *ptr)`)
  *	is missing from this extract.  Returns true if 'ptr' is this
  *	request's protocol request (preq).
  */
5339{
5340 return treq->pub.preq == ptr;
5341}
5342#endif
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:40
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:228
#define NDEBUG_UNUSED
Definition build.h:347
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:343
#define unlikely(_x)
Definition build.h:402
#define UNUSED
Definition build.h:336
#define NUM_ELEMENTS(_t)
Definition build.h:358
int cf_pair_parse_value(TALLOC_CTX *ctx, void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Parses a CONF_PAIR into a C data type.
Definition cf_parse.c:213
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:657
cf_parse_t func
Override default parsing behaviour for the specified type with a custom parsing function.
Definition cf_parse.h:611
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:280
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:334
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:309
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:423
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:594
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:683
char const * cf_section_name(CONF_SECTION const *cs)
Return name2 if set, else name1.
Definition cf_util.c:1196
#define cf_parent(_cf)
Definition cf_util.h:98
connection_state_t
Definition connection.h:47
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:56
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition connection.h:48
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:57
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:54
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:51
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:52
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:55
connection_reason_t
Definition connection.h:84
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:158
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:186
#define MEM(x)
Definition debug.h:46
#define DEBUG(fmt,...)
Definition dhcpclient.c:38
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:717
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static void fr_dlist_talloc_free(fr_dlist_head_t *head)
Free all items in a doubly linked list (with talloc)
Definition dlist.h:892
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:570
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:654
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:513
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
#define fr_dlist_verify(_head)
Definition dlist.h:737
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:257
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:320
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:537
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:540
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RWARN(fmt,...)
Definition log.h:309
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:618
Track when a log message was last repeated.
Definition log.h:559
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
fr_log_type_t
Definition log.h:51
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:211
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:449
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1151
#define fr_assert(_expr)
Definition rad_assert.h:37
#define RDEBUG(fmt,...)
#define DEBUG2(fmt,...)
#define WARN(fmt,...)
static bool done
Definition radclient.c:80
#define INFO(fmt,...)
Definition radict.c:63
static fr_event_list_t * events
Definition radsniff.c:58
static rs_t * conf
Definition radsniff.c:52
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:483
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:330
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:521
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:543
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:466
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:321
static fr_time_t test_time(void)
Definition slab_tests.c:43
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:155
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:93
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:110
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:204
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:49
A timer event.
Definition timer.c:83
#define FR_TIMER_DELETE(_ev_p)
Definition timer.h:103
#define FR_TIMER_DELETE_RETURN(_ev_p)
Definition timer.h:110
#define fr_timer_in(...)
Definition timer.h:87
#define FR_TIMER_DISARM(_ev)
Definition timer.h:91
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5235
static atomic_uint_fast64_t request_counter
Definition trunk.c:54
CONF_PAIR * trigger_cp[NUM_ELEMENTS(trunk_conn_trigger_names)]
Cached trigger CONF_PAIRs.
Definition trunk.c:319
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3295
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:792
static size_t trunk_req_trigger_names_len
Definition trunk.c:388
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3882
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:161
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4877
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:762
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3445
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:159
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:153
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:224
static size_t trunk_connection_states_len
Definition trunk.c:427
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:767
trunk_connection_t * tconn
The request was associated with.
Definition trunk.c:82
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4061
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:298
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5103
fr_timer_t * manage_ev
Periodic connection management event.
Definition trunk.c:290
#define IN_HANDLER(_trunk)
Definition trunk.c:722
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:415
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4773
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4078
void * uctx
User data to pass to the function.
Definition trunk.c:191
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1192
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:1001
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:296
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:80
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:977
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4210
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3727
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:276
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:261
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:777
int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
Check for a module trigger section when parsing the triggers option.
Definition trunk.c:5071
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4812
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2069
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2172
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2300
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition trunk.c:3259
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:187
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3968
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:614
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2518
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3679
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:733
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:138
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:258
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:282
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:827
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1544
int line
Line change occurred on.
Definition trunk.c:92
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3197
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:457
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4619
static size_t trunk_connection_events_len
Definition trunk.c:443
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3627
bool oneshot
Remove the function after it's called once.
Definition trunk.c:189
bool started
Has the trunk been started.
Definition trunk.c:307
static size_t trunk_states_len
Definition trunk.c:413
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition trunk.c:2926
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:312
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:575
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as full.
Definition trunk.c:2945
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3096
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:71
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:3009
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:147
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:903
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4910
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:256
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4146
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4718
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:536
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4893
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:100
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:163
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2050
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:190
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:113
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:557
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:772
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2968
struct trunk_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:216
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:111
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:502
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2787
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3556
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:437
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2633
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1095
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:278
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:304
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:408
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:2999
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:727
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2722
#define IS_PROCESSING(_tconn)
Definition trunk.c:728
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3227
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:373
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition trunk.c:108
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4104
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2324
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1642
#define FR_TRUNK_LIST_FUNC(_list)
Definition trunk.c:803
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:632
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:753
fr_dlist_head_t sent
Sent request.
Definition trunk.c:157
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that is has been partially sent.
Definition trunk.c:1263
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:178
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3930
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:242
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:923
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2192
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2844
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3795
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:869
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4548
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:134
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
Definition trunk.c:1350
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the count number of connections in the specified states.
Definition trunk.c:2902
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:724
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:594
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1413
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1720
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:309
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:683
char const * function
State change occurred in.
Definition trunk.c:91
static size_t trunk_request_states_len
Definition trunk.c:406
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:239
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:77
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1464
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3497
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
Allocate a new collection of connections.
Definition trunk.c:4995
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:267
trunk_request_t * partial
Partially written request.
Definition trunk.c:155
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1606
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:244
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:341
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1575
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1293
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:665
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3152
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2362
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:649
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1777
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2485
char const * log_prefix
What to prepend to messages.
Definition trunk.c:220
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:743
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3479
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:3032
static conf_parser_t const trunk_config_request[]
Definition trunk.c:324
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:246
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1130
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:391
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3401
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:198
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:264
uint64_t id
Trunk request ID.
Definition trunk.c:104
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:171
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3366
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:705
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:249
bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]
Record that a specific trigger is undefined.
Definition trunk.c:317
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4867
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:4007
fr_dlist_head_t log
State change log.
Definition trunk.c:123
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition trunk.c:85
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:141
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1499
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3174
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:79
fr_pair_list_t * trigger_args
Passed to trigger.
Definition trunk.c:315
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition trunk.c:165
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4856
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition trunk.c:252
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3984
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2837
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5164
fr_heap_t * backlog
The request backlog.
Definition trunk.c:229
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:725
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5224
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4570
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2276
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2090
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:280
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5280
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:468
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition trunk.c:106
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:491
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:284
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition trunk.c:1888
bool sent
Trunk request has been sent at least once.
Definition trunk.c:118
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2134
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2986
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:4046
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3950
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5338
fr_dlist_t entry
List entry.
Definition trunk.c:186
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:333
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:87
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:115
static size_t trunk_cancellation_reasons_len
Definition trunk.c:435
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:429
static size_t trunk_conn_trigger_names_len
Definition trunk.c:210
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:222
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2875
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:723
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:226
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4094
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:786
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:78
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2112
Associates request queues with a connection.
Definition trunk.c:133
Wraps a normal request.
Definition trunk.c:99
Trace state machine changes for a particular request.
Definition trunk.c:76
Main trunk management handle.
Definition trunk.c:215
An entry in a trunk watch function list.
Definition trunk.c:185
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:232
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:241
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:321
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:352
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition trunk.h:282
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:230
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:196
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:358
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:380
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:742
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:267
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:360
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:311
fr_time_delta_t idle_timeout
how long a connection can remain idle for
Definition trunk.h:251
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:372
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:270
uint64_t max_uses
The maximum time a connection can be used.
Definition trunk.h:247
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:249
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:234
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:335
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:245
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:319
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:350
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:376
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:354
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:738
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:323
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:308
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:378
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:256
uint16_t start
How many connections to start.
Definition trunk.h:228
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:260
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:214
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:272
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:744
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:329
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:333
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:253
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:374
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:314
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:729
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:264
trunk_enqueue_t
Definition trunk.h:149
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:154
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:155
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:151
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:152
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:150
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:356
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:237
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:317
trunk_request_cancel_mux_t request_cancel_mux
!< Read one or more requests from a connection.
Definition trunk.h:751
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:171
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:174
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:166
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:163
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:186
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancel.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:189
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:169
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:173
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:338
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:305
Common configuration parameters for a trunk.
Definition trunk.h:225
Public fields for the trunk connection.
Definition trunk.h:371
I/O functions to pass to trunk_alloc.
Definition trunk.h:737
Public fields for the trunk.
Definition trunk.h:301
Public fields for the trunk request.
Definition trunk.h:349
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:420
static fr_slen_t parent
Definition pair.h:858
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:553
#define fr_box_time_delta(_val)
Definition value.h:366
int nonnull(2, 5))
static size_t char ** out
Definition value.h:1030