The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
trunk.c
1/*
2 * This program is is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 45f84ae59177579e7b3eb57e354237437a336ca5 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/connection.h>
40#include <freeradius-devel/server/trigger.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/table.h>
44#include <freeradius-devel/util/minmax_heap.h>
45
46#ifdef HAVE_STDATOMIC_H
47# include <stdatomic.h>
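/*
 *	ATOMIC_VAR_INIT was deprecated in C17 and removed in C23, so newer
 *	toolchains may not define it.  The fallback below keeps the
 *	request_counter initialiser further down working either way.
 */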
48# ifndef ATOMIC_VAR_INIT
49# define ATOMIC_VAR_INIT(_x) (_x)
50# endif
51#else
52# include <freeradius-devel/util/stdatomic.h>
53#endif
54
55static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
56
57#ifdef TESTING_TRUNK
59
60static fr_time_t test_time(void)
61{
62 return test_time_base;
63}
64
65#define fr_time test_time
66#endif
67
68#ifndef NDEBUG
69/** The maximum number of state logs to record per request
70 *
71 */
72#define TRUNK_REQUEST_STATE_LOG_MAX 20
73
74/** Trace state machine changes for a particular request
75 *
76 */
77typedef struct {
78 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
79 fr_dlist_t entry; //!< Entry in the linked list.
80 trunk_request_state_t from; //!< What state we transitioned from.
81 trunk_request_state_t to; //!< What state we transitioned to.
82
83 trunk_connection_t *tconn; //!< The connection the request was associated with.
84 ///< Pointer may now be invalid, do not de-reference.
85
86 uint64_t tconn_id; //!< If the treq was associated with a connection
87 ///< the connection ID.
88 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
89 ///< the connection state at the time of the
90 ///< state transition.
91
92 char const *function; //!< Function the state change occurred in.
93 int line; //!< Line the state change occurred on.
95#endif
96
97/** Wraps a normal request
98 *
99 */
100struct trunk_request_s {
101 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
102 ///< This *MUST* be the first field in this
103 ///< structure.
104
105 uint64_t id; //!< Trunk request ID.
106
107 fr_heap_index_t heap_id; //!< Used to track the request in the conn->pending heap.
108
109 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
110 ///< or trunk->backlog lists.
111
112 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
113
114 fr_time_t last_freed; //!< Last time this request was freed.
115
116 bool bound_to_conn; //!< Fail the request if there's an attempt to
117 ///< re-enqueue it.
118
119 bool sent; //!< Trunk request has been sent at least once.
120 ///< Used so that re-queueing doesn't increase trunk
121 ///< `sent` count.
122
123#ifndef NDEBUG
124 fr_dlist_head_t log; //!< State change log.
125#endif
126};
127
128
129/** Associates request queues with a connection
130 *
131 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
132 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
133 */
134struct trunk_connection_s {
135 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
136 ///< This *MUST* be the first field in this
137 ///< structure.
138
139 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
140 ///< heap.
141
142 fr_dlist_t entry; //!< Used to track the connection in the connecting,
143 ///< full and failed lists.
144
145 /** @name State
146 * @{
147 */
148 trunk_connection_event_t events; //!< The current events we expect to be notified on.
149 /** @} */
150
151 /** @name Request lists
152 * @{
153 */
154 fr_heap_t *pending; //!< Requests waiting to be sent.
155
156 trunk_request_t *partial; //!< Partially written request.
157
158 fr_dlist_head_t sent; //!< Sent requests.
159
160 fr_dlist_head_t reapable; //!< Idle requests.
161
162 fr_dlist_head_t cancel; //!< Requests in the cancel state.
163
164 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
165
166 fr_dlist_head_t cancel_sent; //!< Sent cancellation requests.
167 /** @} */
168
169 /** @name Statistics
170 * @{
171 */
172 uint64_t sent_count; //!< The number of requests that have been sent using
173 ///< this connection.
174 /** @} */
175
176 /** @name Timers
177 * @{
178 */
179 fr_event_timer_t const *lifetime_ev; //!< Maximum time this connection can be open.
180 /** @} */
181};
182
183/** An entry in a trunk watch function list
184 *
185 */
186typedef struct trunk_watch_entry_s {
187 fr_dlist_t entry; //!< List entry.
188 trunk_watch_t func; //!< Function to call when a trunk enters
189 ///< the state this list belongs to
190 bool oneshot; //!< Remove the function after it's called once.
191 bool enabled; //!< Whether the watch entry is enabled.
192 void *uctx; //!< User data to pass to the function.
193} trunk_watch_entry_t;
194
195/** Main trunk management handle
196 *
197 */
198struct trunk_s {
199 struct trunk_pub_s pub; //!< Public fields in the trunk.
200 ///< This *MUST* be the first field in this
201 ///< structure.
202
203 char const *log_prefix; //!< What to prepend to messages.
204
205 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
206
207 trunk_conf_t conf; //!< Trunk common configuration.
208
209 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
210 ///< enqueued.
211
212 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
213 ///< immediately assign to a connection.
214
215 /** @name Connection lists
216 *
217 * A connection must always be in exactly one of these lists
218 * or trees.
219 *
220 * @{
221 */
222 fr_dlist_head_t init; //!< Connections which have not yet started
223 ///< connecting.
224
225 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
226
227 fr_minmax_heap_t *active; //!< Connections which can service requests.
228
229 fr_dlist_head_t full; //!< Connections which have too many outstanding
230 ///< requests.
231
232 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
233 ///< inactive by the API client.
234
235 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
236 ///< inactive by the API client, which the trunk
237 ///< manager is draining to close.
238
239 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
240
241 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
242 ///< shutdown, reconnection or failure.
243
244 fr_dlist_head_t draining; //!< Connections that will be freed once all their
245 ///< requests are complete, but can be reactivated.
246
247 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
248 ///< requests are complete.
249
250 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
251 ///< the next call to trunk_manage.
252 ///< This prevents connections from being freed
253 ///< whilst we're inside callbacks.
254 /** @} */
255
256 /** @name Callbacks
257 * @{
258 */
259 trunk_io_funcs_t funcs; //!< I/O functions.
260
261 void *in_handler; //!< Which handler we're inside.
262
263 void *uctx; //!< Uctx data to pass to alloc.
264
265 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
266
267 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
268 /** @} */
269
270 /** @name Timers
271 * @{
272 */
273 fr_event_timer_t const *manage_ev; //!< Periodic connection management event.
274 /** @} */
275
276 /** @name Log rate limiting entries
277 * @{
278 */
279 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
280
281 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
282 /** @} */
283
284 /** @name State
285 * @{
286 */
287 bool freeing; //!< Trunk is being freed, don't spawn new
288 ///< connections or re-enqueue.
289
290 bool started; //!< Has the trunk been started.
291
292 bool managing_connections; //!< Whether the trunk is allowed to manage
293 ///< (open/close) connections.
294
295 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
296 /** @} */
297};
298
300 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
301 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
302 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
303
305};
306
308 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
309 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
310
312};
313
314#ifndef TRUNK_TESTS
316 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
317 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
318 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
319 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
320 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
321 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
322 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
323
324 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
325 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
326
327 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
328
329 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
330
331 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
332 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
333
335};
336#endif
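/*
 *	For reference, a configuration section matching the parser tables
 *	above would look roughly like this.  The enclosing section name
 *	("pool" here) is chosen by the module using the trunk API; the
 *	values shown are the defaults from the tables:
 *
 *		pool {
 *			start = 1
 *			min = 1
 *			max = 5
 *			connecting = 2
 *			uses = 0
 *			lifetime = 0
 *			idle_timeout = 0
 *			open_delay = 0.2
 *			close_delay = 10.0
 *			manage_interval = 0.2
 *			max_backlog = 1000
 *			connection {
 *				connect_timeout = 3.0
 *				reconnect_delay = 1
 *			}
 *			request {
 *				per_connection_max = 2000
 *				per_connection_target = 1000
 *				free_delay = 10.0
 *			}
 *		}
 */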
337
338#ifndef NDEBUG
339/** Map request states to trigger names
340 *
341 * Must stay in the same order as #trunk_request_state_t
342 */
344 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
345 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
346 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
347 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
348 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
349 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
350 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
351 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
352 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
353 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
354 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
355 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
356 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
357};
359#endif
360
362 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
363 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
364 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
365 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
366 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
367 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
368 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
369 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
370 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
371 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
372 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
373 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
374 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
375};
377
378/** Map connection states to trigger names
379 *
380 * Must stay in the same order as #trunk_connection_state_t
381 */
383 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
384 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
385 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
386 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
387 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
388 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
389 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
390 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
391 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
392 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
393};
395
397 { L("IDLE"), TRUNK_STATE_IDLE },
398 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
399 { L("PENDING"), TRUNK_STATE_PENDING }
400};
402
404 { L("INIT"), TRUNK_CONN_INIT },
405 { L("HALTED"), TRUNK_CONN_HALTED },
406 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
407 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
408 { L("CLOSED"), TRUNK_CONN_CLOSED },
409 { L("FULL"), TRUNK_CONN_FULL },
410 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
411 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
412 { L("DRAINING"), TRUNK_CONN_DRAINING },
413 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
414};
416
418 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
419 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
420 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
421 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
422};
424
426 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
427 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
428 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
429 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
430};
432
433#define CONN_TRIGGER(_state) do { \
434 if (trunk->pub.triggers) { \
435 trigger_exec(unlang_interpret_get_thread_default(), \
436 NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
437 "<INVALID>"), true, NULL); \
438 } \
439} while (0)
440
441#define CONN_STATE_TRANSITION(_new, _log) \
442do { \
443 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
444 tconn->pub.conn->id, \
445 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
446 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
447 tconn->pub.state = _new; \
448 CONN_TRIGGER(_new); \
449 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
450} while (0)
451
452#define CONN_BAD_STATE_TRANSITION(_new) \
453do { \
454 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
455 tconn->pub.conn->id, \
456 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
457 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
458} while (0)
459
460#ifndef NDEBUG
461void trunk_request_state_log_entry_add(char const *function, int line,
462 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
463
464#define REQUEST_TRIGGER(_state) do { \
465 if (trunk->pub.triggers) { \
466 trigger_exec(unlang_interpret_get_thread_default(), \
467 NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
468 "<INVALID>"), true, NULL); \
469 } \
470} while (0)
471
472/** Record a request state transition and log appropriate output
473 *
474 */
475#define REQUEST_STATE_TRANSITION(_new) \
476do { \
477 request_t *request = treq->pub.request; \
478 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
479 treq->id, \
480 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
481 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
482 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
483 treq->pub.state = _new; \
484 REQUEST_TRIGGER(_new); \
485} while (0)
486#define REQUEST_BAD_STATE_TRANSITION(_new) \
487do { \
488 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
489 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
490 treq->id, \
491 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
492 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
493} while (0)
494#else
495/** Record a request state transition
496 *
497 */
498#define REQUEST_STATE_TRANSITION(_new) \
499do { \
500 request_t *request = treq->pub.request; \
501 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
502 treq->id, \
503 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
504 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
505 treq->pub.state = _new; \
506} while (0)
507#define REQUEST_BAD_STATE_TRANSITION(_new) \
508do { \
509 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
510 treq->id, \
511 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
512 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
513} while (0)
514#endif
515
516
517/** Call the cancel callback if set
518 *
519 */
520#define DO_REQUEST_CANCEL(_treq, _reason) \
521do { \
522 if ((_treq)->pub.trunk->funcs.request_cancel) { \
523 request_t *request = (_treq)->pub.request; \
524 void *_prev = (_treq)->pub.trunk->in_handler; \
525 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
526 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
527 (_treq)->pub.tconn->pub.conn, \
528 (_treq)->pub.preq, \
529 fr_table_str_by_value(trunk_cancellation_reasons, \
530 (_reason), \
531 "<INVALID>"), \
532 (_treq)->pub.trunk->uctx); \
533 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
534 (_treq)->pub.trunk->in_handler = _prev; \
535 } \
536} while(0)
537
538/** Call the "conn_release" callback (if set)
539 *
540 */
541#define DO_REQUEST_CONN_RELEASE(_treq) \
542do { \
543 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
544 request_t *request = (_treq)->pub.request; \
545 void *_prev = (_treq)->pub.trunk->in_handler; \
546 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
547 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
548 (_treq)->pub.tconn->pub.conn, \
549 (_treq)->pub.preq, \
550 (_treq)->pub.trunk->uctx); \
551 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
552 (_treq)->pub.trunk->in_handler = _prev; \
553 } \
554} while(0)
555
556/** Call the complete callback (if set)
557 *
558 */
559#define DO_REQUEST_COMPLETE(_treq) \
560do { \
561 if ((_treq)->pub.trunk->funcs.request_complete) { \
562 request_t *request = (_treq)->pub.request; \
563 void *_prev = (_treq)->pub.trunk->in_handler; \
564 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
565 (_treq)->pub.request, \
566 (_treq)->pub.preq, \
567 (_treq)->pub.rctx, \
568 (_treq)->pub.trunk->uctx); \
569 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
570 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
571 (_treq)->pub.trunk->in_handler = _prev; \
572 } \
573} while(0)
574
575/** Call the fail callback (if set)
576 *
577 */
578#define DO_REQUEST_FAIL(_treq, _prev_state) \
579do { \
580 if ((_treq)->pub.trunk->funcs.request_fail) { \
581 request_t *request = (_treq)->pub.request; \
582 void *_prev = (_treq)->pub.trunk->in_handler; \
583 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
584 (_treq)->pub.request, \
585 (_treq)->pub.preq, \
586 (_treq)->pub.rctx, \
587 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
588 (_treq)->pub.trunk->uctx); \
589 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
590 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
591 (_treq)->pub.trunk->in_handler = _prev; \
592 } \
593} while(0)
594
595/** Call the free callback (if set)
596 *
597 */
598#define DO_REQUEST_FREE(_treq) \
599do { \
600 if ((_treq)->pub.trunk->funcs.request_free) { \
601 request_t *request = (_treq)->pub.request; \
602 void *_prev = (_treq)->pub.trunk->in_handler; \
603 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
604 (_treq)->pub.request, \
605 (_treq)->pub.preq, \
606 (_treq)->pub.trunk->uctx); \
607 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
608 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
609 (_treq)->pub.trunk->in_handler = _prev; \
610 } \
611} while(0)
612
613/** Write one or more requests to a connection
614 *
615 */
616#define DO_REQUEST_MUX(_tconn) \
617do { \
618 void *_prev = (_tconn)->pub.trunk->in_handler; \
619 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
620 (_tconn)->pub.conn->id, \
621 (_tconn)->pub.trunk->el, \
622 (_tconn), \
623 (_tconn)->pub.conn, \
624 (_tconn)->pub.trunk->uctx); \
625 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
626 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
627 (_tconn)->pub.trunk->in_handler = _prev; \
628} while(0)
629
630/** Read one or more requests from a connection
631 *
632 */
633#define DO_REQUEST_DEMUX(_tconn) \
634do { \
635 void *_prev = (_tconn)->pub.trunk->in_handler; \
636 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
637 (_tconn)->pub.conn->id, \
638 (_tconn), \
639 (_tconn)->pub.conn, \
640 (_tconn)->pub.trunk->uctx); \
641 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
642 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
643 (_tconn)->pub.trunk->in_handler = _prev; \
644} while(0)
645
646/** Write one or more cancellation requests to a connection
647 *
648 */
649#define DO_REQUEST_CANCEL_MUX(_tconn) \
650do { \
651 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
652 void *_prev = (_tconn)->pub.trunk->in_handler; \
653 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
654 (_tconn)->pub.conn->id, \
655 (_tconn), \
656 (_tconn)->pub.conn, \
657 (_tconn)->pub.trunk->uctx); \
658 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
659 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
660 (_tconn)->pub.trunk->in_handler = _prev; \
661 } \
662} while(0)
663
664/** Allocate a new connection
665 *
666 */
667#define DO_CONNECTION_ALLOC(_tconn) \
668do { \
669 void *_prev = trunk->in_handler; \
670 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
671 (_tconn), \
672 (_tconn)->pub.trunk->el, \
673 (_tconn)->pub.trunk->conf.conn_conf, \
674 trunk->log_prefix, \
675 (_tconn)->pub.trunk->uctx); \
676 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
677 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
678 (_tconn)->pub.trunk->in_handler = _prev; \
679 if (!(_tconn)->pub.conn) { \
680 ERROR("Failed creating new connection"); \
681 talloc_free(tconn); \
682 return -1; \
683 } \
684} while(0)
685
686/** Change what events the connection should be notified about
687 *
688 */
689#define DO_CONNECTION_NOTIFY(_tconn, _events) \
690do { \
691 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
692 void *_prev = (_tconn)->pub.trunk->in_handler; \
693 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
694 (_tconn)->pub.conn->id, \
695 (_tconn), \
696 (_tconn)->pub.conn, \
697 (_tconn)->pub.trunk->el, \
698 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
699 (_tconn)->pub.trunk->uctx); \
700 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
701 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
702 (_tconn)->pub.trunk->in_handler = _prev; \
703 } \
704} while(0)
705
706#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
707#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
708#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
709#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
710
711#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
712#define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
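/*
 *	in_handler records which API client callback the trunk is currently
 *	executing; it's saved and restored around every DO_* invocation above.
 *	The IN_* checks let signalling code detect that it's being re-entered
 *	from inside a particular callback, while IS_SERVICEABLE/IS_PROCESSING
 *	test a connection's state against the composite state masks declared
 *	in trunk.h.
 */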
713
714/** Remove the current request from the backlog
715 *
716 */
717#define REQUEST_EXTRACT_BACKLOG(_treq) \
718do { \
719 int _ret; \
720 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
721 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting request from backlog heap: %s", fr_strerror())) break; \
722} while (0)
723
724/** Remove the current request from the pending list
725 *
726 */
727#define REQUEST_EXTRACT_PENDING(_treq) \
728do { \
729 int _ret; \
730 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
731 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting request from pending heap: %s", fr_strerror())) break; \
732} while (0)
733
734/** Remove the current request from the partial slot
735 *
736 */
737#define REQUEST_EXTRACT_PARTIAL(_treq) \
738do { \
739 fr_assert((_treq)->pub.tconn->partial == treq); \
740 tconn->partial = NULL; \
741} while (0)
742
743/** Remove the current request from the sent list
744 *
745 */
746#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
747
748/** Remove the current request from the reapable list
749 *
750 */
751#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
752
753/** Remove the current request from the cancel list
754 *
755 */
756#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
757
758/** Remove the current request from the cancel_partial slot
759 *
760 */
761#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
762do { \
763 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
764 tconn->cancel_partial = NULL; \
765} while (0)
766
767/** Remove the current request from the cancel sent list
768 *
769 */
770#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
771
772/** Reorder the connections in the active heap
773 *
774 * fr_heap_extract will also error out if heap_id is bad - no need for assert
775 */
776#define CONN_REORDER(_tconn) \
777do { \
778 int _ret; \
779 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
780 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
781 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
782 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
783 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
784} while (0)
785
786/** Call a list of watch functions associated with a state
787 *
788 */
789static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
790{
791 /*
792 * Nested watcher calls are not allowed
793 * and shouldn't be possible because of
794 * deferred signal processing.
795 */
796 fr_assert(trunk->next_watcher == NULL);
797
798 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
799 trunk_watch_entry_t *entry = trunk->next_watcher;
800 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
801
802 if (!entry->enabled) continue;
803 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
804
805 entry->func(trunk, trunk->pub.state, state, entry->uctx);
806
807 if (oneshot) talloc_free(entry);
808 }
809 trunk->next_watcher = NULL;
810}
811
812/** Call the state change watch functions
813 *
814 */
815#define CALL_WATCHERS(_trunk, _state) \
816do { \
817 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
818 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
819} while(0)
820
821/** Remove a watch function from a trunk state list
822 *
823 * @param[in] trunk The trunk to remove the watcher from.
824 * @param[in] state to remove the watch from.
825 * @param[in] watch Function to remove.
826 * @return
827 * - 0 if the function was removed successfully.
828 * - -1 if the function wasn't present in the watch list.
829 * - -2 if an invalid state was passed.
830 */
832{
833 trunk_watch_entry_t *entry = NULL;
834 fr_dlist_head_t *list;
835
836 if (state >= TRUNK_STATE_MAX) return -2;
837
838 list = &trunk->watch[state];
839 while ((entry = fr_dlist_next(list, entry))) {
840 if (entry->func == watch) {
841 if (trunk->next_watcher == entry) {
842 trunk->next_watcher = fr_dlist_remove(list, entry);
843 } else {
844 fr_dlist_remove(list, entry);
845 }
846 talloc_free(entry);
847 return 0;
848 }
849 }
850
851 return -1;
852}
853
854/** Add a watch entry to the trunk state list
855 *
856 * @param[in] trunk The trunk to add the watcher to.
857 * @param[in] state to watch for.
858 * @param[in] watch Function to add.
859 * @param[in] oneshot Should this watcher only be run once.
860 * @param[in] uctx Context to pass to function.
861 * @return
862 * - NULL if an invalid state is passed.
863 * - A new watch entry handle on success.
864 */
866 trunk_watch_t watch, bool oneshot, void const *uctx)
867{
868 trunk_watch_entry_t *entry;
869 fr_dlist_head_t *list;
870
871 if (state >= TRUNK_STATE_MAX) return NULL;
872
873 list = &trunk->watch[state];
874 MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
875
876 entry->func = watch;
877 entry->oneshot = oneshot;
878 entry->enabled = true;
879 memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
880 fr_dlist_insert_tail(list, entry);
881
882 return entry;
883}
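/*
 *	Usage sketch (illustrative only, the authoritative trunk_watch_t
 *	prototype is declared in trunk.h): a watch function compatible with
 *	the call made in trunk_watch_call() above receives the trunk, the
 *	state being left (trunk->pub.state is only updated after the watchers
 *	have run, see TRUNK_STATE_TRANSITION below), the state being entered,
 *	and the uctx registered with the entry.  Any return value is ignored,
 *	so a void return is assumed here:
 *
 *		static void my_pool_watch(trunk_t *trunk, trunk_state_t prev,
 *					  trunk_state_t state, void *uctx)
 *		{
 *			DEBUG3("Pool moved %s -> %s",
 *			       fr_table_str_by_value(trunk_states, prev, "<INVALID>"),
 *			       fr_table_str_by_value(trunk_states, state, "<INVALID>"));
 *		}
 */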
884
885#define TRUNK_STATE_TRANSITION(_new) \
886do { \
887 DEBUG3("Trunk changed state %s -> %s", \
888 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
889 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
890 CALL_WATCHERS(trunk, _new); \
891 trunk->pub.state = _new; \
892} while (0)
893
894static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
895static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
904
905static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
906 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
907
908static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
909static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
910static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
911static inline void trunk_connection_readable(trunk_connection_t *tconn);
912static inline void trunk_connection_writable(trunk_connection_t *tconn);
920
921static void trunk_rebalance(trunk_t *trunk);
922static void trunk_manage(trunk_t *trunk, fr_time_t now);
923static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx);
924static void trunk_backlog_drain(trunk_t *trunk);
925
926/** Compare two protocol requests
927 *
928 * Allows protocol requests to be prioritised with a function
929 * specified by the API client. Defaults to ordering by pointer address
930 * if no function is specified.
931 *
932 * @param[in] a treq to compare to b.
933 * @param[in] b treq to compare to a.
934 * @return
935 * - +1 if a > b.
936 * - 0 if a == b.
937 * - -1 if a < b.
938 */
939static int8_t _trunk_request_prioritise(void const *a, void const *b)
940{
943
944 fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
945
946 return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
947}
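/*
 *	Illustrative only: the preq layout is whatever the API client
 *	allocates, so my_preq_t and its priority field below are assumptions.
 *	A request_prioritise callback compares the two preq pointers it's
 *	given and returns the usual -1/0/+1 ordering:
 *
 *		typedef struct {
 *			uint32_t	priority;
 *		} my_preq_t;
 *
 *		static int8_t my_request_prioritise(void const *a, void const *b)
 *		{
 *			my_preq_t const *preq_a = a;
 *			my_preq_t const *preq_b = b;
 *
 *			return (preq_a->priority > preq_b->priority) -
 *			       (preq_a->priority < preq_b->priority);
 *		}
 */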
948
949/** Remove a request from all connection lists
950 *
951 * A common function used by init, fail, complete state functions to disassociate
952 * a request from a connection in preparation for freeing or reassignment.
953 *
954 * Despite its unassuming name, this function is *the* place to put calls to
955 * functions which need to be called when the number of requests associated with
956 * a connection changes.
957 *
958 * Trunk requests will always be passed to this function before they're removed
959 * from a connection, even if the requests are being freed.
960 *
961 * @param[in] treq to trigger a state change for.
962 */
963static void trunk_request_remove_from_conn(trunk_request_t *treq)
964{
965 trunk_connection_t *tconn = treq->pub.tconn;
966 trunk_t *trunk = treq->pub.trunk;
967
968 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
969
970 switch (treq->pub.state) {
972 return; /* Not associated with connection */
973
976 break;
977
980 break;
981
984 break;
985
988 break;
989
992 break;
993
996 break;
997
1000 break;
1001
1002 default:
1003 fr_assert(0);
1004 break;
1005 }
1006
1007 /*
1008 * If the request wasn't associated with a
1009 * connection, then there's nothing more
1010 * to do.
1011 */
1012 if (!tconn) return;
1013
1014 {
1015 request_t *request = treq->pub.request;
1016
1017 ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
1018 tconn->pub.conn->name, treq->id);
1019 }
1020 /*
1021 * Release any connection specific resources the
1022 * treq holds.
1023 */
1025
1026 switch (tconn->pub.state){
1027 case TRUNK_CONN_FULL:
1028 trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1029 if (tconn->pub.state == TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1031
1032 case TRUNK_CONN_ACTIVE:
1033 CONN_REORDER(tconn);
1034 break;
1035
1036 default:
1037 break;
1038 }
1039
1040 treq->pub.tconn = NULL;
1041
1042 /*
1043 * Request removed from the connection
1044 * see if we need to deregister I/O events.
1045 */
1047}
1048
1049/** Transition a request to the unassigned state, in preparation for re-assignment
1050 *
1051 * @note treq->tconn may be inviable after calling
1052 * if treq->conn and connection_signals_pause are not used.
1053 * This is due to call to trunk_request_remove_from_conn.
1054 *
1055 * @param[in] treq to trigger a state change for.
1056 */
1058{
1059 trunk_t *trunk = treq->pub.trunk;
1060
1061 switch (treq->pub.state) {
1063 return;
1064
1067 break;
1068
1074 break;
1075
1076 default:
1078 }
1079
1081}
1082
1083/** Transition a request to the backlog state, adding it to the backlog of the trunk
1084 *
1085 * @note treq->tconn and treq may be inviable after calling
1086 * if treq->conn and connection_signals_pause are not used.
1087 * This is due to call to trunk_manage.
1088 *
1089 * @param[in] treq to trigger a state change for.
1090 * @param[in] new Whether this is a new request.
1091 */
1092static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
1093{
1094 trunk_connection_t *tconn = treq->pub.tconn;
1095 trunk_t *trunk = treq->pub.trunk;
1096
1097 switch (treq->pub.state) {
1100 break;
1101
1104 break;
1105
1108 break;
1109
1110 default:
1112 }
1113
1115 fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1116
1117 /*
1118 * A new request has entered the trunk.
1119 * Re-calculate request/connection ratios.
1120 */
1121 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1122
1123 /*
1124 * To reduce latency, if there are no connections
1125 * in the connecting state, call the trunk manage
1126 * function immediately.
1127 *
1128 * Likewise, if there are draining connections
1129 * which could be moved back to active, call
1130 * the trunk manage function.
1131 *
1132 * Remember, requests only enter the backlog if
1133 * there are no connections which can service them.
1134 */
1138 }
1139}
1140
1141/** Transition a request to the pending state, adding it to the backlog of an active connection
1142 *
1143 * All trunk requests being added to a connection get passed to this function.
1144 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1145 *
1146 * @note treq->tconn and treq may be inviable after calling
1147 * if treq->conn and connection_signals_pause is not used.
1148 * This is due to call to trunk_connection_event_update.
1149 *
1150 * @param[in] treq to trigger a state change for.
1151 * @param[in] tconn to enqueue the request on.
1152 * @param[in] new Whether this is a new request.
1153 */
1154static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
1155{
1156 trunk_t *trunk = treq->pub.trunk;
1157
1158 fr_assert(tconn->pub.trunk == trunk);
1159 fr_assert(IS_PROCESSING(tconn));
1160
1161 switch (treq->pub.state) {
1164 fr_assert(!treq->pub.tconn);
1165 break;
1166
1168 fr_assert(!treq->pub.tconn);
1170 break;
1171
1172 case TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1174 break;
1175
1176 default:
1178 }
1179
1180 /*
1181 * Assign the new connection first so
1182 * it appears in the state log.
1183 */
1184 treq->pub.tconn = tconn;
1185
1187
1188 {
1189 request_t *request = treq->pub.request;
1190
1191 ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1192 tconn->pub.conn->name, treq->id);
1193 }
1194 fr_heap_insert(&tconn->pending, treq);
1195
1196 /*
1197 * A new request has entered the trunk.
1198 * Re-calculate request/connection ratios.
1199 */
1200 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1201
1202 /*
1203 * Check if we need to automatically transition the
1204 * connection to full.
1205 */
1207
1208 /*
1209 * Reorder the connection in the heap now it has an
1210 * additional request.
1211 */
1212 if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1213
1214 /*
1215 * We have a new request, see if we need to register
1216 * for I/O events.
1217 */
1219}
1220
1221/** Transition a request to the partial state, indicating that it has been partially sent
1222 *
1223 * @param[in] treq to trigger a state change for.
1224 */
1226{
1227 trunk_connection_t *tconn = treq->pub.tconn;
1228 trunk_t *trunk = treq->pub.trunk;
1229
1230 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1231
1232 switch (treq->pub.state) {
1233 case TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1235 break;
1236
1237 default:
1239 }
1240
1241 fr_assert(!tconn->partial);
1242 tconn->partial = treq;
1243
1245}
1246
1247/** Transition a request to the sent state, indicating that it's been sent in its entirety
1248 *
1249 * @note treq->tconn and treq may be inviable after calling
1250 * if treq->conn and connection_signals_pause is not used.
1251 * This is due to call to trunk_connection_event_update.
1252 *
1253 * @param[in] treq to trigger a state change for.
1254 */
1256{
1257 trunk_connection_t *tconn = treq->pub.tconn;
1258 trunk_t *trunk = treq->pub.trunk;
1259
1260 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1261
1262 switch (treq->pub.state) {
1265 break;
1266
1269 break;
1270
1271 default:
1273 }
1274
1276 fr_dlist_insert_tail(&tconn->sent, treq);
1277
1278 /*
1279 * Update the connection's sent stats if this is the
1280 * first time this request is being sent.
1281 */
1282 if (!treq->sent) {
1283 trunk->pub.last_write_success = fr_time();
1284
1286 tconn->sent_count++;
1287 treq->sent = true;
1288
1289 /*
1290 * Enforces max_uses
1291 */
1292 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1294 }
1295 }
1296
1297 /*
1298 * We just sent a request, we probably need
1299 * to tell the event loop we want to be
1300 * notified if there's data available.
1301 */
1303}
1304
1305/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
1306 *
1307 * @note Largely a replica of trunk_request_enter_sent.
1308 *
1309 * @param[in] treq to trigger a state change for.
1310 */
1312{
1313 trunk_connection_t *tconn = treq->pub.tconn;
1314 trunk_t *trunk = treq->pub.trunk;
1315
1316 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1317
1318 switch (treq->pub.state) {
1321 break;
1322
1325 break;
1326
1327 default:
1329 }
1330
1332 fr_dlist_insert_tail(&tconn->reapable, treq);
1333
1334 if (!treq->sent) {
1335 tconn->sent_count++;
1336 treq->sent = true;
1337
1338 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1340 }
1341 }
1342
1344}
1345
1346/** Transition a request to the cancel state, placing it in a connection's cancellation list
1347 *
1348 * If a request_cancel_mux callback is provided, that callback will
1349 * be called periodically for requests which were cancelled due to
1350 * a signal.
1351 *
1352 * The request_cancel_mux callback will dequeue cancelled requests
1353 * and inform a remote server that the result is no longer required.
1354 *
1355 * A request must enter this state before being added to the backlog
1356 * of another connection if it's been sent or partially sent.
1357 *
1358 * @note treq->tconn and treq may be inviable after calling
1359 * if treq->conn and connection_signals_pause is not used.
1360 * This is due to call to trunk_connection_event_update.
1361 *
1362 * @param[in] treq to trigger a state change for.
1363 * @param[in] reason Why the request was cancelled.
1364 * Should be one of:
1365 * - TRUNK_CANCEL_REASON_SIGNAL request cancelled
1366 * because of a signal from the interpreter.
1367 * - TRUNK_CANCEL_REASON_MOVE request cancelled
1368 * because the connection failed and it needs
1369 * to be assigned to a new connection.
1370 * - TRUNK_CANCEL_REASON_REQUEUE request cancelled
1371 * as it needs to be resent on the same connection.
1372 */
1374{
1375 trunk_connection_t *tconn = treq->pub.tconn;
1376 trunk_t *trunk = treq->pub.trunk;
1377
1378 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1379
1380 switch (treq->pub.state) {
1383 break;
1384
1387 break;
1388
1391 break;
1392
1393 default:
1395 }
1396
1398 fr_dlist_insert_tail(&tconn->cancel, treq);
1399 treq->cancel_reason = reason;
1400
1401 DO_REQUEST_CANCEL(treq, reason);
1402
1403 /*
1404 * Our treq is no longer bound to an actual
1405 * request_t *, as we can't guarantee the
1406 * lifetime of the original request_t *.
1407 */
1408 if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1409
1410 /*
1411 * Register for I/O write events if we need to.
1412 */
1414}
1415
1416/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1417 *
1418 * The request_demux function is then responsible for signalling
1419 * that the cancel request is complete when the remote server
1420 * acknowledges the cancellation request.
1421 *
1422 * @param[in] treq to trigger a state change for.
1423 */
1425{
1426 trunk_connection_t *tconn = treq->pub.tconn;
1427 trunk_t *trunk = treq->pub.trunk;
1428
1429 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1432
1433 switch (treq->pub.state) {
1434 case TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_sent can be reached from */
1436 break;
1437
1438 default:
1440 }
1441
1443 fr_assert(!tconn->partial);
1444 tconn->cancel_partial = treq;
1445}
1446
1447/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1448 *
1449 * The request_demux function is then responsible for signalling
1450 * that the cancel request is complete when the remote server
1451 * acknowledges the cancellation request.
1452 *
1453 * @note treq->tconn and treq may be inviable after calling
1454 * if treq->conn and connection_signals_pause is not used.
1455 * This is due to call to trunk_connection_event_update.
1456 *
1457 * @param[in] treq to trigger a state change for.
1458 */
1460{
1461 trunk_connection_t *tconn = treq->pub.tconn;
1462 trunk_t *trunk = treq->pub.trunk;
1463
1464 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1467
1468 switch (treq->pub.state) {
1471 break;
1472
1475 break;
1476
1477 default:
1479 }
1480
1482 fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1483
1484 /*
1485 * De-register for I/O write events
1486 * and register the read events
1487 * to drain the cancel ACKs.
1488 */
1490}
1491
1492/** Cancellation was acked, the request is complete, free it
1493 *
1494 * The API client will not be informed, as the original request_t *
1495 * will likely have been freed by this point.
1496 *
1497 * @note treq will be inviable after a call to this function.
1498 * treq->tconn may be inviable after calling
1499 * if treq->conn and connection_signals_pause is not used.
1500 * This is due to call to trunk_request_remove_from_conn.
1501 *
1502 * @param[in] treq to mark as complete.
1503 */
1505{
1506 trunk_connection_t *tconn = treq->pub.tconn;
1507 trunk_t *trunk = treq->pub.trunk;
1508
1509 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1510 if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1511
1512 switch (treq->pub.state) {
1515 break;
1516
1517 default:
1519 }
1520
1522
1524 trunk_request_free(&treq); /* Free the request */
1525}
1526
1527/** Request completed successfully, inform the API client and free the request
1528 *
1529 * @note treq will be inviable after a call to this function.
1530 * treq->tconn may also be inviable due to call to
1531 * trunk_request_remove_from_conn.
1532 *
1533 * @param[in] treq to mark as complete.
1534 */
1536{
1537 trunk_connection_t *tconn = treq->pub.tconn;
1538 trunk_t *trunk = treq->pub.trunk;
1539
1540 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1541
1542 switch (treq->pub.state) {
1547 break;
1548
1549 default:
1551 }
1552
1554 DO_REQUEST_COMPLETE(treq);
1555 trunk_request_free(&treq); /* Free the request */
1556}
1557
1558/** Request failed, inform the API client and free the request
1559 *
1560 * @note treq will be inviable after a call to this function.
1561 * treq->tconn may also be inviable due to call to
1562 * trunk_request_remove_from_conn.
1563 *
1564 * @param[in] treq to mark as failed.
1565 */
1567{
1568 trunk_connection_t *tconn = treq->pub.tconn;
1569 trunk_t *trunk = treq->pub.trunk;
1570 trunk_request_state_t prev = treq->pub.state;
1571
1572 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1573
1574 switch (treq->pub.state) {
1577 break;
1578
1579 default:
1581 break;
1582 }
1583
1585 DO_REQUEST_FAIL(treq, prev);
1586 trunk_request_free(&treq); /* Free the request */
1587}
1588
1589/** Check to see if a trunk request can be enqueued
1590 *
1591 * @param[out] tconn_out Connection the request may be enqueued on.
1592 * @param[in] trunk To enqueue requests on.
1593 * @param[in] request associated with the treq (if any).
1594 * @return
1595 * - TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1596 * - TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1597 * - TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1598 * connections or backlog space.
1599 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1600 * unreachable.
1601 */
1603 request_t *request)
1604{
1605 trunk_connection_t *tconn;
1606 /*
1607 * If we have an active connection then
1608 * return that.
1609 */
1610 tconn = fr_minmax_heap_min_peek(trunk->active);
1611 if (tconn) {
1612 *tconn_out = tconn;
1613 return TRUNK_ENQUEUE_OK;
1614 }
1615
1616 /*
1617 * Unlike the connection pool, we don't need
1618 * to drive any internal processes by feeding
1619 * it requests.
1620 *
1621 * If the last event to occur was a failure
1622 * we refuse to enqueue new requests until
1623 * one or more connections come online.
1624 */
1625 if (!trunk->conf.backlog_on_failed_conn &&
1626 fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1627 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1629 RWARN, WARN, "Refusing to enqueue requests - "
1630 "No active connections and last event was a connection failure");
1631
1633 }
1634
1635
1636 /*
1637 * Only enforce if we're limiting maximum
1638 * number of connections, and maximum
1639 * number of requests per connection.
1640 *
1641 * The alloc function also checks this
1642 * which is why this is only done for
1643 * debug builds.
1644 */
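	/*
	 *	With the default configuration above (max = 5,
	 *	per_connection_max = 2000, max_backlog = 1000) this
	 *	refuses new requests once 5 * 2000 + 1000 = 11000
	 *	requests are already in flight or backlogged.
	 */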
1645 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1646 uint64_t limit;
1647
1648 limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1649 if (limit > 0) {
1650 uint64_t total_reqs;
1651
1652 total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
1654 if (total_reqs >= (limit + trunk->conf.max_backlog)) {
1656 RWARN, WARN, "Refusing to alloc requests - "
1657 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1658 "plus %u backlog requests reached",
1659 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
1660 trunk->conf.max_backlog);
1662 }
1663 }
1664 }
1665
1667}
1668
1669/** Enqueue a request which has never been assigned to a connection or was previously cancelled
1670 *
1671 * @param[in] treq to re-enqueue. Must have been removed
1672 * from its existing connection with
1673 * #trunk_connection_requests_dequeue.
1674 * @return
1675 * - TRUNK_ENQUEUE_OK Request was re-enqueued.
1676 * - TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1677 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1678 * Usually because the connection to the resource is down.
1679 */
1681{
1682 trunk_t *trunk = treq->pub.trunk;
1683 trunk_connection_t *tconn = NULL;
1684 trunk_enqueue_t ret;
1685
1686 /*
1687 * Must *NOT* still be assigned to another connection
1688 */
1689 fr_assert(!treq->pub.tconn);
1690
1691 ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1692 switch (ret) {
1693 case TRUNK_ENQUEUE_OK:
1694 if (trunk->conf.always_writable) {
1696 trunk_request_enter_pending(treq, tconn, false);
1699 } else {
1700 trunk_request_enter_pending(treq, tconn, false);
1701 }
1702 break;
1703
1705 /*
1706 * No more connections and request
1707 * is already in the backlog.
1708 *
1709 * Signal our caller it should stop
1710 * trying to drain the backlog.
1711 */
1713 trunk_request_enter_backlog(treq, false);
1714 break;
1715
1716 default:
1717 break;
1718 }
1719
1720 return ret;
1721}
1722
1723/** Shift requests in the specified states onto new connections
1724 *
1725 * This function will blindly dequeue any requests in the specified state and get
1726 * them back to the unassigned state, cancelling any sent or partially sent requests.
1727 *
1728 * This function does not check that dequeuing a request in a particular state is a
1729 * sane or sensible thing to do, that's up to the caller!
1730 *
1731 * @param[out] out A list to insert the newly dequeued and unassigned
1732 * requests into.
1733 * @param[in] tconn to dequeue requests from.
1734 * @param[in] states Dequeue request in these states.
1735 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1736 */
1738 int states, uint64_t max)
1739{
1740 trunk_request_t *treq;
1741 uint64_t count = 0;
1742
1743 if (max == 0) max = UINT64_MAX;
1744
1745#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1746
1747#define DEQUEUE_ALL(_src_list, _state) do { \
1748 while ((treq = fr_dlist_head(_src_list))) { \
1749 OVER_MAX_CHECK; \
1750 fr_assert(treq->pub.state == (_state)); \
1751 trunk_request_enter_unassigned(treq); \
1752 fr_dlist_insert_tail(out, treq); \
1753 } } while (0)
1754
1755 /*
1756 * Don't need to do anything with
1757 * cancellation requests.
1758 */
1759 if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1761
1762 /*
1763 * ...same with cancel inform
1764 */
1767
1768 /*
1769 * ....same with cancel partial
1770 */
1773 treq = tconn->cancel_partial;
1774 if (treq) {
1778 }
1779 }
1780
1781 /*
1782 * ...and pending.
1783 */
1784 if (states & TRUNK_REQUEST_STATE_PENDING) {
1785 while ((treq = fr_heap_peek(tconn->pending))) {
1790 }
1791 }
1792
1793 /*
1794 * Cancel partially sent requests
1795 */
1796 if (states & TRUNK_REQUEST_STATE_PARTIAL) {
1798 treq = tconn->partial;
1799 if (treq) {
1801
1802 /*
1803 * Don't allow the connection to change state whilst
1804 * we're draining requests from it.
1805 */
1811 }
1812 }
1813
1814 /*
1815 * Cancel sent requests
1816 */
1817 if (states & TRUNK_REQUEST_STATE_SENT) {
1818 /*
1819 * Don't allow the connection to change state whilst
1820 * we're draining requests from it.
1821 */
1823 while ((treq = fr_dlist_head(&tconn->sent))) {
1826
1830 }
1832 }
1833
1834 return count;
1835}
1836
1837/** Remove requests in specified states from a connection, attempting to distribute them to new connections
1838 *
1839 * @param[in] tconn To remove requests from.
1840 * @param[in] states One or more states or'd together.
1841 * @param[in] max The maximum number of requests to dequeue.
1842 * 0 for unlimited.
1843 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1844 * If false bound requests will not be moved.
1845 *
1846 * @return the number of requests re-queued.
1847 */
1848static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1849{
1850 trunk_t *trunk = tconn->pub.trunk;
1851 fr_dlist_head_t to_process;
1852 trunk_request_t *treq = NULL;
1853 uint64_t moved = 0;
1854
1855 if (max == 0) max = UINT64_MAX;
1856
1857 fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
1858
1859 /*
1860 * Prevent the connection changing state whilst we're
1861 * working with it.
1862 *
1863 * There's a user callback that can be called by
1864 * trunk_request_enqueue_existing which can reconnect
1865 * the connection.
1866 */
1868
1869 /*
1870 * Remove non-cancelled requests from the connection
1871 */
1872 moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1873
1874 /*
1875 * Prevent requests being requeued on the same trunk
1876 * connection, which would break rebalancing.
1877 *
1878 * This is a bit of a hack, but nothing should test
1879 * for connection/list consistency in this code,
1880 * and if something is added later, it'll be flagged
1881 * by the tests.
1882 */
1883 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1884 int ret;
1885
1886 ret = fr_minmax_heap_extract(trunk->active, tconn);
1887 if (!fr_cond_assert_msg(ret == 0,
1888 "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1889
1890 }
1891
1892 /*
1893 * Loop over all the requests we gathered and
1894 * redistribute them to new connections.
1895 */
1896 while ((treq = fr_dlist_next(&to_process, treq))) {
1897 trunk_request_t *prev;
1898
1899 prev = fr_dlist_remove(&to_process, treq);
1900
1901 /*
1902 * Attempting to re-queue a request
1903 * that's bound to a connection
1904 * results in a failure.
1905 */
1906 if (treq->bound_to_conn) {
1907 if (fail_bound || !IS_SERVICEABLE(tconn)) {
1909 } else {
1910 trunk_request_enter_pending(treq, tconn, false);
1911 }
1912 goto next;
1913 }
1914
1915 switch (trunk_request_enqueue_existing(treq)) {
1916 case TRUNK_ENQUEUE_OK:
1917 break;
1918
1919 /*
 1920 * A connection failed, and there
 1921 * are no other connections
 1922 * available to deal with the
 1923 * load, so the request has been
 1924 * placed back in the backlog.
1925 */
1927 break;
1928
1929 /*
1930 * If we fail to re-enqueue then
1931 * there's nothing to do except
1932 * fail the request.
1933 */
1936 case TRUNK_ENQUEUE_FAIL:
1938 break;
1939 }
1940 next:
1941 treq = prev;
1942 }
1943
1944 /*
1945 * Add the connection back into the active list
1946 */
1947 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1948 int ret;
1949
1950 ret = fr_minmax_heap_insert(trunk->active, tconn);
1951 if (!fr_cond_assert_msg(ret == 0,
1952 "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1953 }
1954 if (moved >= max) goto done;
1955
1956 /*
 1957 * Deal with the cancelled requests specially; we can't
1958 * queue them up again as they were only valid on that
1959 * specific connection.
1960 *
1961 * We just need to run them to completion which, as
1962 * they should already be in the unassigned state,
1963 * just means freeing them.
1964 */
1965 moved += trunk_connection_requests_dequeue(&to_process, tconn,
1966 states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1967 while ((treq = fr_dlist_next(&to_process, treq))) {
1968 trunk_request_t *prev;
1969
1970 prev = fr_dlist_remove(&to_process, treq);
1971 trunk_request_free(&treq);
1972 treq = prev;
1973 }
1974
1975done:
1976
1977 /*
1978 * Always re-calculate the request/connection
1979 * ratio at the end.
1980 *
1981 * This avoids having the state transition
1982 * functions do it.
1983 *
1984 * The ratio would be wrong when they calculated
1985 * it anyway, because a bunch of requests are
1986 * dequeued from the connection and temporarily
1987 * cease to exist from the perspective of the
1988 * trunk_requests_per_connection code.
1989 */
1990 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1991
1993 return moved;
1994}
1995
1996/** Move requests off of a connection and requeue elsewhere
1997 *
 1998 * @note We don't re-queue on draining or draining to free, as requests should have already been
 1999 * moved off of the connection. It's also dangerous, as the trunk management code may
 2000 * clean up a connection in this state when it runs during the re-queue, and the caller
 2001 * may then try to access a now freed connection.
2002 *
2003 * @param[in] tconn to move requests off of.
2004 * @param[in] states Only move requests in this state.
2005 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
2006 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
2007 * If false bound requests will not be moved.
2008 * @return The number of requests requeued.
2009 */
2010uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
2011{
2012 switch (tconn->pub.state) {
2013 case TRUNK_CONN_ACTIVE:
2014 case TRUNK_CONN_FULL:
2016 return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
2017
2018 default:
2019 return 0;
2020 }
2021}
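/*
 *	Illustrative sketch (not compiled into trunk.c): how an API client might use
 *	trunk_connection_requests_requeue() to move work off a connection it suspects
 *	is degraded.  The helper name and the "suspect_conn" variable are hypothetical;
 *	the trunk_connection_requests_requeue() signature is taken from the definition
 *	above.
 */
#if 0
static void example_drain_suspect_connection(trunk_connection_t *suspect_conn)
{
	uint64_t moved;

	/*
	 *	Move every pending (not yet written) request onto other
	 *	connections.  0 means "no limit", and false means requests
	 *	bound to this connection stay where they are.
	 */
	moved = trunk_connection_requests_requeue(suspect_conn,
						  TRUNK_REQUEST_STATE_PENDING, 0, false);
	DEBUG3("Requeued %" PRIu64 " pending requests", moved);
}
#endif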
2022
2023/** Signal a partial write
2024 *
 2025 * Called where there's high load and the outbound write buffer is full.
2026 *
2027 * @param[in] treq to signal state change for.
2028 */
2030{
2031 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2032
2034 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2035
2036 switch (treq->pub.state) {
2039 break;
2040
2041 default:
2042 return;
2043 }
2044}
2045
2046/** Signal that the request was written to a connection successfully
2047 *
2048 * @param[in] treq to signal state change for.
2049 */
2051{
2052 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2053
2055 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2056
2057 switch (treq->pub.state) {
2061 break;
2062
2063 default:
2064 return;
2065 }
2066}
2067
2068/** Signal that the request was written to a connection successfully, but no response is expected
2069 *
2070 * @param[in] treq to signal state change for.
2071 */
2073{
2074 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2075
2077 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2078
2079 switch (treq->pub.state) {
2083 break;
2084
2085 default:
2086 return;
2087 }
2088}
2089
2090/** Signal that a trunk request is complete
2091 *
2092 * The API client will be informed that the request is now complete.
2093 */
2095{
2096 trunk_t *trunk = treq->pub.trunk;
2097
2098 if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2099
2100 /*
2101 * We assume that if the request is being signalled
2102 * as complete from the demux function, that it was
2103 * a successful read.
2104 *
2105 * If this assumption turns out to be incorrect
2106 * then we need to add an argument to signal_complete
2107 * to indicate if this is a successful read.
2108 */
2109 if (IN_REQUEST_DEMUX(trunk)) {
2110 trunk_connection_t *tconn = treq->pub.tconn;
2111
2112 trunk->pub.last_read_success = fr_time();
2114 }
2115
2116 switch (treq->pub.state) {
2118 case TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2121 break;
2122
2123 default:
2124 return;
2125 }
2126}
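/*
 *	Illustrative sketch (not compiled): the tail end of a request_demux handler,
 *	once a response has been matched back to its treq.  The my_rctx_t type and
 *	decode_response() helper are hypothetical; only trunk_request_signal_complete()
 *	and trunk_request_signal_fail() are taken from the API documented above.
 */
#if 0
typedef struct { int result; } my_rctx_t;

static void example_demux_handle_response(trunk_request_t *treq, uint8_t const *data, size_t len)
{
	my_rctx_t *rctx = talloc_get_type_abort(treq->pub.rctx, my_rctx_t);

	if (decode_response(rctx, data, len) < 0) {	/* hypothetical decoder */
		trunk_request_signal_fail(treq);	/* API client's fail callback will run */
		return;
	}

	trunk_request_signal_complete(treq);		/* API client's complete callback will run */
}
#endif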
2127
2128/** Signal that a trunk request failed
2129 *
2130 * The API client will be informed that the request has failed.
2131 */
2133{
2134 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2135
2137}
2138
2139/** Cancel a trunk request
2140 *
 2141 * treq can be in any state, but cancellation requests will be ignored if the treq is not in
 2142 * the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state.
 2143 *
 2144 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
 2145 * is no longer viable as it's being cancelled.
2146 *
2147 * The free function however, is called, and that should be used to perform necessary
2148 * cleanup.
2149 *
2150 * @param[in] treq to signal state change for.
2151 */
2153{
2154 trunk_t *trunk;
2155
2156 /*
2157 * Ensure treq hasn't been freed
2158 */
2159 (void)talloc_get_type_abort(treq, trunk_request_t);
2160
2161 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2162
2164 "%s cannot be called within a handler", __FUNCTION__)) return;
2165
2166 trunk = treq->pub.trunk;
2167
2168 switch (treq->pub.state) {
2169 /*
2170 * We don't call the complete or failed callbacks
2171 * as the request and rctx are no longer viable.
2172 */
2175 {
2176 trunk_connection_t *tconn = treq->pub.tconn;
2177
2178 /*
2179 * Don't allow connection state changes
2180 */
2184 "Bad state %s after cancellation",
2185 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2187 return;
2188 }
2189 /*
2190 * No cancel muxer. We're done.
2191 *
2192 * If we do have a cancel mux function,
2193 * the next time this connection becomes
2194 * writable, we'll call the cancel mux
2195 * function.
2196 *
2197 * We don't run the complete or failed
2198 * callbacks here as the request is
2199 * being cancelled.
2200 */
2201 if (!trunk->funcs.request_cancel_mux) {
2203 trunk_request_free(&treq);
2204 }
2206 }
2207 break;
2208
2209 /*
2210 * We're already in the process of cancelling a
2211 * request, so ignore duplicate signals.
2212 */
2217 break;
2218
2219 /*
2220 * For any other state, we just release the request
2221 * from its current connection and free it.
2222 */
2223 default:
2225 trunk_request_free(&treq);
2226 break;
2227 }
2228}
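/*
 *	Illustrative sketch (not compiled): cancelling an in-flight trunk request when
 *	the originating request_t is being stopped.  Assumes the cancel signal
 *	documented above is exposed as trunk_request_signal_cancel(); the module
 *	resumption context (my_cancel_rctx_t) is hypothetical.
 */
#if 0
typedef struct {
	trunk_request_t	*treq;		//!< Outstanding trunk request, if any.
} my_cancel_rctx_t;

static void example_module_signal_stop(my_cancel_rctx_t *rctx)
{
	if (!rctx->treq) return;

	/*
	 *	Complete/failed callbacks will not run; the request free
	 *	callback is still called to release the preq.
	 */
	trunk_request_signal_cancel(rctx->treq);
	rctx->treq = NULL;
}
#endif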
2229
2230/** Signal a partial cancel write
2231 *
 2232 * Called where there's high load and the outbound write buffer is full.
2233 *
2234 * @param[in] treq to signal state change for.
2235 */
2237{
2238 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2239
2241 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2242
2243 switch (treq->pub.state) {
2246 break;
2247
2248 default:
2249 return;
2250 }
2251}
2252
2253/** Signal that a remote server has been notified of the cancellation
2254 *
2255 * Called from request_cancel_mux to indicate that the datastore has been informed
2256 * that the response is no longer needed.
2257 *
2258 * @param[in] treq to signal state change for.
2259 */
2261{
2262 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2263
2265 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2266
2267 switch (treq->pub.state) {
2271 break;
2272
2273 default:
2274 break;
2275 }
2276}
2277
2278/** Signal that a remote server acked our cancellation
2279 *
2280 * Called from request_demux to indicate that it got an ack for the cancellation.
2281 *
2282 * @param[in] treq to signal state change for.
2283 */
2285{
2286 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2287
2289 "%s can only be called from within request_demux or request_cancel_mux handlers",
2290 __FUNCTION__)) return;
2291
2292 switch (treq->pub.state) {
2294 /*
2295 * This is allowed, as we may not need to wait
2296 * for the database to ACK our cancellation
2297 * request.
2298 *
2299 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2300 * is not allowed here, as that'd mean we'd half
2301 * written the cancellation request out to the
2302 * socket, and then decided to abandon it.
2303 *
2304 * That'd leave the socket in an unusable state.
2305 */
2308 break;
2309
2310 default:
2311 break;
2312 }
2313}
2314
2315/** If the trunk request is freed then update the target requests
2316 *
2317 * gperftools showed calling the request free function directly was slightly faster
2318 * than using talloc_free.
2319 *
2320 * @param[in] treq_to_free request.
2321 */
2323{
 2324 trunk_request_t *treq = *treq_to_free;
 2325 trunk_t *trunk;
 2326
 2327 if (unlikely(!treq)) return;	/* Check for NULL before any dereference of treq */
 2328 trunk = treq->pub.trunk;
2329 /*
2330 * The only valid states a trunk request can be
2331 * freed from.
2332 */
2333 switch (treq->pub.state) {
2339 break;
2340
2341 default:
2342 if (!fr_cond_assert(0)) return;
2343 }
2344
2345 /*
2346 * Zero out the pointer to prevent double frees
2347 */
2348 *treq_to_free = NULL;
2349
2350 /*
2351 * Call the API client callback to free
2352 * any associated memory.
2353 */
2354 DO_REQUEST_FREE(treq);
2355
2356 /*
 2357 * Update the last above/below target stats.
 2358 * We only do this when we alloc or free
 2359 * requests, or on connection
2360 * state changes.
2361 */
2362 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2363
2364 /*
2365 * This tracks the total number of requests
2366 * allocated and not freed or returned to
2367 * the free list.
2368 */
2369 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2370
2371 /*
 2372 * No cleanup delay means we clean up immediately
2373 */
2376
2377#ifndef NDEBUG
2378 /*
2379 * Ensure anything parented off the treq
2380 * is freed. We do this to trigger
2381 * the destructors for the log entries.
2382 */
2383 talloc_free_children(treq);
2384
2385 /*
2386 * State log should now be empty as entries
2387 * remove themselves from the dlist
2388 * on free.
2389 */
2391 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2392#endif
2393
2394 talloc_free(treq);
2395 return;
2396 }
2397
2398 /*
2399 * Ensure anything parented off the treq
2400 * is freed.
2401 */
2402 talloc_free_children(treq);
2403
2404#ifndef NDEBUG
2405 /*
2406 * State log should now be empty as entries
2407 * remove themselves from the dlist
2408 * on free.
2409 */
2411 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2412#endif
2413
2414 /*
2415 *
 2416 * Return the trunk request to the init state.
2417 */
2418 *treq = (trunk_request_t){
2419 .pub = {
2421 .trunk = treq->pub.trunk,
2422 },
2423 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2424 .last_freed = fr_time(),
2425#ifndef NDEBUG
2426 .log = treq->log /* Keep the list head, to save reinitialisation */
2427#endif
2428 };
2429
2430 /*
 2431 * Insert at the tail, so that we can free
 2432 * requests that have been unused for N
 2433 * seconds from the head.
2434 */
2435 fr_dlist_insert_tail(&trunk->free_requests, treq);
2436}
2437
2438/** Actually free the trunk request
2439 *
2440 */
2442{
2443 trunk_t *trunk = treq->pub.trunk;
2444
2445 switch (treq->pub.state) {
2448 break;
2449
2450 default:
2451 fr_assert(0);
2452 break;
2453 }
2454
2455 fr_dlist_remove(&trunk->free_requests, treq);
2456
2457 return 0;
2458}
2459
2460/** (Pre-)Allocate a new trunk request
2461 *
2462 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2463 * request will be a talloc pool, which can be used to hold the preq.
2464 *
 2465 * @note Do not use MEM to check the result of this allocation as it may fail for
2466 * non-fatal reasons.
2467 *
2468 * @param[in] trunk to add request to.
2469 * @param[in] request to wrap in a trunk request (treq).
2470 * @return
2471 * - A newly allocated request.
2472 * - NULL if too many requests are allocated.
2473 */
2475{
2476 trunk_request_t *treq;
2477
2478 /*
2479 * The number of treqs currently allocated
2480 * exceeds the maximum number allowed.
2481 */
2482 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2483 uint64_t limit;
2484
2485 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2486 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2488 RWARN, WARN, "Refusing to alloc requests - "
2489 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2490 "plus %u backlog requests reached",
2491 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2492 trunk->conf.max_backlog);
2493 return NULL;
2494 }
2495 }
2496
2497 /*
2498 * Allocate or reuse an existing request
2499 */
2500 treq = fr_dlist_head(&trunk->free_requests);
2501 if (treq) {
2502 fr_dlist_remove(&trunk->free_requests, treq);
2504 fr_assert(treq->pub.trunk == trunk);
2505 fr_assert(treq->pub.tconn == NULL);
2508 trunk->pub.req_alloc_reused++;
2509 } else {
2511 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2512 talloc_set_destructor(treq, _trunk_request_free);
2513
2514 *treq = (trunk_request_t){
2515 .pub = {
2517 .trunk = trunk
2518 },
2519 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2520 };
2521 trunk->pub.req_alloc_new++;
2522#ifndef NDEBUG
2524#endif
2525 }
2526
2527 trunk->pub.req_alloc++;
2529 /* heap_id - initialised when treq inserted into pending */
2530 /* list - empty */
2531 /* preq - populated later */
2532 /* rctx - populated later */
2533 treq->pub.request = request;
2534
2535 return treq;
2536}
2537
2538/** Enqueue a request that needs data written to the trunk
2539 *
2540 * When a request_t * needs to make an asynchronous request to an external datastore
2541 * it should call this function, specifying a preq (protocol request) containing
2542 * the data necessary to request information from the external datastore, and an
2543 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2544 *
2545 * After a treq is successfully enqueued it will either be assigned immediately
2546 * to the pending queue of a connection, or if no connections are available,
2547 * (depending on the trunk configuration) the treq will be placed in the trunk's
2548 * global backlog.
2549 *
2550 * After receiving a positive return code from this function the caller should
2551 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2552 * (trunk connection) and treq state changes to be called.
2553 *
2554 * When a tconn becomes writable (or the trunk is configured to be always writable)
2555 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2556 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2557 * is also responsible for tracking the outbound requests to allow the
2558 * #trunk_request_demux_t callback to match inbound responses with the original
2559 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2560 * it signals what state the treq should enter next using one of the
2561 * trunk_request_signal_* functions.
2562 *
2563 * When a tconn becomes readable the user specified #trunk_request_demux_t
 2564 * callback is called to process any responses, match them with the original treq,
2565 * and signal what state they should enter next using one of the
2566 * trunk_request_signal_* functions.
2567 *
2568 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2569 * is NULL, a new treq will be allocated.
2570 * Otherwise treq should point to memory allocated
2571 * with trunk_request_alloc.
2572 * @param[in] trunk to enqueue request on.
2573 * @param[in] request to enqueue.
2574 * @param[in] preq Protocol request to write out. Will be freed when
2575 * treq is freed. Should ideally be parented by the
2576 * treq if possible.
2577 * Use #trunk_request_alloc for pre-allocation of
2578 * the treq.
2579 * @param[in] rctx The resume context to write any result to.
2580 * @return
2581 * - TRUNK_ENQUEUE_OK.
2582 * - TRUNK_ENQUEUE_IN_BACKLOG.
2583 * - TRUNK_ENQUEUE_NO_CAPACITY.
2584 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2585 * - TRUNK_ENQUEUE_FAIL
2586 */
2588 request_t *request, void *preq, void *rctx)
2589{
2590 trunk_connection_t *tconn = NULL;
2591 trunk_request_t *treq;
2592 trunk_enqueue_t ret;
2593
2594 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2595 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2596
2597 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2598 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2599
2600 /*
2601 * If delay_start was set, we may need
2602 * to insert the timer for the connection manager.
2603 */
2604 if (unlikely(!trunk->started)) {
2605 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2606 }
2607
2608 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2609 switch (ret) {
2610 case TRUNK_ENQUEUE_OK:
2611 if (*treq_out) {
2612 treq = *treq_out;
2613 } else {
2614 *treq_out = treq = trunk_request_alloc(trunk, request);
2615 if (!treq) return TRUNK_ENQUEUE_FAIL;
2616 }
2617 treq->pub.preq = preq;
2618 treq->pub.rctx = rctx;
2619 if (trunk->conf.always_writable) {
2621 trunk_request_enter_pending(treq, tconn, true);
2624 } else {
2625 trunk_request_enter_pending(treq, tconn, true);
2626 }
2627 break;
2628
2630 if (*treq_out) {
2631 treq = *treq_out;
2632 } else {
2633 *treq_out = treq = trunk_request_alloc(trunk, request);
2634 if (!treq) return TRUNK_ENQUEUE_FAIL;
2635 }
2636 treq->pub.preq = preq;
2637 treq->pub.rctx = rctx;
2638 trunk_request_enter_backlog(treq, true);
2639 break;
2640
2641 default:
2642 /*
2643 * If a trunk request was provided
2644 * populate the preq and rctx fields
2645 * so that if it's freed with
2646 * trunk_request_free, the free
2647 * function works as intended.
2648 */
2649 if (*treq_out) {
2650 treq = *treq_out;
2651 treq->pub.preq = preq;
2652 treq->pub.rctx = rctx;
2653 }
2654 return ret;
2655 }
2656
2657 return ret;
2658}
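/*
 *	Illustrative sketch (not compiled): enqueuing a request on a trunk from a
 *	module, following the parameter and return code documentation above.  The
 *	my_preq_t / my_mod_rctx_t types are hypothetical, and the
 *	trunk_request_enqueue() argument order is reconstructed from the docs, so it
 *	may not match the real prototype exactly.
 */
#if 0
typedef struct { char const *query; } my_preq_t;
typedef struct { trunk_request_t *treq; int result; } my_mod_rctx_t;

static int example_enqueue(trunk_t *trunk, request_t *request, char const *query)
{
	my_preq_t	*preq;
	my_mod_rctx_t	*rctx;
	trunk_request_t	*treq = NULL;

	MEM(preq = talloc_zero(NULL, my_preq_t));	/* Freed when the treq is freed (per docs above) */
	preq->query = query;
	MEM(rctx = talloc_zero(request, my_mod_rctx_t));

	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case TRUNK_ENQUEUE_OK:
	case TRUNK_ENQUEUE_IN_BACKLOG:
		rctx->treq = treq;
		return 0;	/* Caller should now yield and wait for its complete/fail callback */

	default:
		return -1;	/* Error cleanup of preq/rctx elided for brevity */
	}
}
#endif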
2659
2660/** Re-enqueue a request on the same connection
2661 *
2662 * If the treq has been sent, we assume that we're being signalled to requeue
2663 * because something outside of the trunk API has determined that a retransmission
2664 * is required. The easiest way to perform that retransmission is to clean up
 2665 * any tracking information for the request, and then requeue it for transmission.
 2666 *
 2667 * If re-queueing fails, the request will enter the fail state. It should not be
2668 * accessed if this occurs.
2669 *
2670 * @param[in] treq to requeue (retransmit).
2671 * @return
2672 * - TRUNK_ENQUEUE_OK.
2673 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2674 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2675 */
2677{
2678 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2679
2680 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2681
2682 if (!IS_PROCESSING(tconn)) {
2685 }
2686
2687 switch (treq->pub.state) {
2693 trunk_request_enter_pending(treq, tconn, false);
2694 if (treq->pub.trunk->conf.always_writable) {
2696 }
2698 break;
2699
2700 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2701 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2702 break;
2703
2704 default:
2706 return TRUNK_ENQUEUE_FAIL;
2707 }
2708
2709 return TRUNK_ENQUEUE_OK;
2710}
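/*
 *	Illustrative sketch (not compiled): driving a retransmission from outside the
 *	trunk API, per the documentation above.  Assumes the re-enqueue function
 *	documented above is exposed as trunk_request_requeue(); the retransmission
 *	timer plumbing that would call this helper is hypothetical.
 */
#if 0
static void example_retransmit(trunk_request_t *treq)
{
	switch (trunk_request_requeue(treq)) {
	case TRUNK_ENQUEUE_OK:
		break;		/* Request is pending again, the muxer will rewrite it */

	default:
		/*
		 *	The request could not be requeued and entered the fail
		 *	state, so it must not be accessed again here.
		 */
		break;
	}
}
#endif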
2711
2712/** Enqueue additional requests on a specific connection
2713 *
2714 * This may be used to create a series of requests on a single connection, or to generate
2715 * in-band status checks.
2716 *
2717 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2718 * must be able to handle multiple calls to its muxer gracefully.
2719 *
2720 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2721 * is NULL, a new treq will be allocated.
2722 * Otherwise treq should point to memory allocated
2723 * with trunk_request_alloc.
2724 * @param[in] tconn to enqueue request on.
2725 * @param[in] request to enqueue.
2726 * @param[in] preq Protocol request to write out. Will be freed when
2727 * treq is freed. Should ideally be parented by the
2728 * treq if possible.
2729 * Use #trunk_request_alloc for pre-allocation of
2730 * the treq.
2731 * @param[in] rctx The resume context to write any result to.
2732 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2733 * checks through even if the connection is at capacity.
2734 * Will also allow enqueuing on "inactive", "draining",
2735 * "draining-to-free" connections.
2736 * @return
2737 * - TRUNK_ENQUEUE_OK.
2738 * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2739 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2740 */
2742 request_t *request, void *preq, void *rctx,
2743 bool ignore_limits)
2744{
2745 trunk_request_t *treq;
2746 trunk_t *trunk = tconn->pub.trunk;
2747
2748 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2749 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2750
2752
2753 /*
2754 * Limits check
2755 */
2756 if (!ignore_limits) {
2757 if (trunk->conf.max_req_per_conn &&
2760
2762 }
2763
2764 if (*treq_out) {
2765 treq = *treq_out;
2766 } else {
2767 MEM(*treq_out = treq = trunk_request_alloc(trunk, request));
2768 }
2769
2770 treq->pub.preq = preq;
2771 treq->pub.rctx = rctx;
2772 treq->bound_to_conn = true; /* Don't let the request be transferred */
2773
2774 if (trunk->conf.always_writable) {
2776 trunk_request_enter_pending(treq, tconn, true);
2779 } else {
2780 trunk_request_enter_pending(treq, tconn, true);
2781 }
2782
2783 return TRUNK_ENQUEUE_OK;
2784}
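/*
 *	Illustrative sketch (not compiled): generating an in-band status check on a
 *	specific connection, as described above.  Assumes the function documented
 *	above is exposed as trunk_request_enqueue_on_conn(); its argument order is
 *	reconstructed from the parameter docs.  The status_preq / status_rctx values
 *	are hypothetical.
 */
#if 0
static void example_status_check(trunk_connection_t *tconn, request_t *request,
				 void *status_preq, void *status_rctx)
{
	trunk_request_t *treq = NULL;

	/*
	 *	ignore_limits = true so the check is enqueued even if the
	 *	connection is at max_req_per_conn, or is inactive/draining.
	 */
	if (trunk_request_enqueue_on_conn(&treq, tconn, request, status_preq, status_rctx,
					  true) != TRUNK_ENQUEUE_OK) {
		/* Connection can't service requests right now */
	}
}
#endif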
2785
2786#ifndef NDEBUG
2787/** Used for sanity checks to ensure all log entries have been freed
2788 *
2789 */
2791{
2792 fr_dlist_remove(slog->log_head, slog);
2793
2794 return 0;
2795}
2796
2797void trunk_request_state_log_entry_add(char const *function, int line,
2799{
2800 trunk_request_state_log_t *slog = NULL;
2801
2803 slog = fr_dlist_head(&treq->log);
2804 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2805 fr_dlist_num_elements(&treq->log));
2806 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2807 memset(slog, 0, sizeof(*slog));
2808 } else {
2809 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2810 talloc_set_destructor(slog, _state_log_entry_free);
2811 }
2812
2813 slog->log_head = &treq->log;
2814 slog->from = treq->pub.state;
2815 slog->to = new;
2816 slog->function = function;
2817 slog->line = line;
2818 if (treq->pub.tconn) {
2819 slog->tconn = treq->pub.tconn;
2820 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2821 slog->tconn_state = treq->pub.tconn->pub.state;
2822 }
2823
2824 fr_dlist_insert_tail(&treq->log, slog);
2825
2826}
2827
2828void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2829 trunk_request_t const *treq)
2830{
2831 trunk_request_state_log_t *slog = NULL;
2832
2833 int i;
2834
2835 for (slog = fr_dlist_head(&treq->log), i = 0;
2836 slog;
2837 slog = fr_dlist_next(&treq->log, slog), i++) {
2838 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2839 i, slog->function, slog->line,
2840 slog->tconn_id,
2842 slog->tconn_state, "<INVALID>") : "none",
2843 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2844 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2845 }
2846}
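/*
 *	Illustrative sketch (not compiled): dumping a request's state history when
 *	debugging a stuck treq.  The surrounding debug hook is hypothetical; the
 *	trunk_request_state_log() signature is taken from the definition above.
 */
#if 0
static void example_dump_stuck_treq(fr_log_t const *log, trunk_request_t const *treq)
{
	trunk_request_state_log(log, L_DBG, __FILE__, __LINE__, treq);
}
#endif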
2847#endif
2848
 2849/** Return the number of connections in the specified states
2850 *
2851 * @param[in] trunk to retrieve counts for.
2852 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2853 * @return The number of connections in the specified states.
2854 */
2856{
2857 uint16_t count = 0;
2858
2859 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2860 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2861 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2862 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2863 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2865 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2866 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2868
2869 return count;
2870}
2871
 2872/** Return the number of requests associated with a trunk connection
2873 *
2874 * @param[in] tconn to return request count for.
2875 * @param[in] req_state One or more request states or'd together.
2876 *
2877 * @return The number of requests in the specified states, associated with a tconn.
2878 */
2880{
2881 uint32_t count = 0;
2882
2884 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2885 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2887 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2888 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2890
2891 return count;
2892}
2893
2894/** Automatically mark a connection as inactive
2895 *
2896 * @param[in] tconn to potentially mark as inactive.
2897 */
2899{
2900 trunk_t *trunk = tconn->pub.trunk;
2902
2903 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2904
2905 /*
2906 * Enforces max_req_per_conn
2907 */
2908 if (trunk->conf.max_req_per_conn > 0) {
2911 }
2912}
2913
2914/** Return whether a trunk connection should currently be considered full
2915 *
2916 * @param[in] tconn to check.
2917 * @return
2918 * - true if the connection is full.
2919 * - false if the connection is not full.
2920 */
2922{
2923 trunk_t *trunk = tconn->pub.trunk;
2925
2926 /*
2927 * Enforces max_req_per_conn
2928 */
2930 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2931
2932 return true;
2933}
2934
2935/** Automatically mark a connection as active or reconnect it
2936 *
2937 * @param[in] tconn to potentially mark as active or reconnect.
2938 */
2940{
2941 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2942
2943 /*
2944 * Enforces max_req_per_conn
2945 */
2947}
2948
2949/** A connection is readable. Call the request_demux function to read pending requests
2950 *
2951 */
2953{
2954 trunk_t *trunk = tconn->pub.trunk;
2955
2956 DO_REQUEST_DEMUX(tconn);
2957}
2958
2959/** A connection is writable. Call the request_mux function to write pending requests
2960 *
2961 */
2963{
2964 trunk_t *trunk = tconn->pub.trunk;
2965
2966 /*
2967 * Call the cancel_sent function (if we have one)
2968 * to inform a backend datastore we no longer
2969 * care about the result
2970 */
2974 DO_REQUEST_CANCEL_MUX(tconn);
2975 }
2979 DO_REQUEST_MUX(tconn);
2980}
2981
2982/** Update the registrations for I/O events we're interested in
2983 *
2984 */
2986{
2987 trunk_t *trunk = tconn->pub.trunk;
2989
2990 switch (tconn->pub.state) {
2991 /*
2992 * We only register I/O events if the trunk connection is
2993 * in one of these states.
2994 *
2995 * For the other states the trunk shouldn't be processing
2996 * requests.
2997 */
2998 case TRUNK_CONN_ACTIVE:
2999 case TRUNK_CONN_FULL:
3004 /*
3005 * If the connection is always writable,
3006 * then we don't care about write events.
3007 */
3008 if (!trunk->conf.always_writable &&
3012 (trunk->funcs.request_cancel_mux ?
3016 }
3017
3020 (trunk->funcs.request_cancel_mux ?
3023 }
3024 break;
3025
3026 default:
3027 break;
3028 }
3029
3030 if (tconn->events != events) {
3031 /*
3032 * There may be a fatal error which results
3033 * in the connection being freed.
3034 *
3035 * Stop that from happening until after
3036 * we're done using it.
3037 */
3040 tconn->events = events;
3042 }
3043}
3044
3045/** Remove a trunk connection from whichever list it's currently in
3046 *
3047 * @param[in] tconn to remove.
3048 */
3050{
3051 trunk_t *trunk = tconn->pub.trunk;
3052
3053 switch (tconn->pub.state) {
3054 case TRUNK_CONN_ACTIVE:
3055 {
3056 int ret;
3057
3058 ret = fr_minmax_heap_extract(trunk->active, tconn);
3059 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3060 }
3061 return;
3062
3063 case TRUNK_CONN_INIT:
3064 fr_dlist_remove(&trunk->init, tconn);
3065 break;
3066
3068 fr_dlist_remove(&trunk->connecting, tconn);
3069 return;
3070
3071 case TRUNK_CONN_CLOSED:
3072 fr_dlist_remove(&trunk->closed, tconn);
3073 return;
3074
3075 case TRUNK_CONN_FULL:
3076 fr_dlist_remove(&trunk->full, tconn);
3077 return;
3078
3080 fr_dlist_remove(&trunk->inactive, tconn);
3081 return;
3082
3084 fr_dlist_remove(&trunk->inactive_draining, tconn);
3085 return;
3086
3088 fr_dlist_remove(&trunk->draining, tconn);
3089 return;
3090
3092 fr_dlist_remove(&trunk->draining_to_free, tconn);
3093 return;
3094
3095 case TRUNK_CONN_HALTED:
3096 return;
3097 }
3098}
3099
3100/** Transition a connection to the full state
3101 *
3102 * Called whenever a trunk connection is at the maximum number of requests.
 3103 * Removes the connection from the active heap, and places it in the full list.
3104 */
3106{
3107 trunk_t *trunk = tconn->pub.trunk;
3108
3109 switch (tconn->pub.state) {
3110 case TRUNK_CONN_ACTIVE:
3112 break;
3113
3114 default:
3116 }
3117
3118 fr_dlist_insert_head(&trunk->full, tconn);
3120}
3121
3122/** Transition a connection to the inactive state
3123 *
3124 * Called whenever the API client wants to stop new requests being enqueued
3125 * on a trunk connection.
3126 */
3128{
3129 trunk_t *trunk = tconn->pub.trunk;
3130
3131 switch (tconn->pub.state) {
3132 case TRUNK_CONN_ACTIVE:
3133 case TRUNK_CONN_FULL:
3135 break;
3136
3137 default:
3139 }
3140
3141 fr_dlist_insert_head(&trunk->inactive, tconn);
3143}
3144
3145/** Transition a connection to the inactive-draining state
3146 *
3147 * Called whenever the trunk manager wants to drain an inactive connection
3148 * of its requests.
3149 */
3151{
3152 trunk_t *trunk = tconn->pub.trunk;
3153
3154 switch (tconn->pub.state) {
3158 break;
3159
3160 default:
3162 }
3163
3166
3167 /*
3168 * Immediately re-enqueue all pending
3169 * requests, so the connection is drained
3170 * quicker.
3171 */
3173}
3174
3175/** Transition a connection to the draining state
3176 *
3177 * Removes the connection from the active heap so it won't be assigned any new
 3178 * requests.
3179 */
3181{
3182 trunk_t *trunk = tconn->pub.trunk;
3183
3184 switch (tconn->pub.state) {
3185 case TRUNK_CONN_ACTIVE:
3186 case TRUNK_CONN_FULL:
3190 break;
3191
3192 default:
3194 }
3195
3196 fr_dlist_insert_head(&trunk->draining, tconn);
3198
3199 /*
3200 * Immediately re-enqueue all pending
3201 * requests, so the connection is drained
3202 * quicker.
3203 */
3205}
3206
 3207/** Transition a connection to the draining-to-free state
3208 *
3209 * Removes the connection from the active heap so it won't be assigned any new
 3210 * requests.
3211 */
3213{
3214 trunk_t *trunk = tconn->pub.trunk;
3215
3216 if (tconn->lifetime_ev) fr_event_timer_delete(&tconn->lifetime_ev);
3217
3218 switch (tconn->pub.state) {
3219 case TRUNK_CONN_ACTIVE:
3220 case TRUNK_CONN_FULL:
3225 break;
3226
3227 default:
3229 }
3230
3231 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3233
3234 /*
3235 * Immediately re-enqueue all pending
3236 * requests, so the connection is drained
3237 * quicker.
3238 */
3240}
3241
3242
3243/** Transition a connection back to the active state
3244 *
3245 * This should only be called on a connection which is in the full state,
3246 * inactive state, draining state or connecting state.
3247 */
3249{
3250 trunk_t *trunk = tconn->pub.trunk;
3251 int ret;
3252
3253 switch (tconn->pub.state) {
3254 case TRUNK_CONN_FULL:
3259 break;
3260
3261 case TRUNK_CONN_INIT:
3265 break;
3266
3267 default:
3269 }
3270
3271 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3272 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3274 return;
3275 }
3276
3278
3279 /*
3280 * Reorder the connections
3281 */
3282 CONN_REORDER(tconn);
3283
3284 /*
3285 * Rebalance requests
3286 */
3287 trunk_rebalance(trunk);
3288
3289 /*
3290 * We place requests into the backlog
3291 * because there were no connections
3292 * available to handle them.
3293 *
3294 * If a connection has become active
3295 * chances are those backlogged requests
3296 * can now be enqueued, so try and do
3297 * that now.
3298 *
3299 * If there's requests sitting in the
3300 * backlog indefinitely, it's because
3301 * they were inserted there erroneously
3302 * when there were active connections
3303 * which could have handled them.
3304 */
3305 trunk_backlog_drain(trunk);
3306}
3307
 3308/** Connection transitioned to the init state
3309 *
3310 * Reflect the connection state change in the lists we use to track connections.
3311 *
3312 * @note This function is only called from the connection API as a watcher.
3313 *
3314 * @param[in] conn The connection which changes state.
 3315 * @param[in] prev The state the connection was in.
 3316 * @param[in] state The state the connection is now in.
3317 * @param[in] uctx The trunk_connection_t wrapping the connection.
3318 */
3322 void *uctx)
3323{
3324 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3325 trunk_t *trunk = tconn->pub.trunk;
3326
3327 switch (tconn->pub.state) {
3328 case TRUNK_CONN_HALTED:
3329 break;
3330
3331 case TRUNK_CONN_CLOSED:
3333 break;
3334
3335 default:
3337 }
3338
3339 fr_dlist_insert_head(&trunk->init, tconn);
3341}
3342
3343/** Connection transitioned to the connecting state
3344 *
3345 * Reflect the connection state change in the lists we use to track connections.
3346 *
3347 * @note This function is only called from the connection API as a watcher.
3348 *
3349 * @param[in] conn The connection which changes state.
 3350 * @param[in] prev The state the connection was in.
 3351 * @param[in] state The state the connection is now in.
3352 * @param[in] uctx The trunk_connection_t wrapping the connection.
3353 */
3357 void *uctx)
3358{
3359 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3360 trunk_t *trunk = tconn->pub.trunk;
3361
3362 switch (tconn->pub.state) {
3363 case TRUNK_CONN_INIT:
3364 case TRUNK_CONN_CLOSED:
3366 break;
3367
3368 default:
3370 }
3371
3372 /*
3373 * If a connection just entered the
3374 * connecting state, it should have
3375 * no requests associated with it.
3376 */
3378
3379 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3381}
3382
3383/** Connection transitioned to the shutdown state
3384 *
3385 * If we're not already in the draining-to-free state, transition there now.
3386 *
 3387 * The idea is that if something signalled the connection to shut down, we need
3388 * to reflect that by dequeuing any pending requests, not accepting new ones,
3389 * and waiting for the existing requests to complete.
3390 *
3391 * @note This function is only called from the connection API as a watcher.
3392 *
3393 * @param[in] conn The connection which changes state.
 3394 * @param[in] prev The state the connection was in.
 3395 * @param[in] state The state the connection is now in.
3396 * @param[in] uctx The trunk_connection_t wrapping the connection.
3397 */
3401 void *uctx)
3402{
3403 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3404
3405 switch (tconn->pub.state) {
3406 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3407 return;
3408
3409 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3410 case TRUNK_CONN_FULL:
3414 break;
3415
3416 case TRUNK_CONN_INIT:
3418 case TRUNK_CONN_CLOSED:
3419 case TRUNK_CONN_HALTED:
3421 }
3422
3424}
3425
3426/** Trigger a reconnection of the trunk connection
3427 *
3428 * @param[in] el Event list the timer was inserted into.
3429 * @param[in] now Current time.
3430 * @param[in] uctx The tconn.
3431 */
3433{
3434 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3435
3437}
3438
3439/** Connection transitioned to the connected state
3440 *
3441 * Reflect the connection state change in the lists we use to track connections.
3442 *
3443 * @note This function is only called from the connection API as a watcher.
3444 *
3445 * @param[in] conn The connection which changes state.
 3446 * @param[in] prev The state the connection was in.
 3447 * @param[in] state The state the connection is now in.
3448 * @param[in] uctx The trunk_connection_t wrapping the connection.
3449 */
3453 void *uctx)
3454{
3455 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3456 trunk_t *trunk = tconn->pub.trunk;
3457
3458 /*
3459 * If a connection was just connected, it should only
3460 * have a pending list of requests. This state is found
3461 * in the rlm_radius module, which starts a new trunk,
3462 * and then immediately enqueues a request onto it. The
 3463 * alternative for rlm_radius is to keep its own queue
 3464 * of pending requests before the trunk is fully
 3465 * initialized, and then enqueue them onto the trunk
3466 * when the trunk is connected.
3467 *
3468 * It's instead easier (and makes more sense) to allow
3469 * the trunk to accept packets into its queue. If there
3470 * are no connections within a period of time, then the
3471 * requests will retry, or will time out.
3472 */
3474
3475 /*
3476 * Set here, as the active state can
3477 * be transitioned to from full and
3478 * draining too.
3479 */
3480 trunk->pub.last_connected = fr_time();
3481
3482 /*
3483 * Insert a timer to reconnect the
3484 * connection periodically.
3485 */
3486 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3487 if (fr_event_timer_in(tconn, trunk->el, &tconn->lifetime_ev,
3488 trunk->conf.lifetime, _trunk_connection_lifetime_expire, tconn) < 0) {
3489 PERROR("Failed inserting connection reconnection timer event, halting connection");
3491 return;
3492 }
3493 }
3494
3496}
3497
3498/** Connection failed after it was connected
3499 *
3500 * Reflect the connection state change in the lists we use to track connections.
3501 *
3502 * @note This function is only called from the connection API as a watcher.
3503 *
3504 * @param[in] conn The connection which changes state.
 3505 * @param[in] prev The state the connection was in.
 3506 * @param[in] state The state the connection is now in.
3507 * @param[in] uctx The trunk_connection_t wrapping the connection.
3508 */
3512 void *uctx)
3513{
3514 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3515 trunk_t *trunk = tconn->pub.trunk;
3516 bool need_requeue = false;
3517
3518 switch (tconn->pub.state) {
3519 case TRUNK_CONN_ACTIVE:
3520 case TRUNK_CONN_FULL:
3525 need_requeue = true;
3527 break;
3528
3529 case TRUNK_CONN_INIT: /* Initialisation failed */
3533 break;
3534
3535 case TRUNK_CONN_CLOSED:
3536 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3538 }
3539
3540 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3542
3543 /*
3544 * Now *AFTER* the connection has been
 3545 * removed from the active pool,
 3546 * re-enqueue the requests.
3547 */
3548 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3549
3550 /*
3551 * There should be no requests left on this
3552 * connection. They should have all been
3553 * moved off or failed.
3554 */
3556
3557 /*
3558 * Clear statistics and flags
3559 */
3560 tconn->sent_count = 0;
3561
3562 /*
3563 * Remove the reconnect event
3564 */
3566
3567 /*
3568 * Remove the I/O events
3569 */
3571}
3572
3573/** Connection failed
3574 *
3575 * @param[in] conn The connection which changes state.
 3576 * @param[in] prev The state the connection was in.
 3577 * @param[in] state The state the connection is now in.
3578 * @param[in] uctx The trunk_connection_t wrapping the connection.
3579 */
3581 connection_state_t prev,
3583 void *uctx)
3584{
3585 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3586 trunk_t *trunk = tconn->pub.trunk;
3587
3588 /*
3589 * Need to set this first as it
3590 * determines whether requests are
3591 * re-queued or fail outright.
3592 */
3593 trunk->pub.last_failed = fr_time();
3594
3595 /*
3596 * Failed in the init state, transition the
3597 * connection to closed, else we get an
3598 * INIT -> INIT transition which triggers
3599 * an assert.
3600 */
3601 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3602
3603 /*
 3604 * See what the state of the trunk is;
3605 * if there are no connections that could
3606 * potentially accept requests in the near
3607 * future, then fail all the requests in the
3608 * trunk backlog.
3609 */
3615}
3616
3617/** Connection transitioned to the halted state
3618 *
 3619 * Remove the connection from all lists, as it's likely about to be freed.
3620 *
3621 * Setting the trunk back to the init state ensures that if the code is ever
3622 * refactored and #connection_signal_reconnect is used after a connection
3623 * is halted, then everything is maintained in a valid state.
3624 *
3625 * @note This function is only called from the connection API as a watcher.
3626 *
3627 * @param[in] conn The connection which changes state.
 3628 * @param[in] prev The state the connection was in.
 3629 * @param[in] state The state the connection is now in.
3630 * @param[in] uctx The trunk_connection_t wrapping the connection.
3631 */
3635 void *uctx)
3636{
3637 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3638 trunk_t *trunk = tconn->pub.trunk;
3639
3640 switch (tconn->pub.state) {
3641 case TRUNK_CONN_INIT:
3642 case TRUNK_CONN_CLOSED:
3644 break;
3645
3646 default:
3648 }
3649
3650 /*
3651 * It began life in the halted state,
3652 * and will end life in the halted state.
3653 */
3655
3656 /*
3657 * There should be no requests left on this
3658 * connection. They should have all been
3659 * moved off or failed.
3660 */
3662
3663 /*
3664 * And free the connection...
3665 */
3666 if (trunk->in_handler) {
3667 /*
3668 * ...later.
3669 */
3670 fr_dlist_insert_tail(&trunk->to_free, tconn);
3671 return;
3672 }
3673 talloc_free(tconn);
3674}
3675
3676/** Free a connection
3677 *
 3678 * Enforces an orderly free order for the children of the tconn
3679 */
3681{
3683 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3684
3685 /*
3686 * Loop over all the requests we gathered
3687 * and transition them to the failed state,
3688 * freeing them.
3689 *
3690 * Usually, requests will be re-queued when
3691 * a connection enters the closed state,
3692 * but in this case because the whole trunk
3693 * is being freed, we don't bother, and
3694 * just signal to the API client that the
3695 * requests failed.
3696 */
3697 if (tconn->pub.trunk->freeing) {
3698 fr_dlist_head_t to_fail;
3699 trunk_request_t *treq = NULL;
3700
3701 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3702
3703 /*
3704 * Remove requests from this connection
3705 */
3707 while ((treq = fr_dlist_next(&to_fail, treq))) {
3708 trunk_request_t *prev;
3709
3710 prev = fr_dlist_remove(&to_fail, treq);
3712 treq = prev;
3713 }
3714 }
3715
3716 /*
3717 * Ensure we're not signalled by the connection
3718 * as it processes its backlog of state changes,
3719 * as we are about to be freed.
3720 */
3728
3729 /*
3730 * This may return -1, indicating the free was deferred
3731 * this is fine. It just means the conn will be freed
3732 * after all the handlers have exited.
3733 */
3734 (void)talloc_free(tconn->pub.conn);
3735 tconn->pub.conn = NULL;
3736
3737 return 0;
3738}
3739
3740/** Attempt to spawn a new connection
3741 *
3742 * Calls the API client's alloc() callback to create a new connection_t,
3743 * then inserts the connection into the 'connecting' list.
3744 *
3745 * @param[in] trunk to spawn connection in.
3746 * @param[in] now The current time.
3747 */
3749{
3750 trunk_connection_t *tconn;
3751
3752
3753 /*
3754 * Call the API client's callback to create
3755 * a new connection_t.
3756 */
3757 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3758 tconn->pub.trunk = trunk;
3759 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3760
3761 /*
3762 * Allocate a new connection_t or fail.
3763 */
3764 DO_CONNECTION_ALLOC(tconn);
3765
3767 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3771
3772 /*
3773 * OK, we have the connection, now setup watch
3774 * points so we know when it changes state.
3775 *
3776 * This lets us automatically move the tconn
3777 * between the different lists in the trunk
3778 * with minimum extra code.
3779 */
3781 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3782
3784 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3785
3787 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3788
3790 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3791
3793 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3794
3796 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3797
3799 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3800
3801 talloc_set_destructor(tconn, _trunk_connection_free);
3802
3803 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3804
3805 trunk->pub.last_open = now;
3806
3807 return 0;
3808}
3809
3810/** Pop a cancellation request off a connection's cancellation queue
3811 *
3812 * The request we return is advanced by the request moving out of the
3813 * cancel state and into the cancel_sent or cancel_complete state.
3814 *
3815 * One of these signalling functions must be called after the request
3816 * has been popped:
3817 *
3818 * - #trunk_request_signal_cancel_sent
3819 * The remote datastore has been informed, but we need to wait for acknowledgement.
3820 * The #trunk_request_demux_t callback must handle the acks calling
3821 * #trunk_request_signal_cancel_complete when an ack is received.
3822 *
3823 * - #trunk_request_signal_cancel_complete
3824 * The request was cancelled and we don't need to wait, clean it up immediately.
3825 *
3826 * @param[out] treq_out to process
3827 * @param[in] tconn Connection to drain cancellation request from.
3828 * @return
3829 * - 1 if no more requests.
3830 * - 0 if a new request was written to treq_out.
3831 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3832 * memory or requests associated with the connection.
3833 * - -2 if called outside of the cancel muxer.
3834 */
3836{
3837 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3838
3840 "%s can only be called from within request_cancel_mux handler",
3841 __FUNCTION__)) return -2;
3842
3843 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3844 if (!*treq_out) return 1;
3845
3846 return 0;
3847}
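/*
 *	Illustrative sketch (not compiled): the core loop of a request_cancel_mux
 *	handler, using trunk_connection_pop_cancellation() and the signal functions
 *	listed above.  The write_cancel() helper and fd handling are hypothetical;
 *	the pop/signal names and return codes come from the documentation above.
 */
#if 0
static void example_cancel_mux(trunk_connection_t *tconn, int fd)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_cancellation(&treq, tconn) == 0) {
		if (write_cancel(fd, treq->pub.preq) < 0) break;	/* hypothetical I/O helper */

		/*
		 *	Wait for the remote datastore to ack the cancellation;
		 *	the demux handler calls
		 *	trunk_request_signal_cancel_complete() when it does.
		 */
		trunk_request_signal_cancel_sent(treq);
	}
}
#endif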
3848
3849/** Pop a request off a connection's pending queue
3850 *
3851 * The request we return is advanced by the request moving out of the partial or
3852 * pending states, when the mux function signals us.
3853 *
3854 * If the same request is returned again and again, it means the muxer isn't actually
 3855 * doing anything with the request we returned, and it's an error in the muxer code.
3856 *
3857 * One of these signalling functions must be used after the request has been popped:
3858 *
3859 * - #trunk_request_signal_complete
3860 * The request was completed. Either we got a synchronous response, or we knew the
3861 * response without contacting an external server (cache).
3862 *
3863 * - #trunk_request_signal_fail
3864 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3865 *
3866 * - #trunk_request_signal_partial
3867 * Wrote part of a request. This request will be returned on the next call to this
3868 * function so that the request_mux function can finish writing it. Only useful
3869 * for stream type connections. Datagram type connections cannot have partial
3870 * writes.
3871 *
3872 * - #trunk_request_signal_sent Successfully sent a request.
3873 *
3874 * @param[out] treq_out to process
3875 * @param[in] tconn to pop a request from.
3876 * @return
3877 * - 1 if no more requests.
3878 * - 0 if a new request was written to treq_out.
3879 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3880 * memory or requests associated with the connection.
3881 * - -2 if called outside of the muxer.
3882 */
3884{
3885 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3886
3888 "%s can only be called from within request_mux handler",
3889 __FUNCTION__)) return -2;
3890
3891 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3892 if (!*treq_out) return 1;
3893
3894 return 0;
3895}
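/*
 *	Illustrative sketch (not compiled): the core loop of a request_mux handler,
 *	using trunk_connection_pop_request() and the signal functions listed above.
 *	The write_preq() helper and fd handling are hypothetical; the pop/signal
 *	names and return codes come from the documentation above.
 */
#if 0
static void example_request_mux(trunk_connection_t *tconn, int fd)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_request(&treq, tconn) == 0) {
		ssize_t slen;

		slen = write_preq(fd, treq->pub.preq);		/* hypothetical encode + write helper */
		if (slen < 0) {
			trunk_request_signal_fail(treq);	/* Permanent encoding/write error */
			continue;
		}
		if (slen == 0) {
			trunk_request_signal_partial(treq);	/* Stream connection, buffer full */
			break;					/* Returned first on the next call */
		}

		trunk_request_signal_sent(treq);		/* Now wait for the response in demux */
	}
}
#endif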
3896
3897/** Signal that a trunk connection is writable
3898 *
3899 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3900 *
3901 * @param[in] tconn to signal.
3902 */
3904{
3905 trunk_t *trunk = tconn->pub.trunk;
3906
3907 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3908 "%s cannot be called within a handler", __FUNCTION__)) return;
3909
3910 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3911
3913}
3914
3915/** Signal that a trunk connection is readable
3916 *
3917 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3918 *
3919 * @param[in] tconn to signal.
3920 */
3922{
3923 trunk_t *trunk = tconn->pub.trunk;
3924
3925 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3926 "%s cannot be called within a handler", __FUNCTION__)) return;
3927
3928 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3929
3931}
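/*
 *	Illustrative sketch (not compiled): an API client with its own FD event
 *	callbacks notifying the trunk, mirroring the standard I/O functions below.
 *	The callback names here are hypothetical, and it's assumed the signal
 *	functions documented above are exposed as
 *	trunk_connection_signal_readable()/trunk_connection_signal_writable().
 */
#if 0
static void example_fd_readable(trunk_connection_t *tconn)
{
	trunk_connection_signal_readable(tconn);	/* Trunk will call request_demux */
}

static void example_fd_writable(trunk_connection_t *tconn)
{
	trunk_connection_signal_writable(tconn);	/* Trunk will call request_mux */
}
#endif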
3932
3933/** Signal a trunk connection cannot accept more requests
3934 *
3935 * @param[in] tconn to signal.
3936 */
3938{
3939 /* Can be called anywhere */
3940
3941 switch (tconn->pub.state) {
3942 case TRUNK_CONN_ACTIVE:
3943 case TRUNK_CONN_FULL:
3945 break;
3946
3949 break;
3950
3951 default:
3952 return;
3953 }
3954}
3955
3956/** Signal a trunk connection is no longer full
3957 *
3958 * @param[in] tconn to signal.
3959 */
3961{
3962 switch (tconn->pub.state) {
3963 case TRUNK_CONN_FULL:
3964 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3965 break;
3966
3968 /*
3969 * Do the appropriate state transition based on
3970 * how many requests the trunk connection is
3971 * currently servicing.
3972 */
3973 if (trunk_connection_is_full(tconn)) {
3975 break;
3976 }
3978 break;
3979
3980 /*
3981 * Unsetting the active flag just moves
3982 * the connection back to the normal
3983 * draining state.
3984 */
3985 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
3987 break;
3988
3989 default:
3990 return;
3991 }
3992}
3993
3994/** Signal a trunk connection is no longer viable
3995 *
3996 * @param[in] tconn to signal.
 3997 * @param[in] reason Why the connection is being reconnected.
3998 */
4003
4004/** Standard I/O read function
4005 *
 4006 * Underlying FD is now readable, so call the trunk to read any pending requests
4007 * from this connection.
4008 *
4009 * @param[in] el The event list signalling.
4010 * @param[in] fd that's now readable.
4011 * @param[in] flags describing the read event.
4012 * @param[in] uctx The trunk connection handle (tconn).
4013 */
4015{
4016 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4017
4019}
4020
4021/** Standard I/O write function
4022 *
4023 * Underlying FD is now writable, so call the trunk to write any pending requests
4024 * to this connection.
4025 *
4026 * @param[in] el The event list signalling.
4027 * @param[in] fd that's now writable.
4028 * @param[in] flags describing the write event.
 4029 * @param[in] uctx The trunk connection handle (tconn).
4030 */
4032{
4033 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4034
4036}
4037
4038
4039/** Returns true if the trunk connection is in one of the specified states
4040 *
4041 * @param[in] tconn To check state for.
4042 * @param[in] state to check
4043 * @return
4044 * - True if trunk connection is in a particular state.
4045 * - False if trunk connection is not in a particular state.
4046 */
4048{
4049 return (bool)(tconn->pub.state & state);
4050}
4051
4052/** Close connections in a particular connection list if they have no requests associated with them
4053 *
4054 * @param[in] trunk containing connections we want to close.
4055 * @param[in] head of list of connections to examine.
4056 */
4058{
4059 trunk_connection_t *tconn = NULL;
4060
4061 while ((tconn = fr_dlist_next(head, tconn))) {
4062 trunk_connection_t *prev;
4063
4065
4066 prev = fr_dlist_prev(head, tconn);
4067
4068 DEBUG3("Closing %s connection with no requests",
4070 /*
4071 * Close the connection as gracefully
4072 * as possible by signalling it should
4073 * shutdown.
4074 *
4075 * The connection, should, if serviced
4076 * correctly by the underlying library,
4077 * automatically transition to halted after
4078 * all pending reads/writes are
4079 * complete at which point we'll be informed
4080 * and free our tconn wrapper.
4081 */
4083 tconn = prev;
4084 }
4085}
4086
4087/** Rebalance connections across active trunk members when a new connection becomes active
4088 *
 4089 * We don't have any visibility into the connection prioritisation algorithm;
 4090 * it's essentially a black box.
 4091 *
 4092 * We can however determine when the correct level of requests per connection
 4093 * has been reached, by dequeuing and requeueing requests up until the point
4094 * where the connection that just had a request dequeued, receives the same
4095 * request back.
4096 *
4097 * @param[in] trunk The trunk to rebalance.
4098 */
4099static void trunk_rebalance(trunk_t *trunk)
4100{
4102
4104
4105 /*
4106 * Only rebalance if the top and bottom of
4107 * the heap are not equal.
4108 */
4109 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4110
4111 DEBUG3("Rebalancing requests");
4112
4113 /*
4114 * Keep requeuing requests from the connection
4115 * at the bottom of the heap until the
4116 * connection at the top is shifted from that
4117 * position.
4118 */
4119 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4121 TRUNK_REQUEST_STATE_PENDING, 1, false));
4122}
4123
4124/** Implements the algorithm we use to manage requests per connection levels
4125 *
4126 * This is executed periodically using a timer event, and opens/closes
4127 * connections.
4128 *
 4129 * The aim is to try to keep the requests-per-connection level in a sweet spot,
4130 * where there's enough outstanding work for the connection/pipelining to work
4131 * efficiently, but not so much so that we encounter increased latency.
4132 *
4133 * In the request enqueue and dequeue functions we record every time the
4134 * average number of requests per connection goes above the target count
4135 * and record every time the average number of requests per connection goes
4136 * below the target count.
4137 *
4138 * This may sound expensive, but in all cases we're just summing counters.
4139 * CPU time required does not increase with additional requests, only with
4140 * large numbers of connections.
4141 *
4142 * If we do encounter scaling issues, we can always maintain the counters
4143 * as aggregates as an optimisation later.
4144 *
4145 * If, when the management function runs, the trunk was most recently above
4146 * the target, we:
4147 * - Return if we've been in this state for a shorter period than 'open_delay'.
4148 * - Return if we're at max.
4149 * - Return if opening a new connection will take us below the load target.
4150 * - Return if we last opened a connection within 'open_delay'.
4151 * - Otherwise we attempt to open a new connection.
4152 *
4153 * If the trunk was most recently below the target, we:
4154 * - Return if we've been in this state for a shorter period than 'close_delay'.
4155 * - Return if we're at min.
4156 * - Return if we have no connections.
4157 * - Close a connection if min is 0, and we have no outstanding
4158 * requests. Then return.
4159 * - Return if closing a connection will take us above the load target.
4160 * - Return if we last closed a connection within 'close_delay'.
4161 * - Otherwise we move a connection to draining state.
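 *
 * As a worked example (illustrative numbers only, not defaults): with
 * 'target_req_per_conn' set to 10, 45 outstanding requests and 4 connections,
 * the current average is ROUND_UP_DIV(45, 4) = 12, i.e. above target.  Opening
 * a fifth connection would give ROUND_UP_DIV(45, 5) = 9, which is below
 * target, so no connection is opened.  With 55 outstanding requests the
 * post-open average would be 11, still above target, so a connection is
 * spawned (subject to the delays above).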
4162 */
4163static void trunk_manage(trunk_t *trunk, fr_time_t now)
4164{
4165 trunk_connection_t *tconn = NULL;
4166 trunk_request_t *treq;
4167 uint32_t average = 0;
4168 uint32_t req_count;
4169 uint16_t conn_count;
4170 trunk_state_t new_state;
4171
4172 DEBUG4("Managing trunk");
4173
4174 /*
4175 * Cleanup requests in our request cache which
4176 * have been reapable for too long.
4177 */
4178 while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4180
4181 /*
4182 * If we have idle connections, then close them.
4183 */
4186 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4187
4188 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4189 tconn;
4190 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4191 /*
4192 * The connection has outstanding requests without replies, don't do anything.
4193 */
4194 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4195
4196 /*
4197 * The connection was last active after the idle cutoff time, don't do anything.
4198 */
4199 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4200
4201 /*
4202 * This connection has been inactive since before the idle timeout. Drain it,
4203 * and free it.
4204 */
4206 }
4207 }
4208
4209 /*
4210 * Free any connections which have drained
4211 * and we didn't reactivate during the last
4212 * round of management.
4213 */
4217
4218 /*
4219 * Process deferred connection freeing
4220 */
4221 if (!trunk->in_handler) {
4222 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4223 }
4224
4225 /*
4226 * Update the state of the trunk
4227 */
4229 new_state = TRUNK_STATE_ACTIVE;
4230 } else {
4231 /*
4232 * INIT / CONNECTING / FULL mean connections will become active
4233 * so the trunk is PENDING
4234 */
4239 }
4240
4241 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4242
4243 /*
4244 * A trunk can be signalled to not proactively
4245 * manage connections if a destination is known
4246 * to be unreachable, and doing so would result
4247 * in spurious connections still being opened.
4248 *
4249 * We still run other connection management
4250 * functions and just short circuit the function
4251 * here.
4252 */
4253 if (!trunk->managing_connections) return;
4254
4255 /*
4256	 * We're above the target requests per connection,
4257	 * so spawn more connections!
4258 */
4260 /*
4261 * If connecting is provided, check we
4262 * wouldn't have too many connections in
4263 * the connecting state.
4264 *
4265 * This is a throttle in the case of transitory
4266 * load spikes, or a backend becoming
4267 * unavailable.
4268 */
4269 if ((trunk->conf.connecting > 0) &&
4271 trunk->conf.connecting)) {
4272 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4273 trunk->conf.connecting);
4274 return;
4275 }
4276
4277 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4278
4279 /*
4280 * Only apply hysteresis if we have at least
4281 * one available connection.
4282 */
4283 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4284 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4287 return; /* too soon */
4288 }
4289
4290 /*
4291 * We don't consider 'draining' connections
4292 * in the max calculation, as if we do
4293 * determine that we need to spawn a new
4294 * request, then we'd move all 'draining'
4295 * connections to active before spawning
4296 * any new connections.
4297 */
4298 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4299 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4300 conn_count, trunk->conf.max);
4301 return;
4302 }
4303
4304 /*
4305	 * We consider requests pending on all connections
4306	 * and in the trunk's backlog, as that's the current
4307	 * load.
4308 */
4309 if (!req_count) {
4310 DEBUG4("Not opening connection - No outstanding requests");
4311 return;
4312 }
4313
4314 /*
4315 * Do the n+1 check, i.e. if we open one connection
4316 * will that take us below our target threshold.
4317 */
4318 if (conn_count > 0) {
4319 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4320 if (average < trunk->conf.target_req_per_conn) {
4321 DEBUG4("Not opening connection - Would leave us below our target requests "
4322 "per connection (now %u, after open %u)",
4323 ROUND_UP_DIV(req_count, conn_count), average);
4324 return;
4325 }
4326 } else {
4327 (void)trunk_connection_spawn(trunk, now);
4328 return;
4329 }
4330
4331 /*
4332	 * If we've got a connection in the draining list,
4333	 * move it back into the active list rather than
4334	 * opening a new connection.
4335 */
4336 tconn = fr_dlist_head(&trunk->draining);
4337 if (tconn) {
4338 if (trunk_connection_is_full(tconn)) {
4340 } else {
4342 }
4343 return;
4344 }
4345
4346 /*
4347	 * Implement the delay if there are no connections
4348	 * that could be immediately re-activated.
4349 */
4350 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4351 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4352 "It's been %pVs",
4355 return;
4356 }
4357
4358 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4359 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4360 /* last_open set by trunk_connection_spawn */
4361 (void)trunk_connection_spawn(trunk, now);
4362 }
4363
4364 /*
4365 * We're below the target requests per connection.
4366 * Free some connections...
4367 */
4368 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4369 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4370 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4373 return; /* too soon */
4374 }
4375
4376 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4377
4378 if (!conn_count) {
4379 DEBUG4("Not closing connection - No connections to close!");
4380 return;
4381 }
4382
4383 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4384 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4385 conn_count, trunk->conf.min);
4386 return;
4387 }
4388
4389 if (!req_count) {
4390 DEBUG4("Closing connection - No outstanding requests");
4391 goto close;
4392 }
4393
4394 /*
4395 * The minimum number of connections must be set
4396 * to zero for this to work.
4397 * min == 0, no requests, close all the connections.
4398 * This is useful for backup databases, when
4399 * maintaining the connection would lead to lots of
4400 * log file churn.
4401 */
4402 if (conn_count == 1) {
4403 DEBUG4("Not closing connection - Would leave connections "
4404 "and there are still %u outstanding requests", req_count);
4405 return;
4406 }
4407
4408 /*
4409 * Do the n-1 check, i.e. if we close one connection
4410 * will that take us above our target threshold.
4411 */
4412 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4413 if (average > trunk->conf.target_req_per_conn) {
4414 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4415 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4416 return;
4417 }
4418
4419 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4420 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4421
4422 close:
4423 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4424 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4425 "It's been %pVs",
4428 return;
4429 }
4430
4431 /*
4432 * If the last event on the trunk was a connection failure and
4433 * there is only one connection, this may well be a reconnect
4434	 * attempt after a failure, and it needs to persist; otherwise
4435	 * the last event will be a failure, no new connection will
4436	 * be made, and no new requests will be enqueued.
4437 */
4438 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4439 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4440 DEBUG4("Not closing remaining connection - last event was a failure");
4441 return;
4442 }
4443
4444 /*
4445 * Inactive connections get counted in the
4446 * set of viable connections, but are likely
4447 * to be congested or dead, so we drain
4448 * (and possibly eventually free) those first.
4449 */
4450 if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4451 /*
4452 * If the connection has no requests associated
4453 * with it then immediately free.
4454 */
4456 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4457 } else {
4459 }
4460 /*
4461	 * It is possible to have too many connecting
4462 * connections when the connections are
4463 * taking a while to open and the number
4464 * of requests decreases.
4465 */
4466 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4467 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4468
4469 /*
4470 * Finally if there are no "connecting"
4471 * connections to close, and no "inactive"
4472 * connections, start draining "active"
4473 * connections.
4474 */
4475 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4476 /*
4477 * If the connection has no requests associated
4478 * with it then immediately free.
4479 */
4481 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4482 } else {
4484 }
4485 }
4486
4487 trunk->pub.last_closed = now;
4488
4489
4490 return;
4491 }
4492}
4493
4494/** Event to periodically call the connection management function
4495 *
4496 * @param[in] el this event belongs to.
4497 * @param[in] now current time.
4498 * @param[in] uctx The trunk.
4499 */
4500static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
4501{
4502 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4503
4504 trunk_manage(trunk, now);
4505
4507 if (fr_event_timer_in(trunk, el, &trunk->manage_ev, trunk->conf.manage_interval,
4508 _trunk_timer, trunk) < 0) {
4509 PERROR("Failed inserting trunk management event");
4510 /* Not much we can do, hopefully the trunk will be freed soon */
4511 }
4512 }
4513}
4514
4515/** Return a count of requests in particular states, on connections in particular states
4516 *
4517 * @param[in] trunk to retrieve counts for.
4518 * @param[in] conn_state One or more connection states or'd together.
4519 * @param[in] req_state One or more request states or'd together.
4520 * @return The number of requests in the specified states, on connections in the specified states.
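 *
 * For example (shown purely for illustration), a call such as
 * trunk_request_count_by_state(trunk, TRUNK_CONN_ACTIVE, TRUNK_REQUEST_STATE_PENDING)
 * returns the number of requests still waiting to be sent on currently
 * active connections.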
4521 */
4522uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4523{
4524 uint64_t count = 0;
4525 trunk_connection_t *tconn = NULL;
4527
4528#define COUNT_BY_STATE(_state, _list) \
4529do { \
4530 if (conn_state & (_state)) { \
4531 tconn = NULL; \
4532 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4533 count += trunk_request_count_by_connection(tconn, req_state); \
4534 } \
4535 } \
4536} while (0)
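	/*
	 *	For example, COUNT_BY_STATE(TRUNK_CONN_DRAINING, draining) adds, for
	 *	every connection in trunk->draining, the number of its requests that
	 *	are in one of the states given in req_state.
	 */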
4537
4538 if (conn_state & TRUNK_CONN_ACTIVE) {
4539 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4540 tconn;
4541 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4542 count += trunk_request_count_by_connection(tconn, req_state);
4543 }
4544 }
4545
4548 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4551
4553
4554 return count;
4555}
4556
4557/** Update timestamps for when we last had a transition from above target to below target or vice versa
4558 *
4559 * Should be called every time a connection or request is allocated or freed.
4560 *
4561 * @param[out] conn_count_out How many connections we considered.
4562 * @param[out] req_count_out How many requests we considered.
4563 * @param[in] trunk to operate on.
4564 * @param[in] now The current time.
4565 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4566 * has changed.
4567 * @return
4568 * - 0 if the average couldn't be calculated (no requests or no connections).
4569 * - The average number of requests per connection.
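 *
 * As a worked example (illustrative values): 25 requests spread over 4 counted
 * connections gives ROUND_UP_DIV(25, 4) = 7 requests per connection.  With a
 * 'target_req_per_conn' of 10 that records a below-target transition; with a
 * target of 5 it records an above-target transition.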
4570 */
4571static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4572 trunk_t *trunk, fr_time_t now,
4573 NDEBUG_UNUSED bool verify)
4574{
4575 uint32_t req_count = 0;
4576 uint16_t conn_count = 0;
4577 uint64_t req_per_conn = 0;
4578
4580
4581 /*
4582 * No need to update these as the trunk is being freed
4583 */
4584 if (trunk->freeing) goto done;
4585
4586 /*
4587 * Count all connections except draining and draining to free.
4588 *
4589 * Omitting these connection states artificially raises the
4590 * request to connection ratio, so that we can preemptively spawn
4591 * new connections.
4592 *
4593 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4594 * the trunk management code has enough hysteresis to not
4595 * immediately reactivate the connection.
4596 *
4597 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4598 * management code should spawn a new connection to take its place.
4599 *
4600 * Connections placed in the DRAINING_TO_FREE state are being
4601 * closed preemptively to deal with bugs on the server we're
4602 * talking to, or misconfigured firewalls which are trashing
4603 * TCP/UDP connection states.
4604 */
4609
4610 /*
4611 * Requests on all connections
4612 */
4613 req_count = trunk_request_count_by_state(trunk,
4616
4617 /*
4618 * No connections, but we do have requests
4619 */
4620 if (conn_count == 0) {
4621 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4622 goto done;
4623 }
4624
4625 if (req_count == 0) {
4626 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4627 goto done;
4628 }
4629
4630 /*
4631 * Calculate the req_per_conn
4632 */
4633 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4634 if (req_per_conn > trunk->conf.target_req_per_conn) {
4635 above_target:
4636 /*
4637 * Edge - Below target to above target (too many requests per conn - spawn more)
4638 *
4639 * The equality check is correct here as both values start at 0.
4640 */
4642 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4643 below_target:
4644 /*
4645 * Edge - Above target to below target (too few requests per conn - close some)
4646 *
4647 * The equality check is correct here as both values start at 0.
4648 */
4650 }
4651
4652done:
4653 if (conn_count_out) *conn_count_out = conn_count;
4654 if (req_count_out) *req_count_out = req_count;
4655
4656 /*
4657 * Check we haven't missed a call to trunk_requests_per_connection
4658 */
4659 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4660
4661 trunk->last_req_per_conn = req_per_conn;
4662
4663 return req_per_conn;
4664}
4665
4666/** Drain the backlog of as many requests as possible
4667 *
4668 * @param[in] trunk To drain backlog requests for.
4669 */
4670static void trunk_backlog_drain(trunk_t *trunk)
4671{
4672 trunk_request_t *treq;
4673
4674 if (fr_heap_num_elements(trunk->backlog) == 0) return;
4675
4676 /*
4677 * If it's always writable, this isn't
4678 * really a noteworthy event.
4679 */
4680 if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4681
4682 /*
4683 * Do *NOT* add an artificial limit
4684 * here. We rely on all available
4685 * connections entering the full
4686 * state and transitioning back to
4687 * active in order to drain the
4688 * backlog.
4689 */
4690 while ((treq = fr_heap_peek(trunk->backlog))) {
4691 switch (trunk_request_enqueue_existing(treq)) {
4692 case TRUNK_ENQUEUE_OK:
4693 continue;
4694
4695 /*
4696 * Signal to stop
4697 */
4699 break;
4700
4701 /*
4702 * Failed enqueueing the request,
4703 * have it enter the failed state
4704 * which will free it and
4705 * re-enliven the yielded request.
4706 */
4708 case TRUNK_ENQUEUE_FAIL:
4710 continue;
4711
4714 return;
4715 }
4716 }
4717}
4718
4719/** Force the trunk to re-establish its connections
4720 *
4721 * @param[in] trunk to signal.
4722 * @param[in] states One or more states or'd together.
4723 * @param[in] reason Why the connections are being signalled to reconnect.
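 *
 * For example (a sketch only; the reason value would come from the caller's
 * context), to signal every currently active connection to reconnect:
 *
 *	trunk_reconnect(trunk, TRUNK_CONN_ACTIVE, reason);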
4724 */
4725void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4726{
4727
4728#define RECONNECT_BY_STATE(_state, _list) \
4729do { \
4730 if (states & (_state)) { \
4731 size_t i; \
4732 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4733 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4734 } \
4735 } \
4736} while (0)
4737
4738 /*
4739 * Connections in the 'connecting' state
4740 * may re-enter that state, so we need to
4741 * be careful not to enter an infinite
4742 * loop, as we iterate over the list
4743 * again and again.
4744 */
4746
4747 if (states & TRUNK_CONN_ACTIVE) {
4748 trunk_connection_t *tconn;
4749 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4750 }
4751
4759}
4760
4761/** Start the trunk running
4762 *
4763 */
4764int trunk_start(trunk_t *trunk)
4765{
4766 uint16_t i;
4767
4768 if (unlikely(trunk->started)) return 0;
4769
4770 /*
4771 * Spawn the initial set of connections
4772 */
4773 for (i = 0; i < trunk->conf.start; i++) {
4774 DEBUG("[%i] Starting initial connection", i);
4775 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4776 }
4777
4778 /*
4779 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4780 * less than the idle timeout, update the management interval.
4781 */
4785 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4786 }
4787
4789 /*
4790 * Insert the event timer to manage
4791 * the interval between managing connections.
4792 */
4793 if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, trunk->conf.manage_interval,
4794 _trunk_timer, trunk) < 0) {
4795 PERROR("Failed inserting trunk management event");
4796 return -1;
4797 }
4798 }
4799 trunk->started = true;
4800 trunk->managing_connections = true;
4801
4802 return 0;
4803}
4804
4805/** Allow the trunk to open and close connections in response to load
4806 *
4807 */
4809{
4810 if (!trunk->started || trunk->managing_connections) return;
4811
4812 DEBUG3("Connection management enabled");
4813 trunk->managing_connections = true;
4814}
4815
4816/** Stop the trunk from opening and closing connections in response to load
4817 *
4818 */
4820{
4821 if (!trunk->started || !trunk->managing_connections) return;
4822
4823 DEBUG3("Connection management disabled");
4824 trunk->managing_connections = false;
4825}
4826
4827/** Schedule a trunk management event for the next time the event loop is executed
4828 */
4829int trunk_connection_manage_schedule(trunk_t *trunk)
4830{
4831 if (!trunk->started || !trunk->managing_connections) return 0;
4832
4833 if (fr_event_timer_in(trunk, trunk->el, &trunk->manage_ev, fr_time_delta_wrap(0), _trunk_timer, trunk) < 0) {
4834 PERROR("Failed inserting trunk management event");
4835 return -1;
4836 }
4837
4838 return 0;
4839}
4840
4841/** Order connections by queue depth
4842 *
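 * For example, connections with queue depths of 5 and 4 compare as equal here
 * (the difference is within the fudge factor of 1), while depths of 5 and 3
 * compare as unequal.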
4843 */
4844static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4845{
4848
4851
4852 /*
4853 * Add a fudge factor of 1 to reduce spurious rebalancing
4854 */
4855 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4856}
4857
4858/** Free a trunk, gracefully closing all connections.
4859 *
4860 */
4861static int _trunk_free(trunk_t *trunk)
4862{
4863 trunk_connection_t *tconn;
4864 trunk_request_t *treq;
4865 trunk_watch_entry_t *watch;
4866 size_t i;
4867
4868 DEBUG4("Trunk free %p", trunk);
4869
4870 trunk->freeing = true; /* Prevent re-enqueuing */
4871
4872 /*
4873 * We really don't want this firing after
4874 * we've freed everything.
4875 */
4877
4878 /*
4879 * Now free the connections in each of the lists.
4880 *
4881 * Each time a connection is freed it removes itself from the list
4882 * it's in, which means the head should keep advancing automatically.
4883 */
4884 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4885 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4886 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4887 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4888 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4889 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4890 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4891 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4892 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4893
4894 /*
4895 * Process any deferred connection frees
4896 */
4897 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4898
4899 /*
4900 * Free any requests left in the backlog
4901 */
4902 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4903
4904 /*
4905 * Free any requests in our request cache
4906 */
4907 while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4908
4909 /*
4910 * Free any entries in the watch lists
4911 */
4912 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4913 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4914 }
4915
4916 return 0;
4917}
4918
4919/** Allocate a new collection of connections
4920 *
4921 * This function should be called first to allocate a new trunk.
4922 *
4923 * After the trunk has been allocated, #trunk_request_alloc and
4924 * #trunk_request_enqueue should be used to allocate memory for trunk
4925 * requests, and pass a preq (protocol request) to the trunk for
4926 * processing.
4927 *
4928 * The trunk will then asynchronously process the request, writing the result
4929 * to a specified rctx. See #trunk_request_enqueue for more details.
4930 *
4931 * @note Trunks may not be shared between multiple threads under any circumstances.
4932 *
4933 * @param[in] ctx To use for any memory allocations. Must be thread local.
4934 * @param[in] el to use for I/O and timer events.
4935 * @param[in] funcs Callback functions.
4936 * @param[in] conf Common user configurable parameters.
4937 * @param[in] log_prefix To prepend to global messages.
4938 * @param[in] uctx User data to pass to the alloc function.
4939 * @param[in] delay_start If true, then we will not spawn any connections
4940 * until the first request is enqueued.
4941 * @return
4942 * - New trunk handle on success.
4943 * - NULL on error.
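 *
 * A minimal allocation sketch (io_funcs, conf and uctx are assumed to have been
 * set up by the caller; the log prefix is arbitrary):
 *
 *	trunk_t *trunk;
 *
 *	trunk = trunk_alloc(ctx, el, &io_funcs, &conf, "my module", uctx, false);
 *	if (!trunk) return -1;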
4944 */
4945trunk_t *trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,
4946		       trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4947 char const *log_prefix, void const *uctx, bool delay_start)
4948{
4949 trunk_t *trunk;
4950 size_t i;
4951
4952 /*
4953 * Check we have the functions we need
4954 */
4955 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4956
4957 MEM(trunk = talloc_zero(ctx, trunk_t));
4958 trunk->el = el;
4959 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4960
4961 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4962 if (!trunk->funcs.connection_prioritise) {
4964 }
4966
4967 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4968
4969 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4970 talloc_set_destructor(trunk, _trunk_free);
4971
4972 /*
4973 * Unused request list...
4974 */
4976
4977 /*
4978 * Request backlog queue
4979 */
4981 trunk_request_t, heap_id, 0));
4982
4983 /*
4984 * Connection queues and trees
4985 */
4987 trunk_connection_t, heap_id, 0));
4997
4998 /*
4999 * Watch lists
5000 */
5001 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5003 }
5004
5005 DEBUG4("Trunk allocated %p", trunk);
5006
5007 if (!delay_start) {
5008 if (trunk_start(trunk) < 0) {
5009 talloc_free(trunk);
5010 return NULL;
5011 }
5012 }
5013
5014 return trunk;
5015}
5016
5017#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5018/** Verify a trunk
5019 *
5020 * A trunk has some number of connections, which each have some number of requests. The connections and
5021 * requests are in differing kinds of containers depending on their state and how they are used, and may
5022 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5023 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5024 * foo's children.
5025 */
5026void trunk_verify(char const *file, int line, trunk_t *trunk)
5027{
5028 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5029 (void) talloc_get_type_abort(trunk, trunk_t);
5030
5031 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5032 _fr_dlist_verify(file, line, &trunk->watch[i]);
5033 }
5034
5035#define IO_FUNC_VERIFY(_func) \
5036	fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i]: " #_func " was NULL", file, line)
5037
5038 /*
5039 * Only a few of the function pointers *must* be non-NULL..
5040 */
5042 IO_FUNC_VERIFY(connection_prioritise);
5044
5045#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5046do { \
5047 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5048 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5049 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5050 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5051} while (0)
5052
5053#define TCONN_DLIST_VERIFY(_dlist, _state) \
5054do { \
5055 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5056 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5057 trunk_connection_verify(file, line, tconn); \
5058 TRUNK_TCONN_CHECKS(tconn, _state); \
5059 } \
5060} while (0)
5061
5062#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5063do {\
5064 fr_minmax_heap_verify(file, line, trunk->_heap); \
5065 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5066 trunk_connection_verify(file, line, tconn); \
5067 TRUNK_TCONN_CHECKS(tconn, _state); \
5068 }} \
5069} while (0)
5070
5071 fr_dlist_verify(&(trunk->free_requests));
5072 FR_HEAP_VERIFY(trunk->backlog);
5073
5080 /* TCONN_DLIST_VERIFY(failed, ???); */
5085}
5086
5087void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
5088{
5089 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5090 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5091
5092 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5093
5094 /*
5095 * shouldn't be both in heap and on list--but it doesn't look like moves
5096 * to active heap wipe the dlist pointers.
5097 */
5098
5099#define TCONN_TREQ_CHECKS(_treq, _state) \
5100do { \
5101 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5102 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5103 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5104 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5105 fr_fatal_assert_msg(_state == _treq->pub.state, \
5106 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5107} while (0)
5108
5109#define TREQ_DLIST_VERIFY(_dlist, _state) \
5110do { \
5111 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5112 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5113 trunk_request_verify(file, line, treq); \
5114 TCONN_TREQ_CHECKS(treq, _state); \
5115 } \
5116} while (0)
5117
5118#define TREQ_HEAP_VERIFY(_heap, _state) \
5119do { \
5120 fr_heap_iter_t _iter; \
5121 fr_heap_verify(file, line, tconn->_heap); \
5122 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5123 treq; \
5124 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5125 trunk_request_verify(file, line, treq); \
5126 TCONN_TREQ_CHECKS(treq, _state); \
5127 } \
5128} while (0)
5129
5130#define TREQ_OPTION_VERIFY(_option, _state) \
5131do { \
5132 if (tconn->_option) { \
5133 trunk_request_verify(file, line, tconn->_option); \
5134 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5135 } \
5136} while (0)
5137
5138 /* verify associated requests */
5145}
5146
5147void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5148{
5149 fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5150 (void) talloc_get_type_abort(treq, trunk_request_t);
5151
5152#ifdef WITH_VERIFY_PTR
5153 if (treq->pub.request) request_verify(file, line, treq->pub.request);
5154#endif
5155}
5156
5157
5158bool trunk_search(trunk_t *trunk, void *ptr)
5159{
5160#define TCONN_DLIST_SEARCH(_dlist) \
5161do { \
5162 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5163 if (ptr == tconn) { \
5164 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5165 return true; \
5166 } \
5167 if (trunk_connection_search(tconn, ptr)) { \
5168 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5169 return true; \
5170 } \
5171 } \
5172} while (0)
5173
5174#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5175do { \
5176 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5177 if (ptr == tconn) { \
5178 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5179 return true; \
5180 } \
5181 if (trunk_connection_search(tconn, ptr)) { \
5182 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5183 return true; \
5184 } \
5185 }}\
5186} while (0)
5187
5189 TCONN_DLIST_SEARCH(connecting);
5191 TCONN_DLIST_SEARCH(full);
5192 TCONN_DLIST_SEARCH(inactive);
5193 TCONN_DLIST_SEARCH(inactive_draining);
5194 TCONN_DLIST_SEARCH(failed);
5195 TCONN_DLIST_SEARCH(closed);
5196 TCONN_DLIST_SEARCH(draining);
5197 TCONN_DLIST_SEARCH(draining_to_free);
5198 TCONN_DLIST_SEARCH(to_free);
5199
5200 return false;
5201}
5202
5204{
5205#define TREQ_DLIST_SEARCH(_dlist) \
5206do { \
5207 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5208 if (ptr == treq) { \
5209 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5210 return true; \
5211 } \
5212 if (trunk_request_search(treq, ptr)) { \
5213 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5214 return true; \
5215 } \
5216 } \
5217} while (0)
5218
5219#define TREQ_HEAP_SEARCH(_heap) \
5220do { \
5221 fr_heap_iter_t _iter; \
5222 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5223 treq; \
5224 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5225 if (ptr == treq) { \
5226 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5227 return true; \
5228 } \
5229 if (trunk_request_search(treq, ptr)) { \
5230 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5231 return true; \
5232 } \
5233 } \
5234} while (0)
5235
5236#define TREQ_OPTION_SEARCH(_option) \
5237do { \
5238 if (tconn->_option) { \
5239 if (ptr == tconn->_option) { \
5240 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5241 return true; \
5242 } \
5243 if (trunk_request_search(tconn->_option, ptr)) { \
5244 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5245 return true; \
5246 } \
5247 } \
5248} while (0)
5249
5250 /* search associated requests */
5251 TREQ_HEAP_SEARCH(pending);
5252 TREQ_DLIST_SEARCH(sent);
5253 TREQ_DLIST_SEARCH(cancel);
5254 TREQ_DLIST_SEARCH(cancel_sent);
5255 TREQ_OPTION_SEARCH(partial);
5256 TREQ_OPTION_SEARCH(cancel_partial);
5257
5258 return false;
5259}
5260
5262{
5263 return treq->pub.preq == ptr;
5264}
5265#endif
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:41
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define NDEBUG_UNUSED
Definition build.h:326
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:322
#define unlikely(_x)
Definition build.h:381
#define UNUSED
Definition build.h:315
#define NUM_ELEMENTS(_t)
Definition build.h:337
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:642
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:268
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:323
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:297
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:412
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:579
connection_state_t
Definition connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:54
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition connection.h:46
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:55
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:52
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:49
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:50
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:53
connection_reason_t
Definition connection.h:83
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:184
#define MEM(x)
Definition debug.h:36
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:735
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:588
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:531
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
#define fr_dlist_verify(_head)
Definition dlist.h:755
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:338
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:555
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
#define fr_event_timer_in(...)
Definition event.h:255
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:606
Track when a log message was last repeated.
Definition log.h:547
talloc_free(reap)
int fr_event_timer_delete(fr_event_timer_t const **ev_p)
Delete a timer event from the event list.
Definition event.c:1611
Stores all information relating to an event list.
Definition event.c:411
A timer event.
Definition event.c:102
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:583
fr_log_type_t
Definition log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:153
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:417
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1057
#define fr_assert(_expr)
Definition rad_assert.h:38
static bool done
Definition radclient.c:80
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
#define WARN(fmt,...)
Definition radclient.h:47
#define INFO(fmt,...)
Definition radict.c:54
static fr_event_list_t * events
Definition radsniff.c:59
static rs_t * conf
Definition radsniff.c:53
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:475
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:322
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:513
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:535
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:458
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:313
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:163
fr_time_t test_time
Definition state_test.c:3
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:282
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:180
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5158
static atomic_uint_fast64_t request_counter
Definition trunk.c:55
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3248
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:776
static size_t trunk_req_trigger_names_len
Definition trunk.c:358
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3835
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:162
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4829
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:746
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3398
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:160
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:154
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:207
static size_t trunk_connection_states_len
Definition trunk.c:415
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:751
trunk_connection_t * tconn
The request was associated with.
Definition trunk.c:83
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4014
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:281
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5026
#define IN_HANDLER(_trunk)
Definition trunk.c:706
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:403
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4725
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4031
void * uctx
User data to pass to the function.
Definition trunk.c:192
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1154
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:963
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:279
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:81
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:939
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4163
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3680
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:259
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:244
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:761
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4764
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2029
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2132
fr_event_timer_t const * manage_ev
Periodic connection management event.
Definition trunk.c:273
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2260
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition trunk.c:3212
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:188
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3921
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:598
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2474
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3632
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:717
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:139
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:241
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:265
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:789
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1504
int line
Line change occurred on.
Definition trunk.c:93
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3150
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:441
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4571
static size_t trunk_connection_events_len
Definition trunk.c:431
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3580
bool oneshot
Remove the function after it's called once.
Definition trunk.c:190
bool started
Has the trunk been started.
Definition trunk.c:290
static size_t trunk_states_len
Definition trunk.c:401
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition trunk.c:2879
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:295
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:559
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
Definition trunk.c:2898
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3049
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:72
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:2962
fr_event_timer_t const * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:179
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:148
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:865
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4861
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:239
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4099
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4670
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:520
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4844
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:101
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition trunk.c:4945
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:164
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2010
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:191
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:114
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:541
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:756
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2921
struct trunk_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:199
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:112
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:486
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2741
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3509
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:425
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2587
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1057
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:261
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:287
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:396
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:2952
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:711
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2676
#define IS_PROCESSING(_tconn)
Definition trunk.c:712
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3180
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:343
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition trunk.c:109
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4057
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2284
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1602
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:616
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:737
fr_dlist_head_t sent
Sent requests.
Definition trunk.c:158
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition trunk.c:1225
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3883
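A sketch of a muxer built around trunk_connection_pop_request(). The callback signature, the my_write() encode-and-write helper and the CONNECTION_FAILED reason are assumptions; the loop exits on either a non-zero return or a NULL treq, so it tolerates either convention for an empty pending queue.
static void my_request_mux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
                           connection_t *conn, UNUSED void *uctx)
{
        trunk_request_t *treq;

        for (;;) {
                if (trunk_connection_pop_request(&treq, tconn) != 0) break;
                if (!treq) break;                       /* Nothing left in the pending queue */

                if (my_write(conn, treq->pub.preq) < 0) {       /* Hypothetical encode+write helper */
                        trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);    /* Assumed reason constant */
                        return;
                }

                trunk_request_signal_sent(treq);        /* Now waiting for a response */
        }
}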
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:225
static void _trunk_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4500
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:885
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2152
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2797
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3748
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:831
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:135
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
Definition trunk.c:1311
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition trunk.c:2855
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:708
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:578
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1373
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1680
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:292
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:667
char const * function
State change occurred in.
Definition trunk.c:92
static size_t trunk_request_states_len
Definition trunk.c:376
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:222
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:78
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1424
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3450
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:250
trunk_request_t * partial
Partially written request.
Definition trunk.c:156
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1566
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:227
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:315
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1535
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1255
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:649
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3105
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2322
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:633
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1737
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2441
char const * log_prefix
What to prepend to messages.
Definition trunk.c:203
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:727
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:2985
static conf_parser_t const trunk_config_request[]
Definition trunk.c:299
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:229
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1092
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:361
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3354
static void _trunk_connection_lifetime_expire(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3432
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:382
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:247
uint64_t id
Trunk request ID.
Definition trunk.c:105
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:172
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3319
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:689
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:232
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4819
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:3960
fr_dlist_head_t log
State change log.
Definition trunk.c:124
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition trunk.c:86
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:142
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1459
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3127
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:80
fr_dlist_head_t cancel_sent
Sent cancellation requests.
Definition trunk.c:166
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4808
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition trunk.c:235
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3937
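A sketch of API-client back-pressure pairing the two signal functions above: a hypothetical protocol handle reports when its outbound window fills, the connection is marked inactive, and it is reactivated once the window drains.
static void my_window_update(trunk_connection_t *tconn, bool window_full)
{
        if (window_full) {
                trunk_connection_signal_inactive(tconn);        /* Stop new requests being assigned */
        } else {
                trunk_connection_signal_active(tconn);          /* Accept requests again */
        }
}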
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2790
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5087
fr_heap_t * backlog
The request backlog.
Definition trunk.c:212
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:709
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5147
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4522
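A sketch assuming the conn_state and req_state arguments are bitmasks (the TRUNK_CONN_ALL and TRUNK_REQUEST_STATE_* definitions from trunk.h further down this index suggest they are), counting requests still in flight across connections in any state.
static uint64_t my_in_flight(trunk_t *trunk)
{
        return trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
                                            TRUNK_REQUEST_STATE_PENDING |
                                            TRUNK_REQUEST_STATE_PARTIAL |
                                            TRUNK_REQUEST_STATE_SENT);
}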
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2236
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2050
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:263
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5203
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:452
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition trunk.c:107
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:475
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:267
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition trunk.c:1848
bool sent
Trunk request has been sent at least once.
Definition trunk.c:119
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2094
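A sketch of the completion half of the I/O path. my_read(), my_match() and my_rctx_t are hypothetical; the resume context is assumed to be the rctx pointer supplied to trunk_request_enqueue(), and signalling completion hands the request back to the API client and frees the treq.
typedef struct {
        void *reply;                            /* Hypothetical decoded reply */
} my_rctx_t;

static void my_request_demux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn,
                             connection_t *conn, UNUSED void *uctx)
{
        void *reply;

        while ((reply = my_read(conn)) != NULL) {               /* Hypothetical decoder */
                trunk_request_t *treq = my_match(tconn, reply); /* Hypothetical reply -> treq lookup */
                if (!treq) continue;

                my_rctx_t *rctx = treq->pub.rctx;       /* rctx supplied at enqueue time */
                rctx->reply = reply;
                trunk_request_signal_complete(treq);    /* Resumes the request, frees the treq */
        }
}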
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2939
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:3999
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3903
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5261
fr_dlist_t entry
List entry.
Definition trunk.c:187
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:307
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:88
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:116
static size_t trunk_cancellation_reasons_len
Definition trunk.c:423
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:417
static size_t trunk_conn_trigger_names_len
Definition trunk.c:394
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:205
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2828
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:707
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:209
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4047
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:770
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:79
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2072
Associates request queues with a connection.
Definition trunk.c:134
Wraps a normal request.
Definition trunk.c:100
Trace state machine changes for a particular request.
Definition trunk.c:77
Main trunk management handle.
Definition trunk.c:198
An entry in a trunk watch function list.
Definition trunk.c:186
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:231
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:240
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:314
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:347
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition trunk.h:281
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:229
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:195
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:353
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:375
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:737
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:266
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:355
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:304
fr_time_delta_t idle_timeout
How long a connection can remain idle for.
Definition trunk.h:250
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:367
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:269
uint64_t max_uses
The maximum number of times a connection can be used.
Definition trunk.h:246
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:248
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:233
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:328
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:244
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:312
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:345
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:371
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:349
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:733
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:316
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:301
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:373
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:255
uint16_t start
How many connections to start.
Definition trunk.h:227
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:259
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:213
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:271
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
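A skeleton of an event-registration callback switching on the mask above. The callback signature is assumed from trunk_connection_notify_t (not expanded in this index); the fd-handler registration itself is left as comments, and trunk_connection_signal_writable() (listed earlier) plus its readable counterpart are the signals those handlers would raise.
static void my_connection_notify(UNUSED trunk_connection_t *tconn, UNUSED connection_t *conn,
                                 UNUSED fr_event_list_t *el, trunk_connection_event_t notify_on,
                                 UNUSED void *uctx)
{
        switch (notify_on) {
        case TRUNK_CONN_EVENT_NONE:
                /* Remove any read/write handlers previously registered for conn's fd */
                break;

        case TRUNK_CONN_EVENT_READ:
                /* Register a read handler on el which signals the trunk the connection is readable */
                break;

        case TRUNK_CONN_EVENT_WRITE:
                /* Register a write handler on el which calls trunk_connection_signal_writable(tconn) */
                break;

        case TRUNK_CONN_EVENT_BOTH:
                /* Register both handlers */
                break;
        }
}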
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:739
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:322
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:326
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:252
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:369
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:307
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:724
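A sketch of a watch callback matching the typedef above. Registration via trunk_add_watch() is assumed to be the counterpart of trunk_del_watch() listed earlier in this index; the body simply reacts to a transition out of the active state.
static void my_trunk_watch(UNUSED trunk_t *trunk, trunk_state_t prev, trunk_state_t state, UNUSED void *uctx)
{
        if ((prev == TRUNK_STATE_ACTIVE) && (state == TRUNK_STATE_PENDING)) {
                /* All active connections were lost; new requests will land in the backlog */
        }
}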
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:263
trunk_enqueue_t
Definition trunk.h:148
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:153
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:154
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:150
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:151
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:149
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:351
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:236
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:310
trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition trunk.h:746
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:161
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:170
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:173
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:165
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:182
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:187
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:167
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:172
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:333
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:298
Common configuration parameters for a trunk.
Definition trunk.h:224
Public fields for the trunk connection.
Definition trunk.h:366
I/O functions to pass to trunk_alloc.
Definition trunk.h:732
Public fields for the trunk.
Definition trunk.h:294
Public fields for the trunk request.
Definition trunk.h:344
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:422
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:554
#define fr_box_time_delta(_val)
Definition value.h:343
static size_t char ** out
Definition value.h:997