trunk.c
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 4f25c4263264e6c288f49422d25a2d7e66e5bcaa $
19  *
20  * @file src/lib/server/trunk.c
21  * @brief A management API for bonding multiple connections together.
22  *
23  * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24  * @copyright 2019-2020 The FreeRADIUS server project
25  */
26 
27 #define LOG_PREFIX trunk->log_prefix
28 
29 #ifdef NDEBUG
30 # define TALLOC_GET_TYPE_ABORT_NOOP 1
31 #endif
32 
33 typedef struct trunk_request_s trunk_request_t;
34 typedef struct trunk_connection_s trunk_connection_t;
35 typedef struct trunk_s trunk_t;
36 #define _TRUNK_PRIVATE 1
37 #include <freeradius-devel/server/trunk.h>
38 
39 #include <freeradius-devel/server/connection.h>
40 #include <freeradius-devel/server/trigger.h>
41 #include <freeradius-devel/util/misc.h>
42 #include <freeradius-devel/util/syserror.h>
43 #include <freeradius-devel/util/table.h>
44 #include <freeradius-devel/util/minmax_heap.h>
45 
46 #ifdef HAVE_STDATOMIC_H
47 # include <stdatomic.h>
48 #else
49 # include <freeradius-devel/util/stdatomic.h>
50 #endif
51 
52 static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
53 
54 #ifdef TESTING_TRUNK
56 
57 static fr_time_t test_time(void)
58 {
59  return test_time_base;
60 }
61 
62 #define fr_time test_time
63 #endif
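/* Example sketch: with the fr_time() override above, a trunk unit test can drive
 * timer-based behaviour deterministically by advancing test_time_base between
 * calls into the trunk.  The trunk variable and the exact entry point used by the
 * test harness are assumptions; fr_time_add() and fr_time_delta_from_sec() are the
 * standard util/time.h helpers.
 *
 * @code{.c}
 * test_time_base = fr_time_add(test_time_base, fr_time_delta_from_sec(1));
 * trunk_manage(trunk, fr_time());	// "now" is produced by test_time() above
 * @endcode
 */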
64 
65 #ifndef NDEBUG
66 /** The maximum number of state logs to record per request
67  *
68  */
69 #define TRUNK_REQUEST_STATE_LOG_MAX 20
70 
71 /** Trace state machine changes for a particular request
72  *
73  */
74 typedef struct {
75  fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
76  fr_dlist_t entry; //!< Entry in the linked list.
77  trunk_request_state_t from; //!< What state we transitioned from.
78  trunk_request_state_t to; //!< What state we transitioned to.
79 
80  trunk_connection_t *tconn; //!< The connection the request was associated with.
81  ///< Pointer may now be invalid, do not de-reference.
82 
83  uint64_t tconn_id; //!< If the treq was associated with a connection,
84  ///< the connection ID.
85  trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection,
86  ///< the connection state at the time of the
87  ///< state transition.
88 
89  char const *function; //!< Function the state change occurred in.
90  int line; //!< Line the state change occurred on.
91 } trunk_request_state_log_t;
92 #endif
93 
94 /** Wraps a normal request
95  *
96  */
97 struct trunk_request_s {
98  struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
99  ///< This *MUST* be the first field in this
100  ///< structure.
101 
102  uint64_t id; //!< Trunk request ID.
103 
104  fr_heap_index_t heap_id; //!< Used to track the request in the conn->pending heap.
105 
106  fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
107  ///< list or the trunk->backlog.
108 
109  trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
110 
111  fr_time_t last_freed; //!< Last time this request was freed.
112 
113  bool bound_to_conn; //!< Fail the request if there's an attempt to
114  ///< re-enqueue it.
115 
116  bool sent; //!< Trunk request has been sent at least once.
117  ///< Used so that re-queueing doesn't increase trunk
118  ///< `sent` count.
119 
120 #ifndef NDEBUG
121  fr_dlist_head_t log; //!< State change log.
122 #endif
123 };
124 
125 
126 /** Associates request queues with a connection
127  *
128  * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
129  * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
130  */
131 struct trunk_connection_s {
132  struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
133  ///< This *MUST* be the first field in this
134  ///< structure.
135 
136  fr_heap_index_t heap_id; //!< Used to track the connection in the connected
137  ///< heap.
138 
139  fr_dlist_t entry; //!< Used to track the connection in the connecting,
140  ///< full and failed lists.
141 
142  /** @name State
143  * @{
144  */
145  trunk_connection_event_t events; //!< The current events we expect to be notified on.
146  /** @} */
147 
148  /** @name Request lists
149  * @{
150  */
151  fr_heap_t *pending; //!< Requests waiting to be sent.
152 
153  trunk_request_t *partial; //!< Partially written request.
154 
155  fr_dlist_head_t sent; //!< Sent requests.
156 
157  fr_dlist_head_t reapable; //!< Idle requests.
158 
159  fr_dlist_head_t cancel; //!< Requests in the cancel state.
160 
161  trunk_request_t *cancel_partial; //!< Partially written cancellation request.
162 
163  fr_dlist_head_t cancel_sent; //!< Sent cancellation requests.
164  /** @} */
165 
166  /** @name Statistics
167  * @{
168  */
169  uint64_t sent_count; //!< The number of requests that have been sent using
170  ///< this connection.
171  /** @} */
172 
173  /** @name Timers
174  * @{
175  */
176  fr_event_timer_t const *lifetime_ev; //!< Maximum time this connection can be open.
177  /** @} */
178 };
179 
180 /** An entry in a trunk watch function list
181  *
182  */
183 typedef struct trunk_watch_entry_s {
184  fr_dlist_t entry; //!< List entry.
185  trunk_watch_t func; //!< Function to call when a trunk enters
186  ///< the state this list belongs to.
187  bool oneshot; //!< Remove the function after it's called once.
188  bool enabled; //!< Whether the watch entry is enabled.
189  void *uctx; //!< User data to pass to the function.
190 } trunk_watch_entry_t;
191 
192 /** Main trunk management handle
193  *
194  */
195 struct trunk_s {
196  struct trunk_pub_s pub; //!< Public fields in the trunk.
197  ///< This *MUST* be the first field in this
198  ///< structure.
199 
200  char const *log_prefix; //!< What to prepend to messages.
201 
202  fr_event_list_t *el; //!< Event list used by this trunk and its connections.
203 
204  trunk_conf_t conf; //!< Trunk common configuration.
205 
206  fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
207  ///< enqueued.
208 
209  fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
210  ///< immediately assign to a connection.
211 
212  /** @name Connection lists
213  *
214  * A connection must always be in exactly one of these lists
215  * or trees.
216  *
217  * @{
218  */
219  fr_dlist_head_t init; //!< Connections which have not yet started
220  ///< connecting.
221 
222  fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
223 
224  fr_minmax_heap_t *active; //!< Connections which can service requests.
225 
226  fr_dlist_head_t full; //!< Connections which have too many outstanding
227  ///< requests.
228 
229  fr_dlist_head_t inactive; //!< Connections which have been signalled to be
230  ///< inactive by the API client.
231 
232  fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
233  ///< inactive by the API client, which the trunk
234  ///< manager is draining to close.
235 
236  fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
237 
238  fr_dlist_head_t closed; //!< Connections that have closed. Either due to
239  ///< shutdown, reconnection or failure.
240 
241  fr_dlist_head_t draining; //!< Connections that will be freed once all their
242  ///< requests are complete, but can be reactivated.
243 
244  fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
245  ///< requests are complete.
246 
247  fr_dlist_head_t to_free; //!< Connections we're done with and will free on
248  ///< the next call to trunk_manage.
249  ///< This prevents connections from being freed
250  ///< whilst we're inside callbacks.
251  /** @} */
252 
253  /** @name Callbacks
254  * @{
255  */
256  trunk_io_funcs_t funcs; //!< I/O functions.
257 
258  void *in_handler; //!< Which handler we're inside.
259 
260  void *uctx; //!< Uctx data to pass to alloc.
261 
262  fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
263 
264  trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
265  /** @} */
266 
267  /** @name Timers
268  * @{
269  */
270  fr_event_timer_t const *manage_ev; //!< Periodic connection management event.
271  /** @} */
272 
273  /** @name Log rate limiting entries
274  * @{
275  */
276  fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
277 
278  fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
279  /** @} */
280 
281  /** @name State
282  * @{
283  */
284  bool freeing; //!< Trunk is being freed, don't spawn new
285  ///< connections or re-enqueue.
286 
287  bool started; //!< Has the trunk been started.
288 
289  bool managing_connections; //!< Whether the trunk is allowed to manage
290  ///< (open/close) connections.
291 
292  uint64_t last_req_per_conn; //!< The last request-to-connection ratio we calculated.
293  /** @} */
294 };
295 
297  { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
298  { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
299  { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
300 
302 };
303 
305  { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
306  { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
307 
309 };
310 
311 #ifndef TRUNK_TESTS
313  { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
314  { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
315  { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
316  { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
317  { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
318  { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
319 
320  { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
321  { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
322 
323  { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
324 
325  { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
326 
327  { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
328  { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
329 
331 };
332 #endif
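/* Example sketch: the configuration entries above are consumed by whichever module
 * embeds the trunk; the enclosing section name ("pool" here) is chosen by that
 * module, not by trunk.c, so treat it as an assumption.  Using the defaults shown
 * above, a fully spelled out section would look like:
 *
 * @code{.unparsed}
 * pool {
 *	start = 1			# connections to open at startup
 *	min = 1				# minimum number of connections to keep open
 *	max = 5				# maximum number of connections
 *	connecting = 2			# maximum simultaneous connection attempts
 *	uses = 0			# requests per connection before it's recycled (0 = unlimited)
 *	lifetime = 0			# maximum connection lifetime (0 = unlimited)
 *	open_delay = 0.2
 *	close_delay = 10.0
 *	manage_interval = 0.2
 *	max_backlog = 1000
 *
 *	connection {
 *		connect_timeout = 3.0
 *		reconnect_delay = 1
 *	}
 *
 *	request {
 *		per_connection_max = 2000
 *		per_connection_target = 1000
 *		free_delay = 10.0
 *	}
 * }
 * @endcode
 */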
333 
334 #ifndef NDEBUG
335 /** Map request states to trigger names
336  *
337  * Must stay in the same order as #trunk_request_state_t
338  */
340  { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
341  { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
342  { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
343  { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
344  { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
345  { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
346  { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
347  { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
348  { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
349  { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
350  { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
351  { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
352  { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
353 };
355 #endif
356 
358  { L("INIT"), TRUNK_REQUEST_STATE_INIT },
359  { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
360  { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
361  { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
362  { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
363  { L("SENT"), TRUNK_REQUEST_STATE_SENT },
364  { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
365  { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
366  { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
367  { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
368  { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
369  { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
370  { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
371 };
373 
374 /** Map connection states to trigger names
375  *
376  * Must stay in the same order as #trunk_connection_state_t
377  */
379  { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
380  { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
381  { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
382  { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
383  { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
384  { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
385  { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
386  { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
387  { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
388  { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
389 };
391 
393  { L("IDLE"), TRUNK_STATE_IDLE },
394  { L("ACTIVE"), TRUNK_STATE_ACTIVE },
395  { L("PENDING"), TRUNK_STATE_PENDING }
396 };
398 
400  { L("INIT"), TRUNK_CONN_INIT },
401  { L("HALTED"), TRUNK_CONN_HALTED },
402  { L("CONNECTING"), TRUNK_CONN_CONNECTING },
403  { L("ACTIVE"), TRUNK_CONN_ACTIVE },
404  { L("CLOSED"), TRUNK_CONN_CLOSED },
405  { L("FULL"), TRUNK_CONN_FULL },
406  { L("INACTIVE"), TRUNK_CONN_INACTIVE },
407  { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
408  { L("DRAINING"), TRUNK_CONN_DRAINING },
409  { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
410 };
412 
414  { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
415  { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
416  { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
417  { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
418 };
420 
422  { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
423  { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
424  { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
425  { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
426 };
428 
429 #define CONN_TRIGGER(_state) do { \
430  if (trunk->pub.triggers) { \
431  trigger_exec(unlang_interpret_get_thread_default(), \
432  NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
433  "<INVALID>"), true, NULL); \
434  } \
435 } while (0)
436 
437 #define CONN_STATE_TRANSITION(_new, _log) \
438 do { \
439  _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
440  tconn->pub.conn->id, \
441  fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
442  fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
443  tconn->pub.state = _new; \
444  CONN_TRIGGER(_new); \
445  trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
446 } while (0)
447 
448 #define CONN_BAD_STATE_TRANSITION(_new) \
449 do { \
450  if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
451  tconn->pub.conn->id, \
452  fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
453  fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
454 } while (0)
455 
456 #ifndef NDEBUG
457 void trunk_request_state_log_entry_add(char const *function, int line,
458  trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
459 
460 #define REQUEST_TRIGGER(_state) do { \
461  if (trunk->pub.triggers) { \
462  trigger_exec(unlang_interpret_get_thread_default(), \
463  NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
464  "<INVALID>"), true, NULL); \
465  } \
466 } while (0)
467 
468 /** Record a request state transition and log appropriate output
469  *
470  */
471 #define REQUEST_STATE_TRANSITION(_new) \
472 do { \
473  request_t *request = treq->pub.request; \
474  ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
475  treq->id, \
476  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
477  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
478  trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
479  treq->pub.state = _new; \
480  REQUEST_TRIGGER(_new); \
481 } while (0)
482 #define REQUEST_BAD_STATE_TRANSITION(_new) \
483 do { \
484  trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
485  if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
486  treq->id, \
487  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
488  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
489 } while (0)
490 #else
491 /** Record a request state transition
492  *
493  */
494 #define REQUEST_STATE_TRANSITION(_new) \
495 do { \
496  request_t *request = treq->pub.request; \
497  ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
498  treq->id, \
499  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
500  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
501  treq->pub.state = _new; \
502 } while (0)
503 #define REQUEST_BAD_STATE_TRANSITION(_new) \
504 do { \
505  if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
506  treq->id, \
507  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
508  fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
509 } while (0)
510 #endif
511 
512 
513 /** Call the cancel callback if set
514  *
515  */
516 #define DO_REQUEST_CANCEL(_treq, _reason) \
517 do { \
518  if ((_treq)->pub.trunk->funcs.request_cancel) { \
519  request_t *request = (_treq)->pub.request; \
520  void *_prev = (_treq)->pub.trunk->in_handler; \
521  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
522  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
523  (_treq)->pub.tconn->pub.conn, \
524  (_treq)->pub.preq, \
525  fr_table_str_by_value(trunk_cancellation_reasons, \
526  (_reason), \
527  "<INVALID>"), \
528  (_treq)->pub.trunk->uctx); \
529  (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
530  (_treq)->pub.trunk->in_handler = _prev; \
531  } \
532 } while(0)
533 
534 /** Call the "conn_release" callback (if set)
535  *
536  */
537 #define DO_REQUEST_CONN_RELEASE(_treq) \
538 do { \
539  if ((_treq)->pub.trunk->funcs.request_conn_release) { \
540  request_t *request = (_treq)->pub.request; \
541  void *_prev = (_treq)->pub.trunk->in_handler; \
542  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
543  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
544  (_treq)->pub.tconn->pub.conn, \
545  (_treq)->pub.preq, \
546  (_treq)->pub.trunk->uctx); \
547  (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
548  (_treq)->pub.trunk->in_handler = _prev; \
549  } \
550 } while(0)
551 
552 /** Call the complete callback (if set)
553  *
554  */
555 #define DO_REQUEST_COMPLETE(_treq) \
556 do { \
557  if ((_treq)->pub.trunk->funcs.request_complete) { \
558  request_t *request = (_treq)->pub.request; \
559  void *_prev = (_treq)->pub.trunk->in_handler; \
560  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
561  (_treq)->pub.request, \
562  (_treq)->pub.preq, \
563  (_treq)->pub.rctx, \
564  (_treq)->pub.trunk->uctx); \
565  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
566  (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
567  (_treq)->pub.trunk->in_handler = _prev; \
568  } \
569 } while(0)
570 
571 /** Call the fail callback (if set)
572  *
573  */
574 #define DO_REQUEST_FAIL(_treq, _prev_state) \
575 do { \
576  if ((_treq)->pub.trunk->funcs.request_fail) { \
577  request_t *request = (_treq)->pub.request; \
578  void *_prev = (_treq)->pub.trunk->in_handler; \
579  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
580  (_treq)->pub.request, \
581  (_treq)->pub.preq, \
582  (_treq)->pub.rctx, \
583  fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
584  (_treq)->pub.trunk->uctx); \
585  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
586  (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
587  (_treq)->pub.trunk->in_handler = _prev; \
588  } \
589 } while(0)
590 
591 /** Call the free callback (if set)
592  *
593  */
594 #define DO_REQUEST_FREE(_treq) \
595 do { \
596  if ((_treq)->pub.trunk->funcs.request_free) { \
597  request_t *request = (_treq)->pub.request; \
598  void *_prev = (_treq)->pub.trunk->in_handler; \
599  ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
600  (_treq)->pub.request, \
601  (_treq)->pub.preq, \
602  (_treq)->pub.trunk->uctx); \
603  (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
604  (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
605  (_treq)->pub.trunk->in_handler = _prev; \
606  } \
607 } while(0)
608 
609 /** Write one or more requests to a connection
610  *
611  */
612 #define DO_REQUEST_MUX(_tconn) \
613 do { \
614  void *_prev = (_tconn)->pub.trunk->in_handler; \
615  DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
616  (_tconn)->pub.conn->id, \
617  (_tconn)->pub.trunk->el, \
618  (_tconn), \
619  (_tconn)->pub.conn, \
620  (_tconn)->pub.trunk->uctx); \
621  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
622  (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
623  (_tconn)->pub.trunk->in_handler = _prev; \
624 } while(0)
625 
626 /** Read one or more requests from a connection
627  *
628  */
629 #define DO_REQUEST_DEMUX(_tconn) \
630 do { \
631  void *_prev = (_tconn)->pub.trunk->in_handler; \
632  DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
633  (_tconn)->pub.conn->id, \
634  (_tconn), \
635  (_tconn)->pub.conn, \
636  (_tconn)->pub.trunk->uctx); \
637  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
638  (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
639  (_tconn)->pub.trunk->in_handler = _prev; \
640 } while(0)
641 
642 /** Write one or more cancellation requests to a connection
643  *
644  */
645 #define DO_REQUEST_CANCEL_MUX(_tconn) \
646 do { \
647  if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
648  void *_prev = (_tconn)->pub.trunk->in_handler; \
649  DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
650  (_tconn)->pub.conn->id, \
651  (_tconn), \
652  (_tconn)->pub.conn, \
653  (_tconn)->pub.trunk->uctx); \
654  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
655  (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
656  (_tconn)->pub.trunk->in_handler = _prev; \
657  } \
658 } while(0)
659 
660 /** Allocate a new connection
661  *
662  */
663 #define DO_CONNECTION_ALLOC(_tconn) \
664 do { \
665  void *_prev = trunk->in_handler; \
666  DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
667  (_tconn), \
668  (_tconn)->pub.trunk->el, \
669  (_tconn)->pub.trunk->conf.conn_conf, \
670  trunk->log_prefix, \
671  (_tconn)->pub.trunk->uctx); \
672  (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
673  (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
674  (_tconn)->pub.trunk->in_handler = _prev; \
675  if (!(_tconn)->pub.conn) { \
676  ERROR("Failed creating new connection"); \
677  talloc_free(tconn); \
678  return -1; \
679  } \
680 } while(0)
681 
682 /** Change what events the connection should be notified about
683  *
684  */
685 #define DO_CONNECTION_NOTIFY(_tconn, _events) \
686 do { \
687  if ((_tconn)->pub.trunk->funcs.connection_notify) { \
688  void *_prev = (_tconn)->pub.trunk->in_handler; \
689  DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
690  (_tconn)->pub.conn->id, \
691  (_tconn), \
692  (_tconn)->pub.conn, \
693  (_tconn)->pub.trunk->el, \
694  fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
695  (_tconn)->pub.trunk->uctx); \
696  (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
697  (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
698  (_tconn)->pub.trunk->in_handler = _prev; \
699  } \
700 } while(0)
701 
702 #define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
703 #define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
704 #define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
705 #define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
706 
707 #define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
708 #define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
709 
710 /** Remove the current request from the backlog
711  *
712  */
713 #define REQUEST_EXTRACT_BACKLOG(_treq) \
714 do { \
715  int _ret; \
716  _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
717  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
718 } while (0)
719 
720 /** Remove the current request from the pending list
721  *
722  */
723 #define REQUEST_EXTRACT_PENDING(_treq) \
724 do { \
725  int _ret; \
726  _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
727  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
728 } while (0)
729 
730 /** Remove the current request from the partial slot
731  *
732  */
733 #define REQUEST_EXTRACT_PARTIAL(_treq) \
734 do { \
735  fr_assert((_treq)->pub.tconn->partial == treq); \
736  tconn->partial = NULL; \
737 } while (0)
738 
739 /** Remove the current request from the sent list
740  *
741  */
742 #define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
743 
744 /** Remove the current request from the reapable list
745  *
746  */
747 #define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
748 
749 /** Remove the current request from the cancel list
750  *
751  */
752 #define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
753 
754 /** Remove the current request from the cancel_partial slot
755  *
756  */
757 #define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
758 do { \
759  fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
760  tconn->cancel_partial = NULL; \
761 } while (0)
762 
763 /** Remove the current request from the cancel sent list
764  *
765  */
766 #define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
767 
768 /** Reorder the connections in the active heap
769  *
770  * fr_heap_extract will also error out if heap_id is bad - no need for assert
771  */
772 #define CONN_REORDER(_tconn) \
773 do { \
774  int _ret; \
775  if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
776  if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
777  _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
778  if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
779  fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
780 } while (0)
781 
782 /** Call a list of watch functions associated with a state
783  *
784  */
785 static inline void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
786 {
787  /*
788  * Nested watcher calls are not allowed
789  * and shouldn't be possible because of
790  * deferred signal processing.
791  */
792  fr_assert(trunk->next_watcher == NULL);
793 
794  while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
795  trunk_watch_entry_t *entry = trunk->next_watcher;
796  bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
797 
798  if (!entry->enabled) continue;
799  if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
800 
801  entry->func(trunk, trunk->pub.state, state, entry->uctx);
802 
803  if (oneshot) talloc_free(entry);
804  }
805  trunk->next_watcher = NULL;
806 }
807 
808 /** Call the state change watch functions
809  *
810  */
811 #define CALL_WATCHERS(_trunk, _state) \
812 do { \
813  if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
814  trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
815 } while(0)
816 
817 /** Remove a watch function from a trunk state list
818  *
819  * @param[in] trunk The trunk to remove the watcher from.
820  * @param[in] state to remove the watch from.
821  * @param[in] watch Function to remove.
822  * @return
823  * - 0 if the function was removed successfully.
824  * - -1 if the function wasn't present in the watch list.
825  * - -2 if an invalid state was passed.
826  */
828 {
829  trunk_watch_entry_t *entry = NULL;
830  fr_dlist_head_t *list;
831 
832  if (state >= TRUNK_STATE_MAX) return -2;
833 
834  list = &trunk->watch[state];
835  while ((entry = fr_dlist_next(list, entry))) {
836  if (entry->func == watch) {
837  if (trunk->next_watcher == entry) {
838  trunk->next_watcher = fr_dlist_remove(list, entry);
839  } else {
840  fr_dlist_remove(list, entry);
841  }
842  talloc_free(entry);
843  return 0;
844  }
845  }
846 
847  return -1;
848 }
849 
850 /** Add a watch entry to the trunk state list
851  *
852  * @param[in] trunk The trunk to add the watcher to.
853  * @param[in] state to watch for.
854  * @param[in] watch Function to add.
855  * @param[in] oneshot Should this watcher only be run once.
856  * @param[in] uctx Context to pass to function.
857  * @return
858  * - NULL if an invalid state is passed.
859  * - A new watch entry handle on success.
860  */
862  trunk_watch_t watch, bool oneshot, void const *uctx)
863 {
864  trunk_watch_entry_t *entry;
865  fr_dlist_head_t *list;
866 
867  if (state >= TRUNK_STATE_MAX) return NULL;
868 
869  list = &trunk->watch[state];
870  MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
871 
872  entry->func = watch;
873  entry->oneshot = oneshot;
874  entry->enabled = true;
875  memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
876  fr_dlist_insert_tail(list, entry);
877 
878  return entry;
879 }
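/* Example sketch: a watch function compatible with the call made in
 * trunk_watch_call() above, i.e. entry->func(trunk, <previous state>, <new state>, uctx).
 * The callback name is hypothetical; it would be registered against one of the
 * trunk->watch[] lists via the watch-add wrapper above.
 *
 * @code{.c}
 * static void my_on_trunk_state(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
 * {
 *	if ((prev != TRUNK_STATE_ACTIVE) && (state == TRUNK_STATE_ACTIVE)) {
 *		DEBUG3("Trunk %p entered the active state", trunk);
 *	}
 * }
 * @endcode
 */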
880 
881 #define TRUNK_STATE_TRANSITION(_new) \
882 do { \
883  DEBUG3("Trunk changed state %s -> %s", \
884  fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
885  fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
886  CALL_WATCHERS(trunk, _new); \
887  trunk->pub.state = _new; \
888 } while (0)
889 
890 static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
891 static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
893 static void trunk_request_enter_sent(trunk_request_t *treq);
900 
901 static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
902  trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
903 
904 static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
905 static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
906 static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
907 static inline void trunk_connection_readable(trunk_connection_t *tconn);
908 static inline void trunk_connection_writable(trunk_connection_t *tconn);
916 
917 static void trunk_rebalance(trunk_t *trunk);
918 static void trunk_manage(trunk_t *trunk, fr_time_t now);
919 static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx);
920 static void trunk_backlog_drain(trunk_t *trunk);
921 
922 /** Compare two protocol requests
923  *
924  * Allows protocol requests to be prioritised with a function
925  * specified by the API client. Defaults to comparison by pointer address
926  * if no function is specified.
927  *
928  * @param[in] a treq to compare to b.
929  * @param[in] b treq to compare to a.
930  * @return
931  * - +1 if a > b.
932  * - 0 if a == b.
933  * - -1 if a < b.
934  */
935 static int8_t _trunk_request_prioritise(void const *a, void const *b)
936 {
939 
940  fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
941 
942  return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
943 }
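/* Example sketch: a request_prioritise callback as invoked by the wrapper above.
 * It receives the two opaque preq pointers supplied by the API client when the
 * requests were enqueued; my_preq_t and its priority field are hypothetical.
 *
 * @code{.c}
 * static int8_t my_request_prioritise(void const *one, void const *two)
 * {
 *	my_preq_t const *a = one;
 *	my_preq_t const *b = two;
 *
 *	// -1 / 0 / +1, matching the contract documented above.
 *	return (a->priority > b->priority) - (a->priority < b->priority);
 * }
 * @endcode
 */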
944 
945 /** Remove a request from all connection lists
946  *
947  * A common function used by init, fail, complete state functions to disassociate
948  * a request from a connection in preparation for freeing or reassignment.
949  *
950  * Despite its unassuming name, this function is *the* place to put calls to
951  * functions which need to be called when the number of requests associated with
952  * a connection changes.
953  *
954  * Trunk requests will always be passed to this function before they're removed
955  * from a connection, even if the requests are being freed.
956  *
957  * @param[in] treq to trigger a state change for.
958  */
960 {
961  trunk_connection_t *tconn = treq->pub.tconn;
962  trunk_t *trunk = treq->pub.trunk;
963 
964  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
965 
966  switch (treq->pub.state) {
968  return; /* Not associated with connection */
969 
972  break;
973 
976  break;
977 
979  REQUEST_EXTRACT_SENT(treq);
980  break;
981 
984  break;
985 
988  break;
989 
992  break;
993 
996  break;
997 
998  default:
999  fr_assert(0);
1000  break;
1001  }
1002 
1003  /*
1004  * If the request wasn't associated with a
1005  * connection, then there's nothing more
1006  * to do.
1007  */
1008  if (!tconn) return;
1009 
1010  {
1011  request_t *request = treq->pub.request;
1012 
1013  ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
1014  tconn->pub.conn->name, treq->id);
1015  }
1016  /*
1017  * Release any connection specific resources the
1018  * treq holds.
1019  */
1021 
1022  switch (tconn->pub.state){
1023  case TRUNK_CONN_FULL:
1024  trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1025  if (tconn->pub.state == TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1026  FALL_THROUGH;
1027 
1028  case TRUNK_CONN_ACTIVE:
1029  CONN_REORDER(tconn);
1030  break;
1031 
1032  default:
1033  break;
1034  }
1035 
1036  treq->pub.tconn = NULL;
1037 
1038  /*
1039  * Request removed from the connection
1040  * see if we need to deregister I/O events.
1041  */
1043 }
1044 
1045 /** Transition a request to the unassigned state, in preparation for re-assignment
1046  *
1047  * @note treq->tconn may be inviable after calling
1048  * if treq->conn and connection_signals_pause are not used.
1049  * This is due to call to trunk_request_remove_from_conn.
1050  *
1051  * @param[in] treq to trigger a state change for.
1052  */
1054 {
1055  trunk_t *trunk = treq->pub.trunk;
1056 
1057  switch (treq->pub.state) {
1059  return;
1060 
1063  break;
1064 
1070  break;
1071 
1072  default:
1074  }
1075 
1077 }
1078 
1079 /** Transition a request to the backlog state, adding it to the backlog of the trunk
1080  *
1081  * @note treq->tconn and treq may be inviable after calling
1082  * if treq->conn and connection_signals_pause are not used.
1083  * This is due to call to trunk_manage.
1084  *
1085  * @param[in] treq to trigger a state change for.
1086  * @param[in] new Whether this is a new request.
1087  */
1088 static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
1089 {
1090  trunk_connection_t *tconn = treq->pub.tconn;
1091  trunk_t *trunk = treq->pub.trunk;
1092 
1093  switch (treq->pub.state) {
1096  break;
1097 
1100  break;
1101 
1103  REQUEST_EXTRACT_CANCEL(treq);
1104  break;
1105 
1106  default:
1108  }
1109 
1111  fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1112 
1113  /*
1114  * A new request has entered the trunk.
1115  * Re-calculate request/connection ratios.
1116  */
1117  if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1118 
1119  /*
1120  * To reduce latency, if there are no connections
1121  * in the connecting state, call the trunk manage
1122  * function immediately.
1123  *
1124  * Likewise, if there are draining connections
1125  * which could be moved back to active, call
1126  * the trunk manage function.
1127  *
1128  * Remember, requests only enter the backlog if
1129  * there are no connections which can service them.
1130  */
1134  }
1135 }
1136 
1137 /** Transition a request to the pending state, adding it to the backlog of an active connection
1138  *
1139  * All trunk requests being added to a connection get passed to this function.
1140  * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1141  *
1142  * @note treq->tconn and treq may be inviable after calling
1143  * if treq->conn and connection_signals_pause is not used.
1144  * This is due to call to trunk_connection_event_update.
1145  *
1146  * @param[in] treq to trigger a state change for.
1147  * @param[in] tconn to enqueue the request on.
1148  * @param[in] new Whether this is a new request.
1149  */
1151 {
1152  trunk_t *trunk = treq->pub.trunk;
1153 
1154  fr_assert(tconn->pub.trunk == trunk);
1155  fr_assert(IS_PROCESSING(tconn));
1156 
1157  switch (treq->pub.state) {
1160  fr_assert(!treq->pub.tconn);
1161  break;
1162 
1164  fr_assert(!treq->pub.tconn);
1166  break;
1167 
1168  case TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1169  REQUEST_EXTRACT_CANCEL(treq);
1170  break;
1171 
1172  default:
1174  }
1175 
1176  /*
1177  * Assign the new connection first so that
1178  * it appears in the state log.
1179  */
1180  treq->pub.tconn = tconn;
1181 
1183 
1184  {
1185  request_t *request = treq->pub.request;
1186 
1187  ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1188  tconn->pub.conn->name, treq->id);
1189  }
1190  fr_heap_insert(&tconn->pending, treq);
1191 
1192  /*
1193  * A new request has entered the trunk.
1194  * Re-calculate request/connection ratios.
1195  */
1196  if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1197 
1198  /*
1199  * Check if we need to automatically transition the
1200  * connection to full.
1201  */
1203 
1204  /*
1205  * Reorder the connection in the heap now it has an
1206  * additional request.
1207  */
1208  if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1209 
1210  /*
1211  * We have a new request, see if we need to register
1212  * for I/O events.
1213  */
1215 }
1216 
1217 /** Transition a request to the partial state, indicating that it has been partially sent
1218  *
1219  * @param[in] treq to trigger a state change for.
1220  */
1222 {
1223  trunk_connection_t *tconn = treq->pub.tconn;
1224  trunk_t *trunk = treq->pub.trunk;
1225 
1226  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1227 
1228  switch (treq->pub.state) {
1229  case TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1231  break;
1232 
1233  default:
1235  }
1236 
1237  fr_assert(!tconn->partial);
1238  tconn->partial = treq;
1239 
1241 }
1242 
1243 /** Transition a request to the sent state, indicating that it's been sent in its entirety
1244  *
1245  * @note treq->tconn and treq may be inviable after calling
1246  * if treq->conn and connection_signals_pause is not used.
1247  * This is due to call to trunk_connection_event_update.
1248  *
1249  * @param[in] treq to trigger a state change for.
1250  */
1252 {
1253  trunk_connection_t *tconn = treq->pub.tconn;
1254  trunk_t *trunk = treq->pub.trunk;
1255 
1256  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1257 
1258  switch (treq->pub.state) {
1261  break;
1262 
1265  break;
1266 
1267  default:
1269  }
1270 
1272  fr_dlist_insert_tail(&tconn->sent, treq);
1273 
1274  /*
1275  * Update the connection's sent stats if this is the
1276  * first time this request is being sent.
1277  */
1278  if (!treq->sent) {
1279  tconn->sent_count++;
1280  treq->sent = true;
1281 
1282  /*
1283  * Enforces max_uses
1284  */
1285  if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1287  }
1288  }
1289 
1290  /*
1291  * We just sent a request, so we probably need
1292  * to tell the event loop we want to be
1293  * notified if there's data available.
1294  */
1296 }
1297 
1298 /** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
1299  *
1300  * @note Largely a replica of trunk_request_enter_sent.
1301  *
1302  * @param[in] treq to trigger a state change for.
1303  */
1305 {
1306  trunk_connection_t *tconn = treq->pub.tconn;
1307  trunk_t *trunk = treq->pub.trunk;
1308 
1309  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1310 
1311  switch (treq->pub.state) {
1314  break;
1315 
1318  break;
1319 
1320  default:
1322  }
1323 
1325  fr_dlist_insert_tail(&tconn->reapable, treq);
1326 
1327  if (!treq->sent) {
1328  tconn->sent_count++;
1329  treq->sent = true;
1330 
1331  if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1333  }
1334  }
1335 
1337 }
1338 
1339 /** Transition a request to the cancel state, placing it in a connection's cancellation list
1340  *
1341  * If a request_cancel_send callback is provided, that callback will
1342  * be called periodically for requests which were cancelled due to
1343  * a signal.
1344  *
1345  * The request_cancel_send callback will dequeue cancelled requests
1346  * and inform a remote server that the result is no longer required.
1347  *
1348  * A request must enter this state before being added to the backlog
1349  * of another connection if it's been sent or partially sent.
1350  *
1351  * @note treq->tconn and treq may be inviable after calling
1352  * if treq->conn and connection_signals_pause is not used.
1353  * This is due to call to trunk_connection_event_update.
1354  *
1355  * @param[in] treq to trigger a state change for.
1356  * @param[in] reason Why the request was cancelled.
1357  * Should be one of:
1358  * - TRUNK_CANCEL_REASON_SIGNAL request cancelled
1359  * because of a signal from the interpreter.
1360  * - TRUNK_CANCEL_REASON_MOVE request cancelled
1361  * because the connection failed and it needs
1362  * to be assigned to a new connection.
1363  * - TRUNK_CANCEL_REASON_REQUEUE request cancelled
1364  * as it needs to be resent on the same connection.
1365  */
1367 {
1368  trunk_connection_t *tconn = treq->pub.tconn;
1369  trunk_t *trunk = treq->pub.trunk;
1370 
1371  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1372 
1373  switch (treq->pub.state) {
1376  break;
1377 
1379  REQUEST_EXTRACT_SENT(treq);
1380  break;
1381 
1384  break;
1385 
1386  default:
1388  }
1389 
1391  fr_dlist_insert_tail(&tconn->cancel, treq);
1392  treq->cancel_reason = reason;
1393 
1394  DO_REQUEST_CANCEL(treq, reason);
1395 
1396  /*
1397  * Our treq is no longer bound to an actual
1398  * request_t *, as we can't guarantee the
1399  * lifetime of the original request_t *.
1400  */
1401  if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1402 
1403  /*
1404  * Register for I/O write events if we need to.
1405  */
1407 }
1408 
1409 /** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1410  *
1411  * The request_demux function is then responsible for signalling
1412  * that the cancel request is complete when the remote server
1413  * acknowledges the cancellation request.
1414  *
1415  * @param[in] treq to trigger a state change for.
1416  */
1418 {
1419  trunk_connection_t *tconn = treq->pub.tconn;
1420  trunk_t *trunk = treq->pub.trunk;
1421 
1422  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1425 
1426  switch (treq->pub.state) {
1427  case TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_partial can be reached from */
1428  REQUEST_EXTRACT_CANCEL(treq);
1429  break;
1430 
1431  default:
1433  }
1434 
1436  fr_assert(!tconn->partial);
1437  tconn->cancel_partial = treq;
1438 }
1439 
1440 /** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1441  *
1442  * The request_demux function is then responsible for signalling
1443  * that the cancel request is complete when the remote server
1444  * acknowledges the cancellation request.
1445  *
1446  * @note treq->tconn and treq may be inviable after calling
1447  * if treq->conn and connection_signals_pause is not used.
1448  * This is due to call to trunk_connection_event_update.
1449  *
1450  * @param[in] treq to trigger a state change for.
1451  */
1453 {
1454  trunk_connection_t *tconn = treq->pub.tconn;
1455  trunk_t *trunk = treq->pub.trunk;
1456 
1457  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1460 
1461  switch (treq->pub.state) {
1464  break;
1465 
1467  REQUEST_EXTRACT_CANCEL(treq);
1468  break;
1469 
1470  default:
1472  }
1473 
1475  fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1476 
1477  /*
1478  * De-register for I/O write events
1479  * and register the read events
1480  * to drain the cancel ACKs.
1481  */
1483 }
1484 
1485 /** Cancellation was acked, the request is complete, free it
1486  *
1487  * The API client will not be informed, as the original request_t *
1488  * will likely have been freed by this point.
1489  *
1490  * @note treq will be inviable after a call to this function.
1491  * treq->tconn may be inviable after calling
1492  * if treq->conn and connection_signals_pause is not used.
1493  * This is due to call to trunk_request_remove_from_conn.
1494  *
1495  * @param[in] treq to mark as complete.
1496  */
1498 {
1499  trunk_connection_t *tconn = treq->pub.tconn;
1500  trunk_t *trunk = treq->pub.trunk;
1501 
1502  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1503  if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1504 
1505  switch (treq->pub.state) {
1508  break;
1509 
1510  default:
1512  }
1513 
1515 
1517  trunk_request_free(&treq); /* Free the request */
1518 }
1519 
1520 /** Request completed successfully, inform the API client and free the request
1521  *
1522  * @note treq will be inviable after a call to this function.
1523  * treq->tconn may also be inviable due to call to
1524  * trunk_request_remove_from_conn.
1525  *
1526  * @param[in] treq to mark as complete.
1527  */
1529 {
1530  trunk_connection_t *tconn = treq->pub.tconn;
1531  trunk_t *trunk = treq->pub.trunk;
1532 
1533  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1534 
1535  switch (treq->pub.state) {
1540  break;
1541 
1542  default:
1544  }
1545 
1547  DO_REQUEST_COMPLETE(treq);
1548  trunk_request_free(&treq); /* Free the request */
1549 }
1550 
1551 /** Request failed, inform the API client and free the request
1552  *
1553  * @note treq will be inviable after a call to this function.
1554  * treq->tconn may also be inviable due to call to
1555  * trunk_request_remove_from_conn.
1556  *
1557  * @param[in] treq to mark as failed.
1558  */
1560 {
1561  trunk_connection_t *tconn = treq->pub.tconn;
1562  trunk_t *trunk = treq->pub.trunk;
1563  trunk_request_state_t prev = treq->pub.state;
1564 
1565  if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1566 
1567  switch (treq->pub.state) {
1570  break;
1571 
1572  default:
1574  break;
1575  }
1576 
1578  DO_REQUEST_FAIL(treq, prev);
1579  trunk_request_free(&treq); /* Free the request */
1580 }
1581 
1582 /** Check to see if a trunk request can be enqueued
1583  *
1584  * @param[out] tconn_out Connection the request may be enqueued on.
1585  * @param[in] trunk To enqueue requests on.
1586  * @param[in] request associated with the treq (if any).
1587  * @return
1588  * - TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1589  * - TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1590  * - TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1591  * connections or backlog space.
1592  * - TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1593  * unreachable.
1594  */
1596  request_t *request)
1597 {
1598  trunk_connection_t *tconn;
1599  /*
1600  * If we have an active connection then
1601  * return that.
1602  */
1603  tconn = fr_minmax_heap_min_peek(trunk->active);
1604  if (tconn) {
1605  *tconn_out = tconn;
1606  return TRUNK_ENQUEUE_OK;
1607  }
1608 
1609  /*
1610  * Unlike the connection pool, we don't need
1611  * to drive any internal processes by feeding
1612  * it requests.
1613  *
1614  * If the last event to occur was a failure
1615  * we refuse to enqueue new requests until
1616  * one or more connections comes online.
1617  */
1618  if (!trunk->conf.backlog_on_failed_conn &&
1619  fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1620  fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1622  RWARN, WARN, "Refusing to enqueue requests - "
1623  "No active connections and last event was a connection failure");
1624 
1626  }
1627 
1628 
1629  /*
1630  * Only enforce if we're limiting maximum
1631  * number of connections, and maximum
1632  * number of requests per connection.
1633  *
1634  * The alloc function also checks this
1635  * which is why this is only done for
1636  * debug builds.
1637  */
1638  if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1639  uint64_t limit;
1640 
1641  limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1642  if (limit > 0) {
1643  uint64_t total_reqs;
1644 
1645  total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
1647  if (total_reqs >= (limit + trunk->conf.max_backlog)) {
1649  RWARN, WARN, "Refusing to alloc requests - "
1650  "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1651  "plus %u backlog requests reached",
1652  limit, trunk->conf.max, trunk->conf.max_req_per_conn,
1653  trunk->conf.max_backlog);
1655  }
1656  }
1657  }
1658 
1659  return TRUNK_ENQUEUE_IN_BACKLOG;
1660 }
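/* Worked example: with the defaults from the configuration tables above
 * (max = 5, per_connection_max = 2000, max_backlog = 1000), the limit check in
 * the function above starts refusing new requests once
 *
 *	limit     = max * per_connection_max = 5 * 2000     = 10000
 *	threshold = limit + max_backlog      = 10000 + 1000 = 11000
 *
 * requests are outstanding on the trunk in any state.
 */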
1661 
1662 /** Enqueue a request which has never been assigned to a connection or was previously cancelled
1663  *
1664  * @param[in] treq to re-enqueue. Must have been removed
1665  * from its existing connection with
1666  * #trunk_connection_requests_dequeue.
1667  * @return
1668  * - TRUNK_ENQUEUE_OK Request was re-enqueued.
1669  * - TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1670  * - TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1671  * Usually because the connection to the resource is down.
1672  */
1674 {
1675  trunk_t *trunk = treq->pub.trunk;
1676  trunk_connection_t *tconn = NULL;
1677  trunk_enqueue_t ret;
1678 
1679  /*
1680  * Must *NOT* still be assigned to another connection
1681  */
1682  fr_assert(!treq->pub.tconn);
1683 
1684  ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1685  switch (ret) {
1686  case TRUNK_ENQUEUE_OK:
1687  if (trunk->conf.always_writable) {
1689  trunk_request_enter_pending(treq, tconn, false);
1692  } else {
1693  trunk_request_enter_pending(treq, tconn, false);
1694  }
1695  break;
1696 
1698  /*
1699  * No more connections and request
1700  * is already in the backlog.
1701  *
1702  * Signal our caller it should stop
1703  * trying to drain the backlog.
1704  */
1706  trunk_request_enter_backlog(treq, false);
1707  break;
1708 
1709  default:
1710  break;
1711  }
1712 
1713  return ret;
1714 }
1715 
1716 /** Shift requests in the specified states onto new connections
1717  *
1718  * This function will blindly dequeue any requests in the specified state and get
1719  * them back to the unassigned state, cancelling any sent or partially sent requests.
1720  *
1721  * This function does not check that dequeuing a request in a particular state is a
1722  * sane or sensible thing to do, that's up to the caller!
1723  *
1724  * @param[out] out A list to insert the newly dequeued and unassigned
1725  * requests into.
1726  * @param[in] tconn to dequeue requests from.
1727  * @param[in] states Dequeue request in these states.
1728  * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1729  */
1731  int states, uint64_t max)
1732 {
1733  trunk_request_t *treq;
1734  uint64_t count = 0;
1735 
1736  if (max == 0) max = UINT64_MAX;
1737 
1738 #define OVER_MAX_CHECK if (++count > max) return (count - 1)
1739 
1740 #define DEQUEUE_ALL(_src_list, _state) do { \
1741  while ((treq = fr_dlist_head(_src_list))) { \
1742  OVER_MAX_CHECK; \
1743  fr_assert(treq->pub.state == (_state)); \
1744  trunk_request_enter_unassigned(treq); \
1745  fr_dlist_insert_tail(out, treq); \
1746  } } while (0)
1747 
1748  /*
1749  * Don't need to do anything with
1750  * cancellation requests.
1751  */
1752  if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1754 
1755  /*
1756  * ...same with cancel inform
1757  */
1760 
1761  /*
1762  * ....same with cancel partial
1763  */
1764  if (states & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) {
1766  treq = tconn->cancel_partial;
1767  if (treq) {
1770  fr_dlist_insert_tail(out, treq);
1771  }
1772  }
1773 
1774  /*
1775  * ...and pending.
1776  */
1777  if (states & TRUNK_REQUEST_STATE_PENDING) {
1778  while ((treq = fr_heap_peek(tconn->pending))) {
1782  fr_dlist_insert_tail(out, treq);
1783  }
1784  }
1785 
1786  /*
1787  * Cancel partially sent requests
1788  */
1789  if (states & TRUNK_REQUEST_STATE_PARTIAL) {
1791  treq = tconn->partial;
1792  if (treq) {
1794 
1795  /*
1796  * Don't allow the connection to change state whilst
1797  * we're draining requests from it.
1798  */
1802  fr_dlist_insert_tail(out, treq);
1804  }
1805  }
1806 
1807  /*
1808  * Cancel sent requests
1809  */
1810  if (states & TRUNK_REQUEST_STATE_SENT) {
1811  /*
1812  * Don't allow the connection to change state whilst
1813  * we're draining requests from it.
1814  */
1816  while ((treq = fr_dlist_head(&tconn->sent))) {
1819 
1822  fr_dlist_insert_tail(out, treq);
1823  }
1825  }
1826 
1827  return count;
1828 }
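/* Example sketch: the states argument above is a bitmask of TRUNK_REQUEST_STATE_*
 * values, so a single call can drain several per-connection lists at once.  An
 * internal caller moving everything that hasn't completed off a connection (as
 * trunk_connection_requests_requeue_priv() below does for the non-cancel states)
 * might look like this; the out list initialisation mirrors the one used below.
 *
 * @code{.c}
 * fr_dlist_head_t out;
 *
 * fr_dlist_talloc_init(&out, trunk_request_t, entry);
 * trunk_connection_requests_dequeue(&out, tconn,
 *				     TRUNK_REQUEST_STATE_PENDING |
 *				     TRUNK_REQUEST_STATE_PARTIAL |
 *				     TRUNK_REQUEST_STATE_SENT, 0);	// max = 0 means no limit
 * @endcode
 */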
1829 
1830 /** Remove requests in specified states from a connection, attempting to distribute them to new connections
1831  *
1832  * @param[in] tconn To remove requests from.
1833  * @param[in] states One or more states or'd together.
1834  * @param[in] max The maximum number of requests to dequeue.
1835  * 0 for unlimited.
1836  * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1837  * If false bound requests will not be moved.
1838  *
1839  * @return the number of requests re-queued.
1840  */
1841 static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1842 {
1843  trunk_t *trunk = tconn->pub.trunk;
1844  fr_dlist_head_t to_process;
1845  trunk_request_t *treq = NULL;
1846  uint64_t moved = 0;
1847 
1848  if (max == 0) max = UINT64_MAX;
1849 
1850  fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
1851 
1852  /*
1853  * Prevent the connection changing state whilst we're
1854  * working with it.
1855  *
1856  * There's a user callback that can be called by
1857  * trunk_request_enqueue_existing which can reconnect
1858  * the connection.
1859  */
1861 
1862  /*
1863  * Remove non-cancelled requests from the connection
1864  */
1865  moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1866 
1867  /*
1868  * Prevent requests being requeued on the same trunk
1869  * connection, which would break rebalancing.
1870  *
1871  * This is a bit of a hack, but nothing should test
1872  * for connection/list consistency in this code,
1873  * and if something is added later, it'll be flagged
1874  * by the tests.
1875  */
1876  if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1877  int ret;
1878 
1879  ret = fr_minmax_heap_extract(trunk->active, tconn);
1880  if (!fr_cond_assert_msg(ret == 0,
1881  "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1882 
1883  }
1884 
1885  /*
1886  * Loop over all the requests we gathered and
1887  * redistribute them to new connections.
1888  */
1889  while ((treq = fr_dlist_next(&to_process, treq))) {
1890  trunk_request_t *prev;
1891 
1892  prev = fr_dlist_remove(&to_process, treq);
1893 
1894  /*
1895  * Attempting to re-queue a request
1896  * that's bound to a connection
1897  * results in a failure.
1898  */
1899  if (treq->bound_to_conn) {
1900  if (fail_bound || !IS_SERVICEABLE(tconn)) {
1902  } else {
1903  trunk_request_enter_pending(treq, tconn, false);
1904  }
1905  goto next;
1906  }
1907 
1908  switch (trunk_request_enqueue_existing(treq)) {
1909  case TRUNK_ENQUEUE_OK:
1910  break;
1911 
1912  /*
1913  * A connection failed, and
1914  * there's no other connections
1915  * available to deal with the
1916  * load, it's been placed back
1917  * in the backlog.
1918  */
1920  break;
1921 
1922  /*
1923  * If we fail to re-enqueue then
1924  * there's nothing to do except
1925  * fail the request.
1926  */
1929  case TRUNK_ENQUEUE_FAIL:
1931  break;
1932  }
1933  next:
1934  treq = prev;
1935  }
1936 
1937  /*
1938  * Add the connection back into the active list
1939  */
1940  if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1941  int ret;
1942 
1943  ret = fr_minmax_heap_insert(trunk->active, tconn);
1944  if (!fr_cond_assert_msg(ret == 0,
1945  "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1946  }
1947  if (moved >= max) goto done;
1948 
1949  /*
1950  * Deal with the cancelled requests specially; we can't
1951  * queue them up again as they were only valid on that
1952  * specific connection.
1953  *
1954  * We just need to run them to completion which, as
1955  * they should already be in the unassigned state,
1956  * just means freeing them.
1957  */
1958  moved += trunk_connection_requests_dequeue(&to_process, tconn,
1959  states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1960  while ((treq = fr_dlist_next(&to_process, treq))) {
1961  trunk_request_t *prev;
1962 
1963  prev = fr_dlist_remove(&to_process, treq);
1964  trunk_request_free(&treq);
1965  treq = prev;
1966  }
1967 
1968 done:
1969 
1970  /*
1971  * Always re-calculate the request/connection
1972  * ratio at the end.
1973  *
1974  * This avoids having the state transition
1975  * functions do it.
1976  *
1977  * The ratio would be wrong when they calculated
1978  * it anyway, because a bunch of requests are
1979  * dequeued from the connection and temporarily
1980  * cease to exist from the perspective of the
1981  * trunk_requests_per_connection code.
1982  */
1983  trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1984 
1986  return moved;
1987 }
1988 
1989 /** Move requests off of a connection and requeue elsewhere
1990  *
1991  * @note We don't re-queue on draining or draining to free, as requests should have already been
1992  * moved off of the connection. It's also dangerous as the trunk management code may
1993  * clean up a connection in this state when it's run on re-queue, and then the caller
1994  * may try to access a now freed connection.
1995  *
1996  * @param[in] tconn to move requests off of.
1997  * @param[in] states Only move requests in this state.
1998  * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1999  * @param[in] fail_bound If true causes any requests bound to the connection to fail.
2000  * If false bound requests will not be moved.
2001  * @return The number of requests requeued.
2002  */
2003 uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
2004 {
2005  switch (tconn->pub.state) {
2006  case TRUNK_CONN_ACTIVE:
2007  case TRUNK_CONN_FULL:
2008  case TRUNK_CONN_INACTIVE:
2009  return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
2010 
2011  default:
2012  return 0;
2013  }
2014 }
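/*
 *	Illustrative sketch (not part of the original source): how an API client
 *	might move all pending and sent requests off a connection it's about to
 *	take out of service, failing any requests that were bound to it.
 */
#if 0
static void example_drain_connection(trunk_connection_t *tconn)
{
	trunk_t		*trunk = tconn->pub.trunk;	/* Needed by the DEBUG3 LOG_PREFIX */
	uint64_t	moved;

	/* max == 0 means no limit on the number of requests to requeue */
	moved = trunk_connection_requests_requeue(tconn,
						  TRUNK_REQUEST_STATE_PENDING | TRUNK_REQUEST_STATE_SENT,
						  0, true);
	DEBUG3("Requeued %" PRIu64 " requests", moved);
}
#endif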
2015 
2016 /** Signal a partial write
2017  *
2018  * Where there's high load, and the outbound write buffer is full
2019  *
2020  * @param[in] treq to signal state change for.
2021  */
2022 void trunk_request_signal_partial(trunk_request_t *treq)
2023 {
2024  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2025 
2027  "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2028 
2029  switch (treq->pub.state) {
2032  break;
2033 
2034  default:
2035  return;
2036  }
2037 }
2038 
2039 /** Signal that the request was written to a connection successfully
2040  *
2041  * @param[in] treq to signal state change for.
2042  */
2043 void trunk_request_signal_sent(trunk_request_t *treq)
2044 {
2045  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2046 
2048  "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2049 
2050  switch (treq->pub.state) {
2054  break;
2055 
2056  default:
2057  return;
2058  }
2059 }
2060 
2061 /** Signal that the request was written to a connection successfully, but no response is expected
2062  *
2063  * @param[in] treq to signal state change for.
2064  */
2066 {
2067  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2068 
2070  "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2071 
2072  switch (treq->pub.state) {
2076  break;
2077 
2078  default:
2079  return;
2080  }
2081 }
2082 
2083 /** Signal that a trunk request is complete
2084  *
2085  * The API client will be informed that the request is now complete.
2086  */
2087 void trunk_request_signal_complete(trunk_request_t *treq)
2088 {
2089  trunk_t *trunk = treq->pub.trunk;
2090 
2091  if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2092 
2093  /*
2094  * We assume that if the request is being signalled
2095  * as complete from the demux function, that it was
2096  * a successful read.
2097  *
2098  * If this assumption turns out to be incorrect
2099  * then we need to add an argument to signal_complete
2100  * to indicate if this is a successful read.
2101  */
2102  if (IN_REQUEST_DEMUX(trunk)) trunk->pub.last_read_success = fr_time();
2103 
2104  switch (treq->pub.state) {
2106  case TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2109  break;
2110 
2111  default:
2112  return;
2113  }
2114 }
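/*
 *	Illustrative sketch (not part of the original source): the read side of a
 *	request_demux callback signalling completion for each decoded response.
 *	The callback signature shown, my_read_response(), my_match_treq() and
 *	my_decode_into() are assumptions about the API client's code.
 */
#if 0
static void example_request_demux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
				  connection_t *conn, UNUSED void *uctx)
{
	void			*resp;
	trunk_request_t		*treq;

	while ((resp = my_read_response(conn)) != NULL) {	/* hypothetical */
		treq = my_match_treq(tconn, resp);		/* hypothetical - match response to its treq */
		if (!treq) continue;				/* unsolicited response, ignore it */

		my_decode_into(treq->pub.rctx, resp);		/* hypothetical - write the result into the rctx */
		trunk_request_signal_complete(treq);		/* API client's complete callback will run */
	}
}
#endif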
2115 
2116 /** Signal that a trunk request failed
2117  *
2118  * The API client will be informed that the request has failed.
2119  */
2120 void trunk_request_signal_fail(trunk_request_t *treq)
2121 {
2122  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2123 
2125 }
2126 
2127 /** Cancel a trunk request
2128  *
2129  * treq can be in any state, but cancellation requests will be ignored unless the treq is in
2130  * the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state.
2131  *
2132  * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2133  * is now inviable as it's being cancelled.
2134  *
2135  * The free function however, is called, and that should be used to perform necessary
2136  * cleanup.
2137  *
2138  * @param[in] treq to signal state change for.
2139  */
2141 {
2142  trunk_t *trunk;
2143 
2144  /*
2145  * Ensure treq hasn't been freed
2146  */
2147  (void)talloc_get_type_abort(treq, trunk_request_t);
2148 
2149  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2150 
2151  if (!fr_cond_assert_msg(!IN_HANDLER(treq->pub.trunk),
2152  "%s cannot be called within a handler", __FUNCTION__)) return;
2153 
2154  trunk = treq->pub.trunk;
2155 
2156  switch (treq->pub.state) {
2157  /*
2158  * We don't call the complete or failed callbacks
2159  * as the request and rctx are no longer viable.
2160  */
2163  {
2164  trunk_connection_t *tconn = treq->pub.tconn;
2165 
2166  /*
2167  * Don't allow connection state changes
2168  */
2172  "Bad state %s after cancellation",
2173  fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2175  return;
2176  }
2177  /*
2178  * No cancel muxer. We're done.
2179  *
2180  * If we do have a cancel mux function,
2181  * the next time this connection becomes
2182  * writable, we'll call the cancel mux
2183  * function.
2184  *
2185  * We don't run the complete or failed
2186  * callbacks here as the request is
2187  * being cancelled.
2188  */
2189  if (!trunk->funcs.request_cancel_mux) {
2191  trunk_request_free(&treq);
2192  }
2194  }
2195  break;
2196 
2197  /*
2198  * We're already in the process of cancelling a
2199  * request, so ignore duplicate signals.
2200  */
2205  break;
2206 
2207  /*
2208  * For any other state, we just release the request
2209  * from its current connection and free it.
2210  */
2211  default:
2213  trunk_request_free(&treq);
2214  break;
2215  }
2216 }
2217 
2218 /** Signal a partial cancel write
2219  *
2220  * Where there's high load, and the outbound write buffer is full
2221  *
2222  * @param[in] treq to signal state change for.
2223  */
2225 {
2226  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2227 
2229  "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2230 
2231  switch (treq->pub.state) {
2234  break;
2235 
2236  default:
2237  return;
2238  }
2239 }
2240 
2241 /** Signal that a remote server has been notified of the cancellation
2242  *
2243  * Called from request_cancel_mux to indicate that the datastore has been informed
2244  * that the response is no longer needed.
2245  *
2246  * @param[in] treq to signal state change for.
2247  */
2248 void trunk_request_signal_cancel_sent(trunk_request_t *treq)
2249 {
2250  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2251 
2253  "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2254 
2255  switch (treq->pub.state) {
2259  break;
2260 
2261  default:
2262  break;
2263  }
2264 }
2265 
2266 /** Signal that a remote server acked our cancellation
2267  *
2268  * Called from request_demux to indicate that it got an ack for the cancellation.
2269  *
2270  * @param[in] treq to signal state change for.
2271  */
2272 void trunk_request_signal_cancel_complete(trunk_request_t *treq)
2273 {
2274  if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2275 
2277  "%s can only be called from within request_demux or request_cancel_mux handlers",
2278  __FUNCTION__)) return;
2279 
2280  switch (treq->pub.state) {
2282  /*
2283  * This is allowed, as we may not need to wait
2284  * for the database to ACK our cancellation
2285  * request.
2286  *
2287  * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2288  * is not allowed here, as that'd mean we'd half
2289  * written the cancellation request out to the
2290  * socket, and then decided to abandon it.
2291  *
2292  * That'd leave the socket in an unusable state.
2293  */
2296  break;
2297 
2298  default:
2299  break;
2300  }
2301 }
2302 
2303 /** If the trunk request is freed then update the target requests
2304  *
2305  * gperftools showed calling the request free function directly was slightly faster
2306  * than using talloc_free.
2307  *
2308  * @param[in] treq_to_free request.
2309  */
2310 void trunk_request_free(trunk_request_t **treq_to_free)
2311 {
2312  trunk_request_t *treq = *treq_to_free;
2313  trunk_t *trunk;
2314  if (unlikely(!treq)) return;
2315  trunk = treq->pub.trunk;
2316 
2317  /*
2318  * The only valid states a trunk request can be
2319  * freed from.
2320  */
2321  switch (treq->pub.state) {
2327  break;
2328 
2329  default:
2330  if (!fr_cond_assert(0)) return;
2331  }
2332 
2333  /*
2334  * Zero out the pointer to prevent double frees
2335  */
2336  *treq_to_free = NULL;
2337 
2338  /*
2339  * Call the API client callback to free
2340  * any associated memory.
2341  */
2342  DO_REQUEST_FREE(treq);
2343 
2344  /*
2345  * Update the last above/below target stats
2346  * We only do this when we alloc or free
2347  * connections, or on connection
2348  * state changes.
2349  */
2350  trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2351 
2352  /*
2353  * This tracks the total number of requests
2354  * allocated and not freed or returned to
2355  * the free list.
2356  */
2357  if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2358 
2359  /*
2360  * No cleanup delay means we clean up immediately
2361  */
2364 
2365 #ifndef NDEBUG
2366  /*
2367  * Ensure anything parented off the treq
2368  * is freed. We do this to trigger
2369  * the destructors for the log entries.
2370  */
2371  talloc_free_children(treq);
2372 
2373  /*
2374  * State log should now be empty as entries
2375  * remove themselves from the dlist
2376  * on free.
2377  */
2379  "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2380 #endif
2381 
2382  talloc_free(treq);
2383  return;
2384  }
2385 
2386  /*
2387  * Ensure anything parented off the treq
2388  * is freed.
2389  */
2390  talloc_free_children(treq);
2391 
2392 #ifndef NDEBUG
2393  /*
2394  * State log should now be empty as entries
2395  * remove themselves from the dlist
2396  * on free.
2397  */
2399  "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2400 #endif
2401 
2402  /*
2403  *
2404  * Return the trunk request back to the init state.
2405  */
2406  *treq = (trunk_request_t){
2407  .pub = {
2409  .trunk = treq->pub.trunk,
2410  },
2411  .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2412  .last_freed = fr_time(),
2413 #ifndef NDEBUG
2414  .log = treq->log /* Keep the list head, to save reinitialisation */
2415 #endif
2416  };
2417 
2418  /*
2419  * Insert at the head, so that we can free
2420  * requests that have been unused for N
2421  * seconds from the tail.
2422  */
2423  fr_dlist_insert_tail(&trunk->free_requests, treq);
2424 }
2425 
2426 /** Actually free the trunk request
2427  *
2428  */
2430 {
2431  trunk_t *trunk = treq->pub.trunk;
2432 
2433  switch (treq->pub.state) {
2436  break;
2437 
2438  default:
2439  fr_assert(0);
2440  break;
2441  }
2442 
2443  fr_dlist_remove(&trunk->free_requests, treq);
2444 
2445  return 0;
2446 }
2447 
2448 /** (Pre-)Allocate a new trunk request
2449  *
2450  * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2451  * request will be a talloc pool, which can be used to hold the preq.
2452  *
2453  * @note Do not use MEM to check the result of this allocation as it may fail for
2454  * non-fatal reasons.
2455  *
2456  * @param[in] trunk to add request to.
2457  * @param[in] request to wrap in a trunk request (treq).
2458  * @return
2459  * - A newly allocated request.
2460  * - NULL if too many requests are allocated.
2461  */
2462 trunk_request_t *trunk_request_alloc(trunk_t *trunk, request_t *request)
2463 {
2464  trunk_request_t *treq;
2465 
2466  /*
2467  * The number of treqs currently allocated
2468  * exceeds the maximum number allowed.
2469  */
2470  if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2471  uint64_t limit;
2472 
2473  limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2474  if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2476  RWARN, WARN, "Refusing to alloc requests - "
2477  "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2478  "plus %u backlog requests reached",
2479  limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2480  trunk->conf.max_backlog);
2481  return NULL;
2482  }
2483  }
2484 
2485  /*
2486  * Allocate or reuse an existing request
2487  */
2488  treq = fr_dlist_head(&trunk->free_requests);
2489  if (treq) {
2490  fr_dlist_remove(&trunk->free_requests, treq);
2492  fr_assert(treq->pub.trunk == trunk);
2493  fr_assert(treq->pub.tconn == NULL);
2496  trunk->pub.req_alloc_reused++;
2497  } else {
2499  trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2500  talloc_set_destructor(treq, _trunk_request_free);
2501 
2502  *treq = (trunk_request_t){
2503  .pub = {
2505  .trunk = trunk
2506  },
2507  .cancel_reason = TRUNK_CANCEL_REASON_NONE
2508  };
2509  trunk->pub.req_alloc_new++;
2510 #ifndef NDEBUG
2512 #endif
2513  }
2514 
2515  trunk->pub.req_alloc++;
2517  /* heap_id - initialised when treq inserted into pending */
2518  /* list - empty */
2519  /* preq - populated later */
2520  /* rctx - populated later */
2521  treq->pub.request = request;
2522 
2523  return treq;
2524 }
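/*
 *	Illustrative sketch (not part of the original source): pre-allocating a
 *	treq and handling allocation failure explicitly, as the @note above
 *	advises against wrapping this call in MEM().
 */
#if 0
static trunk_request_t *example_prealloc(trunk_t *trunk, request_t *request)
{
	trunk_request_t *treq;

	treq = trunk_request_alloc(trunk, request);	/* May return NULL when limits are reached */
	if (!treq) {
		RWARN("Trunk request limit reached, cannot allocate treq");
		return NULL;
	}

	return treq;
}
#endif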
2525 
2526 /** Enqueue a request that needs data written to the trunk
2527  *
2528  * When a request_t * needs to make an asynchronous request to an external datastore
2529  * it should call this function, specifying a preq (protocol request) containing
2530  * the data necessary to request information from the external datastore, and an
2531  * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2532  *
2533  * After a treq is successfully enqueued it will either be assigned immediately
2534  * to the pending queue of a connection, or if no connections are available,
2535  * (depending on the trunk configuration) the treq will be placed in the trunk's
2536  * global backlog.
2537  *
2538  * After receiving a positive return code from this function the caller should
2539  * immediately yield, to allow the various timers and I/O handlers that drive tconn
2540  * (trunk connection) and treq state changes to be called.
2541  *
2542  * When a tconn becomes writable (or the trunk is configured to be always writable)
2543  * the #trunk_request_mux_t callback will be called to dequeue, encode and
2544  * send any pending requests for that tconn. The #trunk_request_mux_t callback
2545  * is also responsible for tracking the outbound requests to allow the
2546  * #trunk_request_demux_t callback to match inbound responses with the original
2547  * treq. Once the #trunk_request_mux_t callback is done processing the treq
2548  * it signals what state the treq should enter next using one of the
2549  * trunk_request_signal_* functions.
2550  *
2551  * When a tconn becomes readable the user specified #trunk_request_demux_t
2552  * callback is called to process any responses, match them with the original treq,
2553  * and signal what state they should enter next using one of the
2554  * trunk_request_signal_* functions.
2555  *
2556  * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2557  * is NULL, a new treq will be allocated.
2558  * Otherwise treq should point to memory allocated
2559  * with trunk_request_alloc.
2560  * @param[in] trunk to enqueue request on.
2561  * @param[in] request to enqueue.
2562  * @param[in] preq Protocol request to write out. Will be freed when
2563  * treq is freed. Should ideally be parented by the
2564  * treq if possible.
2565  * Use #trunk_request_alloc for pre-allocation of
2566  * the treq.
2567  * @param[in] rctx The resume context to write any result to.
2568  * @return
2569  * - TRUNK_ENQUEUE_OK.
2570  * - TRUNK_ENQUEUE_IN_BACKLOG.
2571  * - TRUNK_ENQUEUE_NO_CAPACITY.
2572  * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2573  * - TRUNK_ENQUEUE_FAIL
2574  */
2575 trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk,
2576  request_t *request, void *preq, void *rctx)
2577 {
2578  trunk_connection_t *tconn = NULL;
2579  trunk_request_t *treq;
2580  trunk_enqueue_t ret;
2581 
2582  if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2583  "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2584 
2585  if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2586  "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2587 
2588  /*
2589  * If delay_start was set, we may need
2590  * to insert the timer for the connection manager.
2591  */
2592  if (unlikely(!trunk->started)) {
2593  if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2594  }
2595 
2596  ret = trunk_request_check_enqueue(&tconn, trunk, request);
2597  switch (ret) {
2598  case TRUNK_ENQUEUE_OK:
2599  if (*treq_out) {
2600  treq = *treq_out;
2601  } else {
2602  *treq_out = treq = trunk_request_alloc(trunk, request);
2603  if (!treq) return TRUNK_ENQUEUE_FAIL;
2604  }
2605  treq->pub.preq = preq;
2606  treq->pub.rctx = rctx;
2607  if (trunk->conf.always_writable) {
2609  trunk_request_enter_pending(treq, tconn, true);
2612  } else {
2613  trunk_request_enter_pending(treq, tconn, true);
2614  }
2615  break;
2616 
2618  if (*treq_out) {
2619  treq = *treq_out;
2620  } else {
2621  *treq_out = treq = trunk_request_alloc(trunk, request);
2622  if (!treq) return TRUNK_ENQUEUE_FAIL;
2623  }
2624  treq->pub.preq = preq;
2625  treq->pub.rctx = rctx;
2626  trunk_request_enter_backlog(treq, true);
2627  break;
2628 
2629  default:
2630  /*
2631  * If a trunk request was provided
2632  * populate the preq and rctx fields
2633  * so that if it's freed with
2634  * trunk_request_free, the free
2635  * function works as intended.
2636  */
2637  if (*treq_out) {
2638  treq = *treq_out;
2639  treq->pub.preq = preq;
2640  treq->pub.rctx = rctx;
2641  }
2642  return ret;
2643  }
2644 
2645  return ret;
2646 }
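/*
 *	Illustrative sketch (not part of the original source): enqueuing a protocol
 *	request from an API client and mapping the return codes.  Assumes the
 *	enqueue function above is exported as trunk_request_enqueue(); the contents
 *	of preq/rctx are entirely up to the caller.
 */
#if 0
static int example_enqueue(trunk_t *trunk, request_t *request, void *preq, void *rctx)
{
	trunk_request_t *treq = NULL;

	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case TRUNK_ENQUEUE_OK:
	case TRUNK_ENQUEUE_IN_BACKLOG:
		return 0;	/* Enqueued - the caller should now yield and wait to be signalled */

	default:
		return -1;	/* Not enqueued - no treq was allocated, the caller still owns preq/rctx */
	}
}
#endif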
2647 
2648 /** Re-enqueue a request on the same connection
2649  *
2650  * If the treq has been sent, we assume that we're being signalled to requeue
2651  * because something outside of the trunk API has determined that a retransmission
2652  * is required. The easiest way to perform that retransmission is to clean up
2653  * any tracking information for the request, and then requeue it for transmission.
2654  *
2655  * If re-queueing fails, the request will enter the fail state. It should not be
2656  * accessed if this occurs.
2657  *
2658  * @param[in] treq to requeue (retransmit).
2659  * @return
2660  * - TRUNK_ENQUEUE_OK.
2661  * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2662  * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2663  */
2665 {
2666  trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2667 
2668  if (!tconn) return TRUNK_ENQUEUE_FAIL;
2669 
2670  if (!IS_PROCESSING(tconn)) {
2673  }
2674 
2675  switch (treq->pub.state) {
2681  trunk_request_enter_pending(treq, tconn, false);
2682  if (treq->pub.trunk->conf.always_writable) {
2684  }
2686  break;
2687 
2688  case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2689  case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2690  break;
2691 
2692  default:
2694  return TRUNK_ENQUEUE_FAIL;
2695  }
2696 
2697  return TRUNK_ENQUEUE_OK;
2698 }
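/*
 *	Illustrative sketch (not part of the original source): retransmitting a
 *	request from an application level retry timer.  Assumes the re-enqueue
 *	function above is exported as trunk_request_requeue(); the timer callback
 *	shape follows the fr_event_timer API used elsewhere in this file.
 */
#if 0
static void example_retry_timer(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
{
	trunk_request_t *treq = talloc_get_type_abort(uctx, trunk_request_t);

	/*
	 *	On failure the treq enters the failed state and must not be
	 *	accessed again by this code.
	 */
	if (trunk_request_requeue(treq) != TRUNK_ENQUEUE_OK) return;
}
#endif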
2699 
2700 /** Enqueue additional requests on a specific connection
2701  *
2702  * This may be used to create a series of requests on a single connection, or to generate
2703  * in-band status checks.
2704  *
2705  * @note If conf->always_writable, then the muxer will be called immediately. The caller
2706  * must be able to handle multiple calls to its muxer gracefully.
2707  *
2708  * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2709  * is NULL, a new treq will be allocated.
2710  * Otherwise treq should point to memory allocated
2711  * with trunk_request_alloc.
2712  * @param[in] tconn to enqueue request on.
2713  * @param[in] request to enqueue.
2714  * @param[in] preq Protocol request to write out. Will be freed when
2715  * treq is freed. Should ideally be parented by the
2716  * treq if possible.
2717  * Use #trunk_request_alloc for pre-allocation of
2718  * the treq.
2719  * @param[in] rctx The resume context to write any result to.
2720  * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2721  * checks through even if the connection is at capacity.
2722  * Will also allow enqueuing on "inactive", "draining",
2723  * "draining-to-free" connections.
2724  * @return
2725  * - TRUNK_ENQUEUE_OK.
2726  * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2727  * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2728  */
2730  request_t *request, void *preq, void *rctx,
2731  bool ignore_limits)
2732 {
2733  trunk_request_t *treq;
2734  trunk_t *trunk = tconn->pub.trunk;
2735 
2736  if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2737  "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2738 
2739  if (!IS_SERVICEABLE(tconn)) return TRUNK_ENQUEUE_DST_UNAVAILABLE;
2740 
2741  /*
2742  * Limits check
2743  */
2744  if (!ignore_limits) {
2745  if (trunk->conf.max_req_per_conn &&
2748 
2749  if (tconn->pub.state != TRUNK_CONN_ACTIVE) return TRUNK_ENQUEUE_NO_CAPACITY;
2750  }
2751 
2752  if (*treq_out) {
2753  treq = *treq_out;
2754  } else {
2755  MEM(*treq_out = treq = trunk_request_alloc(trunk, request));
2756  }
2757 
2758  treq->pub.preq = preq;
2759  treq->pub.rctx = rctx;
2760  treq->bound_to_conn = true; /* Don't let the request be transferred */
2761 
2762  if (trunk->conf.always_writable) {
2764  trunk_request_enter_pending(treq, tconn, true);
2767  } else {
2768  trunk_request_enter_pending(treq, tconn, true);
2769  }
2770 
2771  return TRUNK_ENQUEUE_OK;
2772 }
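/*
 *	Illustrative sketch (not part of the original source): generating an
 *	in-band status check on a specific connection, ignoring limits so the
 *	check is accepted even when the connection is full or inactive.  Assumes
 *	the function above is exported as trunk_request_enqueue_on_conn();
 *	my_status_check_alloc() is a hypothetical preq constructor.
 */
#if 0
static void example_status_check(trunk_connection_t *tconn, request_t *request)
{
	trunk_request_t	*treq = NULL;
	void		*check;

	check = my_status_check_alloc(tconn);		/* hypothetical */

	if (trunk_request_enqueue_on_conn(&treq, tconn, request, check, NULL, true) != TRUNK_ENQUEUE_OK) {
		talloc_free(check);			/* Not enqueued, so the preq is still ours to free */
	}
}
#endif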
2773 
2774 #ifndef NDEBUG
2775 /** Used for sanity checks to ensure all log entries have been freed
2776  *
2777  */
2779 {
2780  fr_dlist_remove(slog->log_head, slog);
2781 
2782  return 0;
2783 }
2784 
2785 void trunk_request_state_log_entry_add(char const *function, int line,
2786  trunk_request_t *treq, trunk_request_state_t new)
2787 {
2788  trunk_request_state_log_t *slog = NULL;
2789 
2791  slog = fr_dlist_head(&treq->log);
2792  fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2793  fr_dlist_num_elements(&treq->log));
2794  (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2795  memset(slog, 0, sizeof(*slog));
2796  } else {
2797  MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2798  talloc_set_destructor(slog, _state_log_entry_free);
2799  }
2800 
2801  slog->log_head = &treq->log;
2802  slog->from = treq->pub.state;
2803  slog->to = new;
2804  slog->function = function;
2805  slog->line = line;
2806  if (treq->pub.tconn) {
2807  slog->tconn = treq->pub.tconn;
2808  slog->tconn_id = treq->pub.tconn->pub.conn->id;
2809  slog->tconn_state = treq->pub.tconn->pub.state;
2810  }
2811 
2812  fr_dlist_insert_tail(&treq->log, slog);
2813 
2814 }
2815 
2816 void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2817  trunk_request_t const *treq)
2818 {
2819  trunk_request_state_log_t *slog = NULL;
2820 
2821  int i;
2822 
2823  for (slog = fr_dlist_head(&treq->log), i = 0;
2824  slog;
2825  slog = fr_dlist_next(&treq->log, slog), i++) {
2826  fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2827  i, slog->function, slog->line,
2828  slog->tconn_id,
2830  slog->tconn_state, "<INVALID>") : "none",
2831  fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2832  fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2833  }
2834 }
2835 #endif
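/*
 *	Illustrative sketch (not part of the original source): dumping the state
 *	history of a misbehaving treq.  Only useful in debug builds, as the
 *	per-request log is maintained only when NDEBUG is not defined.
 */
#if 0
#ifndef NDEBUG
static void example_dump_treq_history(trunk_request_t const *treq)
{
	trunk_request_state_log(&default_log, L_DBG, __FILE__, __LINE__, treq);
}
#endif
#endif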
2836 
2837 /** Return the number of connections in the specified states
2838  *
2839  * @param[in] trunk to retrieve counts for.
2840  * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2841  * @return The number of connections in the specified states.
2842  */
2844 {
2845  uint16_t count = 0;
2846 
2847  if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2848  if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2849  if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2850  if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2851  if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2853  if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2854  if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2856 
2857  return count;
2858 }
2859 
2860 /** Return the number of requests associated with a trunk connection
2861  *
2862  * @param[in] tconn to return request count for.
2863  * @param[in] req_state One or more request states or'd together.
2864  *
2865  * @return The number of requests in the specified states, associated with a tconn.
2866  */
2868 {
2869  uint32_t count = 0;
2870 
2871  if (req_state & TRUNK_REQUEST_STATE_PENDING) count += fr_heap_num_elements(tconn->pending);
2872  if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2873  if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2875  if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2876  if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2878 
2879  return count;
2880 }
2881 
2882 /** Automatically mark a connection as inactive
2883  *
2884  * @param[in] tconn to potentially mark as inactive.
2885  */
2887 {
2888  trunk_t *trunk = tconn->pub.trunk;
2889  uint32_t count;
2890 
2891  if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2892 
2893  /*
2894  * Enforces max_req_per_conn
2895  */
2896  if (trunk->conf.max_req_per_conn > 0) {
2899  }
2900 }
2901 
2902 /** Return whether a trunk connection should currently be considered full
2903  *
2904  * @param[in] tconn to check.
2905  * @return
2906  * - true if the connection is full.
2907  * - false if the connection is not full.
2908  */
2910 {
2911  trunk_t *trunk = tconn->pub.trunk;
2912  uint32_t count;
2913 
2914  /*
2915  * Enforces max_req_per_conn
2916  */
2918  if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2919 
2920  return true;
2921 }
2922 
2923 /** Automatically mark a connection as active or reconnect it
2924  *
2925  * @param[in] tconn to potentially mark as active or reconnect.
2926  */
2928 {
2929  if (tconn->pub.state != TRUNK_CONN_FULL) return;
2930 
2931  /*
2932  * Enforces max_req_per_conn
2933  */
2935 }
2936 
2937 /** A connection is readable. Call the request_demux function to read pending requests
2938  *
2939  */
2941 {
2942  trunk_t *trunk = tconn->pub.trunk;
2943 
2944  DO_REQUEST_DEMUX(tconn);
2945 }
2946 
2947 /** A connection is writable. Call the request_mux function to write pending requests
2948  *
2949  */
2951 {
2952  trunk_t *trunk = tconn->pub.trunk;
2953 
2954  /*
2955  * Call the cancel_sent function (if we have one)
2956  * to inform a backend datastore we no longer
2957  * care about the result
2958  */
2962  DO_REQUEST_CANCEL_MUX(tconn);
2963  }
2966  TRUNK_REQUEST_STATE_PARTIAL)) return;
2967  DO_REQUEST_MUX(tconn);
2968 }
2969 
2970 /** Update the registrations for I/O events we're interested in
2971  *
2972  */
2974 {
2975  trunk_t *trunk = tconn->pub.trunk;
2977 
2978  switch (tconn->pub.state) {
2979  /*
2980  * We only register I/O events if the trunk connection is
2981  * in one of these states.
2982  *
2983  * For the other states the trunk shouldn't be processing
2984  * requests.
2985  */
2986  case TRUNK_CONN_ACTIVE:
2987  case TRUNK_CONN_FULL:
2988  case TRUNK_CONN_INACTIVE:
2990  case TRUNK_CONN_DRAINING:
2992  /*
2993  * If the connection is always writable,
2994  * then we don't care about write events.
2995  */
2996  if (!trunk->conf.always_writable &&
3000  (trunk->funcs.request_cancel_mux ?
3004  }
3005 
3008  (trunk->funcs.request_cancel_mux ?
3009  TRUNK_REQUEST_STATE_CANCEL_SENT : 0)) > 0) {
3011  }
3012  break;
3013 
3014  default:
3015  break;
3016  }
3017 
3018  if (tconn->events != events) {
3019  /*
3020  * There may be a fatal error which results
3021  * in the connection being freed.
3022  *
3023  * Stop that from happening until after
3024  * we're done using it.
3025  */
3027  DO_CONNECTION_NOTIFY(tconn, events);
3028  tconn->events = events;
3030  }
3031 }
3032 
3033 /** Remove a trunk connection from whichever list it's currently in
3034  *
3035  * @param[in] tconn to remove.
3036  */
3038 {
3039  trunk_t *trunk = tconn->pub.trunk;
3040 
3041  switch (tconn->pub.state) {
3042  case TRUNK_CONN_ACTIVE:
3043  {
3044  int ret;
3045 
3046  ret = fr_minmax_heap_extract(trunk->active, tconn);
3047  if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3048  }
3049  return;
3050 
3051  case TRUNK_CONN_INIT:
3052  fr_dlist_remove(&trunk->init, tconn);
3053  break;
3054 
3055  case TRUNK_CONN_CONNECTING:
3056  fr_dlist_remove(&trunk->connecting, tconn);
3057  return;
3058 
3059  case TRUNK_CONN_CLOSED:
3060  fr_dlist_remove(&trunk->closed, tconn);
3061  return;
3062 
3063  case TRUNK_CONN_FULL:
3064  fr_dlist_remove(&trunk->full, tconn);
3065  return;
3066 
3067  case TRUNK_CONN_INACTIVE:
3068  fr_dlist_remove(&trunk->inactive, tconn);
3069  return;
3070 
3072  fr_dlist_remove(&trunk->inactive_draining, tconn);
3073  return;
3074 
3075  case TRUNK_CONN_DRAINING:
3076  fr_dlist_remove(&trunk->draining, tconn);
3077  return;
3078 
3080  fr_dlist_remove(&trunk->draining_to_free, tconn);
3081  return;
3082 
3083  case TRUNK_CONN_HALTED:
3084  return;
3085  }
3086 }
3087 
3088 /** Transition a connection to the full state
3089  *
3090  * Called whenever a trunk connection is at the maximum number of requests.
3091  * Removes the connection from the connected heap, and places it in the full list.
3092  */
3094 {
3095  trunk_t *trunk = tconn->pub.trunk;
3096 
3097  switch (tconn->pub.state) {
3098  case TRUNK_CONN_ACTIVE:
3099  trunk_connection_remove(tconn);
3100  break;
3101 
3102  default:
3104  }
3105 
3106  fr_dlist_insert_head(&trunk->full, tconn);
3108 }
3109 
3110 /** Transition a connection to the inactive state
3111  *
3112  * Called whenever the API client wants to stop new requests being enqueued
3113  * on a trunk connection.
3114  */
3116 {
3117  trunk_t *trunk = tconn->pub.trunk;
3118 
3119  switch (tconn->pub.state) {
3120  case TRUNK_CONN_ACTIVE:
3121  case TRUNK_CONN_FULL:
3122  trunk_connection_remove(tconn);
3123  break;
3124 
3125  default:
3127  }
3128 
3129  fr_dlist_insert_head(&trunk->inactive, tconn);
3131 }
3132 
3133 /** Transition a connection to the inactive-draining state
3134  *
3135  * Called whenever the trunk manager wants to drain an inactive connection
3136  * of its requests.
3137  */
3139 {
3140  trunk_t *trunk = tconn->pub.trunk;
3141 
3142  switch (tconn->pub.state) {
3143  case TRUNK_CONN_INACTIVE:
3144  case TRUNK_CONN_DRAINING:
3145  trunk_connection_remove(tconn);
3146  break;
3147 
3148  default:
3150  }
3151 
3152  fr_dlist_insert_head(&trunk->inactive_draining, tconn);
3154 
3155  /*
3156  * Immediately re-enqueue all pending
3157  * requests, so the connection is drained
3158  * quicker.
3159  */
3161 }
3162 
3163 /** Transition a connection to the draining state
3164  *
3165  * Removes the connection from the active heap so it won't be assigned any new
3166  * connections.
3167  */
3169 {
3170  trunk_t *trunk = tconn->pub.trunk;
3171 
3172  switch (tconn->pub.state) {
3173  case TRUNK_CONN_ACTIVE:
3174  case TRUNK_CONN_FULL:
3175  case TRUNK_CONN_INACTIVE:
3177  trunk_connection_remove(tconn);
3178  break;
3179 
3180  default:
3182  }
3183 
3184  fr_dlist_insert_head(&trunk->draining, tconn);
3186 
3187  /*
3188  * Immediately re-enqueue all pending
3189  * requests, so the connection is drained
3190  * quicker.
3191  */
3193 }
3194 
3195 /** Transition a connection to the draining-to-reconnect state
3196  *
3197  * Removes the connection from the active heap so it won't be assigned any new
3198  * connections.
3199  */
3201 {
3202  trunk_t *trunk = tconn->pub.trunk;
3203 
3204  if (tconn->lifetime_ev) fr_event_timer_delete(&tconn->lifetime_ev);
3205 
3206  switch (tconn->pub.state) {
3207  case TRUNK_CONN_ACTIVE:
3208  case TRUNK_CONN_FULL:
3209  case TRUNK_CONN_INACTIVE:
3211  case TRUNK_CONN_DRAINING:
3212  trunk_connection_remove(tconn);
3213  break;
3214 
3215  default:
3217  }
3218 
3219  fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3221 
3222  /*
3223  * Immediately re-enqueue all pending
3224  * requests, so the connection is drained
3225  * quicker.
3226  */
3228 }
3229 
3230 
3231 /** Transition a connection back to the active state
3232  *
3233  * This should only be called on a connection which is in the full state,
3234  * inactive state, draining state or connecting state.
3235  */
3237 {
3238  trunk_t *trunk = tconn->pub.trunk;
3239  int ret;
3240 
3241  switch (tconn->pub.state) {
3242  case TRUNK_CONN_FULL:
3243  case TRUNK_CONN_INACTIVE:
3245  case TRUNK_CONN_DRAINING:
3246  trunk_connection_remove(tconn);
3247  break;
3248 
3249  case TRUNK_CONN_INIT:
3250  case TRUNK_CONN_CONNECTING:
3251  trunk_connection_remove(tconn);
3253  break;
3254 
3255  default:
3257  }
3258 
3259  ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3260  if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3262  return;
3263  }
3264 
3266 
3267  /*
3268  * Reorder the connections
3269  */
3270  CONN_REORDER(tconn);
3271 
3272  /*
3273  * Rebalance requests
3274  */
3275  trunk_rebalance(trunk);
3276 
3277  /*
3278  * We place requests into the backlog
3279  * because there were no connections
3280  * available to handle them.
3281  *
3282  * If a connection has become active
3283  * chances are those backlogged requests
3284  * can now be enqueued, so try and do
3285  * that now.
3286  *
3287  * If there are requests sitting in the
3288  * backlog indefinitely, it's because
3289  * they were inserted there erroneously
3290  * when there were active connections
3291  * which could have handled them.
3292  */
3293  trunk_backlog_drain(trunk);
3294 }
3295 
3296 /** Connection transitioned to the init state
3297  *
3298  * Reflect the connection state change in the lists we use to track connections.
3299  *
3300  * @note This function is only called from the connection API as a watcher.
3301  *
3302  * @param[in] conn The connection which changes state.
3303  * @param[in] prev The state the connection was in.
3304  * @param[in] state The state the connection is now in.
3305  * @param[in] uctx The trunk_connection_t wrapping the connection.
3306  */
3310  void *uctx)
3311 {
3312  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3313  trunk_t *trunk = tconn->pub.trunk;
3314 
3315  switch (tconn->pub.state) {
3316  case TRUNK_CONN_HALTED:
3317  break;
3318 
3319  case TRUNK_CONN_CLOSED:
3320  trunk_connection_remove(tconn);
3321  break;
3322 
3323  default:
3325  }
3326 
3327  fr_dlist_insert_head(&trunk->init, tconn);
3329 }
3330 
3331 /** Connection transitioned to the connecting state
3332  *
3333  * Reflect the connection state change in the lists we use to track connections.
3334  *
3335  * @note This function is only called from the connection API as a watcher.
3336  *
3337  * @param[in] conn The connection which changes state.
3338  * @param[in] prev The state the connection was in.
3339  * @param[in] state The state the connection is now in.
3340  * @param[in] uctx The trunk_connection_t wrapping the connection.
3341  */
3345  void *uctx)
3346 {
3347  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3348  trunk_t *trunk = tconn->pub.trunk;
3349 
3350  switch (tconn->pub.state) {
3351  case TRUNK_CONN_INIT:
3352  case TRUNK_CONN_CLOSED:
3353  trunk_connection_remove(tconn);
3354  break;
3355 
3356  default:
3358  }
3359 
3360  /*
3361  * If a connection just entered the
3362  * connecting state, it should have
3363  * no requests associated with it.
3364  */
3366 
3367  fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3369 }
3370 
3371 /** Connection transitioned to the shutdown state
3372  *
3373  * If we're not already in the draining-to-free state, transition there now.
3374  *
3375  * The idea is that if something signalled the connection to shutdown, we need
3376  * to reflect that by dequeuing any pending requests, not accepting new ones,
3377  * and waiting for the existing requests to complete.
3378  *
3379  * @note This function is only called from the connection API as a watcher.
3380  *
3381  * @param[in] conn The connection which changes state.
3382  * @param[in] prev The state the connection was in.
3383  * @param[in] state The state the connection is now in.
3384  * @param[in] uctx The trunk_connection_t wrapping the connection.
3385  */
3389  void *uctx)
3390 {
3391  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3392 
3393  switch (tconn->pub.state) {
3394  case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3395  return;
3396 
3397  case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3398  case TRUNK_CONN_FULL:
3399  case TRUNK_CONN_INACTIVE:
3401  case TRUNK_CONN_DRAINING:
3402  break;
3403 
3404  case TRUNK_CONN_INIT:
3405  case TRUNK_CONN_CONNECTING:
3406  case TRUNK_CONN_CLOSED:
3407  case TRUNK_CONN_HALTED:
3409  }
3410 
3412 }
3413 
3414 /** Trigger a reconnection of the trunk connection
3415  *
3416  * @param[in] el Event list the timer was inserted into.
3417  * @param[in] now Current time.
3418  * @param[in] uctx The tconn.
3419  */
3421 {
3422  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3423 
3425 }
3426 
3427 /** Connection transitioned to the connected state
3428  *
3429  * Reflect the connection state change in the lists we use to track connections.
3430  *
3431  * @note This function is only called from the connection API as a watcher.
3432  *
3433  * @param[in] conn The connection which changes state.
3434  * @param[in] prev The state the connection was in.
3435  * @param[in] state The state the connection is now in.
3436  * @param[in] uctx The trunk_connection_t wrapping the connection.
3437  */
3441  void *uctx)
3442 {
3443  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3444  trunk_t *trunk = tconn->pub.trunk;
3445 
3446  /*
3447  * If a connection was just connected,
3448  * it should have no requests associated
3449  * with it.
3450  */
3452 
3453  /*
3454  * Set here, as the active state can
3455  * be transitioned to from full and
3456  * draining too.
3457  */
3458  trunk->pub.last_connected = fr_time();
3459 
3460  /*
3461  * Insert a timer to reconnect the
3462  * connection periodically.
3463  */
3464  if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3465  if (fr_event_timer_in(tconn, trunk->el, &tconn->lifetime_ev,
3466  trunk->conf.lifetime, _trunk_connection_lifetime_expire, tconn) < 0) {
3467  PERROR("Failed inserting connection reconnection timer event, halting connection");
3469  return;
3470  }
3471  }
3472 
3474 }
3475 
3476 /** Connection failed after it was connected
3477  *
3478  * Reflect the connection state change in the lists we use to track connections.
3479  *
3480  * @note This function is only called from the connection API as a watcher.
3481  *
3482  * @param[in] conn The connection which changes state.
3483  * @param[in] prev The state the connection was in.
3484  * @param[in] state The state the connection is now in.
3485  * @param[in] uctx The trunk_connection_t wrapping the connection.
3486  */
3490  void *uctx)
3491 {
3492  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3493  trunk_t *trunk = tconn->pub.trunk;
3494  bool need_requeue = false;
3495 
3496  switch (tconn->pub.state) {
3497  case TRUNK_CONN_ACTIVE:
3498  case TRUNK_CONN_FULL:
3499  case TRUNK_CONN_INACTIVE:
3501  case TRUNK_CONN_DRAINING:
3503  need_requeue = true;
3504  trunk_connection_remove(tconn);
3505  break;
3506 
3507  case TRUNK_CONN_INIT: /* Initialisation failed */
3508  case TRUNK_CONN_CONNECTING:
3509  trunk_connection_remove(tconn);
3511  break;
3512 
3513  case TRUNK_CONN_CLOSED:
3514  case TRUNK_CONN_HALTED: /* Can't move backwards? */
3516  }
3517 
3518  fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3520 
3521  /*
3522  * Now *AFTER* the connection has been
3523  * removed from the active pool,
3524  * re-enqueue the requests.
3525  */
3526  if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3527 
3528  /*
3529  * There should be no requests left on this
3530  * connection. They should have all been
3531  * moved off or failed.
3532  */
3534 
3535  /*
3536  * Clear statistics and flags
3537  */
3538  tconn->sent_count = 0;
3539 
3540  /*
3541  * Remove the reconnect event
3542  */
3544 
3545  /*
3546  * Remove the I/O events
3547  */
3549 }
3550 
3551 /** Connection failed
3552  *
3553  * @param[in] conn The connection which changes state.
3554  * @param[in] prev The state the connection was in.
3555  * @param[in] state The state the connection is now in.
3556  * @param[in] uctx The trunk_connection_t wrapping the connection.
3557  */
3559  connection_state_t prev,
3561  void *uctx)
3562 {
3563  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3564  trunk_t *trunk = tconn->pub.trunk;
3565 
3566  /*
3567  * Need to set this first as it
3568  * determines whether requests are
3569  * re-queued or fail outright.
3570  */
3571  trunk->pub.last_failed = fr_time();
3572 
3573  /*
3574  * Failed in the init state, transition the
3575  * connection to closed, else we get an
3576  * INIT -> INIT transition which triggers
3577  * an assert.
3578  */
3579  if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3580 
3581  /*
3582  * See what the state of the trunk is.
3583  * If there are no connections that could
3584  * potentially accept requests in the near
3585  * future, then fail all the requests in the
3586  * trunk backlog.
3587  */
3588  if ((state == CONNECTION_STATE_CONNECTED) &&
3591  TRUNK_CONN_FULL |
3592  TRUNK_CONN_DRAINING)) == 0)) trunk_backlog_drain(trunk);
3593 }
3594 
3595 /** Connection transitioned to the halted state
3596  *
3597  * Remove the connection from all lists, as it's likely about to be freed.
3598  *
3599  * Setting the trunk back to the init state ensures that if the code is ever
3600  * refactored and #connection_signal_reconnect is used after a connection
3601  * is halted, then everything is maintained in a valid state.
3602  *
3603  * @note This function is only called from the connection API as a watcher.
3604  *
3605  * @param[in] conn The connection which changes state.
3606  * @param[in] prev The state the connection was in.
3607  * @param[in] state The state the connection is now in.
3608  * @param[in] uctx The trunk_connection_t wrapping the connection.
3609  */
3613  void *uctx)
3614 {
3615  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3616  trunk_t *trunk = tconn->pub.trunk;
3617 
3618  switch (tconn->pub.state) {
3619  case TRUNK_CONN_INIT:
3620  case TRUNK_CONN_CLOSED:
3621  trunk_connection_remove(tconn);
3622  break;
3623 
3624  default:
3626  }
3627 
3628  /*
3629  * It began life in the halted state,
3630  * and will end life in the halted state.
3631  */
3633 
3634  /*
3635  * There should be no requests left on this
3636  * connection. They should have all been
3637  * moved off or failed.
3638  */
3640 
3641  /*
3642  * And free the connection...
3643  */
3644  if (trunk->in_handler) {
3645  /*
3646  * ...later.
3647  */
3648  fr_dlist_insert_tail(&trunk->to_free, tconn);
3649  return;
3650  }
3651  talloc_free(tconn);
3652 }
3653 
3654 /** Free a connection
3655  *
3656  * Enforces orderly free order of children of the tconn
3657  */
3659 {
3660  fr_assert(tconn->pub.state == TRUNK_CONN_HALTED);
3661  fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3662 
3663  /*
3664  * Loop over all the requests we gathered
3665  * and transition them to the failed state,
3666  * freeing them.
3667  *
3668  * Usually, requests will be re-queued when
3669  * a connection enters the closed state,
3670  * but in this case because the whole trunk
3671  * is being freed, we don't bother, and
3672  * just signal to the API client that the
3673  * requests failed.
3674  */
3675  if (tconn->pub.trunk->freeing) {
3676  fr_dlist_head_t to_fail;
3677  trunk_request_t *treq = NULL;
3678 
3679  fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3680 
3681  /*
3682  * Remove requests from this connection
3683  */
3685  while ((treq = fr_dlist_next(&to_fail, treq))) {
3686  trunk_request_t *prev;
3687 
3688  prev = fr_dlist_remove(&to_fail, treq);
3690  treq = prev;
3691  }
3692  }
3693 
3694  /*
3695  * Ensure we're not signalled by the connection
3696  * as it processes its backlog of state changes,
3697  * as we are about to be freed.
3698  */
3706 
3707  /*
3708  * This may return -1, indicating the free was deferred
3709  * this is fine. It just means the conn will be freed
3710  * after all the handlers have exited.
3711  */
3712  (void)talloc_free(tconn->pub.conn);
3713  tconn->pub.conn = NULL;
3714 
3715  return 0;
3716 }
3717 
3718 /** Attempt to spawn a new connection
3719  *
3720  * Calls the API client's alloc() callback to create a new connection_t,
3721  * then inserts the connection into the 'connecting' list.
3722  *
3723  * @param[in] trunk to spawn connection in.
3724  * @param[in] now The current time.
3725  */
3727 {
3728  trunk_connection_t *tconn;
3729 
3730 
3731  /*
3732  * Call the API client's callback to create
3733  * a new connection_t.
3734  */
3735  MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3736  tconn->pub.trunk = trunk;
3737  tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3738 
3739  /*
3740  * Allocate a new connection_t or fail.
3741  */
3742  DO_CONNECTION_ALLOC(tconn);
3743 
3745  fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3747  fr_dlist_talloc_init(&tconn->cancel, trunk_request_t, entry);
3749 
3750  /*
3751  * OK, we have the connection, now setup watch
3752  * points so we know when it changes state.
3753  *
3754  * This lets us automatically move the tconn
3755  * between the different lists in the trunk
3756  * with minimum extra code.
3757  */
3759  _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3760 
3762  _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3763 
3765  _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3766 
3768  _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3769 
3771  _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3772 
3774  _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3775 
3777  _trunk_connection_on_halted, false, tconn); /* About to be freed */
3778 
3779  talloc_set_destructor(tconn, _trunk_connection_free);
3780 
3781  connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3782 
3783  trunk->pub.last_open = now;
3784 
3785  return 0;
3786 }
3787 
3788 /** Pop a cancellation request off a connection's cancellation queue
3789  *
3790  * The request we return is advanced by the request moving out of the
3791  * cancel state and into the cancel_sent or cancel_complete state.
3792  *
3793  * One of these signalling functions must be called after the request
3794  * has been popped:
3795  *
3796  * - #trunk_request_signal_cancel_sent
3797  * The remote datastore has been informed, but we need to wait for acknowledgement.
3798  * The #trunk_request_demux_t callback must handle the acks calling
3799  * #trunk_request_signal_cancel_complete when an ack is received.
3800  *
3801  * - #trunk_request_signal_cancel_complete
3802  * The request was cancelled and we don't need to wait, clean it up immediately.
3803  *
3804  * @param[out] treq_out to process
3805  * @param[in] tconn Connection to drain cancellation request from.
3806  * @return
3807  * - 1 if no more requests.
3808  * - 0 if a new request was written to treq_out.
3809  * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3810  * memory or requests associated with the connection.
3811  * - -2 if called outside of the cancel muxer.
3812  */
3814 {
3815  if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3816 
3818  "%s can only be called from within request_cancel_mux handler",
3819  __FUNCTION__)) return -2;
3820 
3821  *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3822  if (!*treq_out) return 1;
3823 
3824  return 0;
3825 }
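/*
 *	Illustrative sketch (not part of the original source): a request_cancel_mux
 *	callback draining the cancellation queue.  The pop function name
 *	trunk_connection_pop_cancellation(), the callback signature and
 *	my_write_cancel() are assumptions.
 */
#if 0
static void example_cancel_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
			       connection_t *conn, UNUSED void *uctx)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_cancellation(&treq, tconn) == 0) {
		if (my_write_cancel(conn, treq->pub.preq) < 0) {	/* hypothetical */
			trunk_request_signal_cancel_complete(treq);	/* Nothing more to do, clean up now */
			continue;
		}
		trunk_request_signal_cancel_sent(treq);			/* Wait for the datastore to ack */
	}
}
#endif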
3826 
3827 /** Pop a request off a connection's pending queue
3828  *
3829  * The request we return is advanced by the request moving out of the partial or
3830  * pending states, when the mux function signals us.
3831  *
3832  * If the same request is returned again and again, it means the muxer isn't actually
3833  * doing anything with the request we returned, and it's an error in the muxer code.
3834  *
3835  * One of these signalling functions must be used after the request has been popped:
3836  *
3837  * - #trunk_request_signal_complete
3838  * The request was completed. Either we got a synchronous response, or we knew the
3839  * response without contacting an external server (cache).
3840  *
3841  * - #trunk_request_signal_fail
3842  * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3843  *
3844  * - #trunk_request_signal_partial
3845  * Wrote part of a request. This request will be returned on the next call to this
3846  * function so that the request_mux function can finish writing it. Only useful
3847  * for stream type connections. Datagram type connections cannot have partial
3848  * writes.
3849  *
3850  * - #trunk_request_signal_sent Successfully sent a request.
3851  *
3852  * @param[out] treq_out to process
3853  * @param[in] tconn to pop a request from.
3854  * @return
3855  * - 1 if no more requests.
3856  * - 0 if a new request was written to treq_out.
3857  * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3858  * memory or requests associated with the connection.
3859  * - -2 if called outside of the muxer.
3860  */
3862 {
3863  if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3864 
3866  "%s can only be called from within request_mux handler",
3867  __FUNCTION__)) return -2;
3868 
3869  *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3870  if (!*treq_out) return 1;
3871 
3872  return 0;
3873 }
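/*
 *	Illustrative sketch (not part of the original source): a request_mux
 *	callback dequeuing, encoding and writing pending requests.  The pop
 *	function name trunk_connection_pop_request(), the callback signature,
 *	and my_encode_and_write() with its return semantics are assumptions.
 */
#if 0
static void example_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
				connection_t *conn, UNUSED void *uctx)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_request(&treq, tconn) == 0) {
		ssize_t slen;

		slen = my_encode_and_write(conn, treq->pub.preq);	/* hypothetical: <0 error, 0 would block, >0 written */
		if (slen < 0) {
			trunk_request_signal_fail(treq);		/* Permanent failure for this request */
			continue;
		}
		if (slen == 0) {
			trunk_request_signal_partial(treq);		/* Will be returned again on the next call */
			return;						/* Wait for the socket to become writable */
		}
		trunk_request_signal_sent(treq);			/* Track it and wait for the response */
	}
}
#endif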
3874 
3875 /** Signal that a trunk connection is writable
3876  *
3877  * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3878  *
3879  * @param[in] tconn to signal.
3880  */
3882 {
3883  trunk_t *trunk = tconn->pub.trunk;
3884 
3885  if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3886  "%s cannot be called within a handler", __FUNCTION__)) return;
3887 
3888  DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3889 
3891 }
3892 
3893 /** Signal that a trunk connection is readable
3894  *
3895  * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3896  *
3897  * @param[in] tconn to signal.
3898  */
3900 {
3901  trunk_t *trunk = tconn->pub.trunk;
3902 
3903  if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3904  "%s cannot be called within a handler", __FUNCTION__)) return;
3905 
3906  DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3907 
3909 }
3910 
3911 /** Signal a trunk connection cannot accept more requests
3912  *
3913  * @param[in] tconn to signal.
3914  */
3916 {
3917  /* Can be called anywhere */
3918 
3919  switch (tconn->pub.state) {
3920  case TRUNK_CONN_ACTIVE:
3921  case TRUNK_CONN_FULL:
3923  break;
3924 
3925  case TRUNK_CONN_DRAINING:
3927  break;
3928 
3929  default:
3930  return;
3931  }
3932 }
3933 
3934 /** Signal a trunk connection is no longer full
3935  *
3936  * @param[in] tconn to signal.
3937  */
3938 void trunk_connection_signal_active(trunk_connection_t *tconn)
3939 {
3940  switch (tconn->pub.state) {
3941  case TRUNK_CONN_FULL:
3942  trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3943  break;
3944 
3945  case TRUNK_CONN_INACTIVE:
3946  /*
3947  * Do the appropriate state transition based on
3948  * how many requests the trunk connection is
3949  * currently servicing.
3950  */
3951  if (trunk_connection_is_full(tconn)) {
3953  break;
3954  }
3956  break;
3957 
3958  /*
3959  * Unsetting the active flag just moves
3960  * the connection back to the normal
3961  * draining state.
3962  */
3963  case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
3965  break;
3966 
3967  default:
3968  return;
3969  }
3970 }
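/*
 *	Editorial example (not part of trunk.c): a driver which tracks its own
 *	outbound window can use the inactive/active signals to apply
 *	backpressure.  The in_flight/window counters are hypothetical.
 */
static void example_window_update(trunk_connection_t *tconn, unsigned int in_flight, unsigned int window)
{
	if (in_flight >= window) {
		trunk_connection_signal_inactive(tconn);	/* Don't give us any more requests for now */
	} else {
		trunk_connection_signal_active(tconn);		/* We can accept requests again */
	}
}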
3971 
3972 /** Signal a trunk connection is no longer viable
3973  *
3974  * @param[in] tconn to signal.
3975  * @param[in] reason the connection is being reconnected.
3976  */
3977 void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
3978 {
3979  connection_signal_reconnect(tconn->pub.conn, reason);
3980 }
3981 
3982 /** Standard I/O read function
3983  *
3984  * Underlying FD is now readable, so call the trunk to read any pending requests
3985  * from this connection.
3986  *
3987  * @param[in] el The event list signalling.
3988  * @param[in] fd that's now readable.
3989  * @param[in] flags describing the read event.
3990  * @param[in] uctx The trunk connection handle (tconn).
3991  */
3992 void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
3993 {
3994  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3995 
3997 }
3998 
3999 /** Standard I/O write function
4000  *
4001  * Underlying FD is now writable, so call the trunk to write any pending requests
4002  * to this connection.
4003  *
4004  * @param[in] el The event list signalling.
4005  * @param[in] fd that's now writable.
4006  * @param[in] flags describing the write event.
4007  * @param[in] uctx The trunk connection handle (tconn).
4008  */
4009 void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
4010 {
4011  trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4012 
4014 }
4015 
4016 
4017 /** Returns true if the trunk connection is in one of the specified states
4018  *
4019  * @param[in] tconn To check state for.
4020  * @param[in] state to check
4021  * @return
4022  * - True if the trunk connection is in one of the specified states.
4023  * - False otherwise.
4024  */
4025 bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
4026 {
4027  return (bool)(tconn->pub.state & state);
4028 }
4029 
4030 /** Close connections in a particular connection list if they have no requests associated with them
4031  *
4032  * @param[in] trunk containing connections we want to close.
4033  * @param[in] head of list of connections to examine.
4034  */
4035 static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
4036 {
4037  trunk_connection_t *tconn = NULL;
4038 
4039  while ((tconn = fr_dlist_next(head, tconn))) {
4040  trunk_connection_t *prev;
4041 
4043 
4044  prev = fr_dlist_prev(head, tconn);
4045 
4046  DEBUG3("Closing %s connection with no requests",
4047  fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"));
4048  /*
4049  * Close the connection as gracefully
4050  * as possible by signalling it should
4051  * shutdown.
4052  *
4053  * The connection should, if serviced
4054  * correctly by the underlying library,
4055  * automatically transition to halted after
4056  * all pending reads/writes are
4057  * complete, at which point we'll be informed
4058  * and free our tconn wrapper.
4059  */
4061  tconn = prev;
4062  }
4063 }
4064 
4065 /** Rebalance connections across active trunk members when a new connection becomes active
4066  *
4067  * We don't have any visibility into the connection prioritisation algorithm;
4068  * it's essentially a black box.
4069  *
4070  * We can, however, determine when the correct level of requests per connection
4071  * has been reached by dequeueing and requeueing requests up until the point
4072  * where the connection that just had a request dequeued receives the same
4073  * request back.
4074  *
4075  * @param[in] trunk The trunk to rebalance.
4076  */
4077 static void trunk_rebalance(trunk_t *trunk)
4078 {
4080 
4082 
4083  /*
4084  * Only rebalance if the top and bottom of
4085  * the heap are not equal.
4086  */
4087  if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4088 
4089  DEBUG3("Rebalancing requests");
4090 
4091  /*
4092  * Keep requeuing requests from the connection
4093  * at the bottom of the heap until the
4094  * connection at the top is shifted from that
4095  * position.
4096  */
4097  while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4099  TRUNK_REQUEST_STATE_PENDING, 1, false));
4100 }
4101 
4102 /** Implements the algorithm we use to manage the requests-per-connection level
4103  *
4104  * This is executed periodically using a timer event, and opens/closes
4105  * connections.
4106  *
4107  * The aim is to try and keep the request per connection level in a sweet spot,
4108  * where there's enough outstanding work for the connection/pipelining to work
4109  * efficiently, but not so much that we encounter increased latency.
4110  *
4111  * In the request enqueue and dequeue functions we record every time the
4112  * average number of requests per connection goes above the target count
4113  * and record every time the average number of requests per connection goes
4114  * below the target count.
4115  *
4116  * This may sound expensive, but in all cases we're just summing counters.
4117  * CPU time required does not increase with additional requests, only with
4118  * large numbers of connections.
4119  *
4120  * If we do encounter scaling issues, we can always maintain the counters
4121  * as aggregates as an optimisation later.
4122  *
4123  * When the management function runs, if the trunk was most recently
4124  * above the target, we:
4125  * - Return if we've been in this state for a shorter period than 'open_delay'.
4126  * - Return if we're at max.
4127  * - Return if opening a new connection will take us below the load target.
4128  * - Return if we last opened a connection within 'open_delay'.
4129  * - Otherwise we attempt to open a new connection.
4130  *
4131  * If the trunk was most recently below the target, we:
4132  * - Return if we've been in this state for a shorter period than 'close_delay'.
4133  * - Return if we're at min.
4134  * - Return if we have no connections.
4135  * - Close a connection if min is 0, and we have no outstanding
4136  * requests. Then return.
4137  * - Return if closing a connection will take us above the load target.
4138  * - Return if we last closed a connection within 'close_delay'.
4139  * - Otherwise we move a connection to draining state.
4140  */
4141 static void trunk_manage(trunk_t *trunk, fr_time_t now)
4142 {
4143  trunk_connection_t *tconn = NULL;
4144  trunk_request_t *treq;
4145  uint32_t average = 0;
4146  uint32_t req_count;
4147  uint16_t conn_count;
4148  trunk_state_t new_state;
4149 
4150  DEBUG4("Managing trunk");
4151 
4152  /*
4153  * Clean up requests in our request cache which
4154  * have been reapable for too long.
4155  */
4156  while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4158 
4159  /*
4160  * Free any connections which have drained
4161  * and we didn't reactivate during the last
4162  * round of management.
4163  */
4165  trunk_connection_close_if_empty(trunk, &trunk->draining);
4167 
4168  /*
4169  * Process deferred connection freeing
4170  */
4171  if (!trunk->in_handler) {
4172  while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4173  }
4174 
4175  /*
4176  * Update the state of the trunk
4177  */
4179  new_state = TRUNK_STATE_ACTIVE;
4180  } else {
4181  /*
4182  * INIT / CONNECTING / FULL mean connections will become active
4183  * so the trunk is PENDING
4184  */
4187  TRUNK_CONN_FULL) ?
4189  }
4190 
4191  if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4192 
4193  /*
4194  * A trunk can be signalled to not proactively
4195  * manage connections if a destination is known
4196  * to be unreachable, and doing so would result
4197  * in spurious connections still being opened.
4198  *
4199  * We still run other connection management
4200  * functions and just short circuit the function
4201  * here.
4202  */
4203  if (!trunk->managing_connections) return;
4204 
4205  /*
4206  * We're above the target requests per connection,
4207  * spawn more connections!
4208  */
4209  if (fr_time_gteq(trunk->pub.last_above_target, trunk->pub.last_below_target)) {
4210  /*
4211  * If a 'connecting' limit is set, check we
4212  * wouldn't have too many connections in
4213  * the connecting state.
4214  *
4215  * This is a throttle in the case of transitory
4216  * load spikes, or a backend becoming
4217  * unavailable.
4218  */
4219  if ((trunk->conf.connecting > 0) &&
4221  trunk->conf.connecting)) {
4222  DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4223  trunk->conf.connecting);
4224  return;
4225  }
4226 
4227  trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4228 
4229  /*
4230  * Only apply hysteresis if we have at least
4231  * one available connection.
4232  */
4233  if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4234  DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4237  return; /* too soon */
4238  }
4239 
4240  /*
4241  * We don't consider 'draining' connections
4242  * in the max calculation, as if we
4243  * determine that we need to spawn a new
4244  * connection, we'd move all 'draining'
4245  * connections back to active before spawning
4246  * any new connections.
4247  */
4248  if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4249  DEBUG4("Not opening connection - Have %u connections, need %u or below",
4250  conn_count, trunk->conf.max);
4251  return;
4252  }
4253 
4254  /*
4255  * We consider requests pending on all connections
4256  * and the trunk's backlog, as that's the current
4257  * total load.
4258  */
4259  if (!req_count) {
4260  DEBUG4("Not opening connection - No outstanding requests");
4261  return;
4262  }
4263 
4264  /*
4265  * Do the n+1 check, i.e. whether opening one more connection
4266  * would take us below our target threshold.
4267  */
4268  if (conn_count > 0) {
4269  average = ROUND_UP_DIV(req_count, (conn_count + 1));
4270  if (average < trunk->conf.target_req_per_conn) {
4271  DEBUG4("Not opening connection - Would leave us below our target requests "
4272  "per connection (now %u, after open %u)",
4273  ROUND_UP_DIV(req_count, conn_count), average);
4274  return;
4275  }
4276  } else {
4277  (void)trunk_connection_spawn(trunk, now);
4278  return;
4279  }
4280 
4281  /*
4282  * If we've got a connection in the draining list,
4283  * move it back into the active list instead of
4284  * opening a new connection.
4285  */
4286  tconn = fr_dlist_head(&trunk->draining);
4287  if (tconn) {
4288  if (trunk_connection_is_full(tconn)) {
4290  } else {
4292  }
4293  return;
4294  }
4295 
4296  /*
4297  * Implement delay if there are no connections that
4298  * could be immediately re-activated.
4299  */
4300  if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4301  DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4302  "It's been %pVs",
4304  fr_box_time_delta(fr_time_sub(now, trunk->pub.last_open)));
4305  return;
4306  }
4307 
4308  DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4309  ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4310  /* last_open set by trunk_connection_spawn */
4311  (void)trunk_connection_spawn(trunk, now);
4312  }
4313 
4314  /*
4315  * We're below the target requests per connection.
4316  * Free some connections...
4317  */
4318  else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4319  if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4320  DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4323  return; /* too soon */
4324  }
4325 
4326  trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4327 
4328  if (!conn_count) {
4329  DEBUG4("Not closing connection - No connections to close!");
4330  return;
4331  }
4332 
4333  if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4334  DEBUG4("Not closing connection - Have %u connections, need %u or above",
4335  conn_count, trunk->conf.min);
4336  return;
4337  }
4338 
4339  if (!req_count) {
4340  DEBUG4("Closing connection - No outstanding requests");
4341  goto close;
4342  }
4343 
4344  /*
4345  * The minimum number of connections must be set
4346  * to zero for this to work.
4347  * min == 0, no requests, close all the connections.
4348  * This is useful for backup databases, when
4349  * maintaining the connection would lead to lots of
4350  * log file churn.
4351  */
4352  if (conn_count == 1) {
4353  DEBUG4("Not closing connection - Would leave no connections "
4354  "and there are still %u outstanding requests", req_count);
4355  return;
4356  }
4357 
4358  /*
4359  * Do the n-1 check, i.e. whether closing one connection
4360  * would take us above our target threshold.
4361  */
4362  average = ROUND_UP_DIV(req_count, (conn_count - 1));
4363  if (average > trunk->conf.target_req_per_conn) {
4364  DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4365  "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4366  return;
4367  }
4368 
4369  DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4370  ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4371 
4372  close:
4373  if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4374  DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4375  "It's been %pVs",
4378  return;
4379  }
4380 
4381  /*
4382  * If the last event on the trunk was a connection failure and
4383  * there is only one connection, this may well be a reconnect
4384  * attempt after a failure, and needs to persist; otherwise
4385  * the last event will be a failure and no new connection will
4386  * be made, leading to no new requests being enqueued.
4387  */
4388  if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4389  fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4390  DEBUG4("Not closing remaining connection - last event was a failure");
4391  return;
4392  }
4393 
4394  /*
4395  * Inactive connections get counted in the
4396  * set of viable connections, but are likely
4397  * to be congested or dead, so we drain
4398  * (and possibly eventually free) those first.
4399  */
4400  if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4401  /*
4402  * If the connection has no requests associated
4403  * with it then immediately free.
4404  */
4406  connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4407  } else {
4409  }
4410  /*
4411  * It is possible to have too many connecting
4412  * connections when the connections are
4413  * taking a while to open and the number
4414  * of requests decreases.
4415  */
4416  } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4417  connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4418 
4419  /*
4420  * Finally if there are no "connecting"
4421  * connections to close, and no "inactive"
4422  * connections, start draining "active"
4423  * connections.
4424  */
4425  } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4426  /*
4427  * If the connection has no requests associated
4428  * with it then immediately free.
4429  */
4431  connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4432  } else {
4434  }
4435  }
4436 
4437  trunk->pub.last_closed = now;
4438 
4439 
4440  return;
4441  }
4442 }
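/*
 *	Editorial worked example (not part of trunk.c) of the n+1/n-1 checks
 *	above, assuming target_req_per_conn = 10:
 *
 *	- 4 connections, 50 outstanding requests.
 *	  Current ratio ROUND_UP_DIV(50, 4) = 13 (above target).
 *	  After opening one more connection ROUND_UP_DIV(50, 5) = 10, which is
 *	  not below the target, so a new connection may be opened.
 *
 *	- 4 connections, 25 outstanding requests.
 *	  Current ratio ROUND_UP_DIV(25, 4) = 7 (below target).
 *	  After closing one connection ROUND_UP_DIV(25, 3) = 9, which is not
 *	  above the target, so a connection may be moved to draining.
 */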
4443 
4444 /** Event to periodically call the connection management function
4445  *
4446  * @param[in] el this event belongs to.
4447  * @param[in] now current time.
4448  * @param[in] uctx The trunk.
4449  */
4450 static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
4451 {
4452  trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4453 
4454  trunk_manage(trunk, now);
4455 
4457  if (fr_event_timer_in(trunk, el, &trunk->manage_ev, trunk->conf.manage_interval,
4458  _trunk_timer, trunk) < 0) {
4459  PERROR("Failed inserting trunk management event");
4460  /* Not much we can do, hopefully the trunk will be freed soon */
4461  }
4462  }
4463 }
4464 
4465 /** Return a count of requests on connections in particular states
4466  *
4467  * @param[in] trunk to retrieve counts for.
4468  * @param[in] conn_state One or more connection states or'd together.
4469  * @param[in] req_state One or more request states or'd together.
4470  * @return The number of requests in a particular state, on connections in a particular state.
4471  */
4472 uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4473 {
4474  uint64_t count = 0;
4475  trunk_connection_t *tconn = NULL;
4477 
4478 #define COUNT_BY_STATE(_state, _list) \
4479 do { \
4480  if (conn_state & (_state)) { \
4481  tconn = NULL; \
4482  while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4483  count += trunk_request_count_by_connection(tconn, req_state); \
4484  } \
4485  } \
4486 } while (0)
4487 
4488  if (conn_state & TRUNK_CONN_ACTIVE) {
4489  for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4490  tconn;
4491  tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4492  count += trunk_request_count_by_connection(tconn, req_state);
4493  }
4494  }
4495 
4498  COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4500  COUNT_BY_STATE(TRUNK_CONN_DRAINING_TO_FREE, draining_to_free);
4501 
4502  if (req_state & TRUNK_REQUEST_STATE_BACKLOG) count += fr_heap_num_elements(trunk->backlog);
4503 
4504  return count;
4505 }
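/*
 *	Editorial example (not part of trunk.c): counting every request the
 *	trunk currently knows about.  TRUNK_CONN_ALL is used elsewhere in this
 *	file; TRUNK_REQUEST_STATE_ALL is assumed to be the matching catch-all
 *	mask for request states in trunk.h.
 */
static uint64_t example_outstanding_requests(trunk_t *trunk)
{
	return trunk_request_count_by_state(trunk, TRUNK_CONN_ALL, TRUNK_REQUEST_STATE_ALL);
}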
4506 
4507 /** Update timestamps for when we last had a transition from above target to below target or vice versa
4508  *
4509  * Should be called every time a connection or request is allocated or freed.
4510  *
4511  * @param[out] conn_count_out How many connections we considered.
4512  * @param[out] req_count_out How many requests we considered.
4513  * @param[in] trunk to operate on.
4514  * @param[in] now The current time.
4515  * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4516  * has changed.
4517  * @return
4518  * - 0 if the average couldn't be calculated (no requests or no connections).
4519  * - The average number of requests per connection.
4520  */
4521 static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4522  trunk_t *trunk, fr_time_t now,
4523  NDEBUG_UNUSED bool verify)
4524 {
4525  uint32_t req_count = 0;
4526  uint16_t conn_count = 0;
4527  uint64_t req_per_conn = 0;
4528 
4529  fr_assert(fr_time_gt(now, fr_time_wrap(0)));
4530 
4531  /*
4532  * No need to update these as the trunk is being freed
4533  */
4534  if (trunk->freeing) goto done;
4535 
4536  /*
4537  * Count all connections except draining and draining to free.
4538  *
4539  * Omitting these connection states artificially raises the
4540  * request to connection ratio, so that we can preemptively spawn
4541  * new connections.
4542  *
4543  * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4544  * the trunk management code has enough hysteresis to not
4545  * immediately reactivate the connection.
4546  *
4547  * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4548  * management code should spawn a new connection to take its place.
4549  *
4550  * Connections placed in the DRAINING_TO_FREE state are being
4551  * closed preemptively to deal with bugs on the server we're
4552  * talking to, or misconfigured firewalls which are trashing
4553  * TCP/UDP connection states.
4554  */
4555  conn_count = trunk_connection_count_by_state(trunk, TRUNK_CONN_ALL ^
4559 
4560  /*
4561  * Requests on all connections
4562  */
4563  req_count = trunk_request_count_by_state(trunk,
4564  TRUNK_CONN_ALL ^
4566 
4567  /*
4568  * No connections, but we do have requests
4569  */
4570  if (conn_count == 0) {
4571  if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4572  goto done;
4573  }
4574 
4575  if (req_count == 0) {
4576  if (trunk->conf.target_req_per_conn > 0) goto below_target;
4577  goto done;
4578  }
4579 
4580  /*
4581  * Calculate the req_per_conn
4582  */
4583  req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4584  if (req_per_conn > trunk->conf.target_req_per_conn) {
4585  above_target:
4586  /*
4587  * Edge - Below target to above target (too many requests per conn - spawn more)
4588  *
4589  * The equality check is correct here as both values start at 0.
4590  */
4591  if (fr_time_lteq(trunk->pub.last_above_target, trunk->pub.last_below_target)) trunk->pub.last_above_target = now;
4592  } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4593  below_target:
4594  /*
4595  * Edge - Above target to below target (too few requests per conn - close some)
4596  *
4597  * The equality check is correct here as both values start at 0.
4598  */
4599  if (fr_time_lteq(trunk->pub.last_below_target, trunk->pub.last_above_target)) trunk->pub.last_below_target = now;
4600  }
4601 
4602 done:
4603  if (conn_count_out) *conn_count_out = conn_count;
4604  if (req_count_out) *req_count_out = req_count;
4605 
4606  /*
4607  * Check we haven't missed a call to trunk_requests_per_connection
4608  */
4609  fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4610 
4611  trunk->last_req_per_conn = req_per_conn;
4612 
4613  return req_per_conn;
4614 }
4615 
4616 /** Drain the backlog of as many requests as possible
4617  *
4618  * @param[in] trunk To drain backlog requests for.
4619  */
4620 static void trunk_backlog_drain(trunk_t *trunk)
4621 {
4622  trunk_request_t *treq;
4623 
4624  if (fr_heap_num_elements(trunk->backlog) == 0) return;
4625 
4626  /*
4627  * If it's always writable, this isn't
4628  * really a noteworthy event.
4629  */
4630  if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4631 
4632  /*
4633  * Do *NOT* add an artificial limit
4634  * here. We rely on all available
4635  * connections entering the full
4636  * state and transitioning back to
4637  * active in order to drain the
4638  * backlog.
4639  */
4640  while ((treq = fr_heap_peek(trunk->backlog))) {
4641  switch (trunk_request_enqueue_existing(treq)) {
4642  case TRUNK_ENQUEUE_OK:
4643  continue;
4644 
4645  /*
4646  * Signal to stop
4647  */
4649  break;
4650 
4651  /*
4652  * Failed enqueueing the request,
4653  * have it enter the failed state
4654  * which will free it and
4655  * re-enliven the yielded request.
4656  */
4658  case TRUNK_ENQUEUE_FAIL:
4660  continue;
4661 
4664  return;
4665  }
4666  }
4667 }
4668 
4669 /** Force the trunk to re-establish its connections
4670  *
4671  * @param[in] trunk to signal.
4672  * @param[in] states One or more states or'd together.
4673  * @param[in] reason Why the connections are being signalled to reconnect.
4674  */
4675 void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4676 {
4677 
4678 #define RECONNECT_BY_STATE(_state, _list) \
4679 do { \
4680  if (states & (_state)) { \
4681  size_t i; \
4682  for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4683  connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4684  } \
4685  } \
4686 } while (0)
4687 
4688  /*
4689  * Connections in the 'connecting' state
4690  * may re-enter that state, so we need to
4691  * be careful not to enter an infinite
4692  * loop, as we iterate over the list
4693  * again and again.
4694  */
4696 
4697  if (states & TRUNK_CONN_ACTIVE) {
4698  trunk_connection_t *tconn;
4699  while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4700  }
4701 
4709 }
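/*
 *	Editorial example (not part of trunk.c): force every connection in the
 *	trunk, whatever its state, to be re-established, e.g. after the
 *	destination address changes.  The reason value is supplied by the caller.
 */
static void example_reconnect_all(trunk_t *trunk, connection_reason_t reason)
{
	trunk_reconnect(trunk, TRUNK_CONN_ALL, reason);
}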
4710 
4711 /** Start the trunk running
4712  *
4713  */
4714 int trunk_start(trunk_t *trunk)
4715 {
4716  uint16_t i;
4717 
4718  if (unlikely(trunk->started)) return 0;
4719 
4720  /*
4721  * Spawn the initial set of connections
4722  */
4723  for (i = 0; i < trunk->conf.start; i++) {
4724  DEBUG("[%i] Starting initial connection", i);
4725  if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4726  }
4727 
4729  /*
4730  * Insert the event timer to manage
4731  * the interval between managing connections.
4732  */
4733  if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, trunk->conf.manage_interval,
4734  _trunk_timer, trunk) < 0) {
4735  PERROR("Failed inserting trunk management event");
4736  return -1;
4737  }
4738  }
4739  trunk->started = true;
4740  trunk->managing_connections = true;
4741 
4742  return 0;
4743 }
4744 
4745 /** Allow the trunk to open and close connections in response to load
4746  *
4747  */
4748 void trunk_connection_manage_start(trunk_t *trunk)
4749 {
4750  if (!trunk->started || trunk->managing_connections) return;
4751 
4752  DEBUG3("Connection management enabled");
4753  trunk->managing_connections = true;
4754 }
4755 
4756 /** Stop the trunk from opening and closing connections in response to load
4757  *
4758  */
4759 void trunk_connection_manage_stop(trunk_t *trunk)
4760 {
4761  if (!trunk->started || !trunk->managing_connections) return;
4762 
4763  DEBUG3("Connection management disabled");
4764  trunk->managing_connections = false;
4765 }
4766 
4767 /** Schedule a trunk management event for the next time the event loop is executed
4768  */
4769 int trunk_connection_manage_schedule(trunk_t *trunk)
4770 {
4771  if (!trunk->started || !trunk->managing_connections) return 0;
4772 
4773  if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, fr_time_delta_wrap(0), _trunk_timer, trunk) < 0) {
4774  PERROR("Failed inserting trunk management event");
4775  return -1;
4776  }
4777 
4778  return 0;
4779 }
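/*
 *	Editorial example (not part of trunk.c): pausing proactive connection
 *	management while a backend is known to be down, then resuming and
 *	requesting an immediate management pass when it recovers.  The calls
 *	match the manage_start/manage_stop/manage_schedule functions above.
 */
static void example_backend_state_change(trunk_t *trunk, bool backend_up)
{
	if (!backend_up) {
		trunk_connection_manage_stop(trunk);		/* Stop opening connections to a dead backend */
		return;
	}

	trunk_connection_manage_start(trunk);			/* Resume opening/closing connections on load */
	(void)trunk_connection_manage_schedule(trunk);		/* Re-evaluate on the next event loop pass */
}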
4780 
4781 /** Order connections by queue depth
4782  *
4783  */
4784 static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4785 {
4788 
4791 
4792  /*
4793  * Add a fudge factor of 1 to reduce spurious rebalancing
4794  */
4795  return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4796 }
4797 
4798 /** Free a trunk, gracefully closing all connections.
4799  *
4800  */
4801 static int _trunk_free(trunk_t *trunk)
4802 {
4803  trunk_connection_t *tconn;
4804  trunk_request_t *treq;
4805  trunk_watch_entry_t *watch;
4806  size_t i;
4807 
4808  DEBUG4("Trunk free %p", trunk);
4809 
4810  trunk->freeing = true; /* Prevent re-enqueuing */
4811 
4812  /*
4813  * We really don't want this firing after
4814  * we've freed everything.
4815  */
4817 
4818  /*
4819  * Now free the connections in each of the lists.
4820  *
4821  * Each time a connection is freed it removes itself from the list
4822  * it's in, which means the head should keep advancing automatically.
4823  */
4824  while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4825  while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4826  while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4827  while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4828  while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4829  while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4830  while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4831  while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4832  while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4833 
4834  /*
4835  * Process any deferred connection frees
4836  */
4837  while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4838 
4839  /*
4840  * Free any requests left in the backlog
4841  */
4842  while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4843 
4844  /*
4845  * Free any requests in our request cache
4846  */
4847  while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4848 
4849  /*
4850  * Free any entries in the watch lists
4851  */
4852  for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4853  while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4854  }
4855 
4856  return 0;
4857 }
4858 
4859 /** Allocate a new collection of connections
4860  *
4861  * This function should be called first to allocate a new trunk.
4862  *
4863  * After the trunk has been allocated, #trunk_request_alloc and
4864  * #trunk_request_enqueue should be used to allocate memory for trunk
4865  * requests, and pass a preq (protocol request) to the trunk for
4866  * processing.
4867  *
4868  * The trunk will then asynchronously process the request, writing the result
4869  * to a specified rctx. See #trunk_request_enqueue for more details.
4870  *
4871  * @note Trunks may not be shared between multiple threads under any circumstances.
4872  *
4873  * @param[in] ctx To use for any memory allocations. Must be thread local.
4874  * @param[in] el to use for I/O and timer events.
4875  * @param[in] funcs Callback functions.
4876  * @param[in] conf Common user configurable parameters.
4877  * @param[in] log_prefix To prepend to global messages.
4878  * @param[in] uctx User data to pass to the alloc function.
4879  * @param[in] delay_start If true, then we will not spawn any connections
4880  * until the first request is enqueued.
4881  * @return
4882  * - New trunk handle on success.
4883  * - NULL on error.
4884  */
4885 trunk_t *trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,
4886  trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4887  char const *log_prefix, void const *uctx, bool delay_start)
4888 {
4889  trunk_t *trunk;
4890  size_t i;
4891 
4892  /*
4893  * Check we have the functions we need
4894  */
4895  if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4896 
4897  MEM(trunk = talloc_zero(ctx, trunk_t));
4898  trunk->el = el;
4899  trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4900 
4901  memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4902  if (!trunk->funcs.connection_prioritise) {
4904  }
4906 
4907  memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4908 
4909  memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4910  talloc_set_destructor(trunk, _trunk_free);
4911 
4912  /*
4913  * Unused request list...
4914  */
4916 
4917  /*
4918  * Request backlog queue
4919  */
4921  trunk_request_t, heap_id, 0));
4922 
4923  /*
4924  * Connection queues and trees
4925  */
4927  trunk_connection_t, heap_id, 0));
4928  fr_dlist_talloc_init(&trunk->init, trunk_connection_t, entry);
4930  fr_dlist_talloc_init(&trunk->full, trunk_connection_t, entry);
4937 
4938  /*
4939  * Watch lists
4940  */
4941  for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4942  fr_dlist_talloc_init(&trunk->watch[i], trunk_watch_entry_t, entry);
4943  }
4944 
4945  DEBUG4("Trunk allocated %p", trunk);
4946 
4947  if (!delay_start) {
4948  if (trunk_start(trunk) < 0) {
4949  talloc_free(trunk);
4950  return NULL;
4951  }
4952  }
4953 
4954  return trunk;
4955 }
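/*
 *	Editorial example (not part of trunk.c): enqueueing a protocol request
 *	on a trunk previously created with trunk_alloc().  The enqueue
 *	signature and the TRUNK_ENQUEUE_IN_BACKLOG value are assumed to match
 *	trunk.h; preq and rctx are the caller's own structures.
 */
static int example_enqueue(trunk_t *trunk, request_t *request, void *preq, void *rctx)
{
	trunk_request_t *treq = NULL;

	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case TRUNK_ENQUEUE_OK:			/* Assigned to a connection immediately */
	case TRUNK_ENQUEUE_IN_BACKLOG:		/* Queued until a connection becomes available */
		return 0;

	default:				/* No capacity, destination unreachable, etc. */
		return -1;
	}
}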
4956 
4957 #ifndef TALLOC_GET_TYPE_ABORT_NOOP
4958 /** Verify a trunk
4959  *
4960  * A trunk has some number of connections, which each have some number of requests. The connections and
4961  * requests are in differing kinds of containers depending on their state and how they are used, and may
4962  * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
4963  * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
4964  * foo's children.
4965  */
4966 void trunk_verify(char const *file, int line, trunk_t *trunk)
4967 {
4968  fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
4969  (void) talloc_get_type_abort(trunk, trunk_t);
4970 
4971  for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4972  _fr_dlist_verify(file, line, &trunk->watch[i]);
4973  }
4974 
4975 #define IO_FUNC_VERIFY(_func) \
4976  fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i]: " #_func " was NULL", file, line)
4977 
4978  /*
4979  * Only a few of the function pointers *must* be non-NULL.
4980  */
4982  IO_FUNC_VERIFY(connection_prioritise);
4984 
4985 #define TRUNK_TCONN_CHECKS(_tconn, _state) \
4986 do { \
4987  fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
4988  "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
4989  fr_fatal_assert_msg(_state == _tconn->pub.state, \
4990  "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
4991 } while (0)
4992 
4993 #define TCONN_DLIST_VERIFY(_dlist, _state) \
4994 do { \
4995  _fr_dlist_verify(file, line, &(trunk->_dlist)); \
4996  fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
4997  trunk_connection_verify(file, line, tconn); \
4998  TRUNK_TCONN_CHECKS(tconn, _state); \
4999  } \
5000 } while (0)
5001 
5002 #define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5003 do {\
5004  fr_minmax_heap_verify(file, line, trunk->_heap); \
5005  fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5006  trunk_connection_verify(file, line, tconn); \
5007  TRUNK_TCONN_CHECKS(tconn, _state); \
5008  }} \
5009 } while (0)
5010 
5011  fr_dlist_verify(&(trunk->free_requests));
5012  FR_HEAP_VERIFY(trunk->backlog);
5013 
5020  /* TCONN_DLIST_VERIFY(failed, ???); */
5025 }
5026 
5027 void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
5028 {
5029  fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5030  (void) talloc_get_type_abort(tconn, trunk_connection_t);
5031 
5032  (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5033 
5034  /*
5035  * shouldn't be both in the heap and on a list, but it doesn't look like moves
5036  * to the active heap wipe the dlist pointers.
5037  */
5038 
5039 #define TCONN_TREQ_CHECKS(_treq, _state) \
5040 do { \
5041  fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5042  "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5043  fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5044  "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5045  fr_fatal_assert_msg(_state == _treq->pub.state, \
5046  "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5047 } while (0)
5048 
5049 #define TREQ_DLIST_VERIFY(_dlist, _state) \
5050 do { \
5051  _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5052  fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5053  trunk_request_verify(file, line, treq); \
5054  TCONN_TREQ_CHECKS(treq, _state); \
5055  } \
5056 } while (0)
5057 
5058 #define TREQ_HEAP_VERIFY(_heap, _state) \
5059 do { \
5060  fr_heap_iter_t _iter; \
5061  fr_heap_verify(file, line, tconn->_heap); \
5062  for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5063  treq; \
5064  treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5065  trunk_request_verify(file, line, treq); \
5066  TCONN_TREQ_CHECKS(treq, _state); \
5067  } \
5068 } while (0)
5069 
5070 #define TREQ_OPTION_VERIFY(_option, _state) \
5071 do { \
5072  if (tconn->_option) { \
5073  trunk_request_verify(file, line, tconn->_option); \
5074  TCONN_TREQ_CHECKS(tconn->_option, _state); \
5075  } \
5076 } while (0)
5077 
5078  /* verify associated requests */
5085 }
5086 
5087 void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5088 {
5089  fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5090  (void) talloc_get_type_abort(treq, trunk_request_t);
5091 
5092 #ifdef WITH_VERIFY_PTR
5093  if (treq->pub.request) request_verify(file, line, treq->pub.request);
5094 #endif
5095 }
5096 
5097 
5098 bool trunk_search(trunk_t *trunk, void *ptr)
5099 {
5100 #define TCONN_DLIST_SEARCH(_dlist) \
5101 do { \
5102  fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5103  if (ptr == tconn) { \
5104  fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5105  return true; \
5106  } \
5107  if (trunk_connection_search(tconn, ptr)) { \
5108  fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5109  return true; \
5110  } \
5111  } \
5112 } while (0)
5113 
5114 #define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5115 do { \
5116  fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5117  if (ptr == tconn) { \
5118  fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5119  return true; \
5120  } \
5121  if (trunk_connection_search(tconn, ptr)) { \
5122  fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5123  return true; \
5124  } \
5125  }}\
5126 } while (0)
5127 
5129  TCONN_DLIST_SEARCH(connecting);
5130  TCONN_MINMAX_HEAP_SEARCH(active);
5131  TCONN_DLIST_SEARCH(full);
5132  TCONN_DLIST_SEARCH(inactive);
5133  TCONN_DLIST_SEARCH(inactive_draining);
5134  TCONN_DLIST_SEARCH(failed);
5135  TCONN_DLIST_SEARCH(closed);
5136  TCONN_DLIST_SEARCH(draining);
5137  TCONN_DLIST_SEARCH(draining_to_free);
5138  TCONN_DLIST_SEARCH(to_free);
5139 
5140  return false;
5141 }
5142 
5143 bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
5144 {
5145 #define TREQ_DLIST_SEARCH(_dlist) \
5146 do { \
5147  fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5148  if (ptr == treq) { \
5149  fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5150  return true; \
5151  } \
5152  if (trunk_request_search(treq, ptr)) { \
5153  fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5154  return true; \
5155  } \
5156  } \
5157 } while (0)
5158 
5159 #define TREQ_HEAP_SEARCH(_heap) \
5160 do { \
5161  fr_heap_iter_t _iter; \
5162  for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5163  treq; \
5164  treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5165  if (ptr == treq) { \
5166  fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5167  return true; \
5168  } \
5169  if (trunk_request_search(treq, ptr)) { \
5170  fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5171  return true; \
5172  } \
5173  } \
5174 } while (0)
5175 
5176 #define TREQ_OPTION_SEARCH(_option) \
5177 do { \
5178  if (tconn->_option) { \
5179  if (ptr == tconn->_option) { \
5180  fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5181  return true; \
5182  } \
5183  if (trunk_request_search(tconn->_option, ptr)) { \
5184  fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5185  return true; \
5186  } \
5187  } \
5188 } while (0)
5189 
5190  /* search associated requests */
5191  TREQ_HEAP_SEARCH(pending);
5192  TREQ_DLIST_SEARCH(sent);
5193  TREQ_DLIST_SEARCH(cancel);
5194  TREQ_DLIST_SEARCH(cancel_sent);
5195  TREQ_OPTION_SEARCH(partial);
5196  TREQ_OPTION_SEARCH(cancel_partial);
5197 
5198  return false;
5199 }
5200 
5201 bool trunk_request_search(trunk_request_t *treq, void *ptr)
5202 {
5203  return treq->pub.preq == ptr;
5204 }
5205 #endif
int const char * file
Definition: acutest.h:702
int const char int line
Definition: acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
#define L(_str)
Helper for initialising arrays of string literals.
Definition: build.h:207
#define NDEBUG_UNUSED
Definition: build.h:324
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition: build.h:320
#define unlikely(_x)
Definition: build.h:379
#define UNUSED
Definition: build.h:313
#define NUM_ELEMENTS(_t)
Definition: build.h:335
#define CONF_PARSER_TERMINATOR
Definition: cf_parse.h:627
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition: cf_parse.h:268
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition: cf_parse.h:310
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition: cf_parse.h:297
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition: cf_parse.h:399
Defines a CONF_PAIR to C data type mapping.
Definition: cf_parse.h:564
connection_state_t
Definition: connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition: connection.h:54
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition: connection.h:46
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition: connection.h:55
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition: connection.h:52
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition: connection.h:49
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition: connection.h:50
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition: connection.h:53
connection_reason_t
Definition: connection.h:83
next
Definition: dcursor.h:178
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
fr_dcursor_iter_t iter
Definition: dcursor.h:147
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:156
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:184
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition: dlist.h:672
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition: dlist.h:735
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition: dlist.h:555
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition: dlist.h:163
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition: dlist.h:531
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
#define fr_dlist_verify(_head)
Definition: dlist.h:755
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition: dlist.h:588
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition: dlist.h:338
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
#define fr_event_timer_in(...)
Definition: event.h:255
if(rcode > 0)
Definition: fd_read.h:9
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
unsigned int fr_heap_index_t
Definition: heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition: heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition: heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition: heap.h:115
The main heap structure.
Definition: heap.h:66
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define RWARN(fmt,...)
Definition: log.h:297
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition: log.h:606
Track when a log message was last repeated.
Definition: log.h:547
talloc_free(reap)
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Definition: event.c:1611
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition: log.c:583
fr_log_type_t
Definition: log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition: math.h:153
unsigned short uint16_t
Definition: merged_model.c:31
unsigned int uint32_t
Definition: merged_model.c:33
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:424
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
Definition: minmax_heap.c:573
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:449
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:466
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
Definition: minmax_heap.c:533
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:486
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
Definition: minmax_heap.c:551
unsigned int fr_minmax_heap_iter_t
Definition: minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition: minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition: misc.c:417
static bool done
Definition: radclient.c:80
#define RDEBUG(fmt,...)
Definition: radclient.h:53
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define WARN(fmt,...)
Definition: radclient.h:47
#define INFO(fmt,...)
Definition: radict.c:54
static fr_event_list_t * events
Definition: radsniff.c:59
static rs_t * conf
Definition: radsniff.c:53
static int8_t request_prioritise(void const *one, void const *two)
static size_t min(size_t x, size_t y)
Definition: sbuff.c:143
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
Definition: connection.c:1228
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition: connection.c:472
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
Definition: connection.c:1291
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition: connection.c:319
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
Definition: connection.c:1167
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition: connection.c:510
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
Definition: connection.c:1107
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition: connection.c:532
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
Definition: connection.c:1512
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition: connection.c:455
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition: connection.c:310
static fr_time_t test_time_base
Definition: slab_tests.c:42
return count
Definition: module.c:163
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
init
Enter the EAP-IDENTITY state.
Definition: state_machine.c:90
fr_time_t test_time
Definition: state_test.c:3
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
@ memory_order_relaxed
Definition: stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition: stdatomic.h:88
Definition: log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:772
An element in a table indexed by bit position.
Definition: table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition: table.h:57
#define talloc_get_type_abort_const
Definition: talloc.h:282
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition: talloc.h:180
#define fr_time_gteq(_a, _b)
Definition: time.h:238
#define fr_time_delta_wrap(_time)
Definition: time.h:152
#define fr_time_wrap(_time)
Definition: time.h:145
#define fr_time_lteq(_a, _b)
Definition: time.h:240
#define fr_time_delta_ispos(_a)
Definition: time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
#define fr_time_lt(_a, _b)
Definition: time.h:239
"server local" time.
Definition: time.h:69
bool trunk_search(trunk_t *trunk, void *ptr)
Definition: trunk.c:5098
static atomic_uint_fast64_t request_counter
Definition: trunk.c:52
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition: trunk.c:3236
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition: trunk.c:772
static size_t trunk_req_trigger_names_len
Definition: trunk.c:354
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition: trunk.c:3813
fr_dlist_head_t cancel
Requests in the cancel state.
Definition: trunk.c:159
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition: trunk.c:4769
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition: trunk.c:742
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition: trunk.c:3386
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition: trunk.c:157
fr_heap_t * pending
Requests waiting to be sent.
Definition: trunk.c:151
trunk_conf_t conf
Trunk common configuration.
Definition: trunk.c:204
static size_t trunk_connection_states_len
Definition: trunk.c:411
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition: trunk.c:747
trunk_connection_t * tconn
The request was associated with.
Definition: trunk.c:80
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition: trunk.c:3992
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition: trunk.c:278
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition: trunk.c:4966
#define IN_HANDLER(_trunk)
Definition: trunk.c:702
static fr_table_num_ordered_t const trunk_connection_states[]
Definition: trunk.c:399
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition: trunk.c:4675
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition: trunk.c:4009
void * uctx
User data to pass to the function.
Definition: trunk.c:189
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition: trunk.c:1150
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition: trunk.c:959
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition: trunk.c:276
trunk_request_state_t to
What state we transitioned to.
Definition: trunk.c:78
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition: trunk.c:935
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition: trunk.c:4141
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition: trunk.c:3658
trunk_io_funcs_t funcs
I/O functions.
Definition: trunk.c:256
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition: trunk.c:241
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition: trunk.c:757
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition: trunk.c:4714
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition: trunk.c:2022
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition: trunk.c:2120
fr_event_timer_t const * manage_ev
Periodic connection management event.
Definition: trunk.c:270
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition: trunk.c:2248
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition: trunk.c:3200
trunk_watch_t func
Function to call when a trunk enters.
Definition: trunk.c:185
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition: trunk.c:3899
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition: trunk.c:861
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition: trunk.c:594
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition: trunk.c:3610
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition: trunk.c:713
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition: trunk.c:136
fr_dlist_head_t closed
Connections that have closed.
Definition: trunk.c:238
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition: trunk.c:262
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition: trunk.c:785
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition: trunk.c:1497
int line
Line change occurred on.
Definition: trunk.c:90
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition: trunk.c:3138
#define CONN_STATE_TRANSITION(_new, _log)
Definition: trunk.c:437
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition: trunk.c:4521
static size_t trunk_connection_events_len
Definition: trunk.c:427
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition: trunk.c:3558
bool oneshot
Remove the function after it's called once.
Definition: trunk.c:187
bool started
Has the trunk been started.
Definition: trunk.c:287
static size_t trunk_states_len
Definition: trunk.c:397
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the number of requests associated with a trunk connection.
Definition: trunk.c:2867
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition: trunk.c:292
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition: trunk.c:555
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
Definition: trunk.c:2886
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition: trunk.c:3037
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition: trunk.c:69
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition: trunk.c:2950
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
Definition: trunk.c:176
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition: trunk.c:145
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition: trunk.c:4801
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition: trunk.c:236
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition: trunk.c:4077
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition: trunk.c:4620
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition: trunk.c:516
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition: trunk.c:4784
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition: trunk.c:98
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition: trunk.c:161
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition: trunk.c:2003
bool enabled
Whether the watch entry is enabled.
Definition: trunk.c:188
fr_time_t last_freed
Last time this request was freed.
Definition: trunk.c:111
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition: trunk.c:537
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition: trunk.c:752
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition: trunk.c:2909
struct trunk_pub_s pub
Public fields in the trunk.
Definition: trunk.c:196
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition: trunk.c:109
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition: trunk.c:482
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition: trunk.c:2729
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition: trunk.c:3487
static fr_table_num_ordered_t const trunk_connection_events[]
Definition: trunk.c:421
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition: trunk.c:2575
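As a sketch (my_preq and my_rctx are caller-defined placeholders, and the assumption that a NULL treq is allocated internally may not hold in all versions), a typical enqueue looks like:

    trunk_request_t *treq = NULL;    /* assumed: NULL asks the trunk to allocate the treq itself */

    switch (trunk_request_enqueue(&treq, trunk, request, my_preq, my_rctx)) {
    case TRUNK_ENQUEUE_OK:           /* assigned to a connection's pending queue */
    case TRUNK_ENQUEUE_IN_BACKLOG:   /* held on the trunk until a connection is available */
        break;

    case TRUNK_ENQUEUE_NO_CAPACITY:
    case TRUNK_ENQUEUE_DST_UNAVAILABLE:
    case TRUNK_ENQUEUE_FAIL:
        /* caller decides how to report the failure */
        return -1;
    }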
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition: trunk.c:1053
struct trunk_request_s trunk_request_t
Definition: trunk.c:33
void * in_handler
Which handler we're inside.
Definition: trunk.c:258
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition: trunk.c:284
static fr_table_num_ordered_t const trunk_states[]
Definition: trunk.c:392
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition: trunk.c:2940
#define IS_SERVICEABLE(_tconn)
Definition: trunk.c:707
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition: trunk.c:2664
#define IS_PROCESSING(_tconn)
Definition: trunk.c:708
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition: trunk.c:3168
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition: trunk.c:339
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition: trunk.c:106
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition: trunk.c:4035
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition: trunk.c:2272
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition: trunk.c:1595
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition: trunk.c:612
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition: trunk.c:733
fr_dlist_head_t sent
Sent requests.
Definition: trunk.c:155
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition: trunk.c:1221
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition: trunk.c:3861
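A sketch of how a protocol's write handler might drain the pending queue; my_write_treq() is a hypothetical helper, and the return convention (0 with a NULL treq once the queue is empty) is an assumption based on the common usage pattern:

    static void my_drain_pending(trunk_connection_t *tconn, connection_t *conn)
    {
        trunk_request_t *treq = NULL;

        while (trunk_connection_pop_request(&treq, tconn) == 0) {
            if (!treq) break;                       /* pending queue is empty */

            if (my_write_treq(conn, treq) < 0) {    /* hypothetical: encode and write the preq */
                trunk_request_signal_fail(treq);
                continue;
            }
            trunk_request_signal_sent(treq);        /* written in full, now awaiting a response */
        }
    }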
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition: trunk.c:222
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition: trunk.c:4450
#define TRUNK_STATE_TRANSITION(_new)
Definition: trunk.c:881
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition: trunk.c:2140
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition: trunk.c:2785
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition: trunk.c:3726
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition: trunk.c:827
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition: trunk.c:132
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected.
Definition: trunk.c:1304
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition: trunk.c:2843
#define IN_REQUEST_DEMUX(_trunk)
Definition: trunk.c:704
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition: trunk.c:574
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition: trunk.c:1366
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition: trunk.c:1673
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition: trunk.c:289
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition: trunk.c:663
char const * function
State change occurred in.
Definition: trunk.c:89
static size_t trunk_request_states_len
Definition: trunk.c:372
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition: trunk.c:219
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition: trunk.c:75
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition: trunk.c:1417
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition: trunk.c:3438
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition: trunk.c:247
trunk_request_t * partial
Partially written request.
Definition: trunk.c:153
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition: trunk.c:1559
fr_minmax_heap_t * active
Connections which can service requests.
Definition: trunk.c:224
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition: trunk.c:312
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition: trunk.c:1528
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition: trunk.c:1251
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition: trunk.c:645
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition: trunk.c:3093
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed, update the target request counters.
Definition: trunk.c:2310
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition: trunk.c:629
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition: trunk.c:1730
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition: trunk.c:2429
char const * log_prefix
What to prepend to messages.
Definition: trunk.c:200
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition: trunk.c:723
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition: trunk.c:2973
static conf_parser_t const trunk_config_request[]
Definition: trunk.c:296
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition: trunk.c:226
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition: trunk.c:1088
static fr_table_num_ordered_t const trunk_request_states[]
Definition: trunk.c:357
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition: trunk.c:3342
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition: trunk.c:3420
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition: trunk.c:378
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition: trunk.c:244
uint64_t id
Trunk request ID.
Definition: trunk.c:102
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition: trunk.c:169
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition: trunk.c:3307
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request.
Definition: trunk.c:2462
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition: trunk.c:685
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition: trunk.c:229
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition: trunk.c:4759
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition: trunk.c:3938
fr_dlist_head_t log
State change log.
Definition: trunk.c:121
uint64_t tconn_id
If the treq was associated with a connection, the connection ID.
Definition: trunk.c:83
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition: trunk.c:139
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition: trunk.c:1452
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition: trunk.c:3115
trunk_request_state_t from
What state we transitioned from.
Definition: trunk.c:77
fr_dlist_head_t cancel_sent
Sent cancellation requests.
Definition: trunk.c:163
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition: trunk.c:4748
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, and which the trunk manager is draining.
Definition: trunk.c:232
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition: trunk.c:3915
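For example (a sketch; my_conn_is_saturated() stands in for whatever protocol-level backpressure check the API client performs):

    if (my_conn_is_saturated(conn)) {
        trunk_connection_signal_inactive(tconn);   /* stop assigning new requests to this connection */
    } else {
        trunk_connection_signal_active(tconn);     /* connection can service requests again */
    }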
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition: trunk.c:2778
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition: trunk.c:5027
fr_heap_t * backlog
The request backlog.
Definition: trunk.c:209
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition: trunk.c:705
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition: trunk.c:5087
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition: trunk.c:4472
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition: trunk.c:2224
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition: trunk.c:2043
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition: trunk.c:260
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition: trunk.c:5143
#define CONN_BAD_STATE_TRANSITION(_new)
Definition: trunk.c:448
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition: trunk.c:104
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition: trunk.c:471
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition: trunk.c:264
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connections.
Definition: trunk.c:1841
bool sent
Trunk request has been sent at least once.
Definition: trunk.c:116
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition: trunk.c:2087
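A sketch of the usual pattern once a response has been decoded and matched back to its treq inside a demux handler (matching and decoding are protocol-specific and assumed here):

    if (my_response_is_good) {                   /* hypothetical decode/validation result */
        trunk_request_signal_complete(treq);     /* inform the API client the request succeeded */
    } else {
        trunk_request_signal_fail(treq);         /* inform the API client the request failed */
    }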
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition: trunk.c:2927
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition: trunk.c:3977
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition: trunk.c:3881
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition: trunk.c:5201
fr_dlist_t entry
List entry.
Definition: trunk.c:184
static conf_parser_t const trunk_config_connection[]
Definition: trunk.c:304
trunk_connection_state_t tconn_state
If the treq was associated with a connection, the connection state at the time of the state transition.
Definition: trunk.c:85
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition: trunk.c:113
static size_t trunk_cancellation_reasons_len
Definition: trunk.c:419
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition: trunk.c:413
static size_t trunk_conn_trigger_names_len
Definition: trunk.c:390
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition: trunk.c:202
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition: trunk.c:2816
#define IN_REQUEST_MUX(_trunk)
Definition: trunk.c:703
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition: trunk.c:206
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition: trunk.c:4885
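A minimal allocation sketch; the my_* callbacks are hypothetical, and the request_mux/request_demux member names are assumptions about trunk_io_funcs_t members not listed on this page:

    static trunk_io_funcs_t const my_io_funcs = {
        .connection_alloc = my_connection_alloc,  /* hypothetical: returns a new connection_t */
        .request_mux      = my_request_mux,       /* assumed member: writes pending requests */
        .request_demux    = my_request_demux,     /* assumed member: reads and matches responses */
    };

    static trunk_conf_t const my_conf = {
        .start = 2,    /* connections to open when the trunk starts */
        .min   = 1,    /* never drop below this many connections */
        .max   = 8,    /* hard upper limit on connections */
    };

    trunk_t *trunk = trunk_alloc(ctx, el, &my_io_funcs, &my_conf, "my_module", my_uctx, false);
    if (!trunk) return -1;    /* with delay_start=false the trunk is assumed to start immediately; otherwise call trunk_start() */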
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition: trunk.c:4025
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition: trunk.c:766
fr_dlist_t entry
Entry in the linked list.
Definition: trunk.c:76
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition: trunk.c:2065
Associates request queues with a connection.
Definition: trunk.c:131
Wraps a normal request.
Definition: trunk.c:97
Trace state machine changes for a particular request.
Definition: trunk.c:74
Main trunk management handle.
Definition: trunk.c:195
An entry in a trunk watch function list.
Definition: trunk.c:183
uint16_t max
Maximum number of connections in the trunk.
Definition: trunk.h:231
uint32_t max_req_per_conn
Maximum requests per connection.
Definition: trunk.h:240
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition: trunk.h:343
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event was a failure.
Definition: trunk.h:279
uint16_t min
Shouldn't let connections drop below this number.
Definition: trunk.h:229
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition: trunk.h:195
void *_CONST rctx
Resume ctx of the module.
Definition: trunk.h:349
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition: trunk.h:367
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition: trunk.h:729
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition: trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition: trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition: trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition: trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outstanding requests.
Definition: trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition: trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition: trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition: trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition: trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition: trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition: trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition: trunk.h:264
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition: trunk.h:351
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition: trunk.h:302
trunk_connection_state_t _CONST state
What state the connection is in.
Definition: trunk.h:363
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition: trunk.h:267
uint64_t max_uses
The maximum number of times a connection can be used.
Definition: trunk.h:246
fr_time_delta_t lifetime
Time between reconnects.
Definition: trunk.h:248
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition: trunk.h:233
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition: trunk.h:324
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition: trunk.h:244
fr_time_t _CONST last_failed
Last time a connection failed.
Definition: trunk.h:310
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition: trunk.h:341
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition: trunk.h:345
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition: trunk.h:725
fr_time_t _CONST last_read_success
Last time we read a response.
Definition: trunk.h:312
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition: trunk.h:299
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition: trunk.h:253
uint16_t start
How many connections to start.
Definition: trunk.h:227
fr_time_delta_t req_cleanup_delay
How long a request in the unassigned (free) list must remain unused before it's cleaned up and freed.
Definition: trunk.h:257
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition: trunk.h:213
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of the underlying connection.
Definition: trunk.h:269
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition: trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition: trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition: trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition: trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition: trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition: trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition: trunk.h:731
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition: trunk.h:318
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition: trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition: trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition: trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition: trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition: trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition: trunk.h:322
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition: trunk.h:250
connection_t *_CONST conn
The underlying connection.
Definition: trunk.h:365
trunk_state_t
Definition: trunk.h:62
@ TRUNK_STATE_MAX
Definition: trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition: trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition: trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition: trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition: trunk.h:305
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition: trunk.h:716
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition: trunk.h:261
trunk_enqueue_t
Definition: trunk.h:148
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition: trunk.h:153
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition: trunk.h:154
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition: trunk.h:150
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition: trunk.h:151
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition: trunk.h:149
void *_CONST preq
Data for the muxer to write to the connection.
Definition: trunk.h:347
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition: trunk.h:236
fr_time_t _CONST last_connected
Last time a connection connected.
Definition: trunk.h:308
trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition: trunk.h:738
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition: trunk.h:161
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition: trunk.h:170
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition: trunk.h:173
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition: trunk.h:165
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition: trunk.h:162
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition: trunk.h:185
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition: trunk.h:182
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition: trunk.h:183
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition: trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition: trunk.h:187
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition: trunk.h:167
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition: trunk.h:188
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition: trunk.h:168
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition: trunk.h:172
trunk_state_t _CONST state
Current state of the trunk.
Definition: trunk.h:329
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition: trunk.h:296
Common configuration parameters for a trunk.
Definition: trunk.h:224
Public fields for the trunk connection.
Definition: trunk.h:362
I/O functions to pass to trunk_alloc.
Definition: trunk.h:724
Public fields for the trunk.
Definition: trunk.h:292
Public fields for the trunk request.
Definition: trunk.h:340
static fr_event_list_t * el
static fr_slen_t head
Definition: xlat.h:406
char const * fr_strerror(void)
Get the last library error.
Definition: strerror.c:554
#define fr_box_time_delta(_val)
Definition: value.h:343
static size_t char ** out
Definition: value.h:997