The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
trunk.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 37187bfb17231ec53d5fb876085590c6c1f12428 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
44
45#ifdef HAVE_STDATOMIC_H
46# include <stdatomic.h>
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
49# endif
50#else
51# include <freeradius-devel/util/stdatomic.h>
52#endif
53
54static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
55
56#ifdef TESTING_TRUNK
58
/** Replacement clock used when TESTING_TRUNK is defined
 *
 * Returns the current value of test_time_base.  The `#define fr_time test_time`
 * that follows makes later fr_time() calls in this file resolve to this function.
 *
 * @return the value of test_time_base.
 */
59static fr_time_t test_time(void)
60{
61 return test_time_base;
62}
63
64#define fr_time test_time
65#endif
66
67#ifndef NDEBUG
68/** The maximum number of state logs to record per request
69 *
70 */
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
72
73/** Trace state machine changes for a particular request
74 *
75 */
76typedef struct {
77 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
78 fr_dlist_t entry; //!< Entry in the linked list.
79 trunk_request_state_t from; //!< What state we transitioned from.
80 trunk_request_state_t to; //!< What state we transitioned to.
81
82 trunk_connection_t *tconn; //!< The request was associated with.
83 ///< Pointer may now be invalid, do not dereference.
84
85 uint64_t tconn_id; //!< If the treq was associated with a connection
86 ///< the connection ID.
87 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
88 ///< the connection state at the time of the
89 ///< state transition.
90
91 char const *function; //!< State change occurred in.
92 int line; //!< Line change occurred on.
94#endif
95
96/** Wraps a normal request
97 *
98 */
100 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
101 ///< This *MUST* be the first field in this
102 ///< structure.
103
104 uint64_t id; //!< Trunk request ID.
105
106 fr_heap_index_t heap_id; //!< Used to track the request conn->pending heap.
107
108 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
109 ///< or trunk->backlog request.
110
111 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
112
113 fr_time_t last_freed; //!< Last time this request was freed.
114
115 bool bound_to_conn; //!< Fail the request if there's an attempt to
116 ///< re-enqueue it.
117
118 bool sent; //!< Trunk request has been sent at least once.
119 ///< Used so that re-queueing doesn't increase trunk
120 ///< `sent` count.
121
122#ifndef NDEBUG
123 fr_dlist_head_t log; //!< State change log.
124#endif
125};
126
127
128/** Associates request queues with a connection
129 *
130 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
131 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
132 */
134 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
135 ///< This *MUST* be the first field in this
136 ///< structure.
137
138 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
139 ///< heap.
140
141 fr_dlist_t entry; //!< Used to track the connection in the connecting,
142 ///< full and failed lists.
143
144 /** @name State
145 * @{
146 */
147 trunk_connection_event_t events; //!< The current events we expect to be notified on.
148 /** @} */
149
150 /** @name Request lists
151 * @{
152 */
153 fr_heap_t *pending; //!< Requests waiting to be sent.
154
155 trunk_request_t *partial; //!< Partially written request.
156
157 fr_dlist_head_t sent; //!< Sent request.
158
159 fr_dlist_head_t reapable; //!< Idle request.
160
161 fr_dlist_head_t cancel; //!< Requests in the cancel state.
162
163 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
164
165 fr_dlist_head_t cancel_sent; //!< Sent cancellation request.
166 /** @} */
167
168 /** @name Statistics
169 * @{
170 */
171 uint64_t sent_count; //!< The number of requests that have been sent using
172 ///< this connection.
173 /** @} */
174
175 /** @name Timers
176 * @{
177 */
178 fr_timer_t *lifetime_ev; //!< Maximum time this connection can be open.
179 /** @} */
180};
181
182/** An entry in a trunk watch function list
183 *
184 */
185typedef struct trunk_watch_entry_s {
186 fr_dlist_t entry; //!< List entry.
187 trunk_watch_t func; //!< Function to call when a trunk enters
188 ///< the state this list belongs to
189 bool oneshot; //!< Remove the function after it's called once.
190 bool enabled; //!< Whether the watch entry is enabled.
191 void *uctx; //!< User data to pass to the function.
193
194/** Map connection states to trigger names
195 *
196 * Must stay in the same order as #trunk_connection_state_t
197 */
199 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
200 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
201 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
202 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
203 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
204 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
205 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
206 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
207 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
208 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
209};
211
212/** Main trunk management handle
213 *
214 */
215struct trunk_s {
216 struct trunk_pub_s pub; //!< Public fields in the trunk connection.
217 ///< This *MUST* be the first field in this
218 ///< structure.
219
220 char const *log_prefix; //!< What to prepend to messages.
221
222 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
223
224 trunk_conf_t conf; //!< Trunk common configuration.
225
226 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
227 ///< enqueued.
228
229 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
230 ///< immediately assign to a connection.
231
232 /** @name Connection lists
233 *
234 * A connection must always be in exactly one of these lists
235 * or trees.
236 *
237 * @{
238 */
239 fr_dlist_head_t init; //!< Connections which have not yet started
240 ///< connecting.
241
242 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
243
244 fr_minmax_heap_t *active; //!< Connections which can service requests.
245
246 fr_dlist_head_t full; //!< Connections which have too many outstanding
247 ///< requests.
248
249 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
250 ///< inactive by the API client.
251
252 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
253 ///< inactive by the API client, which the trunk
254 ///< manager is draining to close.
255
256 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
257
258 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
259 ///< shutdown, reconnection or failure.
260
261 fr_dlist_head_t draining; //!< Connections that will be freed once all their
262 ///< requests are complete, but can be reactivated.
263
264 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
265 ///< requests are complete.
266
267 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
268 //!< the next call to trunk_manage.
269 //!< This prevents connections from being freed
270 //!< whilst we're inside callbacks.
271 /** @} */
272
273 /** @name Callbacks
274 * @{
275 */
276 trunk_io_funcs_t funcs; //!< I/O functions.
277
278 void *in_handler; //!< Which handler we're inside.
279
280 void *uctx; //!< Uctx data to pass to alloc.
281
282 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
283
284 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
285 /** @} */
286
287 /** @name Timers
288 * @{
289 */
290 fr_timer_t *manage_ev; //!< Periodic connection management event.
291 /** @} */
292
293 /** @name Log rate limiting entries
294 * @{
295 */
296 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
297
298 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
299 /** @} */
300
301 /** @name State
302 * @{
303 */
304 bool freeing; //!< Trunk is being freed, don't spawn new
305 ///< connections or re-enqueue.
306
307 bool started; //!< Has the trunk been started.
308
309 bool managing_connections; //!< Whether the trunk is allowed to manage
310 ///< (open/close) connections.
311
312 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
313 /** @} */
314
315 fr_pair_list_t *trigger_args; //!< Passed to trigger
316
317 bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]; //!< Record that a specific trigger is undefined.
318
320};
321
322int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule);
323
325 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
326 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
327 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
328 { FR_CONF_OFFSET("triggers", trunk_conf_t, req_triggers), .func = trunk_trigger_cf_parse },
329
331};
332
334 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
335 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
336
338};
339
340#ifndef TRUNK_TESTS
342 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
343 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
344 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
345 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
346 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
347 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
348 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
349
350 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
351 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
352
353 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
354
355 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
356
357 { FR_CONF_OFFSET("backlog_on_failed_conn", trunk_conf_t, backlog_on_failed_conn), },
358
359 { FR_CONF_OFFSET("triggers", trunk_conf_t, conn_triggers), .func = trunk_trigger_cf_parse },
360
361 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
362 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
363
365};
366#endif
367
368#ifndef NDEBUG
369/** Map request states to trigger names
370 *
371 * Must stay in the same order as #trunk_request_state_t
372 */
374 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
375 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
376 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
377 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
378 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
379 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
380 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
381 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
382 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
383 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
384 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
385 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
386 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
387};
389#endif
390
392 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
393 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
394 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
395 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
396 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
397 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
398 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
399 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
400 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
401 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
402 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
403 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
404 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
405};
407
409 { L("IDLE"), TRUNK_STATE_IDLE },
410 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
411 { L("PENDING"), TRUNK_STATE_PENDING }
412};
414
416 { L("INIT"), TRUNK_CONN_INIT },
417 { L("HALTED"), TRUNK_CONN_HALTED },
418 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
419 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
420 { L("CLOSED"), TRUNK_CONN_CLOSED },
421 { L("FULL"), TRUNK_CONN_FULL },
422 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
423 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
424 { L("DRAINING"), TRUNK_CONN_DRAINING },
425 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
426};
428
430 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
431 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
432 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
433 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
434};
436
438 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
439 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
440 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
441 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
442};
444
/** Fire the trigger associated with a trunk connection state
 *
 * Does nothing unless trunk->conf.conn_triggers is set.  The slot used in
 * trunk->trigger_undef and trunk->trigger_cp is fr_high_bit_pos(_state).
 * If trigger() returns -1 the trigger_undef slot is set, and the check on that
 * slot means the trigger is not attempted again for this trunk.
 *
 * Expands in the caller's scope and requires a local named `trunk`.
 * _state is evaluated more than once.
 */
445#define CONN_TRIGGER(_state) do { \
446 uint8_t idx = fr_high_bit_pos(_state); \
447 if (trunk->conf.conn_triggers && !trunk->trigger_undef[idx]) { \
448 if (trigger(unlang_interpret_get_thread_default(), trunk->conf.conn_trigger_cs, \
449 &trunk->trigger_cp[idx], \
450 fr_table_str_by_value(trunk_conn_trigger_names, _state, \
451 "<INVALID>"), true, trunk->trigger_args) == -1) { \
452 trunk->trigger_undef[idx] = true; \
453 } \
454 } \
455} while (0)
456
/** Record a trunk connection state transition
 *
 * In order: logs "old -> new" through the logging macro passed as _log,
 * assigns _new to tconn->pub.state, fires CONN_TRIGGER for the new state and
 * calls trunk_requests_per_connection() with the current time.
 *
 * Expands in the caller's scope and requires locals named `tconn` and `trunk`.
 * _new is evaluated more than once.
 */
457#define CONN_STATE_TRANSITION(_new, _log) \
458do { \
459 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
460 tconn->pub.conn->id, \
461 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
462 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
463 tconn->pub.state = _new; \
464 CONN_TRIGGER(_new); \
465 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
466} while (0)
467
/** Report an invalid trunk connection state transition
 *
 * Raises an always-failing fr_cond_assert_msg() naming the current and the
 * requested state.  If the assertion macro evaluates to false the expansion
 * executes a bare `return;`, so this can only be used inside functions
 * returning void.
 *
 * Requires a local named `tconn` in the caller's scope.
 */
468#define CONN_BAD_STATE_TRANSITION(_new) \
469do { \
470 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
471 tconn->pub.conn->id, \
472 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
473 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
474} while (0)
475
476#ifndef NDEBUG
477void trunk_request_state_log_entry_add(char const *function, int line,
478 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
479
/** Fire the trigger associated with a trunk request state
 *
 * Only defined when NDEBUG is not defined.  Does nothing unless
 * trunk->conf.req_triggers is set.  The return value of trigger() is ignored.
 *
 * Requires a local named `trunk` in the caller's scope.
 */
480#define REQUEST_TRIGGER(_state) do { \
481 if (trunk->conf.req_triggers) { \
482 trigger(unlang_interpret_get_thread_default(), \
483 trunk->conf.req_trigger_cs, NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
484 "<INVALID>"), true, trunk->trigger_args); \
485 } \
486} while (0)
487
488/** Record a request state transition and log appropriate output
489 *
 * Variant used when NDEBUG is not defined.  In order: logs "old -> new",
 * records the transition with trunk_request_state_log_entry_add(), assigns
 * _new to treq->pub.state and fires REQUEST_TRIGGER for the new state.
 *
 * Requires locals named `treq` and `trunk` in the caller's scope, and declares
 * a block-local `request` (presumably consumed by ROPTIONAL - confirm).
490 */
491#define REQUEST_STATE_TRANSITION(_new) \
492do { \
493 request_t *request = treq->pub.request; \
494 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
495 treq->id, \
496 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
497 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
498 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
499 treq->pub.state = _new; \
500 REQUEST_TRIGGER(_new); \
501} while (0)
/** Report an invalid trunk request state transition
 *
 * Variant used when NDEBUG is not defined.  Calls trunk_request_state_log()
 * for the request at L_ERR, then raises an always-failing fr_cond_assert_msg().
 * If the assertion macro evaluates to false the expansion executes a bare
 * `return;`, so this can only be used inside functions returning void.
 *
 * Requires a local named `treq` in the caller's scope.
 */
502#define REQUEST_BAD_STATE_TRANSITION(_new) \
503do { \
504 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
505 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
506 treq->id, \
507 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
508 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
509} while (0)
510#else
511/** Record a request state transition
512 *
 * Variant used when NDEBUG is defined: logs "old -> new" and assigns _new to
 * treq->pub.state.  No state log entry is recorded and no trigger is fired.
 *
 * Requires a local named `treq` in the caller's scope.
513 */
514#define REQUEST_STATE_TRANSITION(_new) \
515do { \
516 request_t *request = treq->pub.request; \
517 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
518 treq->id, \
519 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
520 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
521 treq->pub.state = _new; \
522} while (0)
/** Report an invalid trunk request state transition
 *
 * Variant used when NDEBUG is defined.  Raises an always-failing
 * fr_cond_assert_msg(); if that evaluates to false the expansion executes a
 * bare `return;`, so this can only be used inside functions returning void.
 *
 * Requires a local named `treq` in the caller's scope.
 */
523#define REQUEST_BAD_STATE_TRANSITION(_new) \
524do { \
525 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
526 treq->id, \
527 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
528 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
529} while (0)
530#endif
531
532
533/** Call the cancel callback if set
534 *
 * When trunk->funcs.request_cancel is non-NULL: saves trunk->in_handler, sets
 * it to the callback's address, logs the call, invokes
 * request_cancel(conn, preq, _reason, uctx) and then restores the saved
 * in_handler value.
 *
 * _treq is evaluated many times, so it must be an expression without side
 * effects.  The expansion dereferences (_treq)->pub.tconn.
535 */
536#define DO_REQUEST_CANCEL(_treq, _reason) \
537do { \
538 if ((_treq)->pub.trunk->funcs.request_cancel) { \
539 request_t *request = (_treq)->pub.request; \
540 void *_prev = (_treq)->pub.trunk->in_handler; \
541 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
542 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
543 (_treq)->pub.tconn->pub.conn, \
544 (_treq)->pub.preq, \
545 fr_table_str_by_value(trunk_cancellation_reasons, \
546 (_reason), \
547 "<INVALID>"), \
548 (_treq)->pub.trunk->uctx); \
549 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
550 (_treq)->pub.trunk->in_handler = _prev; \
551 } \
552} while(0)
553
554/** Call the "conn_release" callback (if set)
555 *
 * When trunk->funcs.request_conn_release is non-NULL: saves trunk->in_handler,
 * sets it to the callback's address, logs the call, invokes
 * request_conn_release(conn, preq, uctx) and then restores the saved
 * in_handler value.
 *
 * _treq is evaluated many times, so it must be an expression without side
 * effects.  The expansion dereferences (_treq)->pub.tconn.
556 */
557#define DO_REQUEST_CONN_RELEASE(_treq) \
558do { \
559 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
560 request_t *request = (_treq)->pub.request; \
561 void *_prev = (_treq)->pub.trunk->in_handler; \
562 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
563 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
564 (_treq)->pub.tconn->pub.conn, \
565 (_treq)->pub.preq, \
566 (_treq)->pub.trunk->uctx); \
567 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
568 (_treq)->pub.trunk->in_handler = _prev; \
569 } \
570} while(0)
571
572/** Call the complete callback (if set)
573 *
 * When trunk->funcs.request_complete is non-NULL: saves trunk->in_handler,
 * logs the call, sets in_handler to the callback's address, invokes
 * request_complete(request, preq, rctx, uctx) and then restores the saved
 * in_handler value.
 *
 * _treq is evaluated many times, so it must be an expression without side
 * effects.
574 */
575#define DO_REQUEST_COMPLETE(_treq) \
576do { \
577 if ((_treq)->pub.trunk->funcs.request_complete) { \
578 request_t *request = (_treq)->pub.request; \
579 void *_prev = (_treq)->pub.trunk->in_handler; \
580 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
581 (_treq)->pub.request, \
582 (_treq)->pub.preq, \
583 (_treq)->pub.rctx, \
584 (_treq)->pub.trunk->uctx); \
585 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
586 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
587 (_treq)->pub.trunk->in_handler = _prev; \
588 } \
589} while(0)
590
591/** Call the fail callback (if set)
592 *
 * When trunk->funcs.request_fail is non-NULL: saves trunk->in_handler, logs
 * the call (including the name of _prev_state), sets in_handler to the
 * callback's address, invokes request_fail(request, preq, rctx, _prev_state,
 * uctx) and then restores the saved in_handler value.
 *
 * _treq and _prev_state are each evaluated more than once, so both must be
 * expressions without side effects.
593 */
594#define DO_REQUEST_FAIL(_treq, _prev_state) \
595do { \
596 if ((_treq)->pub.trunk->funcs.request_fail) { \
597 request_t *request = (_treq)->pub.request; \
598 void *_prev = (_treq)->pub.trunk->in_handler; \
599 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
600 (_treq)->pub.request, \
601 (_treq)->pub.preq, \
602 (_treq)->pub.rctx, \
603 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
604 (_treq)->pub.trunk->uctx); \
605 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
606 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
607 (_treq)->pub.trunk->in_handler = _prev; \
608 } \
609} while(0)
610
611/** Call the free callback (if set)
612 *
 * When trunk->funcs.request_free is non-NULL: saves trunk->in_handler, logs
 * the call, sets in_handler to the callback's address, invokes
 * request_free(request, preq, uctx) and then restores the saved in_handler
 * value.
 *
 * _treq is evaluated many times, so it must be an expression without side
 * effects.
613 */
614#define DO_REQUEST_FREE(_treq) \
615do { \
616 if ((_treq)->pub.trunk->funcs.request_free) { \
617 request_t *request = (_treq)->pub.request; \
618 void *_prev = (_treq)->pub.trunk->in_handler; \
619 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
620 (_treq)->pub.request, \
621 (_treq)->pub.preq, \
622 (_treq)->pub.trunk->uctx); \
623 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
624 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
625 (_treq)->pub.trunk->in_handler = _prev; \
626 } \
627} while(0)
628
629/** Write one or more requests to a connection
630 *
 * Saves trunk->in_handler, logs the call, sets in_handler to the address of
 * funcs.request_mux, invokes request_mux(el, tconn, conn, uctx) and then
 * restores the saved in_handler value.
 *
 * Unlike the request_* callback wrappers, the callback pointer is NOT checked
 * for NULL before it is called.  _tconn is evaluated many times.
631 */
632#define DO_REQUEST_MUX(_tconn) \
633do { \
634 void *_prev = (_tconn)->pub.trunk->in_handler; \
635 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
636 (_tconn)->pub.conn->id, \
637 (_tconn)->pub.trunk->el, \
638 (_tconn), \
639 (_tconn)->pub.conn, \
640 (_tconn)->pub.trunk->uctx); \
641 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
642 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
643 (_tconn)->pub.trunk->in_handler = _prev; \
644} while(0)
645
646/** Read one or more requests from a connection
647 *
 * Saves trunk->in_handler, logs the call, sets in_handler to the address of
 * funcs.request_demux, invokes request_demux(el, tconn, conn, uctx) and then
 * restores the saved in_handler value.
 *
 * The callback pointer is NOT checked for NULL before it is called.
 * _tconn is evaluated many times.
648 */
649#define DO_REQUEST_DEMUX(_tconn) \
650do { \
651 void *_prev = (_tconn)->pub.trunk->in_handler; \
652 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
653 (_tconn)->pub.conn->id, \
654 (_tconn), \
655 (_tconn)->pub.conn, \
656 (_tconn)->pub.trunk->uctx); \
657 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
658 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
659 (_tconn)->pub.trunk->in_handler = _prev; \
660} while(0)
661
662/** Write one or more cancellation requests to a connection
663 *
 * When trunk->funcs.request_cancel_mux is non-NULL: saves trunk->in_handler,
 * logs the call, sets in_handler to the callback's address, invokes
 * request_cancel_mux(el, tconn, conn, uctx) and then restores the saved
 * in_handler value.  Does nothing when the callback is NULL.
 *
 * _tconn is evaluated many times.
664 */
665#define DO_REQUEST_CANCEL_MUX(_tconn) \
666do { \
667 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
668 void *_prev = (_tconn)->pub.trunk->in_handler; \
669 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
670 (_tconn)->pub.conn->id, \
671 (_tconn), \
672 (_tconn)->pub.conn, \
673 (_tconn)->pub.trunk->uctx); \
674 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
675 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
676 (_tconn)->pub.trunk->in_handler = _prev; \
677 } \
678} while(0)
679
680/** Allocate a new connection
681 *
 * Saves trunk->in_handler, logs the call, sets in_handler to the address of
 * funcs.connection_alloc, stores the callback's result in (_tconn)->pub.conn
 * and then restores the saved in_handler value.
 *
 * If the callback returns NULL an error is logged, _tconn is freed with
 * talloc_free() and the ENCLOSING FUNCTION returns -1, so this can only be
 * used inside functions returning an integer.
 *
 * Requires a local named `trunk` in the caller's scope (presumably the same
 * object as (_tconn)->pub.trunk - confirm at call sites).  The object freed on
 * failure is the macro argument, not whatever the caller happens to have
 * named `tconn`.
682 */
683#define DO_CONNECTION_ALLOC(_tconn) \
684do { \
685 void *_prev = trunk->in_handler; \
686 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
687 (_tconn), \
688 (_tconn)->pub.trunk->el, \
689 (_tconn)->pub.trunk->conf.conn_conf, \
690 trunk->log_prefix, \
691 (_tconn)->pub.trunk->uctx); \
692 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
693 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
694 (_tconn)->pub.trunk->in_handler = _prev; \
695 if (!(_tconn)->pub.conn) { \
696 ERROR("Failed creating new connection"); \
697 talloc_free((_tconn)); \
698 return -1; \
699 } \
700} while(0)
701
702/** Change what events the connection should be notified about
703 *
 * When trunk->funcs.connection_notify is non-NULL: saves trunk->in_handler,
 * logs the call (including the name of _events), sets in_handler to the
 * callback's address, invokes connection_notify(tconn, conn, el, _events,
 * uctx) and then restores the saved in_handler value.  Does nothing when the
 * callback is NULL.
 *
 * _tconn and _events are each evaluated more than once.
704 */
705#define DO_CONNECTION_NOTIFY(_tconn, _events) \
706do { \
707 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
708 void *_prev = (_tconn)->pub.trunk->in_handler; \
709 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
710 (_tconn)->pub.conn->id, \
711 (_tconn), \
712 (_tconn)->pub.conn, \
713 (_tconn)->pub.trunk->el, \
714 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
715 (_tconn)->pub.trunk->uctx); \
716 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
717 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
718 (_tconn)->pub.trunk->in_handler = _prev; \
719 } \
720} while(0)
721
/** Predicates over trunk->in_handler
 *
 * The DO_* callback wrappers in this file store the address of the callback
 * they are about to run in trunk->in_handler and restore the previous value
 * afterwards.  IN_HANDLER is true while any such value is set; the
 * IN_REQUEST_* forms are true only when the corresponding callback is
 * configured and is the one currently recorded.
 */
722#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
723#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
724#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
725#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
726
/** Connection state mask tests
 *
 * Each evaluates to the bitwise AND of the connection's state with the named
 * mask, i.e. non-zero when the state is one of the bits in that mask.  The
 * result is the masked value, not a normalised 0/1.
 */
727#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
728#define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
729
730/** Remove the current request from the backlog
731 *
 * Extracts _treq from its trunk's backlog heap with fr_heap_extract().  A
 * non-zero return raises fr_cond_assert_msg() with the fr_strerror() text.
 * The `break` only leaves this macro's own do/while(0); the caller's control
 * flow is not altered.
 *
 * _treq is evaluated more than once.
732 */
733#define REQUEST_EXTRACT_BACKLOG(_treq) \
734do { \
735 int _ret; \
736 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, (_treq)); \
737 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting request from backlog heap: %s", fr_strerror())) break; \
738} while (0)
739
740/** Remove the current request from the pending heap
741 *
 * Extracts _treq from the pending heap of the connection it is assigned to
 * ((_treq)->pub.tconn) with fr_heap_extract().  A non-zero return raises
 * fr_cond_assert_msg() with the fr_strerror() text.  The `break` only leaves
 * this macro's own do/while(0); the caller's control flow is not altered.
 *
 * _treq is evaluated more than once.
742 */
743#define REQUEST_EXTRACT_PENDING(_treq) \
744do { \
745 int _ret; \
746 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, (_treq)); \
747 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting request from pending heap: %s", fr_strerror())) break; \
748} while (0)
749
750/** Remove the current request from the partial slot
751 *
 * Asserts that _treq is the request held in its connection's `partial` slot,
 * then clears that slot.  Both steps go through (_treq)->pub.tconn, so the
 * macro does not depend on the caller having locals named `treq` or `tconn`.
 *
 * _treq is evaluated more than once.
752 */
753#define REQUEST_EXTRACT_PARTIAL(_treq) \
754do { \
755 fr_assert((_treq)->pub.tconn->partial == (_treq)); \
756 (_treq)->pub.tconn->partial = NULL; \
757} while (0)
758
759/** Remove the current request from the sent list
760 *
 * NOTE(review): the expansion ignores _treq and instead uses the caller's
 * locals `tconn` and `treq`, so both must be in scope where this is used.
761 */
762#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
763
764/** Remove the current request from the reapable list
765 *
 * NOTE(review): the expansion ignores _treq and instead uses the caller's
 * locals `tconn` and `treq`, so both must be in scope where this is used.
766 */
767#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
768
769/** Remove the current request from the cancel list
770 *
 * NOTE(review): the expansion ignores _treq and instead uses the caller's
 * locals `tconn` and `treq`, so both must be in scope where this is used.
771 */
772#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
773
774/** Remove the current request from the cancel_partial slot
775 *
 * Asserts that _treq is the request held in its connection's `cancel_partial`
 * slot, then clears that slot.  Both steps go through (_treq)->pub.tconn, so
 * the macro does not depend on the caller having locals named `treq` or
 * `tconn`.
 *
 * _treq is evaluated more than once.
776 */
777#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
778do { \
779 fr_assert((_treq)->pub.tconn->cancel_partial == (_treq)); \
780 (_treq)->pub.tconn->cancel_partial = NULL; \
781} while (0)
782
783/** Remove the current request from the cancel sent list
784 *
 * NOTE(review): the expansion ignores _treq and instead uses the caller's
 * locals `tconn` and `treq`, so both must be in scope where this is used.
785 */
786#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
787
788/** Reorder the connections in the active heap
789 *
 * Re-positions _tconn by extracting it from the trunk's active min-max heap
 * and inserting it again.  Skipped when the heap holds exactly one element,
 * when the connection is not in the TRUNK_CONN_ACTIVE state (assertion), or
 * when the extract fails (assertion, message includes fr_strerror()).
 * The return value of fr_minmax_heap_insert() is not checked.
 *
790 * fr_minmax_heap_extract will also error out if heap_id is bad - no need for assert
791 */
792#define CONN_REORDER(_tconn) \
793do { \
794 int _ret; \
795 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
796 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
797 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
798 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
799 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
800} while (0)
801
802/** Call a list of watch functions associated with a state
803 *
804 */
806{
807 /*
808 * Nested watcher calls are not allowed
809 * and shouldn't be possible because of
810 * deferred signal processing.
811 */
812 fr_assert(trunk->next_watcher == NULL);
813
814 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
815 trunk_watch_entry_t *entry = trunk->next_watcher;
816 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
817
818 if (!entry->enabled) continue;
819 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
820
821 entry->func(trunk, trunk->pub.state, state, entry->uctx);
822
823 if (oneshot) talloc_free(entry);
824 }
825 trunk->next_watcher = NULL;
826}
827
828/** Call the state change watch functions
829 *
 * Does nothing when the watch list for _state is empty; otherwise passes that
 * list to trunk_watch_call().  _state indexes (_trunk)->watch[] directly and
 * is not range checked here.  _trunk and _state are evaluated more than once.
830 */
831#define CALL_WATCHERS(_trunk, _state) \
832do { \
833 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
834 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
835} while(0)
836
837/** Remove a watch function from a trunk state list
838 *
839 * @param[in] trunk The trunk to remove the watcher from.
840 * @param[in] state to remove the watch from.
841 * @param[in] watch Function to remove.
842 * @return
843 * - 0 if the function was removed successfully.
844 * - -1 if the function wasn't present in the watch list.
845 * - -2 if an invalid state was passed.
846 */
848{
849 trunk_watch_entry_t *entry = NULL;
850 fr_dlist_head_t *list;
851
852 if (state >= TRUNK_STATE_MAX) return -2;
853
854 list = &trunk->watch[state];
855 while ((entry = fr_dlist_next(list, entry))) {
856 if (entry->func == watch) {
857 if (trunk->next_watcher == entry) {
858 trunk->next_watcher = fr_dlist_remove(list, entry);
859 } else {
860 fr_dlist_remove(list, entry);
861 }
862 talloc_free(entry);
863 return 0;
864 }
865 }
866
867 return -1;
868}
869
870/** Add a watch entry to the trunk state list
871 *
872 * @param[in] trunk The trunk to add the watcher to.
873 * @param[in] state to watch for.
874 * @param[in] watch Function to add.
875 * @param[in] oneshot Should this watcher only be run once.
876 * @param[in] uctx Context to pass to function.
877 * @return
878 * - NULL if an invalid state is passed.
879 * - A new watch entry handle on success.
880 */
882 trunk_watch_t watch, bool oneshot, void const *uctx)
883{
884 trunk_watch_entry_t *entry;
885 fr_dlist_head_t *list;
886
887 if (state >= TRUNK_STATE_MAX) return NULL;
888
889 list = &trunk->watch[state];
890 MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
891
892 entry->func = watch;
893 entry->oneshot = oneshot;
894 entry->enabled = true;
895 memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
896 fr_dlist_insert_tail(list, entry);
897
898 return entry;
899}
900
/** Log a trunk state change, call the watchers for the new state, then record the new state
 *
 * The watchers for _new are called before trunk->pub.state is assigned, so
 * trunk->pub.state still holds the previous state while they run.
 *
 * Expects a variable named trunk to be in scope where the macro is used.
 *
 * @param[in] _new	State the trunk is transitioning to.
 */
901#define TRUNK_STATE_TRANSITION(_new) \
902do { \
903 DEBUG3("Trunk changed state %s -> %s", \
904 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
905 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
906 CALL_WATCHERS(trunk, _new); \
907 trunk->pub.state = _new; \
908} while (0)
909
910static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
911static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
920
921static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
922 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
923
924static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
925static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
926static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
927static inline void trunk_connection_readable(trunk_connection_t *tconn);
928static inline void trunk_connection_writable(trunk_connection_t *tconn);
936
937static void trunk_rebalance(trunk_t *trunk);
938static void trunk_manage(trunk_t *trunk, fr_time_t now);
939static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx);
940static void trunk_backlog_drain(trunk_t *trunk);
941
942/** Compare two protocol requests
943 *
944 * Allows protocol requests to be prioritised with a function
945 * specified by the API client. Defaults to by pointer address
946 * if no function is specified.
947 *
948 * @param[in] a treq to compare to b.
949 * @param[in] b treq to compare to a.
950 * @return
951 * - +1 if a > b.
952 * - 0 if a == b.
953 * - -1 if a < b.
954 */
955static int8_t _trunk_request_prioritise(void const *a, void const *b)
956{
959
960 fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
961
962 return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
963}
964
965/** Remove a request from all connection lists
966 *
967 * A common function used by init, fail, complete state functions to disassociate
968 * a request from a connection in preparation for freeing or reassignment.
969 *
970 * Despite its unassuming name, this function is *the* place to put calls to
971 * functions which need to be called when the number of requests associated with
972 * a connection changes.
973 *
974 * Trunk requests will always be passed to this function before they're removed
975 * from a connection, even if the requests are being freed.
976 *
977 * @param[in] treq to trigger a state change for.
978 */
980{
981 trunk_connection_t *tconn = treq->pub.tconn;
982 trunk_t *trunk = treq->pub.trunk;
983
984 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
985
986 switch (treq->pub.state) {
988 return; /* Not associated with connection */
989
992 break;
993
996 break;
997
1000 break;
1001
1004 break;
1005
1008 break;
1009
1012 break;
1013
1016 break;
1017
1018 default:
1019 fr_assert(0);
1020 break;
1021 }
1022
1023 /*
1024 * If the request wasn't associated with a
1025 * connection, then there's nothing more
1026 * to do.
1027 */
1028 if (!tconn) return;
1029
1030 {
1031 request_t *request = treq->pub.request;
1032
1033 ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
1034 tconn->pub.conn->name, treq->id);
1035 }
1036 /*
1037 * Release any connection specific resources the
1038 * treq holds.
1039 */
1041
1042 switch (tconn->pub.state){
1043 case TRUNK_CONN_FULL:
1044 trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1045 if (tconn->pub.state == TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1047
1048 case TRUNK_CONN_ACTIVE:
1049 CONN_REORDER(tconn);
1050 break;
1051
1052 default:
1053 break;
1054 }
1055
1056 treq->pub.tconn = NULL;
1057
1058 /*
1059 * Request removed from the connection
1060 * see if we need to deregister I/O events.
1061 */
1063}
1064
1065/** Transition a request to the unassigned state, in preparation for re-assignment
1066 *
1067 * @note treq->tconn may be inviable after calling
1068 * if treq->conn and connection_signals_pause are not used.
1069 * This is due to call to trunk_request_remove_from_conn.
1070 *
1071 * @param[in] treq to trigger a state change for.
1072 */
1074{
1075 trunk_t *trunk = treq->pub.trunk;
1076
1077 switch (treq->pub.state) {
1079 return;
1080
1083 break;
1084
1090 break;
1091
1092 default:
1094 }
1095
1097}
1098
1099/** Transition a request to the backlog state, adding it to the backlog of the trunk
1100 *
1101 * @note treq->tconn and treq may be inviable after calling
1102 * if treq->conn and connection_signals_pause are not used.
1103 * This is due to call to trunk_manage.
1104 *
1105 * @param[in] treq to trigger a state change for.
1106 * @param[in] new Whether this is a new request.
1107 */
1109{
1110 trunk_connection_t *tconn = treq->pub.tconn;
1111 trunk_t *trunk = treq->pub.trunk;
1112
1113 switch (treq->pub.state) {
1116 break;
1117
1120 break;
1121
1124 break;
1125
1126 default:
1128 }
1129
1131 fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1132
1133 /*
1134 * A new request has entered the trunk.
1135 * Re-calculate request/connection ratios.
1136 */
1137 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1138
1139 /*
1140 * To reduce latency, if there's no connections
1141 * in the connecting state, call the trunk manage
1142 * function immediately.
1143 *
1144 * Likewise, if there's draining connections
1145 * which could be moved back to active call
1146 * the trunk manage function.
1147 *
1148 * Remember requests only enter the backlog if
1149 * there's no connections which can service them.
1150 */
1154 }
1155}
1156
1157/** Transition a request to the pending state, adding it to the backlog of an active connection
1158 *
1159 * All trunk requests being added to a connection get passed to this function.
1160 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1161 *
1162 * @note treq->tconn and treq may be inviable after calling
1163 * if treq->conn and connection_signals_pause is not used.
1164 * This is due to call to trunk_connection_event_update.
1165 *
1166 * @param[in] treq to trigger a state change for.
1167 * @param[in] tconn to enqueue the request on.
1168 * @param[in] new Whether this is a new request.
1169 */
1171{
1172 trunk_t *trunk = treq->pub.trunk;
1173
1174 fr_assert(tconn->pub.trunk == trunk);
1175 fr_assert(IS_PROCESSING(tconn));
1176
1177 switch (treq->pub.state) {
1180 fr_assert(!treq->pub.tconn);
1181 break;
1182
1184 fr_assert(!treq->pub.tconn);
1186 break;
1187
1188 case TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1190 break;
1191
1192 default:
1194 }
1195
1196 /*
1197 * Assign the new connection first so
1198 * it appears in the state log.
1199 */
1200 treq->pub.tconn = tconn;
1201
1203
1204 {
1205 request_t *request = treq->pub.request;
1206
1207 ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1208 tconn->pub.conn->name, treq->id);
1209 }
1210 fr_heap_insert(&tconn->pending, treq);
1211
1212 /*
1213 * A new request has entered the trunk.
1214 * Re-calculate request/connection ratios.
1215 */
1216 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1217
1218 /*
1219 * Check if we need to automatically transition the
1220 * connection to full.
1221 */
1223
1224 /*
1225 * Reorder the connection in the heap now it has an
1226 * additional request.
1227 */
1228 if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1229
1230 /*
1231 * We have a new request, see if we need to register
1232 * for I/O events.
1233 */
1235}
1236
1237/** Transition a request to the partial state, indicating that it has been partially sent
1238 *
1239 * @param[in] treq to trigger a state change for.
1240 */
1242{
1243 trunk_connection_t *tconn = treq->pub.tconn;
1244 trunk_t *trunk = treq->pub.trunk;
1245
1246 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1247
1248 switch (treq->pub.state) {
1249 case TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1251 break;
1252
1253 default:
1255 }
1256
1257 fr_assert(!tconn->partial);
1258 tconn->partial = treq;
1259
1261}
1262
1263/** Transition a request to the sent state, indicating that it's been sent in its entirety
1264 *
1265 * @note treq->tconn and treq may be inviable after calling
1266 * if treq->conn and connection_signals_pause is not used.
1267 * This is due to call to trunk_connection_event_update.
1268 *
1269 * @param[in] treq to trigger a state change for.
1270 */
1272{
1273 trunk_connection_t *tconn = treq->pub.tconn;
1274 trunk_t *trunk = treq->pub.trunk;
1275
1276 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1277
1278 switch (treq->pub.state) {
1281 break;
1282
1285 break;
1286
1287 default:
1289 }
1290
1292 fr_dlist_insert_tail(&tconn->sent, treq);
1293
1294 /*
1295 * Update the connection's sent stats if this is the
1296 * first time this request is being sent.
1297 */
1298 if (!treq->sent) {
1299 trunk->pub.last_write_success = fr_time();
1300
1302 tconn->sent_count++;
1303 treq->sent = true;
1304
1305 /*
1306 * Enforces max_uses
1307 */
1308 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1309 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1311 }
1312 }
1313
1314 /*
1315 * We just sent a request, we probably need
1316 * to tell the event loop we want to be
1317 * notified if there's data available.
1318 */
1320}
1321
1322/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
1323 *
1324 * @note Largely a replica of trunk_request_enter_sent.
1325 *
1326 * @param[in] treq to trigger a state change for.
1327 */
1329{
1330 trunk_connection_t *tconn = treq->pub.tconn;
1331 trunk_t *trunk = treq->pub.trunk;
1332
1333 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1334
1335 switch (treq->pub.state) {
1338 break;
1339
1342 break;
1343
1344 default:
1346 }
1347
1349 fr_dlist_insert_tail(&tconn->reapable, treq);
1350
1351 if (!treq->sent) {
1352 tconn->sent_count++;
1353 treq->sent = true;
1354
1355 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1356 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1358 }
1359 }
1360
1362}
1363
1364/** Transition a request to the cancel state, placing it in a connection's cancellation list
1365 *
1366 * If a request_cancel_send callback is provided, that callback will
1367 * be called periodically for requests which were cancelled due to
1368 * a signal.
1369 *
1370 * The request_cancel_send callback will dequeue cancelled requests
1371 * and inform a remote server that the result is no longer required.
1372 *
1373 * A request must enter this state before being added to the backlog
1374 * of another connection if it's been sent or partially sent.
1375 *
1376 * @note treq->tconn and treq may be inviable after calling
1377 * if treq->conn and connection_signals_pause is not used.
1378 * This is due to call to trunk_connection_event_update.
1379 *
1380 * @param[in] treq to trigger a state change for.
1381 * @param[in] reason Why the request was cancelled.
1382 * Should be one of:
1383 * - TRUNK_CANCEL_REASON_SIGNAL request cancelled
1384 * because of a signal from the interpreter.
1385 * - TRUNK_CANCEL_REASON_MOVE request cancelled
1386 * because the connection failed and it needs
1387 * to be assigned to a new connection.
1388 * - TRUNK_CANCEL_REASON_REQUEUE request cancelled
1389 * as it needs to be resent on the same connection.
1390 */
1392{
1393 trunk_connection_t *tconn = treq->pub.tconn;
1394 trunk_t *trunk = treq->pub.trunk;
1395
1396 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1397
1398 switch (treq->pub.state) {
1401 break;
1402
1405 break;
1406
1409 break;
1410
1411 default:
1413 }
1414
1416 fr_dlist_insert_tail(&tconn->cancel, treq);
1417 treq->cancel_reason = reason;
1418
1419 DO_REQUEST_CANCEL(treq, reason);
1420
1421 /*
1422 * Our treq is no longer bound to an actual
1423 * request_t *, as we can't guarantee the
1424 * lifetime of the original request_t *.
1425 */
1426 if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1427
1428 /*
1429 * Register for I/O write events if we need to.
1430 */
1432}
1433
1434/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1435 *
1436 * The request_demux function is then responsible for signalling
1437 * that the cancel request is complete when the remote server
1438 * acknowledges the cancellation request.
1439 *
1440 * @param[in] treq to trigger a state change for.
1441 */
1443{
1444 trunk_connection_t *tconn = treq->pub.tconn;
1445 trunk_t *trunk = treq->pub.trunk;
1446
1447 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1450
1451 switch (treq->pub.state) {
1452 case TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_partial can be reached from */
1454 break;
1455
1456 default:
1458 }
1459
1461 fr_assert(!tconn->cancel_partial);
1462 tconn->cancel_partial = treq;
1463}
1464
1465/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1466 *
1467 * The request_demux function is then responsible for signalling
1468 * that the cancel request is complete when the remote server
1469 * acknowledges the cancellation request.
1470 *
1471 * @note treq->tconn and treq may be inviable after calling
1472 * if treq->conn and connection_signals_pause is not used.
1473 * This is due to call to trunk_connection_event_update.
1474 *
1475 * @param[in] treq to trigger a state change for.
1476 */
1478{
1479 trunk_connection_t *tconn = treq->pub.tconn;
1480 trunk_t *trunk = treq->pub.trunk;
1481
1482 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1485
1486 switch (treq->pub.state) {
1489 break;
1490
1493 break;
1494
1495 default:
1497 }
1498
1500 fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1501
1502 /*
1503 * De-register for I/O write events
1504 * and register the read events
1505 * to drain the cancel ACKs.
1506 */
1508}
1509
1510/** Cancellation was acked, the request is complete, free it
1511 *
1512 * The API client will not be informed, as the original request_t *
1513 * will likely have been freed by this point.
1514 *
1515 * @note treq will be inviable after a call to this function.
1516 * treq->tconn may be inviable after calling
1517 * if treq->conn and connection_signals_pause is not used.
1518 * This is due to call to trunk_request_remove_from_conn.
1519 *
1520 * @param[in] treq to mark as complete.
1521 */
1523{
1524 trunk_connection_t *tconn = treq->pub.tconn;
1525 trunk_t *trunk = treq->pub.trunk;
1526
1527 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1528 if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1529
1530 switch (treq->pub.state) {
1533 break;
1534
1535 default:
1537 }
1538
1540
1542 trunk_request_free(&treq); /* Free the request */
1543}
1544
1545/** Request completed successfully, inform the API client and free the request
1546 *
1547 * @note treq will be inviable after a call to this function.
1548 * treq->tconn may also be inviable due to call to
1549 * trunk_request_remove_from_conn.
1550 *
1551 * @param[in] treq to mark as complete.
1552 */
1554{
1555 trunk_connection_t *tconn = treq->pub.tconn;
1556 trunk_t *trunk = treq->pub.trunk;
1557
1558 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1559
1560 switch (treq->pub.state) {
1565 break;
1566
1567 default:
1569 }
1570
1572 DO_REQUEST_COMPLETE(treq);
1573 trunk_request_free(&treq); /* Free the request */
1574}
1575
1576/** Request failed, inform the API client and free the request
1577 *
1578 * @note treq will be inviable after a call to this function.
1579 * treq->tconn may also be inviable due to call to
1580 * trunk_request_remove_from_conn.
1581 *
1582 * @param[in] treq to mark as failed.
1583 */
1585{
1586 trunk_connection_t *tconn = treq->pub.tconn;
1587 trunk_t *trunk = treq->pub.trunk;
1588 trunk_request_state_t prev = treq->pub.state;
1589
1590 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1591
1592 switch (treq->pub.state) {
1595 break;
1596
1597 default:
1599 break;
1600 }
1601
1603 DO_REQUEST_FAIL(treq, prev);
1604 trunk_request_free(&treq); /* Free the request */
1605}
1606
1607/** Check to see if a trunk request can be enqueued
1608 *
1609 * @param[out] tconn_out Connection the request may be enqueued on.
1610 * @param[in] trunk To enqueue requests on.
1611 * @param[in] request associated with the treq (if any).
1612 * @return
1613 * - TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1614 * - TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1615 * - TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1616 * connections or backlog space.
1617 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1618 * unreachable.
1619 */
1621 request_t *request)
1622{
1623 trunk_connection_t *tconn;
1624 /*
1625 * If we have an active connection then
1626 * return that.
1627 */
1628 tconn = fr_minmax_heap_min_peek(trunk->active);
1629 if (tconn) {
1630 *tconn_out = tconn;
1631 return TRUNK_ENQUEUE_OK;
1632 }
1633
1634 /*
1635 * Unlike the connection pool, we don't need
1636 * to drive any internal processes by feeding
1637 * it requests.
1638 *
1639 * If the last event to occur was a failure
1640 * we refuse to enqueue new requests until
1641 * one or more connections comes online.
1642 */
1643 if (!trunk->conf.backlog_on_failed_conn &&
1644 fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1645 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1647 RWARN, WARN, "Refusing to enqueue requests - "
1648 "No active connections and last event was a connection failure");
1649
1651 }
1652
1653
1654 /*
1655 * Only enforce if we're limiting maximum
1656 * number of connections, and maximum
1657 * number of requests per connection.
1658 *
1659 * The alloc function also checks this
1660 * which is why this is only done for
1661 * debug builds.
1662 */
1663 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1664 uint64_t limit;
1665
1666 limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1667 if (limit > 0) {
1668 uint64_t total_reqs;
1669
1670 total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
1672 if (total_reqs >= (limit + trunk->conf.max_backlog)) {
1674 RWARN, WARN, "Refusing to alloc requests - "
1675 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1676 "plus %u backlog requests reached",
1677 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
1678 trunk->conf.max_backlog);
1680 }
1681 }
1682 }
1683
1685}
1686
1687/** Enqueue a request which has never been assigned to a connection or was previously cancelled
1688 *
1689 * @param[in] treq to re enqueue. Must have been removed
1690 * from its existing connection with
1691 * #trunk_connection_requests_dequeue.
1692 * @return
1693 * - TRUNK_ENQUEUE_OK Request was re-enqueued.
1694 * - TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1695 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1696 * Usually because the connection to the resource is down.
1697 */
1699{
1700 trunk_t *trunk = treq->pub.trunk;
1701 trunk_connection_t *tconn = NULL;
1702 trunk_enqueue_t ret;
1703
1704 /*
1705 * Must *NOT* still be assigned to another connection
1706 */
1707 fr_assert(!treq->pub.tconn);
1708
1709 ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1710 switch (ret) {
1711 case TRUNK_ENQUEUE_OK:
1712 if (trunk->conf.always_writable) {
1714 trunk_request_enter_pending(treq, tconn, false);
1717 } else {
1718 trunk_request_enter_pending(treq, tconn, false);
1719 }
1720 break;
1721
1723 /*
1724 * No more connections and request
1725 * is already in the backlog.
1726 *
1727 * Signal our caller it should stop
1728 * trying to drain the backlog.
1729 */
1731 trunk_request_enter_backlog(treq, false);
1732 break;
1733
1734 default:
1735 break;
1736 }
1737
1738 return ret;
1739}
1740
1741/** Shift requests in the specified states onto new connections
1742 *
1743 * This function will blindly dequeue any requests in the specified state and get
1744 * them back to the unassigned state, cancelling any sent or partially sent requests.
1745 *
1746 * This function does not check that dequeuing a request in a particular state is a
1747 * sane or sensible thing to do, that's up to the caller!
1748 *
1749 * @param[out] out A list to insert the newly dequeued and unassigned
1750 * requests into.
1751 * @param[in] tconn to dequeue requests from.
1752 * @param[in] states Dequeue request in these states.
1753 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1754 */
1756 int states, uint64_t max)
1757{
1758 trunk_request_t *treq;
1759 uint64_t count = 0;
1760
1761 if (max == 0) max = UINT64_MAX;
1762
1763#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1764
1765#define DEQUEUE_ALL(_src_list, _state) do { \
1766 while ((treq = fr_dlist_head(_src_list))) { \
1767 OVER_MAX_CHECK; \
1768 fr_assert(treq->pub.state == (_state)); \
1769 trunk_request_enter_unassigned(treq); \
1770 fr_dlist_insert_tail(out, treq); \
1771 } } while (0)
1772
1773 /*
1774 * Don't need to do anything with
1775 * cancellation requests.
1776 */
1777 if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1779
1780 /*
1781 * ...same with cancel inform
1782 */
1785
1786 /*
1787 * ....same with cancel partial
1788 */
1791 treq = tconn->cancel_partial;
1792 if (treq) {
1796 }
1797 }
1798
1799 /*
1800 * ...and pending.
1801 */
1802 if (states & TRUNK_REQUEST_STATE_PENDING) {
1803 while ((treq = fr_heap_peek(tconn->pending))) {
1808 }
1809 }
1810
1811 /*
1812 * Cancel partially sent requests
1813 */
1814 if (states & TRUNK_REQUEST_STATE_PARTIAL) {
1816 treq = tconn->partial;
1817 if (treq) {
1819
1820 /*
1821 * Don't allow the connection to change state whilst
1822 * we're draining requests from it.
1823 */
1829 }
1830 }
1831
1832 /*
1833 * Cancel sent requests
1834 */
1835 if (states & TRUNK_REQUEST_STATE_SENT) {
1836 /*
1837 * Don't allow the connection to change state whilst
1838 * we're draining requests from it.
1839 */
1841 while ((treq = fr_dlist_head(&tconn->sent))) {
1844
1848 }
1850 }
1851
1852 return count;
1853}
1854
1855/** Remove requests in specified states from a connection, attempting to distribute them to new connections
1856 *
1857 * @param[in] tconn To remove requests from.
1858 * @param[in] states One or more states or'd together.
1859 * @param[in] max The maximum number of requests to dequeue.
1860 * 0 for unlimited.
1861 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1862 * If false bound requests will not be moved.
1863 *
1864 * @return the number of requests re-queued.
1865 */
1866static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1867{
1868 trunk_t *trunk = tconn->pub.trunk;
1869 fr_dlist_head_t to_process;
1870 trunk_request_t *treq = NULL;
1871 uint64_t moved = 0;
1872
1873 if (max == 0) max = UINT64_MAX;
1874
1875 fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
1876
1877 /*
1878 * Prevent the connection changing state whilst we're
1879 * working with it.
1880 *
1881 * There's a user callback that can be called by
1882 * trunk_request_enqueue_existing which can reconnect
1883 * the connection.
1884 */
1886
1887 /*
1888 * Remove non-cancelled requests from the connection
1889 */
1890 moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1891
1892 /*
1893 * Prevent requests being requeued on the same trunk
1894 * connection, which would break rebalancing.
1895 *
1896 * This is a bit of a hack, but nothing should test
1897 * for connection/list consistency in this code,
1898 * and if something is added later, it'll be flagged
1899 * by the tests.
1900 */
1901 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1902 int ret;
1903
1904 ret = fr_minmax_heap_extract(trunk->active, tconn);
1905 if (!fr_cond_assert_msg(ret == 0,
1906 "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1907
1908 }
1909
1910 /*
1911 * Loop over all the requests we gathered and
1912 * redistribute them to new connections.
1913 */
1914 while ((treq = fr_dlist_next(&to_process, treq))) {
1915 trunk_request_t *prev;
1916
1917 prev = fr_dlist_remove(&to_process, treq);
1918
1919 /*
1920 * Attempts to re-queue a request
1921 * that's bound to a connection
1922 * results in a failure.
1923 */
1924 if (treq->bound_to_conn) {
1925 if (fail_bound || !IS_SERVICEABLE(tconn)) {
1927 } else {
1928 trunk_request_enter_pending(treq, tconn, false);
1929 }
1930 goto next;
1931 }
1932
1933 switch (trunk_request_enqueue_existing(treq)) {
1934 case TRUNK_ENQUEUE_OK:
1935 break;
1936
1937 /*
1938 * A connection failed, and
1939 * there's no other connections
1940 * available to deal with the
1941 * load, it's been placed back
1942 * in the backlog.
1943 */
1945 break;
1946
1947 /*
1948 * If we fail to re-enqueue then
1949 * there's nothing to do except
1950 * fail the request.
1951 */
1954 case TRUNK_ENQUEUE_FAIL:
1956 break;
1957 }
1958 next:
1959 treq = prev;
1960 }
1961
1962 /*
1963 * Add the connection back into the active list
1964 */
1965 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1966 int ret;
1967
1968 ret = fr_minmax_heap_insert(trunk->active, tconn);
1969 if (!fr_cond_assert_msg(ret == 0,
1970 "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1971 }
1972 if (moved >= max) goto done;
1973
1974 /*
1975 * Deal with the cancelled requests specially we can't
1976 * queue them up again as they were only valid on that
1977 * specific connection.
1978 *
1979 * We just need to run them to completion which, as
1980 * they should already be in the unassigned state,
1981 * just means freeing them.
1982 */
1983 moved += trunk_connection_requests_dequeue(&to_process, tconn,
1984 states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1985 while ((treq = fr_dlist_next(&to_process, treq))) {
1986 trunk_request_t *prev;
1987
1988 prev = fr_dlist_remove(&to_process, treq);
1989 trunk_request_free(&treq);
1990 treq = prev;
1991 }
1992
1993done:
1994
1995 /*
1996 * Always re-calculate the request/connection
1997 * ratio at the end.
1998 *
1999 * This avoids having the state transition
2000 * functions do it.
2001 *
2002 * The ratio would be wrong when they calculated
2003 * it anyway, because a bunch of requests are
2004 * dequeued from the connection and temporarily
2005 * cease to exist from the perspective of the
2006 * trunk_requests_per_connection code.
2007 */
2008 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
2009
2011 return moved;
2012}
2013
2014/** Move requests off of a connection and requeue elsewhere
2015 *
2016 * @note We don't re-queue on draining or draining to free, as requests should have already been
2017 * moved off of the connection. It's also dangerous as the trunk management code may
2018 * clean up a connection in this state when it's run on re-queue, and then the caller
2019 * may try and access a now freed connection.
2020 *
2021 * @param[in] tconn to move requests off of.
2022 * @param[in] states Only move requests in this state.
2023 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
2024 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
2025 * If false bound requests will not be moved.
2026 * @return The number of requests requeued.
2027 */
2028uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
2029{
2030 switch (tconn->pub.state) {
2031 case TRUNK_CONN_ACTIVE:
2032 case TRUNK_CONN_FULL:
2034 return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
2035
2036 default:
2037 return 0;
2038 }
2039}
2040
2041/** Signal a partial write
2042 *
2043 * Used where there's high load, and the outbound write buffer is full.
2044 *
2045 * @param[in] treq to signal state change for.
2046 */
2048{
2049 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2050
2052 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2053
2054 switch (treq->pub.state) {
2057 break;
2058
2059 default:
2060 return;
2061 }
2062}
2063
2064/** Signal that the request was written to a connection successfully
2065 *
2066 * @param[in] treq to signal state change for.
2067 */
2069{
2070 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2071
2073 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2074
2075 switch (treq->pub.state) {
2079 break;
2080
2081 default:
2082 return;
2083 }
2084}
2085
2086/** Signal that the request was written to a connection successfully, but no response is expected
2087 *
2088 * @param[in] treq to signal state change for.
2089 */
2091{
2092 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2093
2095 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2096
2097 switch (treq->pub.state) {
2101 break;
2102
2103 default:
2104 return;
2105 }
2106}
2107
2108/** Signal that a trunk request is complete
2109 *
2110 * The API client will be informed that the request is now complete.
2111 */
2113{
2114 trunk_t *trunk = treq->pub.trunk;
2115
2116 if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2117
2118 /*
2119 * We assume that if the request is being signalled
2120 * as complete from the demux function, that it was
2121 * a successful read.
2122 *
2123 * If this assumption turns out to be incorrect
2124 * then we need to add an argument to signal_complete
2125 * to indicate if this is a successful read.
2126 */
2127 if (IN_REQUEST_DEMUX(trunk)) {
2128 trunk_connection_t *tconn = treq->pub.tconn;
2129
2130 trunk->pub.last_read_success = fr_time();
2132 }
2133
2134 switch (treq->pub.state) {
2136 case TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2139 break;
2140
2141 default:
2142 return;
2143 }
2144}
2145
2146/** Signal that a trunk request failed
2147 *
2148 * The API client will be informed that the request has failed.
2149 */
2151{
2152 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2153
2155}
2156
2157/** Cancel a trunk request
2158 *
2159 * treq can be in any state. Requests which are already in the process of being
2160 * cancelled ignore duplicate signals. Requests in any state which has no explicit
 * handling are released from their current connection and freed.
2161 *
2162 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2163 * is now inviable as it's being cancelled.
2164 *
2165 * The free function however, is called, and that should be used to perform necessary
2166 * cleanup.
2167 *
2168 * @param[in] treq to signal state change for.
2169 */
2171{
2172 trunk_t *trunk;
2173
2174 /*
2175 * Ensure treq hasn't been freed
2176 */
2177 (void)talloc_get_type_abort(treq, trunk_request_t);
2178
2179 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2180
2182 "%s cannot be called within a handler", __FUNCTION__)) return;
2183
2184 trunk = treq->pub.trunk;
2185
2186 switch (treq->pub.state) {
2187 /*
2188 * We don't call the complete or failed callbacks
2189 * as the request and rctx are no longer viable.
2190 */
2193 {
2194 trunk_connection_t *tconn = treq->pub.tconn;
2195
2196 /*
2197 * Don't allow connection state changes
2198 */
2202 "Bad state %s after cancellation",
2203 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2205 return;
2206 }
2207 /*
2208 * No cancel muxer. We're done.
2209 *
2210 * If we do have a cancel mux function,
2211 * the next time this connection becomes
2212 * writable, we'll call the cancel mux
2213 * function.
2214 *
2215 * We don't run the complete or failed
2216 * callbacks here as the request is
2217 * being cancelled.
2218 */
2219 if (!trunk->funcs.request_cancel_mux) {
2221 trunk_request_free(&treq);
2222 }
2224 }
2225 break;
2226
2227 /*
2228 * We're already in the process of cancelling a
2229 * request, so ignore duplicate signals.
2230 */
2235 break;
2236
2237 /*
2238 * For any other state, we just release the request
2239 * from its current connection and free it.
2240 */
2241 default:
2243 trunk_request_free(&treq);
2244 break;
2245 }
2246}
2247
2248/** Signal a partial cancel write
2249 *
2250 * Where there's high load, and the outbound write buffer is full
2251 *
2252 * @param[in] treq to signal state change for.
2253 */
2255{
2256 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2257
2259 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2260
2261 switch (treq->pub.state) {
2264 break;
2265
2266 default:
2267 return;
2268 }
2269}
2270
2271/** Signal that a remote server has been notified of the cancellation
2272 *
2273 * Called from request_cancel_mux to indicate that the datastore has been informed
2274 * that the response is no longer needed.
2275 *
2276 * @param[in] treq to signal state change for.
2277 */
2279{
2280 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2281
2283 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2284
2285 switch (treq->pub.state) {
2289 break;
2290
2291 default:
2292 break;
2293 }
2294}
2295
2296/** Signal that a remote server acked our cancellation
2297 *
2298 * Called from request_demux to indicate that it got an ack for the cancellation.
2299 *
2300 * @param[in] treq to signal state change for.
2301 */
2303{
2304 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2305
2307 "%s can only be called from within request_demux or request_cancel_mux handlers",
2308 __FUNCTION__)) return;
2309
2310 switch (treq->pub.state) {
2312 /*
2313 * This is allowed, as we may not need to wait
2314 * for the database to ACK our cancellation
2315 * request.
2316 *
2317 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2318 * is not allowed here, as that'd mean we'd half
2319 * written the cancellation request out to the
2320 * socket, and then decided to abandon it.
2321 *
2322 * That'd leave the socket in an unusable state.
2323 */
2326 break;
2327
2328 default:
2329 break;
2330 }
2331}
2332
2333/** If the trunk request is freed then update the target requests
2334 *
2335 * gperftools showed calling the request free function directly was slightly faster
2336 * than using talloc_free.
2337 *
2338 * @param[in] treq_to_free request.
2339 */
2341{
2342 trunk_request_t *treq = *treq_to_free;
2343 trunk_t *trunk;
2344
2345 if (unlikely(!treq)) return;
2346
2347 trunk = treq->pub.trunk;
2348
2349 /*
2350 * The only valid states a trunk request can be
2351 * freed from.
2352 */
2353 switch (treq->pub.state) {
2359 break;
2360
2361 default:
2362 if (!fr_cond_assert(0)) return;
2363 }
2364
2365 /*
2366 * Zero out the pointer to prevent double frees
2367 */
2368 *treq_to_free = NULL;
2369
2370 /*
2371 * Call the API client callback to free
2372 * any associated memory.
2373 */
2374 DO_REQUEST_FREE(treq);
2375
2376 /*
2377 * Update the last above/below target stats
2378 * We only do this when we alloc or free
2379 * connections, or on connection
2380 * state changes.
2381 */
2382 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2383
2384 /*
2385 * This tracks the total number of requests
2386 * allocated and not freed or returned to
2387 * the free list.
2388 */
2389 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2390
2391 /*
2392 * No cleanup delay, means cleanup immediately
2393 */
2396
2397#ifndef NDEBUG
2398 /*
2399 * Ensure anything parented off the treq
2400 * is freed. We do this to trigger
2401 * the destructors for the log entries.
2402 */
2403 talloc_free_children(treq);
2404
2405 /*
2406 * State log should now be empty as entries
2407 * remove themselves from the dlist
2408 * on free.
2409 */
2411 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2412#endif
2413
2414 talloc_free(treq);
2415 return;
2416 }
2417
2418 /*
2419 * Ensure anything parented off the treq
2420 * is freed.
2421 */
2422 talloc_free_children(treq);
2423
2424#ifndef NDEBUG
2425 /*
2426 * State log should now be empty as entries
2427 * remove themselves from the dlist
2428 * on free.
2429 */
2431 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2432#endif
2433
2434 /*
2435 *
2436 * Return the trunk request back to the init state.
2437 */
2438 *treq = (trunk_request_t){
2439 .pub = {
2441 .trunk = treq->pub.trunk,
2442 },
2443 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2444 .last_freed = fr_time(),
2445#ifndef NDEBUG
2446 .log = treq->log /* Keep the list head, to save reinitialisation */
2447#endif
2448 };
2449
2450 /*
2451 * Insert at the head, so that we can free
2452 * requests that have been unused for N
2453 * seconds from the tail.
2454 */
2455 fr_dlist_insert_tail(&trunk->free_requests, treq);
2456}
2457
2458/** Actually free the trunk request
2459 *
2460 */
2462{
2463 trunk_t *trunk = treq->pub.trunk;
2464
2465 switch (treq->pub.state) {
2468 break;
2469
2470 default:
2471 fr_assert(0);
2472 break;
2473 }
2474
2475 fr_dlist_remove(&trunk->free_requests, treq);
2476
2477 return 0;
2478}
2479
2480/** (Pre-)Allocate a new trunk request
2481 *
2482 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2483 * request will be a talloc pool, which can be used to hold the preq.
2484 *
2485 * @note Do not use MEM to check the result of this allocation as it may fail for
2486 * non-fatal reasons.
2487 *
2488 * @param[in] trunk to add request to.
2489 * @param[in] request to wrap in a trunk request (treq).
2490 * @return
2491 * - A newly allocated request.
2492 * - NULL if too many requests are allocated.
2493 */
2495{
2496 trunk_request_t *treq;
2497
2498 /*
2499 * The number of treqs currently allocated
2500 * exceeds the maximum number allowed.
2501 */
2502 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2503 uint64_t limit;
2504
2505 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2506 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2508 RWARN, WARN, "Refusing to alloc requests - "
2509 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2510 "plus %u backlog requests reached",
2511 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2512 trunk->conf.max_backlog);
2513 return NULL;
2514 }
2515 }
2516
2517 /*
2518 * Allocate or reuse an existing request
2519 */
2520 treq = fr_dlist_head(&trunk->free_requests);
2521 if (treq) {
2522 fr_dlist_remove(&trunk->free_requests, treq);
2524 fr_assert(treq->pub.trunk == trunk);
2525 fr_assert(treq->pub.tconn == NULL);
2528 trunk->pub.req_alloc_reused++;
2529 } else {
2531 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2532 talloc_set_destructor(treq, _trunk_request_free);
2533
2534 *treq = (trunk_request_t){
2535 .pub = {
2537 .trunk = trunk
2538 },
2539 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2540 };
2541 trunk->pub.req_alloc_new++;
2542#ifndef NDEBUG
2544#endif
2545 }
2546
2547 trunk->pub.req_alloc++;
2549 /* heap_id - initialised when treq inserted into pending */
2550 /* list - empty */
2551 /* preq - populated later */
2552 /* rctx - populated later */
2553 treq->pub.request = request;
2554
2555 return treq;
2556}
2557
2558/** Enqueue a request that needs data written to the trunk
2559 *
2560 * When a request_t * needs to make an asynchronous request to an external datastore
2561 * it should call this function, specifying a preq (protocol request) containing
2562 * the data necessary to request information from the external datastore, and an
2563 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2564 *
2565 * After a treq is successfully enqueued it will either be assigned immediately
2566 * to the pending queue of a connection, or if no connections are available,
2567 * (depending on the trunk configuration) the treq will be placed in the trunk's
2568 * global backlog.
2569 *
2570 * After receiving a positive return code from this function the caller should
2571 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2572 * (trunk connection) and treq state changes to be called.
2573 *
2574 * When a tconn becomes writable (or the trunk is configured to be always writable)
2575 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2576 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2577 * is also responsible for tracking the outbound requests to allow the
2578 * #trunk_request_demux_t callback to match inbound responses with the original
2579 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2580 * it signals what state the treq should enter next using one of the
2581 * trunk_request_signal_* functions.
2582 *
2583 * When a tconn becomes readable the user specified #trunk_request_demux_t
2584 * callback is called to process any responses, match them with the original treq,
2585 * and signal what state they should enter next using one of the
2586 * trunk_request_signal_* functions.
2587 *
2588 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2589 * is NULL, a new treq will be allocated.
2590 * Otherwise treq should point to memory allocated
2591 * with trunk_request_alloc.
2592 * @param[in] trunk to enqueue request on.
2593 * @param[in] request to enqueue.
2594 * @param[in] preq Protocol request to write out. Will be freed when
2595 * treq is freed. Should ideally be parented by the
2596 * treq if possible.
2597 * Use #trunk_request_alloc for pre-allocation of
2598 * the treq.
2599 * @param[in] rctx The resume context to write any result to.
2600 * @return
2601 * - TRUNK_ENQUEUE_OK.
2602 * - TRUNK_ENQUEUE_IN_BACKLOG.
2603 * - TRUNK_ENQUEUE_NO_CAPACITY.
2604 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2605 * - TRUNK_ENQUEUE_FAIL
2606 */
2608 request_t *request, void *preq, void *rctx)
2609{
2610 trunk_connection_t *tconn = NULL;
2611 trunk_request_t *treq;
2612 trunk_enqueue_t ret;
2613
2614 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2615 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2616
2617 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2618 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2619
2620 /*
2621 * If delay_start was set, we may need
2622 * to insert the timer for the connection manager.
2623 */
2624 if (unlikely(!trunk->started)) {
2625 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2626 }
2627
2628 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2629 switch (ret) {
2630 case TRUNK_ENQUEUE_OK:
2631 if (*treq_out) {
2632 treq = *treq_out;
2633 } else {
2634 *treq_out = treq = trunk_request_alloc(trunk, request);
2635 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2636 }
2637 treq->pub.preq = preq;
2638 treq->pub.rctx = rctx;
2639 if (trunk->conf.always_writable) {
2641 trunk_request_enter_pending(treq, tconn, true);
2644 } else {
2645 trunk_request_enter_pending(treq, tconn, true);
2646 }
2647 break;
2648
2650 if (*treq_out) {
2651 treq = *treq_out;
2652 } else {
2653 *treq_out = treq = trunk_request_alloc(trunk, request);
2654 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2655 }
2656 treq->pub.preq = preq;
2657 treq->pub.rctx = rctx;
2658 trunk_request_enter_backlog(treq, true);
2659 break;
2660
2661 default:
2662 /*
2663 * If a trunk request was provided
2664 * populate the preq and rctx fields
2665 * so that if it's freed with
2666 * trunk_request_free, the free
2667 * function works as intended.
2668 */
2669 if (*treq_out) {
2670 treq = *treq_out;
2671 treq->pub.preq = preq;
2672 treq->pub.rctx = rctx;
2673 }
2674 return ret;
2675 }
2676
2677 return ret;
2678}
2679
2680/** Re-enqueue a request on the same connection
2681 *
2682 * If the treq has been sent, we assume that we're being signalled to requeue
2683 * because something outside of the trunk API has determined that a retransmission
2684 * is required. The easiest way to perform that retransmission is to clean up
2685 * any tracking information for the request, and then requeue it for transmission.
2686 *
2687 * If re-queueing fails, the request will enter the fail state. It should not be
2688 * accessed if this occurs.
2689 *
2690 * @param[in] treq to requeue (retransmit).
2691 * @return
2692 * - TRUNK_ENQUEUE_OK.
2693 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2694 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2695 */
2697{
2698 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2699
2700 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2701
2702 if (!IS_PROCESSING(tconn)) {
2705 }
2706
2707 switch (treq->pub.state) {
2713 trunk_request_enter_pending(treq, tconn, false);
2714 if (treq->pub.trunk->conf.always_writable) {
2716 }
2718 break;
2719
2720 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2721 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2722 break;
2723
2724 default:
2726 return TRUNK_ENQUEUE_FAIL;
2727 }
2728
2729 return TRUNK_ENQUEUE_OK;
2730}
2731
2732/** Enqueue additional requests on a specific connection
2733 *
2734 * This may be used to create a series of requests on a single connection, or to generate
2735 * in-band status checks.
2736 *
2737 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2738 * must be able to handle multiple calls to its muxer gracefully.
2739 *
2740 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2741 * is NULL, a new treq will be allocated.
2742 * Otherwise treq should point to memory allocated
2743 * with trunk_request_alloc.
2744 * @param[in] tconn to enqueue request on.
2745 * @param[in] request to enqueue.
2746 * @param[in] preq Protocol request to write out. Will be freed when
2747 * treq is freed. Should ideally be parented by the
2748 * treq if possible.
2749 * Use #trunk_request_alloc for pre-allocation of
2750 * the treq.
2751 * @param[in] rctx The resume context to write any result to.
2752 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2753 * checks through even if the connection is at capacity.
2754 * Will also allow enqueuing on "inactive", "draining",
2755 * "draining-to-free" connections.
2756 * @return
2757 * - TRUNK_ENQUEUE_OK.
2758 * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2759 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2760 */
2762 request_t *request, void *preq, void *rctx,
2763 bool ignore_limits)
2764{
2765 trunk_request_t *treq;
2766 trunk_t *trunk = tconn->pub.trunk;
2767
2768 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2769 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2770
2772
2773 /*
2774 * Limits check
2775 */
2776 if (!ignore_limits) {
2777 if (trunk->conf.max_req_per_conn &&
2780
2782 }
2783
2784 if (*treq_out) {
2785 treq = *treq_out;
2786 } else {
2787 *treq_out = treq = trunk_request_alloc(trunk, request);
2788 if (!treq) return TRUNK_ENQUEUE_NO_CAPACITY;
2789 }
2790
2791 treq->pub.preq = preq;
2792 treq->pub.rctx = rctx;
2793 treq->bound_to_conn = true; /* Don't let the request be transferred */
2794
2795 if (trunk->conf.always_writable) {
2797 trunk_request_enter_pending(treq, tconn, true);
2800 } else {
2801 trunk_request_enter_pending(treq, tconn, true);
2802 }
2803
2804 return TRUNK_ENQUEUE_OK;
2805}
2806
2807#ifndef NDEBUG
2808/** Used for sanity checks to ensure all log entries have been freed
2809 *
2810 */
2812{
2813 fr_dlist_remove(slog->log_head, slog);
2814
2815 return 0;
2816}
2817
2818void trunk_request_state_log_entry_add(char const *function, int line,
2820{
2821 trunk_request_state_log_t *slog = NULL;
2822
2824 slog = fr_dlist_head(&treq->log);
2825 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2826 fr_dlist_num_elements(&treq->log));
2827 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2828 memset(slog, 0, sizeof(*slog));
2829 } else {
2830 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2831 talloc_set_destructor(slog, _state_log_entry_free);
2832 }
2833
2834 slog->log_head = &treq->log;
2835 slog->from = treq->pub.state;
2836 slog->to = new;
2837 slog->function = function;
2838 slog->line = line;
2839 if (treq->pub.tconn) {
2840 slog->tconn = treq->pub.tconn;
2841 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2842 slog->tconn_state = treq->pub.tconn->pub.state;
2843 }
2844
2845 fr_dlist_insert_tail(&treq->log, slog);
2846
2847}
2848
2849void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2850 trunk_request_t const *treq)
2851{
2852 trunk_request_state_log_t *slog = NULL;
2853
2854 int i;
2855
2856 for (slog = fr_dlist_head(&treq->log), i = 0;
2857 slog;
2858 slog = fr_dlist_next(&treq->log, slog), i++) {
2859 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2860 i, slog->function, slog->line,
2861 slog->tconn_id,
2863 slog->tconn_state, "<INVALID>") : "none",
2864 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2865 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2866 }
2867}
2868#endif
2869
2870/** Return the number of connections in the specified states
2871 *
2872 * @param[in] trunk to retrieve counts for.
2873 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2874 * @return The number of connections in the specified states.
2875 */
2877{
2878 uint16_t count = 0;
2879
2880 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2881 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2882 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2883 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2884 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2886 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2887 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2889
2890 return count;
2891}
2892
2893/** Return the number of requests associated with a trunk connection
2894 *
2895 * @param[in] tconn to return request count for.
2896 * @param[in] req_state One or more request states or'd together.
2897 *
2898 * @return The number of requests in the specified states, associated with a tconn.
2899 */
2901{
2902 uint32_t count = 0;
2903
2905 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2906 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2908 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2909 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2911
2912 return count;
2913}
2914
2915/** Automatically mark a connection as full
2916 *
2917 * @param[in] tconn to potentially mark as full.
2918 */
2920{
2921 trunk_t *trunk = tconn->pub.trunk;
2923
2924 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2925
2926 /*
2927 * Enforces max_req_per_conn
2928 */
2929 if (trunk->conf.max_req_per_conn > 0) {
2932 }
2933}
2934
2935/** Return whether a trunk connection should currently be considered full
2936 *
2937 * @param[in] tconn to check.
2938 * @return
2939 * - true if the connection is full.
2940 * - false if the connection is not full.
2941 */
2943{
2944 trunk_t *trunk = tconn->pub.trunk;
2946
2947 /*
2948 * Enforces max_req_per_conn
2949 */
2951 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2952
2953 return true;
2954}
2955
2956/** Automatically mark a connection as active or reconnect it
2957 *
2958 * @param[in] tconn to potentially mark as active or reconnect.
2959 */
2961{
2962 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2963
2964 /*
2965 * Enforces max_req_per_conn
2966 */
2968}
2969
2970/** A connection is readable. Call the request_demux function to read pending requests
2971 *
2972 */
2974{
2975 trunk_t *trunk = tconn->pub.trunk;
2976
2977 DO_REQUEST_DEMUX(tconn);
2978}
2979
2980/** A connection is writable. Call the request_mux function to write pending requests
2981 *
2982 */
2984{
2985 trunk_t *trunk = tconn->pub.trunk;
2986
2987 /*
2988 * Call the cancel_sent function (if we have one)
2989 * to inform a backend datastore we no longer
2990 * care about the result
2991 */
2995 DO_REQUEST_CANCEL_MUX(tconn);
2996 }
3000 DO_REQUEST_MUX(tconn);
3001}
3002
3003/** Update the registrations for I/O events we're interested in
3004 *
3005 */
3007{
3008 trunk_t *trunk = tconn->pub.trunk;
3010
3011 switch (tconn->pub.state) {
3012 /*
3013 * We only register I/O events if the trunk connection is
3014 * in one of these states.
3015 *
3016 * For the other states the trunk shouldn't be processing
3017 * requests.
3018 */
3019 case TRUNK_CONN_ACTIVE:
3020 case TRUNK_CONN_FULL:
3025 /*
3026 * If the connection is always writable,
3027 * then we don't care about write events.
3028 */
3029 if (!trunk->conf.always_writable &&
3033 (trunk->funcs.request_cancel_mux ?
3037 }
3038
3041 (trunk->funcs.request_cancel_mux ?
3044 }
3045 break;
3046
3047 default:
3048 break;
3049 }
3050
3051 if (tconn->events != events) {
3052 /*
3053 * There may be a fatal error which results
3054 * in the connection being freed.
3055 *
3056 * Stop that from happening until after
3057 * we're done using it.
3058 */
3061 tconn->events = events;
3063 }
3064}
3065
3066/** Remove a trunk connection from whichever list it's currently in
3067 *
3068 * @param[in] tconn to remove.
3069 */
3071{
3072 trunk_t *trunk = tconn->pub.trunk;
3073
3074 switch (tconn->pub.state) {
3075 case TRUNK_CONN_ACTIVE:
3076 {
3077 int ret;
3078
3079 ret = fr_minmax_heap_extract(trunk->active, tconn);
3080 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3081 }
3082 return;
3083
3084 case TRUNK_CONN_INIT:
3085 fr_dlist_remove(&trunk->init, tconn);
3086 break;
3087
3089 fr_dlist_remove(&trunk->connecting, tconn);
3090 return;
3091
3092 case TRUNK_CONN_CLOSED:
3093 fr_dlist_remove(&trunk->closed, tconn);
3094 return;
3095
3096 case TRUNK_CONN_FULL:
3097 fr_dlist_remove(&trunk->full, tconn);
3098 return;
3099
3101 fr_dlist_remove(&trunk->inactive, tconn);
3102 return;
3103
3105 fr_dlist_remove(&trunk->inactive_draining, tconn);
3106 return;
3107
3109 fr_dlist_remove(&trunk->draining, tconn);
3110 return;
3111
3113 fr_dlist_remove(&trunk->draining_to_free, tconn);
3114 return;
3115
3116 case TRUNK_CONN_HALTED:
3117 return;
3118 }
3119}
3120
3121/** Transition a connection to the full state
3122 *
3123 * Called whenever a trunk connection is at the maximum number of requests.
3124 * Removes the connection from the connected heap, and places it in the full list.
3125 */
3127{
3128 trunk_t *trunk = tconn->pub.trunk;
3129
3130 switch (tconn->pub.state) {
3131 case TRUNK_CONN_ACTIVE:
3133 break;
3134
3135 default:
3137 }
3138
3139 fr_dlist_insert_head(&trunk->full, tconn);
3141}
3142
3143/** Transition a connection to the inactive state
3144 *
3145 * Called whenever the API client wants to stop new requests being enqueued
3146 * on a trunk connection.
3147 */
3149{
3150 trunk_t *trunk = tconn->pub.trunk;
3151
3152 switch (tconn->pub.state) {
3153 case TRUNK_CONN_ACTIVE:
3154 case TRUNK_CONN_FULL:
3156 break;
3157
3158 default:
3160 }
3161
3162 fr_dlist_insert_head(&trunk->inactive, tconn);
3164}
3165
3166/** Transition a connection to the inactive-draining state
3167 *
3168 * Called whenever the trunk manager wants to drain an inactive connection
3169 * of its requests.
3170 */
3172{
3173 trunk_t *trunk = tconn->pub.trunk;
3174
3175 switch (tconn->pub.state) {
3179 break;
3180
3181 default:
3183 }
3184
3187
3188 /*
3189 * Immediately re-enqueue all pending
3190 * requests, so the connection is drained
3191 * quicker.
3192 */
3194}
3195
3196/** Transition a connection to the draining state
3197 *
3198 * Removes the connection from the active heap so it won't be assigned any new
3199 * requests.
3200 */
3202{
3203 trunk_t *trunk = tconn->pub.trunk;
3204
3205 switch (tconn->pub.state) {
3206 case TRUNK_CONN_ACTIVE:
3207 case TRUNK_CONN_FULL:
3211 break;
3212
3213 default:
3215 }
3216
3217 fr_dlist_insert_head(&trunk->draining, tconn);
3219
3220 /*
3221 * Immediately re-enqueue all pending
3222 * requests, so the connection is drained
3223 * quicker.
3224 */
3226}
3227
3228/** Transition a connection to the draining-to-free state
3229 *
3230 * Removes the connection from the active heap so it won't be assigned any new
3231 * requests.
3232 */
3234{
3235 trunk_t *trunk = tconn->pub.trunk;
3236
3238
3239 switch (tconn->pub.state) {
3240 case TRUNK_CONN_ACTIVE:
3241 case TRUNK_CONN_FULL:
3246 break;
3247
3248 default:
3250 }
3251
3252 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3254
3255 /*
3256 * Immediately re-enqueue all pending
3257 * requests, so the connection is drained
3258 * quicker.
3259 */
3261}
3262
3263
3264/** Transition a connection back to the active state
3265 *
3266 * This should only be called on a connection which is in the full state,
3267 * inactive state, draining state or connecting state.
3268 */
3270{
3271 trunk_t *trunk = tconn->pub.trunk;
3272 int ret;
3273
3274 switch (tconn->pub.state) {
3275 case TRUNK_CONN_FULL:
3280 break;
3281
3282 case TRUNK_CONN_INIT:
3286 break;
3287
3288 default:
3290 }
3291
3292 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3293 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3295 return;
3296 }
3297
3299
3300 /*
3301 * Reorder the connections
3302 */
3303 CONN_REORDER(tconn);
3304
3305 /*
3306 * Rebalance requests
3307 */
3308 trunk_rebalance(trunk);
3309
3310 /*
3311 * We place requests into the backlog
3312 * because there were no connections
3313 * available to handle them.
3314 *
3315 * If a connection has become active
3316 * chances are those backlogged requests
3317 * can now be enqueued, so try and do
3318 * that now.
3319 *
3320 * If there are requests sitting in the
3321 * backlog indefinitely, it's because
3322 * they were inserted there erroneously
3323 * when there were active connections
3324 * which could have handled them.
3325 */
3326 trunk_backlog_drain(trunk);
3327}
3328
3329/** Connection transitioned to the init state
3330 *
3331 * Reflect the connection state change in the lists we use to track connections.
3332 *
3333 * @note This function is only called from the connection API as a watcher.
3334 *
3335 * @param[in] conn The connection which changes state.
3336 * @param[in] prev The state the connection was in.
3337 * @param[in] state The state the connection is now in.
3338 * @param[in] uctx The trunk_connection_t wrapping the connection.
3339 */
3343 void *uctx)
3344{
3345 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3346 trunk_t *trunk = tconn->pub.trunk;
3347
3348 switch (tconn->pub.state) {
3349 case TRUNK_CONN_HALTED:
3350 break;
3351
3352 case TRUNK_CONN_CLOSED:
3354 break;
3355
3356 default:
3358 }
3359
3360 fr_dlist_insert_head(&trunk->init, tconn);
3362}
3363
3364/** Connection transitioned to the connecting state
3365 *
3366 * Reflect the connection state change in the lists we use to track connections.
3367 *
3368 * @note This function is only called from the connection API as a watcher.
3369 *
3370 * @param[in] conn The connection which changes state.
3371 * @param[in] prev The state the connection was in.
3372 * @param[in] state The state the connection is now in.
3373 * @param[in] uctx The trunk_connection_t wrapping the connection.
3374 */
3378 void *uctx)
3379{
3380 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3381 trunk_t *trunk = tconn->pub.trunk;
3382
3383 switch (tconn->pub.state) {
3384 case TRUNK_CONN_INIT:
3385 case TRUNK_CONN_CLOSED:
3387 break;
3388
3389 default:
3391 }
3392
3393 /*
3394 * If a connection just entered the
3395 * connecting state, it should have
3396 * no requests associated with it.
3397 */
3399
3400 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3402}
3403
3404/** Connection transitioned to the shutdown state
3405 *
3406 * If we're not already in the draining-to-free state, transition there now.
3407 *
3408 * The idea is that if something signalled the connection to shutdown, we need
3409 * to reflect that by dequeuing any pending requests, not accepting new ones,
3410 * and waiting for the existing requests to complete.
3411 *
3412 * @note This function is only called from the connection API as a watcher.
3413 *
3414 * @param[in] conn The connection which changes state.
3415 * @param[in] prev The state the connection was in.
3416 * @param[in] state The state the connection is now in.
3417 * @param[in] uctx The trunk_connection_t wrapping the connection.
3418 */
3422 void *uctx)
3423{
3424 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3425
3426 switch (tconn->pub.state) {
3427 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3428 return;
3429
3430 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3431 case TRUNK_CONN_FULL:
3435 break;
3436
3437 case TRUNK_CONN_INIT:
3439 case TRUNK_CONN_CLOSED:
3440 case TRUNK_CONN_HALTED:
3442 }
3443
3445}
3446
3447/** Trigger a reconnection of the trunk connection
3448 *
3449 * @param[in] tl timer list the timer was inserted into.
3450 * @param[in] now Current time.
3451 * @param[in] uctx The tconn.
3452 */
3454{
3455 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3456
3458}
3459
3460/** Connection transitioned to the connected state
3461 *
3462 * Reflect the connection state change in the lists we use to track connections.
3463 *
3464 * @note This function is only called from the connection API as a watcher.
3465 *
3466 * @param[in] conn The connection which changes state.
3467 * @param[in] prev The state the connection was in.
3468 * @param[in] state The state the connection is now in.
3469 * @param[in] uctx The trunk_connection_t wrapping the connection.
3470 */
3474 void *uctx)
3475{
3476 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3477 trunk_t *trunk = tconn->pub.trunk;
3478
3479 /*
3480 * If a connection was just connected, it should only
3481 * have a pending list of requests. This state is found
3482 * in the rlm_radius module, which starts a new trunk,
3483 * and then immediately enqueues a request onto it. The
3484 * alternative for rlm_radius is to keep its own queue
3485 * of pending requests before the trunk is fully
3486 * initialized. And then enqueue them onto the trunk
3487 * when the trunk is connected.
3488 *
3489 * It's instead easier (and makes more sense) to allow
3490 * the trunk to accept packets into its queue. If there
3491 * are no connections within a period of time, then the
3492 * requests will retry, or will time out.
3493 */
3495
3496 /*
3497 * Set here, as the active state can
3498 * be transitioned to from full and
3499 * draining too.
3500 */
3501 trunk->pub.last_connected = fr_time();
3502
3503 /*
3504 * Insert a timer to reconnect the
3505 * connection periodically.
3506 */
3507 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3508 if (fr_timer_in(tconn, trunk->el->tl, &tconn->lifetime_ev,
3509 trunk->conf.lifetime, false, _trunk_connection_lifetime_expire, tconn) < 0) {
3510 PERROR("Failed inserting connection reconnection timer event, halting connection");
3512 return;
3513 }
3514 }
3515
3517}
3518
3519/** Connection failed after it was connected
3520 *
3521 * Reflect the connection state change in the lists we use to track connections.
3522 *
3523 * @note This function is only called from the connection API as a watcher.
3524 *
3525 * @param[in] conn The connection which changes state.
3526 * @param[in] prev The state the connection was in.
3527 * @param[in] state The state the connection is now in.
3528 * @param[in] uctx The trunk_connection_t wrapping the connection.
3529 */
3533 void *uctx)
3534{
3535 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3536 trunk_t *trunk = tconn->pub.trunk;
3537 bool need_requeue = false;
3538
3539 switch (tconn->pub.state) {
3540 case TRUNK_CONN_ACTIVE:
3541 case TRUNK_CONN_FULL:
3546 need_requeue = true;
3548 break;
3549
3550 case TRUNK_CONN_INIT: /* Initialisation failed */
3554 break;
3555
3556 case TRUNK_CONN_CLOSED:
3557 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3559 }
3560
3561 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3563
3564 /*
3565 * Now *AFTER* the connection has been
3566 * removed from the active pool,
3567 * re-enqueue the requests.
3568 */
3569 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3570
3571 /*
3572 * There should be no requests left on this
3573 * connection. They should have all been
3574 * moved off or failed.
3575 */
3577
3578 /*
3579 * Clear statistics and flags
3580 */
3581 tconn->sent_count = 0;
3582
3583 /*
3584 * Remove the reconnect event
3585 */
3587
3588 /*
3589 * Remove the I/O events
3590 */
3592}
3593
3594/** Connection failed
3595 *
3596 * @param[in] conn The connection which changes state.
3597 * @param[in] prev The state the connection was in.
3598 * @param[in] state The state the connection is now in.
3599 * @param[in] uctx The trunk_connection_t wrapping the connection.
3600 */
3602 connection_state_t prev,
3604 void *uctx)
3605{
3606 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3607 trunk_t *trunk = tconn->pub.trunk;
3608
3609 /*
3610 * Need to set this first as it
3611 * determines whether requests are
3612 * re-queued or fail outright.
3613 */
3614 trunk->pub.last_failed = fr_time();
3615
3616 /*
3617 * Failed in the init state, transition the
3618 * connection to closed, else we get an
3619 * INIT -> INIT transition which triggers
3620 * an assert.
3621 */
3622 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3623
3624 /*
3625 * See what the state of the trunk is.
3626 * If there are no connections that could
3627 * potentially accept requests in the near
3628 * future, then fail all the requests in the
3629 * trunk backlog.
3630 */
3631 if ((prev == CONNECTION_STATE_CONNECTED) &&
3636}
3637
3638/** Connection transitioned to the halted state
3639 *
3640 * Remove the connection from all lists, as it's likely about to be freed.
3641 *
3642 * Setting the trunk back to the init state ensures that if the code is ever
3643 * refactored and #connection_signal_reconnect is used after a connection
3644 * is halted, then everything is maintained in a valid state.
3645 *
3646 * @note This function is only called from the connection API as a watcher.
3647 *
3648 * @param[in] conn The connection which changes state.
3649 * @param[in] prev The state the connection was in.
3650 * @param[in] state The state the connection is now in.
3651 * @param[in] uctx The trunk_connection_t wrapping the connection.
3652 */
3656 void *uctx)
3657{
3658 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3659 trunk_t *trunk = tconn->pub.trunk;
3660
3661 switch (tconn->pub.state) {
3662 case TRUNK_CONN_INIT:
3663 case TRUNK_CONN_CLOSED:
3665 break;
3666
3667 default:
3669 }
3670
3671 /*
3672 * It began life in the halted state,
3673 * and will end life in the halted state.
3674 */
3676
3677 /*
3678 * There should be no requests left on this
3679 * connection. They should have all been
3680 * moved off or failed.
3681 */
3683
3684 /*
3685 * And free the connection...
3686 */
3687 if (trunk->in_handler) {
3688 /*
3689 * ...later.
3690 */
3691 fr_dlist_insert_tail(&trunk->to_free, tconn);
3692 return;
3693 }
3694 talloc_free(tconn);
3695}
3696
3697/** Free a connection
3698 *
3699 * Enforces orderly free order of children of the tconn
3700 */
3702{
3704 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3705
3706 /*
3707 * Loop over all the requests we gathered
3708 * and transition them to the failed state,
3709 * freeing them.
3710 *
3711 * Usually, requests will be re-queued when
3712 * a connection enters the closed state,
3713 * but in this case because the whole trunk
3714 * is being freed, we don't bother, and
3715 * just signal to the API client that the
3716 * requests failed.
3717 */
3718 if (tconn->pub.trunk->freeing) {
3719 fr_dlist_head_t to_fail;
3720 trunk_request_t *treq = NULL;
3721
3722 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3723
3724 /*
3725 * Remove requests from this connection
3726 */
3728 while ((treq = fr_dlist_next(&to_fail, treq))) {
3729 trunk_request_t *prev;
3730
3731 prev = fr_dlist_remove(&to_fail, treq);
3733 treq = prev;
3734 }
3735 }
3736
3737 /*
3738 * Ensure we're not signalled by the connection
3739 * as it processes its backlog of state changes,
3740 * as we are about to be freed.
3741 */
3749
3750 /*
3751 * This may return -1, indicating the free was deferred
3752 * this is fine. It just means the conn will be freed
3753 * after all the handlers have exited.
3754 */
3755 (void)talloc_free(tconn->pub.conn);
3756 tconn->pub.conn = NULL;
3757
3758 return 0;
3759}
3760
3761/** Attempt to spawn a new connection
3762 *
3763 * Calls the API client's alloc() callback to create a new connection_t,
3764 * then inserts the connection into the 'connecting' list.
3765 *
3766 * @param[in] trunk to spawn connection in.
3767 * @param[in] now The current time.
3768 */
3770{
3771 trunk_connection_t *tconn;
3772
3773
3774 /*
3775 * Call the API client's callback to create
3776 * a new connection_t.
3777 */
3778 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3779 tconn->pub.trunk = trunk;
3780 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3781
3782 /*
3783 * Allocate a new connection_t or fail.
3784 */
3785 DO_CONNECTION_ALLOC(tconn);
3786
3788 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3792
3793 /*
3794 * OK, we have the connection, now setup watch
3795 * points so we know when it changes state.
3796 *
3797 * This lets us automatically move the tconn
3798 * between the different lists in the trunk
3799 * with minimum extra code.
3800 */
3802 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3803
3805 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3806
3808 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3809
3811 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3812
3814 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3815
3817 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3818
3820 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3821
3822 talloc_set_destructor(tconn, _trunk_connection_free);
3823
3824 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3825
3826 trunk->pub.last_open = now;
3827
3828 return 0;
3829}
3830
3831/** Pop a cancellation request off a connection's cancellation queue
3832 *
3833 * The request we return is advanced by the request moving out of the
3834 * cancel state and into the cancel_sent or cancel_complete state.
3835 *
3836 * One of these signalling functions must be called after the request
3837 * has been popped:
3838 *
3839 * - #trunk_request_signal_cancel_sent
3840 * The remote datastore has been informed, but we need to wait for acknowledgement.
3841 * The #trunk_request_demux_t callback must handle the acks calling
3842 * #trunk_request_signal_cancel_complete when an ack is received.
3843 *
3844 * - #trunk_request_signal_cancel_complete
3845 * The request was cancelled and we don't need to wait, clean it up immediately.
3846 *
3847 * @param[out] treq_out to process
3848 * @param[in] tconn Connection to drain cancellation request from.
3849 * @return
3850 * - 1 if no more requests.
3851 * - 0 if a new request was written to treq_out.
3852 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3853 * memory or requests associated with the connection.
3854 * - -2 if called outside of the cancel muxer.
3855 */
3857{
3858 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3859
3861 "%s can only be called from within request_cancel_mux handler",
3862 __FUNCTION__)) return -2;
3863
3864 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3865 if (!*treq_out) return 1;
3866
3867 return 0;
3868}
3869
3870/** Pop a request off a connection's pending queue
3871 *
3872 * The request we return is advanced by the request moving out of the partial or
3873 * pending states, when the mux function signals us.
3874 *
3875 * If the same request is returned again and again, it means the muxer isn't actually
3876 * doing anything with the request we returned, and it's an error in the muxer code.
3877 *
3878 * One of these signalling functions must be used after the request has been popped:
3879 *
3880 * - #trunk_request_signal_complete
3881 * The request was completed. Either we got a synchronous response, or we knew the
3882 * response without contacting an external server (cache).
3883 *
3884 * - #trunk_request_signal_fail
3885 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3886 *
3887 * - #trunk_request_signal_partial
3888 * Wrote part of a request. This request will be returned on the next call to this
3889 * function so that the request_mux function can finish writing it. Only useful
3890 * for stream type connections. Datagram type connections cannot have partial
3891 * writes.
3892 *
3893 * - #trunk_request_signal_sent Successfully sent a request.
3894 *
3895 * @param[out] treq_out to process
3896 * @param[in] tconn to pop a request from.
3897 * @return
3898 * - 1 if no more requests.
3899 * - 0 if a new request was written to treq_out.
3900 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3901 * memory or requests associated with the connection.
3902 * - -2 if called outside of the muxer.
3903 */
3905{
3906 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3907
3909 "%s can only be called from within request_mux handler",
3910 __FUNCTION__)) return -2;
3911
3912 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3913 if (!*treq_out) return 1;
3914
3915 return 0;
3916}
3917
3918/** Signal that a trunk connection is writable
3919 *
3920 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3921 *
3922 * @param[in] tconn to signal.
3923 */
3925{
3926 trunk_t *trunk = tconn->pub.trunk;
3927
3928 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3929 "%s cannot be called within a handler", __FUNCTION__)) return;
3930
3931 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3932
3934}
3935
3936/** Signal that a trunk connection is readable
3937 *
3938 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3939 *
3940 * @param[in] tconn to signal.
3941 */
3943{
3944 trunk_t *trunk = tconn->pub.trunk;
3945
3946 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3947 "%s cannot be called within a handler", __FUNCTION__)) return;
3948
3949 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3950
3952}
3953
3954/** Signal a trunk connection cannot accept more requests
3955 *
3956 * @param[in] tconn to signal.
3957 */
3959{
3960 /* Can be called anywhere */
3961
3962 switch (tconn->pub.state) {
3963 case TRUNK_CONN_ACTIVE:
3964 case TRUNK_CONN_FULL:
3966 break;
3967
3970 break;
3971
3972 default:
3973 return;
3974 }
3975}
3976
3977/** Signal a trunk connection is no longer full
3978 *
3979 * @param[in] tconn to signal.
3980 */
3982{
3983 switch (tconn->pub.state) {
3984 case TRUNK_CONN_FULL:
3985 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3986 break;
3987
3989 /*
3990 * Do the appropriate state transition based on
3991 * how many requests the trunk connection is
3992 * currently servicing.
3993 */
3994 if (trunk_connection_is_full(tconn)) {
3996 break;
3997 }
3999 break;
4000
4001 /*
4002 * Unsetting the active flag just moves
4003 * the connection back to the normal
4004 * draining state.
4005 */
4006 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
4008 break;
4009
4010 default:
4011 return;
4012 }
4013}
4014
4015/** Signal a trunk connection is no longer viable
4016 *
4017 * @param[in] tconn to signal.
4018 * @param[in] reason the connection is being reconnected.
4019 */
4024
4025/** Standard I/O read function
4026 *
4027 * Underlying FD is now readable, so call the trunk to read any pending requests
4028 * from this connection.
4029 *
4030 * @param[in] el The event list signalling.
4031 * @param[in] fd that's now readable.
4032 * @param[in] flags describing the read event.
4033 * @param[in] uctx The trunk connection handle (tconn).
4034 */
4036{
4037 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4038
4040}
4041
4042/** Standard I/O write function
4043 *
4044 * Underlying FD is now writable, so call the trunk to write any pending requests
4045 * to this connection.
4046 *
4047 * @param[in] el The event list signalling.
4048 * @param[in] fd that's now writable.
4049 * @param[in] flags describing the write event.
4050 * @param[in] uctx The trunk connection handle (tconn).
4051 */
4053{
4054 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4055
4057}
4058
4059
4060/** Returns true if the trunk connection is in one of the specified states
4061 *
4062 * @param[in] tconn To check state for.
4063 * @param[in] state to check
4064 * @return
4065 * - True if trunk connection is in a particular state.
4066 * - False if trunk connection is not in a particular state.
4067 */
4069{
4070 return (bool)(tconn->pub.state & state);
4071}
4072
4073/** Close connections in a particular connection list if they have no requests associated with them
4074 *
4075 * @param[in] trunk containing connections we want to close.
4076 * @param[in] head of list of connections to examine.
4077 */
4079{
4080 trunk_connection_t *tconn = NULL;
4081
4082 while ((tconn = fr_dlist_next(head, tconn))) {
4083 trunk_connection_t *prev;
4084
4086
4087 prev = fr_dlist_prev(head, tconn);
4088
4089 DEBUG3("Closing %s connection with no requests",
4091 /*
4092 * Close the connection as gracefully
4093 * as possible by signalling it should
4094 * shutdown.
4095 *
4096 * The connection, should, if serviced
4097 * correctly by the underlying library,
4098 * automatically transition to halted after
4099 * all pending reads/writes are
4100 * complete at which point we'll be informed
4101 * and free our tconn wrapper.
4102 */
4104 tconn = prev;
4105 }
4106}
4107
4108/** Rebalance requests across active trunk connections when a new connection becomes active
4109 *
4110 * We don't have any visibility into the connection prioritisation algorithm
4111 * it's essentially a black box.
4112 *
4113 * We can however determine when the correct level of requests per connection
4114 * has been reached, by dequeuing and requeuing requests up until the point
4115 * where the connection that just had a request dequeued, receives the same
4116 * request back.
4117 *
4118 * @param[in] trunk The trunk to rebalance.
4119 */
4120static void trunk_rebalance(trunk_t *trunk)
4121{
4123
4125
4126 /*
4127 * Only rebalance if the top and bottom of
4128 * the heap are not equal.
4129 */
4130 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4131
4132 DEBUG3("Rebalancing requests");
4133
4134 /*
4135 * Keep requeuing requests from the connection
4136 * at the bottom of the heap until the
4137 * connection at the top is shifted from that
4138 * position.
4139 */
4140 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4142 TRUNK_REQUEST_STATE_PENDING, 1, false));
4143}
4144
4145/** Implements the algorithm we use to manage requests per connection levels
4146 *
4147 * This is executed periodically using a timer event, and opens/closes
4148 * connections.
4149 *
4150 * The aim is to try and keep the request per connection level in a sweet spot,
4151 * where there's enough outstanding work for the connection/pipelining to work
4152 * efficiently, but not so much so that we encounter increased latency.
4153 *
4154 * In the request enqueue and dequeue functions we record every time the
4155 * average number of requests per connection goes above the target count
4156 * and record every time the average number of requests per connection goes
4157 * below the target count.
4158 *
4159 * This may sound expensive, but in all cases we're just summing counters.
4160 * CPU time required does not increase with additional requests, only with
4161 * large numbers of connections.
4162 *
4163 * If we do encounter scaling issues, we can always maintain the counters
4164 * as aggregates as an optimisation later.
4165 *
4166 * If when the management function runs, the trunk was above the target
4167 * most recently, we:
4168 * - Return if we've been in this state for a shorter period than 'open_delay'.
4169 * - Return if we're at max.
4170 * - Return if opening a new connection will take us below the load target.
4171 * - Return if we last opened a connection within 'open_delay'.
4172 * - Otherwise we attempt to open a new connection.
4173 *
4174 * If the trunk was below the target most recently, we:
4175 * - Return if we've been in this state for a shorter period than 'close_delay'.
4176 * - Return if we're at min.
4177 * - Return if we have no connections.
4178 * - Close a connection if min is 0, and we have no outstanding
4179 * requests. Then return.
4180 * - Return if closing a connection will take us above the load target.
4181 * - Return if we last closed a connection within 'close_delay'.
4182 * - Otherwise we move a connection to draining state.
4183 */
4184static void trunk_manage(trunk_t *trunk, fr_time_t now)
4185{
4186 trunk_connection_t *tconn = NULL;
4187 trunk_request_t *treq;
4188 uint32_t average = 0;
4189 uint32_t req_count;
4190 uint16_t conn_count;
4191 trunk_state_t new_state;
4192
4193 DEBUG4("Managing trunk");
4194
4195 /*
4196 * Cleanup requests in our request cache which
4197 * have been reapable for too long.
4198 */
4199 while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4201
4202 /*
4203 * If we have idle connections, then close them.
4204 */
4207 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4208
4209 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4210 tconn;
4211 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4212 /*
4213 * The connection has outstanding requests without replies, don't do anything.
4214 */
4215 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4216
4217 /*
4218 * The connection was last active after the idle cutoff time, don't do anything.
4219 */
4220 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4221
4222 /*
4223 * This connection has been inactive since before the idle timeout. Drain it,
4224 * and free it.
4225 *
4226 * This also extracts the connection from the minmax heap, which invalidates the
4227 * iterator, so we stop iterating over it.
4228 */
4230 break;
4231 }
4232 }
4233
4234 /*
4235 * Free any connections which have drained
4236 * and we didn't reactivate during the last
4237 * round of management.
4238 */
4242
4243 /*
4244 * Process deferred connection freeing
4245 */
4246 if (!trunk->in_handler) fr_dlist_talloc_free(&trunk->to_free);
4247
4248 /*
4249 * Update the state of the trunk
4250 */
4252 new_state = TRUNK_STATE_ACTIVE;
4253 } else {
4254 /*
4255 * INIT / CONNECTING / FULL mean connections will become active
4256 * so the trunk is PENDING
4257 */
4262 }
4263
4264 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4265
4266 /*
4267 * A trunk can be signalled to not proactively
4268 * manage connections if a destination is known
4269 * to be unreachable, and doing so would result
4270 * in spurious connections still being opened.
4271 *
4272 * We still run other connection management
4273 * functions and just short circuit the function
4274 * here.
4275 */
4276 if (!trunk->managing_connections) return;
4277
4278 /*
4279 * We're above the target requests per connection
4280 * spawn more connections!
4281 */
4283 /*
4284 * If connecting is provided, check we
4285 * wouldn't have too many connections in
4286 * the connecting state.
4287 *
4288 * This is a throttle in the case of transitory
4289 * load spikes, or a backend becoming
4290 * unavailable.
4291 */
4292 if ((trunk->conf.connecting > 0) &&
4294 trunk->conf.connecting)) {
4295 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4296 trunk->conf.connecting);
4297 return;
4298 }
4299
4300 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4301
4302 /*
4303 * Only apply hysteresis if we have at least
4304 * one available connection.
4305 */
4306 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4307 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4310 return; /* too soon */
4311 }
4312
4313 /*
4314 * We don't consider 'draining' connections
4315 * in the max calculation, as if we do
4316 * determine that we need to spawn a new
4317 * connection, then we'd move all 'draining'
4318 * connections to active before spawning
4319 * any new connections.
4320 */
4321 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4322 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4323 conn_count, trunk->conf.max);
4324 return;
4325 }
4326
4327 /*
4328 * We consider requests pending on all connections
4329 * and the trunk's backlog as that's the current count
4330 * load.
4331 */
4332 if (!req_count) {
4333 DEBUG4("Not opening connection - No outstanding requests");
4334 return;
4335 }
4336
4337 /*
4338 * Do the n+1 check, i.e. if we open one connection
4339 * will that take us below our target threshold.
4340 */
4341 if (conn_count > 0) {
4342 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4343 if (average < trunk->conf.target_req_per_conn) {
4344 DEBUG4("Not opening connection - Would leave us below our target requests "
4345 "per connection (now %u, after open %u)",
4346 ROUND_UP_DIV(req_count, conn_count), average);
4347 return;
4348 }
4349 } else {
4350 (void)trunk_connection_spawn(trunk, now);
4351 return;
4352 }
4353
4354 /*
4355 * If we've got a connection in the draining list
4356 * move it back into the active list if we've
4357 * been requested to add a connection back in.
4358 */
4359 tconn = fr_dlist_head(&trunk->draining);
4360 if (tconn) {
4361 if (trunk_connection_is_full(tconn)) {
4363 } else {
4365 }
4366 return;
4367 }
4368
4369 /*
4370 * Implement delay if there's no connections that
4371 * could be immediately re-activated.
4372 */
4373 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4374 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4375 "It's been %pVs",
4378 return;
4379 }
4380
4381 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4382 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4383 /* last_open set by trunk_connection_spawn */
4384 (void)trunk_connection_spawn(trunk, now);
4385 }
4386
4387 /*
4388 * We're below the target requests per connection.
4389 * Free some connections...
4390 */
4391 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4392 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4393 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4396 return; /* too soon */
4397 }
4398
4399 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4400
4401 if (!conn_count) {
4402 DEBUG4("Not closing connection - No connections to close!");
4403 return;
4404 }
4405
4406 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4407 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4408 conn_count, trunk->conf.min);
4409 return;
4410 }
4411
4412 if (!req_count) {
4413 DEBUG4("Closing connection - No outstanding requests");
4414 goto close;
4415 }
4416
4417 /*
4418 * The minimum number of connections must be set
4419 * to zero for this to work.
4420 * min == 0, no requests, close all the connections.
4421 * This is useful for backup databases, when
4422 * maintaining the connection would lead to lots of
4423 * log file churn.
4424 */
4425 if (conn_count == 1) {
4426 DEBUG4("Not closing connection - Would leave connections "
4427 "and there are still %u outstanding requests", req_count);
4428 return;
4429 }
4430
4431 /*
4432 * Do the n-1 check, i.e. if we close one connection
4433 * will that take us above our target threshold.
4434 */
4435 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4436 if (average > trunk->conf.target_req_per_conn) {
4437 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4438 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4439 return;
4440 }
4441
4442 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4443 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4444
4445 close:
4446 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4447 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4448 "It's been %pVs",
4451 return;
4452 }
4453
4454 /*
4455 * If the last event on the trunk was a connection failure and
4456 * there is only one connection, this may well be a reconnect
4457 * attempt after a failure - and needs to persist otherwise
4458 * the last event will be a failure and no new connection will
4459 * be made, leading to no new requests being enqueued.
4460 */
4461 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4462 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4463 DEBUG4("Not closing remaining connection - last event was a failure");
4464 return;
4465 }
4466
4467 /*
4468 * Inactive connections get counted in the
4469 * set of viable connections, but are likely
4470 * to be congested or dead, so we drain
4471 * (and possibly eventually free) those first.
4472 */
4473 if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4474 /*
4475 * If the connection has no requests associated
4476 * with it then immediately free.
4477 */
4479 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4480 } else {
4482 }
4483 /*
4484 * It is possible to have too many connecting
4485 * connections when the connections are
4486 * taking a while to open and the number
4487 * of requests decreases.
4488 */
4489 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4490 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4491
4492 /*
4493 * Finally if there are no "connecting"
4494 * connections to close, and no "inactive"
4495 * connections, start draining "active"
4496 * connections.
4497 */
4498 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4499 /*
4500 * If the connection has no requests associated
4501 * with it then immediately free.
4502 */
4504 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4505 } else {
4507 }
4508 }
4509
4510 trunk->pub.last_closed = now;
4511
4512 return;
4513 }
4514}
4515
4516/** Event to periodically call the connection management function
4517 *
4518 * @param[in] tl this event belongs to.
4519 * @param[in] now current time.
4520 * @param[in] uctx The trunk.
4521 */
4522static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
4523{
4524 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4525
4526 trunk_manage(trunk, now);
4527
4529 if (fr_timer_in(trunk, tl, &trunk->manage_ev, trunk->conf.manage_interval,
4530 false, _trunk_timer, trunk) < 0) {
4531 PERROR("Failed inserting trunk management event");
4532 /* Not much we can do, hopefully the trunk will be freed soon */
4533 }
4534 }
4535}
4536
4537/** Return a count of requests in specific states, on connections in specific states
4538 *
4539 * @param[in] trunk to retrieve counts for.
4540 * @param[in] conn_state One or more connection states or'd together.
4541 * @param[in] req_state One or more request states or'd together.
4542 * @return The number of requests in a particular state, on connections in a particular state.
4543 */
4544uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4545{
4546 uint64_t count = 0;
4547 trunk_connection_t *tconn = NULL;
4549
4550#define COUNT_BY_STATE(_state, _list) \
4551do { \
4552 if (conn_state & (_state)) { \
4553 tconn = NULL; \
4554 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4555 count += trunk_request_count_by_connection(tconn, req_state); \
4556 } \
4557 } \
4558} while (0)
4559
4560 if (conn_state & TRUNK_CONN_ACTIVE) {
4561 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4562 tconn;
4563 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4564 count += trunk_request_count_by_connection(tconn, req_state);
4565 }
4566 }
4567
4570 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4573
4575
4576 return count;
4577}
4578
4579/** Update timestamps for when we last had a transition from above target to below target or vice versa
4580 *
4581 * Should be called every time a connection or request is allocated or freed.
4582 *
4583 * @param[out] conn_count_out How many connections we considered.
4584 * @param[out] req_count_out How many requests we considered.
4585 * @param[in] trunk to operate on.
4586 * @param[in] now The current time.
4587 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4588 * has changed.
4589 * @return
4590 * - 0 if the average couldn't be calculated (no requests or no connections).
4591 * - The average number of requests per connection.
4592 */
4593static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4594 trunk_t *trunk, fr_time_t now,
4595 NDEBUG_UNUSED bool verify)
4596{
4597 uint32_t req_count = 0;
4598 uint16_t conn_count = 0;
4599 uint64_t req_per_conn = 0;
4600
4602
4603 /*
4604 * No need to update these as the trunk is being freed
4605 */
4606 if (trunk->freeing) goto done;
4607
4608 /*
4609 * Count all connections except draining and draining to free.
4610 *
4611 * Omitting these connection states artificially raises the
4612 * request to connection ratio, so that we can preemptively spawn
4613 * new connections.
4614 *
4615 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4616 * the trunk management code has enough hysteresis to not
4617 * immediately reactivate the connection.
4618 *
4619 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4620 * management code should spawn a new connection to take its place.
4621 *
4622 * Connections placed in the DRAINING_TO_FREE state are being
4623 * closed preemptively to deal with bugs on the server we're
4624 * talking to, or misconfigured firewalls which are trashing
4625 * TCP/UDP connection states.
4626 */
4631
4632 /*
4633 * Requests on all connections
4634 */
4635 req_count = trunk_request_count_by_state(trunk,
4638
4639 /*
4640 * No connections, but we do have requests
4641 */
4642 if (conn_count == 0) {
4643 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4644 goto done;
4645 }
4646
4647 if (req_count == 0) {
4648 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4649 goto done;
4650 }
4651
4652 /*
4653 * Calculate the req_per_conn
4654 */
4655 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4656 if (req_per_conn > trunk->conf.target_req_per_conn) {
4657 above_target:
4658 /*
4659 * Edge - Below target to above target (too many requests per conn - spawn more)
4660 *
4661 * The equality check is correct here as both values start at 0.
4662 */
4664 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4665 below_target:
4666 /*
4667 * Edge - Above target to below target (too few requests per conn - close some)
4668 *
4669 * The equality check is correct here as both values start at 0.
4670 */
4672 }
4673
4674done:
4675 if (conn_count_out) *conn_count_out = conn_count;
4676 if (req_count_out) *req_count_out = req_count;
4677
4678 /*
4679 * Check we haven't missed a call to trunk_requests_per_connection
4680 */
4681 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4682
4683 trunk->last_req_per_conn = req_per_conn;
4684
4685 return req_per_conn;
4686}
4687
4688/** Drain the backlog of as many requests as possible
4689 *
4690 * @param[in] trunk To drain backlog requests for.
4691 */
4692static void trunk_backlog_drain(trunk_t *trunk)
4693{
4694 trunk_request_t *treq;
4695
4696 if (fr_heap_num_elements(trunk->backlog) == 0) return;
4697
4698 /*
4699 * If it's always writable, this isn't
4700 * really a noteworthy event.
4701 */
4702 if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4703
4704 /*
4705 * Do *NOT* add an artificial limit
4706 * here. We rely on all available
4707 * connections entering the full
4708 * state and transitioning back to
4709 * active in order to drain the
4710 * backlog.
4711 */
4712 while ((treq = fr_heap_peek(trunk->backlog))) {
4713 switch (trunk_request_enqueue_existing(treq)) {
4714 case TRUNK_ENQUEUE_OK:
4715 continue;
4716
4717 /*
4718 * Signal to stop
4719 */
4721 break;
4722
4723 /*
4724 * Failed enqueueing the request,
4725 * have it enter the failed state
4726 * which will free it and
4727 * re-enliven the yielded request.
4728 */
4730 case TRUNK_ENQUEUE_FAIL:
4732 continue;
4733
4736 return;
4737 }
4738 }
4739}
4740
4741/** Force the trunk to re-establish its connections
4742 *
4743 * @param[in] trunk to signal.
4744 * @param[in] states One or more states or'd together.
4745 * @param[in] reason Why the connections are being signalled to reconnect.
4746 */
4747void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4748{
4749
4750#define RECONNECT_BY_STATE(_state, _list) \
4751do { \
4752 if (states & (_state)) { \
4753 size_t i; \
4754 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4755 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4756 } \
4757 } \
4758} while (0)
4759
4760 /*
4761 * Connections in the 'connecting' state
4762 * may re-enter that state, so we need to
4763 * be careful not to enter an infinite
4764 * loop, as we iterate over the list
4765 * again and again.
4766 */
4768
4769 if (states & TRUNK_CONN_ACTIVE) {
4770 trunk_connection_t *tconn;
4771 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4772 }
4773
4781}
4782
4783/** Start the trunk running
4784 *
4785 */
4787{
4788 uint16_t i;
4789
4790 if (unlikely(trunk->started)) return 0;
4791
4792 /*
4793 * Spawn the initial set of connections
4794 */
4795 for (i = 0; i < trunk->conf.start; i++) {
4796 DEBUG("[%i] Starting initial connection", i);
4797 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4798 }
4799
4800 /*
4801 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4802 * less than the idle timeout, update the management interval.
4803 */
4807 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4808 }
4809
4811 /*
4812 * Insert the event timer to manage
4813 * the interval between managing connections.
4814 */
4815 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, trunk->conf.manage_interval,
4816 false, _trunk_timer, trunk) < 0) {
4817 PERROR("Failed inserting trunk management event");
4818 return -1;
4819 }
4820 }
4821 trunk->started = true;
4822 trunk->managing_connections = true;
4823
4824 return 0;
4825}
4826
4827/** Allow the trunk to open and close connections in response to load
4828 *
4829 */
4831{
4832 if (!trunk->started || trunk->managing_connections) return;
4833
4834 DEBUG3("Connection management enabled");
4835 trunk->managing_connections = true;
4836}
4837
4838/** Stop the trunk from opening and closing connections in response to load
4839 *
4840 */
4842{
4843 if (!trunk->started || !trunk->managing_connections) return;
4844
4845 DEBUG3("Connection management disabled");
4846 trunk->managing_connections = false;
4847}
4848
4849/** Schedule a trunk management event for the next time the event loop is executed
4850 */
4852{
4853 if (!trunk->started || !trunk->managing_connections) return 0;
4854
4855 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, fr_time_delta_wrap(0),
4856 false, _trunk_timer, trunk) < 0) {
4857 PERROR("Failed inserting trunk management event");
4858 return -1;
4859 }
4860
4861 return 0;
4862}
4863
4864/** Order connections by queue depth
4865 *
4866 */
4867static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4868{
4871
4874
4875 /*
4876 * Add a fudge factor of 1 to reduce spurious rebalancing
4877 */
4878 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4879}
4880
4881/** Free a trunk, gracefully closing all connections.
4882 *
4883 */
4884static int _trunk_free(trunk_t *trunk)
4885{
4886 trunk_connection_t *tconn;
4887 trunk_request_t *treq;
4888 trunk_watch_entry_t *watch;
4889 size_t i;
4890
4891 DEBUG4("Trunk free %p", trunk);
4892
4893 trunk->freeing = true; /* Prevent re-enqueuing */
4894
4895 /*
4896 * We really don't want this firing after
4897 * we've freed everything.
4898 */
4900
4901 /*
4902 * Now free the connections in each of the lists.
4903 *
4904 * Each time a connection is freed it removes itself from the list
4905 * it's in, which means the head should keep advancing automatically.
4906 */
4907 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4908 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4909 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4910 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4911 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4912 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4913 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4914 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4915 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4916
4917 /*
4918 * Process any deferred connection frees
4919 */
4921
4922 /*
4923 * Free any requests left in the backlog
4924 */
4925 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4926
4927 /*
4928 * Free any requests in our request cache
4929 */
4930 while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4931
4932 /*
4933 * Free any entries in the watch lists
4934 */
4935 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4936 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4937 }
4938
4939 return 0;
4940}
4941
4942/** Allocate a new collection of connections
4943 *
4944 * This function should be called first to allocate a new trunk.
4945 *
4946 * After the trunk has been allocated, #trunk_request_alloc and
4947 * #trunk_request_enqueue should be used to allocate memory for trunk
4948 * requests, and pass a preq (protocol request) to the trunk for
4949 * processing.
4950 *
4951 * The trunk will then asynchronously process the request, writing the result
4952 * to a specified rctx. See #trunk_request_enqueue for more details.
4953 *
4954 * @note Trunks may not be shared between multiple threads under any circumstances.
4955 *
4956 * @param[in] ctx To use for any memory allocations. Must be thread local.
4957 * @param[in] el to use for I/O and timer events.
4958 * @param[in] funcs Callback functions.
4959 * @param[in] conf Common user configurable parameters.
4960 * @param[in] log_prefix To prepend to global messages.
4961 * @param[in] uctx User data to pass to the alloc function.
4962 * @param[in] delay_start If true, then we will not spawn any connections
4963 * until the first request is enqueued.
4964 * @param[in] trigger_args Pairs to pass to trigger requests, if triggers are enabled.
4965 * @return
4966 * - New trunk handle on success.
4967 * - NULL on error.
4968 */
4970 trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4971 char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
4972{
4973 trunk_t *trunk;
4974 size_t i;
4975
4976 /*
4977 * Check we have the functions we need
4978 */
4979 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4980
4981 MEM(trunk = talloc_zero(ctx, trunk_t));
4982 trunk->el = el;
4983 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4984 trunk->trigger_args = trigger_args;
4985
4986 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4987 if (!trunk->funcs.connection_prioritise) {
4989 }
4991
4992 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4993
4994 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4995 talloc_set_destructor(trunk, _trunk_free);
4996
4997 /*
4998 * Unused request list...
4999 */
5001
5002 /*
5003 * Request backlog queue
5004 */
5006 trunk_request_t, heap_id, 0));
5007
5008 /*
5009 * Connection queues and trees
5010 */
5012 trunk_connection_t, heap_id, 0));
5022
5023 /*
5024 * Watch lists
5025 */
5026 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5028 }
5029
5030 DEBUG4("Trunk allocated %p", trunk);
5031
5032 if (!delay_start) {
5033 if (trunk_start(trunk) < 0) {
5034 talloc_free(trunk);
5035 return NULL;
5036 }
5037 }
5038
5039 return trunk;
5040}
5041
5042/** Check for a module trigger section when parsing the `triggers` option.
5043 *
5044 */
5045int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
5046{
5049
5050 if (cf_pair_parse_value(ctx, out, parent, ci, rule)< 0) return -1;
5051
5052 /*
5053 * If the parent section of the `triggers` option contains a trigger
5054 * section then store it as the module CONF SECTION for the appropriate
5055 * trigger group.
5056 */
5057 if (cf_section_find(cs, "trigger", NULL)) {
5058 if (strcmp(cf_section_name(cs), "request") == 0) {
5059 conf->req_trigger_cs = cs;
5060 } else {
5061 conf->conn_trigger_cs = cs;
5062 }
5063 }
5064
5065 return 0;
5066}
5067
5068#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5069/** Verify a trunk
5070 *
5071 * A trunk has some number of connections, which each have some number of requests. The connections and
5072 * requests are in differing kinds of containers depending on their state and how they are used, and may
5073 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5074 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5075 * foo's children.
5076 */
5077void trunk_verify(char const *file, int line, trunk_t *trunk)
5078{
5079 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5080 (void) talloc_get_type_abort(trunk, trunk_t);
5081
5082 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5083 _fr_dlist_verify(file, line, &trunk->watch[i]);
5084 }
5085
5086#define IO_FUNC_VERIFY(_func) \
5087 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i}: " #_func " was NULL", file, line)
5088
5089 /*
5090 * Only a few of the function pointers *must* be non-NULL..
5091 */
5093 IO_FUNC_VERIFY(connection_prioritise);
5095
5096#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5097do { \
5098 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5099 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5100 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5101 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5102} while (0)
5103
5104#define TCONN_DLIST_VERIFY(_dlist, _state) \
5105do { \
5106 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5107 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5108 trunk_connection_verify(file, line, tconn); \
5109 TRUNK_TCONN_CHECKS(tconn, _state); \
5110 } \
5111} while (0)
5112
5113#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5114do {\
5115 fr_minmax_heap_verify(file, line, trunk->_heap); \
5116 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5117 trunk_connection_verify(file, line, tconn); \
5118 TRUNK_TCONN_CHECKS(tconn, _state); \
5119 }} \
5120} while (0)
5121
5122 fr_dlist_verify(&(trunk->free_requests));
5123 FR_HEAP_VERIFY(trunk->backlog);
5124
5131 /* TCONN_DLIST_VERIFY(failed, ???); */
5136}
5137
5139{
5140 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5141 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5142
5143 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5144
5145 /*
5146 * shouldn't be both in heap and on list--but it doesn't look like moves
5147 * to active heap wipe the dlist pointers.
5148 */
5149
5150#define TCONN_TREQ_CHECKS(_treq, _state) \
5151do { \
5152 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5153 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5154 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5155 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5156 fr_fatal_assert_msg(_state == _treq->pub.state, \
5157 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5158} while (0)
5159
5160#define TREQ_DLIST_VERIFY(_dlist, _state) \
5161do { \
5162 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5163 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5164 trunk_request_verify(file, line, treq); \
5165 TCONN_TREQ_CHECKS(treq, _state); \
5166 } \
5167} while (0)
5168
5169#define TREQ_HEAP_VERIFY(_heap, _state) \
5170do { \
5171 fr_heap_iter_t _iter; \
5172 fr_heap_verify(file, line, tconn->_heap); \
5173 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5174 treq; \
5175 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5176 trunk_request_verify(file, line, treq); \
5177 TCONN_TREQ_CHECKS(treq, _state); \
5178 } \
5179} while (0)
5180
5181#define TREQ_OPTION_VERIFY(_option, _state) \
5182do { \
5183 if (tconn->_option) { \
5184 trunk_request_verify(file, line, tconn->_option); \
5185 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5186 } \
5187} while (0)
5188
5189 /* verify associated requests */
5196}
5197
/** Verify a trunk request
 *
 * Fatally asserts that treq is non-NULL, then checks that it is a talloc
 * chunk of type trunk_request_t.  When built with WITH_VERIFY_PTR, and the
 * trunk request has a request attached (treq->pub.request), that request is
 * passed to request_verify() as well.
 *
 * @param[in] file	Source file of the caller, reported in the failure message.
 * @param[in] line	Source line of the caller, reported in the failure message.
 * @param[in] treq	Trunk request to verify.
 */
5198void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5199{
5200 fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5201 (void) talloc_get_type_abort(treq, trunk_request_t);
5202
	/* The attached request is optional, only verify it when one is set */
5203#ifdef WITH_VERIFY_PTR
5204 if (treq->pub.request) request_verify(file, line, treq->pub.request);
5205#endif
5206}
5207
5208
5209bool trunk_search(trunk_t *trunk, void *ptr)
5210{
5211#define TCONN_DLIST_SEARCH(_dlist) \
5212do { \
5213 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5214 if (ptr == tconn) { \
5215 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5216 return true; \
5217 } \
5218 if (trunk_connection_search(tconn, ptr)) { \
5219 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5220 return true; \
5221 } \
5222 } \
5223} while (0)
5224
5225#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5226do { \
5227 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5228 if (ptr == tconn) { \
5229 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5230 return true; \
5231 } \
5232 if (trunk_connection_search(tconn, ptr)) { \
5233 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5234 return true; \
5235 } \
5236 }}\
5237} while (0)
5238
5240 TCONN_DLIST_SEARCH(connecting);
5242 TCONN_DLIST_SEARCH(full);
5243 TCONN_DLIST_SEARCH(inactive);
5244 TCONN_DLIST_SEARCH(inactive_draining);
5245 TCONN_DLIST_SEARCH(failed);
5246 TCONN_DLIST_SEARCH(closed);
5247 TCONN_DLIST_SEARCH(draining);
5248 TCONN_DLIST_SEARCH(draining_to_free);
5249 TCONN_DLIST_SEARCH(to_free);
5250
5251 return false;
5252}
5253
5255{
5256#define TREQ_DLIST_SEARCH(_dlist) \
5257do { \
5258 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5259 if (ptr == treq) { \
5260 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5261 return true; \
5262 } \
5263 if (trunk_request_search(treq, ptr)) { \
5264 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5265 return true; \
5266 } \
5267 } \
5268} while (0)
5269
5270#define TREQ_HEAP_SEARCH(_heap) \
5271do { \
5272 fr_heap_iter_t _iter; \
5273 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5274 treq; \
5275 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5276 if (ptr == treq) { \
5277 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5278 return true; \
5279 } \
5280 if (trunk_request_search(treq, ptr)) { \
5281 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5282 return true; \
5283 } \
5284 } \
5285} while (0)
5286
5287#define TREQ_OPTION_SEARCH(_option) \
5288do { \
5289 if (tconn->_option) { \
5290 if (ptr == tconn->_option) { \
5291 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5292 return true; \
5293 } \
5294 if (trunk_request_search(tconn->_option, ptr)) { \
5295 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5296 return true; \
5297 } \
5298 } \
5299} while (0)
5300
5301 /* search associated requests */
5302 TREQ_HEAP_SEARCH(pending);
5303 TREQ_DLIST_SEARCH(sent);
5304 TREQ_DLIST_SEARCH(cancel);
5305 TREQ_DLIST_SEARCH(cancel_sent);
5306 TREQ_OPTION_SEARCH(partial);
5307 TREQ_OPTION_SEARCH(cancel_partial);
5308
5309 return false;
5310}
5311
5313{
5314 return treq->pub.preq == ptr;
5315}
5316#endif
int const char * file
Definition acutest.h:704
int const char int line
Definition acutest.h:704
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:42
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define NDEBUG_UNUSED
Definition build.h:328
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
Definition build.h:324
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
#define NUM_ELEMENTS(_t)
Definition build.h:339
int cf_pair_parse_value(TALLOC_CTX *ctx, void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Parses a CONF_PAIR into a C data type.
Definition cf_parse.c:214
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:660
cf_parse_t func
Override default parsing behaviour for the specified type with a custom parsing function.
Definition cf_parse.h:614
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:283
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:337
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:312
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:426
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:597
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:683
char const * cf_section_name(CONF_SECTION const *cs)
Return name2 if set, else name1.
Definition cf_util.c:1196
#define cf_parent(_cf)
Definition cf_util.h:101
connection_state_t
Definition connection.h:47
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:56
@ CONNECTION_STATE_HALTED
The connection is in a halted state.
Definition connection.h:48
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:57
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:54
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:51
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:52
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:55
connection_reason_t
Definition connection.h:84
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:202
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:176
#define MEM(x)
Definition debug.h:36
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:717
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static void fr_dlist_talloc_free(fr_dlist_head_t *head)
Free all items in a doubly linked list (with talloc)
Definition dlist.h:892
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:570
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:654
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:513
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
#define fr_dlist_verify(_head)
Definition dlist.h:737
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:257
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:320
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:537
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:540
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RWARN(fmt,...)
Definition log.h:309
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:618
Track when a log message was last repeated.
Definition log.h:559
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
fr_log_type_t
Definition log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:211
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:450
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1154
#define fr_assert(_expr)
Definition rad_assert.h:38
#define RDEBUG(fmt,...)
#define DEBUG2(fmt,...)
#define WARN(fmt,...)
static bool done
Definition radclient.c:83
#define INFO(fmt,...)
Definition radict.c:64
static fr_event_list_t * events
Definition radsniff.c:59
static rs_t * conf
Definition radsniff.c:53
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:483
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:330
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:521
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:543
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:466
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:321
static fr_time_t test_time(void)
Definition slab_tests.c:43
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:155
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:113
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:207
#define talloc_strdup(_ctx, _str)
Definition talloc.h:145
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:50
A timer event.
Definition timer.c:84
#define FR_TIMER_DELETE(_ev_p)
Definition timer.h:103
#define FR_TIMER_DELETE_RETURN(_ev_p)
Definition timer.h:110
#define fr_timer_in(...)
Definition timer.h:87
#define FR_TIMER_DISARM(_ev)
Definition timer.h:91
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5209
static atomic_uint_fast64_t request_counter
Definition trunk.c:54
CONF_PAIR * trigger_cp[NUM_ELEMENTS(trunk_conn_trigger_names)]
Cached trigger CONF_PAIRs.
Definition trunk.c:319
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3269
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:792
static size_t trunk_req_trigger_names_len
Definition trunk.c:388
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3856
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:161
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4851
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:762
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3419
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:159
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:153
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:224
static size_t trunk_connection_states_len
Definition trunk.c:427
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:767
trunk_connection_t * tconn
The connection the request was associated with.
Definition trunk.c:82
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4035
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:298
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5077
fr_timer_t * manage_ev
Periodic connection management event.
Definition trunk.c:290
#define IN_HANDLER(_trunk)
Definition trunk.c:722
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:415
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4747
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4052
void * uctx
User data to pass to the function.
Definition trunk.c:191
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1170
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:979
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:296
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:80
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:955
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4184
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3701
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:276
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:261
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:777
int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
Check for a module trigger section when parsing the triggers option.
Definition trunk.c:5045
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4786
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2047
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2150
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2278
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-free state.
Definition trunk.c:3233
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:187
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3942
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:614
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2494
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3653
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:733
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:138
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:258
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:282
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:805
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1522
int line
Line change occurred on.
Definition trunk.c:92
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3171
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:457
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4593
static size_t trunk_connection_events_len
Definition trunk.c:443
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3601
bool oneshot
Remove the function after it's called once.
Definition trunk.c:189
bool started
Has the trunk been started.
Definition trunk.c:307
static size_t trunk_states_len
Definition trunk.c:413
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the number of requests associated with a trunk connection.
Definition trunk.c:2900
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:312
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:575
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as full.
Definition trunk.c:2919
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3070
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:71
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:2983
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:147
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:881
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4884
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:256
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4120
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4692
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:536
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4867
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:100
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:163
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2028
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:190
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:113
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:557
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:772
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2942
struct trunk_pub_s pub
Public fields in the trunk.
Definition trunk.c:216
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:111
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:502
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2761
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3530
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:437
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2607
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1073
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:278
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:304
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:408
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:2973
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:727
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2696
#define IS_PROCESSING(_tconn)
Definition trunk.c:728
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3201
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:373
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog list.
Definition trunk.c:108
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4078
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2302
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1620
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:632
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:753
fr_dlist_head_t sent
Sent request.
Definition trunk.c:157
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition trunk.c:1241
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:178
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3904
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:242
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:901
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2170
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2818
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3769
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:847
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4522
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:134
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
Definition trunk.c:1328
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition trunk.c:2876
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:724
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:594
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1391
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1698
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:309
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:683
char const * function
State change occurred in.
Definition trunk.c:91
static size_t trunk_request_states_len
Definition trunk.c:406
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:239
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:77
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1442
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3471
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
Allocate a new collection of connections.
Definition trunk.c:4969
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:267
trunk_request_t * partial
Partially written request.
Definition trunk.c:155
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1584
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:244
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:341
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1553
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1271
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:665
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3126
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2340
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:649
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1755
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2461
char const * log_prefix
What to prepend to messages.
Definition trunk.c:220
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:743
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3453
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:3006
static conf_parser_t const trunk_config_request[]
Definition trunk.c:324
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:246
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1108
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:391
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3375
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:198
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:264
uint64_t id
Trunk request ID.
Definition trunk.c:104
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:171
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3340
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:705
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:249
bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]
Record that a specific trigger is undefined.
Definition trunk.c:317
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4841
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:3981
fr_dlist_head_t log
State change log.
Definition trunk.c:123
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition trunk.c:85
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:141
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1477
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3148
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:79
fr_pair_list_t * trigger_args
Passed to trigger.
Definition trunk.c:315
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition trunk.c:165
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4830
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition trunk.c:252
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3958
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2811
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5138
fr_heap_t * backlog
The request backlog.
Definition trunk.c:229
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:725
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5198
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4544
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2254
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2068
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:280
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5254
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:468
fr_heap_index_t heap_id
Used to track the request in the conn->pending heap.
Definition trunk.c:106
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:491
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:284
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition trunk.c:1866
bool sent
Trunk request has been sent at least once.
Definition trunk.c:118
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2112
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2960
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:4020
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3924
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5312
fr_dlist_t entry
List entry.
Definition trunk.c:186
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:333
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:87
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:115
static size_t trunk_cancellation_reasons_len
Definition trunk.c:435
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:429
static size_t trunk_conn_trigger_names_len
Definition trunk.c:210
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:222
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2849
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:723
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:226
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4068
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:786
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:78
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2090
Associates request queues with a connection.
Definition trunk.c:133
Wraps a normal request.
Definition trunk.c:99
Trace state machine changes for a particular request.
Definition trunk.c:76
Main trunk management handle.
Definition trunk.c:215
An entry in a trunk watch function list.
Definition trunk.c:185
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:232
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:241
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:321
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:352
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition trunk.h:282
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:230
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:196
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:358
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:380
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:742
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:267
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:360
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:311
fr_time_delta_t idle_timeout
How long a connection can remain idle for.
Definition trunk.h:251
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:372
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:270
uint64_t max_uses
The maximum number of times a connection can be used.
Definition trunk.h:247
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:249
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:234
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:335
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:245
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:319
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:350
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:376
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:354
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:738
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:323
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:308
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:378
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:256
uint16_t start
How many connections to start.
Definition trunk.h:228
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:260
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:214
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:272
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:744
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:329
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:333
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:253
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:374
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:314
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:729
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:264
trunk_enqueue_t
Definition trunk.h:149
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:154
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:155
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:151
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:152
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:150
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:356
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:237
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:317
trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition trunk.h:751
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:171
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:174
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:166
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:163
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:186
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:189
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:169
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:173
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:338
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:305
Common configuration parameters for a trunk.
Definition trunk.h:225
Public fields for the trunk connection.
Definition trunk.h:371
I/O functions to pass to trunk_alloc.
Definition trunk.h:737
Public fields for the trunk.
Definition trunk.h:301
Public fields for the trunk request.
Definition trunk.h:349
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:420
static fr_slen_t parent
Definition pair.h:858
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:553
#define fr_box_time_delta(_val)
Definition value.h:366
int nonnull(2, 5))
static size_t char ** out
Definition value.h:1030