The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
trunk.c
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 58449016519ccdcb6716416e983c52d0bb90e864 $
19  *
20  * @file src/lib/server/trunk.c
21  * @brief A management API for bonding multiple connections together.
22  *
23  * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24  * @copyright 2019-2020 The FreeRADIUS server project
25  */
26 
27 #define LOG_PREFIX trunk->log_prefix
28 
29 #ifdef NDEBUG
30 # define TALLOC_GET_TYPE_ABORT_NOOP 1
31 #endif
32 
35 typedef struct fr_trunk_s fr_trunk_t;
36 #define _TRUNK_PRIVATE 1
37 #include <freeradius-devel/server/trunk.h>
38 
39 #include <freeradius-devel/server/connection.h>
40 #include <freeradius-devel/server/trigger.h>
41 #include <freeradius-devel/util/misc.h>
42 #include <freeradius-devel/util/syserror.h>
43 #include <freeradius-devel/util/table.h>
44 #include <freeradius-devel/util/minmax_heap.h>
45 
46 #ifdef HAVE_STDATOMIC_H
47 # include <stdatomic.h>
48 #else
49 # include <freeradius-devel/util/stdatomic.h>
50 #endif
51 
52 static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
53 
54 #ifdef TESTING_TRUNK
55 static fr_time_t test_time_base;
56 
57 static fr_time_t test_time(void)
58 {
59  return test_time_base;
60 }
61 
62 #define fr_time test_time
63 #endif
64 
65 #ifndef NDEBUG
66 /** The maximum number of state logs to record per request
67  *
68  */
69 #define FR_TRUNK_REQUEST_STATE_LOG_MAX 20
70 
71 /** Trace state machine changes for a particular request
72  *
73  */
74 typedef struct {
75  fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
76  fr_dlist_t entry; //!< Entry in the linked list.
77  fr_trunk_request_state_t from; //!< What state we transitioned from.
78  fr_trunk_request_state_t to; //!< What state we transitioned to.
79 
80  fr_trunk_connection_t *tconn; //!< The connection the request was associated with.
81  ///< Pointer may now be invalid, do not de-reference.
82 
83  uint64_t tconn_id; //!< If the treq was associated with a connection
84  ///< the connection ID.
85  fr_trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
86  ///< the connection state at the time of the
87  ///< state transition.
88 
89  char const *function; //!< Function the state change occurred in.
90  int line; //!< Line the state change occurred on.
91 } fr_trunk_request_state_log_t;
92 #endif
93 
94 /** Wraps a normal request
95  *
96  */
97 struct fr_trunk_request_s {
98  struct fr_trunk_request_pub_s pub; //!< Public fields in the trunk request.
99  ///< This *MUST* be the first field in this
100  ///< structure.
101 
102  uint64_t id; //!< Trunk request ID.
103 
104  fr_heap_index_t heap_id; //!< Used to track the request conn->pending heap.
105 
106  fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
107  ///< or trunk->backlog request.
108 
109  fr_trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
110 
111  fr_time_t last_freed; //!< Last time this request was freed.
112 
113  bool bound_to_conn; //!< Fail the request if there's an attempt to
114  ///< re-enqueue it.
115 
116 #ifndef NDEBUG
117  fr_dlist_head_t log; //!< State change log.
118 #endif
119 };
120 
121 
122 /** Associates request queues with a connection
123  *
124  * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
125  * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
126  */
127 struct fr_trunk_connection_s {
128  struct fr_trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
129  ///< This *MUST* be the first field in this
130  ///< structure.
131 
132  fr_heap_index_t heap_id; //!< Used to track the connection in the connected
133  ///< heap.
134 
135  fr_dlist_t entry; //!< Used to track the connection in the connecting,
136  ///< full and failed lists.
137 
138  /** @name State
139  * @{
140  */
141  fr_trunk_connection_event_t events; //!< The current events we expect to be notified on.
142  /** @} */
143 
144  /** @name Request lists
145  * @{
146  */
147  fr_heap_t *pending; //!< Requests waiting to be sent.
148 
149  fr_trunk_request_t *partial; //!< Partially written request.
150 
151  fr_dlist_head_t sent; //!< Sent requests.
152 
153  fr_dlist_head_t cancel; //!< Requests in the cancel state.
154 
155  fr_trunk_request_t *cancel_partial; //!< Partially written cancellation request.
156 
157  fr_dlist_head_t cancel_sent; //!< Sent cancellation requests.
158  /** @} */
159 
160  /** @name Statistics
161  * @{
162  */
163  uint64_t sent_count; //!< The number of requests that have been sent using
164  ///< this connection.
165  /** @} */
166 
167  /** @name Timers
168  * @{
169  */
170  fr_event_timer_t const *lifetime_ev; //!< Maximum time this connection can be open.
171  /** @} */
172 };
173 
174 /** An entry in a trunk watch function list
175  *
176  */
177 typedef struct fr_trunk_watch_entry_s {
178  fr_dlist_t entry; //!< List entry.
179  fr_trunk_watch_t func; //!< Function to call when a trunk enters
180  ///< the state this list belongs to
181  bool oneshot; //!< Remove the function after it's called once.
182  bool enabled; //!< Whether the watch entry is enabled.
183  void *uctx; //!< User data to pass to the function.
184 } fr_trunk_watch_entry_t;
185 
186 /** Main trunk management handle
187  *
188  */
189 struct fr_trunk_s {
190  struct fr_trunk_pub_s pub; //!< Public fields in the trunk.
191  ///< This *MUST* be the first field in this
192  ///< structure.
193 
194  char const *log_prefix; //!< What to prepend to messages.
195 
196  fr_event_list_t *el; //!< Event list used by this trunk and the connection.
197 
198  fr_trunk_conf_t conf; //!< Trunk common configuration.
199 
200  fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
201  ///< enqueued.
202 
203  fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
204  ///< immediately assign to a connection.
205 
206  /** @name Connection lists
207  *
208  * A connection must always be in exactly one of these lists
209  * or trees.
210  *
211  * @{
212  */
213  fr_dlist_head_t init; //!< Connections which have not yet started
214  ///< connecting.
215 
216  fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
217 
218  fr_minmax_heap_t *active; //!< Connections which can service requests.
219 
220  fr_dlist_head_t full; //!< Connections which have too many outstanding
221  ///< requests.
222 
223  fr_dlist_head_t inactive; //!< Connections which have been signalled to be
224  ///< inactive by the API client.
225 
226  fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
227  ///< inactive by the API client, which the trunk
228  ///< manager is draining to close.
229 
230  fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
231 
232  fr_dlist_head_t closed; //!< Connections that have closed. Either due to
233  ///< shutdown, reconnection or failure.
234 
235  fr_dlist_head_t draining; //!< Connections that will be freed once all their
236  ///< requests are complete, but can be reactivated.
237 
238  fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
239  ///< requests are complete.
240 
241  fr_dlist_head_t to_free; //!< Connections we're done with and will free on
242  //!< the next call to trunk_manage.
243  //!< This prevents connections from being freed
244  //!< whilst we're inside callbacks.
245  /** @} */
246 
247  /** @name Callbacks
248  * @{
249  */
250  fr_trunk_io_funcs_t funcs; //!< I/O functions.
251 
252  void *in_handler; //!< Which handler we're inside.
253 
254  void *uctx; //!< Uctx data to pass to alloc.
255 
256  fr_dlist_head_t watch[FR_TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
257 
258  fr_trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
259  /** @} */
260 
261  /** @name Timers
262  * @{
263  */
264  fr_event_timer_t const *manage_ev; //!< Periodic connection management event.
265  /** @} */
266 
267  /** @name Log rate limiting entries
268  * @{
269  */
270  fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
271 
272  fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
273  /** @} */
274 
275  /** @name State
276  * @{
277  */
278  bool freeing; //!< Trunk is being freed, don't spawn new
279  ///< connections or re-enqueue.
280 
281  bool started; //!< Has the trunk been started.
282 
283  bool managing_connections; //!< Whether the trunk is allowed to manage
284  ///< (open/close) connections.
285 
286  uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
287  /** @} */
288 };
289 
291  { FR_CONF_OFFSET("per_connection_max", fr_trunk_conf_t, max_req_per_conn), .dflt = "2000" },
292  { FR_CONF_OFFSET("per_connection_target", fr_trunk_conf_t, target_req_per_conn), .dflt = "1000" },
293  { FR_CONF_OFFSET("free_delay", fr_trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
294 
296 };
297 
299  { FR_CONF_OFFSET("connect_timeout", fr_connection_conf_t, connection_timeout), .dflt = "3.0" },
300  { FR_CONF_OFFSET("reconnect_delay", fr_connection_conf_t, reconnection_delay), .dflt = "1" },
301 
303 };
304 
305 #ifndef TRUNK_TESTS
307  { FR_CONF_OFFSET("start", fr_trunk_conf_t, start), .dflt = "5" },
308  { FR_CONF_OFFSET("min", fr_trunk_conf_t, min), .dflt = "1" },
309  { FR_CONF_OFFSET("max", fr_trunk_conf_t, max), .dflt = "5" },
310  { FR_CONF_OFFSET("connecting", fr_trunk_conf_t, connecting), .dflt = "2" },
311  { FR_CONF_OFFSET("uses", fr_trunk_conf_t, max_uses), .dflt = "0" },
312  { FR_CONF_OFFSET("lifetime", fr_trunk_conf_t, lifetime), .dflt = "0" },
313 
314  { FR_CONF_OFFSET("open_delay", fr_trunk_conf_t, open_delay), .dflt = "0.2" },
315  { FR_CONF_OFFSET("close_delay", fr_trunk_conf_t, close_delay), .dflt = "10.0" },
316 
317  { FR_CONF_OFFSET("manage_interval", fr_trunk_conf_t, manage_interval), .dflt = "0.2" },
318 
319  { FR_CONF_OFFSET_SUBSECTION("connection", 0, fr_trunk_conf_t, conn_conf, fr_trunk_config_connection), .subcs_size = sizeof(fr_trunk_config_connection) },
320  { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) fr_trunk_config_request },
321 
323 };
324 #endif
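/* Illustrative sketch of how the options parsed by the conf_parser tables above might appear
 * in a module's connection pool configuration.  The "pool" section name and nesting are
 * assumptions; the option names and default values are taken directly from the FR_CONF_OFFSET
 * entries above.
 *
 *	pool {
 *		start = 5
 *		min = 1
 *		max = 5
 *		connecting = 2
 *		uses = 0
 *		lifetime = 0
 *		open_delay = 0.2
 *		close_delay = 10.0
 *		manage_interval = 0.2
 *
 *		connection {
 *			connect_timeout = 3.0
 *			reconnect_delay = 1
 *		}
 *
 *		request {
 *			per_connection_max = 2000
 *			per_connection_target = 1000
 *			free_delay = 10.0
 *		}
 *	}
 */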
325 
326 #ifndef NDEBUG
327 /** Map request states to trigger names
328  *
329  * Must stay in the same order as #fr_trunk_request_state_t
330  */
332  { L("pool.request_init"), FR_TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
333  { L("pool.request_unassigned"), FR_TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
334  { L("pool.request_backlog"), FR_TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
335  { L("pool.request_pending"), FR_TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
336  { L("pool.request_partial"), FR_TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
337  { L("pool.request_sent"), FR_TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
338  { L("pool.request_complete"), FR_TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0020 - bit 6 */
339  { L("pool.request_state_failed"), FR_TRUNK_REQUEST_STATE_FAILED }, /* 0x0040 - bit 7 */
340  { L("pool.request_state_cancel"), FR_TRUNK_REQUEST_STATE_CANCEL }, /* 0x0080 - bit 8 */
341  { L("pool.request_state_cancel_sent"), FR_TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0100 - bit 9 */
342  { L("pool.request_state_cancel_partial"), FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0200 - bit 10 */
343  { L("pool.request_state_cancel_complete"), FR_TRUNK_REQUEST_STATE_CANCEL_COMPLETE } /* 0x0400 - bit 11 */
344 };
346 #endif
347 
349  { L("INIT"), FR_TRUNK_REQUEST_STATE_INIT },
350  { L("UNASSIGNED"), FR_TRUNK_REQUEST_STATE_UNASSIGNED },
351  { L("BACKLOG"), FR_TRUNK_REQUEST_STATE_BACKLOG },
352  { L("PENDING"), FR_TRUNK_REQUEST_STATE_PENDING },
353  { L("PARTIAL"), FR_TRUNK_REQUEST_STATE_PARTIAL },
354  { L("SENT"), FR_TRUNK_REQUEST_STATE_SENT },
355  { L("COMPLETE"), FR_TRUNK_REQUEST_STATE_COMPLETE },
356  { L("FAILED"), FR_TRUNK_REQUEST_STATE_FAILED },
357  { L("CANCEL"), FR_TRUNK_REQUEST_STATE_CANCEL },
358  { L("CANCEL-SENT"), FR_TRUNK_REQUEST_STATE_CANCEL_SENT },
359  { L("CANCEL-PARTIAL"), FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
360  { L("CANCEL-COMPLETE"), FR_TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
361 };
363 
364 /** Map connection states to trigger names
365  *
366  * Must stay in the same order as #fr_trunk_connection_state_t
367  */
369  { L("pool.connection_halted"), FR_TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
370  { L("pool.connection_init"), FR_TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
371  { L("pool.connection_connecting"), FR_TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
372  { L("pool.connection_active"), FR_TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
373  { L("pool.connection_closed"), FR_TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
374  { L("pool.connection_full"), FR_TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
375  { L("pool.connection_inactive"), FR_TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
376  { L("pool.connection_inactive_draining"), FR_TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
377  { L("pool.connection_draining"), FR_TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
378  { L("pool.connection_draining_to_free"), FR_TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
379 };
381 
383  { L("IDLE"), FR_TRUNK_STATE_IDLE },
384  { L("ACTIVE"), FR_TRUNK_STATE_ACTIVE },
385  { L("PENDING"), FR_TRUNK_STATE_PENDING }
386 };
388 
390  { L("INIT"), FR_TRUNK_CONN_INIT },
391  { L("HALTED"), FR_TRUNK_CONN_HALTED },
392  { L("CONNECTING"), FR_TRUNK_CONN_CONNECTING },
393  { L("ACTIVE"), FR_TRUNK_CONN_ACTIVE },
394  { L("CLOSED"), FR_TRUNK_CONN_CLOSED },
395  { L("FULL"), FR_TRUNK_CONN_FULL },
396  { L("INACTIVE"), FR_TRUNK_CONN_INACTIVE },
397  { L("INACTIVE-DRAINING"), FR_TRUNK_CONN_INACTIVE_DRAINING },
398  { L("DRAINING"), FR_TRUNK_CONN_DRAINING },
399  { L("DRAINING-TO-FREE"), FR_TRUNK_CONN_DRAINING_TO_FREE }
400 };
402 
404  { L("FR_TRUNK_CANCEL_REASON_NONE"), FR_TRUNK_CANCEL_REASON_NONE },
405  { L("FR_TRUNK_CANCEL_REASON_SIGNAL"), FR_TRUNK_CANCEL_REASON_SIGNAL },
406  { L("FR_TRUNK_CANCEL_REASON_MOVE"), FR_TRUNK_CANCEL_REASON_MOVE },
407  { L("FR_TRUNK_CANCEL_REASON_REQUEUE"), FR_TRUNK_CANCEL_REASON_REQUEUE }
408 };
410 
412  { L("FR_TRUNK_CONN_EVENT_NONE"), FR_TRUNK_CONN_EVENT_NONE },
413  { L("FR_TRUNK_CONN_EVENT_READ"), FR_TRUNK_CONN_EVENT_READ },
414  { L("FR_TRUNK_CONN_EVENT_WRITE"), FR_TRUNK_CONN_EVENT_WRITE },
415  { L("FR_TRUNK_CONN_EVENT_BOTH"), FR_TRUNK_CONN_EVENT_BOTH },
416 };
418 
419 #define CONN_TRIGGER(_state) do { \
420  if (trunk->pub.triggers) { \
421  trigger_exec(unlang_interpret_get_thread_default(), \
422  NULL, fr_table_str_by_value(fr_trunk_conn_trigger_names, _state, \
423  "<INVALID>"), true, NULL); \
424  } \
425 } while (0)
426 
427 #define CONN_STATE_TRANSITION(_new, _log) \
428 do { \
429  _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
430  tconn->pub.conn->id, \
431  fr_table_str_by_value(fr_trunk_connection_states, tconn->pub.state, "<INVALID>"), \
432  fr_table_str_by_value(fr_trunk_connection_states, _new, "<INVALID>")); \
433  tconn->pub.state = _new; \
434  CONN_TRIGGER(_new); \
435  trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
436 } while (0)
437 
438 #define CONN_BAD_STATE_TRANSITION(_new) \
439 do { \
440  if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
441  tconn->pub.conn->id, \
442  fr_table_str_by_value(fr_trunk_connection_states, tconn->pub.state, "<INVALID>"), \
443  fr_table_str_by_value(fr_trunk_connection_states, _new, "<INVALID>"))) return; \
444 } while (0)
445 
446 #ifndef NDEBUG
447 void trunk_request_state_log_entry_add(char const *function, int line,
449 
450 #define REQUEST_TRIGGER(_state) do { \
451  if (trunk->pub.triggers) { \
452  trigger_exec(unlang_interpret_get_thread_default(), \
453  NULL, fr_table_str_by_value(fr_trunk_req_trigger_names, _state, \
454  "<INVALID>"), true, NULL); \
455  } \
456 } while (0)
457 
458 /** Record a request state transition and log appropriate output
459  *
460  */
461 #define REQUEST_STATE_TRANSITION(_new) \
462 do { \
463  request_t *request = treq->pub.request; \
464  ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
465  treq->id, \
466  fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
467  fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>")); \
468  trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
469  treq->pub.state = _new; \
470  REQUEST_TRIGGER(_new); \
471 } while (0)
472 #define REQUEST_BAD_STATE_TRANSITION(_new) \
473 do { \
474  fr_trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
475  if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
476  treq->id, \
477  fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
478  fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>"))) return; \
479 } while (0)
480 #else
481 /** Record a request state transition
482  *
483  */
484 #define REQUEST_STATE_TRANSITION(_new) \
485 do { \
486  request_t *request = treq->pub.request; \
487  ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
488  treq->id, \
489  fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
490  fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>")); \
491  treq->pub.state = _new; \
492 } while (0)
493 #define REQUEST_BAD_STATE_TRANSITION(_new) \
494 do { \
495  if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
496  treq->id, \
497  fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"), \
498  fr_table_str_by_value(fr_trunk_request_states, _new, "<INVALID>"))) return; \
499 } while (0)
500 #endif
501 
502 
503 /** Call the cancel callback if set
504  *
505  */
506 #define DO_REQUEST_CANCEL(_treq, _reason) \
507 do { \
508  if ((_treq)->pub.trunk->funcs.request_cancel) { \
509  request_t *request = (_treq)->pub.request; \
510  void *_prev = (_treq)->pub.trunk->in_handler; \
511  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
512  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
513  (_treq)->pub.tconn->pub.conn, \
514  (_treq)->pub.preq, \
515  fr_table_str_by_value(fr_trunk_cancellation_reasons, \
516  (_reason), \
517  "<INVALID>"), \
518  (_treq)->pub.trunk->uctx); \
519  (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
520  (_treq)->pub.trunk->in_handler = _prev; \
521  } \
522 } while(0)
523 
524 /** Call the "conn_release" callback (if set)
525  *
526  */
527 #define DO_REQUEST_CONN_RELEASE(_treq) \
528 do { \
529  if ((_treq)->pub.trunk->funcs.request_conn_release) { \
530  request_t *request = (_treq)->pub.request; \
531  void *_prev = (_treq)->pub.trunk->in_handler; \
532  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
533  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
534  (_treq)->pub.tconn->pub.conn, \
535  (_treq)->pub.preq, \
536  (_treq)->pub.trunk->uctx); \
537  (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
538  (_treq)->pub.trunk->in_handler = _prev; \
539  } \
540 } while(0)
541 
542 /** Call the complete callback (if set)
543  *
544  */
545 #define DO_REQUEST_COMPLETE(_treq) \
546 do { \
547  if ((_treq)->pub.trunk->funcs.request_complete) { \
548  request_t *request = (_treq)->pub.request; \
549  void *_prev = (_treq)->pub.trunk->in_handler; \
550  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
551  (_treq)->pub.request, \
552  (_treq)->pub.preq, \
553  (_treq)->pub.rctx, \
554  (_treq)->pub.trunk->uctx); \
555  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
556  (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
557  (_treq)->pub.trunk->in_handler = _prev; \
558  } \
559 } while(0)
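/* A minimal (non-compiled) sketch of a request_complete callback with the shape
 * DO_REQUEST_COMPLETE invokes above, i.e. (request, preq, rctx, uctx).  A common pattern is
 * to record the decoded result in the resume context and mark the yielded request runnable.
 * my_rctx_t is hypothetical; unlang_interpret_mark_runnable() is assumed to be available to
 * the API client.
 */
#if 0
static void example_request_complete(request_t *request, UNUSED void *preq, void *rctx, UNUSED void *uctx)
{
	my_rctx_t *our_rctx = rctx;

	our_rctx->rcode = RLM_MODULE_OK;			/* Result filled in by request_demux */
	if (request) unlang_interpret_mark_runnable(request);	/* Resume the yielded request */
}
#endif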
560 
561 /** Call the fail callback (if set)
562  *
563  */
564 #define DO_REQUEST_FAIL(_treq, _prev_state) \
565 do { \
566  if ((_treq)->pub.trunk->funcs.request_fail) { \
567  request_t *request = (_treq)->pub.request; \
568  void *_prev = (_treq)->pub.trunk->in_handler; \
569  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
570  (_treq)->pub.request, \
571  (_treq)->pub.preq, \
572  (_treq)->pub.rctx, \
573  fr_table_str_by_value(fr_trunk_request_states, (_prev_state), "<INVALID>"), \
574  (_treq)->pub.trunk->uctx); \
575  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
576  (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
577  (_treq)->pub.trunk->in_handler = _prev; \
578  } \
579 } while(0)
580 
581 /** Call the free callback (if set)
582  *
583  */
584 #define DO_REQUEST_FREE(_treq) \
585 do { \
586  if ((_treq)->pub.trunk->funcs.request_free) { \
587  request_t *request = (_treq)->pub.request; \
588  void *_prev = (_treq)->pub.trunk->in_handler; \
589  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
590  (_treq)->pub.request, \
591  (_treq)->pub.preq, \
592  (_treq)->pub.trunk->uctx); \
593  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
594  (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
595  (_treq)->pub.trunk->in_handler = _prev; \
596  } \
597 } while(0)
598 
599 /** Write one or more requests to a connection
600  *
601  */
602 #define DO_REQUEST_MUX(_tconn) \
603 do { \
604  void *_prev = (_tconn)->pub.trunk->in_handler; \
605  DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
606  (_tconn)->pub.conn->id, \
607  (_tconn)->pub.trunk->el, \
608  (_tconn), \
609  (_tconn)->pub.conn, \
610  (_tconn)->pub.trunk->uctx); \
611  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
612  (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
613  (_tconn)->pub.trunk->in_handler = _prev; \
614 } while(0)
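/* A minimal (non-compiled) sketch of a request_mux callback with the signature DO_REQUEST_MUX
 * passes above, i.e. (el, tconn, conn, uctx), written from the API client's perspective.
 * fr_trunk_connection_pop_request() and the fr_trunk_request_signal_*() functions are assumed
 * from the public trunk API; my_conn_write() is a hypothetical helper returning <0 on error,
 * 0 if the request was only partially written, and >0 once it has been written in full.
 */
#if 0
static void example_request_mux(UNUSED fr_event_list_t *el, fr_trunk_connection_t *tconn,
				fr_connection_t *conn, UNUSED void *uctx)
{
	fr_trunk_request_t *treq;

	while ((treq = fr_trunk_connection_pop_request(tconn)) != NULL) {
		ssize_t slen;

		slen = my_conn_write(conn, treq);		/* Encode and write the treq's preq (application specific) */
		if (slen < 0) {
			fr_trunk_request_signal_fail(treq);	/* Runs the fail callback and frees the treq */
			continue;
		}
		if (slen == 0) {
			fr_trunk_request_signal_partial(treq);	/* Offered to request_mux again when writable */
			return;
		}
		fr_trunk_request_signal_sent(treq);		/* Moves the treq onto the connection's sent list */
	}
}
#endif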
615 
616 /** Read one or more requests from a connection
617  *
618  */
619 #define DO_REQUEST_DEMUX(_tconn) \
620 do { \
621  void *_prev = (_tconn)->pub.trunk->in_handler; \
622  DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
623  (_tconn)->pub.conn->id, \
624  (_tconn), \
625  (_tconn)->pub.conn, \
626  (_tconn)->pub.trunk->uctx); \
627  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
628  (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
629  (_tconn)->pub.trunk->in_handler = _prev; \
630 } while(0)
631 
632 /** Write one or more cancellation requests to a connection
633  *
634  */
635 #define DO_REQUEST_CANCEL_MUX(_tconn) \
636 do { \
637  if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
638  void *_prev = (_tconn)->pub.trunk->in_handler; \
639  DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
640  (_tconn)->pub.conn->id, \
641  (_tconn), \
642  (_tconn)->pub.conn, \
643  (_tconn)->pub.trunk->uctx); \
644  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
645  (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
646  (_tconn)->pub.trunk->in_handler = _prev; \
647  } \
648 } while(0)
649 
650 /** Allocate a new connection
651  *
652  */
653 #define DO_CONNECTION_ALLOC(_tconn) \
654 do { \
655  void *_prev = trunk->in_handler; \
656  DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
657  (_tconn), \
658  (_tconn)->pub.trunk->el, \
659  (_tconn)->pub.trunk->conf.conn_conf, \
660  trunk->log_prefix, \
661  (_tconn)->pub.trunk->uctx); \
662  (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
663  (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
664  (_tconn)->pub.trunk->in_handler = _prev; \
665  if (!(_tconn)->pub.conn) { \
666  ERROR("Failed creating new connection"); \
667  talloc_free(tconn); \
668  return -1; \
669  } \
670 } while(0)
671 
672 /** Change what events the connection should be notified about
673  *
674  */
675 #define DO_CONNECTION_NOTIFY(_tconn, _events) \
676 do { \
677  if ((_tconn)->pub.trunk->funcs.connection_notify) { \
678  void *_prev = (_tconn)->pub.trunk->in_handler; \
679  DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
680  (_tconn)->pub.conn->id, \
681  (_tconn), \
682  (_tconn)->pub.conn, \
683  (_tconn)->pub.trunk->el, \
684  fr_table_str_by_value(fr_trunk_connection_events, (_events), "<INVALID>"), \
685  (_tconn)->pub.trunk->uctx); \
686  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
687  (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
688  (_tconn)->pub.trunk->in_handler = _prev; \
689  } \
690 } while(0)
691 
692 #define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
693 #define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
694 #define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
695 #define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
696 
697 #define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & FR_TRUNK_CONN_SERVICEABLE)
698 #define IS_PROCESSING(_tconn) ((_tconn)->pub.state & FR_TRUNK_CONN_PROCESSING)
699 
700 /** Remove the current request from the backlog
701  *
702  */
703 #define REQUEST_EXTRACT_BACKLOG(_treq) \
704 do { \
705  int _ret; \
706  _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
707  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
708 } while (0)
709 
710 /** Remove the current request from the pending list
711  *
712  */
713 #define REQUEST_EXTRACT_PENDING(_treq) \
714 do { \
715  int _ret; \
716  _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
717  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
718 } while (0)
719 
720 /** Remove the current request from the partial slot
721  *
722  */
723 #define REQUEST_EXTRACT_PARTIAL(_treq) \
724 do { \
725  fr_assert((_treq)->pub.tconn->partial == treq); \
726  tconn->partial = NULL; \
727 } while (0)
728 
729 /** Remove the current request from the sent list
730  *
731  */
732 #define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
733 
734 /** Remove the current request from the cancel list
735  *
736  */
737 #define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
738 
739 /** Remove the current request from the cancel_partial slot
740  *
741  */
742 #define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
743 do { \
744  fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
745  tconn->cancel_partial = NULL; \
746 } while (0)
747 
748 /** Remove the current request from the cancel sent list
749  *
750  */
751 #define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
752 
753 /** Reorder the connections in the active heap
754  *
755  * fr_heap_extract will also error out if heap_id is bad - no need for assert
756  */
757 #define CONN_REORDER(_tconn) \
758 do { \
759  int _ret; \
760  if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
761  if (!fr_cond_assert((_tconn)->pub.state == FR_TRUNK_CONN_ACTIVE)) break; \
762  _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
763  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
764  fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
765 } while (0)
766 
767 /** Call a list of watch functions associated with a state
768  *
769  */
771 {
772  /*
773  * Nested watcher calls are not allowed
774  * and shouldn't be possible because of
775  * deferred signal processing.
776  */
777  fr_assert(trunk->next_watcher == NULL);
778 
779  while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
780  fr_trunk_watch_entry_t *entry = trunk->next_watcher;
781  bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
782 
783  if (!entry->enabled) continue;
784  if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
785 
786  entry->func(trunk, trunk->pub.state, state, entry->uctx);
787 
788  if (oneshot) talloc_free(entry);
789  }
790  trunk->next_watcher = NULL;
791 }
792 
793 /** Call the state change watch functions
794  *
795  */
796 #define CALL_WATCHERS(_trunk, _state) \
797 do { \
798  if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
799  trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
800 } while(0)
801 
802 /** Remove a watch function from a trunk state list
803  *
804  * @param[in] trunk The trunk to remove the watcher from.
805  * @param[in] state to remove the watch from.
806  * @param[in] watch Function to remove.
807  * @return
808  * - 0 if the function was removed successfully.
809  * - -1 if the function wasn't present in the watch list.
810  * - -2 if an invalid state was passed.
811  */
813 {
814  fr_trunk_watch_entry_t *entry = NULL;
815  fr_dlist_head_t *list;
816 
817  if (state >= FR_TRUNK_STATE_MAX) return -2;
818 
819  list = &trunk->watch[state];
820  while ((entry = fr_dlist_next(list, entry))) {
821  if (entry->func == watch) {
822  if (trunk->next_watcher == entry) {
823  trunk->next_watcher = fr_dlist_remove(list, entry);
824  } else {
825  fr_dlist_remove(list, entry);
826  }
827  talloc_free(entry);
828  return 0;
829  }
830  }
831 
832  return -1;
833 }
834 
835 /** Add a watch entry to the trunk state list
836  *
837  * @param[in] trunk The trunk to add the watcher to.
838  * @param[in] state to watch for.
839  * @param[in] watch Function to add.
840  * @param[in] oneshot Should this watcher only be run once.
841  * @param[in] uctx Context to pass to function.
842  * @return
843  * - NULL if an invalid state is passed.
844  * - A new watch entry handle on success.
845  */
847  fr_trunk_watch_t watch, bool oneshot, void const *uctx)
848 {
849  fr_trunk_watch_entry_t *entry;
850  fr_dlist_head_t *list;
851 
852  if (state >= FR_TRUNK_STATE_MAX) return NULL;
853 
854  list = &trunk->watch[state];
855  MEM(entry = talloc_zero(trunk, fr_trunk_watch_entry_t));
856 
857  entry->func = watch;
858  entry->oneshot = oneshot;
859  entry->enabled = true;
860  memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
861  fr_dlist_insert_tail(list, entry);
862 
863  return entry;
864 }
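/* A minimal (non-compiled) sketch of registering a trunk state watch.  The callback signature
 * mirrors the invocation in trunk_watch_call() above, i.e. (trunk, old state, new state, uctx);
 * fr_trunk_add_watch() is assumed to be the public wrapper around the add function above.
 */
#if 0
static void example_on_trunk_state(fr_trunk_t *trunk, fr_trunk_state_t prev, fr_trunk_state_t state, UNUSED void *uctx)
{
	DEBUG3("Watched trunk transition %s -> %s",
	       fr_table_str_by_value(fr_trunk_states, prev, "<INVALID>"),
	       fr_table_str_by_value(fr_trunk_states, state, "<INVALID>"));
}

/* e.g. fr_trunk_add_watch(trunk, FR_TRUNK_STATE_ACTIVE, example_on_trunk_state, true, NULL);
 * with oneshot=true the entry is removed and freed after its first invocation.
 */
#endif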
865 
866 #define TRUNK_STATE_TRANSITION(_new) \
867 do { \
868  DEBUG3("Trunk changed state %s -> %s", \
869  fr_table_str_by_value(fr_trunk_states, trunk->pub.state, "<INVALID>"), \
870  fr_table_str_by_value(fr_trunk_states, _new, "<INVALID>")); \
871  CALL_WATCHERS(trunk, _new); \
872  trunk->pub.state = _new; \
873 } while (0)
874 
875 static void trunk_request_enter_backlog(fr_trunk_request_t *treq, bool new);
876 static void trunk_request_enter_pending(fr_trunk_request_t *treq, fr_trunk_connection_t *tconn, bool new);
884 
885 static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
886  fr_trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
887 
888 static int trunk_connection_spawn(fr_trunk_t *trunk, fr_time_t now);
889 static inline void trunk_connection_auto_full(fr_trunk_connection_t *tconn);
890 static inline void trunk_connection_auto_unfull(fr_trunk_connection_t *tconn);
891 static inline void trunk_connection_readable(fr_trunk_connection_t *tconn);
892 static inline void trunk_connection_writable(fr_trunk_connection_t *tconn);
900 
901 static void trunk_rebalance(fr_trunk_t *trunk);
902 static void trunk_manage(fr_trunk_t *trunk, fr_time_t now);
903 static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx);
904 static void trunk_backlog_drain(fr_trunk_t *trunk);
905 
906 /** Compare two protocol requests
907  *
908  * Allows protocol requests to be prioritised with a function
909  * specified by the API client. Defaults to by pointer address
910  * if no function is specified.
911  *
912  * @param[in] a treq to compare to b.
913  * @param[in] b treq to compare to a.
914  * @return
915  * - +1 if a > b.
916  * - 0 if a == b.
917  * - -1 if a < b.
918  */
919 static int8_t _trunk_request_prioritise(void const *a, void const *b)
920 {
923 
924  fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
925 
926  return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
927 }
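/* A minimal (non-compiled) sketch of a request_prioritise callback as consumed by
 * _trunk_request_prioritise() above.  It receives the two preq pointers the API client
 * enqueued and follows the comparator contract documented above (-1, 0, +1).  my_preq_t and
 * its priority field are hypothetical; CMP() from the server's utility headers is assumed.
 */
#if 0
static int8_t example_request_prioritise(void const *a, void const *b)
{
	my_preq_t const *preq_a = a;
	my_preq_t const *preq_b = b;

	return CMP(preq_a->priority, preq_b->priority);	/* Yields -1, 0 or +1 */
}
#endif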
928 
929 /** Remove a request from all connection lists
930  *
931  * A common function used by init, fail, complete state functions to disassociate
932  * a request from a connection in preparation for freeing or reassignment.
933  *
934  * Despite its unassuming name, this function is *the* place to put calls to
935  * functions which need to be called when the number of requests associated with
936  * a connection changes.
937  *
938  * Trunk requests will always be passed to this function before they're removed
939  * from a connection, even if the requests are being freed.
940  *
941  * @param[in] treq to trigger a state change for.
942  */
944 {
945  fr_trunk_connection_t *tconn = treq->pub.tconn;
946  fr_trunk_t *trunk = treq->pub.trunk;
947 
948  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
949 
950  switch (treq->pub.state) {
952  return; /* Not associated with connection */
953 
956  break;
957 
960  break;
961 
963  REQUEST_EXTRACT_SENT(treq);
964  break;
965 
968  break;
969 
972  break;
973 
976  break;
977 
978  default:
979  fr_assert(0);
980  break;
981  }
982 
983  /*
984  * If the request wasn't associated with a
985  * connection, then there's nothing more
986  * to do.
987  */
988  if (!tconn) return;
989 
990  {
991  request_t *request = treq->pub.request;
992 
993  ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
994  tconn->pub.conn->name, treq->id);
995  }
996  /*
997  * Release any connection specific resources the
998  * treq holds.
999  */
1001 
1002  switch (tconn->pub.state){
1003  case FR_TRUNK_CONN_FULL:
1004  trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1005  if (tconn->pub.state == FR_TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1006  FALL_THROUGH;
1007 
1008  case FR_TRUNK_CONN_ACTIVE:
1009  CONN_REORDER(tconn);
1010  break;
1011 
1012  default:
1013  break;
1014  }
1015 
1016  treq->pub.tconn = NULL;
1017 
1018  /*
1019  * Request removed from the connection
1020  * see if we need to deregister I/O events.
1021  */
1023 }
1024 
1025 /** Transition a request to the unassigned state, in preparation for re-assignment
1026  *
1027  * @note treq->tconn may be inviable after calling
1028  * if treq->conn and fr_connection_signals_pause are not used.
1029  * This is due to call to trunk_request_remove_from_conn.
1030  *
1031  * @param[in] treq to trigger a state change for.
1032  */
1034 {
1035  fr_trunk_t *trunk = treq->pub.trunk;
1036 
1037  switch (treq->pub.state) {
1039  return;
1040 
1043  break;
1044 
1050  break;
1051 
1052  default:
1054  }
1055 
1057 }
1058 
1059 /** Transition a request to the backlog state, adding it to the backlog of the trunk
1060  *
1061  * @note treq->tconn and treq may be inviable after calling
1062  * if treq->conn and fr_connection_signals_pause are not used.
1063  * This is due to call to trunk_manage.
1064  *
1065  * @param[in] treq to trigger a state change for.
1066  * @param[in] new Whether this is a new request.
1067  */
1069 {
1070  fr_trunk_connection_t *tconn = treq->pub.tconn;
1071  fr_trunk_t *trunk = treq->pub.trunk;
1072 
1073  switch (treq->pub.state) {
1076  break;
1077 
1080  break;
1081 
1083  REQUEST_EXTRACT_CANCEL(treq);
1084  break;
1085 
1086  default:
1088  }
1089 
1091  fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1092 
1093  /*
1094  * A new request has entered the trunk.
1095  * Re-calculate request/connection ratios.
1096  */
1097  if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1098 
1099  /*
1100  * To reduce latency, if there's no connections
1101  * in the connecting state, call the trunk manage
1102  * function immediately.
1103  *
1104  * Likewise, if there's draining connections
1105  * which could be moved back to active call
1106  * the trunk manage function.
1107  *
1108  * Remember requests only enter the backlog if
1109  * there's no connections which can service them.
1110  */
1114  }
1115 }
1116 
1117 /** Transition a request to the pending state, adding it to the backlog of an active connection
1118  *
1119  * All trunk requests being added to a connection get passed to this function.
1120  * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1121  *
1122  * @note treq->tconn and treq may be inviable after calling
1123  * if treq->conn and fr_connection_signals_pause is not used.
1124  * This is due to call to trunk_connection_event_update.
1125  *
1126  * @param[in] treq to trigger a state change for.
1127  * @param[in] tconn to enqueue the request on.
1128  * @param[in] new Whether this is a new request.
1129  */
1131 {
1132  fr_trunk_t *trunk = treq->pub.trunk;
1133 
1134  fr_assert(tconn->pub.trunk == trunk);
1135  fr_assert(IS_PROCESSING(tconn));
1136 
1137  switch (treq->pub.state) {
1140  fr_assert(!treq->pub.tconn);
1141  break;
1142 
1144  fr_assert(!treq->pub.tconn);
1146  break;
1147 
1148  case FR_TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1149  REQUEST_EXTRACT_CANCEL(treq);
1150  break;
1151 
1152  default:
1154  }
1155 
1156  /*
1157  * Assign the new connection first so that
1158  * it appears in the state log.
1159  */
1160  treq->pub.tconn = tconn;
1161 
1163 
1164  {
1165  request_t *request = treq->pub.request;
1166 
1167  ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1168  tconn->pub.conn->name, treq->id);
1169  }
1170  fr_heap_insert(&tconn->pending, treq);
1171 
1172  /*
1173  * A new request has entered the trunk.
1174  * Re-calculate request/connection ratios.
1175  */
1176  if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1177 
1178  /*
1179  * Check if we need to automatically transition the
1180  * connection to full.
1181  */
1183 
1184  /*
1185  * Reorder the connection in the heap now it has an
1186  * additional request.
1187  */
1188  if (tconn->pub.state == FR_TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1189 
1190  /*
1191  * We have a new request, see if we need to register
1192  * for I/O events.
1193  */
1195 }
1196 
1197 /** Transition a request to the partial state, indicating that it has been partially sent
1198  *
1199  * @param[in] treq to trigger a state change for.
1200  */
1202 {
1203  fr_trunk_connection_t *tconn = treq->pub.tconn;
1204  fr_trunk_t *trunk = treq->pub.trunk;
1205 
1206  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1207 
1208  switch (treq->pub.state) {
1209  case FR_TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1211  break;
1212 
1213  default:
1215  }
1216 
1217  fr_assert(!tconn->partial);
1218  tconn->partial = treq;
1219 
1221 }
1222 
1223 /** Transition a request to the sent state, indicating that it's been sent in its entirety
1224  *
1225  * @note treq->tconn and treq may be inviable after calling
1226  * if treq->conn and fr_connection_signals_pause is not used.
1227  * This is due to call to trunk_connection_event_update.
1228  *
1229  * @param[in] treq to trigger a state change for.
1230  */
1232 {
1233  fr_trunk_connection_t *tconn = treq->pub.tconn;
1234  fr_trunk_t *trunk = treq->pub.trunk;
1235 
1236  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1237 
1238  switch (treq->pub.state) {
1241  break;
1242 
1245  break;
1246 
1247  default:
1249  }
1250 
1252  fr_dlist_insert_tail(&tconn->sent, treq);
1253 
1254  /*
1255  * Update the connection's sent stats
1256  */
1257  tconn->sent_count++;
1258 
1259  /*
1260  * Enforces max_uses
1261  */
1262  if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1264  }
1265 
1266  /*
1267  * We just sent a request, we probably need
1268  * to tell the event loop we want to be
1269  * notified if there's data available.
1270  */
1272 }
1273 
1274 /** Transition a request to the cancel state, placing it in a connection's cancellation list
1275  *
1276  * If a request_cancel_send callback is provided, that callback will
1277  * be called periodically for requests which were cancelled due to
1278  * a signal.
1279  *
1280  * The request_cancel_send callback will dequeue cancelled requests
1281  * and inform a remote server that the result is no longer required.
1282  *
1283  * A request must enter this state before being added to the backlog
1284  * of another connection if it's been sent or partially sent.
1285  *
1286  * @note treq->tconn and treq may be inviable after calling
1287  * if treq->conn and fr_connection_signals_pause is not used.
1288  * This is due to call to trunk_connection_event_update.
1289  *
1290  * @param[in] treq to trigger a state change for.
1291  * @param[in] reason Why the request was cancelled.
1292  * Should be one of:
1293  * - FR_TRUNK_CANCEL_REASON_SIGNAL request cancelled
1294  * because of a signal from the interpreter.
1295  * - FR_TRUNK_CANCEL_REASON_MOVE request cancelled
1296  * because the connection failed and it needs
1297  * to be assigned to a new connection.
1298  * - FR_TRUNK_CANCEL_REASON_REQUEUE request cancelled
1299  * as it needs to be resent on the same connection.
1300  */
1302 {
1303  fr_trunk_connection_t *tconn = treq->pub.tconn;
1304  fr_trunk_t *trunk = treq->pub.trunk;
1305 
1306  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1307 
1308  switch (treq->pub.state) {
1311  break;
1312 
1314  REQUEST_EXTRACT_SENT(treq);
1315  break;
1316 
1317  default:
1319  }
1320 
1322  fr_dlist_insert_tail(&tconn->cancel, treq);
1323  treq->cancel_reason = reason;
1324 
1325  DO_REQUEST_CANCEL(treq, reason);
1326 
1327  /*
1328  * Our treq is no longer bound to an actual
1329  * request_t *, as we can't guarantee the
1330  * lifetime of the original request_t *.
1331  */
1332  if (treq->cancel_reason == FR_TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1333 
1334  /*
1335  * Register for I/O write events if we need to.
1336  */
1338 }
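/* A minimal (non-compiled) sketch of a request_cancel callback with the shape DO_REQUEST_CANCEL
 * invokes, i.e. (conn, preq, reason, uctx).  Typically the API client detaches its protocol
 * request from any connection-specific tracking so late replies are ignored.  my_preq_t and
 * my_tracking_remove() are hypothetical.
 */
#if 0
static void example_request_cancel(fr_connection_t *conn, void *preq,
				   fr_trunk_cancel_reason_t reason, UNUSED void *uctx)
{
	my_preq_t *our_preq = preq;

	my_tracking_remove(conn, our_preq);	/* Stop matching replies for this preq on this connection */

	/*
	 *	For FR_TRUNK_CANCEL_REASON_MOVE / _REQUEUE the preq will be written again, so its
	 *	encoded form is kept.  For _SIGNAL the originating request_t is going away, so
	 *	anything referencing it should be cleared here.
	 */
	if (reason == FR_TRUNK_CANCEL_REASON_SIGNAL) our_preq->request = NULL;
}
#endif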
1339 
1340 /** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1341  *
1342  * The request_demux function is then responsible for signalling
1343  * that the cancel request is complete when the remote server
1344  * acknowledges the cancellation request.
1345  *
1346  * @param[in] treq to trigger a state change for.
1347  */
1349 {
1350  fr_trunk_connection_t *tconn = treq->pub.tconn;
1351  fr_trunk_t *trunk = treq->pub.trunk;
1352 
1353  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1356 
1357  switch (treq->pub.state) {
1358  case FR_TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_sent can be reached from */
1359  REQUEST_EXTRACT_CANCEL(treq);
1360  break;
1361 
1362  default:
1364  }
1365 
1367  fr_assert(!tconn->partial);
1368  tconn->cancel_partial = treq;
1369 }
1370 
1371 /** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1372  *
1373  * The request_demux function is then responsible for signalling
1374  * that the cancel request is complete when the remote server
1375  * acknowledges the cancellation request.
1376  *
1377  * @note treq->tconn and treq may be inviable after calling
1378  * if treq->conn and fr_connection_signals_pause is not used.
1379  * This is due to call to trunk_connection_event_update.
1380  *
1381  * @param[in] treq to trigger a state change for.
1382  */
1384 {
1385  fr_trunk_connection_t *tconn = treq->pub.tconn;
1386  fr_trunk_t *trunk = treq->pub.trunk;
1387 
1388  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1391 
1392  switch (treq->pub.state) {
1395  break;
1396 
1398  REQUEST_EXTRACT_CANCEL(treq);
1399  break;
1400 
1401  default:
1403  }
1404 
1406  fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1407 
1408  /*
1409  * De-register for I/O write events
1410  * and register the read events
1411  * to drain the cancel ACKs.
1412  */
1414 }
1415 
1416 /** Cancellation was acked, the request is complete, free it
1417  *
1418  * The API client will not be informed, as the original request_t *
1419  * will likely have been freed by this point.
1420  *
1421  * @note treq will be inviable after a call to this function.
1422  * treq->tconn may be inviable after calling
1423  * if treq->conn and fr_connection_signals_pause is not used.
1424  * This is due to call to trunk_request_remove_from_conn.
1425  *
1426  * @param[in] treq to mark as complete.
1427  */
1429 {
1430  fr_trunk_connection_t *tconn = treq->pub.tconn;
1431  fr_trunk_t *trunk = treq->pub.trunk;
1432 
1433  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1434  if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1435 
1436  switch (treq->pub.state) {
1439  break;
1440 
1441  default:
1443  }
1444 
1446 
1448  fr_trunk_request_free(&treq); /* Free the request */
1449 }
1450 
1451 /** Request completed successfully, inform the API client and free the request
1452  *
1453  * @note treq will be inviable after a call to this function.
1454  * treq->tconn may also be inviable due to call to
1455  * trunk_request_remove_from_conn.
1456  *
1457  * @param[in] treq to mark as complete.
1458  */
1460 {
1461  fr_trunk_connection_t *tconn = treq->pub.tconn;
1462  fr_trunk_t *trunk = treq->pub.trunk;
1463 
1464  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1465 
1466  switch (treq->pub.state) {
1470  break;
1471 
1472  default:
1474  }
1475 
1477  DO_REQUEST_COMPLETE(treq);
1478  fr_trunk_request_free(&treq); /* Free the request */
1479 }
1480 
1481 /** Request failed, inform the API client and free the request
1482  *
1483  * @note treq will be inviable after a call to this function.
1484  * treq->tconn may also be inviable due to call to
1485  * trunk_request_remove_from_conn.
1486  *
1487  * @param[in] treq to mark as failed.
1488  */
1490 {
1491  fr_trunk_connection_t *tconn = treq->pub.tconn;
1492  fr_trunk_t *trunk = treq->pub.trunk;
1493  fr_trunk_request_state_t prev = treq->pub.state;
1494 
1495  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1496 
1497  switch (treq->pub.state) {
1500  break;
1501 
1502  default:
1504  break;
1505  }
1506 
1508  DO_REQUEST_FAIL(treq, prev);
1509  fr_trunk_request_free(&treq); /* Free the request */
1510 }
1511 
1512 /** Check to see if a trunk request can be enqueued
1513  *
1514  * @param[out] tconn_out Connection the request may be enqueued on.
1515  * @param[in] trunk To enqueue requests on.
1516  * @param[in] request associated with the treq (if any).
1517  * @return
1518  * - FR_TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1519  * - FR_TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1520  * - FR_TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1521  * connections or backlog space.
1522  * - FR_TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1523  * unreachable.
1524  */
1526  request_t *request)
1527 {
1528  fr_trunk_connection_t *tconn;
1529  /*
1530  * If we have an active connection then
1531  * return that.
1532  */
1533  tconn = fr_minmax_heap_min_peek(trunk->active);
1534  if (tconn) {
1535  *tconn_out = tconn;
1536  return FR_TRUNK_ENQUEUE_OK;
1537  }
1538 
1539  /*
1540  * Unlike the connection pool, we don't need
1541  * to drive any internal processes by feeding
1542  * it requests.
1543  *
1544  * If the last event to occur was a failure
1545  * we refuse to enqueue new requests until
1546  * one or more connections comes online.
1547  */
1548  if (!trunk->conf.backlog_on_failed_conn &&
1549  fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1550  fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1552  RWARN, WARN, "Refusing to enqueue requests - "
1553  "No active connections and last event was a connection failure");
1554 
1556  }
1557 
1558 
1559  /*
1560  * Only enforce if we're limiting maximum
1561  * number of connections, and maximum
1562  * number of requests per connection.
1563  *
1564  * The alloc function also checks this
1565  * which is why this is only done for
1566  * debug builds.
1567  */
1568  if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1569  uint64_t limit;
1570 
1571  limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1572  if (limit > 0) {
1573  uint64_t total_reqs;
1574 
1577  if (total_reqs >= limit) {
1579  RWARN, WARN, "Refusing to alloc requests - "
1580  "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1581  "requests reached",
1582  limit, trunk->conf.max, trunk->conf.max_req_per_conn);
1584  }
1585  }
1586  }
1587 
1589 }
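/* A minimal (non-compiled) sketch of how an API client might react to the fr_trunk_enqueue_t
 * codes documented above.  fr_trunk_request_alloc() and fr_trunk_request_enqueue() are assumed
 * to be the public entry points from trunk.h; my_preq_t is a hypothetical protocol request and
 * the module is assumed to yield via unlang.
 */
#if 0
static unlang_action_t example_enqueue(fr_trunk_t *trunk, request_t *request, my_preq_t *preq)
{
	fr_trunk_request_t *treq;

	treq = fr_trunk_request_alloc(trunk, request);
	if (!treq) return UNLANG_ACTION_FAIL;

	switch (fr_trunk_request_enqueue(&treq, trunk, request, preq, NULL)) {
	case FR_TRUNK_ENQUEUE_OK:
	case FR_TRUNK_ENQUEUE_IN_BACKLOG:
		return UNLANG_ACTION_YIELD;	/* Resumed by the request_complete/request_fail callbacks */

	default:				/* NO_CAPACITY, DST_UNAVAILABLE, FAIL */
		fr_trunk_request_free(&treq);
		return UNLANG_ACTION_FAIL;
	}
}
#endif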
1590 
1591 /** Enqueue a request which has never been assigned to a connection or was previously cancelled
1592  *
1593  * @param[in] treq to re enqueue. Must have been removed
1594  * from its existing connection with
1595  * #trunk_connection_requests_dequeue.
1596  * @return
1597  * - FR_TRUNK_ENQUEUE_OK Request was re-enqueued.
1598  * - FR_TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1599  * - FR_TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1600  * Usually because the connection to the resource is down.
1601  */
1603 {
1604  fr_trunk_t *trunk = treq->pub.trunk;
1605  fr_trunk_connection_t *tconn = NULL;
1606  fr_trunk_enqueue_t ret;
1607 
1608  /*
1609  * Must *NOT* still be assigned to another connection
1610  */
1611  fr_assert(!treq->pub.tconn);
1612 
1613  ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1614  switch (ret) {
1615  case FR_TRUNK_ENQUEUE_OK:
1616  if (trunk->conf.always_writable) {
1618  trunk_request_enter_pending(treq, tconn, false);
1621  } else {
1622  trunk_request_enter_pending(treq, tconn, false);
1623  }
1624  break;
1625 
1627  /*
1628  * No more connections and request
1629  * is already in the backlog.
1630  *
1631  * Signal our caller it should stop
1632  * trying to drain the backlog.
1633  */
1635  trunk_request_enter_backlog(treq, false);
1636  break;
1637 
1638  default:
1639  break;
1640  }
1641 
1642  return ret;
1643 }
1644 
1645 /** Shift requests in the specified states onto new connections
1646  *
1647  * This function will blindly dequeue any requests in the specified state and get
1648  * them back to the unassigned state, cancelling any sent or partially sent requests.
1649  *
1650  * This function does not check that dequeuing a request in a particular state is a
1651  * sane or sensible thing to do, that's up to the caller!
1652  *
1653  * @param[out] out A list to insert the newly dequeued and unassigned
1654  * requests into.
1655  * @param[in] tconn to dequeue requests from.
1656  * @param[in] states Dequeue request in these states.
1657  * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1658  */
1660  int states, uint64_t max)
1661 {
1662  fr_trunk_request_t *treq;
1663  uint64_t count = 0;
1664 
1665  if (max == 0) max = UINT64_MAX;
1666 
1667 #define OVER_MAX_CHECK if (++count > max) return (count - 1)
1668 
1669 #define DEQUEUE_ALL(_src_list, _state) do { \
1670  while ((treq = fr_dlist_head(_src_list))) { \
1671  OVER_MAX_CHECK; \
1672  fr_assert(treq->pub.state == (_state)); \
1673  trunk_request_enter_unassigned(treq); \
1674  fr_dlist_insert_tail(out, treq); \
1675  } } while (0)
1676 
1677  /*
1678  * Don't need to do anything with
1679  * cancellation requests.
1680  */
1681  if (states & FR_TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1683 
1684  /*
1685  * ...same with cancel inform
1686  */
1689 
1690  /*
1691  * ....same with cancel partial
1692  */
1695  treq = tconn->cancel_partial;
1696  if (treq) {
1699  fr_dlist_insert_tail(out, treq);
1700  }
1701  }
1702 
1703  /*
1704  * ...and pending.
1705  */
1706  if (states & FR_TRUNK_REQUEST_STATE_PENDING) {
1707  while ((treq = fr_heap_peek(tconn->pending))) {
1711  fr_dlist_insert_tail(out, treq);
1712  }
1713  }
1714 
1715  /*
1716  * Cancel partially sent requests
1717  */
1718  if (states & FR_TRUNK_REQUEST_STATE_PARTIAL) {
1720  treq = tconn->partial;
1721  if (treq) {
1723 
1724  /*
1725  * Don't allow the connection to change state whilst
1726  * we're draining requests from it.
1727  */
1731  fr_dlist_insert_tail(out, treq);
1733  }
1734  }
1735 
1736  /*
1737  * Cancel sent requests
1738  */
1739  if (states & FR_TRUNK_REQUEST_STATE_SENT) {
1740  /*
1741  * Don't allow the connection to change state whilst
1742  * we're draining requests from it.
1743  */
1745  while ((treq = fr_dlist_head(&tconn->sent))) {
1748 
1751  fr_dlist_insert_tail(out, treq);
1752  }
1754  }
1755 
1756  return count;
1757 }
1758 
1759 /** Remove requests in specified states from a connection, attempting to distribute them to new connections
1760  *
1761  * @param[in] tconn To remove requests from.
1762  * @param[in] states One or more states or'd together.
1763  * @param[in] max The maximum number of requests to dequeue.
1764  * 0 for unlimited.
1765  * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1766  * If false bound requests will not be moved.
1767  *
1768  * @return the number of requests re-queued.
1769  */
1770 static uint64_t trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max,
1771  bool fail_bound)
1772 {
1773  fr_trunk_t *trunk = tconn->pub.trunk;
1774  fr_dlist_head_t to_process;
1775  fr_trunk_request_t *treq = NULL;
1776  uint64_t moved = 0;
1777 
1778  if (max == 0) max = UINT64_MAX;
1779 
1780  fr_dlist_talloc_init(&to_process, fr_trunk_request_t, entry);
1781 
1782  /*
1783  * Prevent the connection changing state whilst we're
1784  * working with it.
1785  *
1786  * There's a user callback that can be called by
1787  * trunk_request_enqueue_existing which can reconnect
1788  * the connection.
1789  */
1791 
1792  /*
1793  * Remove non-cancelled requests from the connection
1794  */
1795  moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~FR_TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1796 
1797  /*
1798  * Prevent requests being requeued on the same trunk
1799  * connection, which would break rebalancing.
1800  *
1801  * This is a bit of a hack, but nothing should test
1802  * for connection/list consistency in this code,
1803  * and if something is added later, it'll be flagged
1804  * by the tests.
1805  */
1806  if (tconn->pub.state == FR_TRUNK_CONN_ACTIVE) {
1807  int ret;
1808 
1809  ret = fr_minmax_heap_extract(trunk->active, tconn);
1810  if (!fr_cond_assert_msg(ret == 0,
1811  "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1812 
1813  }
1814 
1815  /*
1816  * Loop over all the requests we gathered and
1817  * redistribute them to new connections.
1818  */
1819  while ((treq = fr_dlist_next(&to_process, treq))) {
1820  fr_trunk_request_t *prev;
1821 
1822  prev = fr_dlist_remove(&to_process, treq);
1823 
1824  /*
1825  * Attempts to re-queue a request
1826  * that's bound to a connection
1827  * results in a failure.
1828  */
1829  if (treq->bound_to_conn) {
1830  if (fail_bound || !IS_SERVICEABLE(tconn)) {
1832  } else {
1833  trunk_request_enter_pending(treq, tconn, false);
1834  }
1835  goto next;
1836  }
1837 
1838  switch (trunk_request_enqueue_existing(treq)) {
1839  case FR_TRUNK_ENQUEUE_OK:
1840  break;
1841 
1842  /*
1843  * A connection failed, and
1844  * there's no other connections
1845  * available to deal with the
1846  * load, it's been placed back
1847  * in the backlog.
1848  */
1850  break;
1851 
1852  /*
1853  * If we fail to re-enqueue then
1854  * there's nothing to do except
1855  * fail the request.
1856  */
1859  case FR_TRUNK_ENQUEUE_FAIL:
1861  break;
1862  }
1863  next:
1864  treq = prev;
1865  }
1866 
1867  /*
1868  * Add the connection back into the active list
1869  */
1870  if (tconn->pub.state == FR_TRUNK_CONN_ACTIVE) {
1871  int ret;
1872 
1873  ret = fr_minmax_heap_insert(trunk->active, tconn);
1874  if (!fr_cond_assert_msg(ret == 0,
1875  "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1876  }
1877  if (moved >= max) goto done;
1878 
1879  /*
1880  * Deal with the cancelled requests specially; we can't
1881  * queue them up again as they were only valid on that
1882  * specific connection.
1883  *
1884  * We just need to run them to completion which, as
1885  * they should already be in the unassigned state,
1886  * just means freeing them.
1887  */
1888  moved += trunk_connection_requests_dequeue(&to_process, tconn,
1889  states & FR_TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1890  while ((treq = fr_dlist_next(&to_process, treq))) {
1891  fr_trunk_request_t *prev;
1892 
1893  prev = fr_dlist_remove(&to_process, treq);
1894  fr_trunk_request_free(&treq);
1895  treq = prev;
1896  }
1897 
1898 done:
1899 
1900  /*
1901  * Always re-calculate the request/connection
1902  * ratio at the end.
1903  *
1904  * This avoids having the state transition
1905  * functions do it.
1906  *
1907  * The ratio would be wrong when they calculated
1908  * it anyway, because a bunch of requests are
1909  * dequeued from the connection and temporarily
1910  * cease to exist from the perspective of the
1911  * trunk_requests_per_connection code.
1912  */
1913  trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1914 
1916  return moved;
1917 }
1918 
1919 /** Move requests off of a connection and requeue elsewhere
1920  *
1921  * @note We don't re-queue on draining or draining to free, as requests should have already been
1922  * moved off of the connection. It's also dangerous as the trunk management code may
1923  * clean up a connection in this state when it runs after a re-queue, and then the caller
1924  * may try to access a now-freed connection.
1925  *
1926  * @param[in] tconn to move requests off of.
1927  * @param[in] states Only move requests in this state.
1928  * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1929  * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1930  * If false bound requests will not be moved.
1931  * @return The number of requests requeued.
1932  */
1933 uint64_t fr_trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1934 {
1935  switch (tconn->pub.state) {
1936  case FR_TRUNK_CONN_ACTIVE:
1937  case FR_TRUNK_CONN_FULL:
1939  return trunk_connection_requests_requeue(tconn, states, max, fail_bound);
1940 
1941  default:
1942  return 0;
1943  }
1944 }
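
/* Example (illustrative only, not part of trunk.c)
 *
 * A minimal sketch of how an API client might use fr_trunk_connection_requests_requeue()
 * to drain a connection it has decided is degraded, before triggering a reconnect.
 * my_abandon_connection() is hypothetical; the fr_trunk_* names used here are taken from
 * the declarations above and should be checked against trunk.h.
 */
static uint64_t my_abandon_connection(fr_trunk_connection_t *tconn)
{
	/*
	 *	Move every request, whatever its state, to other connections.
	 *	max = 0 means no limit, fail_bound = true fails any requests
	 *	which were bound to this specific connection.
	 */
	return fr_trunk_connection_requests_requeue(tconn, FR_TRUNK_REQUEST_STATE_ALL, 0, true);
}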
1945 
1946 /** Signal a partial write
1947  *
1948  * Used where there's high load, and the outbound write buffer is full.
1949  *
1950  * @param[in] treq to signal state change for.
1951  */
1953 {
1954  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
1955 
1957  "%s can only be called from within request_mux handler", __FUNCTION__)) return;
1958 
1959  switch (treq->pub.state) {
1962  break;
1963 
1964  default:
1965  return;
1966  }
1967 }
1968 
1969 /** Signal that the request was written to a connection successfully
1970  *
1971  * @param[in] treq to signal state change for.
1972  */
1974 {
1975  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
1976 
1978  "%s can only be called from within request_mux handler", __FUNCTION__)) return;
1979 
1980  switch (treq->pub.state) {
1984  break;
1985 
1986  default:
1987  return;
1988  }
1989 }
1990 
1991 /** Signal that a trunk request is complete
1992  *
1993  * The API client will be informed that the request is now complete.
1994  */
1996 {
1997  fr_trunk_t *trunk = treq->pub.trunk;
1998 
1999  if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2000 
2001  /*
2002  * We assume that if the request is being signalled
2003  * as complete from the demux function, that it was
2004  * a successful read.
2005  *
2006  * If this assumption turns out to be incorrect
2007  * then we need to add an argument to signal_complete
2008  * to indicate if this is a successful read.
2009  */
2010  if (IN_REQUEST_DEMUX(trunk)) trunk->pub.last_read_success = fr_time();
2011 
2012  switch (treq->pub.state) {
2014  case FR_TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2016  break;
2017 
2018  default:
2019  return;
2020  }
2021 }
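
/* Example (illustrative only, not part of trunk.c)
 *
 * A rough sketch of the read side: a helper called from within an application's
 * request_demux callback, after it has read one response and matched it back to the
 * originating treq. my_response_t, my_match_response() and my_decode() are hypothetical
 * application functions; the fr_trunk_request_signal_* names are as referenced in the
 * documentation in this file, but signatures should be checked against trunk.h.
 */
static void my_demux_one(fr_trunk_connection_t *tconn, my_response_t *resp)
{
	fr_trunk_request_t *treq;

	treq = my_match_response(tconn, resp);		/* Application specific request tracking */
	if (!treq) return;				/* Unknown or stale response, ignore it */

	if (my_decode(treq->pub.rctx, resp) < 0) {	/* Write the decoded result into the rctx */
		fr_trunk_request_signal_fail(treq);	/* Inform the API client of the failure */
		return;
	}

	fr_trunk_request_signal_complete(treq);		/* Response delivered, the request is done */
}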
2022 
2023 /** Signal that a trunk request failed
2024  *
2025  * The API client will be informed that the request has failed.
2026  */
2028 {
2029  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2030 
2032 }
2033 
2034 /** Cancel a trunk request
2035  *
2036  * treq can be in any state, but cancellation requests will be ignored unless the treq is in
2037  * the FR_TRUNK_REQUEST_STATE_PARTIAL or FR_TRUNK_REQUEST_STATE_SENT state.
2038  *
2039  * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2040  * is now inviable as it's being cancelled.
2041  *
2042  * The free function, however, is called, and that should be used to perform any necessary
2043  * cleanup.
2044  *
2045  * @param[in] treq to signal state change for.
2046  */
2048 {
2049  fr_trunk_t *trunk;
2050 
2051  /*
2052  * Ensure treq hasn't been freed
2053  */
2054  (void)talloc_get_type_abort(treq, fr_trunk_request_t);
2055 
2056  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2057 
2058  if (!fr_cond_assert_msg(!IN_HANDLER(treq->pub.trunk),
2059  "%s cannot be called within a handler", __FUNCTION__)) return;
2060 
2061  trunk = treq->pub.trunk;
2062 
2063  switch (treq->pub.state) {
2064  /*
2065  * We don't call the complete or failed callbacks
2066  * as the request and rctx are no longer viable.
2067  */
2070  {
2071  fr_trunk_connection_t *tconn = treq->pub.tconn;
2072 
2073  /*
2074  * Don't allow connection state changes
2075  */
2079  "Bad state %s after cancellation",
2080  fr_table_str_by_value(fr_trunk_request_states, treq->pub.state, "<INVALID>"))) {
2082  return;
2083  }
2084  /*
2085  * No cancel muxer. We're done.
2086  *
2087  * If we do have a cancel mux function,
2088  * the next time this connection becomes
2089  * writable, we'll call the cancel mux
2090  * function.
2091  *
2092  * We don't run the complete or failed
2093  * callbacks here as the request is
2094  * being cancelled.
2095  */
2096  if (!trunk->funcs.request_cancel_mux) {
2098  fr_trunk_request_free(&treq);
2099  }
2101  }
2102  break;
2103 
2104  /*
2105  * We're already in the process of cancelling a
2106  * request, so ignore duplicate signals.
2107  */
2112  break;
2113 
2114  /*
2115  * For any other state, we just release the request
2116  * from its current connection and free it.
2117  */
2118  default:
2120  fr_trunk_request_free(&treq);
2121  break;
2122  }
2123 }
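
/* Example (illustrative only, not part of trunk.c)
 *
 * A minimal sketch of cancelling an in-flight trunk request when the originating
 * request_t is being stopped, e.g. from a module's signal callback. my_rctx_t (holding
 * the treq handle saved at enqueue time) is hypothetical, and the function name for the
 * cancel signal is assumed from the documentation above. Note the restriction above:
 * this must not be called from within a trunk handler.
 */
static void my_module_signal(my_rctx_t *rctx)
{
	if (!rctx->treq) return;

	fr_trunk_request_signal_cancel(rctx->treq);
	rctx->treq = NULL;			/* The treq must not be used after signalling cancel */
}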
2124 
2125 /** Signal a partial cancel write
2126  *
2127  * Used where there's high load, and the outbound write buffer is full.
2128  *
2129  * @param[in] treq to signal state change for.
2130  */
2132 {
2133  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2134 
2136  "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2137 
2138  switch (treq->pub.state) {
2141  break;
2142 
2143  default:
2144  return;
2145  }
2146 }
2147 
2148 /** Signal that a remote server has been notified of the cancellation
2149  *
2150  * Called from request_cancel_mux to indicate that the datastore has been informed
2151  * that the response is no longer needed.
2152  *
2153  * @param[in] treq to signal state change for.
2154  */
2156 {
2157  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2158 
2160  "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2161 
2162  switch (treq->pub.state) {
2166  break;
2167 
2168  default:
2169  break;
2170  }
2171 }
2172 
2173 /** Signal that a remote server acked our cancellation
2174  *
2175  * Called from request_demux to indicate that it got an ack for the cancellation.
2176  *
2177  * @param[in] treq to signal state change for.
2178  */
2180 {
2181  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2182 
2184  "%s can only be called from within request_demux or request_cancel_mux handlers",
2185  __FUNCTION__)) return;
2186 
2187  switch (treq->pub.state) {
2189  /*
2190  * This is allowed, as we may not need to wait
2191  * for the database to ACK our cancellation
2192  * request.
2193  *
2194  * Note: FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2195  * is not allowed here, as that'd mean we'd half
2196  * written the cancellation request out to the
2197  * socket, and then decided to abandon it.
2198  *
2199  * That'd leave the socket in an unusable state.
2200  */
2203  break;
2204 
2205  default:
2206  break;
2207  }
2208 }
2209 
2210 /** If the trunk request is freed then update the target requests
2211  *
2212  * gperftools showed calling the request free function directly was slightly faster
2213  * than using talloc_free.
2214  *
2215  * @param[in] treq_to_free request.
2216  */
2218 {
2219  fr_trunk_request_t *treq = *treq_to_free;
2220  fr_trunk_t *trunk;
2221 
2222  if (unlikely(!treq)) return;
2223  trunk = treq->pub.trunk;
2224  /*
2225  * The only valid states a trunk request can be
2226  * freed from.
2227  */
2228  switch (treq->pub.state) {
2234  break;
2235 
2236  default:
2237  if (!fr_cond_assert(0)) return;
2238  }
2239 
2240  /*
2241  * Zero out the pointer to prevent double frees
2242  */
2243  *treq_to_free = NULL;
2244 
2245  /*
2246  * Call the API client callback to free
2247  * any associated memory.
2248  */
2249  DO_REQUEST_FREE(treq);
2250 
2251  /*
2252  * Update the last above/below target stats
2253  * We only do this when we alloc or free
2254  * connections, or on connection
2255  * state changes.
2256  */
2257  trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2258 
2259  /*
2260  * This tracks the total number of requests
2261  * allocated and not freed or returned to
2262  * the free list.
2263  */
2264  if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2265 
2266  /*
2267  * No cleanup delay, means cleanup immediately
2268  */
2271 
2272 #ifndef NDEBUG
2273  /*
2274  * Ensure anything parented off the treq
2275  * is freed. We do this to trigger
2276  * the destructors for the log entries.
2277  */
2278  talloc_free_children(treq);
2279 
2280  /*
2281  * State log should now be empty as entries
2282  * remove themselves from the dlist
2283  * on free.
2284  */
2286  "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2287 #endif
2288 
2289  talloc_free(treq);
2290  return;
2291  }
2292 
2293  /*
2294  * Ensure anything parented off the treq
2295  * is freed.
2296  */
2297  talloc_free_children(treq);
2298 
2299 #ifndef NDEBUG
2300  /*
2301  * State log should now be empty as entries
2302  * remove themselves from the dlist
2303  * on free.
2304  */
2306  "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2307 #endif
2308 
2309  /*
2310  *
2311  * Return the trunk request back to the init state.
2312  */
2313  *treq = (fr_trunk_request_t){
2314  .pub = {
2316  .trunk = treq->pub.trunk,
2317  },
2318  .cancel_reason = FR_TRUNK_CANCEL_REASON_NONE,
2319  .last_freed = fr_time(),
2320 #ifndef NDEBUG
2321  .log = treq->log /* Keep the list head, to save reinitialisation */
2322 #endif
2323  };
2324 
2325  /*
2326  * Insert at the tail, so that we can free
2327  * requests that have been unused for N
2328  * seconds from the head.
2329  */
2330  fr_dlist_insert_tail(&trunk->free_requests, treq);
2331 }
2332 
2333 /** Actually free the trunk request
2334  *
2335  */
2337 {
2338  fr_trunk_t *trunk = treq->pub.trunk;
2339 
2340  switch (treq->pub.state) {
2343  break;
2344 
2345  default:
2346  fr_assert(0);
2347  break;
2348  }
2349 
2350  fr_dlist_remove(&trunk->free_requests, treq);
2351 
2352  return 0;
2353 }
2354 
2355 /** (Pre-)Allocate a new trunk request
2356  *
2357  * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2358  * request will be a talloc pool, which can be used to hold the preq.
2359  *
2360  * @note Do not use MEM to check the result of this allocation as it may fail for
2361  * non-fatal reasons.
2362  *
2363  * @param[in] trunk to add request to.
2364  * @param[in] request to wrap in a trunk request (treq).
2365  * @return
2366  * - A newly allocated request.
2367  * - NULL if too many requests are allocated.
2368  */
2370 {
2371  fr_trunk_request_t *treq;
2372 
2373  /*
2374  * The number of treqs currently allocated
2375  * exceeds the maximum number allowed.
2376  */
2377  if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2378  uint64_t limit;
2379 
2380  limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2381  if (trunk->pub.req_alloc >= limit) {
2383  RWARN, WARN, "Refusing to alloc requests - "
2384  "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2385  "requests reached",
2386  limit, trunk->conf.max, trunk->conf.max_req_per_conn);
2387  return NULL;
2388  }
2389  }
2390 
2391  /*
2392  * Allocate or reuse an existing request
2393  */
2394  treq = fr_dlist_head(&trunk->free_requests);
2395  if (treq) {
2396  fr_dlist_remove(&trunk->free_requests, treq);
2398  fr_assert(treq->pub.trunk == trunk);
2399  fr_assert(treq->pub.tconn == NULL);
2402  trunk->pub.req_alloc_reused++;
2403  } else {
2405  trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2406  talloc_set_destructor(treq, _trunk_request_free);
2407 
2408  *treq = (fr_trunk_request_t){
2409  .pub = {
2411  .trunk = trunk
2412  },
2413  .cancel_reason = FR_TRUNK_CANCEL_REASON_NONE
2414  };
2415  trunk->pub.req_alloc_new++;
2416 #ifndef NDEBUG
2418 #endif
2419  }
2420 
2421  trunk->pub.req_alloc++;
2423  /* heap_id - initialised when treq inserted into pending */
2424  /* list - empty */
2425  /* preq - populated later */
2426  /* rctx - populated later */
2427  treq->pub.request = request;
2428 
2429  return treq;
2430 }
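
/* Example (illustrative only, not part of trunk.c)
 *
 * A sketch of pre-allocating a treq so the preq can be drawn from the treq's talloc
 * pool (see the note above about req_pool_headers/req_pool_size) before enqueueing.
 * my_preq_t, my_rctx_t and my_preq_populate() are hypothetical application names; the
 * fr_trunk_* names/signatures are assumptions based on this file, see trunk.h.
 */
static fr_trunk_enqueue_t my_enqueue_pooled(fr_trunk_t *trunk, request_t *request, my_rctx_t *rctx)
{
	fr_trunk_request_t *treq;
	my_preq_t *preq;

	treq = fr_trunk_request_alloc(trunk, request);
	if (!treq) return FR_TRUNK_ENQUEUE_FAIL;	/* Too many requests allocated */

	preq = talloc_zero(treq, my_preq_t);		/* Parented by the treq, freed with it */
	my_preq_populate(preq, request);		/* Hypothetical encoder setup */

	return fr_trunk_request_enqueue(&treq, trunk, request, preq, rctx);
}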
2431 
2432 /** Enqueue a request that needs data written to the trunk
2433  *
2434  * When a request_t * needs to make an asynchronous request to an external datastore
2435  * it should call this function, specifying a preq (protocol request) containing
2436  * the data necessary to request information from the external datastore, and an
2437  * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2438  *
2439  * After a treq is successfully enqueued it will either be assigned immediately
2440  * to the pending queue of a connection, or if no connections are available,
2441  * (depending on the trunk configuration) the treq will be placed in the trunk's
2442  * global backlog.
2443  *
2444  * After receiving a positive return code from this function the caller should
2445  * immediately yield, to allow the various timers and I/O handlers that drive tconn
2446  * (trunk connection) and treq state changes to be called.
2447  *
2448  * When a tconn becomes writable (or the trunk is configured to be always writable)
2449  * the #fr_trunk_request_mux_t callback will be called to dequeue, encode and
2450  * send any pending requests for that tconn. The #fr_trunk_request_mux_t callback
2451  * is also responsible for tracking the outbound requests to allow the
2452  * #fr_trunk_request_demux_t callback to match inbound responses with the original
2453  * treq. Once the #fr_trunk_request_mux_t callback is done processing the treq
2454  * it signals what state the treq should enter next using one of the
2455  * fr_trunk_request_signal_* functions.
2456  *
2457  * When a tconn becomes readable the user specified #fr_trunk_request_demux_t
2458  * callback is called to process any responses, match them with the original treq,
2459  * and signal what state they should enter next using one of the
2460  * fr_trunk_request_signal_* functions.
2461  *
2462  * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2463  * is NULL, a new treq will be allocated.
2464  * Otherwise treq should point to memory allocated
2465  * with fr_trunk_request_alloc.
2466  * @param[in] trunk to enqueue request on.
2467  * @param[in] request to enqueue.
2468  * @param[in] preq Protocol request to write out. Will be freed when
2469  * treq is freed. Should ideally be parented by the
2470  * treq if possible.
2471  * Use #fr_trunk_request_alloc for pre-allocation of
2472  * the treq.
2473  * @param[in] rctx The resume context to write any result to.
2474  * @return
2475  * - FR_TRUNK_ENQUEUE_OK.
2476  * - FR_TRUNK_ENQUEUE_IN_BACKLOG.
2477  * - FR_TRUNK_ENQUEUE_NO_CAPACITY.
2478  * - FR_TRUNK_ENQUEUE_DST_UNAVAILABLE
2479  * - FR_TRUNK_ENQUEUE_FAIL
2480  */
2482  request_t *request, void *preq, void *rctx)
2483 {
2484  fr_trunk_connection_t *tconn = NULL;
2485  fr_trunk_request_t *treq;
2486  fr_trunk_enqueue_t ret;
2487 
2488  if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2489  "%s cannot be called within a handler", __FUNCTION__)) return FR_TRUNK_ENQUEUE_FAIL;
2490 
2491  if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == FR_TRUNK_REQUEST_STATE_INIT),
2492  "%s requests must be in \"init\" state", __FUNCTION__)) return FR_TRUNK_ENQUEUE_FAIL;
2493 
2494  /*
2495  * If delay_start was set, we may need
2496  * to insert the timer for the connection manager.
2497  */
2498  if (unlikely(!trunk->started)) {
2499  if (fr_trunk_start(trunk) < 0) return FR_TRUNK_ENQUEUE_FAIL;
2500  }
2501 
2502  ret = trunk_request_check_enqueue(&tconn, trunk, request);
2503  switch (ret) {
2504  case FR_TRUNK_ENQUEUE_OK:
2505  if (*treq_out) {
2506  treq = *treq_out;
2507  } else {
2508  MEM(*treq_out = treq = fr_trunk_request_alloc(trunk, request));
2509  }
2510  treq->pub.preq = preq;
2511  treq->pub.rctx = rctx;
2512  if (trunk->conf.always_writable) {
2514  trunk_request_enter_pending(treq, tconn, true);
2517  } else {
2518  trunk_request_enter_pending(treq, tconn, true);
2519  }
2520  break;
2521 
2523  if (*treq_out) {
2524  treq = *treq_out;
2525  } else {
2526  MEM(*treq_out = treq = fr_trunk_request_alloc(trunk, request));
2527  }
2528  treq->pub.preq = preq;
2529  treq->pub.rctx = rctx;
2530  trunk_request_enter_backlog(treq, true);
2531  break;
2532 
2533  default:
2534  /*
2535  * If a trunk request was provided
2536  * populate the preq and rctx fields
2537  * so that if it's freed with
2538  * fr_trunk_request_free, the free
2539  * function works as intended.
2540  */
2541  if (*treq_out) {
2542  treq = *treq_out;
2543  treq->pub.preq = preq;
2544  treq->pub.rctx = rctx;
2545  }
2546  return ret;
2547  }
2548 
2549  return ret;
2550 }
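
/* Example (illustrative only, not part of trunk.c)
 *
 * The common enqueue pattern described above: build a preq and rctx, enqueue them,
 * and yield if the request was accepted or placed in the backlog. my_rctx_t and
 * my_module_enqueue() are hypothetical; the enqueue function's signature is assumed
 * from the parameter documentation above, see trunk.h.
 */
typedef struct {
	fr_trunk_request_t *treq;		/* Saved so the module can cancel/requeue later */
	/* ... decoded response fields ... */
} my_rctx_t;

static int my_module_enqueue(fr_trunk_t *trunk, request_t *request, void *preq, my_rctx_t *rctx)
{
	fr_trunk_request_t *treq = NULL;	/* NULL - let the trunk allocate/reuse a treq */

	switch (fr_trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case FR_TRUNK_ENQUEUE_OK:
	case FR_TRUNK_ENQUEUE_IN_BACKLOG:
		rctx->treq = treq;
		return 0;			/* Caller should now yield */

	default:
		return -1;			/* No capacity, destination unavailable, or failure */
	}
}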
2551 
2552 /** Re-enqueue a request on the same connection
2553  *
2554  * If the treq has been sent, we assume that we're being signalled to requeue
2555  * because something outside of the trunk API has determined that a retransmission
2556  * is required. The easiest way to perform that retransmission is to clean up
2557  * any tracking information for the request, and then requeue it for transmission.
2558  *
2559  * If re-queueing fails, the request will enter the fail state. It should not be
2560  * accessed if this occurs.
2561  *
2562  * @param[in] treq to requeue (retransmit).
2563  * @return
2564  * - FR_TRUNK_ENQUEUE_OK.
2565  * - FR_TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2566  * - FR_TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2567  */
2569 {
2570  fr_trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2571 
2572  if (!tconn) return FR_TRUNK_ENQUEUE_FAIL;
2573 
2574  if (!IS_PROCESSING(tconn)) {
2577  }
2578 
2579  switch (treq->pub.state) {
2584  trunk_request_enter_pending(treq, tconn, false);
2586  break;
2587 
2588  case FR_TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2589  case FR_TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2590  break;
2591 
2592  default:
2594  return FR_TRUNK_ENQUEUE_FAIL;
2595  }
2596 
2597  return FR_TRUNK_ENQUEUE_OK;
2598 }
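
/* Example (illustrative only, not part of trunk.c)
 *
 * A sketch of a retransmission timer firing and asking the trunk to retransmit a
 * previously sent request on the same connection. my_retransmit_timeout() and my_rctx_t
 * are hypothetical; the name/signature of the re-enqueue call is assumed from the
 * documentation above, see trunk.h.
 */
static void my_retransmit_timeout(my_rctx_t *rctx)
{
	switch (fr_trunk_request_requeue(rctx->treq)) {
	case FR_TRUNK_ENQUEUE_OK:
		break;				/* Will be re-muxed when the connection is writable */

	default:
		rctx->treq = NULL;		/* Request entered the fail state, don't touch it */
		break;
	}
}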
2599 
2600 /** Enqueue additional requests on a specific connection
2601  *
2602  * This may be used to create a series of requests on a single connection, or to generate
2603  * in-band status checks.
2604  *
2605  * @note If conf->always_writable, then the muxer will be called immediately. The caller
2606  * must be able to handle multiple calls to its muxer gracefully.
2607  *
2608  * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2609  * is NULL, a new treq will be allocated.
2610  * Otherwise treq should point to memory allocated
2611  * with fr_trunk_request_alloc.
2612  * @param[in] tconn to enqueue request on.
2613  * @param[in] request to enqueue.
2614  * @param[in] preq Protocol request to write out. Will be freed when
2615  * treq is freed. Should ideally be parented by the
2616  * treq if possible.
2617  * Use #fr_trunk_request_alloc for pre-allocation of
2618  * the treq.
2619  * @param[in] rctx The resume context to write any result to.
2620  * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2621  * checks through even if the connection is at capacity.
2622  * Will also allow enqueuing on "inactive", "draining",
2623  * "draining-to-free" connections.
2624  * @return
2625  * - FR_TRUNK_ENQUEUE_OK.
2626  * - FR_TRUNK_ENQUEUE_NO_CAPACITY - At the max_req_per_conn limit.
2627  * - FR_TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2628  */
2630  request_t *request, void *preq, void *rctx,
2631  bool ignore_limits)
2632 {
2633  fr_trunk_request_t *treq;
2634  fr_trunk_t *trunk = tconn->pub.trunk;
2635 
2636  if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == FR_TRUNK_REQUEST_STATE_INIT),
2637  "%s requests must be in \"init\" state", __FUNCTION__)) return FR_TRUNK_ENQUEUE_FAIL;
2638 
2640 
2641  /*
2642  * Limits check
2643  */
2644  if (!ignore_limits) {
2645  if (trunk->conf.max_req_per_conn &&
2648 
2650  }
2651 
2652  if (*treq_out) {
2653  treq = *treq_out;
2654  } else {
2655  MEM(*treq_out = treq = fr_trunk_request_alloc(trunk, request));
2656  }
2657 
2658  treq->pub.preq = preq;
2659  treq->pub.rctx = rctx;
2660  treq->bound_to_conn = true; /* Don't let the request be transferred */
2661 
2662  if (trunk->conf.always_writable) {
2664  trunk_request_enter_pending(treq, tconn, true);
2667  } else {
2668  trunk_request_enter_pending(treq, tconn, true);
2669  }
2670 
2671  return FR_TRUNK_ENQUEUE_OK;
2672 }
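
/* Example (illustrative only, not part of trunk.c)
 *
 * A sketch of generating an in-band status check on a specific connection, ignoring
 * the request limits so the check goes through even when the connection is full or
 * inactive. my_status_ctx_t is hypothetical and doubles as both preq and rctx here;
 * the fr_trunk_* names/signatures (and passing NULL for the request) are assumptions
 * based on the documentation above, see trunk.h.
 */
static void my_start_status_check(fr_trunk_connection_t *tconn)
{
	fr_trunk_t *trunk = tconn->pub.trunk;
	fr_trunk_request_t *treq;
	my_status_ctx_t *status;

	treq = fr_trunk_request_alloc(trunk, NULL);	/* Status checks have no request_t */
	if (!treq) return;

	status = talloc_zero(treq, my_status_ctx_t);	/* Freed along with the treq */

	if (fr_trunk_request_enqueue_on_conn(&treq, tconn, NULL, status, status,
					     true) != FR_TRUNK_ENQUEUE_OK) {
		fr_trunk_request_free(&treq);		/* preq/rctx were populated, so this cleans up */
	}
}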
2673 
2674 #ifndef NDEBUG
2675 /** Used for sanity checks to ensure all log entries have been freed
2676  *
2677  */
2679 {
2680  fr_dlist_remove(slog->log_head, slog);
2681 
2682  return 0;
2683 }
2684 
2685 void trunk_request_state_log_entry_add(char const *function, int line,
2687 {
2688  fr_trunk_request_state_log_t *slog = NULL;
2689 
2691  slog = fr_dlist_head(&treq->log);
2692  fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2693  fr_dlist_num_elements(&treq->log));
2694  (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2695  memset(slog, 0, sizeof(*slog));
2696  } else {
2697  MEM(slog = talloc_zero(treq, fr_trunk_request_state_log_t));
2698  talloc_set_destructor(slog, _state_log_entry_free);
2699  }
2700 
2701  slog->log_head = &treq->log;
2702  slog->from = treq->pub.state;
2703  slog->to = new;
2704  slog->function = function;
2705  slog->line = line;
2706  if (treq->pub.tconn) {
2707  slog->tconn = treq->pub.tconn;
2708  slog->tconn_id = treq->pub.tconn->pub.conn->id;
2709  slog->tconn_state = treq->pub.tconn->pub.state;
2710  }
2711 
2712  fr_dlist_insert_tail(&treq->log, slog);
2713 
2714 }
2715 
2716 void fr_trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2717  fr_trunk_request_t const *treq)
2718 {
2719  fr_trunk_request_state_log_t *slog = NULL;
2720 
2721  int i;
2722 
2723  for (slog = fr_dlist_head(&treq->log), i = 0;
2724  slog;
2725  slog = fr_dlist_next(&treq->log, slog), i++) {
2726  fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2727  i, slog->function, slog->line,
2728  slog->tconn_id,
2730  slog->tconn_state, "<INVALID>") : "none",
2731  fr_table_str_by_value(fr_trunk_request_states, slog->from, "<INVALID>"),
2732  fr_table_str_by_value(fr_trunk_request_states, slog->to, "<INVALID>"));
2733  }
2734 }
2735 #endif
2736 
2737 /** Return the number of connections in the specified states
2738  *
2739  * @param[in] trunk to retrieve counts for.
2740  * @param[in] conn_state One or more #fr_trunk_connection_state_t states or'd together.
2741  * @return The number of connections in the specified states.
2742  */
2744 {
2745  uint16_t count = 0;
2746 
2747  if (conn_state & FR_TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2748  if (conn_state & FR_TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2749  if (conn_state & FR_TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2750  if (conn_state & FR_TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2751  if (conn_state & FR_TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2753  if (conn_state & FR_TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2754  if (conn_state & FR_TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2756 
2757  return count;
2758 }
2759 
2760 /** Return the number of requests associated with a trunk connection
2761  *
2762  * @param[in] tconn to return request count for.
2763  * @param[in] req_state One or more request states or'd together.
2764  *
2765  * @return The number of requests in the specified states, associated with a tconn.
2766  */
2768 {
2769  uint32_t count = 0;
2770 
2772  if (req_state & FR_TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2773  if (req_state & FR_TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2774  if (req_state & FR_TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2775  if (req_state & FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2777 
2778  return count;
2779 }
2780 
2781 /** Automatically mark a connection as inactive
2782  *
2783  * @param[in] tconn to potentially mark as inactive.
2784  */
2786 {
2787  fr_trunk_t *trunk = tconn->pub.trunk;
2788  uint32_t count;
2789 
2790  if (tconn->pub.state != FR_TRUNK_CONN_ACTIVE) return;
2791 
2792  /*
2793  * Enforces max_req_per_conn
2794  */
2795  if (trunk->conf.max_req_per_conn > 0) {
2798  }
2799 }
2800 
2801 /** Return whether a trunk connection should currently be considered full
2802  *
2803  * @param[in] tconn to check.
2804  * @return
2805  * - true if the connection is full.
2806  * - false if the connection is not full.
2807  */
2809 {
2810  fr_trunk_t *trunk = tconn->pub.trunk;
2811  uint32_t count;
2812 
2813  /*
2814  * Enforces max_req_per_conn
2815  */
2817  if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2818 
2819  return true;
2820 }
2821 
2822 /** Automatically mark a connection as active or reconnect it
2823  *
2824  * @param[in] tconn to potentially mark as active or reconnect.
2825  */
2827 {
2828  if (tconn->pub.state != FR_TRUNK_CONN_FULL) return;
2829 
2830  /*
2831  * Enforces max_req_per_conn
2832  */
2834 }
2835 
2836 /** A connection is readable. Call the request_demux function to read pending requests
2837  *
2838  */
2840 {
2841  fr_trunk_t *trunk = tconn->pub.trunk;
2842 
2843  DO_REQUEST_DEMUX(tconn);
2844 }
2845 
2846 /** A connection is writable. Call the request_mux function to write pending requests
2847  *
2848  */
2850 {
2851  fr_trunk_t *trunk = tconn->pub.trunk;
2852 
2853  /*
2854  * Call the cancel_sent function (if we have one)
2855  * to inform a backend datastore we no longer
2856  * care about the result
2857  */
2861  DO_REQUEST_CANCEL_MUX(tconn);
2862  }
2866  DO_REQUEST_MUX(tconn);
2867 }
2868 
2869 /** Update the registrations for I/O events we're interested in
2870  *
2871  */
2873 {
2874  fr_trunk_t *trunk = tconn->pub.trunk;
2876 
2877  switch (tconn->pub.state) {
2878  /*
2879  * We only register I/O events if the trunk connection is
2880  * in one of these states.
2881  *
2882  * For the other states the trunk shouldn't be processing
2883  * requests.
2884  */
2885  case FR_TRUNK_CONN_ACTIVE:
2886  case FR_TRUNK_CONN_FULL:
2891  /*
2892  * If the connection is always writable,
2893  * then we don't care about write events.
2894  */
2895  if (!trunk->conf.always_writable &&
2899  (trunk->funcs.request_cancel_mux ?
2903  }
2904 
2907  (trunk->funcs.request_cancel_mux ?
2910  }
2911  break;
2912 
2913  default:
2914  break;
2915  }
2916 
2917  if (tconn->events != events) {
2918  /*
2919  * There may be a fatal error which results
2920  * in the connection being freed.
2921  *
2922  * Stop that from happening until after
2923  * we're done using it.
2924  */
2926  DO_CONNECTION_NOTIFY(tconn, events);
2927  tconn->events = events;
2929  }
2930 }
2931 
2932 /** Remove a trunk connection from whichever list it's currently in
2933  *
2934  * @param[in] tconn to remove.
2935  */
2937 {
2938  fr_trunk_t *trunk = tconn->pub.trunk;
2939 
2940  switch (tconn->pub.state) {
2941  case FR_TRUNK_CONN_ACTIVE:
2942  {
2943  int ret;
2944 
2945  ret = fr_minmax_heap_extract(trunk->active, tconn);
2946  if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
2947  }
2948  return;
2949 
2950  case FR_TRUNK_CONN_INIT:
2951  fr_dlist_remove(&trunk->init, tconn);
2952  break;
2953 
2955  fr_dlist_remove(&trunk->connecting, tconn);
2956  return;
2957 
2958  case FR_TRUNK_CONN_CLOSED:
2959  fr_dlist_remove(&trunk->closed, tconn);
2960  return;
2961 
2962  case FR_TRUNK_CONN_FULL:
2963  fr_dlist_remove(&trunk->full, tconn);
2964  return;
2965 
2967  fr_dlist_remove(&trunk->inactive, tconn);
2968  return;
2969 
2971  fr_dlist_remove(&trunk->inactive_draining, tconn);
2972  return;
2973 
2975  fr_dlist_remove(&trunk->draining, tconn);
2976  return;
2977 
2979  fr_dlist_remove(&trunk->draining_to_free, tconn);
2980  return;
2981 
2982  case FR_TRUNK_CONN_HALTED:
2983  return;
2984  }
2985 }
2986 
2987 /** Transition a connection to the full state
2988  *
2989  * Called whenever a trunk connection is at the maximum number of requests.
2990  * Removes the connection from the connected heap, and places it in the full list.
2991  */
2993 {
2994  fr_trunk_t *trunk = tconn->pub.trunk;
2995 
2996  switch (tconn->pub.state) {
2997  case FR_TRUNK_CONN_ACTIVE:
2998  trunk_connection_remove(tconn);
2999  break;
3000 
3001  default:
3003  }
3004 
3005  fr_dlist_insert_head(&trunk->full, tconn);
3007 }
3008 
3009 /** Transition a connection to the inactive state
3010  *
3011  * Called whenever the API client wants to stop new requests being enqueued
3012  * on a trunk connection.
3013  */
3015 {
3016  fr_trunk_t *trunk = tconn->pub.trunk;
3017 
3018  switch (tconn->pub.state) {
3019  case FR_TRUNK_CONN_ACTIVE:
3020  case FR_TRUNK_CONN_FULL:
3021  trunk_connection_remove(tconn);
3022  break;
3023 
3024  default:
3026  }
3027 
3028  fr_dlist_insert_head(&trunk->inactive, tconn);
3030 }
3031 
3032 /** Transition a connection to the inactive-draining state
3033  *
3034  * Called whenever the trunk manager wants to drain an inactive connection
3035  * of its requests.
3036  */
3038 {
3039  fr_trunk_t *trunk = tconn->pub.trunk;
3040 
3041  switch (tconn->pub.state) {
3044  trunk_connection_remove(tconn);
3045  break;
3046 
3047  default:
3049  }
3050 
3051  fr_dlist_insert_head(&trunk->inactive_draining, tconn);
3053 
3054  /*
3055  * Immediately re-enqueue all pending
3056  * requests, so the connection is drained
3057  * quicker.
3058  */
3060 }
3061 
3062 /** Transition a connection to the draining state
3063  *
3064  * Removes the connection from the active heap so it won't be assigned any new
3065  * connections.
3066  */
3068 {
3069  fr_trunk_t *trunk = tconn->pub.trunk;
3070 
3071  switch (tconn->pub.state) {
3072  case FR_TRUNK_CONN_ACTIVE:
3073  case FR_TRUNK_CONN_FULL:
3076  trunk_connection_remove(tconn);
3077  break;
3078 
3079  default:
3081  }
3082 
3083  fr_dlist_insert_head(&trunk->draining, tconn);
3085 
3086  /*
3087  * Immediately re-enqueue all pending
3088  * requests, so the connection is drained
3089  * quicker.
3090  */
3092 }
3093 
3094 /** Transition a connection to the draining-to-reconnect state
3095  *
3096  * Removes the connection from the active heap so it won't be assigned any new
3097  * connections.
3098  */
3100 {
3101  fr_trunk_t *trunk = tconn->pub.trunk;
3102 
3103  switch (tconn->pub.state) {
3104  case FR_TRUNK_CONN_ACTIVE:
3105  case FR_TRUNK_CONN_FULL:
3109  trunk_connection_remove(tconn);
3110  break;
3111 
3112  default:
3114  }
3115 
3116  fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3118 
3119  /*
3120  * Immediately re-enqueue all pending
3121  * requests, so the connection is drained
3122  * quicker.
3123  */
3125 }
3126 
3127 
3128 /** Transition a connection back to the active state
3129  *
3130  * This should only be called on a connection which is in the full state,
3131  * inactive state, draining state or connecting state.
3132  */
3134 {
3135  fr_trunk_t *trunk = tconn->pub.trunk;
3136  int ret;
3137 
3138  switch (tconn->pub.state) {
3139  case FR_TRUNK_CONN_FULL:
3143  trunk_connection_remove(tconn);
3144  break;
3145 
3147  trunk_connection_remove(tconn);
3149  break;
3150 
3151  default:
3153  }
3154 
3155  ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3156  if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3158  return;
3159  }
3160 
3162 
3163  /*
3164  * Reorder the connections
3165  */
3166  CONN_REORDER(tconn);
3167 
3168  /*
3169  * Rebalance requests
3170  */
3171  trunk_rebalance(trunk);
3172 
3173  /*
3174  * We place requests into the backlog
3175  * because there were no connections
3176  * available to handle them.
3177  *
3178  * If a connection has become active
3179  * chances are those backlogged requests
3180  * can now be enqueued, so try and do
3181  * that now.
3182  *
3183  * If there's requests sitting in the
3184  * backlog indefinitely, it's because
3185  * they were inserted there erroneously
3186  * when there were active connections
3187  * which could have handled them.
3188  */
3189  trunk_backlog_drain(trunk);
3190 }
3191 
3192 /** Connection transitioned to the init state
3193  *
3194  * Reflect the connection state change in the lists we use to track connections.
3195  *
3196  * @note This function is only called from the connection API as a watcher.
3197  *
3198  * @param[in] conn The connection which changes state.
3199  * @param[in] prev The state the connection was in.
3200  * @param[in] state The state the connection is now in.
3201  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3202  */
3206  void *uctx)
3207 {
3208  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3209  fr_trunk_t *trunk = tconn->pub.trunk;
3210 
3211  switch (tconn->pub.state) {
3212  case FR_TRUNK_CONN_HALTED:
3213  break;
3214 
3215  case FR_TRUNK_CONN_CLOSED:
3216  trunk_connection_remove(tconn);
3217  break;
3218 
3219  default:
3221  }
3222 
3223  fr_dlist_insert_head(&trunk->init, tconn);
3225 }
3226 
3227 /** Connection transitioned to the connecting state
3228  *
3229  * Reflect the connection state change in the lists we use to track connections.
3230  *
3231  * @note This function is only called from the connection API as a watcher.
3232  *
3233  * @param[in] conn The connection which changes state.
3234  * @param[in] prev The state the connection was in.
3235  * @param[in] state The state the connection is now in.
3236  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3237  */
3241  void *uctx)
3242 {
3243  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3244  fr_trunk_t *trunk = tconn->pub.trunk;
3245 
3246  switch (tconn->pub.state) {
3247  case FR_TRUNK_CONN_INIT:
3248  case FR_TRUNK_CONN_CLOSED:
3249  trunk_connection_remove(tconn);
3250  break;
3251 
3252  default:
3254  }
3255 
3256  /*
3257  * If a connection just entered the
3258  * connecting state, it should have
3259  * no requests associated with it.
3260  */
3262 
3263  fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3265 }
3266 
3267 /** Connection transitioned to the shutdown state
3268  *
3269  * If we're not already in the draining-to-free state, transition there now.
3270  *
3271  * The idea is that if something signalled the connection to shutdown, we need
3272  * to reflect that by dequeuing any pending requests, not accepting new ones,
3273  * and waiting for the existing requests to complete.
3274  *
3275  * @note This function is only called from the connection API as a watcher.
3276  *
3277  * @param[in] conn The connection which changes state.
3278  * @param[in] prev The state the connection was in.
3279  * @param[in] state The state the connection is now in.
3280  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3281  */
3285  void *uctx)
3286 {
3287  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3288 
3289  switch (tconn->pub.state) {
3290  case FR_TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3291  return;
3292 
3293  case FR_TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3294  case FR_TRUNK_CONN_FULL:
3298  break;
3299 
3300  case FR_TRUNK_CONN_INIT:
3302  case FR_TRUNK_CONN_CLOSED:
3303  case FR_TRUNK_CONN_HALTED:
3305  }
3306 
3308 }
3309 
3310 /** Trigger a reconnection of the trunk connection
3311  *
3312  * @param[in] el Event list the timer was inserted into.
3313  * @param[in] now Current time.
3314  * @param[in] uctx The tconn.
3315  */
3317 {
3318  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3319 
3321 }
3322 
3323 /** Connection transitioned to the connected state
3324  *
3325  * Reflect the connection state change in the lists we use to track connections.
3326  *
3327  * @note This function is only called from the connection API as a watcher.
3328  *
3329  * @param[in] conn The connection which changes state.
3330  * @param[in] prev The state the connection was in.
3331  * @param[in] state The state the connection is now in.
3332  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3333  */
3337  void *uctx)
3338 {
3339  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3340  fr_trunk_t *trunk = tconn->pub.trunk;
3341 
3342  /*
3343  * If a connection was just connected,
3344  * it should have no requests associated
3345  * with it.
3346  */
3348 
3349  /*
3350  * Set here, as the active state can
3351  * be transitioned to from full and
3352  * draining too.
3353  */
3354  trunk->pub.last_connected = fr_time();
3355 
3356  /*
3357  * Insert a timer to reconnect the
3358  * connection periodically.
3359  */
3360  if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3361  if (fr_event_timer_in(tconn, trunk->el, &tconn->lifetime_ev,
3362  trunk->conf.lifetime, _trunk_connection_lifetime_expire, tconn) < 0) {
3363  PERROR("Failed inserting connection reconnection timer event, halting connection");
3365  return;
3366  }
3367  }
3368 
3370 }
3371 
3372 /** Connection failed after it was connected
3373  *
3374  * Reflect the connection state change in the lists we use to track connections.
3375  *
3376  * @note This function is only called from the connection API as a watcher.
3377  *
3378  * @param[in] conn The connection which changes state.
3379  * @param[in] prev The state the connection was in.
3380  * @param[in] state The state the connection is now in.
3381  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3382  */
3386  void *uctx)
3387 {
3388  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3389  fr_trunk_t *trunk = tconn->pub.trunk;
3390  bool need_requeue = false;
3391 
3392  switch (tconn->pub.state) {
3393  case FR_TRUNK_CONN_ACTIVE:
3394  case FR_TRUNK_CONN_FULL:
3399  need_requeue = true;
3400  trunk_connection_remove(tconn);
3401  break;
3402 
3403  case FR_TRUNK_CONN_INIT: /* Initialisation failed */
3405  trunk_connection_remove(tconn);
3407  break;
3408 
3409  case FR_TRUNK_CONN_CLOSED:
3410  case FR_TRUNK_CONN_HALTED: /* Can't move backwards? */
3412  }
3413 
3414  fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3416 
3417  /*
3418  * Now *AFTER* the connection has been
3419  * removed from the active pool,
3420  * re-enqueue the requests.
3421  */
3422  if (need_requeue) trunk_connection_requests_requeue(tconn, FR_TRUNK_REQUEST_STATE_ALL, 0, true);
3423 
3424  /*
3425  * There should be no requests left on this
3426  * connection. They should have all been
3427  * moved off or failed.
3428  */
3430 
3431  /*
3432  * Clear statistics and flags
3433  */
3434  tconn->sent_count = 0;
3435 
3436  /*
3437  * Remove the reconnect event
3438  */
3440 
3441  /*
3442  * Remove the I/O events
3443  */
3445 }
3446 
3447 /** Connection failed
3448  *
3449  * @param[in] conn The connection which changes state.
3450  * @param[in] prev The state the connection was in.
3451  * @param[in] state The state the connection is now in.
3452  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3453  */
3455  fr_connection_state_t prev,
3457  void *uctx)
3458 {
3459  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3460  fr_trunk_t *trunk = tconn->pub.trunk;
3461 
3462  /*
3463  * Need to set this first as it
3464  * determines whether requests are
3465  * re-queued or fail outright.
3466  */
3467  trunk->pub.last_failed = fr_time();
3468 
3469  /*
3470  * Failed in the init state, transition the
3471  * connection to closed, else we get an
3472  * INIT -> INIT transition which triggers
3473  * an assert.
3474  */
3475  if (prev == FR_CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3476 
3477  /*
3478  * See what the state of the trunk is
3479  * if there are no connections that could
3480  * potentially accept requests in the near
3481  * future, then fail all the requests in the
3482  * trunk backlog.
3483  */
3489 }
3490 
3491 /** Connection transitioned to the halted state
3492  *
3493  * Remove the connection from all lists, as it's likely about to be freed.
3494  *
3495  * Setting the trunk back to the init state ensures that if the code is ever
3496  * refactored and #fr_connection_signal_reconnect is used after a connection
3497  * is halted, then everything is maintained in a valid state.
3498  *
3499  * @note This function is only called from the connection API as a watcher.
3500  *
3501  * @param[in] conn The connection which changes state.
3502  * @param[in] prev The state the connection was in.
3503  * @param[in] state The state the connection is now in.
3504  * @param[in] uctx The fr_trunk_connection_t wrapping the connection.
3505  */
3509  void *uctx)
3510 {
3511  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3512  fr_trunk_t *trunk = tconn->pub.trunk;
3513 
3514  switch (tconn->pub.state) {
3515  case FR_TRUNK_CONN_INIT:
3516  case FR_TRUNK_CONN_CLOSED:
3517  trunk_connection_remove(tconn);
3518  break;
3519 
3520  default:
3522  }
3523 
3524  /*
3525  * It began life in the halted state,
3526  * and will end life in the halted state.
3527  */
3529 
3530  /*
3531  * There should be no requests left on this
3532  * connection. They should have all been
3533  * moved off or failed.
3534  */
3536 
3537  /*
3538  * And free the connection...
3539  */
3540  if (trunk->in_handler) {
3541  /*
3542  * ...later.
3543  */
3544  fr_dlist_insert_tail(&trunk->to_free, tconn);
3545  return;
3546  }
3547  talloc_free(tconn);
3548 }
3549 
3550 /** Free a connection
3551  *
3552  * Enforces an orderly free order for the children of the tconn
3553  */
3555 {
3557  fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3558 
3559  /*
3560  * Loop over all the requests we gathered
3561  * and transition them to the failed state,
3562  * freeing them.
3563  *
3564  * Usually, requests will be re-queued when
3565  * a connection enters the closed state,
3566  * but in this case because the whole trunk
3567  * is being freed, we don't bother, and
3568  * just signal to the API client that the
3569  * requests failed.
3570  */
3571  if (tconn->pub.trunk->freeing) {
3572  fr_dlist_head_t to_fail;
3573  fr_trunk_request_t *treq = NULL;
3574 
3575  fr_dlist_talloc_init(&to_fail, fr_trunk_request_t, entry);
3576 
3577  /*
3578  * Remove requests from this connection
3579  */
3581  while ((treq = fr_dlist_next(&to_fail, treq))) {
3582  fr_trunk_request_t *prev;
3583 
3584  prev = fr_dlist_remove(&to_fail, treq);
3586  treq = prev;
3587  }
3588  }
3589 
3590  /*
3591  * Ensure we're not signalled by the connection
3592  * as it processes its backlog of state changes,
3593  * as we are about to be freed.
3594  */
3602 
3603  /*
3604  * This may return -1, indicating the free was deferred
3605  * this is fine. It just means the conn will be freed
3606  * after all the handlers have exited.
3607  */
3608  (void)talloc_free(tconn->pub.conn);
3609  tconn->pub.conn = NULL;
3610 
3611  return 0;
3612 }
3613 
3614 /** Attempt to spawn a new connection
3615  *
3616  * Calls the API client's alloc() callback to create a new fr_connection_t,
3617  * then inserts the connection into the 'connecting' list.
3618  *
3619  * @param[in] trunk to spawn connection in.
3620  * @param[in] now The current time.
3621  */
3623 {
3624  fr_trunk_connection_t *tconn;
3625 
3626 
3627  /*
3628  * Call the API client's callback to create
3629  * a new fr_connection_t.
3630  */
3631  MEM(tconn = talloc_zero(trunk, fr_trunk_connection_t));
3632  tconn->pub.trunk = trunk;
3633  tconn->pub.state = FR_TRUNK_CONN_HALTED; /* All connections start in the halted state */
3634 
3635  /*
3636  * Allocate a new fr_connection_t or fail.
3637  */
3638  DO_CONNECTION_ALLOC(tconn);
3639 
3641  fr_dlist_talloc_init(&tconn->sent, fr_trunk_request_t, entry);
3644 
3645  /*
3646  * OK, we have the connection, now setup watch
3647  * points so we know when it changes state.
3648  *
3649  * This lets us automatically move the tconn
3650  * between the different lists in the trunk
3651  * with minimum extra code.
3652  */
3654  _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3655 
3657  _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3658 
3660  _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3661 
3663  _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3664 
3666  _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3667 
3669  _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3670 
3672  _trunk_connection_on_halted, false, tconn); /* About to be freed */
3673 
3674  talloc_set_destructor(tconn, _trunk_connection_free);
3675 
3676  fr_connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3677 
3678  trunk->pub.last_open = now;
3679 
3680  return 0;
3681 }
3682 
3683 /** Pop a cancellation request off a connection's cancellation queue
3684  *
3685  * The request we return is only advanced once it moves out of the
3686  * cancel state and into the cancel_sent or cancel_complete state.
3687  *
3688  * One of these signalling functions must be called after the request
3689  * has been popped:
3690  *
3691  * - #fr_trunk_request_signal_cancel_sent
3692  * The remote datastore has been informed, but we need to wait for acknowledgement.
3693  * The #fr_trunk_request_demux_t callback must handle the acks calling
3694  * #fr_trunk_request_signal_cancel_complete when an ack is received.
3695  *
3696  * - #fr_trunk_request_signal_cancel_complete
3697  * The request was cancelled and we don't need to wait, clean it up immediately.
3698  *
3699  * @param[out] treq_out to process
3700  * @param[in] tconn Connection to drain cancellation request from.
3701  * @return
3702  * - 1 if no more requests.
3703  * - 0 if a new request was written to treq_out.
3704  * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3705  * memory or requests associated with the connection.
3706  * - -2 if called outside of the cancel muxer.
3707  */
3709 {
3710  if (unlikely(tconn->pub.state == FR_TRUNK_CONN_HALTED)) return -1;
3711 
3713  "%s can only be called from within request_cancel_mux handler",
3714  __FUNCTION__)) return -2;
3715 
3716  *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3717  if (!*treq_out) return 1;
3718 
3719  return 0;
3720 }
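
/* Example (illustrative only, not part of trunk.c)
 *
 * The shape of the cancellation write path, as a helper called from within an
 * application's request_cancel_mux callback (the pop function above asserts that
 * context). my_write_cancel() and the MY_WRITE_* results are hypothetical; the
 * fr_trunk_connection_pop_cancellation() and fr_trunk_request_signal_cancel_* names
 * and signatures are assumptions based on the documentation above, see trunk.h.
 */
static void my_cancel_mux_pending(fr_trunk_connection_t *tconn, fr_connection_t *conn)
{
	fr_trunk_request_t *treq;

	while (fr_trunk_connection_pop_cancellation(&treq, tconn) == 0) {
		switch (my_write_cancel(conn, treq->pub.preq)) {
		case MY_WRITE_PARTIAL:
			fr_trunk_request_signal_cancel_partial(treq);	/* Returned again next time */
			return;

		case MY_WRITE_DONE:
		default:
			fr_trunk_request_signal_cancel_sent(treq);	/* demux must handle the ack */
			break;
		}
	}
}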
3721 
3722 /** Pop a request off a connection's pending queue
3723  *
3724  * The request we return is only advanced once it moves out of the partial or
3725  * pending states, i.e. when the mux function signals us.
3726  *
3727  * If the same request is returned again and again, it means the muxer isn't actually
3728  * doing anything with the request we returned, and it's an error in the muxer code.
3729  *
3730  * One of these signalling functions must be used after the request has been popped:
3731  *
3732  * - #fr_trunk_request_signal_complete
3733  * The request was completed. Either we got a synchronous response, or we knew the
3734  * response without contacting an external server (cache).
3735  *
3736  * - #fr_trunk_request_signal_fail
3737  * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3738  *
3739  * - #fr_trunk_request_signal_partial
3740  * Wrote part of a request. This request will be returned on the next call to this
3741  * function so that the request_mux function can finish writing it. Only useful
3742  * for stream type connections. Datagram type connections cannot have partial
3743  * writes.
3744  *
3745  * - #fr_trunk_request_signal_sent Successfully sent a request.
3746  *
3747  * @param[out] treq_out to process
3748  * @param[in] tconn to pop a request from.
3749  * @return
3750  * - 1 if no more requests.
3751  * - 0 if a new request was written to treq_out.
3752  * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3753  * memory or requests associated with the connection.
3754  * - -2 if called outside of the muxer.
3755  */
3757 {
3758  if (unlikely(tconn->pub.state == FR_TRUNK_CONN_HALTED)) return -1;
3759 
3761  "%s can only be called from within request_mux handler",
3762  __FUNCTION__)) return -2;
3763 
3764  *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3765  if (!*treq_out) return 1;
3766 
3767  return 0;
3768 }
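
/* Example (illustrative only, not part of trunk.c)
 *
 * The shape of the normal write path, as a helper called from within an application's
 * request_mux callback (the pop function above asserts that context). my_encode_and_write(),
 * my_track_outbound() and the MY_WRITE_* results are hypothetical; the
 * fr_trunk_connection_pop_request() name and signature are assumptions, see trunk.h.
 */
static void my_mux_pending(fr_trunk_connection_t *tconn, fr_connection_t *conn)
{
	fr_trunk_request_t *treq;

	while (fr_trunk_connection_pop_request(&treq, tconn) == 0) {
		switch (my_encode_and_write(conn, treq->pub.preq)) {
		case MY_WRITE_PARTIAL:
			fr_trunk_request_signal_partial(treq);	/* Same treq returned on the next call */
			return;

		case MY_WRITE_ERROR:
			fr_trunk_request_signal_fail(treq);	/* Permanent error, fail just this request */
			break;

		case MY_WRITE_DONE:
		default:
			my_track_outbound(tconn, treq);		/* So demux can match the response later */
			fr_trunk_request_signal_sent(treq);
			break;
		}
	}
}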
3769 
3770 /** Signal that a trunk connection is writable
3771  *
3772  * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3773  *
3774  * @param[in] tconn to signal.
3775  */
3777 {
3778  fr_trunk_t *trunk = tconn->pub.trunk;
3779 
3780  if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3781  "%s cannot be called within a handler", __FUNCTION__)) return;
3782 
3783  DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3784 
3786 }
3787 
3788 /** Signal that a trunk connection is readable
3789  *
3790  * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3791  *
3792  * @param[in] tconn to signal.
3793  */
3795 {
3796  fr_trunk_t *trunk = tconn->pub.trunk;
3797 
3798  if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3799  "%s cannot be called within a handler", __FUNCTION__)) return;
3800 
3801  DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3802 
3804 }
3805 
3806 /** Signal a trunk connection cannot accept more requests
3807  *
3808  * @param[in] tconn to signal.
3809  */
3811 {
3812  /* Can be called anywhere */
3813 
3814  switch (tconn->pub.state) {
3815  case FR_TRUNK_CONN_ACTIVE:
3816  case FR_TRUNK_CONN_FULL:
3818  break;
3819 
3822  break;
3823 
3824  default:
3825  return;
3826  }
3827 }
3828 
3829 /** Signal a trunk connection is no longer full
3830  *
3831  * @param[in] tconn to signal.
3832  */
3834 {
3835  switch (tconn->pub.state) {
3836  case FR_TRUNK_CONN_FULL:
3837  trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3838  break;
3839 
3841  /*
3842  * Do the appropriate state transition based on
3843  * how many requests the trunk connection is
3844  * currently servicing.
3845  */
3846  if (trunk_connection_is_full(tconn)) {
3848  break;
3849  }
3851  break;
3852 
3853  /*
3854  * Unsetting the active flag just moves
3855  * the connection back to the normal
3856  * draining state.
3857  */
3858  case FR_TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
3860  break;
3861 
3862  default:
3863  return;
3864  }
3865 }
3866 
3867 /** Signal a trunk connection is no longer viable
3868  *
3869  * @param[in] tconn to signal.
3870  * @param[in] reason the connection is being reconnected.
3871  */
3873 {
3874  fr_connection_signal_reconnect(tconn->pub.conn, reason);
3875 }
3876 
3877 /** Standard I/O read function
3878  *
3879  * Underlying FD is now readable, so call the trunk to read any pending requests
3880  * from this connection.
3881  *
3882  * @param[in] el The event list signalling.
3883  * @param[in] fd that's now readable.
3884  * @param[in] flags describing the read event.
3885  * @param[in] uctx The trunk connection handle (tconn).
3886  */
3888 {
3889  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3890 
3892 }
3893 
3894 /** Standard I/O write function
3895  *
3896  * Underlying FD is now writable, so call the trunk to write any pending requests
3897  * to this connection.
3898  *
3899  * @param[in] el The event list signalling.
3900  * @param[in] fd that's now writable.
3901  * @param[in] flags describing the write event.
3902  * @param[in] uctx The trunk connection handle (tconn).
3903  */
3905 {
3906  fr_trunk_connection_t *tconn = talloc_get_type_abort(uctx, fr_trunk_connection_t);
3907 
3909 }
3910 
3911 
3912 /** Returns true if the trunk connection is in one of the specified states
3913  *
3914  * @param[in] tconn To check state for.
3915  * @param[in] state to check
3916  * @return
3917  * - True if trunk connection is in a particular state.
3918  * - False if trunk connection is not in a particular state.
3919  */
3921 {
3922  return (bool)(tconn->pub.state & state);
3923 }
3924 
3925 /** Close connections in a particular connection list if they have no requests associated with them
3926  *
3927  * @param[in] trunk containing connections we want to close.
3928  * @param[in] head of list of connections to examine.
3929  */
3931 {
3932  fr_trunk_connection_t *tconn = NULL;
3933 
3934  while ((tconn = fr_dlist_next(head, tconn))) {
3935  fr_trunk_connection_t *prev;
3936 
3938 
3939  prev = fr_dlist_prev(head, tconn);
3940 
3941  DEBUG3("Closing %s connection with no requests",
3943  /*
3944  * Close the connection as gracefully
3945  * as possible by signalling it should
3946  * shutdown.
3947  *
3948  * The connection should, if serviced
3949  * correctly by the underlying library,
3950  * automatically transition to halted after
3951  * all pending reads/writes are
3952  * complete, at which point we'll be informed
3953  * and free our tconn wrapper.
3954  */
3956  tconn = prev;
3957  }
3958 }
3959 
3960 /** Rebalance connections across active trunk members when a new connection becomes active
3961  *
3962  * We don't have any visibility into the connection prioritisation algorithm;
3963  * it's essentially a black box.
3964  *
3965  * We can however determine when the correct level of requests per connection
3966  * has been reached, by dequeuing and requeueing requests up until the point
3967  * where the connection that just had a request dequeued receives the same
3968  * request back.
3969  *
3970  * @param[in] trunk The trunk to rebalance.
3971  */
3972 static void trunk_rebalance(fr_trunk_t *trunk)
3973 {
3975 
3977 
3978  /*
3979  * Only rebalance if the top and bottom of
3980  * the heap are not equal.
3981  */
3982  if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
3983 
3984  DEBUG3("Rebalancing requests");
3985 
3986  /*
3987  * Keep requeuing requests from the connection
3988  * at the bottom of the heap until the
3989  * connection at the top is shifted from that
3990  * position.
3991  */
3992  while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
3994  FR_TRUNK_REQUEST_STATE_PENDING, 1, false));
3995 }
3996 
3997 /** Implements the algorithm we use to manage requests per connection levels
3998  *
3999  * This is executed periodically using a timer event, and opens/closes
4000  * connections.
4001  *
4002  * The aim is to try to keep the requests-per-connection level in a sweet spot,
4003  * where there's enough outstanding work for the connection/pipelining to work
4004  * efficiently, but not so much so that we encounter increased latency.
4005  *
4006  * In the request enqueue and dequeue functions we record every time the
4007  * average number of requests per connection goes above the target count
4008  * and record every time the average number of requests per connection goes
4009  * below the target count.
4010  *
4011  * This may sound expensive, but in all cases we're just summing counters.
4012  * CPU time required does not increase with additional requests, only with
4013  * large numbers of connections.
4014  *
4015  * If we do encounter scaling issues, we can always maintain the counters
4016  * as aggregates as an optimisation later.
4017  *
4018  * If, when the management function runs, the trunk was most recently above
4019  * the target, we:
4020  * - Return if we've been in this state for a shorter period than 'open_delay'.
4021  * - Return if we're at max.
4022  * - Return if opening a new connection will take us below the load target.
4023  * - Return if we last opened a connection within 'open_delay'.
4024  * - Otherwise we attempt to open a new connection.
4025  *
4026  * If the trunk was most recently below the target, we:
4027  * - Return if we've been in this state for a shorter period than 'close_delay'.
4028  * - Return if we're at min.
4029  * - Return if we have no connections.
4030  * - Close a connection if min is 0, and we have no outstanding
4031  * requests. Then return.
4032  * - Return if closing a connection will take us above the load target.
4033  * - Return if we last closed a connection within 'close_delay'.
4034  * - Otherwise we move a connection to draining state.
4035  */
4036 static void trunk_manage(fr_trunk_t *trunk, fr_time_t now)
4037 {
4038  fr_trunk_connection_t *tconn = NULL;
4039  fr_trunk_request_t *treq;
4040  uint32_t average = 0;
4041  uint32_t req_count;
4042  uint16_t conn_count;
4043  fr_trunk_state_t new_state;
4044 
4045  DEBUG4("Managing trunk");
4046 
4047  /*
4048  * Clean up requests in our request cache which
4049  * have been idle for too long.
4050  */
4051  while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4053 
4054  /*
4055  * Free any connections which have drained
4056  * and we didn't reactivate during the last
4057  * round of management.
4058  */
4060  trunk_connection_close_if_empty(trunk, &trunk->draining);
4062 
4063  /*
4064  * Process deferred connection freeing
4065  */
4066  if (!trunk->in_handler) {
4067  while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4068  }
4069 
4070  /*
4071  * Update the state of the trunk
4072  */
4074  new_state = FR_TRUNK_STATE_ACTIVE;
4075  } else {
4076  /*
4077  * INIT / CONNECTING / FULL mean connections will become active
4078  * so the trunk is PENDING
4079  */
4084  }
4085 
4086  if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4087 
4088  /*
4089  * A trunk can be signalled to not proactively
4090  * manage connections if a destination is known
4091  * to be unreachable, and doing so would result
4092  * in spurious connections still being opened.
4093  *
4094  * We still run other connection management
4095  * functions and just short circuit the function
4096  * here.
4097  */
4098  if (!trunk->managing_connections) return;
4099 
4100  /*
4101  * We're above the target requests per connection,
4102  * so spawn more connections!
4103  */
4104  if (fr_time_gteq(trunk->pub.last_above_target, trunk->pub.last_below_target)) {
4105  /*
4106  * If connecting is provided, check we
4107  * wouldn't have too many connections in
4108  * the connecting state.
4109  *
4110  * This is a throttle in the case of transitory
4111  * load spikes, or a backend becoming
4112  * unavailable.
4113  */
4114  if ((trunk->conf.connecting > 0) &&
4116  trunk->conf.connecting)) {
4117  DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4118  trunk->conf.connecting);
4119  return;
4120  }
4121 
4122  trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4123 
4124  /*
4125  * Only apply hysteresis if we have at least
4126  * one available connection.
4127  */
4128  if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4129  DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4132  return; /* too soon */
4133  }
4134 
4135  /*
4136  * We don't consider 'draining' connections
4137  * in the max calculation, as if we do
4138  * determine that we need to spawn a new
4139  * connection, then we'd move all 'draining'
4140  * connections to active before spawning
4141  * any new connections.
4142  */
4143  if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4144  DEBUG4("Not opening connection - Have %u connections, need %u or below",
4145  conn_count, trunk->conf.max);
4146  return;
4147  }
4148 
4149  /*
4150  * We consider requests pending on all connections
4151  * and the trunk's backlog, as that's the current
4152  * load.
4153  */
4154  if (!req_count) {
4155  DEBUG4("Not opening connection - No outstanding requests");
4156  return;
4157  }
4158 
4159  /*
4160  * Do the n+1 check, i.e. would opening one connection
4161  * take us below our target threshold?
4162  */
4163  if (conn_count > 0) {
4164  average = ROUND_UP_DIV(req_count, (conn_count + 1));
4165  if (average < trunk->conf.target_req_per_conn) {
4166  DEBUG4("Not opening connection - Would leave us below our target requests "
4167  "per connection (now %u, after open %u)",
4168  ROUND_UP_DIV(req_count, conn_count), average);
4169  return;
4170  }
4171  } else {
4172  (void)trunk_connection_spawn(trunk, now);
4173  return;
4174  }
4175 
4176  /*
4177  * If we've got a connection in the draining list
4178  * move it back into the active list if we've
4179  * been requested to add a connection back in.
4180  */
4181  tconn = fr_dlist_head(&trunk->draining);
4182  if (tconn) {
4183  if (trunk_connection_is_full(tconn)) {
4185  } else {
4187  }
4188  return;
4189  }
4190 
4191  /*
4192  * Implement the delay if there are no connections that
4193  * could be immediately re-activated.
4194  */
4195  if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4196  DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4197  "It's been %pVs",
4199  fr_box_time_delta(fr_time_sub(now, trunk->pub.last_open)));
4200  return;
4201  }
4202 
4203  DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4204  ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4205  /* last_open set by trunk_connection_spawn */
4206  (void)trunk_connection_spawn(trunk, now);
4207  }
4208 
4209  /*
4210  * We're below the target requests per connection.
4211  * Free some connections...
4212  */
4213  else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4214  if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4215  DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4218  return; /* too soon */
4219  }
4220 
4221  trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4222 
4223  if (!conn_count) {
4224  DEBUG4("Not closing connection - No connections to close!");
4225  return;
4226  }
4227 
4228  if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4229  DEBUG4("Not closing connection - Have %u connections, need %u or above",
4230  conn_count, trunk->conf.min);
4231  return;
4232  }
4233 
4234  if (!req_count) {
4235  DEBUG4("Closing connection - No outstanding requests");
4236  goto close;
4237  }
4238 
4239  /*
4240  * The minimum number of connections must be set
4241  * to zero for this to work.
4242  * min == 0, no requests, close all the connections.
4243  * This is useful for backup databases, when
4244  * maintaining the connection would lead to lots of
4245  * log file churn.
4246  */
4247  if (conn_count == 1) {
4248  DEBUG4("Not closing connection - Would leave connections "
4249  "and there are still %u outstanding requests", req_count);
4250  return;
4251  }
4252 
4253  /*
4254  * Do the n-1 check, i.e. would closing one connection
4255  * take us above our target threshold?
4256  */
4257  average = ROUND_UP_DIV(req_count, (conn_count - 1));
4258  if (average > trunk->conf.target_req_per_conn) {
4259  DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4260  "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4261  return;
4262  }
4263 
4264  DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4265  ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4266 
4267  close:
4268  if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4269  DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4270  "It's been %pVs",
4273  return;
4274  }
4275 
4276  /*
4277  * Inactive connections get counted in the
4278  * set of viable connections, but are likely
4279  * to be congested or dead, so we drain
4280  * (and possibly eventually free) those first.
4281  */
4282  if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4283  /*
4284  * If the connection has no requests associated
4285  * with it then immediately free.
4286  */
4288  fr_connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4289  } else {
4291  }
4292  /*
4293  * It is possible to have too many connecting
4294  * connections when the connections are
4295  * taking a while to open and the number
4296  * of requests decreases.
4297  */
4298  } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4299  fr_connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4300 
4301  /*
4302  * Finally if there are no "connecting"
4303  * connections to close, and no "inactive"
4304  * connections, start draining "active"
4305  * connections.
4306  */
4307  } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4308  /*
4309  * If the connection has no requests associated
4310  * with it then immediately free.
4311  */
4313  fr_connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4314  } else {
4316  }
4317  }
4318 
4319  trunk->pub.last_closed = now;
4320 
4321 
4322  return;
4323  }
4324 }
4325 
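/*
 * Worked example of the n+1/n-1 checks above (illustrative numbers only,
 * with target_req_per_conn = 10):
 *
 *   50 requests over 4 connections:  ROUND_UP_DIV(50, 4) = 13  (above target)
 *   n+1 check:                       ROUND_UP_DIV(50, 5) = 10  (not below target, so a connection is opened)
 *
 *   50 requests over 7 connections:  ROUND_UP_DIV(50, 7) = 8   (below target)
 *   n-1 check:                       ROUND_UP_DIV(50, 6) = 9   (not above target, so a connection is drained)
 */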
4326 /** Event to periodically call the connection management function
4327  *
4328  * @param[in] el this event belongs to.
4329  * @param[in] now current time.
4330  * @param[in] uctx The trunk.
4331  */
4332 static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
4333 {
4334  fr_trunk_t *trunk = talloc_get_type_abort(uctx, fr_trunk_t);
4335 
4336  trunk_manage(trunk, now);
4337 
4339  if (fr_event_timer_in(trunk, el, &trunk->manage_ev, trunk->conf.manage_interval,
4340  _trunk_timer, trunk) < 0) {
4341  PERROR("Failed inserting trunk management event");
4342  /* Not much we can do, hopefully the trunk will be freed soon */
4343  }
4344  }
4345 }
4346 
4347 /** Return a count of requests on a connection in a specific state
4348  *
4349  * @param[in] trunk to retrieve counts for.
4350  * @param[in] conn_state One or more connection states or'd together.
4351  * @param[in] req_state One or more request states or'd together.
4352  * @return The number of requests in the specified states, on connections in the specified states.
4353  */
4354 uint64_t fr_trunk_request_count_by_state(fr_trunk_t *trunk, int conn_state, int req_state)
4355 {
4356  uint64_t count = 0;
4357  fr_trunk_connection_t *tconn = NULL;
4358  fr_minmax_heap_iter_t iter;
4359 
4360 #define COUNT_BY_STATE(_state, _list) \
4361 do { \
4362  if (conn_state & (_state)) { \
4363  tconn = NULL; \
4364  while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4365  count += fr_trunk_request_count_by_connection(tconn, req_state); \
4366  } \
4367  } \
4368 } while (0)
4369 
4370  if (conn_state & FR_TRUNK_CONN_ACTIVE) {
4371  for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4372  tconn;
4373  tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4374  count += fr_trunk_request_count_by_connection(tconn, req_state);
4375  }
4376  }
4377 
4380  COUNT_BY_STATE(FR_TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4383 
4385 
4386  return count;
4387 }
4388 
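/*
 * Illustrative usage (not part of trunk.c).  FR_TRUNK_CONN_ALL and the
 * FR_TRUNK_REQUEST_STATE_* masks are assumed to be the convenience/bit
 * values declared in trunk.h.
 */
uint64_t in_flight;

in_flight = fr_trunk_request_count_by_state(trunk, FR_TRUNK_CONN_ALL,
					    FR_TRUNK_REQUEST_STATE_PENDING | FR_TRUNK_REQUEST_STATE_SENT);
DEBUG3("%" PRIu64 " requests pending or awaiting a response", in_flight);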
4389 /** Update timestamps for when we last had a transition from above target to below target or vice versa
4390  *
4391  * Should be called every time a connection or request is allocated or freed.
4392  *
4393  * @param[out] conn_count_out How many connections we considered.
4394  * @param[out] req_count_out How many requests we considered.
4395  * @param[in] trunk to operate on.
4396  * @param[in] now The current time.
4397  * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4398  * has changed.
4399  * @return
4400  * - 0 if the average couldn't be calculated (no requests or no connections).
4401  * - The average number of requests per connection.
4402  */
4403 static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4404  fr_trunk_t *trunk, fr_time_t now,
4405  NDEBUG_UNUSED bool verify)
4406 {
4407  uint32_t req_count = 0;
4408  uint16_t conn_count = 0;
4409  uint64_t req_per_conn = 0;
4410 
4411  fr_assert(fr_time_gt(now, fr_time_wrap(0)));
4412 
4413  /*
4414  * No need to update these as the trunk is being freed
4415  */
4416  if (trunk->freeing) goto done;
4417 
4418  /*
4419  * Count all connections except draining and draining to free.
4420  *
4421  * Omitting these connection states artificially raises the
4422  * request to connection ratio, so that we can preemptively spawn
4423  * new connections.
4424  *
4425  * In the case of FR_TRUNK_CONN_DRAINING | FR_TRUNK_CONN_INACTIVE_DRAINING
4426  * the trunk management code has enough hysteresis to not
4427  * immediately reactivate the connection.
4428  *
4429  * In the case of FR_TRUNK_CONN_DRAINING_TO_FREE the trunk
4430  * management code should spawn a new connection to take its place.
4431  *
4432  * Connections placed in the DRAINING_TO_FREE state are being
4433  * closed preemptively to deal with bugs on the server we're
4434  * talking to, or misconfigured firewalls which are trashing
4435  * TCP/UDP connection states.
4436  */
4441 
4442  /*
4443  * Requests on all connections
4444  */
4445  req_count = fr_trunk_request_count_by_state(trunk,
4448 
4449  /*
4450  * No connections, but we do have requests
4451  */
4452  if (conn_count == 0) {
4453  if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4454  goto done;
4455  }
4456 
4457  if (req_count == 0) {
4458  if (trunk->conf.target_req_per_conn > 0) goto below_target;
4459  goto done;
4460  }
4461 
4462  /*
4463  * Calculate the req_per_conn
4464  */
4465  req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4466  if (req_per_conn > trunk->conf.target_req_per_conn) {
4467  above_target:
4468  /*
4469  * Edge - Below target to above target (too many requests per conn - spawn more)
4470  *
4471  * The equality check is correct here as both values start at 0.
4472  */
4473  if (fr_time_lteq(trunk->pub.last_above_target, trunk->pub.last_below_target)) trunk->pub.last_above_target = now;
4474  } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4475  below_target:
4476  /*
4477  * Edge - Above target to below target (too few requests per conn - close some)
4478  *
4479  * The equality check is correct here as both values start at 0.
4480  */
4481  if (fr_time_lteq(trunk->pub.last_below_target, trunk->pub.last_above_target)) trunk->pub.last_below_target = now;
4482  }
4483 
4484 done:
4485  if (conn_count_out) *conn_count_out = conn_count;
4486  if (req_count_out) *req_count_out = req_count;
4487 
4488  /*
4489  * Check we haven't missed a call to trunk_requests_per_connection
4490  */
4491  fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4492 
4493  trunk->last_req_per_conn = req_per_conn;
4494 
4495  return req_per_conn;
4496 }
4497 
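/*
 * Illustrative timeline (not part of trunk.c) showing how the edge timestamps
 * above interact with open_delay in trunk_manage(), assuming open_delay = 10s:
 *
 *   t=0   load first rises above target  ->  last_above_target = 0
 *   t=4   trunk_manage() runs            ->  0 + 10s > 4s, too soon, nothing opened
 *   t=12  trunk_manage() runs            ->  0 + 10s <= 12s, a connection is spawned
 *
 * Only the below->above edge updates last_above_target, so sustained load
 * doesn't keep pushing the open decision further into the future.
 */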
4498 /** Drain the backlog of as many requests as possible
4499  *
4500  * @param[in] trunk To drain backlog requests for.
4501  */
4502 static void trunk_backlog_drain(fr_trunk_t *trunk)
4503 {
4504  fr_trunk_request_t *treq;
4505 
4506  if (fr_heap_num_elements(trunk->backlog) == 0) return;
4507 
4508  /*
4509  * If it's always writable, this isn't
4510  * really a noteworthy event.
4511  */
4512  if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4513 
4514  /*
4515  * Do *NOT* add an artificial limit
4516  * here. We rely on all available
4517  * connections entering the full
4518  * state and transitioning back to
4519  * active in order to drain the
4520  * backlog.
4521  */
4522  while ((treq = fr_heap_peek(trunk->backlog))) {
4523  switch (trunk_request_enqueue_existing(treq)) {
4524  case FR_TRUNK_ENQUEUE_OK:
4525  continue;
4526 
4527  /*
4528  * Signal to stop
4529  */
4531  break;
4532 
4533  /*
4534  * Failed enqueueing the request,
4535  * have it enter the failed state
4536  * which will free it and
4537  * re-enliven the yielded request.
4538  */
4540  case FR_TRUNK_ENQUEUE_FAIL:
4542  continue;
4543 
4546  return;
4547  }
4548  }
4549 }
4550 
4551 /** Force the trunk to re-establish its connections
4552  *
4553  * @param[in] trunk to signal.
4554  * @param[in] states One or more states or'd together.
4555  * @param[in] reason Why the connections are being signalled to reconnect.
4556  */
4557 void fr_trunk_reconnect(fr_trunk_t *trunk, int states, fr_connection_reason_t reason)
4558 {
4559 
4560 #define RECONNECT_BY_STATE(_state, _list) \
4561 do { \
4562  if (states & (_state)) { \
4563  size_t i; \
4564  for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4565  fr_connection_signal_reconnect(((fr_trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4566  } \
4567  } \
4568 } while (0)
4569 
4570  /*
4571  * Connections in the 'connecting' state
4572  * may re-enter that state, so we need to
4573  * be careful not to enter an infinite
4574  * loop, as we iterate over the list
4575  * again and again.
4576  */
4578 
4579  if (states & FR_TRUNK_CONN_ACTIVE) {
4580  fr_trunk_connection_t *tconn;
4581  while ((tconn = fr_minmax_heap_min_peek(trunk->active))) fr_connection_signal_reconnect(tconn->pub.conn, reason);
4582  }
4583 
4591 }
4592 
4593 /** Start the trunk running
4594  *
4595  */
4596 int fr_trunk_start(fr_trunk_t *trunk)
4597 {
4598  uint16_t i;
4599 
4600  if (unlikely(trunk->started)) return 0;
4601 
4602  /*
4603  * Spawn the initial set of connections
4604  */
4605  for (i = 0; i < trunk->conf.start; i++) {
4606  DEBUG("[%i] Starting initial connection", i);
4607  if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4608  }
4609 
4611  /*
4612  * Insert the event timer to manage
4613  * the interval between managing connections.
4614  */
4615  if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, trunk->conf.manage_interval,
4616  _trunk_timer, trunk) < 0) {
4617  PERROR("Failed inserting trunk management event");
4618  return -1;
4619  }
4620  }
4621  trunk->started = true;
4622  trunk->managing_connections = true;
4623 
4624  return 0;
4625 }
4626 
4627 /** Allow the trunk to open and close connections in response to load
4628  *
4629  */
4630 void fr_trunk_connection_manage_start(fr_trunk_t *trunk)
4631 {
4632  if (!trunk->started || trunk->managing_connections) return;
4633 
4634  DEBUG3("Connection management enabled");
4635  trunk->managing_connections = true;
4636 }
4637 
4638 /** Stop the trunk from opening and closing connections in response to load
4639  *
4640  */
4641 void fr_trunk_connection_manage_stop(fr_trunk_t *trunk)
4642 {
4643  if (!trunk->started || !trunk->managing_connections) return;
4644 
4645  DEBUG3("Connection management disabled");
4646  trunk->managing_connections = false;
4647 }
4648 
4649 /** Schedule a trunk management event for the next time the event loop is executed
4650  */
4652 {
4653  if (!trunk->started || !trunk->managing_connections) return 0;
4654 
4655  if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, fr_time_delta_wrap(0), _trunk_timer, trunk) < 0) {
4656  PERROR("Failed inserting trunk management event");
4657  return -1;
4658  }
4659 
4660  return 0;
4661 }
4662 
4663 /** Order connections by queue depth
4664  *
4665  */
4666 static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4667 {
4670 
4673 
4674  /*
4675  * Add a fudge factor of 1 to reduce spurious rebalancing
4676  */
4677  return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4678 }
4679 
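/*
 * Worked example of the fudge factor (queue depths are illustrative):
 *
 *   a=10, b=11  ->  (0) - (0) =  0   within 1 of each other, treated as equal
 *   a=10, b=12  ->  (0) - (1) = -1   'a' (shorter queue) sorts first
 *   a=12, b=10  ->  (1) - (0) = +1   'b' (shorter queue) sorts first
 */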
4680 /** Free a trunk, gracefully closing all connections.
4681  *
4682  */
4683 static int _trunk_free(fr_trunk_t *trunk)
4684 {
4685  fr_trunk_connection_t *tconn;
4686  fr_trunk_request_t *treq;
4687  fr_trunk_watch_entry_t *watch;
4688  size_t i;
4689 
4690  DEBUG4("Trunk free %p", trunk);
4691 
4692  trunk->freeing = true; /* Prevent re-enqueuing */
4693 
4694  /*
4695  * We really don't want this firing after
4696  * we've freed everything.
4697  */
4699 
4700  /*
4701  * Now free the connections in each of the lists.
4702  *
4703  * Each time a connection is freed it removes itself from the list
4704  * it's in, which means the head should keep advancing automatically.
4705  */
4706  while ((tconn = fr_minmax_heap_min_peek(trunk->active))) fr_connection_signal_halt(tconn->pub.conn);
4707  while ((tconn = fr_dlist_head(&trunk->init))) fr_connection_signal_halt(tconn->pub.conn);
4708  while ((tconn = fr_dlist_head(&trunk->connecting))) fr_connection_signal_halt(tconn->pub.conn);
4709  while ((tconn = fr_dlist_head(&trunk->full))) fr_connection_signal_halt(tconn->pub.conn);
4710  while ((tconn = fr_dlist_head(&trunk->inactive))) fr_connection_signal_halt(tconn->pub.conn);
4711  while ((tconn = fr_dlist_head(&trunk->inactive_draining))) fr_connection_signal_halt(tconn->pub.conn);
4712  while ((tconn = fr_dlist_head(&trunk->closed))) fr_connection_signal_halt(tconn->pub.conn);
4713  while ((tconn = fr_dlist_head(&trunk->draining))) fr_connection_signal_halt(tconn->pub.conn);
4714  while ((tconn = fr_dlist_head(&trunk->draining_to_free))) fr_connection_signal_halt(tconn->pub.conn);
4715 
4716  /*
4717  * Process any deferred connection frees
4718  */
4719  while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4720 
4721  /*
4722  * Free any requests left in the backlog
4723  */
4724  while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4725 
4726  /*
4727  * Free any requests in our request cache
4728  */
4729  while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4730 
4731  /*
4732  * Free any entries in the watch lists
4733  */
4734  for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4735  while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4736  }
4737 
4738  return 0;
4739 }
4740 
4741 /** Allocate a new collection of connections
4742  *
4743  * This function should be called first to allocate a new trunk.
4744  *
4745  * After the trunk has been allocated, #fr_trunk_request_alloc and
4746  * #fr_trunk_request_enqueue should be used to allocate memory for trunk
4747  * requests, and pass a preq (protocol request) to the trunk for
4748  * processing.
4749  *
4750  * The trunk will then asynchronously process the request, writing the result
4751  * to a specified rctx. See #fr_trunk_request_enqueue for more details.
4752  *
4753  * @note Trunks may not be shared between multiple threads under any circumstances.
4754  *
4755  * @param[in] ctx To use for any memory allocations. Must be thread local.
4756  * @param[in] el to use for I/O and timer events.
4757  * @param[in] funcs Callback functions.
4758  * @param[in] conf Common user configurable parameters.
4759  * @param[in] log_prefix To prepend to global messages.
4760  * @param[in] uctx User data to pass to the alloc function.
4761  * @param[in] delay_start If true, then we will not spawn any connections
4762  * until the first request is enqueued.
4763  * @return
4764  * - New trunk handle on success.
4765  * - NULL on error.
4766  */
4767 fr_trunk_t *fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,
4768  fr_trunk_io_funcs_t const *funcs, fr_trunk_conf_t const *conf,
4769  char const *log_prefix, void const *uctx, bool delay_start)
4770 {
4771  fr_trunk_t *trunk;
4772  size_t i;
4773 
4774  /*
4775  * Check we have the functions we need
4776  */
4777  if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4778 
4779  MEM(trunk = talloc_zero(ctx, fr_trunk_t));
4780  trunk->el = el;
4781  trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4782 
4783  memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4784  if (!trunk->funcs.connection_prioritise) {
4786  }
4788 
4789  memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4790 
4791  memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4792  talloc_set_destructor(trunk, _trunk_free);
4793 
4794  /*
4795  * Unused request list...
4796  */
4798 
4799  /*
4800  * Request backlog queue
4801  */
4803  fr_trunk_request_t, heap_id, 0));
4804 
4805  /*
4806  * Connection queues and trees
4807  */
4809  fr_trunk_connection_t, heap_id, 0));
4819 
4820  /*
4821  * Watch lists
4822  */
4823  for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4825  }
4826 
4827  DEBUG4("Trunk allocated %p", trunk);
4828 
4829  if (!delay_start) {
4830  if (fr_trunk_start(trunk) < 0) {
4831  talloc_free(trunk);
4832  return NULL;
4833  }
4834  }
4835 
4836  return trunk;
4837 }
4838 
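/*
 * Condensed usage sketch (not part of trunk.c).  my_connection_alloc(),
 * my_request_mux(), my_request_demux(), my_trunk_conf and my_thread_t are
 * hypothetical driver code, the callback set is abridged (field names as
 * assumed from the I/O function table), and FR_TRUNK_ENQUEUE_IN_BACKLOG is
 * assumed from the enqueue result enum.
 */
static fr_trunk_io_funcs_t const my_io_funcs = {
	.connection_alloc	= my_connection_alloc,	/* The only mandatory callback */
	.request_mux		= my_request_mux,	/* Write queued requests to a connection */
	.request_demux		= my_request_demux	/* Read responses and match them to requests */
};

static int my_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el, my_thread_t *t)
{
	t->trunk = fr_trunk_alloc(ctx, el, &my_io_funcs, &my_trunk_conf,
				  "my_module", t, false);	/* false - spawn connections immediately */
	if (!t->trunk) return -1;

	return 0;
}

static int my_enqueue(my_thread_t *t, request_t *request, void *preq, void *rctx)
{
	fr_trunk_request_t *treq = NULL;

	switch (fr_trunk_request_enqueue(&treq, t->trunk, request, preq, rctx)) {
	case FR_TRUNK_ENQUEUE_OK:
	case FR_TRUNK_ENQUEUE_IN_BACKLOG:	/* Written once a connection becomes writable */
		return 0;			/* Caller should now yield */

	default:				/* No capacity, or the trunk is unavailable */
		return -1;
	}
}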
4839 #ifndef TALLOC_GET_TYPE_ABORT_NOOP
4840 /** Verify a trunk
4841  *
4842  * A trunk has some number of connections, which each have some number of requests. The connections and
4843  * requests are in differing kinds of containers depending on their state and how they are used, and may
4844  * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
4845  * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
4846  * foo's children.
4847  */
4848 void fr_trunk_verify(char const *file, int line, fr_trunk_t *trunk)
4849 {
4850  fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: fr_trunk_t pointer was NULL", file, line);
4851  (void) talloc_get_type_abort(trunk, fr_trunk_t);
4852 
4853  for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4854  _fr_dlist_verify(file, line, &trunk->watch[i]);
4855  }
4856 
4857 #define IO_FUNC_VERIFY(_func) \
4858  fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i]: " #_func " was NULL", file, line)
4859 
4860  /*
4861  * Only a few of the function pointers *must* be non-NULL..
4862  */
4863  IO_FUNC_VERIFY(connection_alloc);
4864  IO_FUNC_VERIFY(connection_prioritise);
4866 
4867 #define TRUNK_TCONN_CHECKS(_tconn, _state) \
4868 do { \
4869  fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
4870  "CONSISTENCY_CHECK_FAILED %s[%i]: connection-trunk mismatch", file, line); \
4871  fr_fatal_assert_msg(_state == _tconn->pub.state, \
4872  "CONSISTENCY_CHECK_FAILED %s[%i]: connection-state mismatch", file, line); \
4873 } while (0)
4874 
4875 #define TCONN_DLIST_VERIFY(_dlist, _state) \
4876 do { \
4877  _fr_dlist_verify(file, line, &(trunk->_dlist)); \
4878  fr_dlist_foreach(&(trunk->_dlist), fr_trunk_connection_t, tconn) { \
4879  fr_trunk_connection_verify(file, line, tconn); \
4880  TRUNK_TCONN_CHECKS(tconn, _state); \
4881  } \
4882 } while (0)
4883 
4884 #define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
4885 do {\
4886  fr_minmax_heap_verify(file, line, trunk->_heap); \
4887  fr_minmax_heap_foreach(trunk->_heap, fr_trunk_connection_t, tconn) { \
4888  fr_trunk_connection_verify(file, line, tconn); \
4889  TRUNK_TCONN_CHECKS(tconn, _state); \
4890  }} \
4891 } while (0)
4892 
4893  fr_dlist_verify(&(trunk->free_requests));
4894  FR_HEAP_VERIFY(trunk->backlog);
4895 
4902  /* TCONN_DLIST_VERIFY(failed, ???); */
4907 }
4908 
4909 void fr_trunk_connection_verify(char const *file, int line, fr_trunk_connection_t *tconn)
4910 {
4911  fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: fr_trunk_connection_t pointer was NULL", file, line);
4912  (void) talloc_get_type_abort(tconn, fr_trunk_connection_t);
4913 
4914  (void) talloc_get_type_abort(tconn->pub.trunk, fr_trunk_t);
4915 
4916  /*
4917  * A connection shouldn't be both in the heap and on a list, but it doesn't
4918  * look like moves to the active heap wipe the dlist pointers.
4919  */
4920 
4921 #define TCONN_TREQ_CHECKS(_treq, _state) \
4922 do { \
4923  fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
4924  "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-tconn mismatch", file, line); \
4925  fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
4926  "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-trunk mismatch", file, line); \
4927  fr_fatal_assert_msg(_state == _treq->pub.state, \
4928  "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-state mismatch", file, line); \
4929 } while (0)
4930 
4931 #define TREQ_DLIST_VERIFY(_dlist, _state) \
4932 do { \
4933  _fr_dlist_verify(file, line, &(tconn->_dlist)); \
4934  fr_dlist_foreach(&(tconn->_dlist), fr_trunk_request_t, treq) { \
4935  fr_trunk_request_verify(file, line, treq); \
4936  TCONN_TREQ_CHECKS(treq, _state); \
4937  } \
4938 } while (0)
4939 
4940 #define TREQ_HEAP_VERIFY(_heap, _state) \
4941 do { \
4942  fr_heap_iter_t _iter; \
4943  fr_heap_verify(file, line, tconn->_heap); \
4944  for (fr_trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
4945  treq; \
4946  treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
4947  fr_trunk_request_verify(file, line, treq); \
4948  TCONN_TREQ_CHECKS(treq, _state); \
4949  } \
4950 } while (0)
4951 
4952 #define TREQ_OPTION_VERIFY(_option, _state) \
4953 do { \
4954  if (tconn->_option) { \
4955  fr_trunk_request_verify(file, line, tconn->_option); \
4956  TCONN_TREQ_CHECKS(tconn->_option, _state); \
4957  } \
4958 } while (0)
4959 
4960  /* verify associated requests */
4967 }
4968 
4969 void fr_trunk_request_verify(char const *file, int line, fr_trunk_request_t *treq)
4970 {
4971  fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: fr_trunk_request_t pointer was NULL", file, line);
4972  (void) talloc_get_type_abort(treq, fr_trunk_request_t);
4973 
4974 #ifdef WITH_VERIFY_PTR
4975  if (treq->pub.request) request_verify(file, line, treq->pub.request);
4976 #endif
4977 }
4978 
4979 
4980 bool fr_trunk_search(fr_trunk_t *trunk, void *ptr)
4981 {
4982 #define TCONN_DLIST_SEARCH(_dlist) \
4983 do { \
4984  fr_dlist_foreach(&(trunk->_dlist), fr_trunk_connection_t, tconn) { \
4985  if (ptr == tconn) { \
4986  fr_fprintf(stderr, "fr_trunk_search: tconn %p on " #_dlist "\n", ptr); \
4987  return true; \
4988  } \
4989  if (fr_trunk_connection_search(tconn, ptr)) { \
4990  fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
4991  return true; \
4992  } \
4993  } \
4994 } while (0)
4995 
4996 #define TCONN_MINMAX_HEAP_SEARCH(_heap) \
4997 do { \
4998  fr_minmax_heap_foreach(trunk->_heap, fr_trunk_connection_t, tconn) { \
4999  if (ptr == tconn) { \
5000  fr_fprintf(stderr, "fr_trunk_search: tconn %p on " #_heap "\n", ptr); \
5001  return true; \
5002  } \
5003  if (fr_trunk_connection_search(tconn, ptr)) { \
5004  fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5005  return true; \
5006  } \
5007  }}\
5008 } while (0)
5009 
5011  TCONN_DLIST_SEARCH(connecting);
5012  TCONN_MINMAX_HEAP_SEARCH(active);
5013  TCONN_DLIST_SEARCH(full);
5014  TCONN_DLIST_SEARCH(inactive);
5015  TCONN_DLIST_SEARCH(inactive_draining);
5016  TCONN_DLIST_SEARCH(failed);
5017  TCONN_DLIST_SEARCH(closed);
5018  TCONN_DLIST_SEARCH(draining);
5019  TCONN_DLIST_SEARCH(draining_to_free);
5020  TCONN_DLIST_SEARCH(to_free);
5021 
5022  return false;
5023 }
5024 
5025 bool fr_trunk_connection_search(fr_trunk_connection_t *tconn, void *ptr)
5026 {
5027 #define TREQ_DLIST_SEARCH(_dlist) \
5028 do { \
5029  fr_dlist_foreach(&(tconn->_dlist), fr_trunk_request_t, treq) { \
5030  if (ptr == treq) { \
5031  fr_fprintf(stderr, "fr_trunk_search: treq %p on " #_dlist "\n", ptr); \
5032  return true; \
5033  } \
5034  if (fr_trunk_request_search(treq, ptr)) { \
5035  fr_fprintf(stderr, "fr_trunk_search: preq %p found on " #_dlist, ptr); \
5036  return true; \
5037  } \
5038  } \
5039 } while (0)
5040 
5041 #define TREQ_HEAP_SEARCH(_heap) \
5042 do { \
5043  fr_heap_iter_t _iter; \
5044  for (fr_trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5045  treq; \
5046  treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5047  if (ptr == treq) { \
5048  fr_fprintf(stderr, "fr_trunk_search: treq %p in " #_heap "\n", ptr); \
5049  return true; \
5050  } \
5051  if (fr_trunk_request_search(treq, ptr)) { \
5052  fr_fprintf(stderr, "fr_trunk_search: preq %p found in " #_heap, ptr); \
5053  return true; \
5054  } \
5055  } \
5056 } while (0)
5057 
5058 #define TREQ_OPTION_SEARCH(_option) \
5059 do { \
5060  if (tconn->_option) { \
5061  if (ptr == tconn->_option) { \
5062  fr_fprintf(stderr, "fr_trunk_search: treq %p is " #_option "\n", ptr); \
5063  return true; \
5064  } \
5065  if (fr_trunk_request_search(tconn->_option, ptr)) { \
5066  fr_fprintf(stderr, "fr_trunk_search: preq %p found in " #_option, ptr); \
5067  return true; \
5068  } \
5069  } \
5070 } while (0)
5071 
5072  /* search associated requests */
5073  TREQ_HEAP_SEARCH(pending);
5074  TREQ_DLIST_SEARCH(sent);
5075  TREQ_DLIST_SEARCH(cancel);
5076  TREQ_DLIST_SEARCH(cancel_sent);
5077  TREQ_OPTION_SEARCH(partial);
5078  TREQ_OPTION_SEARCH(cancel_partial);
5079 
5080  return false;
5081 }
5082 
5083 bool fr_trunk_request_search(fr_trunk_request_t *treq, void *ptr)
5084 {
5085  return treq->pub.preq == ptr;
5086 }
5087 #endif
int const char * file
Definition: acutest.h:702
int const char int line
Definition: acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define NDEBUG_UNUSED
Definition: build.h:324
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define unlikely(_x)
Definition: build.h:378
#define UNUSED
Definition: build.h:313
#define NUM_ELEMENTS(_t)
Definition: build.h:335
#define CONF_PARSER_TERMINATOR
Definition: cf_parse.h:626
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition: cf_parse.h:268
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition: cf_parse.h:310
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition: cf_parse.h:297
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition: cf_parse.h:400
Defines a CONF_PAIR to C data type mapping.
Definition: cf_parse.h:563
fr_connection_state_t
Definition: connection.h:45
@ FR_CONNECTION_STATE_CLOSED
Connection has been closed.
Definition: connection.h:55
@ FR_CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition: connection.h:46
@ FR_CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition: connection.h:50
@ FR_CONNECTION_STATE_FAILED
Connection has failed.
Definition: connection.h:54
@ FR_CONNECTION_STATE_INIT
Init state, sets up connection.
Definition: connection.h:49
@ FR_CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition: connection.h:53
@ FR_CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition: connection.h:52
fr_connection_reason_t
Definition: connection.h:83
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:137
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:208
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:154
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:182
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition: dlist.h:672
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition: dlist.h:735
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition: dlist.h:555
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition: dlist.h:163
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition: dlist.h:531
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
#define fr_dlist_verify(_head)
Definition: dlist.h:755
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition: dlist.h:588
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition: dlist.h:338
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
#define fr_event_timer_in(...)
Definition: event.h:255
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
unsigned int fr_heap_index_t
Definition: heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition: heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition: heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition: heap.h:115
The main heap structure.
Definition: heap.h:66
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define RWARN(fmt,...)
Definition: log.h:297
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition: log.h:606
Track when a log message was last repeated.
Definition: log.h:547
talloc_free(reap)
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Definition: event.c:1604
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition: log.c:599
fr_log_type_t
Definition: log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition: math.h:153
unsigned short uint16_t
Definition: merged_model.c:31
unsigned int uint32_t
Definition: merged_model.c:33
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:424
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
Definition: minmax_heap.c:573
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:449
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:466
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
Definition: minmax_heap.c:533
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:486
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
Definition: minmax_heap.c:551
unsigned int fr_minmax_heap_iter_t
Definition: minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition: minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition: misc.c:408
static bool done
Definition: radclient.c:80
#define RDEBUG(fmt,...)
Definition: radclient.h:53
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
static fr_event_list_t * events
Definition: radsniff.c:59
static rs_t * conf
Definition: radsniff.c:53
static int8_t request_prioritise(void const *one, void const *two)
static size_t min(size_t x, size_t y)
Definition: sbuff.c:135
void fr_connection_signals_pause(fr_connection_t *conn)
Pause processing of deferred signals.
Definition: connection.c:310
int fr_connection_del_watch_pre(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch)
Remove a watch function from a pre list.
Definition: connection.c:454
fr_connection_watch_entry_t * fr_connection_add_watch_post(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition: connection.c:531
void fr_connection_signal_halt(fr_connection_t *conn)
Shuts down a connection ungracefully.
Definition: connection.c:1290
void fr_connection_signals_resume(fr_connection_t *conn)
Resume processing of deferred signals.
Definition: connection.c:319
void fr_connection_signal_init(fr_connection_t *conn)
Asynchronously signal a halted connection to start.
Definition: connection.c:1106
fr_connection_watch_entry_t * fr_connection_add_watch_pre(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition: connection.c:509
void fr_connection_signal_shutdown(fr_connection_t *conn)
Shuts down a connection gracefully.
Definition: connection.c:1227
int fr_connection_del_watch_post(fr_connection_t *conn, fr_connection_state_t state, fr_connection_watch_t watch)
Remove a watch function from a post list.
Definition: connection.c:471
void fr_connection_signal_reconnect(fr_connection_t *conn, fr_connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
Definition: connection.c:1166
static fr_time_t test_time_base
Definition: slab_tests.c:42
return count
Definition: module.c:175
if(!subtype_vp) goto fail
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
init
Enter the EAP-IDENTITY state.
Definition: state_machine.c:90
fr_time_t test_time
Definition: state_test.c:3
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
@ memory_order_relaxed
Definition: stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition: stdatomic.h:88
Definition: log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:253
An element in a table indexed by bit position.
Definition: table.h:79
An element in an arbitrarily ordered array of name to num mappings.
Definition: table.h:53
#define talloc_get_type_abort_const
Definition: talloc.h:270
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition: talloc.h:168
#define fr_time_gteq(_a, _b)
Definition: time.h:238
#define fr_time_delta_wrap(_time)
Definition: time.h:152
#define fr_time_wrap(_time)
Definition: time.h:145
#define fr_time_lteq(_a, _b)
Definition: time.h:240
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
#define fr_time_lt(_a, _b)
Definition: time.h:239
"server local" time.
Definition: time.h:69
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition: trunk.c:226
static atomic_uint_fast64_t request_counter
Definition: trunk.c:52
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition: trunk.c:200
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition: trunk.c:757
static void _trunk_connection_on_connecting(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition: trunk.c:3238
int fr_trunk_start(fr_trunk_t *trunk)
Start the trunk running.
Definition: trunk.c:4596
static void trunk_request_enter_unassigned(fr_trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition: trunk.c:1033
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition: trunk.c:732
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition: trunk.c:223
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition: trunk.c:83
static fr_table_num_indexed_bit_pos_t const fr_trunk_req_trigger_names[]
Map request states to trigger names.
Definition: trunk.c:331
void fr_trunk_request_free(fr_trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition: trunk.c:2217
void fr_trunk_connection_manage_stop(fr_trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition: trunk.c:4641
static void trunk_connection_close_if_empty(fr_trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition: trunk.c:3930
void * uctx
User data to pass to the function.
Definition: trunk.c:183
void fr_trunk_request_signal_cancel_sent(fr_trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition: trunk.c:2155
static fr_table_num_ordered_t const fr_trunk_connection_states[]
Definition: trunk.c:389
conf_parser_t const fr_trunk_config[]
Config parser definitions to populate a fr_trunk_conf_t.
Definition: trunk.c:306
#define IN_HANDLER(_trunk)
Definition: trunk.c:692
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition: trunk.c:919
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition: trunk.c:216
static void trunk_request_enter_cancel_complete(fr_trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition: trunk.c:1428
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition: trunk.c:104
struct fr_trunk_pub_s pub
Public fields in the trunk connection.
Definition: trunk.c:190
void fr_trunk_reconnect(fr_trunk_t *trunk, int states, fr_connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition: trunk.c:4557
static int _trunk_free(fr_trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition: trunk.c:4683
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition: trunk.c:742
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition: trunk.c:230
void fr_trunk_connection_signal_active(fr_trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition: trunk.c:3833
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition: trunk.c:135
static int _trunk_connection_free(fr_trunk_connection_t *tconn)
Free a connection.
Definition: trunk.c:3554
void fr_trunk_request_signal_cancel(fr_trunk_request_t *treq)
Cancel a trunk request.
Definition: trunk.c:2047
#define TREQ_OPTION_SEARCH(_option)
static void trunk_request_enter_sent(fr_trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition: trunk.c:1231
fr_trunk_connection_t * tconn
The request was associated with.
Definition: trunk.c:80
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition: trunk.c:584
fr_trunk_enqueue_t fr_trunk_request_enqueue_on_conn(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition: trunk.c:2629
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition: trunk.c:703
fr_heap_t * backlog
The request backlog.
Definition: trunk.c:203
struct fr_trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition: trunk.c:128
#define CONN_STATE_TRANSITION(_new, _log)
Definition: trunk.c:427
fr_trunk_request_t * fr_trunk_request_alloc(fr_trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition: trunk.c:2369
fr_minmax_heap_t * active
Connections which can service requests.
Definition: trunk.c:218
static void _trunk_connection_on_closed(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition: trunk.c:3383
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
fr_dlist_head_t closed
Connections that have closed.
Definition: trunk.c:232
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition: trunk.c:545
fr_trunk_watch_entry_t * fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition: trunk.c:846
static conf_parser_t const fr_trunk_config_request[]
Definition: trunk.c:290
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition: trunk.c:278
static void _trunk_connection_on_shutdown(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition: trunk.c:3282
static fr_table_num_ordered_t const fr_trunk_cancellation_reasons[]
Definition: trunk.c:403
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, fr_trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition: trunk.c:4403
bool oneshot
Remove the function after it's called once.
Definition: trunk.c:181
static conf_parser_t const fr_trunk_config_connection[]
Definition: trunk.c:298
#define OVER_MAX_CHECK
static void trunk_connection_auto_unfull(fr_trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition: trunk.c:2826
int fr_trunk_connection_pop_cancellation(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition: trunk.c:3708
void fr_trunk_request_signal_sent(fr_trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition: trunk.c:1973
fr_trunk_enqueue_t fr_trunk_request_enqueue(fr_trunk_request_t **treq_out, fr_trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition: trunk.c:2481
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition: trunk.c:506
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition: trunk.c:4666
static fr_trunk_enqueue_t trunk_request_enqueue_existing(fr_trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition: trunk.c:1602
static int _trunk_request_free(fr_trunk_request_t *treq)
Actually free the trunk request.
Definition: trunk.c:2336
int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition: trunk.c:812
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
uint32_t fr_trunk_request_count_by_connection(fr_trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition: trunk.c:2767
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
struct fr_trunk_watch_entry_s fr_trunk_watch_entry_t
An entry in a trunk watch function list.
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition: trunk.c:132
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition: trunk.c:527
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition: trunk.c:737
bool fr_trunk_connection_search(fr_trunk_connection_t *tconn, void *ptr)
Definition: trunk.c:5025
static void _trunk_connection_on_failed(fr_connection_t *conn, fr_connection_state_t prev, fr_connection_state_t state, void *uctx)
Connection failed.
Definition: trunk.c:3454
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition: trunk.c:472
static void trunk_backlog_drain(fr_trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition: trunk.c:4502
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition: trunk.c:270
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition: trunk.c:113
static bool trunk_connection_is_full(fr_trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition: trunk.c:2808
fr_trunk_request_state_t from
What state we transitioned from.
Definition: trunk.c:77
static void trunk_request_remove_from_conn(fr_trunk_request_t *treq)
Remove a request from all connection lists.
Definition: trunk.c:943
#define TCONN_DLIST_SEARCH(_dlist)
fr_trunk_connection_event_t events
The current events we expect to be notified on.
Definition: trunk.c:141
static size_t fr_trunk_states_len
Definition: trunk.c:387
static int trunk_connection_spawn(fr_trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition: trunk.c:3622
#define IS_SERVICEABLE(_tconn)
Definition: trunk.c:697
static void trunk_rebalance(fr_trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition: trunk.c:3972
fr_dlist_t entry
Entry in the linked list.
Definition: trunk.c:76
static void trunk_request_enter_complete(fr_trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition: trunk.c:1459
#define IS_PROCESSING(_tconn)
Definition: trunk.c:698
#define RECONNECT_BY_STATE(_state, _list)
uint64_t fr_trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition: trunk.c:1933
fr_dlist_head_t cancel
Requests in the cancel state.
Definition: trunk.c:153
uint16_t fr_trunk_connection_count_by_state(fr_trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition: trunk.c:2743
fr_trunk_io_funcs_t funcs
I/O functions.
Definition: trunk.c:250
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition: trunk.c:157
void fr_trunk_request_signal_complete(fr_trunk_request_t *treq)
Signal that a trunk request is complete.
Definition: trunk.c:1995
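A hedged illustration of how a protocol demuxer might hand results back: the sketch assumes the driver has already decoded a response and located the matching trunk request (that matching step is protocol specific and only represented by a parameter here).

/* Sketch: report the outcome of a decoded response for a previously sent request. */
static void demux_result_example(fr_trunk_request_t *treq, bool decode_ok)
{
        if (decode_ok) {
                fr_trunk_request_signal_complete(treq); /* Inform the API client of success; the treq is freed */
        } else {
                fr_trunk_request_signal_fail(treq);     /* Inform the API client of failure; the treq is freed */
        }
}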
bool fr_trunk_request_search(fr_trunk_request_t *treq, void *ptr)
Definition: trunk.c:5083
fr_trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition: trunk.c:109
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
Definition: trunk.c:170
static fr_table_num_ordered_t const fr_trunk_connection_events[]
Definition: trunk.c:411
uint64_t fr_trunk_request_count_by_state(fr_trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition: trunk.c:4354
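The two counting functions can be combined into a rough utilisation figure. The sketch below is only an example of that arithmetic, using the FR_TRUNK_CONN_ACTIVE, FR_TRUNK_CONN_ALL and FR_TRUNK_REQUEST_STATE_ALL values listed later in this index.

/* Sketch: report how much work the trunk is currently carrying. */
static void trunk_load_example(fr_trunk_t *trunk)
{
        uint16_t active    = fr_trunk_connection_count_by_state(trunk, FR_TRUNK_CONN_ACTIVE);
        uint64_t in_flight = fr_trunk_request_count_by_state(trunk, FR_TRUNK_CONN_ALL,
                                                             FR_TRUNK_REQUEST_STATE_ALL);

        if (active > 0) {
                uint64_t per_conn = in_flight / active; /* Average outstanding requests per active connection */
                (void) per_conn;                        /* e.g. feed this into the caller's own statistics */
        }
}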
static void trunk_connection_writable(fr_trunk_connection_t *tconn)
A connection is writable.
Definition: trunk.c:2849
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition: trunk.c:272
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition: trunk.c:602
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition: trunk.c:723
fr_trunk_enqueue_t fr_trunk_request_requeue(fr_trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition: trunk.c:2568
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition: trunk.c:4332
#define TRUNK_STATE_TRANSITION(_new)
Definition: trunk.c:866
void fr_trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, fr_trunk_request_t const *treq)
Definition: trunk.c:2716
void * uctx
Uctx data to pass to alloc.
Definition: trunk.c:254
static void _trunk_connection_on_init(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition: trunk.c:3203
static void trunk_connection_auto_full(fr_trunk_connection_t *tconn)
Automatically mark a connection as inactive.
Definition: trunk.c:2785
fr_trunk_request_t * partial
Partially written request.
Definition: trunk.c:149
static void trunk_connection_enter_active(fr_trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition: trunk.c:3133
fr_event_timer_t const * manage_ev
Periodic connection management event.
Definition: trunk.c:264
static fr_table_num_ordered_t const fr_trunk_request_states[]
Definition: trunk.c:348
static void trunk_request_enter_cancel_partial(fr_trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition: trunk.c:1348
static void trunk_connection_enter_draining(fr_trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition: trunk.c:3067
void fr_trunk_verify(char const *file, int line, fr_trunk_t *trunk)
Verify a trunk.
Definition: trunk.c:4848
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition: trunk.c:241
#define IN_REQUEST_DEMUX(_trunk)
Definition: trunk.c:694
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition: trunk.c:564
fr_trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition: trunk.c:85
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition: trunk.c:653
void fr_trunk_request_signal_partial(fr_trunk_request_t *treq)
Signal a partial write.
Definition: trunk.c:1952
static void trunk_request_enter_backlog(fr_trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition: trunk.c:1068
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, fr_trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition: trunk.c:1659
char const * function
State change occurred in.
Definition: trunk.c:89
static void trunk_manage(fr_trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage request-per-connection levels.
Definition: trunk.c:4036
static void trunk_connection_enter_inactive(fr_trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition: trunk.c:3014
static size_t fr_trunk_req_trigger_names_len
Definition: trunk.c:345
fr_trunk_t * fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_trunk_io_funcs_t const *funcs, fr_trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition: trunk.c:4767
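A hedged allocation sketch follows. The configuration field names are taken from the fr_trunk_conf_t entries in this index; a real driver must also populate the I/O callbacks in fr_trunk_io_funcs_t (connection allocation, request mux/demux, and so on), which are elided here because their full signatures are not reproduced in this listing.

/* Sketch only: "my_funcs" stands in for a fully populated fr_trunk_io_funcs_t. */
static fr_trunk_t *trunk_alloc_example(TALLOC_CTX *ctx, fr_event_list_t *el,
                                       fr_trunk_io_funcs_t const *my_funcs, void const *uctx)
{
        fr_trunk_conf_t conf = {
                .start                  = 2,    /* Connections to open immediately */
                .min                    = 1,    /* Don't drop below one connection */
                .max                    = 8,    /* Upper bound on connections in the trunk */
                .target_req_per_conn    = 50,   /* Ideal pending requests per connection */
        };

        /* delay_start = false: begin opening connections straight away */
        return fr_trunk_alloc(ctx, el, my_funcs, &conf, "example trunk", uctx, false);
}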
int fr_trunk_connection_pop_request(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition: trunk.c:3756
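A hedged sketch of the write side follows. It assumes, for illustration only, that this function returns 0 and populates *treq_out when a pending request is available, and it uses a hypothetical write_preq() helper for the protocol-specific encode and write; a real driver might prefer to signal the connection for reconnection on a hard write error rather than failing the individual request.

/* Hypothetical helper: encodes and writes treq->pub.preq.
 * Returns < 0 on error, 0 on partial write, > 0 when fully written. */
static int write_preq(fr_trunk_connection_t *tconn, fr_trunk_request_t *treq);

/* Sketch of a muxer-style loop draining a connection's pending queue. */
static void mux_example(fr_trunk_connection_t *tconn)
{
        fr_trunk_request_t *treq;

        /* Sketch assumption: 0 return + non-NULL treq means a request was popped. */
        while ((fr_trunk_connection_pop_request(&treq, tconn) == 0) && treq) {
                int ret = write_preq(tconn, treq);

                if (ret < 0) {
                        fr_trunk_request_signal_fail(treq);     /* Could not be written at all */
                } else if (ret == 0) {
                        fr_trunk_request_signal_partial(treq);  /* Only part of the data was written */
                        break;                                  /* Wait until the connection is writable again */
                } else {
                        fr_trunk_request_signal_sent(treq);     /* Fully written, now awaiting a response */
                }
        }
}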
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition: trunk.c:635
static fr_trunk_enqueue_t trunk_request_check_enqueue(fr_trunk_connection_t **tconn_out, fr_trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition: trunk.c:1525
fr_trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition: trunk.c:258
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition: trunk.c:75
void * in_handler
Which handler we're inside.
Definition: trunk.c:252
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition: trunk.c:619
static void trunk_request_enter_cancel_sent(fr_trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition: trunk.c:1383
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition: trunk.c:220
static size_t fr_trunk_conn_trigger_names_len
Definition: trunk.c:380
void fr_trunk_connection_signal_inactive(fr_trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition: trunk.c:3810
static size_t fr_trunk_cancellation_reasons_len
Definition: trunk.c:409
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition: trunk.c:713
void fr_trunk_connection_signal_readable(fr_trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition: trunk.c:3794
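For drivers that install their own FD handlers rather than the standard callbacks listed elsewhere in this index, the readable/writable signals are typically forwarded as in this brief sketch; the handler names and shapes shown here are the driver's own, not part of the trunk API.

/* Sketch: a driver's private I/O handlers forwarding readiness to the trunk. */
static void my_fd_readable(fr_trunk_connection_t *tconn)
{
        fr_trunk_connection_signal_readable(tconn);     /* Tell the trunk this connection can be read */
}

static void my_fd_writable(fr_trunk_connection_t *tconn)
{
        fr_trunk_connection_signal_writable(tconn);     /* Tell the trunk this connection can be written */
}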
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_connection_readable(fr_trunk_connection_t *tconn)
A connection is readable.
Definition: trunk.c:2839
void fr_trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition: trunk.c:3887
fr_trunk_request_t * cancel_partial
Partially written cancellation request.
Definition: trunk.c:155
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition: trunk.c:3316
char const * log_prefix
What to prepend to messages.
Definition: trunk.c:194
int line
Line change occurred on.
Definition: trunk.c:90
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition: trunk.c:675
#define TREQ_DLIST_VERIFY(_dlist, _state)
static void trunk_connection_enter_full(fr_trunk_connection_t *tconn)
Transition a connection to the full state.
Definition: trunk.c:2992
static fr_table_num_ordered_t const fr_trunk_states[]
Definition: trunk.c:382
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition: trunk.c:163
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition: trunk.c:238
#define TREQ_HEAP_VERIFY(_heap, _state)
static int _state_log_entry_free(fr_trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition: trunk.c:2678
struct fr_trunk_request_pub_s pub
Public fields in the trunk request.
Definition: trunk.c:98
static uint64_t trunk_connection_requests_requeue(fr_trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition: trunk.c:1770
void fr_trunk_request_signal_fail(fr_trunk_request_t *treq)
Signal that a trunk request failed.
Definition: trunk.c:2027
struct fr_trunk_request_s fr_trunk_request_t
Definition: trunk.c:33
static void trunk_request_enter_cancel(fr_trunk_request_t *treq, fr_trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition: trunk.c:1301
bool started
Has the trunk been started.
Definition: trunk.c:281
void fr_trunk_request_verify(char const *file, int line, fr_trunk_request_t *treq)
Definition: trunk.c:4969
#define FR_TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition: trunk.c:69
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition: trunk.c:106
static size_t fr_trunk_connection_states_len
Definition: trunk.c:401
fr_trunk_request_state_t to
What state we transitioned to.
Definition: trunk.c:78
static void _trunk_connection_on_connected(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition: trunk.c:3334
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition: trunk.c:695
static size_t fr_trunk_connection_events_len
Definition: trunk.c:417
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition: trunk.c:286
fr_time_t last_freed
Last time this request was freed.
Definition: trunk.c:111
void trunk_request_state_log_entry_add(char const *function, int line, fr_trunk_request_t *treq, fr_trunk_request_state_t new)
Definition: trunk.c:2685
uint64_t id
Trunk request ID.
Definition: trunk.c:102
static void trunk_request_enter_pending(fr_trunk_request_t *treq, fr_trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition: trunk.c:1130
fr_dlist_head_t log
State change log.
Definition: trunk.c:117
#define COUNT_BY_STATE(_state, _list)
#define TREQ_OPTION_VERIFY(_option, _state)
fr_dlist_t entry
List entry.
Definition: trunk.c:178
fr_trunk_watch_t func
Function to call when a trunk enters the state this list belongs to.
Definition: trunk.c:179
fr_dlist_head_t watch[FR_TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition: trunk.c:256
#define CONN_BAD_STATE_TRANSITION(_new)
Definition: trunk.c:438
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition: trunk.c:461
void fr_trunk_connection_signal_reconnect(fr_trunk_connection_t *tconn, fr_connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition: trunk.c:3872
void fr_trunk_connection_signal_writable(fr_trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition: trunk.c:3776
fr_heap_t * pending
Requests waiting to be sent.
Definition: trunk.c:147
void fr_trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition: trunk.c:3904
static fr_table_num_indexed_bit_pos_t const fr_trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition: trunk.c:368
static void _trunk_connection_on_halted(UNUSED fr_connection_t *conn, UNUSED fr_connection_state_t prev, UNUSED fr_connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition: trunk.c:3506
void fr_trunk_connection_verify(char const *file, int line, fr_trunk_connection_t *tconn)
Definition: trunk.c:4909
static void trunk_request_enter_failed(fr_trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition: trunk.c:1489
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition: trunk.c:235
bool fr_trunk_connection_in_state(fr_trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition: trunk.c:3920
static void trunk_connection_event_update(fr_trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition: trunk.c:2872
static void trunk_connection_enter_draining_to_free(fr_trunk_connection_t *tconn)
Transition a connection to the draining-to-free state.
Definition: trunk.c:3099
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition: trunk.c:196
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition: trunk.c:283
void fr_trunk_connection_manage_start(fr_trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition: trunk.c:4630
static void trunk_watch_call(fr_trunk_t *trunk, fr_dlist_head_t *list, fr_trunk_state_t state)
Call a list of watch functions associated with a state.
Definition: trunk.c:770
static size_t fr_trunk_request_states_len
Definition: trunk.c:362
void fr_trunk_request_signal_cancel_complete(fr_trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition: trunk.c:2179
static void trunk_connection_enter_inactive_draining(fr_trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition: trunk.c:3037
void fr_trunk_request_signal_cancel_partial(fr_trunk_request_t *treq)
Signal a partial cancel write.
Definition: trunk.c:2131
static void trunk_connection_remove(fr_trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition: trunk.c:2936
bool fr_trunk_search(fr_trunk_t *trunk, void *ptr)
Definition: trunk.c:4980
bool enabled
Whether the watch entry is enabled.
Definition: trunk.c:182
#define IN_REQUEST_MUX(_trunk)
Definition: trunk.c:693
fr_dlist_head_t sent
Sent request.
Definition: trunk.c:151
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition: trunk.c:751
int fr_trunk_connection_manage_schedule(fr_trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition: trunk.c:4651
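A hedged example of combining these management controls: enable connection management once startup is complete, then request an immediate management pass instead of waiting for the periodic timer. Treating a negative return from the scheduling call as failure is an assumption of this sketch.

/* Sketch: let the trunk open/close connections, then run a management pass ASAP. */
static void trunk_manage_example(fr_trunk_t *trunk)
{
        fr_trunk_connection_manage_start(trunk);        /* Allow open/close in response to load */

        if (fr_trunk_connection_manage_schedule(trunk) < 0) {
                /* Scheduling failed; the periodic management event will still run */
        }
}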
fr_trunk_conf_t conf
Trunk common configuration.
Definition: trunk.c:198
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition: trunk.c:213
static void trunk_request_enter_partial(fr_trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition: trunk.c:1201
Associates request queues with a connection.
Definition: trunk.c:127
Wraps a normal request.
Definition: trunk.c:97
Trace state machine changes for a particular request.
Definition: trunk.c:74
Main trunk management handle.
Definition: trunk.c:189
An entry in a trunk watch function list.
Definition: trunk.c:177
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition: trunk.h:244
fr_time_t _CONST last_connected
Last time a connection connected.
Definition: trunk.h:295
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition: trunk.h:311
fr_trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition: trunk.h:55
@ FR_TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition: trunk.h:57
@ FR_TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition: trunk.h:58
@ FR_TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition: trunk.h:59
@ FR_TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition: trunk.h:56
fr_trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition: trunk.h:332
uint32_t max_req_per_conn
Maximum requests per connection.
Definition: trunk.h:229
fr_trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition: trunk.h:87
@ FR_TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition: trunk.h:96
@ FR_TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition: trunk.h:94
@ FR_TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition: trunk.h:88
@ FR_TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition: trunk.h:95
@ FR_TRUNK_CONN_CONNECTING
Connection is connecting.
Definition: trunk.h:90
@ FR_TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition: trunk.h:103
@ FR_TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition: trunk.h:97
@ FR_TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition: trunk.h:101
@ FR_TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition: trunk.h:91
@ FR_TRUNK_CONN_INIT
In the initial state.
Definition: trunk.h:89
uint16_t start
How many connections to start.
Definition: trunk.h:216
void(* fr_trunk_watch_t)(fr_trunk_t *trunk, fr_trunk_state_t prev, fr_trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition: trunk.h:703
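A small sketch of a watch function matching this typedef; the state names come from the fr_trunk_state_t values listed below, and registering/removing the watcher (see fr_trunk_del_watch earlier in this index) is left out.

/* Sketch: a watch function that reacts when the trunk gains its first active connection. */
static void my_trunk_active_watch(UNUSED fr_trunk_t *trunk, fr_trunk_state_t prev,
                                  fr_trunk_state_t state, UNUSED void *uctx)
{
        /* prev is the state we left, state is the state we just entered */
        if ((prev == FR_TRUNK_STATE_PENDING) && (state == FR_TRUNK_STATE_ACTIVE)) {
                /* e.g. flush a driver-level backlog now that connections are usable */
        }
}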
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition: trunk.h:251
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition: trunk.h:286
fr_trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition: trunk.h:161
@ FR_TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition: trunk.h:167
@ FR_TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition: trunk.h:175
@ FR_TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition: trunk.h:172
@ FR_TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition: trunk.h:168
@ FR_TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition: trunk.h:165
@ FR_TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition: trunk.h:176
@ FR_TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition: trunk.h:174
@ FR_TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition: trunk.h:179
@ FR_TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition: trunk.h:178
@ FR_TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition: trunk.h:170
@ FR_TRUNK_REQUEST_STATE_INIT
Initial state.
Definition: trunk.h:162
@ FR_TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition: trunk.h:173
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition: trunk.h:292
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition: trunk.h:283
#define FR_TRUNK_CONN_ALL
All connection states.
Definition: trunk.h:111
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition: trunk.h:266
uint64_t max_uses
The maximum number of times a connection can be used.
Definition: trunk.h:233
fr_trunk_request_state_t _CONST state
Which list the request is now located in.
Definition: trunk.h:328
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition: trunk.h:237
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition: trunk.h:718
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition: trunk.h:338
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition: trunk.h:309
fr_trunk_state_t
Definition: trunk.h:62
@ FR_TRUNK_STATE_IDLE
Trunk has no connections.
Definition: trunk.h:63
@ FR_TRUNK_STATE_MAX
Definition: trunk.h:66
@ FR_TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition: trunk.h:64
@ FR_TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition: trunk.h:65
fr_trunk_connection_alloc_t connection_alloc
Allocate a new fr_connection_t.
Definition: trunk.h:712
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition: trunk.h:716
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition: trunk.h:240
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition: trunk.h:225
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition: trunk.h:305
fr_trunk_connection_state_t _CONST state
What state the connection is in.
Definition: trunk.h:350
#define FR_TRUNK_REQUEST_STATE_ALL
All request states.
Definition: trunk.h:185
fr_trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition: trunk.h:72
@ FR_TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition: trunk.h:73
@ FR_TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition: trunk.h:77
@ FR_TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition: trunk.h:75
@ FR_TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition: trunk.h:79
fr_trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition: trunk.h:354
fr_connection_t *_CONST conn
The underlying connection.
Definition: trunk.h:352
fr_trunk_t *_CONST trunk
Trunk this request belongs to.
Definition: trunk.h:330
void *_CONST preq
Data for the muxer to write to the connection.
Definition: trunk.h:334
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition: trunk.h:222
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition: trunk.h:256
uint16_t max
Maximum number of connections in the trunk.
Definition: trunk.h:220
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition: trunk.h:248
void *_CONST rctx
Resume ctx of the module.
Definition: trunk.h:336
fr_trunk_state_t _CONST state
Current state of the trunk.
Definition: trunk.h:316
fr_time_t _CONST last_read_success
Last time we read a response.
Definition: trunk.h:299
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition: trunk.h:289
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition: trunk.h:254
fr_time_t _CONST last_failed
Last time a connection failed.
Definition: trunk.h:297
fr_trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition: trunk.h:725
fr_time_delta_t lifetime
Time between reconnects.
Definition: trunk.h:235
fr_trunk_enqueue_t
Definition: trunk.h:148
@ FR_TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition: trunk.h:149
@ FR_TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition: trunk.h:151
@ FR_TRUNK_ENQUEUE_OK
Operation was successful.
Definition: trunk.h:150
@ FR_TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition: trunk.h:153
@ FR_TRUNK_ENQUEUE_FAIL
General failure.
Definition: trunk.h:154
uint16_t min
Shouldn't let connections drop below this number.
Definition: trunk.h:218
#define FR_TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition: trunk.h:202
Common configuration parameters for a trunk.
Definition: trunk.h:213
Public fields for the trunk connection.
Definition: trunk.h:349
I/O functions to pass to fr_trunk_alloc.
Definition: trunk.h:711
Public fields for the trunk.
Definition: trunk.h:279
Public fields for the trunk request.
Definition: trunk.h:327
static fr_event_list_t * el
static fr_slen_t head
Definition: xlat.h:408
char const * fr_strerror(void)
Get the last library error.
Definition: strerror.c:554
#define fr_box_time_delta(_val)
Definition: value.h:336