The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
trunk.c
1/*
2 * This program is is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 74726987fbc3f42d259d409cd5c3fa6492854a47 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
44
45#ifdef HAVE_STDATOMIC_H
46# include <stdatomic.h>
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
49# endif
50#else
51# include <freeradius-devel/util/stdatomic.h>
52#endif
53
54static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
55
56#ifdef TESTING_TRUNK
58
59static fr_time_t test_time(void)
60{
61 return test_time_base;
62}
63
64#define fr_time test_time
65#endif
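/*
 *	A minimal sketch (not part of the original source) of how a TESTING_TRUNK build
 *	can drive the trunk's notion of time.  It assumes the fr_time_add() and
 *	fr_time_delta_from_sec() helpers from util/time.h; the test advances
 *	test_time_base (used by test_time() above) and then calls the management code
 *	with the new timestamp:
 *
 *		test_time_base = fr_time_add(test_time_base, fr_time_delta_from_sec(1));
 *		trunk_manage(trunk, test_time_base);
 */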
66
67#ifndef NDEBUG
68/** The maximum number of state logs to record per request
69 *
70 */
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
72
73/** Trace state machine changes for a particular request
74 *
75 */
76typedef struct {
77 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
78 fr_dlist_t entry; //!< Entry in the linked list.
79 trunk_request_state_t from; //!< What state we transitioned from.
80 trunk_request_state_t to; //!< What state we transitioned to.
81
82 trunk_connection_t *tconn; //!< The connection the request was associated with.
83 ///< Pointer may now be invalid, do not dereference.
84
85 uint64_t tconn_id; //!< If the treq was associated with a connection
86 ///< the connection ID.
87 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
88 ///< the connection state at the time of the
89 ///< state transition.
90
91 char const *function; //!< Function the state change occurred in.
92 int line; //!< Line the state change occurred on.
94#endif
95
96/** Wraps a normal request
97 *
98 */
99struct trunk_request_s {
100 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
101 ///< This *MUST* be the first field in this
102 ///< structure.
103
104 uint64_t id; //!< Trunk request ID.
105
106 fr_heap_index_t heap_id; //!< Used to track the request conn->pending heap.
107
108 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
109 ///< or trunk->backlog request.
110
111 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
112
113 fr_time_t last_freed; //!< Last time this request was freed.
114
115 bool bound_to_conn; //!< Fail the request if there's an attempt to
116 ///< re-enqueue it.
117
118 bool sent; //!< Trunk request has been sent at least once.
119 ///< Used so that re-queueing doesn't increase trunk
120 ///< `sent` count.
121
122#ifndef NDEBUG
123 fr_dlist_head_t log; //!< State change log.
124#endif
125};
126
127
128/** Associates request queues with a connection
129 *
130 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
131 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
132 */
133struct trunk_connection_s {
134 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
135 ///< This *MUST* be the first field in this
136 ///< structure.
137
138 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
139 ///< heap.
140
141 fr_dlist_t entry; //!< Used to track the connection in the connecting,
142 ///< full and failed lists.
143
144 /** @name State
145 * @{
146 */
147 trunk_connection_event_t events; //!< The current events we expect to be notified on.
148 /** @} */
149
150 /** @name Request lists
151 * @{
152 */
153 fr_heap_t *pending; //!< Requests waiting to be sent.
154
155 trunk_request_t *partial; //!< Partially written request.
156
157 fr_dlist_head_t sent; //!< Sent requests.
158
159 fr_dlist_head_t reapable; //!< Sent requests for which no response is expected.
160
161 fr_dlist_head_t cancel; //!< Requests in the cancel state.
162
163 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
164
165 fr_dlist_head_t cancel_sent; //!< Sent cancellation request.
166 /** @} */
167
168 /** @name Statistics
169 * @{
170 */
171 uint64_t sent_count; //!< The number of requests that have been sent using
172 ///< this connection.
173 /** @} */
174
175 /** @name Timers
176 * @{
177 */
178 fr_timer_t *lifetime_ev; //!< Maximum time this connection can be open.
179 /** @} */
180};
181
182/** An entry in a trunk watch function list
183 *
184 */
185typedef struct trunk_watch_entry_s {
186 fr_dlist_t entry; //!< List entry.
187 trunk_watch_t func; //!< Function to call when a trunk enters
188 ///< the state this list belongs to
189 bool oneshot; //!< Remove the function after it's called once.
190 bool enabled; //!< Whether the watch entry is enabled.
191 void *uctx; //!< User data to pass to the function.
192} trunk_watch_entry_t;
193
194/** Main trunk management handle
195 *
196 */
197struct trunk_s {
198 struct trunk_pub_s pub; //!< Public fields in the trunk.
199 ///< This *MUST* be the first field in this
200 ///< structure.
201
202 char const *log_prefix; //!< What to prepend to messages.
203
204 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
205
206 trunk_conf_t conf; //!< Trunk common configuration.
207
208 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
209 ///< enqueued.
210
211 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
212 ///< immediately assign to a connection.
213
214 /** @name Connection lists
215 *
216 * A connection must always be in exactly one of these lists
217 * or trees.
218 *
219 * @{
220 */
221 fr_dlist_head_t init; //!< Connections which have not yet started
222 ///< connecting.
223
224 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
225
226 fr_minmax_heap_t *active; //!< Connections which can service requests.
227
228 fr_dlist_head_t full; //!< Connections which have too many outstanding
229 ///< requests.
230
231 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
232 ///< inactive by the API client.
233
234 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
235 ///< inactive by the API client, which the trunk
236 ///< manager is draining to close.
237
238 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
239
240 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
241 ///< shutdown, reconnection or failure.
242
243 fr_dlist_head_t draining; //!< Connections that will be freed once all their
244 ///< requests are complete, but can be reactivated.
245
246 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
247 ///< requests are complete.
248
249 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
250 ///< the next call to trunk_manage.
251 ///< This prevents connections from being freed
252 ///< whilst we're inside callbacks.
253 /** @} */
254
255 /** @name Callbacks
256 * @{
257 */
258 trunk_io_funcs_t funcs; //!< I/O functions.
259
260 void *in_handler; //!< Which handler we're inside.
261
262 void *uctx; //!< Uctx data to pass to alloc.
263
264 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
265
266 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
267 /** @} */
268
269 /** @name Timers
270 * @{
271 */
272 fr_timer_t *manage_ev; //!< Periodic connection management event.
273 /** @} */
274
275 /** @name Log rate limiting entries
276 * @{
277 */
278 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
279
280 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
281 /** @} */
282
283 /** @name State
284 * @{
285 */
286 bool freeing; //!< Trunk is being freed, don't spawn new
287 ///< connections or re-enqueue.
288
289 bool started; //!< Has the trunk been started.
290
291 bool managing_connections; //!< Whether the trunk is allowed to manage
292 ///< (open/close) connections.
293
294 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
295 /** @} */
296};
297
299 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
300 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
301 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
302
304};
305
307 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
308 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
309
311};
312
313#ifndef TRUNK_TESTS
315 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
316 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
317 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
318 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
319 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
320 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
321 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
322
323 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
324 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
325
326 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
327
328 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
329
330 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
331 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
332
334};
335#endif
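/*
 *	Illustrative example (not part of the original source).  The parser tables above
 *	define the option names and defaults an API client inherits when it reuses this
 *	trunk configuration.  A corresponding module configuration section might look
 *	like the following; the enclosing "pool" section name is an assumption:
 *
 *		pool {
 *			start = 1
 *			min = 1
 *			max = 5
 *			connecting = 2
 *			uses = 0
 *			lifetime = 0
 *			idle_timeout = 0
 *			open_delay = 0.2
 *			close_delay = 10.0
 *			manage_interval = 0.2
 *			max_backlog = 1000
 *			connection {
 *				connect_timeout = 3.0
 *				reconnect_delay = 1
 *			}
 *			request {
 *				per_connection_max = 2000
 *				per_connection_target = 1000
 *				free_delay = 10.0
 *			}
 *		}
 */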
336
337#ifndef NDEBUG
338/** Map request states to trigger names
339 *
340 * Must stay in the same order as #trunk_request_state_t
341 */
343 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
344 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
345 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
346 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
347 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
348 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
349 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
350 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
351 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
352 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
353 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
354 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
355 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
356};
358#endif
359
361 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
362 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
363 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
364 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
365 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
366 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
367 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
368 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
369 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
370 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
371 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
372 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
373 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
374};
376
377/** Map connection states to trigger names
378 *
379 * Must stay in the same order as #trunk_connection_state_t
380 */
382 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
383 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
384 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
385 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
386 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
387 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
388 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
389 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
390 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
391 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
392};
394
396 { L("IDLE"), TRUNK_STATE_IDLE },
397 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
398 { L("PENDING"), TRUNK_STATE_PENDING }
399};
401
403 { L("INIT"), TRUNK_CONN_INIT },
404 { L("HALTED"), TRUNK_CONN_HALTED },
405 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
406 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
407 { L("CLOSED"), TRUNK_CONN_CLOSED },
408 { L("FULL"), TRUNK_CONN_FULL },
409 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
410 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
411 { L("DRAINING"), TRUNK_CONN_DRAINING },
412 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
413};
415
417 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
418 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
419 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
420 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
421};
423
425 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
426 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
427 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
428 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
429};
431
432#define CONN_TRIGGER(_state) do { \
433 if (trunk->pub.triggers) { \
434 trigger_exec(unlang_interpret_get_thread_default(), \
435 NULL, fr_table_str_by_value(trunk_conn_trigger_names, _state, \
436 "<INVALID>"), true, NULL); \
437 } \
438} while (0)
439
440#define CONN_STATE_TRANSITION(_new, _log) \
441do { \
442 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
443 tconn->pub.conn->id, \
444 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
445 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
446 tconn->pub.state = _new; \
447 CONN_TRIGGER(_new); \
448 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
449} while (0)
450
451#define CONN_BAD_STATE_TRANSITION(_new) \
452do { \
453 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
454 tconn->pub.conn->id, \
455 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
456 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
457} while (0)
458
459#ifndef NDEBUG
460void trunk_request_state_log_entry_add(char const *function, int line,
461 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
462
463#define REQUEST_TRIGGER(_state) do { \
464 if (trunk->pub.triggers) { \
465 trigger_exec(unlang_interpret_get_thread_default(), \
466 NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
467 "<INVALID>"), true, NULL); \
468 } \
469} while (0)
470
471/** Record a request state transition and log appropriate output
472 *
473 */
474#define REQUEST_STATE_TRANSITION(_new) \
475do { \
476 request_t *request = treq->pub.request; \
477 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
478 treq->id, \
479 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
480 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
481 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
482 treq->pub.state = _new; \
483 REQUEST_TRIGGER(_new); \
484} while (0)
485#define REQUEST_BAD_STATE_TRANSITION(_new) \
486do { \
487 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
488 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
489 treq->id, \
490 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
491 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
492} while (0)
493#else
494/** Record a request state transition
495 *
496 */
497#define REQUEST_STATE_TRANSITION(_new) \
498do { \
499 request_t *request = treq->pub.request; \
500 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
501 treq->id, \
502 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
503 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
504 treq->pub.state = _new; \
505} while (0)
506#define REQUEST_BAD_STATE_TRANSITION(_new) \
507do { \
508 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
509 treq->id, \
510 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
511 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
512} while (0)
513#endif
514
515
516/** Call the cancel callback if set
517 *
518 */
519#define DO_REQUEST_CANCEL(_treq, _reason) \
520do { \
521 if ((_treq)->pub.trunk->funcs.request_cancel) { \
522 request_t *request = (_treq)->pub.request; \
523 void *_prev = (_treq)->pub.trunk->in_handler; \
524 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
525 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
526 (_treq)->pub.tconn->pub.conn, \
527 (_treq)->pub.preq, \
528 fr_table_str_by_value(trunk_cancellation_reasons, \
529 (_reason), \
530 "<INVALID>"), \
531 (_treq)->pub.trunk->uctx); \
532 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
533 (_treq)->pub.trunk->in_handler = _prev; \
534 } \
535} while(0)
536
537/** Call the "conn_release" callback (if set)
538 *
539 */
540#define DO_REQUEST_CONN_RELEASE(_treq) \
541do { \
542 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
543 request_t *request = (_treq)->pub.request; \
544 void *_prev = (_treq)->pub.trunk->in_handler; \
545 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
546 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
547 (_treq)->pub.tconn->pub.conn, \
548 (_treq)->pub.preq, \
549 (_treq)->pub.trunk->uctx); \
550 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
551 (_treq)->pub.trunk->in_handler = _prev; \
552 } \
553} while(0)
554
555/** Call the complete callback (if set)
556 *
557 */
558#define DO_REQUEST_COMPLETE(_treq) \
559do { \
560 if ((_treq)->pub.trunk->funcs.request_complete) { \
561 request_t *request = (_treq)->pub.request; \
562 void *_prev = (_treq)->pub.trunk->in_handler; \
563 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
564 (_treq)->pub.request, \
565 (_treq)->pub.preq, \
566 (_treq)->pub.rctx, \
567 (_treq)->pub.trunk->uctx); \
568 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
569 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
570 (_treq)->pub.trunk->in_handler = _prev; \
571 } \
572} while(0)
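/*
 *	Illustrative sketch (not part of the original source) of a request_complete
 *	callback as invoked by the macro above.  The argument order mirrors the call
 *	site (request, preq, rctx, uctx); the exact typedef lives in trunk.h, and
 *	my_rctx_t is a hypothetical API client resume context.
 */
typedef struct {
	bool	done;			//!< Set once the trunk has delivered a result.
} my_rctx_t;

static void my_request_complete(request_t *request, UNUSED void *preq, void *rctx, UNUSED void *uctx)
{
	my_rctx_t *my_rctx = talloc_get_type_abort(rctx, my_rctx_t);

	my_rctx->done = true;
	if (request) unlang_interpret_mark_runnable(request);	/* Resume the yielded request */
}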
573
574/** Call the fail callback (if set)
575 *
576 */
577#define DO_REQUEST_FAIL(_treq, _prev_state) \
578do { \
579 if ((_treq)->pub.trunk->funcs.request_fail) { \
580 request_t *request = (_treq)->pub.request; \
581 void *_prev = (_treq)->pub.trunk->in_handler; \
582 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
583 (_treq)->pub.request, \
584 (_treq)->pub.preq, \
585 (_treq)->pub.rctx, \
586 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
587 (_treq)->pub.trunk->uctx); \
588 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
589 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
590 (_treq)->pub.trunk->in_handler = _prev; \
591 } \
592} while(0)
593
594/** Call the free callback (if set)
595 *
596 */
597#define DO_REQUEST_FREE(_treq) \
598do { \
599 if ((_treq)->pub.trunk->funcs.request_free) { \
600 request_t *request = (_treq)->pub.request; \
601 void *_prev = (_treq)->pub.trunk->in_handler; \
602 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
603 (_treq)->pub.request, \
604 (_treq)->pub.preq, \
605 (_treq)->pub.trunk->uctx); \
606 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
607 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
608 (_treq)->pub.trunk->in_handler = _prev; \
609 } \
610} while(0)
611
612/** Write one or more requests to a connection
613 *
614 */
615#define DO_REQUEST_MUX(_tconn) \
616do { \
617 void *_prev = (_tconn)->pub.trunk->in_handler; \
618 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
619 (_tconn)->pub.conn->id, \
620 (_tconn)->pub.trunk->el, \
621 (_tconn), \
622 (_tconn)->pub.conn, \
623 (_tconn)->pub.trunk->uctx); \
624 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
625 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
626 (_tconn)->pub.trunk->in_handler = _prev; \
627} while(0)
628
629/** Read one or more requests from a connection
630 *
631 */
632#define DO_REQUEST_DEMUX(_tconn) \
633do { \
634 void *_prev = (_tconn)->pub.trunk->in_handler; \
635 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
636 (_tconn)->pub.conn->id, \
637 (_tconn), \
638 (_tconn)->pub.conn, \
639 (_tconn)->pub.trunk->uctx); \
640 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
641 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
642 (_tconn)->pub.trunk->in_handler = _prev; \
643} while(0)
644
645/** Write one or more cancellation requests to a connection
646 *
647 */
648#define DO_REQUEST_CANCEL_MUX(_tconn) \
649do { \
650 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
651 void *_prev = (_tconn)->pub.trunk->in_handler; \
652 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
653 (_tconn)->pub.conn->id, \
654 (_tconn), \
655 (_tconn)->pub.conn, \
656 (_tconn)->pub.trunk->uctx); \
657 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
658 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
659 (_tconn)->pub.trunk->in_handler = _prev; \
660 } \
661} while(0)
662
663/** Allocate a new connection
664 *
665 */
666#define DO_CONNECTION_ALLOC(_tconn) \
667do { \
668 void *_prev = trunk->in_handler; \
669 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
670 (_tconn), \
671 (_tconn)->pub.trunk->el, \
672 (_tconn)->pub.trunk->conf.conn_conf, \
673 trunk->log_prefix, \
674 (_tconn)->pub.trunk->uctx); \
675 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
676 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
677 (_tconn)->pub.trunk->in_handler = _prev; \
678 if (!(_tconn)->pub.conn) { \
679 ERROR("Failed creating new connection"); \
680 talloc_free(tconn); \
681 return -1; \
682 } \
683} while(0)
684
685/** Change what events the connection should be notified about
686 *
687 */
688#define DO_CONNECTION_NOTIFY(_tconn, _events) \
689do { \
690 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
691 void *_prev = (_tconn)->pub.trunk->in_handler; \
692 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
693 (_tconn)->pub.conn->id, \
694 (_tconn), \
695 (_tconn)->pub.conn, \
696 (_tconn)->pub.trunk->el, \
697 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
698 (_tconn)->pub.trunk->uctx); \
699 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
700 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
701 (_tconn)->pub.trunk->in_handler = _prev; \
702 } \
703} while(0)
704
705#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
706#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
707#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
708#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
709
710#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
711#define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
712
713/** Remove the current request from the backlog
714 *
715 */
716#define REQUEST_EXTRACT_BACKLOG(_treq) \
717do { \
718 int _ret; \
719 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
720 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting treq from backlog heap: %s", fr_strerror())) break; \
721} while (0)
722
723/** Remove the current request from the pending list
724 *
725 */
726#define REQUEST_EXTRACT_PENDING(_treq) \
727do { \
728 int _ret; \
729 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
730 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting treq from pending heap: %s", fr_strerror())) break; \
731} while (0)
732
733/** Remove the current request from the partial slot
734 *
735 */
736#define REQUEST_EXTRACT_PARTIAL(_treq) \
737do { \
738 fr_assert((_treq)->pub.tconn->partial == treq); \
739 tconn->partial = NULL; \
740} while (0)
741
742/** Remove the current request from the sent list
743 *
744 */
745#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
746
747/** Remove the current request from the reapable list
748 *
749 */
750#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
751
752/** Remove the current request from the cancel list
753 *
754 */
755#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
756
757/** Remove the current request from the cancel_partial slot
758 *
759 */
760#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
761do { \
762 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
763 tconn->cancel_partial = NULL; \
764} while (0)
765
766/** Remove the current request from the cancel sent list
767 *
768 */
769#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
770
771/** Reorder the connections in the active heap
772 *
773 * fr_heap_extract will also error out if heap_id is bad - no need for assert
774 */
775#define CONN_REORDER(_tconn) \
776do { \
777 int _ret; \
778 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
779 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
780 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
781 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
782 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
783} while (0)
784
785/** Call a list of watch functions associated with a state
786 *
787 */
789{
790 /*
791 * Nested watcher calls are not allowed
792 * and shouldn't be possible because of
793 * deferred signal processing.
794 */
795 fr_assert(trunk->next_watcher == NULL);
796
797 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
798 trunk_watch_entry_t *entry = trunk->next_watcher;
799 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
800
801 if (!entry->enabled) continue;
802 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
803
804 entry->func(trunk, trunk->pub.state, state, entry->uctx);
805
806 if (oneshot) talloc_free(entry);
807 }
808 trunk->next_watcher = NULL;
809}
810
811/** Call the state change watch functions
812 *
813 */
814#define CALL_WATCHERS(_trunk, _state) \
815do { \
816 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
817 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
818} while(0)
819
820/** Remove a watch function from a trunk state list
821 *
822 * @param[in] trunk The trunk to remove the watcher from.
823 * @param[in] state to remove the watch from.
824 * @param[in] watch Function to remove.
825 * @return
826 * - 0 if the function was removed successfully.
827 * - -1 if the function wasn't present in the watch list.
828 * - -2 if an invalid state was passed.
829 */
831{
832 trunk_watch_entry_t *entry = NULL;
833 fr_dlist_head_t *list;
834
835 if (state >= TRUNK_STATE_MAX) return -2;
836
837 list = &trunk->watch[state];
838 while ((entry = fr_dlist_next(list, entry))) {
839 if (entry->func == watch) {
840 if (trunk->next_watcher == entry) {
841 trunk->next_watcher = fr_dlist_remove(list, entry);
842 } else {
843 fr_dlist_remove(list, entry);
844 }
845 talloc_free(entry);
846 return 0;
847 }
848 }
849
850 return -1;
851}
852
853/** Add a watch entry to the trunk state list
854 *
855 * @param[in] trunk The trunk to add the watcher to.
856 * @param[in] state to watch for.
857 * @param[in] watch Function to add.
858 * @param[in] oneshot Should this watcher only be run once.
859 * @param[in] uctx Context to pass to function.
860 * @return
861 * - NULL if an invalid state is passed.
862 * - A new watch entry handle on success.
863 */
865 trunk_watch_t watch, bool oneshot, void const *uctx)
866{
867 trunk_watch_entry_t *entry;
868 fr_dlist_head_t *list;
869
870 if (state >= TRUNK_STATE_MAX) return NULL;
871
872 list = &trunk->watch[state];
873 MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
874
875 entry->func = watch;
876 entry->oneshot = oneshot;
877 entry->enabled = true;
878 memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
879 fr_dlist_insert_tail(list, entry);
880
881 return entry;
882}
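/*
 *	Illustrative sketch (not part of the original source) of how an API client might
 *	register a one-shot watch function.  It assumes the public trunk_add_watch()
 *	wrapper declared in trunk.h, and uses the trunk_watch_t argument order seen in
 *	trunk_watch_call() above (trunk, previous state, new state, uctx).
 */
static void my_on_trunk_active(trunk_t *trunk, UNUSED trunk_state_t prev, UNUSED trunk_state_t state, void *uctx)
{
	DEBUG3("Trunk %p is now active (uctx=%p)", trunk, uctx);
}

/* ...during instantiation... */
/* trunk_add_watch(trunk, TRUNK_STATE_ACTIVE, my_on_trunk_active, true, NULL); */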
883
884#define TRUNK_STATE_TRANSITION(_new) \
885do { \
886 DEBUG3("Trunk changed state %s -> %s", \
887 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
888 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
889 CALL_WATCHERS(trunk, _new); \
890 trunk->pub.state = _new; \
891} while (0)
892
893static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
894static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
903
904static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
905 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
906
907static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
908static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
909static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
910static inline void trunk_connection_readable(trunk_connection_t *tconn);
911static inline void trunk_connection_writable(trunk_connection_t *tconn);
919
920static void trunk_rebalance(trunk_t *trunk);
921static void trunk_manage(trunk_t *trunk, fr_time_t now);
922static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx);
923static void trunk_backlog_drain(trunk_t *trunk);
924
925/** Compare two protocol requests
926 *
927 * Allows protocol requests to be prioritised with a function
928 * specified by the API client. Defaults to comparison by pointer address
929 * if no function is specified.
930 *
931 * @param[in] a treq to compare to b.
932 * @param[in] b treq to compare to a.
933 * @return
934 * - +1 if a > b.
935 * - 0 if a == b.
936 * - -1 if a < b.
937 */
938static int8_t _trunk_request_prioritise(void const *a, void const *b)
939{
942
943 fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
944
945 return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
946}
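/*
 *	Illustrative sketch (not part of the original source) of a custom
 *	request_prioritise callback an API client could supply.  As the wrapper above
 *	shows, it receives the two protocol request (preq) pointers; my_preq_t is a
 *	hypothetical API client type.
 */
typedef struct {
	uint8_t	priority;		//!< Relative priority used for ordering.
} my_preq_t;

static int8_t my_request_prioritise(void const *a, void const *b)
{
	my_preq_t const *preq_a = a;
	my_preq_t const *preq_b = b;

	return (preq_a->priority > preq_b->priority) - (preq_a->priority < preq_b->priority);
}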
947
948/** Remove a request from all connection lists
949 *
950 * A common function used by init, fail, complete state functions to disassociate
951 * a request from a connection in preparation for freeing or reassignment.
952 *
953 * Despite its unassuming name, this function is *the* place to put calls to
954 * functions which need to be called when the number of requests associated with
955 * a connection changes.
956 *
957 * Trunk requests will always be passed to this function before they're removed
958 * from a connection, even if the requests are being freed.
959 *
960 * @param[in] treq to trigger a state change for.
961 */
963{
964 trunk_connection_t *tconn = treq->pub.tconn;
965 trunk_t *trunk = treq->pub.trunk;
966
967 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
968
969 switch (treq->pub.state) {
971 return; /* Not associated with connection */
972
975 break;
976
979 break;
980
983 break;
984
987 break;
988
991 break;
992
995 break;
996
999 break;
1000
1001 default:
1002 fr_assert(0);
1003 break;
1004 }
1005
1006 /*
1007 * If the request wasn't associated with a
1008 * connection, then there's nothing more
1009 * to do.
1010 */
1011 if (!tconn) return;
1012
1013 {
1014 request_t *request = treq->pub.request;
1015
1016 ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
1017 tconn->pub.conn->name, treq->id);
1018 }
1019 /*
1020 * Release any connection specific resources the
1021 * treq holds.
1022 */
1024
1025 switch (tconn->pub.state){
1026 case TRUNK_CONN_FULL:
1027 trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1028 if (tconn->pub.state == TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1030
1031 case TRUNK_CONN_ACTIVE:
1032 CONN_REORDER(tconn);
1033 break;
1034
1035 default:
1036 break;
1037 }
1038
1039 treq->pub.tconn = NULL;
1040
1041 /*
1042 * Request removed from the connection,
1043 * see if we need to deregister I/O events.
1044 */
1046}
1047
1048/** Transition a request to the unassigned state, in preparation for re-assignment
1049 *
1050 * @note treq->tconn may be inviable after calling
1051 * if treq->conn and connection_signals_pause are not used.
1052 * This is due to call to trunk_request_remove_from_conn.
1053 *
1054 * @param[in] treq to trigger a state change for.
1055 */
1057{
1058 trunk_t *trunk = treq->pub.trunk;
1059
1060 switch (treq->pub.state) {
1062 return;
1063
1066 break;
1067
1073 break;
1074
1075 default:
1077 }
1078
1080}
1081
1082/** Transition a request to the backlog state, adding it to the backlog of the trunk
1083 *
1084 * @note treq->tconn and treq may be inviable after calling
1085 * if treq->conn and connection_signals_pause are not used.
1086 * This is due to call to trunk_manage.
1087 *
1088 * @param[in] treq to trigger a state change for.
1089 * @param[in] new Whether this is a new request.
1090 */
1092{
1093 trunk_connection_t *tconn = treq->pub.tconn;
1094 trunk_t *trunk = treq->pub.trunk;
1095
1096 switch (treq->pub.state) {
1099 break;
1100
1103 break;
1104
1107 break;
1108
1109 default:
1111 }
1112
1114 fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1115
1116 /*
1117 * A new request has entered the trunk.
1118 * Re-calculate request/connection ratios.
1119 */
1120 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1121
1122 /*
1123 * To reduce latency, if there are no connections
1124 * in the connecting state, call the trunk manage
1125 * function immediately.
1126 *
1127 * Likewise, if there's draining connections
1128 * which could be moved back to active call
1129 * the trunk manage function.
1130 *
1131 * Remember requests only enter the backlog if
1132 * there are no connections which can service them.
1133 */
1137 }
1138}
1139
1140/** Transition a request to the pending state, adding it to the backlog of an active connection
1141 *
1142 * All trunk requests being added to a connection get passed to this function.
1143 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1144 *
1145 * @note treq->tconn and treq may be inviable after calling
1146 * if treq->conn and connection_signals_pause is not used.
1147 * This is due to call to trunk_connection_event_update.
1148 *
1149 * @param[in] treq to trigger a state change for.
1150 * @param[in] tconn to enqueue the request on.
1151 * @param[in] new Whether this is a new request.
1152 */
1154{
1155 trunk_t *trunk = treq->pub.trunk;
1156
1157 fr_assert(tconn->pub.trunk == trunk);
1158 fr_assert(IS_PROCESSING(tconn));
1159
1160 switch (treq->pub.state) {
1163 fr_assert(!treq->pub.tconn);
1164 break;
1165
1167 fr_assert(!treq->pub.tconn);
1169 break;
1170
1171 case TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1173 break;
1174
1175 default:
1177 }
1178
1179 /*
1180 * Assign the new connection first so
1181 * it appears in the state log.
1182 */
1183 treq->pub.tconn = tconn;
1184
1186
1187 {
1188 request_t *request = treq->pub.request;
1189
1190 ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1191 tconn->pub.conn->name, treq->id);
1192 }
1193 fr_heap_insert(&tconn->pending, treq);
1194
1195 /*
1196 * A new request has entered the trunk.
1197 * Re-calculate request/connection ratios.
1198 */
1199 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1200
1201 /*
1202 * Check if we need to automatically transition the
1203 * connection to full.
1204 */
1206
1207 /*
1208 * Reorder the connection in the heap now it has an
1209 * additional request.
1210 */
1211 if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1212
1213 /*
1214 * We have a new request, see if we need to register
1215 * for I/O events.
1216 */
1218}
1219
1220/** Transition a request to the partial state, indicating that it has been partially sent
1221 *
1222 * @param[in] treq to trigger a state change for.
1223 */
1225{
1226 trunk_connection_t *tconn = treq->pub.tconn;
1227 trunk_t *trunk = treq->pub.trunk;
1228
1229 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1230
1231 switch (treq->pub.state) {
1232 case TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1234 break;
1235
1236 default:
1238 }
1239
1240 fr_assert(!tconn->partial);
1241 tconn->partial = treq;
1242
1244}
1245
1246/** Transition a request to the sent state, indicating that it's been sent in its entirety
1247 *
1248 * @note treq->tconn and treq may be inviable after calling
1249 * if treq->conn and connection_signals_pause is not used.
1250 * This is due to call to trunk_connection_event_update.
1251 *
1252 * @param[in] treq to trigger a state change for.
1253 */
1255{
1256 trunk_connection_t *tconn = treq->pub.tconn;
1257 trunk_t *trunk = treq->pub.trunk;
1258
1259 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1260
1261 switch (treq->pub.state) {
1264 break;
1265
1268 break;
1269
1270 default:
1272 }
1273
1275 fr_dlist_insert_tail(&tconn->sent, treq);
1276
1277 /*
1278 * Update the connection's sent stats if this is the
1279 * first time this request is being sent.
1280 */
1281 if (!treq->sent) {
1282 trunk->pub.last_write_success = fr_time();
1283
1285 tconn->sent_count++;
1286 treq->sent = true;
1287
1288 /*
1289 * Enforces max_uses
1290 */
1291 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1292 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1294 }
1295 }
1296
1297 /*
1298 * We just sent a request, we probably need
1299 * to tell the event loop we want to be
1300 * notified if there's data available.
1301 */
1303}
1304
1305/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
1306 *
1307 * @note Largely a replica of trunk_request_enter_sent.
1308 *
1309 * @param[in] treq to trigger a state change for.
1310 */
1312{
1313 trunk_connection_t *tconn = treq->pub.tconn;
1314 trunk_t *trunk = treq->pub.trunk;
1315
1316 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1317
1318 switch (treq->pub.state) {
1321 break;
1322
1325 break;
1326
1327 default:
1329 }
1330
1332 fr_dlist_insert_tail(&tconn->reapable, treq);
1333
1334 if (!treq->sent) {
1335 tconn->sent_count++;
1336 treq->sent = true;
1337
1338 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1339 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1341 }
1342 }
1343
1345}
1346
1347/** Transition a request to the cancel state, placing it in a connection's cancellation list
1348 *
1349 * If a request_cancel_send callback is provided, that callback will
1350 * be called periodically for requests which were cancelled due to
1351 * a signal.
1352 *
1353 * The request_cancel_send callback will dequeue cancelled requests
1354 * and inform a remote server that the result is no longer required.
1355 *
1356 * A request must enter this state before being added to the backlog
1357 * of another connection if it's been sent or partially sent.
1358 *
1359 * @note treq->tconn and treq may be inviable after calling
1360 * if treq->conn and connection_signals_pause is not used.
1361 * This is due to call to trunk_connection_event_update.
1362 *
1363 * @param[in] treq to trigger a state change for.
1364 * @param[in] reason Why the request was cancelled.
1365 * Should be one of:
1366 * - TRUNK_CANCEL_REASON_SIGNAL request cancelled
1367 * because of a signal from the interpreter.
1368 * - TRUNK_CANCEL_REASON_MOVE request cancelled
1369 * because the connection failed and it needs
1370 * to be assigned to a new connection.
1371 * - TRUNK_CANCEL_REASON_REQUEUE request cancelled
1372 * as it needs to be resent on the same connection.
1373 */
1375{
1376 trunk_connection_t *tconn = treq->pub.tconn;
1377 trunk_t *trunk = treq->pub.trunk;
1378
1379 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1380
1381 switch (treq->pub.state) {
1384 break;
1385
1388 break;
1389
1392 break;
1393
1394 default:
1396 }
1397
1399 fr_dlist_insert_tail(&tconn->cancel, treq);
1400 treq->cancel_reason = reason;
1401
1402 DO_REQUEST_CANCEL(treq, reason);
1403
1404 /*
1405 * Our treq is no longer bound to an actual
1406 * request_t *, as we can't guarantee the
1407 * lifetime of the original request_t *.
1408 */
1409 if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1410
1411 /*
1412 * Register for I/O write events if we need to.
1413 */
1415}
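/*
 *	Illustrative sketch (not part of the original source) of a request_cancel
 *	callback as invoked via DO_REQUEST_CANCEL above.  It typically just flags the
 *	protocol request so the demux ignores any reply that still arrives.  The
 *	connection_t type matches tconn->pub.conn, and my_cancel_preq_t is a
 *	hypothetical API client type.
 */
typedef struct {
	bool	cancelled;		//!< Demux should drop any reply for this request.
} my_cancel_preq_t;

static void my_request_cancel(UNUSED connection_t *conn, void *preq,
			      UNUSED trunk_cancel_reason_t reason, UNUSED void *uctx)
{
	my_cancel_preq_t *our_preq = talloc_get_type_abort(preq, my_cancel_preq_t);

	our_preq->cancelled = true;
}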
1416
1417/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1418 *
1419 * The request_demux function is then responsible for signalling
1420 * that the cancel request is complete when the remote server
1421 * acknowledges the cancellation request.
1422 *
1423 * @param[in] treq to trigger a state change for.
1424 */
1426{
1427 trunk_connection_t *tconn = treq->pub.tconn;
1428 trunk_t *trunk = treq->pub.trunk;
1429
1430 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1433
1434 switch (treq->pub.state) {
1435 case TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_sent can be reached from */
1437 break;
1438
1439 default:
1441 }
1442
1444 fr_assert(!tconn->partial);
1445 tconn->cancel_partial = treq;
1446}
1447
1448/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1449 *
1450 * The request_demux function is then responsible for signalling
1451 * that the cancel request is complete when the remote server
1452 * acknowledges the cancellation request.
1453 *
1454 * @note treq->tconn and treq may be inviable after calling
1455 * if treq->conn and connection_signals_pause is not used.
1456 * This is due to call to trunk_connection_event_update.
1457 *
1458 * @param[in] treq to trigger a state change for.
1459 */
1461{
1462 trunk_connection_t *tconn = treq->pub.tconn;
1463 trunk_t *trunk = treq->pub.trunk;
1464
1465 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1468
1469 switch (treq->pub.state) {
1472 break;
1473
1476 break;
1477
1478 default:
1480 }
1481
1483 fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1484
1485 /*
1486 * De-register for I/O write events
1487 * and register the read events
1488 * to drain the cancel ACKs.
1489 */
1491}
1492
1493/** Cancellation was acked, the request is complete, free it
1494 *
1495 * The API client will not be informed, as the original request_t *
1496 * will likely have been freed by this point.
1497 *
1498 * @note treq will be inviable after a call to this function.
1499 * treq->tconn may be inviable after calling
1500 * if treq->conn and connection_signals_pause is not used.
1501 * This is due to call to trunk_request_remove_from_conn.
1502 *
1503 * @param[in] treq to mark as complete.
1504 */
1506{
1507 trunk_connection_t *tconn = treq->pub.tconn;
1508 trunk_t *trunk = treq->pub.trunk;
1509
1510 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1511 if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1512
1513 switch (treq->pub.state) {
1516 break;
1517
1518 default:
1520 }
1521
1523
1525 trunk_request_free(&treq); /* Free the request */
1526}
1527
1528/** Request completed successfully, inform the API client and free the request
1529 *
1530 * @note treq will be inviable after a call to this function.
1531 * treq->tconn may also be inviable due to call to
1532 * trunk_request_remove_from_conn.
1533 *
1534 * @param[in] treq to mark as complete.
1535 */
1537{
1538 trunk_connection_t *tconn = treq->pub.tconn;
1539 trunk_t *trunk = treq->pub.trunk;
1540
1541 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1542
1543 switch (treq->pub.state) {
1548 break;
1549
1550 default:
1552 }
1553
1555 DO_REQUEST_COMPLETE(treq);
1556 trunk_request_free(&treq); /* Free the request */
1557}
1558
1559/** Request failed, inform the API client and free the request
1560 *
1561 * @note treq will be inviable after a call to this function.
1562 * treq->tconn may also be inviable due to call to
1563 * trunk_request_remove_from_conn.
1564 *
1565 * @param[in] treq to mark as failed.
1566 */
1568{
1569 trunk_connection_t *tconn = treq->pub.tconn;
1570 trunk_t *trunk = treq->pub.trunk;
1571 trunk_request_state_t prev = treq->pub.state;
1572
1573 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1574
1575 switch (treq->pub.state) {
1578 break;
1579
1580 default:
1582 break;
1583 }
1584
1586 DO_REQUEST_FAIL(treq, prev);
1587 trunk_request_free(&treq); /* Free the request */
1588}
1589
1590/** Check to see if a trunk request can be enqueued
1591 *
1592 * @param[out] tconn_out Connection the request may be enqueued on.
1593 * @param[in] trunk To enqueue requests on.
1594 * @param[in] request associated with the treq (if any).
1595 * @return
1596 * - TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1597 * - TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1598 * - TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1599 * connections or backlog space.
1600 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1601 * unreachable.
1602 */
1604 request_t *request)
1605{
1606 trunk_connection_t *tconn;
1607 /*
1608 * If we have an active connection then
1609 * return that.
1610 */
1611 tconn = fr_minmax_heap_min_peek(trunk->active);
1612 if (tconn) {
1613 *tconn_out = tconn;
1614 return TRUNK_ENQUEUE_OK;
1615 }
1616
1617 /*
1618 * Unlike the connection pool, we don't need
1619 * to drive any internal processes by feeding
1620 * it requests.
1621 *
1622 * If the last event to occur was a failure
1623 * we refuse to enqueue new requests until
1624 * one or more connections come online.
1625 */
1626 if (!trunk->conf.backlog_on_failed_conn &&
1627 fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1628 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1630 RWARN, WARN, "Refusing to enqueue requests - "
1631 "No active connections and last event was a connection failure");
1632
1634 }
1635
1636
1637 /*
1638 * Only enforce if we're limiting maximum
1639 * number of connections, and maximum
1640 * number of requests per connection.
1641 *
1642 * The alloc function also checks this
1643 * which is why this is only done for
1644 * debug builds.
1645 */
1646 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1647 uint64_t limit;
1648
1649 limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1650 if (limit > 0) {
1651 uint64_t total_reqs;
1652
1653 total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
1655 if (total_reqs >= (limit + trunk->conf.max_backlog)) {
1657 RWARN, WARN, "Refusing to alloc requests - "
1658 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1659 "plus %u backlog requests reached",
1660 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
1661 trunk->conf.max_backlog);
1663 }
1664 }
1665 }
1666
1668}
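/*
 *	Illustrative sketch (not part of the original source) of how an API client
 *	typically consumes the trunk_enqueue_t results documented above.  It assumes
 *	the public trunk_request_alloc()/trunk_request_enqueue() API from trunk.h.
 */
static int my_enqueue(trunk_t *trunk, request_t *request, void *preq, void *rctx)
{
	trunk_request_t *treq;

	treq = trunk_request_alloc(trunk, request);
	if (!treq) return -1;

	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case TRUNK_ENQUEUE_OK:
	case TRUNK_ENQUEUE_IN_BACKLOG:
		return 0;	/* Will be muxed onto a connection when one is writable */

	default:
		return -1;	/* No capacity, or the destination is currently unreachable */
	}
}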
1669
1670/** Enqueue a request which has never been assigned to a connection or was previously cancelled
1671 *
1672 * @param[in] treq to re-enqueue. Must have been removed
1673 * from its existing connection with
1674 * #trunk_connection_requests_dequeue.
1675 * @return
1676 * - TRUNK_ENQUEUE_OK Request was re-enqueued.
1677 * - TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1678 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1679 * Usually because the connection to the resource is down.
1680 */
1682{
1683 trunk_t *trunk = treq->pub.trunk;
1684 trunk_connection_t *tconn = NULL;
1685 trunk_enqueue_t ret;
1686
1687 /*
1688 * Must *NOT* still be assigned to another connection
1689 */
1690 fr_assert(!treq->pub.tconn);
1691
1692 ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1693 switch (ret) {
1694 case TRUNK_ENQUEUE_OK:
1695 if (trunk->conf.always_writable) {
1697 trunk_request_enter_pending(treq, tconn, false);
1700 } else {
1701 trunk_request_enter_pending(treq, tconn, false);
1702 }
1703 break;
1704
1706 /*
1707 * No more connections and request
1708 * is already in the backlog.
1709 *
1710 * Signal our caller it should stop
1711 * trying to drain the backlog.
1712 */
1714 trunk_request_enter_backlog(treq, false);
1715 break;
1716
1717 default:
1718 break;
1719 }
1720
1721 return ret;
1722}
1723
1724/** Shift requests in the specified states onto new connections
1725 *
1726 * This function will blindly dequeue any requests in the specified state and get
1727 * them back to the unassigned state, cancelling any sent or partially sent requests.
1728 *
1729 * This function does not check that dequeuing a request in a particular state is a
1730 * sane or sensible thing to do, that's up to the caller!
1731 *
1732 * @param[out] out A list to insert the newly dequeued and unassigned
1733 * requests into.
1734 * @param[in] tconn to dequeue requests from.
1735 * @param[in] states Dequeue request in these states.
1736 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1737 */
1739 int states, uint64_t max)
1740{
1741 trunk_request_t *treq;
1742 uint64_t count = 0;
1743
1744 if (max == 0) max = UINT64_MAX;
1745
1746#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1747
1748#define DEQUEUE_ALL(_src_list, _state) do { \
1749 while ((treq = fr_dlist_head(_src_list))) { \
1750 OVER_MAX_CHECK; \
1751 fr_assert(treq->pub.state == (_state)); \
1752 trunk_request_enter_unassigned(treq); \
1753 fr_dlist_insert_tail(out, treq); \
1754 } } while (0)
1755
1756 /*
1757 * Don't need to do anything with
1758 * cancellation requests.
1759 */
1760 if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1762
1763 /*
1764 * ...same with cancel inform
1765 */
1768
1769 /*
1770 * ....same with cancel partial
1771 */
1774 treq = tconn->cancel_partial;
1775 if (treq) {
1779 }
1780 }
1781
1782 /*
1783 * ...and pending.
1784 */
1785 if (states & TRUNK_REQUEST_STATE_PENDING) {
1786 while ((treq = fr_heap_peek(tconn->pending))) {
1791 }
1792 }
1793
1794 /*
1795 * Cancel partially sent requests
1796 */
1797 if (states & TRUNK_REQUEST_STATE_PARTIAL) {
1799 treq = tconn->partial;
1800 if (treq) {
1802
1803 /*
1804 * Don't allow the connection to change state whilst
1805 * we're draining requests from it.
1806 */
1812 }
1813 }
1814
1815 /*
1816 * Cancel sent requests
1817 */
1818 if (states & TRUNK_REQUEST_STATE_SENT) {
1819 /*
1820 * Don't allow the connection to change state whilst
1821 * we're draining requests from it.
1822 */
1824 while ((treq = fr_dlist_head(&tconn->sent))) {
1827
1831 }
1833 }
1834
1835 return count;
1836}
1837
1838/** Remove requests in specified states from a connection, attempting to distribute them to new connections
1839 *
1840 * @param[in] tconn To remove requests from.
1841 * @param[in] states One or more states or'd together.
1842 * @param[in] max The maximum number of requests to dequeue.
1843 * 0 for unlimited.
1844 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1845 * If false bound requests will not be moved.
1846 *
1847 * @return the number of requests re-queued.
1848 */
1849static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1850{
1851 trunk_t *trunk = tconn->pub.trunk;
1852 fr_dlist_head_t to_process;
1853 trunk_request_t *treq = NULL;
1854 uint64_t moved = 0;
1855
1856 if (max == 0) max = UINT64_MAX;
1857
1858 fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
1859
1860 /*
1861 * Prevent the connection changing state whilst we're
1862 * working with it.
1863 *
1864 * There's a user callback that can be called by
1865 * trunk_request_enqueue_existing which can reconnect
1866 * the connection.
1867 */
1869
1870 /*
1871 * Remove non-cancelled requests from the connection
1872 */
1873 moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1874
1875 /*
1876 * Prevent requests being requeued on the same trunk
1877 * connection, which would break rebalancing.
1878 *
1879 * This is a bit of a hack, but nothing should test
1880 * for connection/list consistency in this code,
1881 * and if something is added later, it'll be flagged
1882 * by the tests.
1883 */
1884 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1885 int ret;
1886
1887 ret = fr_minmax_heap_extract(trunk->active, tconn);
1888 if (!fr_cond_assert_msg(ret == 0,
1889 "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1890
1891 }
1892
1893 /*
1894 * Loop over all the requests we gathered and
1895 * redistribute them to new connections.
1896 */
1897 while ((treq = fr_dlist_next(&to_process, treq))) {
1898 trunk_request_t *prev;
1899
1900 prev = fr_dlist_remove(&to_process, treq);
1901
1902 /*
1903 * Attempting to re-queue a request
1904 * that's bound to a connection
1905 * results in a failure.
1906 */
1907 if (treq->bound_to_conn) {
1908 if (fail_bound || !IS_SERVICEABLE(tconn)) {
1910 } else {
1911 trunk_request_enter_pending(treq, tconn, false);
1912 }
1913 goto next;
1914 }
1915
1916 switch (trunk_request_enqueue_existing(treq)) {
1917 case TRUNK_ENQUEUE_OK:
1918 break;
1919
1920 /*
1921 * A connection failed, and there are
1922 * no other connections available to
1923 * deal with the load, so the request
1924 * has been placed back in the
1925 * backlog.
1926 */
1928 break;
1929
1930 /*
1931 * If we fail to re-enqueue then
1932 * there's nothing to do except
1933 * fail the request.
1934 */
1937 case TRUNK_ENQUEUE_FAIL:
1939 break;
1940 }
1941 next:
1942 treq = prev;
1943 }
1944
1945 /*
1946 * Add the connection back into the active list
1947 */
1948 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1949 int ret;
1950
1951 ret = fr_minmax_heap_insert(trunk->active, tconn);
1952 if (!fr_cond_assert_msg(ret == 0,
1953 "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1954 }
1955 if (moved >= max) goto done;
1956
1957 /*
1958 * Deal with the cancelled requests specially; we can't
1959 * queue them up again as they were only valid on that
1960 * specific connection.
1961 *
1962 * We just need to run them to completion which, as
1963 * they should already be in the unassigned state,
1964 * just means freeing them.
1965 */
1966 moved += trunk_connection_requests_dequeue(&to_process, tconn,
1967 states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1968 while ((treq = fr_dlist_next(&to_process, treq))) {
1969 trunk_request_t *prev;
1970
1971 prev = fr_dlist_remove(&to_process, treq);
1972 trunk_request_free(&treq);
1973 treq = prev;
1974 }
1975
1976done:
1977
1978 /*
1979 * Always re-calculate the request/connection
1980 * ratio at the end.
1981 *
1982 * This avoids having the state transition
1983 * functions do it.
1984 *
1985 * The ratio would be wrong when they calculated
1986 * it anyway, because a bunch of requests are
1987 * dequeued from the connection and temporarily
1988 * cease to exist from the perspective of the
1989 * trunk_requests_per_connection code.
1990 */
1991 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1992
1994 return moved;
1995}
1996
1997/** Move requests off of a connection and requeue elsewhere
1998 *
1999 * @note We don't re-queue on draining or draining to free, as requests should have already been
2000 * moved off of the connection. It's also dangerous as the trunk management code may
2001 * clean up a connection in this state when it runs on re-queue, and then the caller
2002 * may try to access a now freed connection.
2003 *
2004 * @param[in] tconn to move requests off of.
2005 * @param[in] states Only move requests in this state.
2006 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
2007 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
2008 * If false bound requests will not be moved.
2009 * @return The number of requests requeued.
2010 */
2011uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
2012{
2013 switch (tconn->pub.state) {
2014 case TRUNK_CONN_ACTIVE:
2015 case TRUNK_CONN_FULL:
2017 return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
2018
2019 default:
2020 return 0;
2021 }
2022}
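/* Example (sketch, not part of the original source): a caller that wants to take a
 * connection out of service can first move its pending requests onto other connections.
 * The my_take_out_of_service wrapper is hypothetical; trunk_connection_requests_requeue
 * and its state/limit arguments are the API shown above.
 *
 *	static void my_take_out_of_service(trunk_connection_t *tconn)
 *	{
 *		uint64_t moved;
 *
 *		// Move every pending (not yet written) request somewhere else.
 *		// max = 0 means "no limit", fail_bound = false leaves bound requests alone.
 *		moved = trunk_connection_requests_requeue(tconn, TRUNK_REQUEST_STATE_PENDING, 0, false);
 *
 *		DEBUG3("Requeued %" PRIu64 " pending requests", moved);
 *	}
 */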
2023
2024/** Signal a partial write
2025 *
2026 * Where there's high load, and the outbound write buffer is full
2027 *
2028 * @param[in] treq to signal state change for.
2029 */
2031{
2032 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2033
2035 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2036
2037 switch (treq->pub.state) {
2040 break;
2041
2042 default:
2043 return;
2044 }
2045}
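/* Example (sketch, assuming a stream socket): inside a request_mux handler a short
 * write() is reported back to the trunk with the partial/sent signals. treq, fd and
 * the encoded buffer handling are hypothetical; the signal calls are the API
 * documented here.
 *
 *	ssize_t slen;
 *
 *	slen = write(fd, buff, len);
 *	if (slen < 0) goto error;
 *
 *	if ((size_t)slen < len) {
 *		trunk_request_signal_partial(treq);	// Returned again to finish the write
 *	} else {
 *		trunk_request_signal_sent(treq);	// Wait for the response in request_demux
 *	}
 */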
2046
2047/** Signal that the request was written to a connection successfully
2048 *
2049 * @param[in] treq to signal state change for.
2050 */
2052{
2053 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2054
2056 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2057
2058 switch (treq->pub.state) {
2062 break;
2063
2064 default:
2065 return;
2066 }
2067}
2068
2069/** Signal that the request was written to a connection successfully, but no response is expected
2070 *
2071 * @param[in] treq to signal state change for.
2072 */
2074{
2075 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2076
2078 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2079
2080 switch (treq->pub.state) {
2084 break;
2085
2086 default:
2087 return;
2088 }
2089}
2090
2091/** Signal that a trunk request is complete
2092 *
2093 * The API client will be informed that the request is now complete.
2094 */
2096{
2097 trunk_t *trunk = treq->pub.trunk;
2098
2099 if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2100
2101 /*
2102 * We assume that if the request is being signalled
2103 * as complete from the demux function, that it was
2104 * a successful read.
2105 *
2106 * If this assumption turns out to be incorrect
2107 * then we need to add an argument to signal_complete
2108 * to indicate if this is a successful read.
2109 */
2110 if (IN_REQUEST_DEMUX(trunk)) {
2111 trunk_connection_t *tconn = treq->pub.tconn;
2112
2113 trunk->pub.last_read_success = fr_time();
2115 }
2116
2117 switch (treq->pub.state) {
2119 case TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2122 break;
2123
2124 default:
2125 return;
2126 }
2127}
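/* Example (sketch): inside a request_demux handler, once a response has been decoded
 * and matched back to its treq, the API client copies the result into the rctx and
 * signals completion. my_decode_response, my_match_response and my_rctx_t are
 * hypothetical helpers; the signal call is the function documented above.
 *
 *	trunk_request_t	*treq;
 *	my_response_t	*resp;
 *
 *	resp = my_decode_response(conn);
 *	treq = my_match_response(conn, resp);		// e.g. by request ID
 *	if (!treq) return;
 *
 *	((my_rctx_t *)treq->pub.rctx)->resp = resp;	// Hand the result to the yielded request
 *	trunk_request_signal_complete(treq);
 */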
2128
2129/** Signal that a trunk request failed
2130 *
2131 * The API client will be informed that the request has failed.
2132 */
2134{
2135 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2136
2138}
2139
2140/** Cancel a trunk request
2141 *
2142 * treq can be in any state, but cancellation requests will be ignored unless the treq
2143 * is in the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state.
2144 *
2145 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2146 * is no longer viable because it's being cancelled.
2147 *
2148 * The free function, however, is called, and that should be used to perform any necessary
2149 * cleanup.
2150 *
2151 * @param[in] treq to signal state change for.
2152 */
2154{
2155 trunk_t *trunk;
2156
2157 /*
2158 * Ensure treq hasn't been freed
2159 */
2160 (void)talloc_get_type_abort(treq, trunk_request_t);
2161
2162 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2163
2165 "%s cannot be called within a handler", __FUNCTION__)) return;
2166
2167 trunk = treq->pub.trunk;
2168
2169 switch (treq->pub.state) {
2170 /*
2171 * We don't call the complete or failed callbacks
2172 * as the request and rctx are no longer viable.
2173 */
2176 {
2177 trunk_connection_t *tconn = treq->pub.tconn;
2178
2179 /*
2180 * Don't allow connection state changes
2181 */
2185 "Bad state %s after cancellation",
2186 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2188 return;
2189 }
2190 /*
2191 * No cancel muxer. We're done.
2192 *
2193 * If we do have a cancel mux function,
2194 * the next time this connection becomes
2195 * writable, we'll call the cancel mux
2196 * function.
2197 *
2198 * We don't run the complete or failed
2199 * callbacks here as the request is
2200 * being cancelled.
2201 */
2202 if (!trunk->funcs.request_cancel_mux) {
2204 trunk_request_free(&treq);
2205 }
2207 }
2208 break;
2209
2210 /*
2211 * We're already in the process of cancelling a
2212 * request, so ignore duplicate signals.
2213 */
2218 break;
2219
2220 /*
2221 * For any other state, we just release the request
2222 * from its current connection and free it.
2223 */
2224 default:
2226 trunk_request_free(&treq);
2227 break;
2228 }
2229}
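/* Example (sketch): cancelling an in-flight trunk request when the request_t that
 * spawned it is being stopped (e.g. from a module's signal handler). The rctx
 * structure is hypothetical, and the cancel signal function is assumed to be
 * trunk_request_signal_cancel, per the public trunk API.
 *
 *	if (rctx->treq) {
 *		trunk_request_signal_cancel(rctx->treq);	// complete/failed callbacks are NOT run
 *		rctx->treq = NULL;
 *	}
 */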
2230
2231/** Signal a partial cancel write
2232 *
2233 * Where there's high load, and the outbound write buffer is full
2234 *
2235 * @param[in] treq to signal state change for.
2236 */
2238{
2239 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2240
2242 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2243
2244 switch (treq->pub.state) {
2247 break;
2248
2249 default:
2250 return;
2251 }
2252}
2253
2254/** Signal that a remote server has been notified of the cancellation
2255 *
2256 * Called from request_cancel_mux to indicate that the datastore has been informed
2257 * that the response is no longer needed.
2258 *
2259 * @param[in] treq to signal state change for.
2260 */
2262{
2263 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2264
2266 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2267
2268 switch (treq->pub.state) {
2272 break;
2273
2274 default:
2275 break;
2276 }
2277}
2278
2279/** Signal that a remote server acked our cancellation
2280 *
2281 * Called from request_demux to indicate that it got an ack for the cancellation.
2282 *
2283 * @param[in] treq to signal state change for.
2284 */
2286{
2287 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2288
2290 "%s can only be called from within request_demux or request_cancel_mux handlers",
2291 __FUNCTION__)) return;
2292
2293 switch (treq->pub.state) {
2295 /*
2296 * This is allowed, as we may not need to wait
2297 * for the database to ACK our cancellation
2298 * request.
2299 *
2300 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2301 * is not allowed here, as that'd mean we'd half
2302 * written the cancellation request out to the
2303 * socket, and then decided to abandon it.
2304 *
2305 * That'd leave the socket in an unusable state.
2306 */
2309 break;
2310
2311 default:
2312 break;
2313 }
2314}
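/* Example (sketch) of the two-step cancellation flow described above. In the
 * request_cancel_mux handler the cancellation is written to the socket and cancel_sent
 * is signalled; later, in request_demux, the server's ack causes cancel_complete to be
 * signalled. The write/ack plumbing is hypothetical.
 *
 *	// In request_cancel_mux, after writing the cancellation for treq:
 *	trunk_request_signal_cancel_sent(treq);		// Now wait for the ack
 *
 *	// In request_demux, when the ack for that cancellation arrives:
 *	trunk_request_signal_cancel_complete(treq);	// treq is freed, no callbacks run
 */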
2315
2316/** If the trunk request is freed then update the target requests
2317 *
2318 * gperftools showed calling the request free function directly was slightly faster
2319 * than using talloc_free.
2320 *
2321 * @param[in] treq_to_free request.
2322 */
2324{
2325 trunk_request_t *treq = *treq_to_free;
2326 trunk_t *trunk;
2327
2328 if (unlikely(!treq)) return;
2329 trunk = treq->pub.trunk;
2330 /*
2331 * The only valid states a trunk request can be
2332 * freed from.
2333 */
2334 switch (treq->pub.state) {
2340 break;
2341
2342 default:
2343 if (!fr_cond_assert(0)) return;
2344 }
2345
2346 /*
2347 * Zero out the pointer to prevent double frees
2348 */
2349 *treq_to_free = NULL;
2350
2351 /*
2352 * Call the API client callback to free
2353 * any associated memory.
2354 */
2355 DO_REQUEST_FREE(treq);
2356
2357 /*
2358 * Update the last above/below target stats
2359 * We only do this when we alloc or free
2360 * connections, or on connection
2361 * state changes.
2362 */
2363 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2364
2365 /*
2366 * This tracks the total number of requests
2367 * allocated and not freed or returned to
2368 * the free list.
2369 */
2370 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2371
2372 /*
2373 * No cleanup delay means we clean up immediately
2374 */
2377
2378#ifndef NDEBUG
2379 /*
2380 * Ensure anything parented off the treq
2381 * is freed. We do this to trigger
2382 * the destructors for the log entries.
2383 */
2384 talloc_free_children(treq);
2385
2386 /*
2387 * State log should now be empty as entries
2388 * remove themselves from the dlist
2389 * on free.
2390 */
2392 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2393#endif
2394
2395 talloc_free(treq);
2396 return;
2397 }
2398
2399 /*
2400 * Ensure anything parented off the treq
2401 * is freed.
2402 */
2403 talloc_free_children(treq);
2404
2405#ifndef NDEBUG
2406 /*
2407 * State log should now be empty as entries
2408 * remove themselves from the dlist
2409 * on free.
2410 */
2412 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2413#endif
2414
2415 /*
2416 *
2417 * Return the trunk request back to the init state.
2418 */
2419 *treq = (trunk_request_t){
2420 .pub = {
2422 .trunk = treq->pub.trunk,
2423 },
2424 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2425 .last_freed = fr_time(),
2426#ifndef NDEBUG
2427 .log = treq->log /* Keep the list head, to save reinitialisation */
2428#endif
2429 };
2430
2431 /*
2432 * Insert at the head, so that we can free
2433 * requests that have been unused for N
2434 * seconds from the tail.
2435 */
2436 fr_dlist_insert_tail(&trunk->free_requests, treq);
2437}
2438
2439/** Actually free the trunk request
2440 *
2441 */
2443{
2444 trunk_t *trunk = treq->pub.trunk;
2445
2446 switch (treq->pub.state) {
2449 break;
2450
2451 default:
2452 fr_assert(0);
2453 break;
2454 }
2455
2456 fr_dlist_remove(&trunk->free_requests, treq);
2457
2458 return 0;
2459}
2460
2461/** (Pre-)Allocate a new trunk request
2462 *
2463 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2464 * request will be a talloc pool, which can be used to hold the preq.
2465 *
2466 * @note Do not use MEM to check the result of this allocation as it may fail for
2467 * non-fatal reasons.
2468 *
2469 * @param[in] trunk to add request to.
2470 * @param[in] request to wrap in a trunk request (treq).
2471 * @return
2472 * - A newly allocated request.
2473 * - NULL if too many requests are allocated.
2474 */
2476{
2477 trunk_request_t *treq;
2478
2479 /*
2480 * The number of treqs currently allocated
2481 * exceeds the maximum number allowed.
2482 */
2483 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2484 uint64_t limit;
2485
2486 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2487 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2489 RWARN, WARN, "Refusing to alloc requests - "
2490 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2491 "plus %u backlog requests reached",
2492 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2493 trunk->conf.max_backlog);
2494 return NULL;
2495 }
2496 }
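	/*
	 *	Worked example (illustrative numbers only): with max = 4 connections
	 *	and max_req_per_conn = 100, limit is 400.  With max_backlog = 50 we
	 *	refuse to allocate once 450 requests are outstanding.
	 */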
2497
2498 /*
2499 * Allocate or reuse an existing request
2500 */
2501 treq = fr_dlist_head(&trunk->free_requests);
2502 if (treq) {
2503 fr_dlist_remove(&trunk->free_requests, treq);
2505 fr_assert(treq->pub.trunk == trunk);
2506 fr_assert(treq->pub.tconn == NULL);
2509 trunk->pub.req_alloc_reused++;
2510 } else {
2512 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2513 talloc_set_destructor(treq, _trunk_request_free);
2514
2515 *treq = (trunk_request_t){
2516 .pub = {
2518 .trunk = trunk
2519 },
2520 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2521 };
2522 trunk->pub.req_alloc_new++;
2523#ifndef NDEBUG
2525#endif
2526 }
2527
2528 trunk->pub.req_alloc++;
2530 /* heap_id - initialised when treq inserted into pending */
2531 /* list - empty */
2532 /* preq - populated later */
2533 /* rctx - populated later */
2534 treq->pub.request = request;
2535
2536 return treq;
2537}
2538
2539/** Enqueue a request that needs data written to the trunk
2540 *
2541 * When a request_t * needs to make an asynchronous request to an external datastore
2542 * it should call this function, specifying a preq (protocol request) containing
2543 * the data necessary to request information from the external datastore, and an
2544 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2545 *
2546 * After a treq is successfully enqueued it will either be assigned immediately
2547 * to the pending queue of a connection, or if no connections are available,
2548 * (depending on the trunk configuration) the treq will be placed in the trunk's
2549 * global backlog.
2550 *
2551 * After receiving a positive return code from this function the caller should
2552 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2553 * (trunk connection) and treq state changes to be called.
2554 *
2555 * When a tconn becomes writable (or the trunk is configured to be always writable)
2556 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2557 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2558 * is also responsible for tracking the outbound requests to allow the
2559 * #trunk_request_demux_t callback to match inbound responses with the original
2560 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2561 * it signals what state the treq should enter next using one of the
2562 * trunk_request_signal_* functions.
2563 *
2564 * When a tconn becomes readable the user specified #trunk_request_demux_t
2565 * callback is called to process any responses, match them with the original treq,
2566 * and signal what state they should enter next using one of the
2567 * trunk_request_signal_* functions.
2568 *
2569 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2570 * is NULL, a new treq will be allocated.
2571 * Otherwise treq should point to memory allocated
2572 * with trunk_request_alloc.
2573 * @param[in] trunk to enqueue request on.
2574 * @param[in] request to enqueue.
2575 * @param[in] preq Protocol request to write out. Will be freed when
2576 * treq is freed. Should ideally be parented by the
2577 * treq if possible.
2578 * Use #trunk_request_alloc for pre-allocation of
2579 * the treq.
2580 * @param[in] rctx The resume context to write any result to.
2581 * @return
2582 * - TRUNK_ENQUEUE_OK.
2583 * - TRUNK_ENQUEUE_IN_BACKLOG.
2584 * - TRUNK_ENQUEUE_NO_CAPACITY.
2585 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2586 * - TRUNK_ENQUEUE_FAIL
2587 */
2589 request_t *request, void *preq, void *rctx)
2590{
2591 trunk_connection_t *tconn = NULL;
2592 trunk_request_t *treq;
2593 trunk_enqueue_t ret;
2594
2595 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2596 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2597
2598 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2599 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2600
2601 /*
2602 * If delay_start was set, we may need
2603 * to insert the timer for the connection manager.
2604 */
2605 if (unlikely(!trunk->started)) {
2606 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2607 }
2608
2609 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2610 switch (ret) {
2611 case TRUNK_ENQUEUE_OK:
2612 if (*treq_out) {
2613 treq = *treq_out;
2614 } else {
2615 *treq_out = treq = trunk_request_alloc(trunk, request);
2616 if (!treq) return TRUNK_ENQUEUE_FAIL;
2617 }
2618 treq->pub.preq = preq;
2619 treq->pub.rctx = rctx;
2620 if (trunk->conf.always_writable) {
2622 trunk_request_enter_pending(treq, tconn, true);
2625 } else {
2626 trunk_request_enter_pending(treq, tconn, true);
2627 }
2628 break;
2629
2631 if (*treq_out) {
2632 treq = *treq_out;
2633 } else {
2634 *treq_out = treq = trunk_request_alloc(trunk, request);
2635 if (!treq) return TRUNK_ENQUEUE_FAIL;
2636 }
2637 treq->pub.preq = preq;
2638 treq->pub.rctx = rctx;
2639 trunk_request_enter_backlog(treq, true);
2640 break;
2641
2642 default:
2643 /*
2644 * If a trunk request was provided
2645 * populate the preq and rctx fields
2646 * so that if it's freed with
2647 * trunk_request_free, the free
2648 * function works as intended.
2649 */
2650 if (*treq_out) {
2651 treq = *treq_out;
2652 treq->pub.preq = preq;
2653 treq->pub.rctx = rctx;
2654 }
2655 return ret;
2656 }
2657
2658 return ret;
2659}
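/* Example (sketch): a module enqueueing work on a trunk.  my_preq_t, my_rctx_t,
 * my_yield and my_fail are hypothetical; the enqueue call is assumed to be the public
 * trunk_request_enqueue wrapper (see trunk.h) for the function above.
 *
 *	trunk_request_t	*treq = NULL;
 *	my_preq_t	*preq;
 *	my_rctx_t	*rctx;
 *
 *	MEM(preq = talloc_zero(request, my_preq_t));	// Encoded query / protocol request
 *	MEM(rctx = talloc_zero(request, my_rctx_t));	// Where the decoded response will be written
 *
 *	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
 *	case TRUNK_ENQUEUE_OK:
 *	case TRUNK_ENQUEUE_IN_BACKLOG:
 *		rctx->treq = treq;
 *		return my_yield(request, rctx);		// Resumed after signal_complete/signal_fail
 *
 *	default:
 *		return my_fail(request);		// Couldn't enqueue
 *	}
 */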
2660
2661/** Re-enqueue a request on the same connection
2662 *
2663 * If the treq has been sent, we assume that we're being signalled to requeue
2664 * because something outside of the trunk API has determined that a retransmission
2665 * is required. The easiest way to perform that retransmission is to clean up
2666 * any tracking information for the request, and then requeue it for transmission.
2667 *
2668 * If re-queueing fails, the request will enter the fail state. It should not be
2669 * accessed if this occurs.
2670 *
2671 * @param[in] treq to requeue (retransmit).
2672 * @return
2673 * - TRUNK_ENQUEUE_OK.
2674 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2675 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2676 */
2678{
2679 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2680
2681 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2682
2683 if (!IS_PROCESSING(tconn)) {
2686 }
2687
2688 switch (treq->pub.state) {
2694 trunk_request_enter_pending(treq, tconn, false);
2695 if (treq->pub.trunk->conf.always_writable) {
2697 }
2699 break;
2700
2701 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2702 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2703 break;
2704
2705 default:
2707 return TRUNK_ENQUEUE_FAIL;
2708 }
2709
2710 return TRUNK_ENQUEUE_OK;
2711}
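/* Example (sketch): a retransmission timer firing for a request that was already sent.
 * The timer plumbing is hypothetical; the requeue call is the function above (assumed
 * to be exported as trunk_request_requeue).
 *
 *	if (trunk_request_requeue(treq) != TRUNK_ENQUEUE_OK) {
 *		// treq has entered the failed state and must not be touched again
 *		return;
 *	}
 */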
2712
2713/** Enqueue additional requests on a specific connection
2714 *
2715 * This may be used to create a series of requests on a single connection, or to generate
2716 * in-band status checks.
2717 *
2718 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2719 * must be able to handle multiple calls to its muxer gracefully.
2720 *
2721 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2722 * is NULL, a new treq will be allocated.
2723 * Otherwise treq should point to memory allocated
2724 * with trunk_request_alloc.
2725 * @param[in] tconn to enqueue request on.
2726 * @param[in] request to enqueue.
2727 * @param[in] preq Protocol request to write out. Will be freed when
2728 * treq is freed. Should ideally be parented by the
2729 * treq if possible.
2730 * Use #trunk_request_alloc for pre-allocation of
2731 * the treq.
2732 * @param[in] rctx The resume context to write any result to.
2733 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2734 * checks through even if the connection is at capacity.
2735 * Will also allow enqueuing on "inactive", "draining",
2736 * "draining-to-free" connections.
2737 * @return
2738 * - TRUNK_ENQUEUE_OK.
2739 * - TRUNK_ENQUEUE_NO_CAPACITY - At the max_req_per_conn limit
2740 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2741 */
2743 request_t *request, void *preq, void *rctx,
2744 bool ignore_limits)
2745{
2746 trunk_request_t *treq;
2747 trunk_t *trunk = tconn->pub.trunk;
2748
2749 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2750 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2751
2753
2754 /*
2755 * Limits check
2756 */
2757 if (!ignore_limits) {
2758 if (trunk->conf.max_req_per_conn &&
2761
2763 }
2764
2765 if (*treq_out) {
2766 treq = *treq_out;
2767 } else {
2768 MEM(*treq_out = treq = trunk_request_alloc(trunk, request));
2769 }
2770
2771 treq->pub.preq = preq;
2772 treq->pub.rctx = rctx;
2773 treq->bound_to_conn = true; /* Don't let the request be transferred */
2774
2775 if (trunk->conf.always_writable) {
2777 trunk_request_enter_pending(treq, tconn, true);
2780 } else {
2781 trunk_request_enter_pending(treq, tconn, true);
2782 }
2783
2784 return TRUNK_ENQUEUE_OK;
2785}
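/* Example (sketch): generating an in-band status check on a specific connection, e.g.
 * from a zombie-period timer. The status_preq/status_rctx values are hypothetical; the
 * enqueue-on-connection call is the function above (assumed to be exported as
 * trunk_request_enqueue_on_conn). ignore_limits is true so the check is admitted even
 * when the connection is full or inactive.
 *
 *	trunk_request_t *treq = NULL;
 *
 *	if (trunk_request_enqueue_on_conn(&treq, tconn, NULL, status_preq, status_rctx, true) != TRUNK_ENQUEUE_OK) {
 *		// Couldn't enqueue the status check, consider reconnecting the connection
 *	}
 */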
2786
2787#ifndef NDEBUG
2788/** Used for sanity checks to ensure all log entries have been freed
2789 *
2790 */
2792{
2793 fr_dlist_remove(slog->log_head, slog);
2794
2795 return 0;
2796}
2797
2798void trunk_request_state_log_entry_add(char const *function, int line,
2800{
2801 trunk_request_state_log_t *slog = NULL;
2802
2804 slog = fr_dlist_head(&treq->log);
2805 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2806 fr_dlist_num_elements(&treq->log));
2807 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2808 memset(slog, 0, sizeof(*slog));
2809 } else {
2810 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2811 talloc_set_destructor(slog, _state_log_entry_free);
2812 }
2813
2814 slog->log_head = &treq->log;
2815 slog->from = treq->pub.state;
2816 slog->to = new;
2817 slog->function = function;
2818 slog->line = line;
2819 if (treq->pub.tconn) {
2820 slog->tconn = treq->pub.tconn;
2821 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2822 slog->tconn_state = treq->pub.tconn->pub.state;
2823 }
2824
2825 fr_dlist_insert_tail(&treq->log, slog);
2826
2827}
2828
2829void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2830 trunk_request_t const *treq)
2831{
2832 trunk_request_state_log_t *slog = NULL;
2833
2834 int i;
2835
2836 for (slog = fr_dlist_head(&treq->log), i = 0;
2837 slog;
2838 slog = fr_dlist_next(&treq->log, slog), i++) {
2839 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2840 i, slog->function, slog->line,
2841 slog->tconn_id,
2843 slog->tconn_state, "<INVALID>") : "none",
2844 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2845 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2846 }
2847}
2848#endif
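/* Example (sketch): dumping the state history of a misbehaving treq from a debug
 * build. The surrounding condition is hypothetical; trunk_request_state_log is the
 * (non-NDEBUG) helper defined above.
 *
 *	#ifndef NDEBUG
 *	if (treq->pub.state != TRUNK_REQUEST_STATE_SENT) {
 *		trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq);
 *	}
 *	#endif
 */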
2849
2850/** Return the number of connections in the specified states
2851 *
2852 * @param[in] trunk to retrieve counts for.
2853 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2854 * @return The number of connections in the specified states.
2855 */
2857{
2858 uint16_t count = 0;
2859
2860 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2861 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2862 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2863 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2864 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2866 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2867 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2869
2870 return count;
2871}
2872
2873/** Return the number of requests associated with a trunk connection
2874 *
2875 * @param[in] tconn to return request count for.
2876 * @param[in] req_state One or more request states or'd together.
2877 *
2878 * @return The number of requests in the specified states, associated with a tconn.
2879 */
2881{
2882 uint32_t count = 0;
2883
2885 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2886 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2888 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2889 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2891
2892 return count;
2893}
2894
2895/** Automatically mark a connection as inactive
2896 *
2897 * @param[in] tconn to potentially mark as inactive.
2898 */
2900{
2901 trunk_t *trunk = tconn->pub.trunk;
2903
2904 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2905
2906 /*
2907 * Enforces max_req_per_conn
2908 */
2909 if (trunk->conf.max_req_per_conn > 0) {
2912 }
2913}
2914
2915/** Return whether a trunk connection should currently be considered full
2916 *
2917 * @param[in] tconn to check.
2918 * @return
2919 * - true if the connection is full.
2920 * - false if the connection is not full.
2921 */
2923{
2924 trunk_t *trunk = tconn->pub.trunk;
2926
2927 /*
2928 * Enforces max_req_per_conn
2929 */
2931 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2932
2933 return true;
2934}
2935
2936/** Automatically mark a connection as active or reconnect it
2937 *
2938 * @param[in] tconn to potentially mark as active or reconnect.
2939 */
2941{
2942 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2943
2944 /*
2945 * Enforces max_req_per_conn
2946 */
2948}
2949
2950/** A connection is readable. Call the request_demux function to read pending requests
2951 *
2952 */
2954{
2955 trunk_t *trunk = tconn->pub.trunk;
2956
2957 DO_REQUEST_DEMUX(tconn);
2958}
2959
2960/** A connection is writable. Call the request_mux function to write pending requests
2961 *
2962 */
2964{
2965 trunk_t *trunk = tconn->pub.trunk;
2966
2967 /*
2968 * Call the cancel_sent function (if we have one)
2969 * to inform a backend datastore we no longer
2970 * care about the result
2971 */
2975 DO_REQUEST_CANCEL_MUX(tconn);
2976 }
2980 DO_REQUEST_MUX(tconn);
2981}
2982
2983/** Update the registrations for I/O events we're interested in
2984 *
2985 */
2987{
2988 trunk_t *trunk = tconn->pub.trunk;
2990
2991 switch (tconn->pub.state) {
2992 /*
2993 * We only register I/O events if the trunk connection is
2994 * in one of these states.
2995 *
2996 * For the other states the trunk shouldn't be processing
2997 * requests.
2998 */
2999 case TRUNK_CONN_ACTIVE:
3000 case TRUNK_CONN_FULL:
3005 /*
3006 * If the connection is always writable,
3007 * then we don't care about write events.
3008 */
3009 if (!trunk->conf.always_writable &&
3013 (trunk->funcs.request_cancel_mux ?
3017 }
3018
3021 (trunk->funcs.request_cancel_mux ?
3024 }
3025 break;
3026
3027 default:
3028 break;
3029 }
3030
3031 if (tconn->events != events) {
3032 /*
3033 * There may be a fatal error which results
3034 * in the connection being freed.
3035 *
3036 * Stop that from happening until after
3037 * we're done using it.
3038 */
3041 tconn->events = events;
3043 }
3044}
3045
3046/** Remove a trunk connection from whichever list it's currently in
3047 *
3048 * @param[in] tconn to remove.
3049 */
3051{
3052 trunk_t *trunk = tconn->pub.trunk;
3053
3054 switch (tconn->pub.state) {
3055 case TRUNK_CONN_ACTIVE:
3056 {
3057 int ret;
3058
3059 ret = fr_minmax_heap_extract(trunk->active, tconn);
3060 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3061 }
3062 return;
3063
3064 case TRUNK_CONN_INIT:
3065 fr_dlist_remove(&trunk->init, tconn);
3066 break;
3067
3069 fr_dlist_remove(&trunk->connecting, tconn);
3070 return;
3071
3072 case TRUNK_CONN_CLOSED:
3073 fr_dlist_remove(&trunk->closed, tconn);
3074 return;
3075
3076 case TRUNK_CONN_FULL:
3077 fr_dlist_remove(&trunk->full, tconn);
3078 return;
3079
3081 fr_dlist_remove(&trunk->inactive, tconn);
3082 return;
3083
3085 fr_dlist_remove(&trunk->inactive_draining, tconn);
3086 return;
3087
3089 fr_dlist_remove(&trunk->draining, tconn);
3090 return;
3091
3093 fr_dlist_remove(&trunk->draining_to_free, tconn);
3094 return;
3095
3096 case TRUNK_CONN_HALTED:
3097 return;
3098 }
3099}
3100
3101/** Transition a connection to the full state
3102 *
3103 * Called whenever a trunk connection is at the maximum number of requests.
3104 * Removes the connection from the connected heap, and places it in the full list.
3105 */
3107{
3108 trunk_t *trunk = tconn->pub.trunk;
3109
3110 switch (tconn->pub.state) {
3111 case TRUNK_CONN_ACTIVE:
3113 break;
3114
3115 default:
3117 }
3118
3119 fr_dlist_insert_head(&trunk->full, tconn);
3121}
3122
3123/** Transition a connection to the inactive state
3124 *
3125 * Called whenever the API client wants to stop new requests being enqueued
3126 * on a trunk connection.
3127 */
3129{
3130 trunk_t *trunk = tconn->pub.trunk;
3131
3132 switch (tconn->pub.state) {
3133 case TRUNK_CONN_ACTIVE:
3134 case TRUNK_CONN_FULL:
3136 break;
3137
3138 default:
3140 }
3141
3142 fr_dlist_insert_head(&trunk->inactive, tconn);
3144}
3145
3146/** Transition a connection to the inactive-draining state
3147 *
3148 * Called whenever the trunk manager wants to drain an inactive connection
3149 * of its requests.
3150 */
3152{
3153 trunk_t *trunk = tconn->pub.trunk;
3154
3155 switch (tconn->pub.state) {
3159 break;
3160
3161 default:
3163 }
3164
3167
3168 /*
3169 * Immediately re-enqueue all pending
3170 * requests, so the connection is drained
3171 * quicker.
3172 */
3174}
3175
3176/** Transition a connection to the draining state
3177 *
3178 * Removes the connection from the active heap so it won't be assigned any new
3179 * requests.
3180 */
3182{
3183 trunk_t *trunk = tconn->pub.trunk;
3184
3185 switch (tconn->pub.state) {
3186 case TRUNK_CONN_ACTIVE:
3187 case TRUNK_CONN_FULL:
3191 break;
3192
3193 default:
3195 }
3196
3197 fr_dlist_insert_head(&trunk->draining, tconn);
3199
3200 /*
3201 * Immediately re-enqueue all pending
3202 * requests, so the connection is drained
3203 * quicker.
3204 */
3206}
3207
3208/** Transition a connection to the draining-to-reconnect state
3209 *
3210 * Removes the connection from the active heap so it won't be assigned any new
3211 * requests.
3212 */
3214{
3215 trunk_t *trunk = tconn->pub.trunk;
3216
3218
3219 switch (tconn->pub.state) {
3220 case TRUNK_CONN_ACTIVE:
3221 case TRUNK_CONN_FULL:
3226 break;
3227
3228 default:
3230 }
3231
3232 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3234
3235 /*
3236 * Immediately re-enqueue all pending
3237 * requests, so the connection is drained
3238 * quicker.
3239 */
3241}
3242
3243
3244/** Transition a connection back to the active state
3245 *
3246 * This should only be called on a connection which is in the full state,
3247 * inactive state, draining state or connecting state.
3248 */
3250{
3251 trunk_t *trunk = tconn->pub.trunk;
3252 int ret;
3253
3254 switch (tconn->pub.state) {
3255 case TRUNK_CONN_FULL:
3260 break;
3261
3262 case TRUNK_CONN_INIT:
3266 break;
3267
3268 default:
3270 }
3271
3272 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3273 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3275 return;
3276 }
3277
3279
3280 /*
3281 * Reorder the connections
3282 */
3283 CONN_REORDER(tconn);
3284
3285 /*
3286 * Rebalance requests
3287 */
3288 trunk_rebalance(trunk);
3289
3290 /*
3291 * We place requests into the backlog
3292 * because there were no connections
3293 * available to handle them.
3294 *
3295 * If a connection has become active
3296 * chances are those backlogged requests
3297 * can now be enqueued, so try and do
3298 * that now.
3299 *
3300 * If there's requests sitting in the
3301 * backlog indefinitely, it's because
3302 * they were inserted there erroneously
3303 * when there were active connections
3304 * which could have handled them.
3305 */
3306 trunk_backlog_drain(trunk);
3307}
3308
3309/** Connection transitioned to the init state
3310 *
3311 * Reflect the connection state change in the lists we use to track connections.
3312 *
3313 * @note This function is only called from the connection API as a watcher.
3314 *
3315 * @param[in] conn The connection which changes state.
3316 * @param[in] prev The state the connection was in.
3317 * @param[in] state The state the connection is now in.
3318 * @param[in] uctx The trunk_connection_t wrapping the connection.
3319 */
3323 void *uctx)
3324{
3325 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3326 trunk_t *trunk = tconn->pub.trunk;
3327
3328 switch (tconn->pub.state) {
3329 case TRUNK_CONN_HALTED:
3330 break;
3331
3332 case TRUNK_CONN_CLOSED:
3334 break;
3335
3336 default:
3338 }
3339
3340 fr_dlist_insert_head(&trunk->init, tconn);
3342}
3343
3344/** Connection transitioned to the connecting state
3345 *
3346 * Reflect the connection state change in the lists we use to track connections.
3347 *
3348 * @note This function is only called from the connection API as a watcher.
3349 *
3350 * @param[in] conn The connection which changes state.
3351 * @param[in] prev The state the connection was in.
3352 * @param[in] state The state the connection is now in.
3353 * @param[in] uctx The trunk_connection_t wrapping the connection.
3354 */
3358 void *uctx)
3359{
3360 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3361 trunk_t *trunk = tconn->pub.trunk;
3362
3363 switch (tconn->pub.state) {
3364 case TRUNK_CONN_INIT:
3365 case TRUNK_CONN_CLOSED:
3367 break;
3368
3369 default:
3371 }
3372
3373 /*
3374 * If a connection just entered the
3375 * connecting state, it should have
3376 * no requests associated with it.
3377 */
3379
3380 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3382}
3383
3384/** Connection transitioned to the shutdown state
3385 *
3386 * If we're not already in the draining-to-free state, transition there now.
3387 *
3388 * The idea is that if something signalled the connection to shutdown, we need
3389 * to reflect that by dequeuing any pending requests, not accepting new ones,
3390 * and waiting for the existing requests to complete.
3391 *
3392 * @note This function is only called from the connection API as a watcher.
3393 *
3394 * @param[in] conn The connection which changes state.
3395 * @param[in] prev The state the connection was in.
3396 * @param[in] state The state the connection is now in.
3397 * @param[in] uctx The trunk_connection_t wrapping the connection.
3398 */
3402 void *uctx)
3403{
3404 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3405
3406 switch (tconn->pub.state) {
3407 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3408 return;
3409
3410 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3411 case TRUNK_CONN_FULL:
3415 break;
3416
3417 case TRUNK_CONN_INIT:
3419 case TRUNK_CONN_CLOSED:
3420 case TRUNK_CONN_HALTED:
3422 }
3423
3425}
3426
3427/** Trigger a reconnection of the trunk connection
3428 *
3429 * @param[in] tl timer list the timer was inserted into.
3430 * @param[in] now Current time.
3431 * @param[in] uctx The tconn.
3432 */
3434{
3435 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3436
3438}
3439
3440/** Connection transitioned to the connected state
3441 *
3442 * Reflect the connection state change in the lists we use to track connections.
3443 *
3444 * @note This function is only called from the connection API as a watcher.
3445 *
3446 * @param[in] conn The connection which changes state.
3447 * @param[in] prev The state the connection was in.
3448 * @param[in] state The state the connection is now in.
3449 * @param[in] uctx The trunk_connection_t wrapping the connection.
3450 */
3454 void *uctx)
3455{
3456 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3457 trunk_t *trunk = tconn->pub.trunk;
3458
3459 /*
3460 * If a connection was just connected, it should only
3461 * have a pending list of requests. This state is found
3462 * in the rlm_radius module, which starts a new trunk,
3463 * and then immediately enqueues a request onto it. The
3464 * alternative for rlm_radius is to keep its own queue
3465 * of pending requests before the trunk is fully
3466 * initialized. And then enqueue them onto the trunk
3467 * when the trunk is connected.
3468 *
3469 * It's instead easier (and makes more sense) to allow
3470 * the trunk to accept packets into its queue. If there
3471 * are no connections within a period of time, then the
3472 * requests will retry, or will time out.
3473 */
3475
3476 /*
3477 * Set here, as the active state can
3478 * be transitioned to from full and
3479 * draining too.
3480 */
3481 trunk->pub.last_connected = fr_time();
3482
3483 /*
3484 * Insert a timer to reconnect the
3485 * connection periodically.
3486 */
3487 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3488 if (fr_timer_in(tconn, trunk->el->tl, &tconn->lifetime_ev,
3489 trunk->conf.lifetime, false, _trunk_connection_lifetime_expire, tconn) < 0) {
3490 PERROR("Failed inserting connection reconnection timer event, halting connection");
3492 return;
3493 }
3494 }
3495
3497}
3498
3499/** Connection failed after it was connected
3500 *
3501 * Reflect the connection state change in the lists we use to track connections.
3502 *
3503 * @note This function is only called from the connection API as a watcher.
3504 *
3505 * @param[in] conn The connection which changes state.
3506 * @param[in] prev The state the connection was in.
3507 * @param[in] state The state the connection is now in.
3508 * @param[in] uctx The trunk_connection_t wrapping the connection.
3509 */
3513 void *uctx)
3514{
3515 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3516 trunk_t *trunk = tconn->pub.trunk;
3517 bool need_requeue = false;
3518
3519 switch (tconn->pub.state) {
3520 case TRUNK_CONN_ACTIVE:
3521 case TRUNK_CONN_FULL:
3526 need_requeue = true;
3528 break;
3529
3530 case TRUNK_CONN_INIT: /* Initialisation failed */
3534 break;
3535
3536 case TRUNK_CONN_CLOSED:
3537 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3539 }
3540
3541 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3543
3544 /*
3545 * Now *AFTER* the connection has been
3546 * removed from the active pool,
3547 * re-enqueue the requests.
3548 */
3549 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3550
3551 /*
3552 * There should be no requests left on this
3553 * connection. They should have all been
3554 * moved off or failed.
3555 */
3557
3558 /*
3559 * Clear statistics and flags
3560 */
3561 tconn->sent_count = 0;
3562
3563 /*
3564 * Remove the reconnect event
3565 */
3567
3568 /*
3569 * Remove the I/O events
3570 */
3572}
3573
3574/** Connection failed
3575 *
3576 * @param[in] conn The connection which changes state.
3577 * @param[in] prev The state the connection was in.
3578 * @param[in] state The state the connection is now in.
3579 * @param[in] uctx The trunk_connection_t wrapping the connection.
3580 */
3582 connection_state_t prev,
3584 void *uctx)
3585{
3586 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3587 trunk_t *trunk = tconn->pub.trunk;
3588
3589 /*
3590 * Need to set this first as it
3591 * determines whether requests are
3592 * re-queued or fail outright.
3593 */
3594 trunk->pub.last_failed = fr_time();
3595
3596 /*
3597 * Failed in the init state, transition the
3598 * connection to closed, else we get an
3599 * INIT -> INIT transition which triggers
3600 * an assert.
3601 */
3602 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3603
3604 /*
3605 * See what the state of the trunk is;
3606 * if there are no connections that could
3607 * potentially accept requests in the near
3608 * future, then fail all the requests in the
3609 * trunk backlog.
3610 */
3616}
3617
3618/** Connection transitioned to the halted state
3619 *
3620 * Remove the connection from all lists, as it's likely about to be freed.
3621 *
3622 * Setting the trunk back to the init state ensures that if the code is ever
3623 * refactored and #connection_signal_reconnect is used after a connection
3624 * is halted, then everything is maintained in a valid state.
3625 *
3626 * @note This function is only called from the connection API as a watcher.
3627 *
3628 * @param[in] conn The connection which changes state.
3629 * @param[in] prev The state the connection was in.
3630 * @param[in] state The state the connection is now in.
3631 * @param[in] uctx The trunk_connection_t wrapping the connection.
3632 */
3636 void *uctx)
3637{
3638 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3639 trunk_t *trunk = tconn->pub.trunk;
3640
3641 switch (tconn->pub.state) {
3642 case TRUNK_CONN_INIT:
3643 case TRUNK_CONN_CLOSED:
3645 break;
3646
3647 default:
3649 }
3650
3651 /*
3652 * It began life in the halted state,
3653 * and will end life in the halted state.
3654 */
3656
3657 /*
3658 * There should be no requests left on this
3659 * connection. They should have all been
3660 * moved off or failed.
3661 */
3663
3664 /*
3665 * And free the connection...
3666 */
3667 if (trunk->in_handler) {
3668 /*
3669 * ...later.
3670 */
3671 fr_dlist_insert_tail(&trunk->to_free, tconn);
3672 return;
3673 }
3674 talloc_free(tconn);
3675}
3676
3677/** Free a connection
3678 *
3679 * Enforces an orderly free order for the children of the tconn
3680 */
3682{
3684 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3685
3686 /*
3687 * Loop over all the requests we gathered
3688 * and transition them to the failed state,
3689 * freeing them.
3690 *
3691 * Usually, requests will be re-queued when
3692 * a connection enters the closed state,
3693 * but in this case because the whole trunk
3694 * is being freed, we don't bother, and
3695 * just signal to the API client that the
3696 * requests failed.
3697 */
3698 if (tconn->pub.trunk->freeing) {
3699 fr_dlist_head_t to_fail;
3700 trunk_request_t *treq = NULL;
3701
3702 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3703
3704 /*
3705 * Remove requests from this connection
3706 */
3708 while ((treq = fr_dlist_next(&to_fail, treq))) {
3709 trunk_request_t *prev;
3710
3711 prev = fr_dlist_remove(&to_fail, treq);
3713 treq = prev;
3714 }
3715 }
3716
3717 /*
3718 * Ensure we're not signalled by the connection
3719 * as it processes its backlog of state changes,
3720 * as we are about to be freed.
3721 */
3729
3730 /*
3731 * This may return -1, indicating the free was deferred;
3732 * this is fine. It just means the conn will be freed
3733 * after all the handlers have exited.
3734 */
3735 (void)talloc_free(tconn->pub.conn);
3736 tconn->pub.conn = NULL;
3737
3738 return 0;
3739}
3740
3741/** Attempt to spawn a new connection
3742 *
3743 * Calls the API client's alloc() callback to create a new connection_t,
3744 * then inserts the connection into the 'connecting' list.
3745 *
3746 * @param[in] trunk to spawn connection in.
3747 * @param[in] now The current time.
3748 */
3750{
3751 trunk_connection_t *tconn;
3752
3753
3754 /*
3755 * Call the API client's callback to create
3756 * a new connection_t.
3757 */
3758 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3759 tconn->pub.trunk = trunk;
3760 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3761
3762 /*
3763 * Allocate a new connection_t or fail.
3764 */
3765 DO_CONNECTION_ALLOC(tconn);
3766
3768 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3772
3773 /*
3774 * OK, we have the connection, now setup watch
3775 * points so we know when it changes state.
3776 *
3777 * This lets us automatically move the tconn
3778 * between the different lists in the trunk
3779 * with minimum extra code.
3780 */
3782 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3783
3785 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3786
3788 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3789
3791 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3792
3794 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3795
3797 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3798
3800 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3801
3802 talloc_set_destructor(tconn, _trunk_connection_free);
3803
3804 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3805
3806 trunk->pub.last_open = now;
3807
3808 return 0;
3809}
3810
3811/** Pop a cancellation request off a connection's cancellation queue
3812 *
3813 * The request we return is advanced when the request moves out of the
3814 * cancel state and into the cancel_sent or cancel_complete state.
3815 *
3816 * One of these signalling functions must be called after the request
3817 * has been popped:
3818 *
3819 * - #trunk_request_signal_cancel_sent
3820 * The remote datastore has been informed, but we need to wait for acknowledgement.
3821 * The #trunk_request_demux_t callback must handle the acks calling
3822 * #trunk_request_signal_cancel_complete when an ack is received.
3823 *
3824 * - #trunk_request_signal_cancel_complete
3825 * The request was cancelled and we don't need to wait, clean it up immediately.
3826 *
3827 * @param[out] treq_out to process
3828 * @param[in] tconn Connection to drain cancellation request from.
3829 * @return
3830 * - 1 if no more requests.
3831 * - 0 if a new request was written to treq_out.
3832 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3833 * memory or requests associated with the connection.
3834 * - -2 if called outside of the cancel muxer.
3835 */
3837{
3838 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3839
3841 "%s can only be called from within request_cancel_mux handler",
3842 __FUNCTION__)) return -2;
3843
3844 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3845 if (!*treq_out) return 1;
3846
3847 return 0;
3848}
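/* Example (sketch): a request_cancel_mux implementation draining the cancellation
 * queue with the pop function above (assumed to be exported as
 * trunk_connection_pop_cancellation). my_write_cancel is a hypothetical helper that
 * writes the cancellation for treq out on the underlying connection.
 *
 *	trunk_request_t *treq;
 *
 *	while (trunk_connection_pop_cancellation(&treq, tconn) == 0) {
 *		if (my_write_cancel(conn, treq) < 0) break;
 *
 *		trunk_request_signal_cancel_sent(treq);		// Or _cancel_complete if no ack is expected
 *	}
 */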
3849
3850/** Pop a request off a connection's pending queue
3851 *
3852 * The request we return is advanced when the request moves out of the partial or
3853 * pending states, i.e. when the mux function signals us.
3854 *
3855 * If the same request is returned again and again, it means the muxer isn't actually
3856 * doing anything with the request we returned, and it's an error in the muxer code.
3857 *
3858 * One of these signalling functions must be used after the request has been popped:
3859 *
3860 * - #trunk_request_signal_complete
3861 * The request was completed. Either we got a synchronous response, or we knew the
3862 * response without contacting an external server (cache).
3863 *
3864 * - #trunk_request_signal_fail
3865 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3866 *
3867 * - #trunk_request_signal_partial
3868 * Wrote part of a request. This request will be returned on the next call to this
3869 * function so that the request_mux function can finish writing it. Only useful
3870 * for stream type connections. Datagram type connections cannot have partial
3871 * writes.
3872 *
3873 * - #trunk_request_signal_sent Successfully sent a request.
3874 *
3875 * @param[out] treq_out to process
3876 * @param[in] tconn to pop a request from.
3877 * @return
3878 * - 1 if no more requests.
3879 * - 0 if a new request was written to treq_out.
3880 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3881 * memory or requests associated with the connection.
3882 * - -2 if called outside of the muxer.
3883 */
3885{
3886 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3887
3889 "%s can only be called from within request_mux handler",
3890 __FUNCTION__)) return -2;
3891
3892 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3893 if (!*treq_out) return 1;
3894
3895 return 0;
3896}
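/* Example (sketch): the core of a request_mux implementation built on the pop function
 * above (assumed to be exported as trunk_connection_pop_request). my_encode_and_write
 * and my_track are hypothetical; the signalling calls are the ones listed in the
 * comment above.
 *
 *	trunk_request_t *treq;
 *
 *	while (trunk_connection_pop_request(&treq, tconn) == 0) {
 *		ssize_t slen;
 *
 *		slen = my_encode_and_write(conn, treq->pub.preq);
 *		if (slen < 0) {
 *			trunk_request_signal_fail(treq);	// Unrecoverable for this request
 *			continue;
 *		}
 *		if (slen == 0) {
 *			trunk_request_signal_partial(treq);	// Returned again on the next writable event
 *			break;
 *		}
 *
 *		my_track(tconn, treq);				// So request_demux can match the response
 *		trunk_request_signal_sent(treq);
 *	}
 */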
3897
3898/** Signal that a trunk connection is writable
3899 *
3900 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3901 *
3902 * @param[in] tconn to signal.
3903 */
3905{
3906 trunk_t *trunk = tconn->pub.trunk;
3907
3908 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3909 "%s cannot be called within a handler", __FUNCTION__)) return;
3910
3911 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3912
3914}
3915
3916/** Signal that a trunk connection is readable
3917 *
3918 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3919 *
3920 * @param[in] tconn to signal.
3921 */
3923{
3924 trunk_t *trunk = tconn->pub.trunk;
3925
3926 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3927 "%s cannot be called within a handler", __FUNCTION__)) return;
3928
3929 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3930
3932}
3933
3934/** Signal a trunk connection cannot accept more requests
3935 *
3936 * @param[in] tconn to signal.
3937 */
3939{
3940 /* Can be called anywhere */
3941
3942 switch (tconn->pub.state) {
3943 case TRUNK_CONN_ACTIVE:
3944 case TRUNK_CONN_FULL:
3946 break;
3947
3950 break;
3951
3952 default:
3953 return;
3954 }
3955}
3956
3957/** Signal a trunk connection is no longer full
3958 *
3959 * @param[in] tconn to signal.
3960 */
3962{
3963 switch (tconn->pub.state) {
3964 case TRUNK_CONN_FULL:
3965 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3966 break;
3967
3969 /*
3970 * Do the appropriate state transition based on
3971 * how many requests the trunk connection is
3972 * currently servicing.
3973 */
3974 if (trunk_connection_is_full(tconn)) {
3976 break;
3977 }
3979 break;
3980
3981 /*
3982 * Unsetting the active flag just moves
3983 * the connection back to the normal
3984 * draining state.
3985 */
3986 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
3988 break;
3989
3990 default:
3991 return;
3992 }
3993}
3994
3995/** Signal a trunk connection is no longer viable
3996 *
3997 * @param[in] tconn to signal.
3998 * @param[in] reason the connection is being reconnected.
3999 */
4004
4005/** Standard I/O read function
4006 *
4007 * Underlying FD is now readable, so call the trunk to read any pending requests
4008 * from this connection.
4009 *
4010 * @param[in] el The event list signalling.
4011 * @param[in] fd that's now readable.
4012 * @param[in] flags describing the read event.
4013 * @param[in] uctx The trunk connection handle (tconn).
4014 */
4016{
4017 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4018
4020}
4021
4022/** Standard I/O write function
4023 *
4024 * Underlying FD is now writable, so call the trunk to write any pending requests
4025 * to this connection.
4026 *
4027 * @param[in] el The event list signalling.
4028 * @param[in] fd that's now writable.
4029 * @param[in] flags describing the write event.
4030 * @param[in] uctx The trunk connection handle (tconn).
4031 */
4033{
4034 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4035
4037}
4038
4039
4040/** Returns true if the trunk connection is in one of the specified states
4041 *
4042 * @param[in] tconn To check state for.
4043 * @param[in] state to check
4044 * @return
4045 * - True if trunk connection is in a particular state.
4046 * - False if trunk connection is not in a particular state.
4047 */
4049{
4050 return (bool)(tconn->pub.state & state);
4051}
4052
4053/** Close connections in a particular connection list if they have no requests associated with them
4054 *
4055 * @param[in] trunk containing connections we want to close.
4056 * @param[in] head of list of connections to examine.
4057 */
4059{
4060 trunk_connection_t *tconn = NULL;
4061
4062 while ((tconn = fr_dlist_next(head, tconn))) {
4063 trunk_connection_t *prev;
4064
4066
4067 prev = fr_dlist_prev(head, tconn);
4068
4069 DEBUG3("Closing %s connection with no requests",
4071 /*
4072 * Close the connection as gracefully
4073 * as possible by signalling it should
4074 * shutdown.
4075 *
4076 * The connection should, if serviced
4077 * correctly by the underlying library,
4078 * automatically transition to halted after
4079 * all pending reads/writes are
4080 * complete at which point we'll be informed
4081 * and free our tconn wrapper.
4082 */
4084 tconn = prev;
4085 }
4086}
4087
4088/** Rebalance connections across active trunk members when a new connection becomes active
4089 *
4090 * We don't have any visibility into the connection prioritisation algorithm;
4091 * it's essentially a black box.
4092 *
4093 * We can however determine when the correct level of requests per connection
4094 * has been reached, by dequeuing and requeueing requests up until the point
4095 * where the connection that just had a request dequeued receives the same
4096 * request back.
4097 *
4098 * @param[in] trunk The trunk to rebalance.
4099 */
4100static void trunk_rebalance(trunk_t *trunk)
4101{
4103
4105
4106 /*
4107 * Only rebalance if the top and bottom of
4108 * the heap are not equal.
4109 */
4110 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4111
4112 DEBUG3("Rebalancing requests");
4113
4114 /*
4115 * Keep requeuing requests from the connection
4116 * at the bottom of the heap until the
4117 * connection at the top is shifted from that
4118 * position.
4119 */
4120 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4122 TRUNK_REQUEST_STATE_PENDING, 1, false));
4123}
4124
4125/** Implements the algorithm we use to manage requests per connection levels
4126 *
4127 * This is executed periodically using a timer event, and opens/closes
4128 * connections.
4129 *
4130 * The aim is to try and keep the request per connection level in a sweet spot,
4131 * where there's enough outstanding work for the connection/pipelining to work
4132 * efficiently, but not so much so that we encounter increased latency.
4133 *
4134 * In the request enqueue and dequeue functions we record every time the
4135 * average number of requests per connection goes above the target count
4136 * and record every time the average number of requests per connection goes
4137 * below the target count.
4138 *
4139 * This may sound expensive, but in all cases we're just summing counters.
4140 * CPU time required does not increase with additional requests, only with
4141 * large numbers of connections.
4142 *
4143 * If we do encounter scaling issues, we can always maintain the counters
4144 * as aggregates as an optimisation later.
4145 *
4146 * If, when the management function runs, the trunk was above the target
4147 * most recently, we:
4148 * - Return if we've been in this state for a shorter period than 'open_delay'.
4149 * - Return if we're at max.
4150 * - Return if opening a new connection will take us below the load target.
4151 * - Return if we last opened a connection within 'open_delay'.
4152 * - Otherwise we attempt to open a new connection.
4153 *
4154 * If the trunk was below the target most recently, we:
4155 * - Return if we've been in this state for a shorter period than 'close_delay'.
4156 * - Return if we're at min.
4157 * - Return if we have no connections.
4158 * - Close a connection if min is 0, and we have no outstanding
4159 * requests. Then return.
4160 * - Return if closing a connection will take us above the load target.
4161 * - Return if we last closed a connection within 'close_delay'.
4162 * - Otherwise we move a connection to draining state.
4163 */
4164static void trunk_manage(trunk_t *trunk, fr_time_t now)
4165{
4166 trunk_connection_t *tconn = NULL;
4167 trunk_request_t *treq;
4168 uint32_t average = 0;
4169 uint32_t req_count;
4170 uint16_t conn_count;
4171 trunk_state_t new_state;
4172
4173 DEBUG4("Managing trunk");
4174
4175 /*
4176 * Cleanup requests in our request cache which
4177 * have been reapable for too long.
4178 */
4179 while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4181
4182 /*
4183 * If we have idle connections, then close them.
4184 */
4187 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4188
4189 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4190 tconn;
4191 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4192 /*
4193 * The connection has outstanding requests without replies, don't do anything.
4194 */
4195 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4196
4197 /*
4198 * The connection was last active after the idle cutoff time, don't do anything.
4199 */
4200 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4201
4202 /*
4203 * This connection has been inactive since before the idle timeout. Drain it,
4204 * and free it.
4205 */
4207 }
4208 }
4209
4210 /*
4211 * Free any connections which have drained
4212 * and we didn't reactivate during the last
4213 * round of management.
4214 */
4215 trunk_connection_close_if_empty(trunk, &trunk->draining);
4216 trunk_connection_close_if_empty(trunk, &trunk->inactive_draining);
4217 trunk_connection_close_if_empty(trunk, &trunk->draining_to_free);
4218
4219 /*
4220 * Process deferred connection freeing
4221 */
4222 if (!trunk->in_handler) {
4223 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4224 }
4225
4226 /*
4227 * Update the state of the trunk
4228 */
4229 if (fr_minmax_heap_num_elements(trunk->active) > 0) {
4230 new_state = TRUNK_STATE_ACTIVE;
4231 } else {
4232 /*
4233 * INIT / CONNECTING / FULL mean connections will become active
4234 * so the trunk is PENDING
4235 */
4240 }
4241
4242 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4243
4244 /*
4245 * A trunk can be signalled to not proactively
4246 * manage connections if a destination is known
4247 * to be unreachable, and doing so would result
4248 * in spurious connections still being opened.
4249 *
4250 * We still run other connection management
4251 * functions and just short circuit the function
4252 * here.
4253 */
4254 if (!trunk->managing_connections) return;
4255
4256 /*
4257 * We're above the target requests per connection,
4258 * so spawn more connections!
4259 */
4260 if (fr_time_gt(trunk->pub.last_above_target, trunk->pub.last_below_target)) {
4261 /*
4262 * If connecting is provided, check we
4263 * wouldn't have too many connections in
4264 * the connecting state.
4265 *
4266 * This is a throttle in the case of transitory
4267 * load spikes, or a backend becoming
4268 * unavailable.
4269 */
4270 if ((trunk->conf.connecting > 0) &&
4271 (trunk_connection_count_by_state(trunk, TRUNK_CONN_CONNECTING) >=
4272 trunk->conf.connecting)) {
4273 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4274 trunk->conf.connecting);
4275 return;
4276 }
4277
4278 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4279
4280 /*
4281 * Only apply hysteresis if we have at least
4282 * one available connection.
4283 */
4284 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4285 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4288 return; /* too soon */
4289 }
4290
4291 /*
4292 * We don't consider 'draining' connections
4293 * in the max calculation, as if we do
4294 * determine that we need to spawn a new
4295 * connection, then we'd move all 'draining'
4296 * connections to active before spawning
4297 * any new connections.
4298 */
4299 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4300 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4301 conn_count, trunk->conf.max);
4302 return;
4303 }
4304
4305 /*
4306 * We consider requests pending on all connections
4307 * and the trunk's backlog, as that's the current
4308 * total load.
4309 */
4310 if (!req_count) {
4311 DEBUG4("Not opening connection - No outstanding requests");
4312 return;
4313 }
4314
4315 /*
4316 * Do the n+1 check, i.e. if we open one connection
4317 * will that take us below our target threshold.
4318 */
4319 if (conn_count > 0) {
4320 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4321 if (average < trunk->conf.target_req_per_conn) {
4322 DEBUG4("Not opening connection - Would leave us below our target requests "
4323 "per connection (now %u, after open %u)",
4324 ROUND_UP_DIV(req_count, conn_count), average);
4325 return;
4326 }
4327 } else {
4328 (void)trunk_connection_spawn(trunk, now);
4329 return;
4330 }
4331
4332 /*
4333 * If we've got a connection in the draining list
4334 * move it back into the active list if we've
4335 * been requested to add a connection back in.
4336 */
4337 tconn = fr_dlist_head(&trunk->draining);
4338 if (tconn) {
4339 if (trunk_connection_is_full(tconn)) {
4341 } else {
4343 }
4344 return;
4345 }
4346
4347 /*
4348 * Implement delay if there are no connections that
4349 * could be immediately re-activated.
4350 */
4351 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4352 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4353 "It's been %pVs",
4356 return;
4357 }
4358
4359 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4360 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4361 /* last_open set by trunk_connection_spawn */
4362 (void)trunk_connection_spawn(trunk, now);
4363 }
4364
4365 /*
4366 * We're below the target requests per connection.
4367 * Free some connections...
4368 */
4369 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4370 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4371 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4374 return; /* too soon */
4375 }
4376
4377 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4378
4379 if (!conn_count) {
4380 DEBUG4("Not closing connection - No connections to close!");
4381 return;
4382 }
4383
4384 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4385 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4386 conn_count, trunk->conf.min);
4387 return;
4388 }
4389
4390 if (!req_count) {
4391 DEBUG4("Closing connection - No outstanding requests");
4392 goto close;
4393 }
4394
4395 /*
4396 * The minimum number of connections must be set
4397 * to zero for this to work.
4398 * min == 0, no requests, close all the connections.
4399 * This is useful for backup databases, when
4400 * maintaining the connection would lead to lots of
4401 * log file churn.
4402 */
4403 if (conn_count == 1) {
4404 DEBUG4("Not closing connection - Would leave connections "
4405 "and there are still %u outstanding requests", req_count);
4406 return;
4407 }
4408
4409 /*
4410 * Do the n-1 check, i.e. if we close one connection
4411 * will that take us above our target threshold.
4412 */
4413 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4414 if (average > trunk->conf.target_req_per_conn) {
4415 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4416 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4417 return;
4418 }
4419
4420 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4421 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4422
4423 close:
4424 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4425 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4426 "It's been %pVs",
4429 return;
4430 }
4431
4432 /*
4433 * If the last event on the trunk was a connection failure and
4434 * there is only one connection, this may well be a reconnect
4435 * attempt after a failure - and needs to persist otherwise
4436 * the last event will be a failure and no new connection will
4437 * be made, leading to no new requests being enqueued.
4438 */
4439 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4440 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4441 DEBUG4("Not closing remaining connection - last event was a failure");
4442 return;
4443 }
4444
4445 /*
4446 * Inactive connections get counted in the
4447 * set of viable connections, but are likely
4448 * to be congested or dead, so we drain
4449 * (and possibly eventually free) those first.
4450 */
4451 if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4452 /*
4453 * If the connection has no requests associated
4454 * with it then immediately free.
4455 */
4457 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4458 } else {
4460 }
4461 /*
4462 * It is possible to have too many connecting
4463 * connections when the connections are
4464 * taking a while to open and the number
4465 * of requests decreases.
4466 */
4467 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4468 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4469
4470 /*
4471 * Finally if there are no "connecting"
4472 * connections to close, and no "inactive"
4473 * connections, start draining "active"
4474 * connections.
4475 */
4476 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4477 /*
4478 * If the connection has no requests associated
4479 * with it then immediately free.
4480 */
4482 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4483 } else {
4485 }
4486 }
4487
4488 trunk->pub.last_closed = now;
4489
4490
4491 return;
4492 }
4493}
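/* A minimal, self-contained sketch of the n+1/n-1 arithmetic used above. Only
 * ROUND_UP_DIV is taken from this file; the function and parameter names are
 * hypothetical and exist purely to illustrate the checks.
 *
 * @code{.c}
 * static bool example_should_open(uint32_t req_count, uint16_t conn_count, uint32_t target)
 * {
 *         if (conn_count == 0) return req_count > 0;
 *
 *         // Only open if the average stays at or above target with one more connection.
 *         return ROUND_UP_DIV(req_count, (uint32_t)(conn_count + 1)) >= target;
 * }
 *
 * static bool example_should_close(uint32_t req_count, uint16_t conn_count, uint32_t target)
 * {
 *         if (conn_count <= 1) return false;
 *
 *         // Only close if the average stays at or below target with one connection fewer.
 *         return ROUND_UP_DIV(req_count, (uint32_t)(conn_count - 1)) <= target;
 * }
 * @endcode
 */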
4494
4495/** Event to periodically call the connection management function
4496 *
4497 * @param[in] tl this event belongs to.
4498 * @param[in] now current time.
4499 * @param[in] uctx The trunk.
4500 */
4501static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
4502{
4503 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4504
4505 trunk_manage(trunk, now);
4506
4507 if (fr_time_delta_ispos(trunk->conf.manage_interval)) {
4508 if (fr_timer_in(trunk, tl, &trunk->manage_ev, trunk->conf.manage_interval,
4509 false, _trunk_timer, trunk) < 0) {
4510 PERROR("Failed inserting trunk management event");
4511 /* Not much we can do, hopefully the trunk will be freed soon */
4512 }
4513 }
4514}
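/* The handler above is self re-arming, which is what gives the trunk its periodic
 * management tick. A hedged sketch of the same pattern with hypothetical names
 * (example_tick, example_ctx_t and do_periodic_work are not part of this API;
 * fr_timer_in is the same macro used above):
 *
 * @code{.c}
 * static void example_tick(fr_timer_list_t *tl, fr_time_t now, void *uctx)
 * {
 *         example_ctx_t *ctx = uctx;
 *
 *         do_periodic_work(ctx, now);
 *
 *         // Re-arm relative to "now"; on failure the timer simply stays unarmed.
 *         if (fr_timer_in(ctx, tl, &ctx->ev, ctx->interval, false, example_tick, ctx) < 0) {
 *                 PERROR("Failed inserting example timer event");
 *         }
 * }
 * @endcode
 */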
4515
4516/** Return a count of requests in particular states, on connections in particular states
4517 *
4518 * @param[in] trunk to retrieve counts for.
4519 * @param[in] conn_state One or more connection states or'd together.
4520 * @param[in] req_state One or more request states or'd together.
4521 * @return The number of requests in the given states, on connections in the given states.
4522 */
4523uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4524{
4525 uint64_t count = 0;
4526 trunk_connection_t *tconn = NULL;
4527 fr_minmax_heap_iter_t iter;
4528
4529#define COUNT_BY_STATE(_state, _list) \
4530do { \
4531 if (conn_state & (_state)) { \
4532 tconn = NULL; \
4533 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4534 count += trunk_request_count_by_connection(tconn, req_state); \
4535 } \
4536 } \
4537} while (0)
4538
4539 if (conn_state & TRUNK_CONN_ACTIVE) {
4540 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4541 tconn;
4542 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4543 count += trunk_request_count_by_connection(tconn, req_state);
4544 }
4545 }
4546
4549 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4552
4554
4555 return count;
4556}
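/* Illustrative usage (hypothetical caller): both arguments are bitmasks, so requests
 * can be counted across several connection and request states in one call.
 *
 * @code{.c}
 * // Requests still queued for writing, on connections that may yet service them.
 * uint64_t queued = trunk_request_count_by_state(trunk,
 *                                                TRUNK_CONN_ACTIVE | TRUNK_CONN_INACTIVE_DRAINING,
 *                                                TRUNK_REQUEST_STATE_PENDING);
 * @endcode
 */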
4557
4558/** Update timestamps for when we last had a transition from above target to below target or vice versa
4559 *
4560 * Should be called every time a connection or request is allocated or freed.
4561 *
4562 * @param[out] conn_count_out How many connections we considered.
4563 * @param[out] req_count_out How many requests we considered.
4564 * @param[in] trunk to operate on.
4565 * @param[in] now The current time.
4566 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4567 * has changed.
4568 * @return
4569 * - 0 if the average couldn't be calculated (no requests or no connections).
4570 * - The average number of requests per connection.
4571 */
4572static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4573 trunk_t *trunk, fr_time_t now,
4574 NDEBUG_UNUSED bool verify)
4575{
4576 uint32_t req_count = 0;
4577 uint16_t conn_count = 0;
4578 uint64_t req_per_conn = 0;
4579
4581
4582 /*
4583 * No need to update these as the trunk is being freed
4584 */
4585 if (trunk->freeing) goto done;
4586
4587 /*
4588 * Count all connections except draining and draining to free.
4589 *
4590 * Omitting these connection states artificially raises the
4591 * request to connection ratio, so that we can preemptively spawn
4592 * new connections.
4593 *
4594 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4595 * the trunk management code has enough hysteresis to not
4596 * immediately reactivate the connection.
4597 *
4598 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4599 * management code should spawn a new connection to take its place.
4600 *
4601 * Connections placed in the DRAINING_TO_FREE state are being
4602 * closed preemptively to deal with bugs on the server we're
4603 * talking to, or misconfigured firewalls which are trashing
4604 * TCP/UDP connection states.
4605 */
4610
4611 /*
4612 * Requests on all connections
4613 */
4614 req_count = trunk_request_count_by_state(trunk,
4617
4618 /*
4619 * No connections, but we do have requests
4620 */
4621 if (conn_count == 0) {
4622 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4623 goto done;
4624 }
4625
4626 if (req_count == 0) {
4627 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4628 goto done;
4629 }
4630
4631 /*
4632 * Calculate the req_per_conn
4633 */
4634 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4635 if (req_per_conn > trunk->conf.target_req_per_conn) {
4636 above_target:
4637 /*
4638 * Edge - Below target to above target (too many requests per conn - spawn more)
4639 *
4640 * The equality check is correct here as both values start at 0.
4641 */
4643 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4644 below_target:
4645 /*
4646 * Edge - Above target to below target (too few requests per conn - close some)
4647 *
4648 * The equality check is correct here as both values start at 0.
4649 */
4651 }
4652
4653done:
4654 if (conn_count_out) *conn_count_out = conn_count;
4655 if (req_count_out) *req_count_out = req_count;
4656
4657 /*
4658 * Check we haven't missed a call to trunk_requests_per_connection
4659 */
4660 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4661
4662 trunk->last_req_per_conn = req_per_conn;
4663
4664 return req_per_conn;
4665}
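/* Worked example of the edge detection above (illustrative numbers only): with
 * target_req_per_conn = 10, 4 counted connections and 45 outstanding requests,
 * ROUND_UP_DIV(45, 4) = 12 > 10, so last_above_target is stamped with 'now'.
 * If the load later drops to 25 requests, ROUND_UP_DIV(25, 4) = 7 < 10 and
 * last_below_target is stamped instead. trunk_manage() compares those two
 * timestamps, together with open_delay/close_delay, to decide whether to spawn
 * or drain a connection.
 */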
4666
4667/** Drain the backlog of as many requests as possible
4668 *
4669 * @param[in] trunk To drain backlog requests for.
4670 */
4671static void trunk_backlog_drain(trunk_t *trunk)
4672{
4673 trunk_request_t *treq;
4674
4675 if (fr_heap_num_elements(trunk->backlog) == 0) return;
4676
4677 /*
4678 * If it's always writable, this isn't
4679 * really a noteworthy event.
4680 */
4681 if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4682
4683 /*
4684 * Do *NOT* add an artificial limit
4685 * here. We rely on all available
4686 * connections entering the full
4687 * state and transitioning back to
4688 * active in order to drain the
4689 * backlog.
4690 */
4691 while ((treq = fr_heap_peek(trunk->backlog))) {
4692 switch (trunk_request_enqueue_existing(treq)) {
4693 case TRUNK_ENQUEUE_OK:
4694 continue;
4695
4696 /*
4697 * Signal to stop
4698 */
4700 break;
4701
4702 /*
4703 * Failed enqueueing the request,
4704 * have it enter the failed state
4705 * which will free it and
4706 * re-enliven the yielded request.
4707 */
4709 case TRUNK_ENQUEUE_FAIL:
4710 trunk_request_enter_failed(treq);
4711 continue;
4712
4715 return;
4716 }
4717 }
4718}
4719
4720/** Force the trunk to re-establish its connections
4721 *
4722 * @param[in] trunk to signal.
4723 * @param[in] states One or more states or'd together.
4724 * @param[in] reason Why the connections are being signalled to reconnect.
4725 */
4726void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4727{
4728
4729#define RECONNECT_BY_STATE(_state, _list) \
4730do { \
4731 if (states & (_state)) { \
4732 size_t i; \
4733 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4734 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4735 } \
4736 } \
4737} while (0)
4738
4739 /*
4740 * Connections in the 'connecting' state
4741 * may re-enter that state, so we need to
4742 * be careful not to enter an infinite
4743 * loop, as we iterate over the list
4744 * again and again.
4745 */
4747
4748 if (states & TRUNK_CONN_ACTIVE) {
4749 trunk_connection_t *tconn;
4750 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4751 }
4752
4760}
4761
4762/** Start the trunk running
4763 *
4764 */
4765int trunk_start(trunk_t *trunk)
4766{
4767 uint16_t i;
4768
4769 if (unlikely(trunk->started)) return 0;
4770
4771 /*
4772 * Spawn the initial set of connections
4773 */
4774 for (i = 0; i < trunk->conf.start; i++) {
4775 DEBUG("[%i] Starting initial connection", i);
4776 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4777 }
4778
4779 /*
4780 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4781 * less than the idle timeout, update the management interval.
4782 */
4786 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4787 }
4788
4789 if (fr_time_delta_ispos(trunk->conf.manage_interval)) {
4790 /*
4791 * Insert the event timer to manage
4792 * the interval between managing connections.
4793 */
4794 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, trunk->conf.manage_interval,
4795 false, _trunk_timer, trunk) < 0) {
4796 PERROR("Failed inserting trunk management event");
4797 return -1;
4798 }
4799 }
4800 trunk->started = true;
4801 trunk->managing_connections = true;
4802
4803 return 0;
4804}
4805
4806/** Allow the trunk to open and close connections in response to load
4807 *
4808 */
4810{
4811 if (!trunk->started || trunk->managing_connections) return;
4812
4813 DEBUG3("Connection management enabled");
4814 trunk->managing_connections = true;
4815}
4816
4817/** Stop the trunk from opening and closing connections in response to load
4818 *
4819 */
4821{
4822 if (!trunk->started || !trunk->managing_connections) return;
4823
4824 DEBUG3("Connection management disabled");
4825 trunk->managing_connections = false;
4826}
4827
4828/** Schedule a trunk management event for the next time the event loop is executed
4829 */
4830int trunk_connection_manage_schedule(trunk_t *trunk)
4831{
4832 if (!trunk->started || !trunk->managing_connections) return 0;
4833
4834 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, fr_time_delta_wrap(0),
4835 false, _trunk_timer, trunk) < 0) {
4836 PERROR("Failed inserting trunk management event");
4837 return -1;
4838 }
4839
4840 return 0;
4841}
4842
4843/** Order connections by queue depth
4844 *
4845 */
4846static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4847{
4848 trunk_connection_t const *a = talloc_get_type_abort_const(one, trunk_connection_t);
4849 trunk_connection_t const *b = talloc_get_type_abort_const(two, trunk_connection_t);
4850
4851 uint32_t a_count = trunk_request_count_by_connection(a, TRUNK_REQUEST_STATE_ALL);
4852 uint32_t b_count = trunk_request_count_by_connection(b, TRUNK_REQUEST_STATE_ALL);
4853
4854 /*
4855 * Add a fudge factor of 1 to reduce spurious rebalancing
4856 */
4857 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4858}
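/* Worked example of the fudge factor above (illustrative counts): queue depths of
 * 5 and 6 differ by only 1, so the comparator returns 0 and the heap is left as-is;
 * depths of 5 and 7 differ by 2, so the deeper connection sorts after the shallower
 * one and the heap reorders. This stops single-request differences from causing
 * constant churn in the active heap.
 */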
4859
4860/** Free a trunk, gracefully closing all connections.
4861 *
4862 */
4863static int _trunk_free(trunk_t *trunk)
4864{
4865 trunk_connection_t *tconn;
4866 trunk_request_t *treq;
4867 trunk_watch_entry_t *watch;
4868 size_t i;
4869
4870 DEBUG4("Trunk free %p", trunk);
4871
4872 trunk->freeing = true; /* Prevent re-enqueuing */
4873
4874 /*
4875 * We really don't want this firing after
4876 * we've freed everything.
4877 */
4878 FR_TIMER_DELETE(&trunk->manage_ev);
4879
4880 /*
4881 * Now free the connections in each of the lists.
4882 *
4883 * Each time a connection is freed it removes itself from the list
4884 * it's in, which means the head should keep advancing automatically.
4885 */
4886 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4887 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4888 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4889 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4890 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4891 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4892 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4893 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4894 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4895
4896 /*
4897 * Process any deferred connection frees
4898 */
4899 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4900
4901 /*
4902 * Free any requests left in the backlog
4903 */
4904 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4905
4906 /*
4907 * Free any requests in our request cache
4908 */
4909 while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4910
4911 /*
4912 * Free any entries in the watch lists
4913 */
4914 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4915 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4916 }
4917
4918 return 0;
4919}
4920
4921/** Allocate a new collection of connections
4922 *
4923 * This function should be called first to allocate a new trunk.
4924 *
4925 * After the trunk has been allocated, #trunk_request_alloc and
4926 * #trunk_request_enqueue should be used to allocate memory for trunk
4927 * requests, and pass a preq (protocol request) to the trunk for
4928 * processing.
4929 *
4930 * The trunk will then asynchronously process the request, writing the result
4931 * to a specified rctx. See #trunk_request_enqueue for more details.
4932 *
4933 * @note Trunks may not be shared between multiple threads under any circumstances.
4934 *
4935 * @param[in] ctx To use for any memory allocations. Must be thread local.
4936 * @param[in] el to use for I/O and timer events.
4937 * @param[in] funcs Callback functions.
4938 * @param[in] conf Common user configurable parameters.
4939 * @param[in] log_prefix To prepend to global messages.
4940 * @param[in] uctx User data to pass to the alloc function.
4941 * @param[in] delay_start If true, then we will not spawn any connections
4942 * until the first request is enqueued.
4943 * @return
4944 * - New trunk handle on success.
4945 * - NULL on error.
4946 */
4947trunk_t *trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,
4948 trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4949 char const *log_prefix, void const *uctx, bool delay_start)
4950{
4951 trunk_t *trunk;
4952 size_t i;
4953
4954 /*
4955 * Check we have the functions we need
4956 */
4957 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4958
4959 MEM(trunk = talloc_zero(ctx, trunk_t));
4960 trunk->el = el;
4961 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4962
4963 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4964 if (!trunk->funcs.connection_prioritise) {
4965 trunk->funcs.connection_prioritise = fr_pointer_cmp;
4966 }
4968
4969 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4970
4971 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4972 talloc_set_destructor(trunk, _trunk_free);
4973
4974 /*
4975 * Unused request list...
4976 */
4978
4979 /*
4980 * Request backlog queue
4981 */
4982 MEM(trunk->backlog = fr_heap_talloc_alloc(trunk, _trunk_request_prioritise,
4983 trunk_request_t, heap_id, 0));
4984
4985 /*
4986 * Connection queues and trees
4987 */
4988 MEM(trunk->active = fr_minmax_heap_talloc_alloc(trunk, _trunk_connection_order_by_shortest_queue,
4989 trunk_connection_t, heap_id, 0));
4999
5000 /*
5001 * Watch lists
5002 */
5003 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5005 }
5006
5007 DEBUG4("Trunk allocated %p", trunk);
5008
5009 if (!delay_start) {
5010 if (trunk_start(trunk) < 0) {
5011 talloc_free(trunk);
5012 return NULL;
5013 }
5014 }
5015
5016 return trunk;
5017}
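/* A hedged usage sketch, assuming a thread-local event list plus an io_funcs/conf
 * pair supplied by the caller. my_funcs, my_conf, my_uctx, preq and rctx are
 * hypothetical caller-owned objects, not part of this API.
 *
 * @code{.c}
 * trunk_t *trunk;
 * trunk_request_t *treq = NULL;
 *
 * trunk = trunk_alloc(ctx, el, &my_funcs, &my_conf, "my module", my_uctx, false);
 * if (!trunk) return -1;
 *
 * // preq is the caller's protocol request, rctx is where the result will be written.
 * switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
 * case TRUNK_ENQUEUE_OK:
 * case TRUNK_ENQUEUE_IN_BACKLOG:
 *         break;          // the trunk now owns writing preq to a connection
 *
 * default:
 *         return -1;      // enqueue was refused; the caller still owns preq/rctx
 * }
 * @endcode
 */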
5018
5019#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5020/** Verify a trunk
5021 *
5022 * A trunk has some number of connections, which each have some number of requests. The connections and
5023 * requests are in differing kinds of containers depending on their state and how they are used, and may
5024 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5025 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5026 * foo's children.
5027 */
5028void trunk_verify(char const *file, int line, trunk_t *trunk)
5029{
5030 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5031 (void) talloc_get_type_abort(trunk, trunk_t);
5032
5033 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5034 _fr_dlist_verify(file, line, &trunk->watch[i]);
5035 }
5036
5037#define IO_FUNC_VERIFY(_func) \
5038 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i]: " #_func " was NULL", file, line)
5039
5040 /*
5041 * Only a few of the function pointers *must* be non-NULL..
5042 */
5044 IO_FUNC_VERIFY(connection_prioritise);
5046
5047#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5048do { \
5049 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5050 "CONSISTENCY_CHECK_FAILED %s[%i]: connection-trunk mismatch", file, line); \
5051 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5052 "CONSISTENCY_CHECK_FAILED %s[%i]: connection-state mismatch", file, line); \
5053} while (0)
5054
5055#define TCONN_DLIST_VERIFY(_dlist, _state) \
5056do { \
5057 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5058 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5059 trunk_connection_verify(file, line, tconn); \
5060 TRUNK_TCONN_CHECKS(tconn, _state); \
5061 } \
5062} while (0)
5063
5064#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5065do {\
5066 fr_minmax_heap_verify(file, line, trunk->_heap); \
5067 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5068 trunk_connection_verify(file, line, tconn); \
5069 TRUNK_TCONN_CHECKS(tconn, _state); \
5070 }} \
5071} while (0)
5072
5073 fr_dlist_verify(&(trunk->free_requests));
5074 FR_HEAP_VERIFY(trunk->backlog);
5075
5082 /* TCONN_DLIST_VERIFY(failed, ???); */
5087}
5088
5090{
5091 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5092 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5093
5094 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5095
5096 /*
5097 * shouldn't be both in heap and on list--but it doesn't look like moves
5098 * to active heap wipe the dlist pointers.
5099 */
5100
5101#define TCONN_TREQ_CHECKS(_treq, _state) \
5102do { \
5103 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5104 "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-tconn mismatch", file, line); \
5105 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5106 "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-trunk mismatch", file, line); \
5107 fr_fatal_assert_msg(_state == _treq->pub.state, \
5108 "CONSISTENCY_CHECK_FAILED %s[%i]: trunk request-state mismatch", file, line); \
5109} while (0)
5110
5111#define TREQ_DLIST_VERIFY(_dlist, _state) \
5112do { \
5113 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5114 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5115 trunk_request_verify(file, line, treq); \
5116 TCONN_TREQ_CHECKS(treq, _state); \
5117 } \
5118} while (0)
5119
5120#define TREQ_HEAP_VERIFY(_heap, _state) \
5121do { \
5122 fr_heap_iter_t _iter; \
5123 fr_heap_verify(file, line, tconn->_heap); \
5124 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5125 treq; \
5126 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5127 trunk_request_verify(file, line, treq); \
5128 TCONN_TREQ_CHECKS(treq, _state); \
5129 } \
5130} while (0)
5131
5132#define TREQ_OPTION_VERIFY(_option, _state) \
5133do { \
5134 if (tconn->_option) { \
5135 trunk_request_verify(file, line, tconn->_option); \
5136 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5137 } \
5138} while (0)
5139
5140 /* verify associated requests */
5147}
5148
5149void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5150{
5151 fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5152 (void) talloc_get_type_abort(treq, trunk_request_t);
5153
5154#ifdef WITH_VERIFY_PTR
5155 if (treq->pub.request) request_verify(file, line, treq->pub.request);
5156#endif
5157}
5158
5159
5160bool trunk_search(trunk_t *trunk, void *ptr)
5161{
5162#define TCONN_DLIST_SEARCH(_dlist) \
5163do { \
5164 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5165 if (ptr == tconn) { \
5166 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5167 return true; \
5168 } \
5169 if (trunk_connection_search(tconn, ptr)) { \
5170 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5171 return true; \
5172 } \
5173 } \
5174} while (0)
5175
5176#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5177do { \
5178 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5179 if (ptr == tconn) { \
5180 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5181 return true; \
5182 } \
5183 if (trunk_connection_search(tconn, ptr)) { \
5184 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5185 return true; \
5186 } \
5187 }}\
5188} while (0)
5189
5191 TCONN_DLIST_SEARCH(connecting);
5193 TCONN_DLIST_SEARCH(full);
5194 TCONN_DLIST_SEARCH(inactive);
5195 TCONN_DLIST_SEARCH(inactive_draining);
5196 TCONN_DLIST_SEARCH(failed);
5197 TCONN_DLIST_SEARCH(closed);
5198 TCONN_DLIST_SEARCH(draining);
5199 TCONN_DLIST_SEARCH(draining_to_free);
5200 TCONN_DLIST_SEARCH(to_free);
5201
5202 return false;
5203}
5204
5206{
5207#define TREQ_DLIST_SEARCH(_dlist) \
5208do { \
5209 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5210 if (ptr == treq) { \
5211 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5212 return true; \
5213 } \
5214 if (trunk_request_search(treq, ptr)) { \
5215 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5216 return true; \
5217 } \
5218 } \
5219} while (0)
5220
5221#define TREQ_HEAP_SEARCH(_heap) \
5222do { \
5223 fr_heap_iter_t _iter; \
5224 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5225 treq; \
5226 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5227 if (ptr == treq) { \
5228 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5229 return true; \
5230 } \
5231 if (trunk_request_search(treq, ptr)) { \
5232 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5233 return true; \
5234 } \
5235 } \
5236} while (0)
5237
5238#define TREQ_OPTION_SEARCH(_option) \
5239do { \
5240 if (tconn->_option) { \
5241 if (ptr == tconn->_option) { \
5242 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5243 return true; \
5244 } \
5245 if (trunk_request_search(tconn->_option, ptr)) { \
5246 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5247 return true; \
5248 } \
5249 } \
5250} while (0)
5251
5252 /* search associated requests */
5253 TREQ_HEAP_SEARCH(pending);
5254 TREQ_DLIST_SEARCH(sent);
5255 TREQ_DLIST_SEARCH(cancel);
5256 TREQ_DLIST_SEARCH(cancel_sent);
5257 TREQ_OPTION_SEARCH(partial);
5258 TREQ_OPTION_SEARCH(cancel_partial);
5259
5260 return false;
5261}
5262
5264{
5265 return treq->pub.preq == ptr;
5266}
5267#endif
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:41
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define NDEBUG_UNUSED
Definition build.h:328
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:324
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
#define NUM_ELEMENTS(_t)
Definition build.h:339
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:658
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:284
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:339
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:313
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:428
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:595
connection_state_t
Definition connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:54
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition connection.h:46
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:55
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:52
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:49
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:50
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:53
connection_reason_t
Definition connection.h:83
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:202
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:176
#define MEM(x)
Definition debug.h:36
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:735
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:588
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:531
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
#define fr_dlist_verify(_head)
Definition dlist.h:755
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:338
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:555
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:606
Track when a log message was last repeated.
Definition log.h:547
talloc_free(reap)
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:581
fr_log_type_t
Definition log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:153
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:418
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1033
#define fr_assert(_expr)
Definition rad_assert.h:38
static bool done
Definition radclient.c:81
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
#define WARN(fmt,...)
Definition radclient.h:47
#define INFO(fmt,...)
Definition radict.c:54
static fr_event_list_t * events
Definition radsniff.c:59
static rs_t * conf
Definition radsniff.c:53
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:472
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:319
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:510
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:532
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:455
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:310
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:155
fr_time_t test_time
Definition state_test.c:3
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:287
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:180
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:50
A timer event.
Definition timer.c:84
#define FR_TIMER_DELETE(_ev_p)
Definition timer.h:103
#define FR_TIMER_DELETE_RETURN(_ev_p)
Definition timer.h:110
#define fr_timer_in(...)
Definition timer.h:87
#define FR_TIMER_DISARM(_ev)
Definition timer.h:91
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5160
static atomic_uint_fast64_t request_counter
Definition trunk.c:54
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3249
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:775
static size_t trunk_req_trigger_names_len
Definition trunk.c:357
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3836
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:161
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4830
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:745
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3399
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:159
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:153
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:206
static size_t trunk_connection_states_len
Definition trunk.c:414
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:750
trunk_connection_t * tconn
The request was associated with.
Definition trunk.c:82
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4015
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:280
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5028
fr_timer_t * manage_ev
Periodic connection management event.
Definition trunk.c:272
#define IN_HANDLER(_trunk)
Definition trunk.c:705
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:402
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4726
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4032
void * uctx
User data to pass to the function.
Definition trunk.c:191
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1153
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:962
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:278
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:80
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:938
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4164
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3681
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:258
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:243
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:760
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4765
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2030
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2133
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2261
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition trunk.c:3213
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:187
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3922
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:597
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2475
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3633
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:716
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:138
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:240
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:264
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:788
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1505
int line
Line change occurred on.
Definition trunk.c:92
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3151
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:440
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4572
static size_t trunk_connection_events_len
Definition trunk.c:430
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3581
bool oneshot
Remove the function after it's called once.
Definition trunk.c:189
bool started
Has the trunk been started.
Definition trunk.c:289
static size_t trunk_states_len
Definition trunk.c:400
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition trunk.c:2880
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:294
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:558
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as inactive.
Definition trunk.c:2899
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3050
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:71
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:2963
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:147
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:864
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4863
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:238
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4100
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4671
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:519
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4846
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:100
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition trunk.c:4947
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:163
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2011
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:190
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:113
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:540
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:755
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2922
struct trunk_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:198
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:111
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:485
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2742
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3510
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:424
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2588
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1056
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:260
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:286
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:395
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:2953
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:710
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2677
#define IS_PROCESSING(_tconn)
Definition trunk.c:711
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3181
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:342
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition trunk.c:108
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4058
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2285
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1603
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:615
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:736
fr_dlist_head_t sent
Sent requests.
Definition trunk.c:157
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition trunk.c:1224
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:178
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3884
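A sketch of the loop an API client's mux code might run to drain a connection's pending queue with this function. The 0-on-success / NULL-when-empty convention and the placeholder write step are assumptions.

static void my_mux_pending(trunk_connection_t *tconn)
{
        trunk_request_t *treq;

        for (;;) {
                if (trunk_connection_pop_request(&treq, tconn) != 0) break;     /* Assumed: non-zero on error */
                if (!treq) break;                                               /* Assumed: NULL when nothing is pending */

                /* ... encode and write treq->pub.preq to tconn->pub.conn here ... */

                trunk_request_signal_sent(treq);        /* Request is now waiting for a response */
        }
}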
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:224
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:884
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2153
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2798
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3749
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:830
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4501
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:134
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety,...
Definition trunk.c:1311
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition trunk.c:2856
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:707
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:577
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1374
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1681
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:291
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:666
char const * function
State change occurred in.
Definition trunk.c:91
static size_t trunk_request_states_len
Definition trunk.c:375
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:221
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:77
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1425
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3451
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:249
trunk_request_t * partial
Partially written request.
Definition trunk.c:155
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1567
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:226
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:314
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1536
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1254
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:648
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3106
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2323
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:632
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1738
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2442
char const * log_prefix
What to prepend to messages.
Definition trunk.c:202
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:726
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3433
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:2986
static conf_parser_t const trunk_config_request[]
Definition trunk.c:298
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:228
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1091
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:360
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3355
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:381
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:246
uint64_t id
Trunk request ID.
Definition trunk.c:104
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:171
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3320
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:688
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:231
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4820
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:3961
fr_dlist_head_t log
State change log.
Definition trunk.c:123
uint64_t tconn_id
If the treq was associated with a connection, the connection ID.
Definition trunk.c:85
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:141
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1460
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3128
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:79
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition trunk.c:165
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4809
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is dr...
Definition trunk.c:234
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3938
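Where the API client tracks its own per-connection capacity (for example a protocol-level window), it can take a connection out of rotation with this signal and put it back with trunk_connection_signal_active(). A sketch; my_conn_t and its window fields are hypothetical.

typedef struct {
        trunk_connection_t      *tconn;
        unsigned int            window_used;
        unsigned int            window_max;
} my_conn_t;

static void my_window_consume(my_conn_t *c)
{
        if (++c->window_used >= c->window_max) trunk_connection_signal_inactive(c->tconn);
}

static void my_window_release(my_conn_t *c)
{
        int was_full = (c->window_used >= c->window_max);

        c->window_used--;
        if (was_full) trunk_connection_signal_active(c->tconn);        /* Connection can service requests again */
}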
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2791
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5089
fr_heap_t * backlog
The request backlog.
Definition trunk.c:211
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:708
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5149
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4523
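Both count helpers take bitmask state arguments, so the documented TRUNK_CONN_ALL and TRUNK_REQUEST_STATE_ALL masks (or individual states) can be used to gather statistics. A sketch; the function name is hypothetical.

static void my_gather_stats(trunk_t *trunk)
{
        uint16_t active  = trunk_connection_count_by_state(trunk, TRUNK_CONN_ACTIVE);
        uint16_t total   = trunk_connection_count_by_state(trunk, TRUNK_CONN_ALL);
        uint64_t backlog = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
                                                        TRUNK_REQUEST_STATE_BACKLOG);
        uint64_t sent    = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
                                                        TRUNK_REQUEST_STATE_SENT);

        /* ... log or export the gathered figures ... */
        (void)active; (void)total; (void)backlog; (void)sent;
}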
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2237
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2051
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:262
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5205
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:451
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition trunk.c:106
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:474
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:266
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connectio...
Definition trunk.c:1849
bool sent
Trunk request has been sent at least once.
Definition trunk.c:118
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2095
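A sketch of the completion path an API client's demux code might take once a response has been decoded and matched back to its trunk request; where the result is stored is protocol-specific.

static void my_complete(trunk_request_t *treq)
{
        /* ... copy the decoded response into treq->pub.rctx for the yielded caller ... */

        trunk_request_signal_complete(treq);    /* The trunk releases the treq; do not use it afterwards */
}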
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2940
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:4000
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3904
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5263
fr_dlist_t entry
List entry.
Definition trunk.c:186
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:306
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:87
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:115
static size_t trunk_cancellation_reasons_len
Definition trunk.c:422
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:416
static size_t trunk_conn_trigger_names_len
Definition trunk.c:393
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:204
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2829
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:706
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:208
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4048
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:769
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:78
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2073
Associates request queues with a connection.
Definition trunk.c:133
Wraps a normal request.
Definition trunk.c:99
Trace state machine changes for a particular request.
Definition trunk.c:76
Main trunk management handle.
Definition trunk.c:197
An entry in a trunk watch function list.
Definition trunk.c:185
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:231
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:240
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:314
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:347
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event ...
Definition trunk.h:281
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:229
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:195
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:353
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:375
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:737
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outsta...
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:266
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:355
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:304
fr_time_delta_t idle_timeout
How long a connection can remain idle for.
Definition trunk.h:250
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:367
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:269
uint64_t max_uses
The maximum number of times a connection can be used.
Definition trunk.h:246
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:248
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:233
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:328
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:244
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:312
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:345
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:371
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:349
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:733
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:316
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:301
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:373
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:255
uint16_t start
How many connections to start.
Definition trunk.h:227
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:259
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:213
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:271
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:739
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:322
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:326
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:252
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:369
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:307
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:724
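A watch function must match this signature. A sketch of one that reacts to the trunk losing all of its active connections; registration is assumed to happen elsewhere (a trunk_add_watch()-style call, not shown here), and removal uses the documented trunk_del_watch().

static void my_on_trunk_state(UNUSED trunk_t *trunk, trunk_state_t prev,
                              trunk_state_t state, UNUSED void *uctx)
{
        if ((prev == TRUNK_STATE_ACTIVE) && (state == TRUNK_STATE_PENDING)) {
                /* ... all connections are gone; raise an alert, mark the destination degraded, etc. ... */
        }
}

/* To stop being notified about the PENDING state: */
/* trunk_del_watch(trunk, TRUNK_STATE_PENDING, my_on_trunk_state); */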
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:263
trunk_enqueue_t
Definition trunk.h:148
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:153
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:154
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:150
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:151
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:149
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:351
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:236
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:310
trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition trunk.h:746
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:161
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:170
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:173
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:165
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:182
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:187
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:167
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:172
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:333
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:298
Common configuration parameters for a trunk.
Definition trunk.h:224
Public fields for the trunk connection.
Definition trunk.h:366
I/O functions to pass to trunk_alloc.
Definition trunk.h:732
Public fields for the trunk.
Definition trunk.h:294
Public fields for the trunk request.
Definition trunk.h:344
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:420
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:553
#define fr_box_time_delta(_val)
Definition value.h:362